View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Random;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.commons.logging.impl.Log4JLogger;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.NotServingRegionException;
47  import org.apache.hadoop.hbase.RegionLocations;
48  import org.apache.hadoop.hbase.TableNotFoundException;
49  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
50  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
51  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
52  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
54  import org.apache.hadoop.hbase.protobuf.RequestConverter;
55  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
56  import org.apache.hadoop.hbase.regionserver.HRegionServer;
57  import org.apache.hadoop.hbase.regionserver.InternalScanner;
58  import org.apache.hadoop.hbase.regionserver.RegionScanner;
59  import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
60  import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
61  import org.apache.hadoop.hbase.testclassification.MediumTests;
62  import org.apache.hadoop.hbase.util.Bytes;
63  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
64  import org.apache.log4j.Level;
65  import org.apache.zookeeper.KeeperException;
66  import org.junit.After;
67  import org.junit.AfterClass;
68  import org.junit.Assert;
69  import org.junit.Before;
70  import org.junit.BeforeClass;
71  import org.junit.Test;
72  import org.junit.experimental.categories.Category;
73  
74  /**
75   * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
76   * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
77   */
78  @Category(MediumTests.class)
79  public class TestReplicasClient {
80    private static final Log LOG = LogFactory.getLog(TestReplicasClient.class);
81  
82    static {
83      ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
84    }
85  
86    private static final int NB_SERVERS = 1;
87    private static HTable table = null;
88    private static final byte[] row = TestReplicasClient.class.getName().getBytes();
89  
90    private static HRegionInfo hriPrimary;
91    private static HRegionInfo hriSecondary;
92  
93    private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
94    private static final byte[] f = HConstants.CATALOG_FAMILY;
95  
96    private final static int REFRESH_PERIOD = 1000;
97  
98    /**
99     * This copro is used to synchronize the tests.
100    */
101   public static class SlowMeCopro extends BaseRegionObserver {
102     static final AtomicLong sleepTime = new AtomicLong(0);
103     static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
104     static final AtomicInteger countOfNext = new AtomicInteger(0);
105     private static final AtomicReference<CountDownLatch> cdl =
106         new AtomicReference<CountDownLatch>(new CountDownLatch(0));
107     Random r = new Random();
108     public SlowMeCopro() {
109     }
110 
111     @Override
112     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
113                          final Get get, final List<Cell> results) throws IOException {
114       slowdownCode(e);
115     }
116 
117     @Override
118     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
119         final Scan scan, final RegionScanner s) throws IOException {
120       slowdownCode(e);
121       return s;
122     }
123 
124     @Override
125     public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
126         final InternalScanner s, final List<Result> results,
127         final int limit, final boolean hasMore) throws IOException {
128       //this will slow down a certain next operation if the conditions are met. The slowness
129       //will allow the call to go to a replica
130       if (slowDownNext.get()) {
131         //have some "next" return successfully from the primary; hence countOfNext checked
132         if (countOfNext.incrementAndGet() == 2) {
133           sleepTime.set(2000);
134           slowdownCode(e);
135         }
136       }
137       return true;
138     }
139 
140     private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
141       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
142         CountDownLatch latch = getCdl().get();
143         try {
144           if (sleepTime.get() > 0) {
145             LOG.info("Sleeping for " + sleepTime.get() + " ms");
146             Thread.sleep(sleepTime.get());
147           } else if (latch.getCount() > 0) {
148             LOG.info("Waiting for the counterCountDownLatch");
149             latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
150             if (latch.getCount() > 0) {
151               throw new RuntimeException("Can't wait more");
152             }
153           }
154         } catch (InterruptedException e1) {
155           LOG.error(e1);
156         }
157       } else {
158         LOG.info("We're not the primary replicas.");
159       }
160     }
161 
162     public static AtomicReference<CountDownLatch> getCdl() {
163       return cdl;
164     }
165   }
166 
167   @BeforeClass
168   public static void beforeClass() throws Exception {
169     // enable store file refreshing
170     HTU.getConfiguration().setInt(
171         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
172     HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
173     ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
174     HTU.startMiniCluster(NB_SERVERS);
175 
176     // Create table then get the single region for our new table.
177     HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
178     hdt.addCoprocessor(SlowMeCopro.class.getName());
179     table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
180 
181     hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
182 
183     // mock a secondary region info to open
184     hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
185         hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
186 
187     // No master
188     LOG.info("Master is going to be stopped");
189     TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
190     Configuration c = new Configuration(HTU.getConfiguration());
191     c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
192     LOG.info("Master has stopped");
193   }
194 
195   @AfterClass
196   public static void afterClass() throws Exception {
197     if (table != null) table.close();
198     HTU.shutdownMiniCluster();
199   }
200 
201   @Before
202   public void before() throws IOException {
203     HTU.getHBaseAdmin().getConnection().clearRegionCache();
204     try {
205       openRegion(hriPrimary);
206     } catch (Exception ignored) {
207     }
208     try {
209       openRegion(hriSecondary);
210     } catch (Exception ignored) {
211     }
212   }
213 
214   @After
215   public void after() throws IOException, KeeperException {
216     try {
217       closeRegion(hriSecondary);
218     } catch (Exception ignored) {
219     }
220     try {
221       closeRegion(hriPrimary);
222     } catch (Exception ignored) {
223     }
224     ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
225     ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary);
226 
227     HTU.getHBaseAdmin().getConnection().clearRegionCache();
228   }
229 
230   private HRegionServer getRS() {
231     return HTU.getMiniHBaseCluster().getRegionServer(0);
232   }
233 
234   private void openRegion(HRegionInfo hri) throws Exception {
235     try {
236       if (isRegionOpened(hri)) return;
237     } catch (Exception e){}
238     ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
239     // first version is '0'
240     AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
241       getRS().getServerName(), hri, 0, null, null);
242     AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
243     Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
244     Assert.assertEquals(responseOpen.getOpeningState(0),
245       AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED);
246     checkRegionIsOpened(hri);
247   }
248 
249   private void closeRegion(HRegionInfo hri) throws Exception {
250     ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
251 
252     AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
253       getRS().getServerName(), hri.getEncodedName(), true);
254     AdminProtos.CloseRegionResponse responseClose = getRS()
255         .getRSRpcServices().closeRegion(null, crr);
256     Assert.assertTrue(responseClose.getClosed());
257 
258     checkRegionIsClosed(hri.getEncodedName());
259 
260     ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null);
261   }
262 
263   private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
264 
265     while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
266       Thread.sleep(1);
267     }
268 
269     Assert.assertTrue(
270         ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null));
271   }
272 
273   private boolean isRegionOpened(HRegionInfo hri) throws Exception {
274     return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
275   }
276 
277   private void checkRegionIsClosed(String encodedRegionName) throws Exception {
278 
279     while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
280       Thread.sleep(1);
281     }
282 
283     try {
284       Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
285     } catch (NotServingRegionException expected) {
286       // That's how it work: if the region is closed we have an exception.
287     }
288 
289     // We don't delete the znode here, because there is not always a znode.
290   }
291 
292   private void flushRegion(HRegionInfo regionInfo) throws IOException {
293     TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
294   }
295 
296   @Test
297   public void testUseRegionWithoutReplica() throws Exception {
298     byte[] b1 = "testUseRegionWithoutReplica".getBytes();
299     openRegion(hriSecondary);
300     SlowMeCopro.getCdl().set(new CountDownLatch(0));
301     try {
302       Get g = new Get(b1);
303       Result r = table.get(g);
304       Assert.assertFalse(r.isStale());
305     } finally {
306       closeRegion(hriSecondary);
307     }
308   }
309 
310   @Test
311   public void testLocations() throws Exception {
312     byte[] b1 = "testLocations".getBytes();
313     openRegion(hriSecondary);
314     ClusterConnection hc = (ClusterConnection) HTU.getHBaseAdmin().getConnection();
315 
316     try {
317       hc.clearRegionCache();
318       RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
319       Assert.assertEquals(2, rl.size());
320 
321       rl = hc.locateRegion(table.getName(), b1, true, false);
322       Assert.assertEquals(2, rl.size());
323 
324       hc.clearRegionCache();
325       rl = hc.locateRegion(table.getName(), b1, true, false);
326       Assert.assertEquals(2, rl.size());
327 
328       rl = hc.locateRegion(table.getName(), b1, false, false);
329       Assert.assertEquals(2, rl.size());
330     } finally {
331       closeRegion(hriSecondary);
332     }
333   }
334 
335   @Test
336   public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
337     byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
338     openRegion(hriSecondary);
339 
340     try {
341       // A get works and is not stale
342       Get g = new Get(b1);
343       Result r = table.get(g);
344       Assert.assertFalse(r.isStale());
345     } finally {
346       closeRegion(hriSecondary);
347     }
348   }
349 
350 
351   @Test
352   public void testGetNoResultStaleRegionWithReplica() throws Exception {
353     byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
354     openRegion(hriSecondary);
355 
356     SlowMeCopro.getCdl().set(new CountDownLatch(1));
357     try {
358       Get g = new Get(b1);
359       g.setConsistency(Consistency.TIMELINE);
360       Result r = table.get(g);
361       Assert.assertTrue(r.isStale());
362     } finally {
363       SlowMeCopro.getCdl().get().countDown();
364       closeRegion(hriSecondary);
365     }
366   }
367 
368   @Test
369   public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
370     byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
371     openRegion(hriSecondary);
372 
373     try {
374       // We sleep; but we won't go to the stale region as we don't get the stale by default.
375       SlowMeCopro.sleepTime.set(2000);
376       Get g = new Get(b1);
377       Result r = table.get(g);
378       Assert.assertFalse(r.isStale());
379 
380     } finally {
381       SlowMeCopro.sleepTime.set(0);
382       closeRegion(hriSecondary);
383     }
384   }
385 
386 
387   @Test
388   public void testFlushTable() throws Exception {
389     openRegion(hriSecondary);
390     try {
391       flushRegion(hriPrimary);
392       flushRegion(hriSecondary);
393 
394       Put p = new Put(row);
395       p.add(f, row, row);
396       table.put(p);
397 
398       flushRegion(hriPrimary);
399       flushRegion(hriSecondary);
400     } finally {
401       Delete d = new Delete(row);
402       table.delete(d);
403       closeRegion(hriSecondary);
404     }
405   }
406 
407   @Test
408   public void testFlushPrimary() throws Exception {
409     openRegion(hriSecondary);
410 
411     try {
412       flushRegion(hriPrimary);
413 
414       Put p = new Put(row);
415       p.add(f, row, row);
416       table.put(p);
417 
418       flushRegion(hriPrimary);
419     } finally {
420       Delete d = new Delete(row);
421       table.delete(d);
422       closeRegion(hriSecondary);
423     }
424   }
425 
426   @Test
427   public void testFlushSecondary() throws Exception {
428     openRegion(hriSecondary);
429     try {
430       flushRegion(hriSecondary);
431 
432       Put p = new Put(row);
433       p.add(f, row, row);
434       table.put(p);
435 
436       flushRegion(hriSecondary);
437     } catch (TableNotFoundException expected) {
438     } finally {
439       Delete d = new Delete(row);
440       table.delete(d);
441       closeRegion(hriSecondary);
442     }
443   }
444 
445   @Test
446   public void testUseRegionWithReplica() throws Exception {
447     byte[] b1 = "testUseRegionWithReplica".getBytes();
448     openRegion(hriSecondary);
449 
450     try {
451       // A simple put works, even if there here a second replica
452       Put p = new Put(b1);
453       p.add(f, b1, b1);
454       table.put(p);
455       LOG.info("Put done");
456 
457       // A get works and is not stale
458       Get g = new Get(b1);
459       Result r = table.get(g);
460       Assert.assertFalse(r.isStale());
461       Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
462       LOG.info("get works and is not stale done");
463 
464       // Even if it we have to wait a little on the main region
465       SlowMeCopro.sleepTime.set(2000);
466       g = new Get(b1);
467       r = table.get(g);
468       Assert.assertFalse(r.isStale());
469       Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
470       SlowMeCopro.sleepTime.set(0);
471       LOG.info("sleep and is not stale done");
472 
473       // But if we ask for stale we will get it
474       SlowMeCopro.getCdl().set(new CountDownLatch(1));
475       g = new Get(b1);
476       g.setConsistency(Consistency.TIMELINE);
477       r = table.get(g);
478       Assert.assertTrue(r.isStale());
479       Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
480       SlowMeCopro.getCdl().get().countDown();
481 
482       LOG.info("stale done");
483 
484       // exists works and is not stale
485       g = new Get(b1);
486       g.setCheckExistenceOnly(true);
487       r = table.get(g);
488       Assert.assertFalse(r.isStale());
489       Assert.assertTrue(r.getExists());
490       LOG.info("exists not stale done");
491 
492       // exists works on stale but don't see the put
493       SlowMeCopro.getCdl().set(new CountDownLatch(1));
494       g = new Get(b1);
495       g.setCheckExistenceOnly(true);
496       g.setConsistency(Consistency.TIMELINE);
497       r = table.get(g);
498       Assert.assertTrue(r.isStale());
499       Assert.assertFalse("The secondary has stale data", r.getExists());
500       SlowMeCopro.getCdl().get().countDown();
501       LOG.info("exists stale before flush done");
502 
503       flushRegion(hriPrimary);
504       flushRegion(hriSecondary);
505       LOG.info("flush done");
506       Thread.sleep(1000 + REFRESH_PERIOD * 2);
507 
508       // get works and is not stale
509       SlowMeCopro.getCdl().set(new CountDownLatch(1));
510       g = new Get(b1);
511       g.setConsistency(Consistency.TIMELINE);
512       r = table.get(g);
513       Assert.assertTrue(r.isStale());
514       Assert.assertFalse(r.isEmpty());
515       SlowMeCopro.getCdl().get().countDown();
516       LOG.info("stale done");
517 
518       // exists works on stale and we see the put after the flush
519       SlowMeCopro.getCdl().set(new CountDownLatch(1));
520       g = new Get(b1);
521       g.setCheckExistenceOnly(true);
522       g.setConsistency(Consistency.TIMELINE);
523       r = table.get(g);
524       Assert.assertTrue(r.isStale());
525       Assert.assertTrue(r.getExists());
526       SlowMeCopro.getCdl().get().countDown();
527       LOG.info("exists stale after flush done");
528 
529     } finally {
530       SlowMeCopro.getCdl().get().countDown();
531       SlowMeCopro.sleepTime.set(0);
532       Delete d = new Delete(b1);
533       table.delete(d);
534       closeRegion(hriSecondary);
535     }
536   }
537 
538   @Test
539   public void testCancelOfMultiGet() throws Exception {
540     openRegion(hriSecondary);
541     try {
542       List<Put> puts = new ArrayList<Put>(2);
543       byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
544       Put p = new Put(b1);
545       p.add(f, b1, b1);
546       puts.add(p);
547 
548       byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
549       p = new Put(b2);
550       p.add(f, b2, b2);
551       puts.add(p);
552       table.put(puts);
553       LOG.debug("PUT done");
554       flushRegion(hriPrimary);
555       LOG.info("flush done");
556 
557       Thread.sleep(1000 + REFRESH_PERIOD * 2);
558 
559       AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection())
560           .getAsyncProcess();
561 
562       // Make primary slowdown
563       SlowMeCopro.getCdl().set(new CountDownLatch(1));
564 
565       List<Get> gets = new ArrayList<Get>();
566       Get g = new Get(b1);
567       g.setCheckExistenceOnly(true);
568       g.setConsistency(Consistency.TIMELINE);
569       gets.add(g);
570       g = new Get(b2);
571       g.setCheckExistenceOnly(true);
572       g.setConsistency(Consistency.TIMELINE);
573       gets.add(g);
574       Object[] results = new Object[2];
575       AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(),
576           gets, null, results);
577       reqs.waitUntilDone();
578       // verify we got the right results back
579       for (Object r : results) {
580         Assert.assertTrue(((Result)r).isStale());
581         Assert.assertTrue(((Result)r).getExists());
582       }
583       Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
584       // verify we did cancel unneeded calls
585       Assert.assertTrue(!set.isEmpty());
586       for (MultiServerCallable<Row> m : set) {
587         Assert.assertTrue(m.isCancelled());
588       }
589     } finally {
590       SlowMeCopro.getCdl().get().countDown();
591       SlowMeCopro.sleepTime.set(0);
592       SlowMeCopro.slowDownNext.set(false);
593       SlowMeCopro.countOfNext.set(0);
594       for (int i = 0; i < 2; i++) {
595         byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
596         Delete d = new Delete(b1);
597         table.delete(d);
598       }
599       closeRegion(hriSecondary);
600     }
601   }
602 
603   @Test
604   public void testScanWithReplicas() throws Exception {
605     //simple scan
606     runMultipleScansOfOneType(false, false);
607   }
608 
609   @Test
610   public void testSmallScanWithReplicas() throws Exception {
611     //small scan
612     runMultipleScansOfOneType(false, true);
613   }
614 
615   @Test
616   public void testReverseScanWithReplicas() throws Exception {
617     //reverse scan
618     runMultipleScansOfOneType(true, false);
619   }
620 
621   @Test
622   public void testCancelOfScan() throws Exception {
623     openRegion(hriSecondary);
624     int NUMROWS = 100;
625     try {
626       for (int i = 0; i < NUMROWS; i++) {
627         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
628         Put p = new Put(b1);
629         p.add(f, b1, b1);
630         table.put(p);
631       }
632       LOG.debug("PUT done");
633       int caching = 20;
634       byte[] start;
635       start = Bytes.toBytes("testUseRegionWithReplica" + 0);
636 
637       flushRegion(hriPrimary);
638       LOG.info("flush done");
639       Thread.sleep(1000 + REFRESH_PERIOD * 2);
640 
641       // now make some 'next' calls slow
642       SlowMeCopro.slowDownNext.set(true);
643       SlowMeCopro.countOfNext.set(0);
644       SlowMeCopro.sleepTime.set(5000);
645 
646       Scan scan = new Scan(start);
647       scan.setCaching(caching);
648       scan.setConsistency(Consistency.TIMELINE);
649       ResultScanner scanner = table.getScanner(scan);
650       Iterator<Result> iter = scanner.iterator();
651       iter.next();
652       Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
653       SlowMeCopro.slowDownNext.set(false);
654       SlowMeCopro.countOfNext.set(0);
655     } finally {
656       SlowMeCopro.cdl.get().countDown();
657       SlowMeCopro.sleepTime.set(0);
658       SlowMeCopro.slowDownNext.set(false);
659       SlowMeCopro.countOfNext.set(0);
660       for (int i = 0; i < NUMROWS; i++) {
661         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
662         Delete d = new Delete(b1);
663         table.delete(d);
664       }
665       closeRegion(hriSecondary);
666     }
667   }
668 
669   private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
670     openRegion(hriSecondary);
671     int NUMROWS = 100;
672     int NUMCOLS = 10;
673     try {
674       for (int i = 0; i < NUMROWS; i++) {
675         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
676         for (int col = 0; col < NUMCOLS; col++) {
677           Put p = new Put(b1);
678           String qualifier = "qualifer" + col;
679           KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
680           p.add(kv);
681           table.put(p);
682         }
683       }
684       LOG.debug("PUT done");
685       int caching = 20;
686       long maxResultSize = Long.MAX_VALUE;
687 
688       byte[] start;
689       if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
690       else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
691 
692       scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
693         start, NUMROWS, NUMCOLS, false, false);
694 
695       // Even if we were to slow the server down, unless we ask for stale
696       // we won't get it
697       SlowMeCopro.sleepTime.set(5000);
698       scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
699         NUMCOLS, false, false);
700       SlowMeCopro.sleepTime.set(0);
701 
702       flushRegion(hriPrimary);
703       LOG.info("flush done");
704       Thread.sleep(1000 + REFRESH_PERIOD * 2);
705 
706       //Now set the flag to get a response even if stale
707       SlowMeCopro.sleepTime.set(5000);
708       scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
709         start, NUMROWS, NUMCOLS, true, false);
710       SlowMeCopro.sleepTime.set(0);
711 
712       // now make some 'next' calls slow
713       SlowMeCopro.slowDownNext.set(true);
714       SlowMeCopro.countOfNext.set(0);
715       scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
716         NUMROWS, NUMCOLS, true, true);
717       SlowMeCopro.slowDownNext.set(false);
718       SlowMeCopro.countOfNext.set(0);
719 
720       // Make sure we do not get stale data..
721       SlowMeCopro.sleepTime.set(5000);
722       scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
723         start, NUMROWS, NUMCOLS, false, false);
724       SlowMeCopro.sleepTime.set(0);
725 
726       // While the next calls are slow, set maxResultSize to 1 so that some partial results will be
727       // returned from the server before the replica switch occurs.
728       maxResultSize = 1;
729       SlowMeCopro.slowDownNext.set(true);
730       SlowMeCopro.countOfNext.set(0);
731       scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
732         NUMROWS, NUMCOLS, true, true);
733       maxResultSize = Long.MAX_VALUE;
734       SlowMeCopro.slowDownNext.set(false);
735       SlowMeCopro.countOfNext.set(0);
736     } finally {
737       SlowMeCopro.getCdl().get().countDown();
738       SlowMeCopro.sleepTime.set(0);
739       SlowMeCopro.slowDownNext.set(false);
740       SlowMeCopro.countOfNext.set(0);
741       for (int i = 0; i < NUMROWS; i++) {
742         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
743         Delete d = new Delete(b1);
744         table.delete(d);
745       }
746       closeRegion(hriSecondary);
747     }
748   }
749 
750   private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
751       int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
752       boolean staleExpected, boolean slowNext)
753           throws Exception {
754     Scan scan = new Scan(startRow);
755     scan.setCaching(caching);
756     scan.setMaxResultSize(maxResultSize);
757     scan.setReversed(reversed);
758     scan.setSmall(small);
759     scan.setConsistency(consistency);
760     ResultScanner scanner = table.getScanner(scan);
761     Iterator<Result> iter = scanner.iterator();
762 
763     // Maps of row keys that we have seen so far
764     HashMap<String, Boolean> map = new HashMap<String, Boolean>();
765 
766     // Tracked metrics
767     int rowCount = 0;
768     int cellCount = 0;
769     int countOfStale = 0;
770 
771     while (iter.hasNext()) {
772       rowCount++;
773       Result r = iter.next();
774       String row = new String(r.getRow());
775 
776       if (map.containsKey(row)) {
777         throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
778       }
779 
780       map.put(row, true);
781 
782       for (Cell cell : r.rawCells()) {
783         cellCount++;
784       }
785 
786       if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
787       if (r.isStale()) countOfStale++;
788     }
789     Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
790       rowCount == numRows);
791     Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
792       cellCount == (numRows * numCols));
793 
794     if (slowNext) {
795       LOG.debug("Count of Stale " + countOfStale);
796       Assert.assertTrue(countOfStale > 1);
797 
798       // If the scan was configured in such a way that a full row was NOT retrieved before the
799       // replica switch occurred, then it is possible that all rows were stale
800       if (maxResultSize != Long.MAX_VALUE) {
801         Assert.assertTrue(countOfStale <= numRows);
802       } else {
803         Assert.assertTrue(countOfStale < numRows);
804       }
805     }
806   }
807 }