1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Random;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.ClusterStatus;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HTableDescriptor;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
45 import org.apache.hadoop.hbase.regionserver.HRegionServer;
46 import org.apache.hadoop.hbase.regionserver.Region;
47 import org.apache.hadoop.hbase.testclassification.MediumTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
50 import org.junit.After;
51 import org.junit.AfterClass;
52 import org.junit.Before;
53 import org.junit.BeforeClass;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56
57 @Category({MediumTests.class})
58 public class TestFastFail {
59 final Log LOG = LogFactory.getLog(getClass());
60 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
61 private static byte[] FAMILY = Bytes.toBytes("testFamily");
62 private static final Random random = new Random();
63 private static int SLAVES = 3;
64 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
65 private static final int SLEEPTIME = 5000;
66
67
68
69
70 @BeforeClass
71 public static void setUpBeforeClass() throws Exception {
72 TEST_UTIL.startMiniCluster(SLAVES);
73 }
74
75
76
77
78 @AfterClass
79 public static void tearDownAfterClass() throws Exception {
80 TEST_UTIL.shutdownMiniCluster();
81 }
82
83
84
85
86 @Before
87 public void setUp() throws Exception {
88 MyPreemptiveFastFailInterceptor.numBraveSouls.set(0);
89 }
90
91
92
93
94 @After
95 public void tearDown() throws Exception {
96
97 }
98
99 @Test
100 public void testFastFail() throws IOException, InterruptedException {
101 Admin admin = TEST_UTIL.getHBaseAdmin();
102
103 final String tableName = "testClientRelearningExperiment";
104 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes
105 .toBytes(tableName)));
106 desc.addFamily(new HColumnDescriptor(FAMILY));
107 admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32);
108 final long numRows = 1000;
109
110 Configuration conf = TEST_UTIL.getConfiguration();
111 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100);
112 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10);
113 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
114 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
115 conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
116 MyPreemptiveFastFailInterceptor.class,
117 PreemptiveFastFailInterceptor.class);
118
119 final Connection connection = ConnectionFactory.createConnection(conf);
120
121
122
123
124 List<Put> puts = new ArrayList<>();
125 for (long i = 0; i < numRows; i++) {
126 byte[] rowKey = longToByteArrayKey(i);
127 Put put = new Put(rowKey);
128 byte[] value = rowKey;
129 put.add(FAMILY, QUALIFIER, value);
130 puts.add(put);
131 }
132 try (Table table = connection.getTable(TableName.valueOf(tableName))) {
133 table.put(puts);
134 LOG.info("Written all puts.");
135 }
136
137
138
139
140
141 int nThreads = 100;
142 ExecutorService service = Executors.newFixedThreadPool(nThreads);
143 final CountDownLatch continueOtherHalf = new CountDownLatch(1);
144 final CountDownLatch doneHalfway = new CountDownLatch(nThreads);
145
146 final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
147 final AtomicInteger numFailedThreads = new AtomicInteger(0);
148
149
150 final AtomicLong totalTimeTaken = new AtomicLong(0);
151 final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
152 final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);
153
154 List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
155 for (int i = 0; i < nThreads; i++) {
156 futures.add(service.submit(new Callable<Boolean>() {
157
158
159
160
161
162 public Boolean call() throws Exception {
163 try (Table table = connection.getTable(TableName.valueOf(tableName))) {
164 Thread.sleep(Math.abs(random.nextInt()) % 250);
165 byte[] row = longToByteArrayKey(Math.abs(random.nextLong())
166 % numRows);
167 Get g = new Get(row);
168 g.addColumn(FAMILY, QUALIFIER);
169 try {
170 table.get(g);
171 } catch (Exception e) {
172 LOG.debug("Get failed : ", e);
173 doneHalfway.countDown();
174 return false;
175 }
176
177
178 doneHalfway.countDown();
179 continueOtherHalf.await();
180
181 long startTime = System.currentTimeMillis();
182 g = new Get(row);
183 g.addColumn(FAMILY, QUALIFIER);
184 try {
185 table.get(g);
186
187 numSuccessfullThreads.addAndGet(1);
188 } catch (Exception e) {
189 if (e instanceof PreemptiveFastFailException) {
190
191 numPreemptiveFastFailExceptions.addAndGet(1);
192 }
193
194 numFailedThreads.addAndGet(1);
195 return false;
196 } finally {
197 long enTime = System.currentTimeMillis();
198 totalTimeTaken.addAndGet(enTime - startTime);
199 if ((enTime - startTime) >= SLEEPTIME) {
200
201
202
203
204
205 numBlockedWorkers.addAndGet(1);
206 }
207 }
208 return true;
209 } catch (Exception e) {
210 LOG.error("Caught unknown exception", e);
211 doneHalfway.countDown();
212 return false;
213 }
214 }
215 }));
216 }
217
218 doneHalfway.await();
219
220 ClusterStatus status = TEST_UTIL.getHBaseCluster().getClusterStatus();
221
222
223 for (int i = 0; i < SLAVES; i++) {
224 HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
225 List<Region> regions = server.getOnlineRegions(TableName.META_TABLE_NAME);
226 if (regions.size() > 0) continue;
227
228 server.getRpcServer().stop();
229 server.stop("Testing");
230 }
231
232
233 continueOtherHalf.countDown();
234
235 Thread.sleep(2 * SLEEPTIME);
236
237 TEST_UTIL.getHBaseCluster().restoreClusterStatus(status);
238
239 int numThreadsReturnedFalse = 0;
240 int numThreadsReturnedTrue = 0;
241 int numThreadsThrewExceptions = 0;
242 for (Future<Boolean> f : futures) {
243 try {
244 numThreadsReturnedTrue += f.get() ? 1 : 0;
245 numThreadsReturnedFalse += f.get() ? 0 : 1;
246 } catch (Exception e) {
247 numThreadsThrewExceptions++;
248 }
249 }
250 LOG.debug("numThreadsReturnedFalse:"
251 + numThreadsReturnedFalse
252 + " numThreadsReturnedTrue:"
253 + numThreadsReturnedTrue
254 + " numThreadsThrewExceptions:"
255 + numThreadsThrewExceptions
256 + " numFailedThreads:"
257 + numFailedThreads.get()
258 + " numSuccessfullThreads:"
259 + numSuccessfullThreads.get()
260 + " numBlockedWorkers:"
261 + numBlockedWorkers.get()
262 + " totalTimeWaited: "
263 + totalTimeTaken.get()
264 / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers
265 .get()) + " numPFFEs: " + numPreemptiveFastFailExceptions.get());
266
267 assertEquals("The expected number of all the successfull and the failed "
268 + "threads should equal the total number of threads that we spawned",
269 nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
270 assertEquals(
271 "All the failures should be coming from the secondput failure",
272 numFailedThreads.get(), numThreadsReturnedFalse);
273 assertEquals("Number of threads that threw execution exceptions "
274 + "otherwise should be 0", numThreadsThrewExceptions, 0);
275 assertEquals("The regionservers that returned true should equal to the"
276 + " number of successful threads", numThreadsReturnedTrue,
277 numSuccessfullThreads.get());
278 assertTrue(
279 "There should be atleast one thread that retried instead of failing",
280 MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0);
281 assertTrue(
282 "There should be atleast one PreemptiveFastFail exception,"
283 + " otherwise, the test makes little sense."
284 + "numPreemptiveFastFailExceptions: "
285 + numPreemptiveFastFailExceptions.get(),
286 numPreemptiveFastFailExceptions.get() > 0);
287 assertTrue(
288 "Only few thread should ideally be waiting for the dead "
289 + "regionserver to be coming back. numBlockedWorkers:"
290 + numBlockedWorkers.get() + " threads that retried : "
291 + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
292 numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls
293 .get());
294 }
295
296 public static class MyPreemptiveFastFailInterceptor extends
297 PreemptiveFastFailInterceptor {
298 public static AtomicInteger numBraveSouls = new AtomicInteger();
299
300 @Override
301 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
302 boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
303 if (ret)
304 numBraveSouls.addAndGet(1);
305 return ret;
306 }
307
308 public MyPreemptiveFastFailInterceptor(Configuration conf) {
309 super(conf);
310 }
311 }
312
313 private byte[] longToByteArrayKey(long rowKey) {
314 return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
315 }
316 }