1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
34 import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
35 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
36 import org.apache.hadoop.hbase.client.coprocessor.Batch;
37 import org.apache.hadoop.hbase.regionserver.Region;
38 import org.apache.hadoop.hbase.testclassification.MediumTests;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41 import org.junit.AfterClass;
42 import org.junit.BeforeClass;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotEquals;
48 import static org.junit.Assert.assertNotNull;
49 import static org.junit.Assert.assertTrue;
50
51
52
53
54 @Category(MediumTests.class)
55 public class TestClientPushback {
56
57 private static final Log LOG = LogFactory.getLog(TestClientPushback.class);
58 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
59
60 private static final byte[] tableName = Bytes.toBytes("client-pushback");
61 private static final byte[] family = Bytes.toBytes("f");
62 private static final byte[] qualifier = Bytes.toBytes("q");
63 private static long flushSizeBytes = 1024;
64
65 @BeforeClass
66 public static void setupCluster() throws Exception{
67 Configuration conf = UTIL.getConfiguration();
68
69 conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
70
71 conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, ExponentialClientBackoffPolicy.class,
72 ClientBackoffPolicy.class);
73
74
75 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
76
77 conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
78
79 UTIL.startMiniCluster();
80 UTIL.createTable(tableName, family);
81 }
82
83 @AfterClass
84 public static void teardownCluster() throws Exception{
85 UTIL.shutdownMiniCluster();
86 }
87
88 @Test(timeout=60000)
89 public void testClientTracksServerPushback() throws Exception{
90 Configuration conf = UTIL.getConfiguration();
91 TableName tablename = TableName.valueOf(tableName);
92 Connection conn = ConnectionFactory.createConnection(conf);
93 HTable table = (HTable) conn.getTable(tablename);
94
95 table.setAutoFlushTo(true);
96
97
98 Put p = new Put(Bytes.toBytes("row"));
99 p.add(family, qualifier, Bytes.toBytes("value1"));
100 table.put(p);
101
102
103 ClusterConnection connection = table.connection;
104 ClientBackoffPolicy backoffPolicy = connection.getBackoffPolicy();
105 assertTrue("Backoff policy is not correctly configured",
106 backoffPolicy instanceof ExponentialClientBackoffPolicy);
107
108 ServerStatisticTracker stats = connection.getStatisticsTracker();
109 assertNotNull( "No stats configured for the client!", stats);
110 Region region = UTIL.getHBaseCluster().getRegionServer(0).getOnlineRegions(tablename).get(0);
111
112 ServerName server = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
113 byte[] regionName = region.getRegionInfo().getRegionName();
114
115
116 ServerStatistics serverStats = stats.getServerStatsForTesting(server);
117 ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
118 int load = regionStats.getMemstoreLoadPercent();
119 if (load < 11) {
120 assertEquals("Load on memstore too low", 11, load);
121 }
122
123
124 long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats);
125 assertNotEquals("Reported load does not produce a backoff", backoffTime, 0);
126 LOG.debug("Backoff calculated for " + region.getRegionInfo().getRegionNameAsString() + " @ " +
127 server + " is " + backoffTime);
128
129
130
131 List<Row> ops = new ArrayList<Row>(1);
132 ops.add(p);
133 final CountDownLatch latch = new CountDownLatch(1);
134 final AtomicLong endTime = new AtomicLong();
135 long startTime = EnvironmentEdgeManager.currentTime();
136 table.mutator.ap.submit(tablename, ops, true, new Batch.Callback<Result>() {
137 @Override
138 public void update(byte[] region, byte[] row, Result result) {
139 endTime.set(EnvironmentEdgeManager.currentTime());
140 latch.countDown();
141 }
142 }, true);
143
144
145
146
147 latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
148 assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0);
149 assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);
150 }
151 }