1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.client;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.HBaseConfiguration;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HColumnDescriptor;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.RegionLocations;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.testclassification.MediumTests;
37 import org.apache.hadoop.hbase.RegionLocations;
38 import org.apache.hadoop.hbase.Waiter;
39 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
40 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
41 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
42 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
43 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
44 import org.apache.hadoop.hbase.protobuf.RequestConverter;
45 import org.apache.hadoop.hbase.regionserver.RegionScanner;
46 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
47 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
48 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.Pair;
51 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
52 import org.junit.AfterClass;
53 import org.junit.Assert;
54 import org.junit.BeforeClass;
55 import org.junit.Test;
56 import org.junit.experimental.categories.Category;
57
58 import java.io.IOException;
59 import java.util.ArrayList;
60 import java.util.Arrays;
61 import java.util.List;
62 import java.util.concurrent.CountDownLatch;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.atomic.AtomicLong;
65 import java.util.concurrent.atomic.AtomicReference;
66
67 @Category(MediumTests.class)
68 public class TestReplicaWithCluster {
69 private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
70
71 private static final int NB_SERVERS = 3;
72 private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
73 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
74
75
76 private static HBaseTestingUtility HTU2;
77 private static final byte[] f = HConstants.CATALOG_FAMILY;
78
79 private final static int REFRESH_PERIOD = 1000;
80 private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
81
82
83
84
85 public static class SlowMeCopro extends BaseRegionObserver {
86 static final AtomicLong sleepTime = new AtomicLong(0);
87 static final AtomicReference<CountDownLatch> cdl =
88 new AtomicReference<CountDownLatch>(new CountDownLatch(0));
89
90 public SlowMeCopro() {
91 }
92
93 @Override
94 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
95 final Get get, final List<Cell> results) throws IOException {
96
97 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
98 CountDownLatch latch = cdl.get();
99 try {
100 if (sleepTime.get() > 0) {
101 LOG.info("Sleeping for " + sleepTime.get() + " ms");
102 Thread.sleep(sleepTime.get());
103 } else if (latch.getCount() > 0) {
104 LOG.info("Waiting for the counterCountDownLatch");
105 latch.await(2, TimeUnit.MINUTES);
106 if (latch.getCount() > 0) {
107 throw new RuntimeException("Can't wait more");
108 }
109 }
110 } catch (InterruptedException e1) {
111 LOG.error(e1);
112 }
113 } else {
114 LOG.info("We're not the primary replicas.");
115 }
116 }
117 }
118
119
120
121
122 public static class RegionServerHostingPrimayMetaRegionSlowCopro extends BaseRegionObserver {
123 static boolean slowDownPrimaryMetaScan = false;
124
125 @Override
126 public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
127 final Scan scan, final RegionScanner s) throws IOException {
128
129 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
130
131
132 if (slowDownPrimaryMetaScan && (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion()
133 && (replicaId == 0))) {
134 LOG.info("Scan with primary meta region, slow down a bit");
135 try {
136 Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
137 } catch (InterruptedException ie) {
138
139 }
140
141 }
142 return null;
143 }
144 }
145
146
147
148
149
150 public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro extends BaseRegionObserver {
151 static boolean slowDownPrimaryMetaScan = false;
152 static boolean throwException = false;
153
154 @Override
155 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
156 final Get get, final List<Cell> results) throws IOException {
157
158 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
159
160
161 if (throwException) {
162 if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
163 LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment()
164 .getRegion().getRegionInfo());
165 throw new RegionServerStoppedException("Server " +
166 e.getEnvironment().getRegionServerServices().getServerName() + " not running");
167 }
168 } else {
169 LOG.info("Get, We're replica region " + replicaId);
170 }
171 }
172
173 @Override
174 public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
175 final Scan scan, final RegionScanner s) throws IOException {
176
177 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
178
179
180 if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
181 if (slowDownPrimaryMetaScan) {
182 LOG.info("Scan with primary meta region, slow down a bit");
183 try {
184 Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
185 } catch (InterruptedException ie) {
186
187 }
188 }
189
190
191 if (throwException) {
192 LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
193 .getRegion().getRegionInfo());
194
195 throw new RegionServerStoppedException("Server " +
196 e.getEnvironment().getRegionServerServices().getServerName() + " not running");
197 } else {
198 LOG.info("Scan, We're replica region " + replicaId);
199 }
200 } else {
201 LOG.info("Scan, We're replica region " + replicaId);
202 }
203
204 return null;
205 }
206 }
207
208
209
210
211 public static class RegionServerStoppedCopro extends BaseRegionObserver {
212
213 public RegionServerStoppedCopro() {
214 }
215
216 @Override
217 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
218 final Get get, final List<Cell> results) throws IOException {
219
220 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
221
222
223 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
224 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
225 throw new RegionServerStoppedException("Server " +
226 e.getEnvironment().getRegionServerServices().getServerName()
227 + " not running");
228 } else {
229 LOG.info("We're replica region " + replicaId);
230 }
231 }
232
233 @Override
234 public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
235 final Scan scan, final RegionScanner s) throws IOException {
236
237 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
238
239
240 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
241 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
242 throw new RegionServerStoppedException("Server " +
243 e.getEnvironment().getRegionServerServices().getServerName()
244 + " not running");
245 } else {
246 LOG.info("We're replica region " + replicaId);
247 }
248
249 return null;
250 }
251 }
252
253 @BeforeClass
254 public static void beforeClass() throws Exception {
255
256 HTU.getConfiguration().setInt(
257 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
258
259 HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
260 HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
261 HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
262 HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
263 HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
264 HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
265 HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
266
267
268
269 HTU.getConfiguration().setInt("hbase.meta.replica.count", 2);
270
271
272 HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");
273
274
275 HTU.getConfiguration().set("hbase.coprocessor.region.classes",
276 RegionServerHostingPrimayMetaRegionSlowCopro.class.getName());
277
278 HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_MEAT_REPLICA_SCAN_TIMEOUT,
279 META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
280
281
282 HTU.getConfiguration().set("hbase.coprocessor.region.classes",
283 RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
284
285 HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
286 META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
287
288
289 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
290 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
291
292 HTU.startMiniCluster(NB_SERVERS);
293 HTU.getHBaseCluster().startMaster();
294 }
295
296 @AfterClass
297 public static void afterClass() throws Exception {
298 if (HTU2 != null) {
299 HTU2.shutdownMiniCluster();
300 }
301 HTU.shutdownMiniCluster();
302 }
303
304 @Test(timeout = 30000)
305 public void testCreateDeleteTable() throws IOException {
306
307 HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
308 hdt.setRegionReplication(NB_SERVERS);
309 hdt.addCoprocessor(SlowMeCopro.class.getName());
310 Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
311
312 Put p = new Put(row);
313 p.add(f, row, row);
314 table.put(p);
315
316 Get g = new Get(row);
317 Result r = table.get(g);
318 Assert.assertFalse(r.isStale());
319
320 try {
321
322 SlowMeCopro.cdl.set(new CountDownLatch(1));
323 g = new Get(row);
324 g.setConsistency(Consistency.TIMELINE);
325 r = table.get(g);
326 Assert.assertTrue(r.isStale());
327 SlowMeCopro.cdl.get().countDown();
328 } finally {
329 SlowMeCopro.cdl.get().countDown();
330 SlowMeCopro.sleepTime.set(0);
331 }
332
333 HTU.getHBaseAdmin().disableTable(hdt.getTableName());
334 HTU.deleteTable(hdt.getTableName());
335 }
336
337 @Test(timeout = 120000)
338 public void testChangeTable() throws Exception {
339 HTableDescriptor hdt = HTU.createTableDescriptor("testChangeTable");
340 hdt.setRegionReplication(NB_SERVERS);
341 hdt.addCoprocessor(SlowMeCopro.class.getName());
342 Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
343
344
345 Put p = new Put(row);
346 p.add(f, row, row);
347 table.put(p);
348
349 Get g = new Get(row);
350 Result r = table.get(g);
351 Assert.assertFalse(r.isStale());
352
353
354 HTableDescriptor bHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
355 HColumnDescriptor hcd = new HColumnDescriptor(row);
356 hdt.addFamily(hcd);
357 HTU.getHBaseAdmin().disableTable(hdt.getTableName());
358 HTU.getHBaseAdmin().modifyTable(hdt.getTableName(), hdt);
359 HTU.getHBaseAdmin().enableTable(hdt.getTableName());
360 HTableDescriptor nHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
361 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
362 bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
363
364 p = new Put(row);
365 p.add(row, row, row);
366 table.put(p);
367
368 g = new Get(row);
369 r = table.get(g);
370 Assert.assertFalse(r.isStale());
371
372 try {
373 SlowMeCopro.cdl.set(new CountDownLatch(1));
374 g = new Get(row);
375 g.setConsistency(Consistency.TIMELINE);
376 r = table.get(g);
377 Assert.assertTrue(r.isStale());
378 } finally {
379 SlowMeCopro.cdl.get().countDown();
380 SlowMeCopro.sleepTime.set(0);
381 }
382
383 HTU.getHBaseCluster().stopMaster(0);
384 Admin admin = new HBaseAdmin(HTU.getConfiguration());
385 nHdt = admin.getTableDescriptor(hdt.getTableName());
386 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
387 bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
388
389 admin.disableTable(hdt.getTableName());
390 admin.deleteTable(hdt.getTableName());
391 HTU.getHBaseCluster().startMaster();
392 admin.close();
393 }
394
395 @SuppressWarnings("deprecation")
396 @Test(timeout = 300000)
397 public void testReplicaAndReplication() throws Exception {
398 HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
399 hdt.setRegionReplication(NB_SERVERS);
400
401 HColumnDescriptor fam = new HColumnDescriptor(row);
402 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
403 hdt.addFamily(fam);
404
405 hdt.addCoprocessor(SlowMeCopro.class.getName());
406 HTU.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
407
408 Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
409 conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
410 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
411 MiniZooKeeperCluster miniZK = HTU.getZkCluster();
412
413 HTU2 = new HBaseTestingUtility(conf2);
414 HTU2.setZkCluster(miniZK);
415 HTU2.startMiniCluster(NB_SERVERS);
416 LOG.info("Setup second Zk");
417 HTU2.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
418
419 ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
420 admin.addPeer("2", HTU2.getClusterKey());
421 admin.close();
422
423 Put p = new Put(row);
424 p.add(row, row, row);
425 final Table table = new HTable(HTU.getConfiguration(), hdt.getTableName());
426 table.put(p);
427
428 HTU.getHBaseAdmin().flush(table.getName());
429 LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
430
431 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
432 @Override
433 public boolean evaluate() throws Exception {
434 try {
435 SlowMeCopro.cdl.set(new CountDownLatch(1));
436 Get g = new Get(row);
437 g.setConsistency(Consistency.TIMELINE);
438 Result r = table.get(g);
439 Assert.assertTrue(r.isStale());
440 return !r.isEmpty();
441 } finally {
442 SlowMeCopro.cdl.get().countDown();
443 SlowMeCopro.sleepTime.set(0);
444 }
445 }
446 });
447 table.close();
448 LOG.info("stale get on the first cluster done. Now for the second.");
449
450 final Table table2 = new HTable(HTU.getConfiguration(), hdt.getTableName());
451 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
452 @Override
453 public boolean evaluate() throws Exception {
454 try {
455 SlowMeCopro.cdl.set(new CountDownLatch(1));
456 Get g = new Get(row);
457 g.setConsistency(Consistency.TIMELINE);
458 Result r = table2.get(g);
459 Assert.assertTrue(r.isStale());
460 return !r.isEmpty();
461 } finally {
462 SlowMeCopro.cdl.get().countDown();
463 SlowMeCopro.sleepTime.set(0);
464 }
465 }
466 });
467 table2.close();
468
469 HTU.getHBaseAdmin().disableTable(hdt.getTableName());
470 HTU.deleteTable(hdt.getTableName());
471
472 HTU2.getHBaseAdmin().disableTable(hdt.getTableName());
473 HTU2.deleteTable(hdt.getTableName());
474
475
476
477 }
478
479 @Test(timeout = 30000)
480 public void testBulkLoad() throws IOException {
481
482 LOG.debug("Creating test table");
483 HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
484 hdt.setRegionReplication(NB_SERVERS);
485 hdt.addCoprocessor(SlowMeCopro.class.getName());
486 Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
487
488
489 LOG.debug("Creating test data");
490 Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
491 final int numRows = 10;
492 final byte[] qual = Bytes.toBytes("qual");
493 final byte[] val = Bytes.toBytes("val");
494 final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>();
495 for (HColumnDescriptor col : hdt.getColumnFamilies()) {
496 Path hfile = new Path(dir, col.getNameAsString());
497 TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
498 qual, val, numRows);
499 famPaths.add(new Pair<byte[], String>(col.getName(), hfile.toString()));
500 }
501
502
503 LOG.debug("Loading test data");
504 @SuppressWarnings("deprecation")
505 final HConnection conn = HTU.getHBaseAdmin().getConnection();
506 RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
507 conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
508 @Override
509 public Void call(int timeout) throws Exception {
510 LOG.debug("Going to connect to server " + getLocation() + " for row "
511 + Bytes.toStringBinary(getRow()));
512 byte[] regionName = getLocation().getRegionInfo().getRegionName();
513 BulkLoadHFileRequest request =
514 RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
515 getStub().bulkLoadHFile(null, request);
516 return null;
517 }
518 };
519 RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
520 RpcRetryingCaller<Void> caller = factory.<Void>newCaller();
521 caller.callWithRetries(callable, 10000);
522
523
524 LOG.debug("Verifying data load");
525 for (int i = 0; i < numRows; i++) {
526 byte[] row = TestHRegionServerBulkLoad.rowkey(i);
527 Get g = new Get(row);
528 Result r = table.get(g);
529 Assert.assertFalse(r.isStale());
530 }
531
532
533 LOG.debug("Verifying replica queries");
534 try {
535 SlowMeCopro.cdl.set(new CountDownLatch(1));
536 for (int i = 0; i < numRows; i++) {
537 byte[] row = TestHRegionServerBulkLoad.rowkey(i);
538 Get g = new Get(row);
539 g.setConsistency(Consistency.TIMELINE);
540 Result r = table.get(g);
541 Assert.assertTrue(r.isStale());
542 }
543 SlowMeCopro.cdl.get().countDown();
544 } finally {
545 SlowMeCopro.cdl.get().countDown();
546 SlowMeCopro.sleepTime.set(0);
547 }
548
549 HTU.getHBaseAdmin().disableTable(hdt.getTableName());
550 HTU.deleteTable(hdt.getTableName());
551 }
552
553
554
555
556 @Test
557 public void testGetRegionLocationFromPrimaryMetaRegion() throws IOException, InterruptedException {
558 HTU.getHBaseAdmin().setBalancerRunning(false, true);
559
560 ((ConnectionManager.HConnectionImplementation) HTU.getHBaseAdmin().getConnection()).
561 setUseMetaReplicas(true);
562
563
564 HTableDescriptor hdt = HTU.createTableDescriptor("testGetRegionLocationFromPrimaryMetaRegion");
565 hdt.setRegionReplication(2);
566 try {
567
568 HTU.createTable(hdt, new byte[][] { f }, null);
569
570 RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = true;
571
572
573 RegionLocations url = ((ClusterConnection) HTU.getConnection())
574 .locateRegion(hdt.getTableName(), row, false, false);
575
576 } finally {
577 RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = false;
578 ((ConnectionManager.HConnectionImplementation) HTU.getHBaseAdmin().getConnection()).
579 setUseMetaReplicas(false);
580 HTU.getHBaseAdmin().setBalancerRunning(true, true);
581 HTU.getHBaseAdmin().disableTable(hdt.getTableName());
582 HTU.deleteTable(hdt.getTableName());
583 }
584 }
585
586
587
588
589
590
591 @Test
592 public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
593 HTU.getHBaseAdmin().setBalancerRunning(false, true);
594
595 ((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()).
596 setUseMetaReplicas(true);
597
598
599 HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
600 hdt.setRegionReplication(2);
601 try {
602
603 Table table = HTU.createTable(hdt, new byte[][] { f }, null);
604
605
606 RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
607 .locateRegion(TableName.META_TABLE_NAME,
608 HConstants.EMPTY_START_ROW, false, false);
609
610
611 RegionLocations url = ((ClusterConnection) HTU.getConnection())
612 .locateRegion(hdt.getTableName(), row, false, false);
613
614
615 if (!url.getDefaultRegionLocation().getServerName().equals(
616 mrl.getDefaultRegionLocation().getServerName())) {
617 HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
618 mrl.getDefaultRegionLocation().getServerName());
619 }
620
621
622
623 if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
624 .getServerName())) {
625 HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
626 url.getDefaultRegionLocation().getServerName());
627 }
628
629
630 while (true) {
631 mrl = ((ClusterConnection) HTU.getConnection())
632 .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
633
634
635 url = ((ClusterConnection) HTU.getConnection())
636 .locateRegion(hdt.getTableName(), row, false, true);
637
638 LOG.info("meta locations " + mrl);
639 LOG.info("table locations " + url);
640 ServerName a = url.getDefaultRegionLocation().getServerName();
641 ServerName b = mrl.getDefaultRegionLocation().getServerName();
642 if(a.equals(b)) {
643 break;
644 } else {
645 LOG.info("Waiting for new region info to be updated in meta table");
646 Thread.sleep(100);
647 }
648 }
649
650 Put p = new Put(row);
651 p.addColumn(f, row, row);
652 table.put(p);
653
654
655 HTU.flush(table.getName());
656
657
658 try {
659 Thread.sleep(2 * REFRESH_PERIOD);
660 } catch (InterruptedException e1) {
661 LOG.error(e1);
662 }
663
664
665 RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;
666
667
668 Get g = new Get(row);
669 g.setConsistency(Consistency.TIMELINE);
670 Result r = table.get(g);
671 Assert.assertTrue(r.isStale());
672
673
674 r = table.get(g);
675 Assert.assertTrue(r.isStale());
676
677 } finally {
678 ((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()).
679 setUseMetaReplicas(false);
680 RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
681 HTU.getHBaseAdmin().setBalancerRunning(true, true);
682 HTU.getHBaseAdmin().disableTable(hdt.getTableName());
683 HTU.deleteTable(hdt.getTableName());
684 }
685 }
686
687 @Test
688 public void testReplicaGetWithPrimaryDown() throws IOException {
689
690 HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
691 hdt.setRegionReplication(NB_SERVERS);
692 hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
693 try {
694
695 HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
696
697 Table table = HTU.createTable(hdt, new byte[][] { f }, null);
698
699 Put p = new Put(row);
700 p.addColumn(f, row, row);
701 table.put(p);
702
703
704 HTU.flush(table.getName());
705
706
707 try {
708 Thread.sleep(2 * REFRESH_PERIOD);
709 } catch (InterruptedException e1) {
710 LOG.error(e1);
711 }
712
713
714 Get g = new Get(row);
715 g.setConsistency(Consistency.TIMELINE);
716 Result r = table.get(g);
717 Assert.assertTrue(r.isStale());
718 } finally {
719 HTU.getConfiguration().unset("hbase.client.retries.number");
720 HTU.getHBaseAdmin().disableTable(hdt.getTableName());
721 HTU.deleteTable(hdt.getTableName());
722 }
723 }
724
725 @Test
726 public void testReplicaScanWithPrimaryDown() throws IOException {
727
728 HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
729 hdt.setRegionReplication(NB_SERVERS);
730 hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
731
732 try {
733
734 HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
735
736 Table table = HTU.createTable(hdt, new byte[][] { f }, null);
737
738 Put p = new Put(row);
739 p.addColumn(f, row, row);
740 table.put(p);
741
742
743 HTU.flush(table.getName());
744
745
746 try {
747 Thread.sleep(2 * REFRESH_PERIOD);
748 } catch (InterruptedException e1) {
749 LOG.error(e1);
750 }
751
752
753
754 Scan scan = new Scan();
755
756
757 scan.addFamily(f);
758 scan.setConsistency(Consistency.TIMELINE);
759
760
761 ResultScanner scanner = table.getScanner(scan);
762
763 Result r = scanner.next();
764
765 Assert.assertTrue(r.isStale());
766 } finally {
767 HTU.getConfiguration().unset("hbase.client.retries.number");
768 HTU.getHBaseAdmin().disableTable(hdt.getTableName());
769 HTU.deleteTable(hdt.getTableName());
770 }
771 }
772 }