1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.*;
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.SyncFailedException;
25 import java.net.ConnectException;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.ClosedChannelException;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.hbase.DoNotRetryIOException;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HRegionLocation;
46 import org.apache.hadoop.hbase.ServerName;
47 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
48 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
49 import org.apache.hadoop.hbase.testclassification.SmallTests;
50 import org.apache.hadoop.ipc.RemoteException;
51 import org.junit.Assume;
52 import org.junit.Test;
53 import org.junit.experimental.categories.Category;
54
55 @Category({ SmallTests.class })
56 public class TestFastFailWithoutTestUtil {
57 private static final Log LOG = LogFactory.getLog(TestFastFailWithoutTestUtil.class);
58
59
60 private static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
61
62 @Test
63 public void testInterceptorFactoryMethods() {
64 Configuration conf = HBaseConfiguration.create();
65 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
66 RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory(
67 conf);
68
69 RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory
70 .build();
71 assertTrue("We should be getting a PreemptiveFastFailInterceptor",
72 interceptorBeforeCast instanceof PreemptiveFastFailInterceptor);
73 PreemptiveFastFailInterceptor interceptor = (PreemptiveFastFailInterceptor) interceptorBeforeCast;
74
75 RetryingCallerInterceptorContext contextBeforeCast = interceptor
76 .createEmptyContext();
77 assertTrue(
78 "We should be getting a FastFailInterceptorContext since we are interacting with the"
79 + " PreemptiveFastFailInterceptor",
80 contextBeforeCast instanceof FastFailInterceptorContext);
81
82 FastFailInterceptorContext context = (FastFailInterceptorContext) contextBeforeCast;
83 assertTrue(context != null);
84
85 conf = HBaseConfiguration.create();
86 interceptorFactory = new RetryingCallerInterceptorFactory(conf);
87
88 interceptorBeforeCast = interceptorFactory.build();
89 assertTrue(
90 "We should be getting a NoOpRetryableCallerInterceptor since we disabled PFFE",
91 interceptorBeforeCast instanceof NoOpRetryableCallerInterceptor);
92
93 contextBeforeCast = interceptorBeforeCast.createEmptyContext();
94 assertTrue(
95 "We should be getting a NoOpRetryingInterceptorContext from NoOpRetryableCallerInterceptor",
96 contextBeforeCast instanceof NoOpRetryingInterceptorContext);
97
98 assertTrue(context != null);
99 }
100
101 @Test
102 public void testInterceptorContextClear() {
103 PreemptiveFastFailInterceptor interceptor = createPreemptiveInterceptor();
104 FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
105 .createEmptyContext();
106 context.clear();
107 assertFalse(context.getCouldNotCommunicateWithServer().booleanValue());
108 assertEquals(context.didTry(), false);
109 assertEquals(context.getFailureInfo(), null);
110 assertEquals(context.getServer(), null);
111 assertEquals(context.getTries(), 0);
112 }
113
114 @Test
115 public void testInterceptorContextPrepare() throws IOException {
116 PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
117 .createPreemptiveInterceptor();
118 FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
119 .createEmptyContext();
120 RetryingCallable<?> callable = new RegionServerCallable<Boolean>(null,
121 null, null) {
122 @Override
123 public Boolean call(int callTimeout) throws Exception {
124 return true;
125 }
126
127 @Override
128 protected HRegionLocation getLocation() {
129 return new HRegionLocation(null, ServerName.valueOf("localhost", 1234,
130 987654321));
131 }
132 };
133 context.prepare(callable);
134 ServerName server = getSomeServerName();
135 assertEquals(context.getServer(), server);
136 context.clear();
137 context.prepare(callable, 2);
138 assertEquals(context.getServer(), server);
139 }
140
141 @Test
142 public void testInterceptorIntercept50Times() throws IOException,
143 InterruptedException {
144 for (int i = 0; i < 50; i++) {
145 testInterceptorIntercept();
146 }
147 }
148
149 public void testInterceptorIntercept() throws IOException,
150 InterruptedException {
151 Configuration conf = HBaseConfiguration.create();
152 long CLEANUP_TIMEOUT = 50;
153 long FAST_FAIL_THRESHOLD = 10;
154 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
155 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
156 CLEANUP_TIMEOUT);
157 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
158 FAST_FAIL_THRESHOLD);
159
160 PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
161 .createPreemptiveInterceptor(conf);
162 FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
163 .createEmptyContext();
164
165 RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
166
167
168 int tries = 0;
169 context.prepare(callable, tries);
170 interceptor.intercept(context);
171 interceptor.handleFailure(context, new ConnectException(
172 "Failed to connect to server"));
173 interceptor.updateFailureInfo(context);
174 assertTrue("Interceptor should have updated didTry to true",
175 context.didTry());
176 assertTrue(
177 "The call shouldn't have been successful if there was a ConnectException",
178 context.getCouldNotCommunicateWithServer().booleanValue());
179 assertNull(
180 "Once a failure is identified, the first time the FailureInfo is generated for the server,"
181 + " but it is not assigned to the context yet. It would be assigned on the next"
182 + " intercept.", context.getFailureInfo());
183 assertEquals(context.getTries(), tries);
184 assertFalse(
185 "We are still in the first attempt and so we dont set this variable to true yet.",
186 context.isRetryDespiteFastFailMode());
187
188 Thread.sleep(FAST_FAIL_THRESHOLD + 1);
189
190
191
192 tries++;
193
194 context.prepare(callable, tries);
195 interceptor.intercept(context);
196 interceptor.handleFailure(context, new ConnectException(
197 "Failed to connect to server"));
198 interceptor.updateFailureInfo(context);
199 assertTrue("didTru should remain true", context.didTry());
200 assertTrue(
201 "The call shouldn't have been successful if there was a ConnectException",
202 context.getCouldNotCommunicateWithServer().booleanValue());
203 assertNotNull(
204 "The context this time is updated with a failureInfo, since we already gave it a try.",
205 context.getFailureInfo());
206 assertEquals(context.getTries(), tries);
207 assertTrue(
208 "Since we are alone here we would be given the permission to retryDespiteFailures.",
209 context.isRetryDespiteFastFailMode());
210 context.clear();
211
212 Thread.sleep(CLEANUP_TIMEOUT);
213
214
215 tries++;
216
217 context.clear();
218 context.prepare(callable, tries);
219 interceptor.occasionallyCleanupFailureInformation();
220 assertNull("The cleanup should have cleared the server",
221 interceptor.repeatedFailuresMap.get(context.getServer()));
222 interceptor.intercept(context);
223 interceptor.handleFailure(context, new ConnectException(
224 "Failed to connect to server"));
225 interceptor.updateFailureInfo(context);
226 assertTrue("didTru should remain true", context.didTry());
227 assertTrue(
228 "The call shouldn't have been successful if there was a ConnectException",
229 context.getCouldNotCommunicateWithServer().booleanValue());
230 assertNull("The failureInfo is cleared off from the maps.",
231 context.getFailureInfo());
232 assertEquals(context.getTries(), tries);
233 assertFalse(
234 "Since we are alone here we would be given the permission to retryDespiteFailures.",
235 context.isRetryDespiteFastFailMode());
236 context.clear();
237
238 }
239
240 private <T> RetryingCallable<T> getDummyRetryingCallable(
241 ServerName someServerName) {
242 return new RegionServerCallable<T>(null, null, null) {
243 @Override
244 public T call(int callTimeout) throws Exception {
245 return null;
246 }
247
248 @Override
249 protected HRegionLocation getLocation() {
250 return new HRegionLocation(null, serverName);
251 }
252 };
253 }
254
255 @Test
256 public void testExceptionsIdentifiedByInterceptor() throws IOException {
257 Throwable[] networkexceptions = new Throwable[] {
258 new ConnectException("Mary is unwell"),
259 new SocketTimeoutException("Mike is too late"),
260 new ClosedChannelException(),
261 new SyncFailedException("Dave is not on the same page"),
262 new TimeoutException("Mike is late again"),
263 new EOFException("This is the end... "),
264 new ConnectionClosingException("Its closing") };
265 final String INDUCED = "Induced";
266 Throwable[] nonNetworkExceptions = new Throwable[] {
267 new IOException("Bob died"),
268 new RemoteException("Bob's cousin died", null),
269 new NoSuchMethodError(INDUCED), new NullPointerException(INDUCED),
270 new DoNotRetryIOException(INDUCED), new Error(INDUCED) };
271
272 Configuration conf = HBaseConfiguration.create();
273 long CLEANUP_TIMEOUT = 0;
274 long FAST_FAIL_THRESHOLD = 1000000;
275 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
276 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
277 CLEANUP_TIMEOUT);
278 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
279 FAST_FAIL_THRESHOLD);
280 for (Throwable e : networkexceptions) {
281 PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
282 .createPreemptiveInterceptor(conf);
283 FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
284 .createEmptyContext();
285
286 RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
287 context.prepare(callable, 0);
288 interceptor.intercept(context);
289 interceptor.handleFailure(context, e);
290 interceptor.updateFailureInfo(context);
291 assertTrue(
292 "The call shouldn't have been successful if there was a ConnectException",
293 context.getCouldNotCommunicateWithServer().booleanValue());
294 }
295 for (Throwable e : nonNetworkExceptions) {
296 try {
297 PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
298 .createPreemptiveInterceptor(conf);
299 FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
300 .createEmptyContext();
301
302 RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
303 context.prepare(callable, 0);
304 interceptor.intercept(context);
305 interceptor.handleFailure(context, e);
306 interceptor.updateFailureInfo(context);
307 assertFalse(
308 "The call shouldn't have been successful if there was a ConnectException",
309 context.getCouldNotCommunicateWithServer().booleanValue());
310 } catch (NoSuchMethodError t) {
311 assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
312 } catch (NullPointerException t) {
313 assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
314 } catch (DoNotRetryIOException t) {
315 assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
316 } catch (Error t) {
317 assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
318 }
319 }
320 }
321
322 protected static PreemptiveFastFailInterceptor createPreemptiveInterceptor(
323 Configuration conf) {
324 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
325 RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory(
326 conf);
327 RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory
328 .build();
329 return (PreemptiveFastFailInterceptor) interceptorBeforeCast;
330 }
331
332 static PreemptiveFastFailInterceptor createPreemptiveInterceptor() {
333 Configuration conf = HBaseConfiguration.create();
334 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
335 return createPreemptiveInterceptor(conf);
336 }
337
338 @Test(timeout = 120000)
339 public void testPreemptiveFastFailException50Times()
340 throws InterruptedException, ExecutionException {
341
342
343 Assume.assumeTrue(!WINDOWS);
344
345 for (int i = 0; i < 50; i++) {
346 testPreemptiveFastFailException();
347 }
348 }
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403 private void testPreemptiveFastFailException() throws InterruptedException,
404 ExecutionException {
405 LOG.debug("Setting up the counters to start the test");
406 priviRetryCounter.set(0);
407 nonPriviRetryCounter.set(0);
408 done.set(0);
409
410 for (int i = 0; i <= RETRIES; i++) {
411 latches[i] = new CountDownLatch(1);
412 latches2[i] = new CountDownLatch(1);
413 }
414
415 PreemptiveFastFailInterceptor interceptor = getInterceptor();
416
417 final RpcRetryingCaller<Void> priviCaller = getRpcRetryingCaller(
418 PAUSE_TIME, RETRIES, interceptor);
419 final RpcRetryingCaller<Void> nonPriviCaller = getRpcRetryingCaller(
420 PAUSE_TIME, RETRIES, interceptor);
421
422 LOG.debug("Submitting the thread 1");
423 Future<Boolean> priviFuture = executor.submit(new Callable<Boolean>() {
424 @Override
425 public Boolean call() throws Exception {
426 try {
427 isPriviThreadLocal.get().set(true);
428 priviCaller
429 .callWithRetries(
430 getRetryingCallable(serverName, exception),
431 CLEANUP_TIMEOUT);
432 } catch (RetriesExhaustedException e) {
433 return true;
434 } catch (PreemptiveFastFailException e) {
435 return false;
436 }
437 return false;
438 }
439 });
440 LOG.debug("Submitting the thread 2");
441 Future<Boolean> nonPriviFuture = executor.submit(new Callable<Boolean>() {
442 @Override
443 public Boolean call() throws Exception {
444 try {
445 isPriviThreadLocal.get().set(false);
446 nonPriviCaller.callWithRetries(
447 getRetryingCallable(serverName, exception),
448 CLEANUP_TIMEOUT);
449 } catch (PreemptiveFastFailException e) {
450 return true;
451 }
452 return false;
453 }
454 });
455 LOG.debug("Waiting for Thread 2 to finish");
456 assertTrue(nonPriviFuture.get());
457 LOG.debug("Waiting for Thread 1 to finish");
458 assertTrue(priviFuture.get());
459
460
461
462
463
464 assertTrue(interceptor.isServerInFailureMap(serverName));
465 final RpcRetryingCaller<Void> priviCallerNew = getRpcRetryingCaller(
466 PAUSE_TIME, RETRIES, interceptor);
467 executor.submit(new Callable<Boolean>() {
468 @Override
469 public Boolean call() throws Exception {
470 priviCallerNew.callWithRetries(
471 getRetryingCallable(serverName, null), CLEANUP_TIMEOUT);
472 return false;
473 }
474 }).get();
475 assertFalse("The server was supposed to be removed from the map",
476 interceptor.isServerInFailureMap(serverName));
477 }
478
479 ExecutorService executor = Executors.newCachedThreadPool();
480
481
482
483
484 final int PAUSE_TIME = 10;
485 final int RETRIES = 3;
486 final int CLEANUP_TIMEOUT = 10000;
487 final long FAST_FAIL_THRESHOLD = PAUSE_TIME / 1;
488
489
490
491
492 final CountDownLatch[] latches = new CountDownLatch[RETRIES + 1];
493 final CountDownLatch[] latches2 = new CountDownLatch[RETRIES + 1];
494 final AtomicInteger done = new AtomicInteger(0);
495
496
497
498
499 final AtomicInteger priviRetryCounter = new AtomicInteger();
500 final AtomicInteger nonPriviRetryCounter = new AtomicInteger();
501 final ServerName serverName = getSomeServerName();
502
503
504
505
506 public final ThreadLocal<AtomicBoolean> isPriviThreadLocal = new ThreadLocal<AtomicBoolean>() {
507 @Override
508 public AtomicBoolean initialValue() {
509 return new AtomicBoolean(true);
510 }
511 };
512 final Exception exception = new ConnectionClosingException("The current connection is closed");
513
514 public PreemptiveFastFailInterceptor getInterceptor() {
515 final Configuration conf = HBaseConfiguration.create();
516 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
517 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
518 CLEANUP_TIMEOUT);
519 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
520 FAST_FAIL_THRESHOLD);
521
522 return new PreemptiveFastFailInterceptor(
523 conf) {
524 @Override
525 public void updateFailureInfo(RetryingCallerInterceptorContext context) {
526 boolean pffe = false;
527 if (!isPriviThreadLocal.get().get()) {
528 pffe = !((FastFailInterceptorContext)context).isRetryDespiteFastFailMode();
529 }
530 if (isPriviThreadLocal.get().get()) {
531 try {
532
533
534 if (done.get() <= 1) {
535 latches2[priviRetryCounter.get()].await();
536 }
537 } catch (InterruptedException e) {
538 fail();
539 }
540 }
541 super.updateFailureInfo(context);
542 if (!isPriviThreadLocal.get().get()) {
543 if (pffe) done.incrementAndGet();
544 latches2[nonPriviRetryCounter.get()].countDown();
545 }
546 }
547
548 @Override
549 public void intercept(RetryingCallerInterceptorContext context)
550 throws PreemptiveFastFailException {
551 if (!isPriviThreadLocal.get().get()) {
552 try {
553 latches[nonPriviRetryCounter.getAndIncrement()].await();
554 } catch (InterruptedException e) {
555 fail();
556 }
557 }
558 super.intercept(context);
559 }
560
561 @Override
562 public void handleFailure(RetryingCallerInterceptorContext context,
563 Throwable t) throws IOException {
564 super.handleFailure(context, t);
565 if (isPriviThreadLocal.get().get()) {
566 latches[priviRetryCounter.getAndIncrement()].countDown();
567 }
568 }
569 };
570 }
571
572 public RpcRetryingCaller<Void> getRpcRetryingCaller(int pauseTime,
573 int retries, RetryingCallerInterceptor interceptor) {
574 return new RpcRetryingCaller<Void>(pauseTime, retries, interceptor, 9, 0) {
575 @Override
576 public Void callWithRetries(RetryingCallable<Void> callable,
577 int callTimeout) throws IOException, RuntimeException {
578 Void ret = super.callWithRetries(callable, callTimeout);
579 return ret;
580 }
581 };
582 }
583
584 protected static ServerName getSomeServerName() {
585 return ServerName.valueOf("localhost", 1234, 987654321);
586 }
587
588 private RegionServerCallable<Void> getRetryingCallable(
589 final ServerName serverName, final Exception e) {
590 return new RegionServerCallable<Void>(null, null, null) {
591 @Override
592 public void prepare(boolean reload) throws IOException {
593 this.location = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
594 serverName);
595 }
596
597 @Override
598 public Void call(int callTimeout) throws Exception {
599 if (e != null)
600 throw e;
601 return null;
602 }
603
604 @Override
605 protected HRegionLocation getLocation() {
606 return new HRegionLocation(null, serverName);
607 }
608
609 @Override
610 public void throwable(Throwable t, boolean retrying) {
611
612 }
613
614 @Override
615 public long sleep(long pause, int tries) {
616 return ConnectionUtils.getPauseTime(pause, tries + 1);
617 }
618 };
619 }
620 }