1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.commons.logging.impl.Log4JLogger;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.HRegionInfo;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.KeyValue;
46 import org.apache.hadoop.hbase.NotServingRegionException;
47 import org.apache.hadoop.hbase.RegionLocations;
48 import org.apache.hadoop.hbase.TableNotFoundException;
49 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
50 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
51 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
52 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
54 import org.apache.hadoop.hbase.protobuf.RequestConverter;
55 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
56 import org.apache.hadoop.hbase.regionserver.HRegionServer;
57 import org.apache.hadoop.hbase.regionserver.InternalScanner;
58 import org.apache.hadoop.hbase.regionserver.RegionScanner;
59 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
60 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
61 import org.apache.hadoop.hbase.testclassification.MediumTests;
62 import org.apache.hadoop.hbase.util.Bytes;
63 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
64 import org.apache.log4j.Level;
65 import org.apache.zookeeper.KeeperException;
66 import org.junit.After;
67 import org.junit.AfterClass;
68 import org.junit.Assert;
69 import org.junit.Before;
70 import org.junit.BeforeClass;
71 import org.junit.Test;
72 import org.junit.experimental.categories.Category;
73
74
75
76
77
78 @Category(MediumTests.class)
79 public class TestReplicasClient {
80 private static final Log LOG = LogFactory.getLog(TestReplicasClient.class);
81
82 static {
83 ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
84 }
85
86 private static final int NB_SERVERS = 1;
87 private static HTable table = null;
88 private static final byte[] row = TestReplicasClient.class.getName().getBytes();
89
90 private static HRegionInfo hriPrimary;
91 private static HRegionInfo hriSecondary;
92
93 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
94 private static final byte[] f = HConstants.CATALOG_FAMILY;
95
96 private final static int REFRESH_PERIOD = 1000;
97
98
99
100
101 public static class SlowMeCopro extends BaseRegionObserver {
102 static final AtomicLong sleepTime = new AtomicLong(0);
103 static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
104 static final AtomicInteger countOfNext = new AtomicInteger(0);
105 private static final AtomicReference<CountDownLatch> cdl =
106 new AtomicReference<CountDownLatch>(new CountDownLatch(0));
107 Random r = new Random();
108 public SlowMeCopro() {
109 }
110
111 @Override
112 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
113 final Get get, final List<Cell> results) throws IOException {
114 slowdownCode(e);
115 }
116
117 @Override
118 public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
119 final Scan scan, final RegionScanner s) throws IOException {
120 slowdownCode(e);
121 return s;
122 }
123
124 @Override
125 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
126 final InternalScanner s, final List<Result> results,
127 final int limit, final boolean hasMore) throws IOException {
128
129
130 if (slowDownNext.get()) {
131
132 if (countOfNext.incrementAndGet() == 2) {
133 sleepTime.set(2000);
134 slowdownCode(e);
135 }
136 }
137 return true;
138 }
139
140 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
141 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
142 CountDownLatch latch = getCdl().get();
143 try {
144 if (sleepTime.get() > 0) {
145 LOG.info("Sleeping for " + sleepTime.get() + " ms");
146 Thread.sleep(sleepTime.get());
147 } else if (latch.getCount() > 0) {
148 LOG.info("Waiting for the counterCountDownLatch");
149 latch.await(2, TimeUnit.MINUTES);
150 if (latch.getCount() > 0) {
151 throw new RuntimeException("Can't wait more");
152 }
153 }
154 } catch (InterruptedException e1) {
155 LOG.error(e1);
156 }
157 } else {
158 LOG.info("We're not the primary replicas.");
159 }
160 }
161
162 public static AtomicReference<CountDownLatch> getCdl() {
163 return cdl;
164 }
165 }
166
167 @BeforeClass
168 public static void beforeClass() throws Exception {
169
170 HTU.getConfiguration().setInt(
171 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
172 HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
173 ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
174 HTU.startMiniCluster(NB_SERVERS);
175
176
177 HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
178 hdt.addCoprocessor(SlowMeCopro.class.getName());
179 table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
180
181 hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
182
183
184 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
185 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
186
187
188 LOG.info("Master is going to be stopped");
189 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
190 Configuration c = new Configuration(HTU.getConfiguration());
191 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
192 LOG.info("Master has stopped");
193 }
194
195 @AfterClass
196 public static void afterClass() throws Exception {
197 if (table != null) table.close();
198 HTU.shutdownMiniCluster();
199 }
200
201 @Before
202 public void before() throws IOException {
203 HTU.getHBaseAdmin().getConnection().clearRegionCache();
204 try {
205 openRegion(hriPrimary);
206 } catch (Exception ignored) {
207 }
208 try {
209 openRegion(hriSecondary);
210 } catch (Exception ignored) {
211 }
212 }
213
214 @After
215 public void after() throws IOException, KeeperException {
216 try {
217 closeRegion(hriSecondary);
218 } catch (Exception ignored) {
219 }
220 try {
221 closeRegion(hriPrimary);
222 } catch (Exception ignored) {
223 }
224 ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
225 ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary);
226
227 HTU.getHBaseAdmin().getConnection().clearRegionCache();
228 }
229
230 private HRegionServer getRS() {
231 return HTU.getMiniHBaseCluster().getRegionServer(0);
232 }
233
234 private void openRegion(HRegionInfo hri) throws Exception {
235 try {
236 if (isRegionOpened(hri)) return;
237 } catch (Exception e){}
238 ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
239
240 AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
241 getRS().getServerName(), hri, 0, null, null);
242 AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
243 Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
244 Assert.assertEquals(responseOpen.getOpeningState(0),
245 AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED);
246 checkRegionIsOpened(hri);
247 }
248
249 private void closeRegion(HRegionInfo hri) throws Exception {
250 ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
251
252 AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
253 getRS().getServerName(), hri.getEncodedName(), true);
254 AdminProtos.CloseRegionResponse responseClose = getRS()
255 .getRSRpcServices().closeRegion(null, crr);
256 Assert.assertTrue(responseClose.getClosed());
257
258 checkRegionIsClosed(hri.getEncodedName());
259
260 ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null);
261 }
262
263 private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
264
265 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
266 Thread.sleep(1);
267 }
268
269 Assert.assertTrue(
270 ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null));
271 }
272
273 private boolean isRegionOpened(HRegionInfo hri) throws Exception {
274 return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
275 }
276
277 private void checkRegionIsClosed(String encodedRegionName) throws Exception {
278
279 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
280 Thread.sleep(1);
281 }
282
283 try {
284 Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
285 } catch (NotServingRegionException expected) {
286
287 }
288
289
290 }
291
292 private void flushRegion(HRegionInfo regionInfo) throws IOException {
293 TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
294 }
295
296 @Test
297 public void testUseRegionWithoutReplica() throws Exception {
298 byte[] b1 = "testUseRegionWithoutReplica".getBytes();
299 openRegion(hriSecondary);
300 SlowMeCopro.getCdl().set(new CountDownLatch(0));
301 try {
302 Get g = new Get(b1);
303 Result r = table.get(g);
304 Assert.assertFalse(r.isStale());
305 } finally {
306 closeRegion(hriSecondary);
307 }
308 }
309
310 @Test
311 public void testLocations() throws Exception {
312 byte[] b1 = "testLocations".getBytes();
313 openRegion(hriSecondary);
314 ClusterConnection hc = (ClusterConnection) HTU.getHBaseAdmin().getConnection();
315
316 try {
317 hc.clearRegionCache();
318 RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
319 Assert.assertEquals(2, rl.size());
320
321 rl = hc.locateRegion(table.getName(), b1, true, false);
322 Assert.assertEquals(2, rl.size());
323
324 hc.clearRegionCache();
325 rl = hc.locateRegion(table.getName(), b1, true, false);
326 Assert.assertEquals(2, rl.size());
327
328 rl = hc.locateRegion(table.getName(), b1, false, false);
329 Assert.assertEquals(2, rl.size());
330 } finally {
331 closeRegion(hriSecondary);
332 }
333 }
334
335 @Test
336 public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
337 byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
338 openRegion(hriSecondary);
339
340 try {
341
342 Get g = new Get(b1);
343 Result r = table.get(g);
344 Assert.assertFalse(r.isStale());
345 } finally {
346 closeRegion(hriSecondary);
347 }
348 }
349
350
351 @Test
352 public void testGetNoResultStaleRegionWithReplica() throws Exception {
353 byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
354 openRegion(hriSecondary);
355
356 SlowMeCopro.getCdl().set(new CountDownLatch(1));
357 try {
358 Get g = new Get(b1);
359 g.setConsistency(Consistency.TIMELINE);
360 Result r = table.get(g);
361 Assert.assertTrue(r.isStale());
362 } finally {
363 SlowMeCopro.getCdl().get().countDown();
364 closeRegion(hriSecondary);
365 }
366 }
367
368 @Test
369 public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
370 byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
371 openRegion(hriSecondary);
372
373 try {
374
375 SlowMeCopro.sleepTime.set(2000);
376 Get g = new Get(b1);
377 Result r = table.get(g);
378 Assert.assertFalse(r.isStale());
379
380 } finally {
381 SlowMeCopro.sleepTime.set(0);
382 closeRegion(hriSecondary);
383 }
384 }
385
386
387 @Test
388 public void testFlushTable() throws Exception {
389 openRegion(hriSecondary);
390 try {
391 flushRegion(hriPrimary);
392 flushRegion(hriSecondary);
393
394 Put p = new Put(row);
395 p.add(f, row, row);
396 table.put(p);
397
398 flushRegion(hriPrimary);
399 flushRegion(hriSecondary);
400 } finally {
401 Delete d = new Delete(row);
402 table.delete(d);
403 closeRegion(hriSecondary);
404 }
405 }
406
407 @Test
408 public void testFlushPrimary() throws Exception {
409 openRegion(hriSecondary);
410
411 try {
412 flushRegion(hriPrimary);
413
414 Put p = new Put(row);
415 p.add(f, row, row);
416 table.put(p);
417
418 flushRegion(hriPrimary);
419 } finally {
420 Delete d = new Delete(row);
421 table.delete(d);
422 closeRegion(hriSecondary);
423 }
424 }
425
426 @Test
427 public void testFlushSecondary() throws Exception {
428 openRegion(hriSecondary);
429 try {
430 flushRegion(hriSecondary);
431
432 Put p = new Put(row);
433 p.add(f, row, row);
434 table.put(p);
435
436 flushRegion(hriSecondary);
437 } catch (TableNotFoundException expected) {
438 } finally {
439 Delete d = new Delete(row);
440 table.delete(d);
441 closeRegion(hriSecondary);
442 }
443 }
444
445 @Test
446 public void testUseRegionWithReplica() throws Exception {
447 byte[] b1 = "testUseRegionWithReplica".getBytes();
448 openRegion(hriSecondary);
449
450 try {
451
452 Put p = new Put(b1);
453 p.add(f, b1, b1);
454 table.put(p);
455 LOG.info("Put done");
456
457
458 Get g = new Get(b1);
459 Result r = table.get(g);
460 Assert.assertFalse(r.isStale());
461 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
462 LOG.info("get works and is not stale done");
463
464
465 SlowMeCopro.sleepTime.set(2000);
466 g = new Get(b1);
467 r = table.get(g);
468 Assert.assertFalse(r.isStale());
469 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
470 SlowMeCopro.sleepTime.set(0);
471 LOG.info("sleep and is not stale done");
472
473
474 SlowMeCopro.getCdl().set(new CountDownLatch(1));
475 g = new Get(b1);
476 g.setConsistency(Consistency.TIMELINE);
477 r = table.get(g);
478 Assert.assertTrue(r.isStale());
479 Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
480 SlowMeCopro.getCdl().get().countDown();
481
482 LOG.info("stale done");
483
484
485 g = new Get(b1);
486 g.setCheckExistenceOnly(true);
487 r = table.get(g);
488 Assert.assertFalse(r.isStale());
489 Assert.assertTrue(r.getExists());
490 LOG.info("exists not stale done");
491
492
493 SlowMeCopro.getCdl().set(new CountDownLatch(1));
494 g = new Get(b1);
495 g.setCheckExistenceOnly(true);
496 g.setConsistency(Consistency.TIMELINE);
497 r = table.get(g);
498 Assert.assertTrue(r.isStale());
499 Assert.assertFalse("The secondary has stale data", r.getExists());
500 SlowMeCopro.getCdl().get().countDown();
501 LOG.info("exists stale before flush done");
502
503 flushRegion(hriPrimary);
504 flushRegion(hriSecondary);
505 LOG.info("flush done");
506 Thread.sleep(1000 + REFRESH_PERIOD * 2);
507
508
509 SlowMeCopro.getCdl().set(new CountDownLatch(1));
510 g = new Get(b1);
511 g.setConsistency(Consistency.TIMELINE);
512 r = table.get(g);
513 Assert.assertTrue(r.isStale());
514 Assert.assertFalse(r.isEmpty());
515 SlowMeCopro.getCdl().get().countDown();
516 LOG.info("stale done");
517
518
519 SlowMeCopro.getCdl().set(new CountDownLatch(1));
520 g = new Get(b1);
521 g.setCheckExistenceOnly(true);
522 g.setConsistency(Consistency.TIMELINE);
523 r = table.get(g);
524 Assert.assertTrue(r.isStale());
525 Assert.assertTrue(r.getExists());
526 SlowMeCopro.getCdl().get().countDown();
527 LOG.info("exists stale after flush done");
528
529 } finally {
530 SlowMeCopro.getCdl().get().countDown();
531 SlowMeCopro.sleepTime.set(0);
532 Delete d = new Delete(b1);
533 table.delete(d);
534 closeRegion(hriSecondary);
535 }
536 }
537
538 @Test
539 public void testCancelOfMultiGet() throws Exception {
540 openRegion(hriSecondary);
541 try {
542 List<Put> puts = new ArrayList<Put>(2);
543 byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
544 Put p = new Put(b1);
545 p.add(f, b1, b1);
546 puts.add(p);
547
548 byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
549 p = new Put(b2);
550 p.add(f, b2, b2);
551 puts.add(p);
552 table.put(puts);
553 LOG.debug("PUT done");
554 flushRegion(hriPrimary);
555 LOG.info("flush done");
556
557 Thread.sleep(1000 + REFRESH_PERIOD * 2);
558
559 AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection())
560 .getAsyncProcess();
561
562
563 SlowMeCopro.getCdl().set(new CountDownLatch(1));
564
565 List<Get> gets = new ArrayList<Get>();
566 Get g = new Get(b1);
567 g.setCheckExistenceOnly(true);
568 g.setConsistency(Consistency.TIMELINE);
569 gets.add(g);
570 g = new Get(b2);
571 g.setCheckExistenceOnly(true);
572 g.setConsistency(Consistency.TIMELINE);
573 gets.add(g);
574 Object[] results = new Object[2];
575 AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(),
576 gets, null, results);
577 reqs.waitUntilDone();
578
579 for (Object r : results) {
580 Assert.assertTrue(((Result)r).isStale());
581 Assert.assertTrue(((Result)r).getExists());
582 }
583 Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
584
585 Assert.assertTrue(!set.isEmpty());
586 for (MultiServerCallable<Row> m : set) {
587 Assert.assertTrue(m.isCancelled());
588 }
589 } finally {
590 SlowMeCopro.getCdl().get().countDown();
591 SlowMeCopro.sleepTime.set(0);
592 SlowMeCopro.slowDownNext.set(false);
593 SlowMeCopro.countOfNext.set(0);
594 for (int i = 0; i < 2; i++) {
595 byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
596 Delete d = new Delete(b1);
597 table.delete(d);
598 }
599 closeRegion(hriSecondary);
600 }
601 }
602
603 @Test
604 public void testScanWithReplicas() throws Exception {
605
606 runMultipleScansOfOneType(false, false);
607 }
608
609 @Test
610 public void testSmallScanWithReplicas() throws Exception {
611
612 runMultipleScansOfOneType(false, true);
613 }
614
615 @Test
616 public void testReverseScanWithReplicas() throws Exception {
617
618 runMultipleScansOfOneType(true, false);
619 }
620
621 @Test
622 public void testCancelOfScan() throws Exception {
623 openRegion(hriSecondary);
624 int NUMROWS = 100;
625 try {
626 for (int i = 0; i < NUMROWS; i++) {
627 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
628 Put p = new Put(b1);
629 p.add(f, b1, b1);
630 table.put(p);
631 }
632 LOG.debug("PUT done");
633 int caching = 20;
634 byte[] start;
635 start = Bytes.toBytes("testUseRegionWithReplica" + 0);
636
637 flushRegion(hriPrimary);
638 LOG.info("flush done");
639 Thread.sleep(1000 + REFRESH_PERIOD * 2);
640
641
642 SlowMeCopro.slowDownNext.set(true);
643 SlowMeCopro.countOfNext.set(0);
644 SlowMeCopro.sleepTime.set(5000);
645
646 Scan scan = new Scan(start);
647 scan.setCaching(caching);
648 scan.setConsistency(Consistency.TIMELINE);
649 ResultScanner scanner = table.getScanner(scan);
650 Iterator<Result> iter = scanner.iterator();
651 iter.next();
652 Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
653 SlowMeCopro.slowDownNext.set(false);
654 SlowMeCopro.countOfNext.set(0);
655 } finally {
656 SlowMeCopro.cdl.get().countDown();
657 SlowMeCopro.sleepTime.set(0);
658 SlowMeCopro.slowDownNext.set(false);
659 SlowMeCopro.countOfNext.set(0);
660 for (int i = 0; i < NUMROWS; i++) {
661 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
662 Delete d = new Delete(b1);
663 table.delete(d);
664 }
665 closeRegion(hriSecondary);
666 }
667 }
668
669 private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
670 openRegion(hriSecondary);
671 int NUMROWS = 100;
672 int NUMCOLS = 10;
673 try {
674 for (int i = 0; i < NUMROWS; i++) {
675 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
676 for (int col = 0; col < NUMCOLS; col++) {
677 Put p = new Put(b1);
678 String qualifier = "qualifer" + col;
679 KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
680 p.add(kv);
681 table.put(p);
682 }
683 }
684 LOG.debug("PUT done");
685 int caching = 20;
686 long maxResultSize = Long.MAX_VALUE;
687
688 byte[] start;
689 if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
690 else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
691
692 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
693 start, NUMROWS, NUMCOLS, false, false);
694
695
696
697 SlowMeCopro.sleepTime.set(5000);
698 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
699 NUMCOLS, false, false);
700 SlowMeCopro.sleepTime.set(0);
701
702 flushRegion(hriPrimary);
703 LOG.info("flush done");
704 Thread.sleep(1000 + REFRESH_PERIOD * 2);
705
706
707 SlowMeCopro.sleepTime.set(5000);
708 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
709 start, NUMROWS, NUMCOLS, true, false);
710 SlowMeCopro.sleepTime.set(0);
711
712
713 SlowMeCopro.slowDownNext.set(true);
714 SlowMeCopro.countOfNext.set(0);
715 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
716 NUMROWS, NUMCOLS, true, true);
717 SlowMeCopro.slowDownNext.set(false);
718 SlowMeCopro.countOfNext.set(0);
719
720
721 SlowMeCopro.sleepTime.set(5000);
722 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
723 start, NUMROWS, NUMCOLS, false, false);
724 SlowMeCopro.sleepTime.set(0);
725
726
727
728 maxResultSize = 1;
729 SlowMeCopro.slowDownNext.set(true);
730 SlowMeCopro.countOfNext.set(0);
731 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
732 NUMROWS, NUMCOLS, true, true);
733 maxResultSize = Long.MAX_VALUE;
734 SlowMeCopro.slowDownNext.set(false);
735 SlowMeCopro.countOfNext.set(0);
736 } finally {
737 SlowMeCopro.getCdl().get().countDown();
738 SlowMeCopro.sleepTime.set(0);
739 SlowMeCopro.slowDownNext.set(false);
740 SlowMeCopro.countOfNext.set(0);
741 for (int i = 0; i < NUMROWS; i++) {
742 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
743 Delete d = new Delete(b1);
744 table.delete(d);
745 }
746 closeRegion(hriSecondary);
747 }
748 }
749
750 private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
751 int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
752 boolean staleExpected, boolean slowNext)
753 throws Exception {
754 Scan scan = new Scan(startRow);
755 scan.setCaching(caching);
756 scan.setMaxResultSize(maxResultSize);
757 scan.setReversed(reversed);
758 scan.setSmall(small);
759 scan.setConsistency(consistency);
760 ResultScanner scanner = table.getScanner(scan);
761 Iterator<Result> iter = scanner.iterator();
762
763
764 HashMap<String, Boolean> map = new HashMap<String, Boolean>();
765
766
767 int rowCount = 0;
768 int cellCount = 0;
769 int countOfStale = 0;
770
771 while (iter.hasNext()) {
772 rowCount++;
773 Result r = iter.next();
774 String row = new String(r.getRow());
775
776 if (map.containsKey(row)) {
777 throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
778 }
779
780 map.put(row, true);
781
782 for (Cell cell : r.rawCells()) {
783 cellCount++;
784 }
785
786 if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
787 if (r.isStale()) countOfStale++;
788 }
789 Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
790 rowCount == numRows);
791 Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
792 cellCount == (numRows * numCols));
793
794 if (slowNext) {
795 LOG.debug("Count of Stale " + countOfStale);
796 Assert.assertTrue(countOfStale > 1);
797
798
799
800 if (maxResultSize != Long.MAX_VALUE) {
801 Assert.assertTrue(countOfStale <= numRows);
802 } else {
803 Assert.assertTrue(countOfStale < numRows);
804 }
805 }
806 }
807 }