View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  
21  package org.apache.hadoop.hbase.client;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.Path;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.RegionLocations;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.testclassification.MediumTests;
37  import org.apache.hadoop.hbase.RegionLocations;
38  import org.apache.hadoop.hbase.Waiter;
39  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
40  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
41  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
42  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
43  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
44  import org.apache.hadoop.hbase.protobuf.RequestConverter;
45  import org.apache.hadoop.hbase.regionserver.RegionScanner;
46  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
47  import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
48  import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.Pair;
51  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
52  import org.junit.AfterClass;
53  import org.junit.Assert;
54  import org.junit.BeforeClass;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  
58  import java.io.IOException;
59  import java.util.ArrayList;
60  import java.util.Arrays;
61  import java.util.List;
62  import java.util.concurrent.CountDownLatch;
63  import java.util.concurrent.TimeUnit;
64  import java.util.concurrent.atomic.AtomicLong;
65  import java.util.concurrent.atomic.AtomicReference;
66  
67  @Category(MediumTests.class)
68  public class TestReplicaWithCluster {
69    private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
70  
71    private static final int NB_SERVERS = 3;
72    private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
73    private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
74  
75    // second minicluster used in testing of replication
76    private static HBaseTestingUtility HTU2;
77    private static final byte[] f = HConstants.CATALOG_FAMILY;
78  
79    private final static int REFRESH_PERIOD = 1000;
80    private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
81  
82    /**
83     * This copro is used to synchronize the tests.
84     */
85    public static class SlowMeCopro extends BaseRegionObserver {
86      static final AtomicLong sleepTime = new AtomicLong(0);
87      static final AtomicReference<CountDownLatch> cdl =
88              new AtomicReference<CountDownLatch>(new CountDownLatch(0));
89  
90      public SlowMeCopro() {
91      }
92  
93      @Override
94      public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
95                           final Get get, final List<Cell> results) throws IOException {
96  
97        if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
98          CountDownLatch latch = cdl.get();
99          try {
100           if (sleepTime.get() > 0) {
101             LOG.info("Sleeping for " + sleepTime.get() + " ms");
102             Thread.sleep(sleepTime.get());
103           } else if (latch.getCount() > 0) {
104             LOG.info("Waiting for the counterCountDownLatch");
105             latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
106             if (latch.getCount() > 0) {
107               throw new RuntimeException("Can't wait more");
108             }
109           }
110         } catch (InterruptedException e1) {
111           LOG.error(e1);
112         }
113       } else {
114         LOG.info("We're not the primary replicas.");
115       }
116     }
117   }
118 
119   /**
120    * This copro is used to slow down the primary meta region scan a bit
121    */
122   public static class RegionServerHostingPrimayMetaRegionSlowCopro extends BaseRegionObserver {
123     static boolean slowDownPrimaryMetaScan = false;
124 
125     @Override
126     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
127         final Scan scan, final RegionScanner s) throws IOException {
128 
129       int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
130 
131       // Slow down with the primary meta region scan
132       if (slowDownPrimaryMetaScan && (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion()
133           && (replicaId == 0))) {
134         LOG.info("Scan with primary meta region, slow down a bit");
135         try {
136           Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
137         } catch (InterruptedException ie) {
138           // Ingore
139         }
140 
141       }
142       return null;
143     }
144   }
145 
146 
147   /**
148    * This copro is used to slow down the primary meta region scan a bit
149    */
150   public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro extends BaseRegionObserver {
151     static boolean slowDownPrimaryMetaScan = false;
152     static boolean throwException = false;
153 
154     @Override
155     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
156         final Get get, final List<Cell> results) throws IOException {
157 
158       int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
159 
160       // Fail for the primary replica, but not for meta
161       if (throwException) {
162         if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
163           LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment()
164               .getRegion().getRegionInfo());
165           throw new RegionServerStoppedException("Server " +
166               e.getEnvironment().getRegionServerServices().getServerName() + " not running");
167         }
168       } else {
169         LOG.info("Get, We're replica region " + replicaId);
170       }
171     }
172 
173     @Override
174     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
175         final Scan scan, final RegionScanner s) throws IOException {
176 
177       int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
178 
179       // Slow down with the primary meta region scan
180       if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
181         if (slowDownPrimaryMetaScan) {
182           LOG.info("Scan with primary meta region, slow down a bit");
183           try {
184             Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
185           } catch (InterruptedException ie) {
186             // Ingore
187           }
188         }
189 
190         // Fail for the primary replica
191         if (throwException) {
192           LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
193               .getRegion().getRegionInfo());
194 
195           throw new RegionServerStoppedException("Server " +
196               e.getEnvironment().getRegionServerServices().getServerName() + " not running");
197         } else {
198           LOG.info("Scan, We're replica region " + replicaId);
199         }
200       } else {
201         LOG.info("Scan, We're replica region " + replicaId);
202       }
203 
204       return null;
205     }
206   }
207 
208   /**
209    * This copro is used to simulate region server down exception for Get and Scan
210    */
211   public static class RegionServerStoppedCopro extends BaseRegionObserver {
212 
213     public RegionServerStoppedCopro() {
214     }
215 
216     @Override
217     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
218         final Get get, final List<Cell> results) throws IOException {
219 
220       int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
221 
222       // Fail for the primary replica and replica 1
223       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
224         LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
225         throw new RegionServerStoppedException("Server " +
226             e.getEnvironment().getRegionServerServices().getServerName()
227             + " not running");
228       } else {
229         LOG.info("We're replica region " + replicaId);
230       }
231     }
232 
233     @Override
234     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
235         final Scan scan, final RegionScanner s) throws IOException {
236 
237       int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
238 
239       // Fail for the primary replica and replica 1
240       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
241         LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
242         throw new RegionServerStoppedException("Server " +
243             e.getEnvironment().getRegionServerServices().getServerName()
244             + " not running");
245       } else {
246         LOG.info("We're replica region " + replicaId);
247       }
248 
249       return null;
250     }
251   }
252 
253   @BeforeClass
254   public static void beforeClass() throws Exception {
255     // enable store file refreshing
256     HTU.getConfiguration().setInt(
257             StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
258 
259     HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
260     HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
261     HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
262     HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
263     HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
264     HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
265     HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
266 
267 
268     // Enable meta replica at server side
269     HTU.getConfiguration().setInt("hbase.meta.replica.count", 2);
270 
271     // Make sure master does not host system tables.
272     HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");
273 
274     // Set system coprocessor so it can be applied to meta regions
275     HTU.getConfiguration().set("hbase.coprocessor.region.classes",
276         RegionServerHostingPrimayMetaRegionSlowCopro.class.getName());
277 
278     HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_MEAT_REPLICA_SCAN_TIMEOUT,
279         META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
280 
281     // Set system coprocessor so it can be applied to meta regions
282     HTU.getConfiguration().set("hbase.coprocessor.region.classes",
283         RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
284 
285     HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
286         META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
287 
288     // Wait for primary call longer so make sure that it will get exception from the primary call
289     HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
290     HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
291 
292     HTU.startMiniCluster(NB_SERVERS);
293     HTU.getHBaseCluster().startMaster();
294   }
295 
296   @AfterClass
297   public static void afterClass() throws Exception {
298     if (HTU2 != null) {
299       HTU2.shutdownMiniCluster();
300     }
301     HTU.shutdownMiniCluster();
302   }
303 
304   @Test(timeout = 30000)
305   public void testCreateDeleteTable() throws IOException {
306     // Create table then get the single region for our new table.
307     HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
308     hdt.setRegionReplication(NB_SERVERS);
309     hdt.addCoprocessor(SlowMeCopro.class.getName());
310     Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
311 
312     Put p = new Put(row);
313     p.add(f, row, row);
314     table.put(p);
315 
316     Get g = new Get(row);
317     Result r = table.get(g);
318     Assert.assertFalse(r.isStale());
319 
320     try {
321       // But if we ask for stale we will get it
322       SlowMeCopro.cdl.set(new CountDownLatch(1));
323       g = new Get(row);
324       g.setConsistency(Consistency.TIMELINE);
325       r = table.get(g);
326       Assert.assertTrue(r.isStale());
327       SlowMeCopro.cdl.get().countDown();
328     } finally {
329       SlowMeCopro.cdl.get().countDown();
330       SlowMeCopro.sleepTime.set(0);
331     }
332 
333     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
334     HTU.deleteTable(hdt.getTableName());
335   }
336 
337   @Test(timeout = 120000)
338   public void testChangeTable() throws Exception {
339     HTableDescriptor hdt = HTU.createTableDescriptor("testChangeTable");
340     hdt.setRegionReplication(NB_SERVERS);
341     hdt.addCoprocessor(SlowMeCopro.class.getName());
342     Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
343 
344     // basic test: it should work.
345     Put p = new Put(row);
346     p.add(f, row, row);
347     table.put(p);
348 
349     Get g = new Get(row);
350     Result r = table.get(g);
351     Assert.assertFalse(r.isStale());
352 
353     // Add a CF, it should work.
354     HTableDescriptor bHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
355     HColumnDescriptor hcd = new HColumnDescriptor(row);
356     hdt.addFamily(hcd);
357     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
358     HTU.getHBaseAdmin().modifyTable(hdt.getTableName(), hdt);
359     HTU.getHBaseAdmin().enableTable(hdt.getTableName());
360     HTableDescriptor nHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
361     Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
362             bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
363 
364     p = new Put(row);
365     p.add(row, row, row);
366     table.put(p);
367 
368     g = new Get(row);
369     r = table.get(g);
370     Assert.assertFalse(r.isStale());
371 
372     try {
373       SlowMeCopro.cdl.set(new CountDownLatch(1));
374       g = new Get(row);
375       g.setConsistency(Consistency.TIMELINE);
376       r = table.get(g);
377       Assert.assertTrue(r.isStale());
378     } finally {
379       SlowMeCopro.cdl.get().countDown();
380       SlowMeCopro.sleepTime.set(0);
381     }
382 
383     HTU.getHBaseCluster().stopMaster(0);
384     Admin admin = new HBaseAdmin(HTU.getConfiguration());
385     nHdt = admin.getTableDescriptor(hdt.getTableName());
386     Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
387             bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
388 
389     admin.disableTable(hdt.getTableName());
390     admin.deleteTable(hdt.getTableName());
391     HTU.getHBaseCluster().startMaster();
392     admin.close();
393   }
394 
395   @SuppressWarnings("deprecation")
396   @Test(timeout = 300000)
397   public void testReplicaAndReplication() throws Exception {
398     HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
399     hdt.setRegionReplication(NB_SERVERS);
400 
401     HColumnDescriptor fam = new HColumnDescriptor(row);
402     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
403     hdt.addFamily(fam);
404 
405     hdt.addCoprocessor(SlowMeCopro.class.getName());
406     HTU.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
407 
408     Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
409     conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
410     conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
411     MiniZooKeeperCluster miniZK = HTU.getZkCluster();
412 
413     HTU2 = new HBaseTestingUtility(conf2);
414     HTU2.setZkCluster(miniZK);
415     HTU2.startMiniCluster(NB_SERVERS);
416     LOG.info("Setup second Zk");
417     HTU2.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
418 
419     ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
420     admin.addPeer("2", HTU2.getClusterKey());
421     admin.close();
422 
423     Put p = new Put(row);
424     p.add(row, row, row);
425     final Table table = new HTable(HTU.getConfiguration(), hdt.getTableName());
426     table.put(p);
427 
428     HTU.getHBaseAdmin().flush(table.getName());
429     LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
430 
431     Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
432       @Override
433       public boolean evaluate() throws Exception {
434         try {
435           SlowMeCopro.cdl.set(new CountDownLatch(1));
436           Get g = new Get(row);
437           g.setConsistency(Consistency.TIMELINE);
438           Result r = table.get(g);
439           Assert.assertTrue(r.isStale());
440           return !r.isEmpty();
441         } finally {
442           SlowMeCopro.cdl.get().countDown();
443           SlowMeCopro.sleepTime.set(0);
444         }
445       }
446     });
447     table.close();
448     LOG.info("stale get on the first cluster done. Now for the second.");
449 
450     final Table table2 = new HTable(HTU.getConfiguration(), hdt.getTableName());
451     Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
452       @Override
453       public boolean evaluate() throws Exception {
454         try {
455           SlowMeCopro.cdl.set(new CountDownLatch(1));
456           Get g = new Get(row);
457           g.setConsistency(Consistency.TIMELINE);
458           Result r = table2.get(g);
459           Assert.assertTrue(r.isStale());
460           return !r.isEmpty();
461         } finally {
462           SlowMeCopro.cdl.get().countDown();
463           SlowMeCopro.sleepTime.set(0);
464         }
465       }
466     });
467     table2.close();
468 
469     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
470     HTU.deleteTable(hdt.getTableName());
471 
472     HTU2.getHBaseAdmin().disableTable(hdt.getTableName());
473     HTU2.deleteTable(hdt.getTableName());
474 
475     // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
476     // the minicluster has negative impact of deleting all HConnections in JVM.
477   }
478 
479   @Test(timeout = 30000)
480   public void testBulkLoad() throws IOException {
481     // Create table then get the single region for our new table.
482     LOG.debug("Creating test table");
483     HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
484     hdt.setRegionReplication(NB_SERVERS);
485     hdt.addCoprocessor(SlowMeCopro.class.getName());
486     Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
487 
488     // create hfiles to load.
489     LOG.debug("Creating test data");
490     Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
491     final int numRows = 10;
492     final byte[] qual = Bytes.toBytes("qual");
493     final byte[] val = Bytes.toBytes("val");
494     final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>();
495     for (HColumnDescriptor col : hdt.getColumnFamilies()) {
496       Path hfile = new Path(dir, col.getNameAsString());
497       TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
498               qual, val, numRows);
499       famPaths.add(new Pair<byte[], String>(col.getName(), hfile.toString()));
500     }
501 
502     // bulk load HFiles
503     LOG.debug("Loading test data");
504     @SuppressWarnings("deprecation")
505     final HConnection conn = HTU.getHBaseAdmin().getConnection();
506     RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
507             conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
508       @Override
509       public Void call(int timeout) throws Exception {
510         LOG.debug("Going to connect to server " + getLocation() + " for row "
511                 + Bytes.toStringBinary(getRow()));
512         byte[] regionName = getLocation().getRegionInfo().getRegionName();
513         BulkLoadHFileRequest request =
514                 RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
515         getStub().bulkLoadHFile(null, request);
516         return null;
517       }
518     };
519     RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
520     RpcRetryingCaller<Void> caller = factory.<Void>newCaller();
521     caller.callWithRetries(callable, 10000);
522 
523     // verify we can read them from the primary
524     LOG.debug("Verifying data load");
525     for (int i = 0; i < numRows; i++) {
526       byte[] row = TestHRegionServerBulkLoad.rowkey(i);
527       Get g = new Get(row);
528       Result r = table.get(g);
529       Assert.assertFalse(r.isStale());
530     }
531 
532     // verify we can read them from the replica
533     LOG.debug("Verifying replica queries");
534     try {
535       SlowMeCopro.cdl.set(new CountDownLatch(1));
536       for (int i = 0; i < numRows; i++) {
537         byte[] row = TestHRegionServerBulkLoad.rowkey(i);
538         Get g = new Get(row);
539         g.setConsistency(Consistency.TIMELINE);
540         Result r = table.get(g);
541         Assert.assertTrue(r.isStale());
542       }
543       SlowMeCopro.cdl.get().countDown();
544     } finally {
545       SlowMeCopro.cdl.get().countDown();
546       SlowMeCopro.sleepTime.set(0);
547     }
548 
549     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
550     HTU.deleteTable(hdt.getTableName());
551   }
552 
553   // This test is to test when hbase.client.metaReplicaCallTimeout.scan is configured, meta table
554   // scan will always get the result from primary meta region as long as the result is returned
555   // within configured hbase.client.metaReplicaCallTimeout.scan from primary meta region.
556   @Test
557   public void testGetRegionLocationFromPrimaryMetaRegion() throws IOException, InterruptedException {
558     HTU.getHBaseAdmin().setBalancerRunning(false, true);
559 
560     ((ConnectionManager.HConnectionImplementation) HTU.getHBaseAdmin().getConnection()).
561         setUseMetaReplicas(true);
562 
563     // Create table then get the single region for our new table.
564     HTableDescriptor hdt = HTU.createTableDescriptor("testGetRegionLocationFromPrimaryMetaRegion");
565     hdt.setRegionReplication(2);
566     try {
567 
568       HTU.createTable(hdt, new byte[][] { f }, null);
569 
570       RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = true;
571 
572       // Get user table location, always get it from the primary meta replica
573       RegionLocations url = ((ClusterConnection) HTU.getConnection())
574           .locateRegion(hdt.getTableName(), row, false, false);
575 
576     } finally {
577       RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = false;
578       ((ConnectionManager.HConnectionImplementation) HTU.getHBaseAdmin().getConnection()).
579           setUseMetaReplicas(false);
580       HTU.getHBaseAdmin().setBalancerRunning(true, true);
581       HTU.getHBaseAdmin().disableTable(hdt.getTableName());
582       HTU.deleteTable(hdt.getTableName());
583     }
584   }
585 
586 
587   // This test is to simulate the case that the meta region and the primary user region
588   // are down, hbase client is able to access user replica regions and return stale data.
589   // Meta replica is enabled to show the case that the meta replica region could be out of sync
590   // with the primary meta region.
591   @Test
592   public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
593     HTU.getHBaseAdmin().setBalancerRunning(false, true);
594 
595     ((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()).
596         setUseMetaReplicas(true);
597 
598     // Create table then get the single region for our new table.
599     HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
600     hdt.setRegionReplication(2);
601     try {
602 
603       Table table = HTU.createTable(hdt, new byte[][] { f }, null);
604 
605       // Get Meta location
606       RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
607           .locateRegion(TableName.META_TABLE_NAME,
608               HConstants.EMPTY_START_ROW, false, false);
609 
610       // Get user table location
611       RegionLocations url = ((ClusterConnection) HTU.getConnection())
612           .locateRegion(hdt.getTableName(), row, false, false);
613 
614       // Make sure that user primary region is co-hosted with the meta region
615       if (!url.getDefaultRegionLocation().getServerName().equals(
616           mrl.getDefaultRegionLocation().getServerName())) {
617         HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
618             mrl.getDefaultRegionLocation().getServerName());
619       }
620 
621       // Make sure that the user replica region is not hosted by the same region server with
622       // primary
623       if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
624           .getServerName())) {
625         HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
626             url.getDefaultRegionLocation().getServerName());
627       }
628 
629       // Wait until the meta table is updated with new location info
630       while (true) {
631         mrl = ((ClusterConnection) HTU.getConnection())
632             .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
633 
634         // Get user table location
635         url = ((ClusterConnection) HTU.getConnection())
636             .locateRegion(hdt.getTableName(), row, false, true);
637 
638         LOG.info("meta locations " + mrl);
639         LOG.info("table locations " + url);
640         ServerName a = url.getDefaultRegionLocation().getServerName();
641         ServerName b = mrl.getDefaultRegionLocation().getServerName();
642         if(a.equals(b)) {
643           break;
644         } else {
645           LOG.info("Waiting for new region info to be updated in meta table");
646           Thread.sleep(100);
647         }
648       }
649 
650       Put p = new Put(row);
651       p.addColumn(f, row, row);
652       table.put(p);
653 
654       // Flush so it can be picked by the replica refresher thread
655       HTU.flush(table.getName());
656 
657       // Sleep for some time until data is picked up by replicas
658       try {
659         Thread.sleep(2 * REFRESH_PERIOD);
660       } catch (InterruptedException e1) {
661         LOG.error(e1);
662       }
663 
664       // Simulating the RS down
665       RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;
666 
667       // The first Get is supposed to succeed
668       Get g = new Get(row);
669       g.setConsistency(Consistency.TIMELINE);
670       Result r = table.get(g);
671       Assert.assertTrue(r.isStale());
672 
673       // The second Get will succeed as well
674       r = table.get(g);
675       Assert.assertTrue(r.isStale());
676 
677     } finally {
678       ((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()).
679           setUseMetaReplicas(false);
680       RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
681       HTU.getHBaseAdmin().setBalancerRunning(true, true);
682       HTU.getHBaseAdmin().disableTable(hdt.getTableName());
683       HTU.deleteTable(hdt.getTableName());
684     }
685   }
686 
687   @Test
688   public void testReplicaGetWithPrimaryDown() throws IOException {
689     // Create table then get the single region for our new table.
690     HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
691     hdt.setRegionReplication(NB_SERVERS);
692     hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
693     try {
694       // Retry less so it can fail faster
695       HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
696 
697       Table table = HTU.createTable(hdt, new byte[][] { f }, null);
698 
699       Put p = new Put(row);
700       p.addColumn(f, row, row);
701       table.put(p);
702 
703       // Flush so it can be picked by the replica refresher thread
704       HTU.flush(table.getName());
705 
706       // Sleep for some time until data is picked up by replicas
707       try {
708         Thread.sleep(2 * REFRESH_PERIOD);
709       } catch (InterruptedException e1) {
710         LOG.error(e1);
711       }
712 
713       // But if we ask for stale we will get it
714       Get g = new Get(row);
715       g.setConsistency(Consistency.TIMELINE);
716       Result r = table.get(g);
717       Assert.assertTrue(r.isStale());
718     } finally {
719       HTU.getConfiguration().unset("hbase.client.retries.number");
720       HTU.getHBaseAdmin().disableTable(hdt.getTableName());
721       HTU.deleteTable(hdt.getTableName());
722     }
723   }
724 
725   @Test
726   public void testReplicaScanWithPrimaryDown() throws IOException {
727     // Create table then get the single region for our new table.
728     HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
729     hdt.setRegionReplication(NB_SERVERS);
730     hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
731 
732     try {
733       // Retry less so it can fail faster
734       HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
735 
736       Table table = HTU.createTable(hdt, new byte[][] { f }, null);
737 
738       Put p = new Put(row);
739       p.addColumn(f, row, row);
740       table.put(p);
741 
742       // Flush so it can be picked by the replica refresher thread
743       HTU.flush(table.getName());
744 
745       // Sleep for some time until data is picked up by replicas
746       try {
747         Thread.sleep(2 * REFRESH_PERIOD);
748       } catch (InterruptedException e1) {
749         LOG.error(e1);
750       }
751 
752       // But if we ask for stale we will get it
753       // Instantiating the Scan class
754       Scan scan = new Scan();
755 
756       // Scanning the required columns
757       scan.addFamily(f);
758       scan.setConsistency(Consistency.TIMELINE);
759 
760       // Getting the scan result
761       ResultScanner scanner = table.getScanner(scan);
762 
763       Result r = scanner.next();
764 
765       Assert.assertTrue(r.isStale());
766     } finally {
767       HTU.getConfiguration().unset("hbase.client.retries.number");
768       HTU.getHBaseAdmin().disableTable(hdt.getTableName());
769       HTU.deleteTable(hdt.getTableName());
770     }
771   }
772 }