View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeSet;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.BrokenBarrierException;
34  import java.util.concurrent.CyclicBarrier;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.RejectedExecutionException;
39  import java.util.concurrent.SynchronousQueue;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.atomic.AtomicInteger;
45  import java.util.concurrent.atomic.AtomicLong;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.hbase.CallQueueTooBigException;
51  import org.apache.hadoop.hbase.Cell;
52  import org.apache.hadoop.hbase.RegionLocations;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.HRegionLocation;
57  import org.apache.hadoop.hbase.testclassification.MediumTests;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
60  import org.apache.hadoop.hbase.client.coprocessor.Batch;
61  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
62  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
63  import org.apache.hadoop.hbase.util.Bytes;
64  import org.apache.hadoop.hbase.util.Threads;
65  import org.junit.Assert;
66  import static org.junit.Assert.assertTrue;
67  import org.junit.BeforeClass;
68  import org.junit.Rule;
69  import org.junit.Test;
70  import org.junit.experimental.categories.Category;
71  import org.junit.rules.Timeout;
72  import org.mockito.Mockito;
73  
74  import java.io.IOException;
75  import java.io.InterruptedIOException;
76  import java.util.ArrayList;
77  import java.util.Arrays;
78  import java.util.HashMap;
79  import java.util.List;
80  import java.util.Map;
81  import java.util.Set;
82  import java.util.TreeSet;
83  import java.util.concurrent.ExecutorService;
84  import java.util.concurrent.RejectedExecutionException;
85  import java.util.concurrent.SynchronousQueue;
86  import java.util.concurrent.ThreadFactory;
87  import java.util.concurrent.ThreadPoolExecutor;
88  import java.util.concurrent.TimeUnit;
89  import java.util.concurrent.atomic.AtomicBoolean;
90  import java.util.concurrent.atomic.AtomicInteger;
91  import java.util.concurrent.atomic.AtomicLong;
92  
93  @Category(MediumTests.class)
94  public class TestAsyncProcess {
95    private static final TableName DUMMY_TABLE =
96        TableName.valueOf("DUMMY_TABLE");
97    private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
98    private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
99    private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
100   private static final byte[] FAILS = "FAILS".getBytes();
101   private static final Configuration conf = new Configuration();
102 
103   private static ServerName sn = ServerName.valueOf("s1:1,1");
104   private static ServerName sn2 = ServerName.valueOf("s2:2,2");
105   private static ServerName sn3 = ServerName.valueOf("s3:3,3");
106   private static HRegionInfo hri1 =
107       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
108   private static HRegionInfo hri2 =
109       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
110   private static HRegionInfo hri3 =
111       new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
112   private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
113   private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
114   private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
115 
116   // Replica stuff
117   private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
118       hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
119   private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
120   private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
121       new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
122   private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
123       new HRegionLocation(hri2r1, sn3));
124   private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
125 
126   private static final String success = "success";
127   private static Exception failure = new Exception("failure");
128 
129   private static int NB_RETRIES = 3;
130 
131   @BeforeClass
132   public static void beforeClass(){
133     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
134   }
135 
136   static class CountingThreadFactory implements ThreadFactory {
137     final AtomicInteger nbThreads;
138     ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
139     @Override
140     public Thread newThread(Runnable r) {
141       nbThreads.incrementAndGet();
142       return realFactory.newThread(r);
143     }
144 
145     CountingThreadFactory(AtomicInteger nbThreads){
146       this.nbThreads = nbThreads;
147     }
148   }
149 
150   static class MyAsyncProcess extends AsyncProcess {
151     final AtomicInteger nbMultiResponse = new AtomicInteger();
152     final AtomicInteger nbActions = new AtomicInteger();
153     public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
154     public AtomicInteger callsCt = new AtomicInteger();
155 
156     @Override
157     protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
158         List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
159         Batch.Callback<Res> callback, Object[] results, boolean needResults) {
160       // Test HTable has tableName of null, so pass DUMMY_TABLE
161       AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
162           DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
163       allReqs.add(r);
164       callsCt.incrementAndGet();
165       return r;
166     }
167 
168     public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
169       this(hc, conf, new AtomicInteger());
170     }
171 
172     public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
173       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
174           new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
175             new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
176     }
177 
178     public MyAsyncProcess(
179         ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
180       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
181         new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
182           new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
183     }
184 
185     public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
186         @SuppressWarnings("unused") boolean dummy) {
187       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
188               new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
189         @Override
190         public void execute(Runnable command) {
191           throw new RejectedExecutionException("test under failure");
192         }
193       },
194           new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
195     }
196 
197     @Override
198     public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
199         boolean atLeastOne, Callback<Res> callback, boolean needResults)
200             throws InterruptedIOException {
201       // We use results in tests to check things, so override to always save them.
202       return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
203     }
204 
205     @Override
206     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
207       callsCt.incrementAndGet();
208       final MultiResponse mr = createMultiResponse(
209           callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
210             @Override
211             public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
212               if (Arrays.equals(FAILS, a.getAction().getRow())) {
213                 mr.add(regionName, a.getOriginalIndex(), failure);
214               } else {
215                 mr.add(regionName, a.getOriginalIndex(), success);
216               }
217             }
218           });
219 
220       return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
221         @Override
222         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
223                                                 int callTimeout)
224         throws IOException, RuntimeException {
225           try {
226             // sleep one second in order for threadpool to start another thread instead of reusing
227             // existing one.
228             Thread.sleep(1000);
229           } catch (InterruptedException e) {
230             // ignore error
231           }
232           return mr;
233         }
234       };
235     }
236   }
237 
238   static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
239 
240     private final IOException e;
241 
242     public CallerWithFailure(IOException e) {
243       super(100, 100, 9);
244       this.e = e;
245     }
246 
247     @Override
248     public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
249         throws IOException, RuntimeException {
250       throw e;
251     }
252   }
253 
254 
255   static class AsyncProcessWithFailure extends MyAsyncProcess {
256 
257     private final IOException ioe;
258 
259     public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
260       super(hc, conf, true);
261       this.ioe = ioe;
262       serverTrackerTimeout = 1;
263     }
264 
265     @Override
266     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
267       callsCt.incrementAndGet();
268       return new CallerWithFailure(ioe);
269     }
270   }
271 
272   class MyAsyncProcessWithReplicas extends MyAsyncProcess {
273     private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
274     private long primarySleepMs = 0, replicaSleepMs = 0;
275     private Map<ServerName, Long> customPrimarySleepMs = new HashMap<ServerName, Long>();
276     private final AtomicLong replicaCalls = new AtomicLong(0);
277 
278     public void addFailures(HRegionInfo... hris) {
279       for (HRegionInfo hri : hris) {
280         failures.add(hri.getRegionName());
281       }
282     }
283 
284     public long getReplicaCallCount() {
285       return replicaCalls.get();
286     }
287 
288     public void setPrimaryCallDelay(ServerName server, long primaryMs) {
289       customPrimarySleepMs.put(server, primaryMs);
290     }
291 
292     public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
293       super(hc, conf);
294     }
295 
296     public void setCallDelays(long primaryMs, long replicaMs) {
297       this.primarySleepMs = primaryMs;
298       this.replicaSleepMs = replicaMs;
299     }
300 
301     @Override
302     protected RpcRetryingCaller<MultiResponse> createCaller(
303         MultiServerCallable<Row> callable) {
304       final MultiResponse mr = createMultiResponse(
305           callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
306             @Override
307             public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
308               if (failures.contains(regionName)) {
309                 mr.add(regionName, a.getOriginalIndex(), failure);
310               } else {
311                 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
312                 mr.add(regionName, a.getOriginalIndex(),
313                     Result.create(new Cell[0], null, isStale));
314               }
315             }
316           });
317       // Currently AsyncProcess either sends all-replica, or all-primary request.
318       final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
319           callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
320       final ServerName server = ((MultiServerCallable<?>)callable).getServerName();
321       String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
322           + callable.getMulti().actions.size() + " entries: ";
323       for (byte[] region : callable.getMulti().actions.keySet()) {
324         debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
325       }
326       LOG.debug(debugMsg);
327       if (!isDefault) {
328         replicaCalls.incrementAndGet();
329       }
330 
331       return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
332         @Override
333         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
334         throws IOException, RuntimeException {
335           long sleep = -1;
336           if (isDefault) {
337             Long customSleep = customPrimarySleepMs.get(server);
338             sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
339           } else {
340             sleep = replicaSleepMs;
341           }
342           if (sleep != 0) {
343             try {
344               Thread.sleep(sleep);
345             } catch (InterruptedException e) {
346             }
347           }
348           return mr;
349         }
350       };
351     }
352   }
353 
354   static MultiResponse createMultiResponse(final MultiAction<Row> multi,
355       AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
356     final MultiResponse mr = new MultiResponse();
357     nbMultiResponse.incrementAndGet();
358     for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
359       byte[] regionName = entry.getKey();
360       for (Action<Row> a : entry.getValue()) {
361         nbActions.incrementAndGet();
362         gen.addResponse(mr, regionName, a);
363       }
364     }
365     return mr;
366   }
367 
368   private static interface ResponseGenerator {
369     void addResponse(final MultiResponse mr, byte[] regionName, Action<Row> a);
370   }
371 
372   /**
373    * Returns our async process.
374    */
375   static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
376     final AtomicInteger nbThreads = new AtomicInteger(0);
377 
378 
379     protected MyConnectionImpl(Configuration conf) {
380       super(conf);
381     }
382 
383     @Override
384     public RegionLocations locateRegion(TableName tableName,
385         byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
386       return new RegionLocations(loc1);
387     }
388 
389     @Override
390     public boolean hasCellBlockSupport() {
391       return false;
392     }
393   }
394 
395   /**
396    * Returns our async process.
397    */
398   static class MyConnectionImpl2 extends MyConnectionImpl {
399     List<HRegionLocation> hrl;
400     final boolean usedRegions[];
401 
402     protected MyConnectionImpl2(List<HRegionLocation> hrl) {
403       super(conf);
404       this.hrl = hrl;
405       this.usedRegions = new boolean[hrl.size()];
406     }
407 
408     @Override
409     public RegionLocations locateRegion(TableName tableName,
410         byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
411       int i = 0;
412       for (HRegionLocation hr : hrl){
413         if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
414           usedRegions[i] = true;
415           return new RegionLocations(hr);
416         }
417         i++;
418       }
419       return null;
420     }
421 
422   }
423 
424   @Rule
425   public Timeout timeout = new Timeout(10000); // 10 seconds max per method tested
426 
427   @Test
428   public void testSubmit() throws Exception {
429     ClusterConnection hc = createHConnection();
430     AsyncProcess ap = new MyAsyncProcess(hc, conf);
431 
432     List<Put> puts = new ArrayList<Put>();
433     puts.add(createPut(1, true));
434 
435     ap.submit(DUMMY_TABLE, puts, false, null, false);
436     Assert.assertTrue(puts.isEmpty());
437   }
438 
439   @Test
440   public void testSubmitWithCB() throws Exception {
441     ClusterConnection hc = createHConnection();
442     final AtomicInteger updateCalled = new AtomicInteger(0);
443     Batch.Callback<Object> cb = new Batch.Callback<Object>() {
444       @Override
445       public void update(byte[] region, byte[] row, Object result) {
446         updateCalled.incrementAndGet();
447       }
448     };
449     AsyncProcess ap = new MyAsyncProcess(hc, conf);
450 
451     List<Put> puts = new ArrayList<Put>();
452     puts.add(createPut(1, true));
453 
454     final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
455     Assert.assertTrue(puts.isEmpty());
456     ars.waitUntilDone();
457     Assert.assertEquals(updateCalled.get(), 1);
458   }
459 
460   @Test
461   public void testSubmitBusyRegion() throws Exception {
462     ClusterConnection hc = createHConnection();
463     AsyncProcess ap = new MyAsyncProcess(hc, conf);
464 
465     List<Put> puts = new ArrayList<Put>();
466     puts.add(createPut(1, true));
467 
468     ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
469     ap.submit(DUMMY_TABLE, puts, false, null, false);
470     Assert.assertEquals(puts.size(), 1);
471 
472     ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
473     ap.submit(DUMMY_TABLE, puts, false, null, false);
474     Assert.assertEquals(0, puts.size());
475   }
476 
477 
478   @Test
479   public void testSubmitBusyRegionServer() throws Exception {
480     ClusterConnection hc = createHConnection();
481     AsyncProcess ap = new MyAsyncProcess(hc, conf);
482 
483     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
484 
485     List<Put> puts = new ArrayList<Put>();
486     puts.add(createPut(1, true));
487     puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
488     puts.add(createPut(1, true)); // <== this one will make it, the region is already in
489     puts.add(createPut(2, true)); // <== new region, but the rs is ok
490 
491     ap.submit(DUMMY_TABLE, puts, false, null, false);
492     Assert.assertEquals(" puts=" + puts, 1, puts.size());
493 
494     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
495     ap.submit(DUMMY_TABLE, puts, false, null, false);
496     Assert.assertTrue(puts.isEmpty());
497   }
498 
499   @Test
500   public void testFail() throws Exception {
501     MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
502 
503     List<Put> puts = new ArrayList<Put>();
504     Put p = createPut(1, false);
505     puts.add(p);
506 
507     AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
508     Assert.assertEquals(0, puts.size());
509     ars.waitUntilDone();
510     verifyResult(ars, false);
511     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
512 
513     Assert.assertEquals(1, ars.getErrors().exceptions.size());
514     Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
515         failure.equals(ars.getErrors().exceptions.get(0)));
516     Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
517         failure.equals(ars.getErrors().exceptions.get(0)));
518 
519     Assert.assertEquals(1, ars.getFailedOperations().size());
520     Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
521         p.equals(ars.getFailedOperations().get(0)));
522   }
523 
524 
525   @Test
526   public void testSubmitTrue() throws IOException {
527     final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
528     ap.tasksInProgress.incrementAndGet();
529     final AtomicInteger ai = new AtomicInteger(1);
530     ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
531 
532     final AtomicBoolean checkPoint = new AtomicBoolean(false);
533     final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
534 
535     Thread t = new Thread(){
536       @Override
537       public void run(){
538         Threads.sleep(1000);
539         Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
540         ai.decrementAndGet();
541         ap.tasksInProgress.decrementAndGet();
542         checkPoint2.set(true);
543       }
544     };
545 
546     List<Put> puts = new ArrayList<Put>();
547     Put p = createPut(1, true);
548     puts.add(p);
549 
550     ap.submit(DUMMY_TABLE, puts, false, null, false);
551     Assert.assertFalse(puts.isEmpty());
552 
553     t.start();
554 
555     ap.submit(DUMMY_TABLE, puts, true, null, false);
556     Assert.assertTrue(puts.isEmpty());
557 
558     checkPoint.set(true);
559     while (!checkPoint2.get()){
560       Threads.sleep(1);
561     }
562   }
563 
564   @Test
565   public void testFailAndSuccess() throws Exception {
566     MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
567 
568     List<Put> puts = new ArrayList<Put>();
569     puts.add(createPut(1, false));
570     puts.add(createPut(1, true));
571     puts.add(createPut(1, true));
572 
573     AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
574     Assert.assertTrue(puts.isEmpty());
575     ars.waitUntilDone();
576     verifyResult(ars, false, true, true);
577     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
578     ap.callsCt.set(0);
579     Assert.assertEquals(1, ars.getErrors().actions.size());
580 
581     puts.add(createPut(1, true));
582     // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
583     ap.waitUntilDone();
584     ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
585     Assert.assertEquals(0, puts.size());
586     ars.waitUntilDone();
587     Assert.assertEquals(2, ap.callsCt.get());
588     verifyResult(ars, true);
589   }
590 
591   @Test
592   public void testFlush() throws Exception {
593     MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
594 
595     List<Put> puts = new ArrayList<Put>();
596     puts.add(createPut(1, false));
597     puts.add(createPut(1, true));
598     puts.add(createPut(1, true));
599 
600     AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
601     ars.waitUntilDone();
602     verifyResult(ars, false, true, true);
603     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
604 
605     Assert.assertEquals(1, ars.getFailedOperations().size());
606   }
607 
608   @Test
609   public void testMaxTask() throws Exception {
610     final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
611 
612     for (int i = 0; i < 1000; i++) {
613       ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
614     }
615 
616     final Thread myThread = Thread.currentThread();
617 
618     Thread t = new Thread() {
619       @Override
620       public void run() {
621         Threads.sleep(2000);
622         myThread.interrupt();
623       }
624     };
625 
626     List<Put> puts = new ArrayList<Put>();
627     puts.add(createPut(1, true));
628 
629     t.start();
630 
631     try {
632       ap.submit(DUMMY_TABLE, puts, false, null, false);
633       Assert.fail("We should have been interrupted.");
634     } catch (InterruptedIOException expected) {
635     }
636 
637     final long sleepTime = 2000;
638 
639     Thread t2 = new Thread() {
640       @Override
641       public void run() {
642         Threads.sleep(sleepTime);
643         while (ap.tasksInProgress.get() > 0) {
644           ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
645         }
646       }
647     };
648     t2.start();
649 
650     long start = System.currentTimeMillis();
651     ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
652     long end = System.currentTimeMillis();
653 
654     //Adds 100 to secure us against approximate timing.
655     Assert.assertTrue(start + 100L + sleepTime > end);
656   }
657 
658   private static ClusterConnection createHConnection() throws IOException {
659     ClusterConnection hc = createHConnectionCommon();
660     setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
661     setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
662     setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
663     setMockLocation(hc, FAILS, new RegionLocations(loc2));
664     return hc;
665   }
666 
667   private static ClusterConnection createHConnectionWithReplicas() throws IOException {
668     ClusterConnection hc = createHConnectionCommon();
669     setMockLocation(hc, DUMMY_BYTES_1, hrls1);
670     setMockLocation(hc, DUMMY_BYTES_2, hrls2);
671     setMockLocation(hc, DUMMY_BYTES_3, hrls3);
672     return hc;
673   }
674 
675   private static void setMockLocation(ClusterConnection hc, byte[] row,
676       RegionLocations result) throws IOException {
677     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
678         Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
679   }
680 
681   private static ClusterConnection createHConnectionCommon() {
682     ClusterConnection hc = Mockito.mock(ClusterConnection.class);
683     NonceGenerator ng = Mockito.mock(NonceGenerator.class);
684     Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
685     Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
686     Mockito.when(hc.getConfiguration()).thenReturn(conf);
687     return hc;
688   }
689 
690   @Test
691   public void testHTablePutSuccess() throws Exception {
692     BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
693     ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
694 
695     Put put = createPut(1, true);
696 
697     Assert.assertEquals(0, ht.getWriteBufferSize());
698     ht.mutate(put);
699     Assert.assertEquals(0, ht.getWriteBufferSize());
700   }
701 
702   private void doHTableFailedPut(boolean bufferOn) throws Exception {
703     ClusterConnection conn = createHConnection();
704     HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
705     MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
706     ht.mutator.ap = ap;
707     if (bufferOn) {
708       ht.setWriteBufferSize(1024L * 1024L);
709     } else {
710       ht.setWriteBufferSize(0L);
711     }
712 
713     Put put = createPut(1, false);
714 
715     Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
716     try {
717       ht.put(put);
718       if (bufferOn) {
719         ht.flushCommits();
720       }
721       Assert.fail();
722     } catch (RetriesExhaustedException expected) {
723     }
724     Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
725     // The table should have sent one request, maybe after multiple attempts
726     AsyncRequestFuture ars = null;
727     for (AsyncRequestFuture someReqs : ap.allReqs) {
728       if (someReqs.getResults().length == 0) continue;
729       Assert.assertTrue(ars == null);
730       ars = someReqs;
731     }
732     Assert.assertTrue(ars != null);
733     verifyResult(ars, false);
734 
735     // This should not raise any exception, puts have been 'received' before by the catch.
736     ht.close();
737   }
738 
739   @Test
740   public void testHTableFailedPutWithBuffer() throws Exception {
741     doHTableFailedPut(true);
742   }
743 
744   @Test
745   public void testHTableFailedPutWithoutBuffer() throws Exception {
746     doHTableFailedPut(false);
747   }
748 
749   @Test
750   public void testHTableFailedPutAndNewPut() throws Exception {
751     ClusterConnection conn = createHConnection();
752     BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
753         new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
754     MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
755     mutator.ap = ap;
756 
757     Put p = createPut(1, false);
758     mutator.mutate(p);
759 
760     ap.waitUntilDone(); // Let's do all the retries.
761 
762     // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
763     //  doPut if it fails.
764     // This said, it's not a very easy going behavior. For example, when we insert a list of
765     //  puts, we may raise an exception in the middle of the list. It's then up to the caller to
766     //  manage what was inserted, what was tried but failed, and what was not even tried.
767     p = createPut(1, true);
768     Assert.assertEquals(0, mutator.getWriteBuffer().size());
769     try {
770       mutator.mutate(p);
771       Assert.fail();
772     } catch (RetriesExhaustedException expected) {
773     }
774     Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size());
775   }
776 
777 
778 /*
779   @Test
780   public void testWithNoClearOnFail() throws IOException {
781     HTable ht = new HTable();
782     ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
783     ht.setAutoFlushTo(false);
784 
785     Put p = createPut(1, false);
786     ht.put(p);
787     Assert.assertEquals(0, ht.writeAsyncBuffer.size());
788 
789     try {
790       ht.flushCommits();
791     } catch (RetriesExhaustedWithDetailsException expected) {
792     }
793     Assert.assertEquals(1, ht.writeAsyncBuffer.size());
794 
795     try {
796       ht.close();
797     } catch (RetriesExhaustedWithDetailsException expected) {
798     }
799     Assert.assertEquals(1, ht.writeAsyncBuffer.size());
800   }
801   */
802 
803   @Test
804   public void testBatch() throws IOException, InterruptedException {
805     ClusterConnection conn = new MyConnectionImpl(conf);
806     HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
807     ht.multiAp = new MyAsyncProcess(conn, conf, false);
808     // Seven puts in one batch; the two created with success == false use the FAILS row.
809     List<Put> puts = new ArrayList<Put>();
810     puts.add(createPut(1, true));
811     puts.add(createPut(1, true));
812     puts.add(createPut(1, true));
813     puts.add(createPut(1, true));
814     puts.add(createPut(1, false)); // <=== the bad apple, position 4
815     puts.add(createPut(1, true));
816     puts.add(createPut(1, false)); // <=== another bad apple, position 6
817 
818     Object[] res = new Object[puts.size()];
819     try {
820       ht.processBatch(puts, res);
821       Assert.fail("processBatch should have thrown RetriesExhaustedException");
822     } catch (RetriesExhaustedException expected) {
823     }
824     // Expected value goes first (JUnit contract) so failure messages read correctly.
825     Assert.assertEquals(success, res[0]);
826     Assert.assertEquals(success, res[1]);
827     Assert.assertEquals(success, res[2]);
828     Assert.assertEquals(success, res[3]);
829     Assert.assertEquals(failure, res[4]);
830     Assert.assertEquals(success, res[5]);
831     Assert.assertEquals(failure, res[6]);
832   }
833 
834   @Test
835   public void testErrorsServers() throws IOException {
836     // Work on a Configuration built from the shared conf; the retries-by-server flag is set on it.
837     Configuration perServerConf = new Configuration(conf);
838     ClusterConnection connection = new MyConnectionImpl(perServerConf);
839     BufferedMutatorParams params = new BufferedMutatorParams(DUMMY_TABLE);
840     BufferedMutatorImpl bufferedMutator = new BufferedMutatorImpl(connection, null, null, params);
841     perServerConf.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
842     MyAsyncProcess asyncProcess = new MyAsyncProcess(connection, perServerConf, true);
843     bufferedMutator.ap = asyncProcess;
844 
845     Assert.assertNotNull(bufferedMutator.ap.createServerErrorTracker());
846     Assert.assertTrue(bufferedMutator.ap.serverTrackerTimeout > 200);
847     bufferedMutator.ap.serverTrackerTimeout = 1;
848 
849     Put failingPut = createPut(1, false);
850     bufferedMutator.mutate(failingPut);
851 
852     try {
853       bufferedMutator.flush();
854       Assert.fail();
855     } catch (RetriesExhaustedWithDetailsException expected) {
856     }
857     // Even with the tracker timeout cut to 1, all NB_RETRIES + 1 calls must have been made.
858     Assert.assertEquals(NB_RETRIES + 1, asyncProcess.callsCt.get());
859   }
860 
861   @Test
862   public void testGlobalErrors() throws IOException {
863     ClusterConnection connection = new MyConnectionImpl(conf);
864     BufferedMutatorImpl bufferedMutator =
865         (BufferedMutatorImpl) connection.getBufferedMutator(DUMMY_TABLE);
866     // Swap in an AsyncProcessWithFailure that is handed a plain IOException.
867     IOException injected = new IOException("test");
868     AsyncProcessWithFailure failingAp = new AsyncProcessWithFailure(connection, conf, injected);
869     bufferedMutator.ap = failingAp;
870     Assert.assertNotNull(bufferedMutator.ap.createServerErrorTracker());
871 
872     bufferedMutator.mutate(createPut(1, true));
873     try {
874       bufferedMutator.flush();
875       Assert.fail();
876     } catch (RetriesExhaustedWithDetailsException expected) {
877     }
878     // The flush must fail, and only after NB_RETRIES + 1 calls were counted.
879     Assert.assertEquals(NB_RETRIES + 1, failingAp.callsCt.get());
880   }
881 
882 
883   @Test
884   public void testCallQueueTooLarge() throws IOException {
885     ClusterConnection connection = new MyConnectionImpl(conf);
886     BufferedMutatorImpl bufferedMutator =
887         (BufferedMutatorImpl) connection.getBufferedMutator(DUMMY_TABLE);
888     // Swap in an AsyncProcessWithFailure that is handed a CallQueueTooBigException.
889     CallQueueTooBigException queueFull = new CallQueueTooBigException();
890     AsyncProcessWithFailure failingAp = new AsyncProcessWithFailure(connection, conf, queueFull);
891     bufferedMutator.ap = failingAp;
892     Assert.assertNotNull(bufferedMutator.ap.createServerErrorTracker());
893 
894     bufferedMutator.mutate(createPut(1, true));
895     try {
896       bufferedMutator.flush();
897       Assert.fail();
898     } catch (RetriesExhaustedWithDetailsException expected) {
899     }
900     // The flush must fail, and only after NB_RETRIES + 1 calls were counted.
901     Assert.assertEquals(NB_RETRIES + 1, failingAp.callsCt.get());
902   }
903   /**
904    * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
905    *  2 threads: 1 per server, whatever the number of regions.
906    */
907   @Test
908   public void testThreadCreation() throws Exception {
909     final int NB_REGS = 100;
910     List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
911     List<Get> gets = new ArrayList<Get>(NB_REGS);
912     for (int i = 0; i < NB_REGS; i++) {
913       HRegionInfo hri = new HRegionInfo(
914           DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
915       // Even regions go to sn, odd ones to sn2.
916       HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
917       hrls.add(hrl);
918       // One Get per region, on the same row bytes as the first key handed to HRegionInfo.
919       gets.add(new Get(Bytes.toBytes(i * 10L)));
920     }
921 
922     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
923     HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
924     MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
925     ht.multiAp = ap;
926     ht.batch(gets, new Object[gets.size()]);
927     // Expected value goes first (JUnit contract) so failure messages read correctly.
928     Assert.assertEquals(NB_REGS, ap.nbActions.get());
929     Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
930     Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
931     int nbReg = 0;
932     for (int i = 0; i < NB_REGS; i++) {
933       if (con.usedRegions[i]) {
934         nbReg++;
935       }
936     }
937     Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg);
938   }
939 
940   @Test
941   public void testReplicaReplicaSuccess() throws Exception {
942     // The main call is too slow, so replica calls deliver the results - except for the one
943     // region that has no replica, where the main call is what succeeds.
944     MyAsyncProcessWithReplicas replicaAp = createReplicaAp(10, 1000, 0);
945     List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
946     AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, new Object[3]);
947     verifyReplicaResult(future, RR.TRUE, RR.TRUE, RR.FALSE);
948     Assert.assertEquals(2, replicaAp.getReplicaCallCount());
949   }
950 
951   @Test
952   public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
953     // The main call returns before any replica call gets kicked off.
954     MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 10, 0);
955     List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
956     AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, new Object[3]);
957     verifyReplicaResult(future, RR.FALSE, RR.FALSE, RR.FALSE);
958     Assert.assertEquals(0, replicaAp.getReplicaCallCount());
959   }
960 
961   @Test
962   public void testReplicaParallelCallsSucceed() throws Exception {
963     // No delays are configured, so either the main or the replica call may win.
964     MyAsyncProcessWithReplicas replicaAp = createReplicaAp(0, 0, 0);
965     List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
966     AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, new Object[2]);
967     verifyReplicaResult(future, RR.DONT_CARE, RR.DONT_CARE);
968     long replicaCallCount = replicaAp.getReplicaCallCount();
969     Assert.assertTrue(0 <= replicaCallCount);
970     Assert.assertTrue(2 >= replicaCallCount);
971   }
972 
973   @Test
974   public void testReplicaPartialReplicaCall() throws Exception {
975     // sn2 is given a long primary delay, so its region is answered by a replica. The other
976     // region gets its primary answer before replica calls happen, so there should be no
977     // replica call for that region at all.
978     MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 0, 0);
979     replicaAp.setPrimaryCallDelay(sn2, 2000);
980     List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
981     AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, new Object[2]);
982     verifyReplicaResult(future, RR.FALSE, RR.TRUE);
983     Assert.assertEquals(1, replicaAp.getReplicaCallCount());
984   }
985 
986   @Test
987   public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
988     // The main calls fail before replica calls can start; this case is currently not handled.
989     // It would probably never happen if we can get location (due to retries),
990     // and handling it would require additional synchronization.
991     MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 0, 0, 1);
992     replicaAp.addFailures(hri1, hri2);
993     List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
994     AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, new Object[2]);
995     verifyReplicaResult(future, RR.FAILED, RR.FAILED);
996     Assert.assertEquals(0, replicaAp.getReplicaCallCount());
997   }
998 
999   @Test
1000   public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
1001     // The main calls fail after the replica calls have started, and for the two-replica region
1002     // one replica call fails too. Both regions must still end up with a replica result.
1003     MyAsyncProcessWithReplicas replicaAp = createReplicaAp(0, 1000, 1000, 1);
1004     replicaAp.addFailures(hri1, hri1r2, hri2);
1005     List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1006     AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, new Object[2]);
1007     verifyReplicaResult(future, RR.TRUE, RR.TRUE);
1008     Assert.assertEquals(2, replicaAp.getReplicaCallCount());
1009   }
1010 
1011   @Test
1012   public void testReplicaAllCallsFailForOneRegion() throws Exception {
1013     // For one region all three calls (main plus two replicas) fail. For the other region a
1014     // replica call fails as well, but that exception must stay hidden since the region succeeded.
1015     MyAsyncProcessWithReplicas replicaAp = createReplicaAp(500, 1000, 0, 1);
1016     replicaAp.addFailures(hri1, hri1r1, hri1r2, hri2r1);
1017     List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1018     AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, new Object[2]);
1019     verifyReplicaResult(future, RR.FAILED, RR.FALSE);
1020     // Exactly 3 exceptions are expected (main + 2 replicas), all for the DUMMY_BYTES_1 row.
1021     Assert.assertEquals(3, future.getErrors().getNumExceptions());
1022     for (int idx = 0; idx < future.getErrors().getNumExceptions(); idx++) {
1023       Assert.assertArrayEquals(DUMMY_BYTES_1, future.getErrors().getRow(idx).getRow());
1024     }
1025   }
1026 
1027   private MyAsyncProcessWithReplicas createReplicaAp( // overload without a retry override
1028       int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
1029     return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1); // -1 fails the retries > 0 check
1030   }
1031 
1032   private MyAsyncProcessWithReplicas createReplicaAp(
1033       int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
1034     // TODO: the outcome is timing dependent... detecting from createCaller that the replica
1035     //       call has happened would let the test control the ordering instead.
1036     Configuration replicaConf = new Configuration();
1037     ClusterConnection connection = createHConnectionWithReplicas();
1038     replicaConf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
1039     if (retries > 0) { // a non-positive value leaves the client retry count untouched
1040       replicaConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1041     }
1042     MyAsyncProcessWithReplicas replicaAp = new MyAsyncProcessWithReplicas(connection, replicaConf);
1043     replicaAp.setCallDelays(primaryMs, replicaMs);
1044     return replicaAp;
1045   }
1046 
1047   private static List<Get> makeTimelineGets(byte[]... rows) { // one TIMELINE Get per row
1048     List<Get> timelineGets = new ArrayList<Get>();
1049     for (int idx = 0; idx < rows.length; idx++) {
1050       Get timelineGet = new Get(rows[idx]);
1051       timelineGet.setConsistency(Consistency.TIMELINE);
1052       timelineGets.add(timelineGet);
1053     }
1054     return timelineGets;
1055   }
1056 
1057   private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1058     Object[] actual = ars.getResults();
1059     Assert.assertEquals(expected.length, actual.length);
1060     for (int i = 0; i < expected.length; ++i) {
1061       Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1062     }
1063   }
1064 
1065   /** Expected outcome of one result slot in verifyReplicaResult (TheDailyWtf-style MyBoolean). */
1066   private enum RR {
1067     TRUE, // verifyReplicaResult expects a Result whose isStale() is true
1068     FALSE, // verifyReplicaResult expects a Result whose isStale() is false
1069     DONT_CARE, // must not be a Throwable; isStale() is not checked
1070     FAILED // the result slot must hold a Throwable
1071   }
1072 
1073   private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1074     Object[] actuals = ars.getResults();
1075     Assert.assertEquals(expecteds.length, actuals.length);
1076     for (int i = 0; i < expecteds.length; ++i) {
1077       Object res = actuals[i]; // may be null: String.valueOf copes where toString() would NPE
1078       RR expected = expecteds[i];
1079       Assert.assertEquals(String.valueOf(res), expected == RR.FAILED, res instanceof Throwable);
1080       if (expected != RR.FAILED && expected != RR.DONT_CARE) { // only TRUE/FALSE check staleness
1081         Assert.assertEquals(expected == RR.TRUE, ((Result)res).isStale());
1082       }
1083     }
1084   }
1085 
1086   /**
1087    * Builds a put and calls add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1) on it.
1088    *
1089    * @param regCnt  the region: 1 to 3 (rows DUMMY_BYTES_1 to DUMMY_BYTES_3); it is only
1090    *                looked at when success is true.
1091    * @param success if true, the put will succeed; if false the row is FAILS instead.
1092    * @return a put
1093    * @throws IllegalArgumentException if success is true and regCnt is not 1, 2 or 3.
1094    */
1095   private Put createPut(int regCnt, boolean success) {
1096     Put put;
1097     if (!success) {
1098       put = new Put(FAILS);
1099     } else if (regCnt == 1) {
1100       put = new Put(DUMMY_BYTES_1);
1101     } else if (regCnt == 2) {
1102       put = new Put(DUMMY_BYTES_2);
1103     } else if (regCnt == 3) {
1104       put = new Put(DUMMY_BYTES_3);
1105     } else {
1106       throw new IllegalArgumentException("unknown " + regCnt);
1107     }
1108 
1109     put.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1110 
1111     return put;
1112   }
1113 
1114   @Test
1115   public void testWaitForMaximumCurrentTasks() throws Exception {
1116     final AtomicLong tasks = new AtomicLong(0);
1117     final AtomicInteger max = new AtomicInteger(0);
1118     final CyclicBarrier barrier = new CyclicBarrier(2);
1119     final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
1120     // JUnit never sees a failure raised on the worker thread, so record it for the main thread.
1121     final java.util.concurrent.atomic.AtomicReference<Throwable> workerError =
1122         new java.util.concurrent.atomic.AtomicReference<Throwable>();
1123     Runnable runnable = new Runnable() {
1124       @Override
1125       public void run() {
1126         try {
1127           barrier.await();
1128           ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
1129         } catch (Throwable e) { // covers interruption and a broken barrier as well
1130           workerError.compareAndSet(null, e);
1131         }
1132       }
1133     };
1134     // First test that our runnable thread only exits when tasks is zero.
1135     Thread t = new Thread(runnable);
1136     t.start();
1137     barrier.await();
1138     t.join();
1139     Assert.assertNull("worker thread failed: " + workerError.get(), workerError.get());
1140     // Now assert we stay running if max == zero and tasks is > 0.
1141     barrier.reset();
1142     tasks.set(1000000);
1143     t = new Thread(runnable);
1144     t.start();
1145     barrier.await();
1146     // Drain the counter one by one; the worker must stay alive while tasks is above zero.
1147     while (tasks.get() > 0) {
1148       assertTrue(t.isAlive());
1149       tasks.decrementAndGet();
1150     }
1151     t.join();
1152     Assert.assertNull("worker thread failed: " + workerError.get(), workerError.get());
1153   }
1154 }