1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.TreeSet;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.BrokenBarrierException;
34 import java.util.concurrent.CyclicBarrier;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.LinkedBlockingQueue;
38 import java.util.concurrent.RejectedExecutionException;
39 import java.util.concurrent.SynchronousQueue;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.ThreadPoolExecutor;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import java.util.concurrent.atomic.AtomicLong;
46
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.hbase.CallQueueTooBigException;
51 import org.apache.hadoop.hbase.Cell;
52 import org.apache.hadoop.hbase.RegionLocations;
53 import org.apache.hadoop.hbase.TableName;
54 import org.apache.hadoop.hbase.HConstants;
55 import org.apache.hadoop.hbase.HRegionInfo;
56 import org.apache.hadoop.hbase.HRegionLocation;
57 import org.apache.hadoop.hbase.testclassification.MediumTests;
58 import org.apache.hadoop.hbase.ServerName;
59 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
60 import org.apache.hadoop.hbase.client.coprocessor.Batch;
61 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
62 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
63 import org.apache.hadoop.hbase.util.Bytes;
64 import org.apache.hadoop.hbase.util.Threads;
65 import org.junit.Assert;
66 import static org.junit.Assert.assertTrue;
67 import org.junit.BeforeClass;
68 import org.junit.Rule;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71 import org.junit.rules.Timeout;
72 import org.mockito.Mockito;
73
74 import java.io.IOException;
75 import java.io.InterruptedIOException;
76 import java.util.ArrayList;
77 import java.util.Arrays;
78 import java.util.HashMap;
79 import java.util.List;
80 import java.util.Map;
81 import java.util.Set;
82 import java.util.TreeSet;
83 import java.util.concurrent.ExecutorService;
84 import java.util.concurrent.RejectedExecutionException;
85 import java.util.concurrent.SynchronousQueue;
86 import java.util.concurrent.ThreadFactory;
87 import java.util.concurrent.ThreadPoolExecutor;
88 import java.util.concurrent.TimeUnit;
89 import java.util.concurrent.atomic.AtomicBoolean;
90 import java.util.concurrent.atomic.AtomicInteger;
91 import java.util.concurrent.atomic.AtomicLong;
92
93 @Category(MediumTests.class)
94 public class TestAsyncProcess {
95 private static final TableName DUMMY_TABLE =
96 TableName.valueOf("DUMMY_TABLE");
97 private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
98 private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
99 private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
100 private static final byte[] FAILS = "FAILS".getBytes();
101 private static final Configuration conf = new Configuration();
102
103 private static ServerName sn = ServerName.valueOf("s1:1,1");
104 private static ServerName sn2 = ServerName.valueOf("s2:2,2");
105 private static ServerName sn3 = ServerName.valueOf("s3:3,3");
106 private static HRegionInfo hri1 =
107 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
108 private static HRegionInfo hri2 =
109 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
110 private static HRegionInfo hri3 =
111 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
112 private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
113 private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
114 private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
115
116
117 private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
118 hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
119 private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
120 private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
121 new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
122 private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
123 new HRegionLocation(hri2r1, sn3));
124 private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
125
126 private static final String success = "success";
127 private static Exception failure = new Exception("failure");
128
129 private static int NB_RETRIES = 3;
130
131 @BeforeClass
132 public static void beforeClass(){
133 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
134 }
135
136 static class CountingThreadFactory implements ThreadFactory {
137 final AtomicInteger nbThreads;
138 ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
139 @Override
140 public Thread newThread(Runnable r) {
141 nbThreads.incrementAndGet();
142 return realFactory.newThread(r);
143 }
144
145 CountingThreadFactory(AtomicInteger nbThreads){
146 this.nbThreads = nbThreads;
147 }
148 }
149
150 static class MyAsyncProcess extends AsyncProcess {
151 final AtomicInteger nbMultiResponse = new AtomicInteger();
152 final AtomicInteger nbActions = new AtomicInteger();
153 public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
154 public AtomicInteger callsCt = new AtomicInteger();
155
156 @Override
157 protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
158 List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
159 Batch.Callback<Res> callback, Object[] results, boolean needResults) {
160
161 AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
162 DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
163 allReqs.add(r);
164 callsCt.incrementAndGet();
165 return r;
166 }
167
168 public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
169 this(hc, conf, new AtomicInteger());
170 }
171
172 public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
173 super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
174 new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
175 new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
176 }
177
178 public MyAsyncProcess(
179 ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
180 super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
181 new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
182 new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
183 }
184
185 public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
186 @SuppressWarnings("unused") boolean dummy) {
187 super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
188 new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
189 @Override
190 public void execute(Runnable command) {
191 throw new RejectedExecutionException("test under failure");
192 }
193 },
194 new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
195 }
196
197 @Override
198 public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
199 boolean atLeastOne, Callback<Res> callback, boolean needResults)
200 throws InterruptedIOException {
201
202 return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
203 }
204
205 @Override
206 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
207 callsCt.incrementAndGet();
208 final MultiResponse mr = createMultiResponse(
209 callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
210 @Override
211 public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
212 if (Arrays.equals(FAILS, a.getAction().getRow())) {
213 mr.add(regionName, a.getOriginalIndex(), failure);
214 } else {
215 mr.add(regionName, a.getOriginalIndex(), success);
216 }
217 }
218 });
219
220 return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
221 @Override
222 public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
223 int callTimeout)
224 throws IOException, RuntimeException {
225 try {
226
227
228 Thread.sleep(1000);
229 } catch (InterruptedException e) {
230
231 }
232 return mr;
233 }
234 };
235 }
236 }
237
238 static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
239
240 private final IOException e;
241
242 public CallerWithFailure(IOException e) {
243 super(100, 100, 9);
244 this.e = e;
245 }
246
247 @Override
248 public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
249 throws IOException, RuntimeException {
250 throw e;
251 }
252 }
253
254
255 static class AsyncProcessWithFailure extends MyAsyncProcess {
256
257 private final IOException ioe;
258
259 public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
260 super(hc, conf, true);
261 this.ioe = ioe;
262 serverTrackerTimeout = 1;
263 }
264
265 @Override
266 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
267 callsCt.incrementAndGet();
268 return new CallerWithFailure(ioe);
269 }
270 }
271
272 class MyAsyncProcessWithReplicas extends MyAsyncProcess {
273 private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
274 private long primarySleepMs = 0, replicaSleepMs = 0;
275 private Map<ServerName, Long> customPrimarySleepMs = new HashMap<ServerName, Long>();
276 private final AtomicLong replicaCalls = new AtomicLong(0);
277
278 public void addFailures(HRegionInfo... hris) {
279 for (HRegionInfo hri : hris) {
280 failures.add(hri.getRegionName());
281 }
282 }
283
284 public long getReplicaCallCount() {
285 return replicaCalls.get();
286 }
287
288 public void setPrimaryCallDelay(ServerName server, long primaryMs) {
289 customPrimarySleepMs.put(server, primaryMs);
290 }
291
292 public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
293 super(hc, conf);
294 }
295
296 public void setCallDelays(long primaryMs, long replicaMs) {
297 this.primarySleepMs = primaryMs;
298 this.replicaSleepMs = replicaMs;
299 }
300
301 @Override
302 protected RpcRetryingCaller<MultiResponse> createCaller(
303 MultiServerCallable<Row> callable) {
304 final MultiResponse mr = createMultiResponse(
305 callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
306 @Override
307 public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
308 if (failures.contains(regionName)) {
309 mr.add(regionName, a.getOriginalIndex(), failure);
310 } else {
311 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
312 mr.add(regionName, a.getOriginalIndex(),
313 Result.create(new Cell[0], null, isStale));
314 }
315 }
316 });
317
318 final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
319 callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
320 final ServerName server = ((MultiServerCallable<?>)callable).getServerName();
321 String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
322 + callable.getMulti().actions.size() + " entries: ";
323 for (byte[] region : callable.getMulti().actions.keySet()) {
324 debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
325 }
326 LOG.debug(debugMsg);
327 if (!isDefault) {
328 replicaCalls.incrementAndGet();
329 }
330
331 return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
332 @Override
333 public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
334 throws IOException, RuntimeException {
335 long sleep = -1;
336 if (isDefault) {
337 Long customSleep = customPrimarySleepMs.get(server);
338 sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
339 } else {
340 sleep = replicaSleepMs;
341 }
342 if (sleep != 0) {
343 try {
344 Thread.sleep(sleep);
345 } catch (InterruptedException e) {
346 }
347 }
348 return mr;
349 }
350 };
351 }
352 }
353
354 static MultiResponse createMultiResponse(final MultiAction<Row> multi,
355 AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
356 final MultiResponse mr = new MultiResponse();
357 nbMultiResponse.incrementAndGet();
358 for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
359 byte[] regionName = entry.getKey();
360 for (Action<Row> a : entry.getValue()) {
361 nbActions.incrementAndGet();
362 gen.addResponse(mr, regionName, a);
363 }
364 }
365 return mr;
366 }
367
368 private static interface ResponseGenerator {
369 void addResponse(final MultiResponse mr, byte[] regionName, Action<Row> a);
370 }
371
372
373
374
375 static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
376 final AtomicInteger nbThreads = new AtomicInteger(0);
377
378
379 protected MyConnectionImpl(Configuration conf) {
380 super(conf);
381 }
382
383 @Override
384 public RegionLocations locateRegion(TableName tableName,
385 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
386 return new RegionLocations(loc1);
387 }
388
389 @Override
390 public boolean hasCellBlockSupport() {
391 return false;
392 }
393 }
394
395
396
397
398 static class MyConnectionImpl2 extends MyConnectionImpl {
399 List<HRegionLocation> hrl;
400 final boolean usedRegions[];
401
402 protected MyConnectionImpl2(List<HRegionLocation> hrl) {
403 super(conf);
404 this.hrl = hrl;
405 this.usedRegions = new boolean[hrl.size()];
406 }
407
408 @Override
409 public RegionLocations locateRegion(TableName tableName,
410 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
411 int i = 0;
412 for (HRegionLocation hr : hrl){
413 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
414 usedRegions[i] = true;
415 return new RegionLocations(hr);
416 }
417 i++;
418 }
419 return null;
420 }
421
422 }
423
424 @Rule
425 public Timeout timeout = new Timeout(10000);
426
427 @Test
428 public void testSubmit() throws Exception {
429 ClusterConnection hc = createHConnection();
430 AsyncProcess ap = new MyAsyncProcess(hc, conf);
431
432 List<Put> puts = new ArrayList<Put>();
433 puts.add(createPut(1, true));
434
435 ap.submit(DUMMY_TABLE, puts, false, null, false);
436 Assert.assertTrue(puts.isEmpty());
437 }
438
439 @Test
440 public void testSubmitWithCB() throws Exception {
441 ClusterConnection hc = createHConnection();
442 final AtomicInteger updateCalled = new AtomicInteger(0);
443 Batch.Callback<Object> cb = new Batch.Callback<Object>() {
444 @Override
445 public void update(byte[] region, byte[] row, Object result) {
446 updateCalled.incrementAndGet();
447 }
448 };
449 AsyncProcess ap = new MyAsyncProcess(hc, conf);
450
451 List<Put> puts = new ArrayList<Put>();
452 puts.add(createPut(1, true));
453
454 final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
455 Assert.assertTrue(puts.isEmpty());
456 ars.waitUntilDone();
457 Assert.assertEquals(updateCalled.get(), 1);
458 }
459
460 @Test
461 public void testSubmitBusyRegion() throws Exception {
462 ClusterConnection hc = createHConnection();
463 AsyncProcess ap = new MyAsyncProcess(hc, conf);
464
465 List<Put> puts = new ArrayList<Put>();
466 puts.add(createPut(1, true));
467
468 ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
469 ap.submit(DUMMY_TABLE, puts, false, null, false);
470 Assert.assertEquals(puts.size(), 1);
471
472 ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
473 ap.submit(DUMMY_TABLE, puts, false, null, false);
474 Assert.assertEquals(0, puts.size());
475 }
476
477
478 @Test
479 public void testSubmitBusyRegionServer() throws Exception {
480 ClusterConnection hc = createHConnection();
481 AsyncProcess ap = new MyAsyncProcess(hc, conf);
482
483 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
484
485 List<Put> puts = new ArrayList<Put>();
486 puts.add(createPut(1, true));
487 puts.add(createPut(3, true));
488 puts.add(createPut(1, true));
489 puts.add(createPut(2, true));
490
491 ap.submit(DUMMY_TABLE, puts, false, null, false);
492 Assert.assertEquals(" puts=" + puts, 1, puts.size());
493
494 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
495 ap.submit(DUMMY_TABLE, puts, false, null, false);
496 Assert.assertTrue(puts.isEmpty());
497 }
498
499 @Test
500 public void testFail() throws Exception {
501 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
502
503 List<Put> puts = new ArrayList<Put>();
504 Put p = createPut(1, false);
505 puts.add(p);
506
507 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
508 Assert.assertEquals(0, puts.size());
509 ars.waitUntilDone();
510 verifyResult(ars, false);
511 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
512
513 Assert.assertEquals(1, ars.getErrors().exceptions.size());
514 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
515 failure.equals(ars.getErrors().exceptions.get(0)));
516 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
517 failure.equals(ars.getErrors().exceptions.get(0)));
518
519 Assert.assertEquals(1, ars.getFailedOperations().size());
520 Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
521 p.equals(ars.getFailedOperations().get(0)));
522 }
523
524
525 @Test
526 public void testSubmitTrue() throws IOException {
527 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
528 ap.tasksInProgress.incrementAndGet();
529 final AtomicInteger ai = new AtomicInteger(1);
530 ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
531
532 final AtomicBoolean checkPoint = new AtomicBoolean(false);
533 final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
534
535 Thread t = new Thread(){
536 @Override
537 public void run(){
538 Threads.sleep(1000);
539 Assert.assertFalse(checkPoint.get());
540 ai.decrementAndGet();
541 ap.tasksInProgress.decrementAndGet();
542 checkPoint2.set(true);
543 }
544 };
545
546 List<Put> puts = new ArrayList<Put>();
547 Put p = createPut(1, true);
548 puts.add(p);
549
550 ap.submit(DUMMY_TABLE, puts, false, null, false);
551 Assert.assertFalse(puts.isEmpty());
552
553 t.start();
554
555 ap.submit(DUMMY_TABLE, puts, true, null, false);
556 Assert.assertTrue(puts.isEmpty());
557
558 checkPoint.set(true);
559 while (!checkPoint2.get()){
560 Threads.sleep(1);
561 }
562 }
563
564 @Test
565 public void testFailAndSuccess() throws Exception {
566 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
567
568 List<Put> puts = new ArrayList<Put>();
569 puts.add(createPut(1, false));
570 puts.add(createPut(1, true));
571 puts.add(createPut(1, true));
572
573 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
574 Assert.assertTrue(puts.isEmpty());
575 ars.waitUntilDone();
576 verifyResult(ars, false, true, true);
577 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
578 ap.callsCt.set(0);
579 Assert.assertEquals(1, ars.getErrors().actions.size());
580
581 puts.add(createPut(1, true));
582
583 ap.waitUntilDone();
584 ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
585 Assert.assertEquals(0, puts.size());
586 ars.waitUntilDone();
587 Assert.assertEquals(2, ap.callsCt.get());
588 verifyResult(ars, true);
589 }
590
591 @Test
592 public void testFlush() throws Exception {
593 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
594
595 List<Put> puts = new ArrayList<Put>();
596 puts.add(createPut(1, false));
597 puts.add(createPut(1, true));
598 puts.add(createPut(1, true));
599
600 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
601 ars.waitUntilDone();
602 verifyResult(ars, false, true, true);
603 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
604
605 Assert.assertEquals(1, ars.getFailedOperations().size());
606 }
607
608 @Test
609 public void testMaxTask() throws Exception {
610 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
611
612 for (int i = 0; i < 1000; i++) {
613 ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
614 }
615
616 final Thread myThread = Thread.currentThread();
617
618 Thread t = new Thread() {
619 @Override
620 public void run() {
621 Threads.sleep(2000);
622 myThread.interrupt();
623 }
624 };
625
626 List<Put> puts = new ArrayList<Put>();
627 puts.add(createPut(1, true));
628
629 t.start();
630
631 try {
632 ap.submit(DUMMY_TABLE, puts, false, null, false);
633 Assert.fail("We should have been interrupted.");
634 } catch (InterruptedIOException expected) {
635 }
636
637 final long sleepTime = 2000;
638
639 Thread t2 = new Thread() {
640 @Override
641 public void run() {
642 Threads.sleep(sleepTime);
643 while (ap.tasksInProgress.get() > 0) {
644 ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
645 }
646 }
647 };
648 t2.start();
649
650 long start = System.currentTimeMillis();
651 ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
652 long end = System.currentTimeMillis();
653
654
655 Assert.assertTrue(start + 100L + sleepTime > end);
656 }
657
658 private static ClusterConnection createHConnection() throws IOException {
659 ClusterConnection hc = createHConnectionCommon();
660 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
661 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
662 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
663 setMockLocation(hc, FAILS, new RegionLocations(loc2));
664 return hc;
665 }
666
667 private static ClusterConnection createHConnectionWithReplicas() throws IOException {
668 ClusterConnection hc = createHConnectionCommon();
669 setMockLocation(hc, DUMMY_BYTES_1, hrls1);
670 setMockLocation(hc, DUMMY_BYTES_2, hrls2);
671 setMockLocation(hc, DUMMY_BYTES_3, hrls3);
672 return hc;
673 }
674
675 private static void setMockLocation(ClusterConnection hc, byte[] row,
676 RegionLocations result) throws IOException {
677 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
678 Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
679 }
680
681 private static ClusterConnection createHConnectionCommon() {
682 ClusterConnection hc = Mockito.mock(ClusterConnection.class);
683 NonceGenerator ng = Mockito.mock(NonceGenerator.class);
684 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
685 Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
686 Mockito.when(hc.getConfiguration()).thenReturn(conf);
687 return hc;
688 }
689
690 @Test
691 public void testHTablePutSuccess() throws Exception {
692 BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
693 ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
694
695 Put put = createPut(1, true);
696
697 Assert.assertEquals(0, ht.getWriteBufferSize());
698 ht.mutate(put);
699 Assert.assertEquals(0, ht.getWriteBufferSize());
700 }
701
702 private void doHTableFailedPut(boolean bufferOn) throws Exception {
703 ClusterConnection conn = createHConnection();
704 HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
705 MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
706 ht.mutator.ap = ap;
707 if (bufferOn) {
708 ht.setWriteBufferSize(1024L * 1024L);
709 } else {
710 ht.setWriteBufferSize(0L);
711 }
712
713 Put put = createPut(1, false);
714
715 Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
716 try {
717 ht.put(put);
718 if (bufferOn) {
719 ht.flushCommits();
720 }
721 Assert.fail();
722 } catch (RetriesExhaustedException expected) {
723 }
724 Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
725
726 AsyncRequestFuture ars = null;
727 for (AsyncRequestFuture someReqs : ap.allReqs) {
728 if (someReqs.getResults().length == 0) continue;
729 Assert.assertTrue(ars == null);
730 ars = someReqs;
731 }
732 Assert.assertTrue(ars != null);
733 verifyResult(ars, false);
734
735
736 ht.close();
737 }
738
739 @Test
740 public void testHTableFailedPutWithBuffer() throws Exception {
741 doHTableFailedPut(true);
742 }
743
744 @Test
745 public void testHTableFailedPutWithoutBuffer() throws Exception {
746 doHTableFailedPut(false);
747 }
748
749 @Test
750 public void testHTableFailedPutAndNewPut() throws Exception {
751 ClusterConnection conn = createHConnection();
752 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
753 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
754 MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
755 mutator.ap = ap;
756
757 Put p = createPut(1, false);
758 mutator.mutate(p);
759
760 ap.waitUntilDone();
761
762
763
764
765
766
767 p = createPut(1, true);
768 Assert.assertEquals(0, mutator.getWriteBuffer().size());
769 try {
770 mutator.mutate(p);
771 Assert.fail();
772 } catch (RetriesExhaustedException expected) {
773 }
774 Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size());
775 }
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803 @Test
804 public void testBatch() throws IOException, InterruptedException {
805 ClusterConnection conn = new MyConnectionImpl(conf);
806 HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
807 ht.multiAp = new MyAsyncProcess(conn, conf, false);
808
809 List<Put> puts = new ArrayList<Put>();
810 puts.add(createPut(1, true));
811 puts.add(createPut(1, true));
812 puts.add(createPut(1, true));
813 puts.add(createPut(1, true));
814 puts.add(createPut(1, false));
815 puts.add(createPut(1, true));
816 puts.add(createPut(1, false));
817
818 Object[] res = new Object[puts.size()];
819 try {
820 ht.processBatch(puts, res);
821 Assert.fail();
822 } catch (RetriesExhaustedException expected) {
823 }
824
825 Assert.assertEquals(res[0], success);
826 Assert.assertEquals(res[1], success);
827 Assert.assertEquals(res[2], success);
828 Assert.assertEquals(res[3], success);
829 Assert.assertEquals(res[4], failure);
830 Assert.assertEquals(res[5], success);
831 Assert.assertEquals(res[6], failure);
832 }
833
834 @Test
835 public void testErrorsServers() throws IOException {
836 Configuration configuration = new Configuration(conf);
837 ClusterConnection conn = new MyConnectionImpl(configuration);
838 BufferedMutatorImpl mutator =
839 new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
840 configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
841
842 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
843 mutator.ap = ap;
844
845 Assert.assertNotNull(mutator.ap.createServerErrorTracker());
846 Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
847 mutator.ap.serverTrackerTimeout = 1;
848
849 Put p = createPut(1, false);
850 mutator.mutate(p);
851
852 try {
853 mutator.flush();
854 Assert.fail();
855 } catch (RetriesExhaustedWithDetailsException expected) {
856 }
857
858 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
859 }
860
861 @Test
862 public void testGlobalErrors() throws IOException {
863 ClusterConnection conn = new MyConnectionImpl(conf);
864 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
865 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
866 mutator.ap = ap;
867
868 Assert.assertNotNull(mutator.ap.createServerErrorTracker());
869
870 Put p = createPut(1, true);
871 mutator.mutate(p);
872
873 try {
874 mutator.flush();
875 Assert.fail();
876 } catch (RetriesExhaustedWithDetailsException expected) {
877 }
878
879 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
880 }
881
882
883 @Test
884 public void testCallQueueTooLarge() throws IOException {
885 ClusterConnection conn = new MyConnectionImpl(conf);
886 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
887 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
888 mutator.ap = ap;
889
890 Assert.assertNotNull(mutator.ap.createServerErrorTracker());
891
892 Put p = createPut(1, true);
893 mutator.mutate(p);
894
895 try {
896 mutator.flush();
897 Assert.fail();
898 } catch (RetriesExhaustedWithDetailsException expected) {
899 }
900
901 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
902 }
903
904
905
906
907 @Test
908 public void testThreadCreation() throws Exception {
909 final int NB_REGS = 100;
910 List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
911 List<Get> gets = new ArrayList<Get>(NB_REGS);
912 for (int i = 0; i < NB_REGS; i++) {
913 HRegionInfo hri = new HRegionInfo(
914 DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
915 HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
916 hrls.add(hrl);
917
918 Get get = new Get(Bytes.toBytes(i * 10L));
919 gets.add(get);
920 }
921
922 MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
923 HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
924 MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
925 ht.multiAp = ap;
926
927 ht.batch(gets, new Object[gets.size()]);
928
929 Assert.assertEquals(ap.nbActions.get(), NB_REGS);
930 Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
931 Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
932
933 int nbReg = 0;
934 for (int i =0; i<NB_REGS; i++){
935 if (con.usedRegions[i]) nbReg++;
936 }
937 Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
938 }
939
940 @Test
941 public void testReplicaReplicaSuccess() throws Exception {
942
943
944 MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
945 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
946 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
947 verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
948 Assert.assertEquals(2, ap.getReplicaCallCount());
949 }
950
951 @Test
952 public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
953
954 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
955 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
956 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
957 verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
958 Assert.assertEquals(0, ap.getReplicaCallCount());
959 }
960
961 @Test
962 public void testReplicaParallelCallsSucceed() throws Exception {
963
964 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
965 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
966 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
967 verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
968 long replicaCalls = ap.getReplicaCallCount();
969 Assert.assertTrue(replicaCalls >= 0);
970 Assert.assertTrue(replicaCalls <= 2);
971 }
972
973 @Test
974 public void testReplicaPartialReplicaCall() throws Exception {
975
976
977
978 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
979 ap.setPrimaryCallDelay(sn2, 2000);
980 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
981 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
982 verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
983 Assert.assertEquals(1, ap.getReplicaCallCount());
984 }
985
986 @Test
987 public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
988
989
990
991 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 1);
992 ap.addFailures(hri1, hri2);
993 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
994 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
995 verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
996 Assert.assertEquals(0, ap.getReplicaCallCount());
997 }
998
999 @Test
1000 public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
1001
1002
1003 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 1);
1004 ap.addFailures(hri1, hri1r2, hri2);
1005 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1006 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
1007 verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
1008 Assert.assertEquals(2, ap.getReplicaCallCount());
1009 }
1010
1011 @Test
1012 public void testReplicaAllCallsFailForOneRegion() throws Exception {
1013
1014
1015 MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 1);
1016 ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
1017 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1018 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
1019 verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
1020
1021 Assert.assertEquals(3, ars.getErrors().getNumExceptions());
1022 for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
1023 Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
1024 }
1025 }
1026
1027 private MyAsyncProcessWithReplicas createReplicaAp(
1028 int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
1029 return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
1030 }
1031
1032 private MyAsyncProcessWithReplicas createReplicaAp(
1033 int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
1034
1035
1036 Configuration conf = new Configuration();
1037 ClusterConnection conn = createHConnectionWithReplicas();
1038 conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
1039 if (retries > 0) {
1040 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1041 }
1042 MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
1043 ap.setCallDelays(primaryMs, replicaMs);
1044 return ap;
1045 }
1046
1047 private static List<Get> makeTimelineGets(byte[]... rows) {
1048 List<Get> result = new ArrayList<Get>();
1049 for (byte[] row : rows) {
1050 Get get = new Get(row);
1051 get.setConsistency(Consistency.TIMELINE);
1052 result.add(get);
1053 }
1054 return result;
1055 }
1056
1057 private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1058 Object[] actual = ars.getResults();
1059 Assert.assertEquals(expected.length, actual.length);
1060 for (int i = 0; i < expected.length; ++i) {
1061 Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1062 }
1063 }
1064
1065
1066 private enum RR {
1067 TRUE,
1068 FALSE,
1069 DONT_CARE,
1070 FAILED
1071 }
1072
1073 private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1074 Object[] actuals = ars.getResults();
1075 Assert.assertEquals(expecteds.length, actuals.length);
1076 for (int i = 0; i < expecteds.length; ++i) {
1077 Object actual = actuals[i];
1078 RR expected = expecteds[i];
1079 Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
1080 if (expected != RR.FAILED && expected != RR.DONT_CARE) {
1081 Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
1082 }
1083 }
1084 }
1085
1086
1087
1088
1089
1090
1091 private Put createPut(int regCnt, boolean success) {
1092 Put p;
1093 if (!success) {
1094 p = new Put(FAILS);
1095 } else switch (regCnt){
1096 case 1 :
1097 p = new Put(DUMMY_BYTES_1);
1098 break;
1099 case 2:
1100 p = new Put(DUMMY_BYTES_2);
1101 break;
1102 case 3:
1103 p = new Put(DUMMY_BYTES_3);
1104 break;
1105 default:
1106 throw new IllegalArgumentException("unknown " + regCnt);
1107 }
1108
1109 p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1110
1111 return p;
1112 }
1113
1114 @Test
1115 public void testWaitForMaximumCurrentTasks() throws Exception {
1116 final AtomicLong tasks = new AtomicLong(0);
1117 final AtomicInteger max = new AtomicInteger(0);
1118 final CyclicBarrier barrier = new CyclicBarrier(2);
1119 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
1120 Runnable runnable = new Runnable() {
1121 @Override
1122 public void run() {
1123 try {
1124 barrier.await();
1125 ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
1126 } catch (InterruptedIOException e) {
1127 Assert.fail(e.getMessage());
1128 } catch (InterruptedException e) {
1129
1130 e.printStackTrace();
1131 } catch (BrokenBarrierException e) {
1132
1133 e.printStackTrace();
1134 }
1135 }
1136 };
1137
1138 Thread t = new Thread(runnable);
1139 t.start();
1140 barrier.await();
1141 t.join();
1142
1143 barrier.reset();
1144 tasks.set(1000000);
1145 t = new Thread(runnable);
1146 t.start();
1147 barrier.await();
1148 while (tasks.get() > 0) {
1149 assertTrue(t.isAlive());
1150 tasks.set(tasks.get() - 1);
1151 }
1152 t.join();
1153 }
1154 }