/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.protobuf.ServiceException;

@Category(LargeTests.class)
public class TestMasterReplication {

  private static final Log LOG = LogFactory.getLog(TestMasterReplication.class);

  private Configuration baseConfiguration;

  private HBaseTestingUtility[] utilities;
  private Configuration[] configurations;
  private MiniZooKeeperCluster miniZK;

  private static final long SLEEP_TIME = 500;
  private static final int NB_RETRIES = 10;

  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] famName1 = Bytes.toBytes("f1");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] row4 = Bytes.toBytes("row4");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static final byte[] count = Bytes.toBytes("count");
  private static final byte[] put = Bytes.toBytes("put");
  private static final byte[] delete = Bytes.toBytes("delete");

  private HTableDescriptor table;

  @Before
  public void setUp() throws Exception {
    baseConfiguration = HBaseConfiguration.create();
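    // Use a small WAL block size, small source capacity, and short retry
    // sleeps so that replication kicks in quickly on small amounts of data.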
    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    baseConfiguration.setInt("replication.source.size.capacity", 1024);
    baseConfiguration.setLong("replication.source.sleepforretries", 100);
    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
    baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
        HConstants.REPLICATION_ENABLE_DEFAULT);
    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
        TestSourceFSConfigurationProvider.class.getCanonicalName());
    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    baseConfiguration.setBoolean("dfs.support.append", true);
    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    baseConfiguration.setStrings(
        CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        CoprocessorCounter.class.getName());

    // Table with two globally-scoped (replicated) families and one
    // non-replicated family.
    table = new HTableDescriptor(tableName);
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    fam = new HColumnDescriptor(famName1);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    table.addFamily(fam);
  }
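  /**
   * Tests cyclic replication between two clusters (0 -> 1 -> 0) by adding and
   * deleting rows and verifying that mutations are replicated to the peer but
   * never replicated back to the cluster they originated from.
   */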
  @Test(timeout = 300000)
  public void testCyclicReplication1() throws Exception {
    LOG.info("testCyclicReplication1");
    int numClusters = 2;
    Table[] htables = null;
    try {
      htables = setUpClusterTablesAndPeers(numClusters);

      int[] expectedCounts = new int[] { 2, 2 };

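      // Add a row to each cluster and check that it is replicated to the
      // other; each cluster should then have seen exactly two puts.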
      putAndWait(row, famName, htables[0], htables[1]);
      putAndWait(row1, famName, htables[1], htables[0]);
      validateCounts(htables, put, expectedCounts);

      // Delete the rows and check that the deletes are replicated as well.
      deleteAndWait(row, htables[0], htables[1]);
      deleteAndWait(row1, htables[1], htables[0]);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }
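  /**
   * Tests bulk-loaded HFile replication between two clusters that replicate to
   * each other (0 -> 1 -> 0), verifying that loads on either cluster reach the
   * peer without being replicated back to the originating cluster.
   */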
  @Test(timeout = 300000)
  public void testHFileCyclicReplication() throws Exception {
    LOG.info("testHFileCyclicReplication");
    int numClusters = 2;
    Table[] htables = null;
    try {
      htables = setUpClusterTablesAndPeers(numClusters);

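      // Load 100 rows into each HFile range on cluster '0' and validate that
      // the data is replicated to cluster '1'.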
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
      int numOfRows = 100;
      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
        famName, htables, hfileRanges, numOfRows, expectedCounts, true);

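      // Load 200 rows into each HFile range on cluster '1' and validate that
      // the data is replicated back to cluster '0'.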
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
      numOfRows = 200;
      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
          hfileRanges.length * numOfRows + expectedCounts[1] };

      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
    Table[] htables;
    startMiniClusters(numClusters);
    createTableOnClusters(table);

    htables = getHTablesOnClusters(tableName);

    // Set up each cluster as a replication peer of the other: 0 -> 1 -> 0.
    addPeer("1", 0, 1);
    addPeer("1", 1, 0);
    return htables;
  }
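  /**
   * Tests cyclic replication across three clusters (0 -> 1 -> 2 -> 0) by
   * adding and deleting rows and verifying that each mutation travels the ring
   * exactly once. Also checks that replication catches up for a peer that is
   * disabled and later re-enabled.
   */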
  @Test(timeout = 300000)
  public void testCyclicReplication2() throws Exception {
    LOG.info("testCyclicReplication2");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

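      // Set up the cyclic replication ring: 0 -> 1 -> 2 -> 0.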
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 0);

      htables = getHTablesOnClusters(tableName);

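      // Put a row on each cluster and wait for it to make its way around the
      // ring; then delete the rows and check the deletes propagate too.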
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[0]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[0]);
      deleteAndWait(row2, htables[2], htables[1]);

      // Each cluster should have seen exactly three puts and three deletes.
      int[] expectedCounts = new int[] { 3, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);

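      // Verify that replication catches up after a peer is disabled and then
      // re-enabled.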
      disablePeer("1", 2);

      // This put is replicated from '0' to '1', but no further while the peer
      // is disabled.
      putAndWait(row3, famName, htables[0], htables[1]);
      // Add a local edit on cluster '1'; it is queued for cluster '2'.
      htables[1].put(new Put(row4).add(famName, row4, row4));

      enablePeer("1", 2);

      // Once the peer is re-enabled, the edit should travel the rest of the
      // ring, 1 -> 2 -> 0, and show up on cluster '0'.
      wait(row4, htables[0], false);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }
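  /**
   * Tests bulk-loaded HFile replication to multiple slaves: data loaded before
   * a second slave is added must not reach it, while data loaded afterwards
   * must reach every slave.
   */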
  @Test(timeout = 300000)
  public void testHFileMultiSlaveReplication() throws Exception {
    LOG.info("testHFileMultiSlaveReplication");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Add cluster '1' as the only slave of cluster '0' for now.
      addPeer("1", 0, 1);

      htables = getHTablesOnClusters(tableName);

      // Load 100 rows into each HFile range on cluster '0' and validate that
      // the data is replicated to cluster '1'.
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
              new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
      int numOfRows = 100;

      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
        famName, htables, hfileRanges, numOfRows, expectedCounts, true);

      // Verify that nothing reached cluster '2', which is not a peer yet.
      assertEquals(0, utilities[2].countRows(htables[2]));

      // Roll the WAL on cluster '0' so that the new peer does not replay the
      // entries that were already shipped to cluster '1'.
      rollWALAndWait(utilities[0], htables[0].getName(), row);

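      // Add cluster '2' as a second slave of cluster '0'.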
      addPeer("2", 0, 2);

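      // Load 200 rows into each HFile range on cluster '0' and validate that
      // the data now reaches both cluster '1' and cluster '2'.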
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
          new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
      numOfRows = 200;

      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
          hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }
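  /**
   * Tests bulk-loaded HFile replication when the peer is restricted to a
   * specific table and column family: loads into the configured family must be
   * replicated, while loads into any other family must not.
   */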
  @Test(timeout = 300000)
  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
    LOG.info("testHFileReplicationForConfiguredTableCfs");
    int numClusters = 2;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      htables = getHTablesOnClusters(tableName);
      // Configure the peer to replicate only the family 'f' of the test table.
      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));

      // Load 100 rows into each HFile range for family 'f' and validate that
      // the data is replicated to cluster '1'.
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
      int numOfRows = 100;
      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
        hfileRanges, numOfRows, expectedCounts, true);

      // Load 100 rows into each HFile range for family 'f1', which is not
      // configured for replication, and skip validation on the slave.
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
      numOfRows = 100;

      int[] newExpectedCounts =
          new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };

      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
        hfileRanges, numOfRows, newExpectedCounts, false);

      // The 'f1' data should be present on the source cluster.
      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);

      // Give replication a chance to (incorrectly) ship the 'f1' data, then
      // verify the slave still only has the 'f' data.
      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);

      wait(1, htables[1], expectedCounts[1]);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }
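  /**
   * Tests the replication topology 0 -> 1 -> 2 -> 1: edits from cluster '0'
   * reach every cluster, while edits from clusters '1' and '2' only flow
   * between that pair, which is reflected in the expected counts.
   */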
  @Test(timeout = 300000)
  public void testCyclicReplication3() throws Exception {
    LOG.info("testCyclicReplication3");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Set up the replication topology: 0 -> 1 -> 2 -> 1.
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 1);

      htables = getHTablesOnClusters(tableName);

      // Put a row on each cluster and wait for it to reach its final target.
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[2]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[2]);
      deleteAndWait(row2, htables[2], htables[1]);

      // Cluster '0' only sees its own edits; clusters '1' and '2' see all three.
      int[] expectedCounts = new int[] { 1, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }
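  /**
   * Tests that the replicateWALEntry RPC is rejected with a ServiceException
   * when replication is disabled on the cluster.
   */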
  @Test(timeout = 180000, expected = ServiceException.class)
  public void testReplicateWALEntryWhenReplicationIsDisabled() throws Exception {
    LOG.info("testReplicateWALEntryWhenReplicationIsDisabled");
    baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
    Table[] htables = null;
    try {
      startMiniClusters(1);
      createTableOnClusters(table);
      htables = getHTablesOnClusters(tableName);

      HRegionServer rs = utilities[0].getRSForFirstRegionInTable(tableName);
      RSRpcServices rsrpc = new RSRpcServices(rs);
      // This call must throw a ServiceException since replication is disabled.
      rsrpc.replicateWALEntry(null, null);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  @After
  public void tearDown() throws IOException {
    configurations = null;
    utilities = null;
  }

  @SuppressWarnings("resource")
  private void startMiniClusters(int numClusters) throws Exception {
    Random random = new Random();
    utilities = new HBaseTestingUtility[numClusters];
    configurations = new Configuration[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Configuration conf = new Configuration(baseConfiguration);
      // Give each cluster its own znode parent; all clusters share the single
      // MiniZooKeeperCluster started by the first one.
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
      if (i == 0) {
        utility.startMiniZKCluster();
        miniZK = utility.getZkCluster();
      } else {
        utility.setZkCluster(miniZK);
      }
      utility.startMiniCluster();
      utilities[i] = utility;
      configurations[i] = conf;
      new ZooKeeperWatcher(conf, "cluster" + i, null, true);
    }
  }

  private void shutDownMiniClusters() throws Exception {
    int numClusters = utilities.length;
    for (int i = numClusters - 1; i >= 0; i--) {
      if (utilities[i] != null) {
        utilities[i].shutdownMiniCluster();
      }
    }
    miniZK.shutdown();
  }

  private void createTableOnClusters(HTableDescriptor table) throws Exception {
    int numClusters = configurations.length;
    for (int i = 0; i < numClusters; i++) {
      Admin hbaseAdmin = null;
      try {
        hbaseAdmin = new HBaseAdmin(configurations[i]);
        hbaseAdmin.createTable(table);
      } finally {
        close(hbaseAdmin);
      }
    }
  }

  private void addPeer(String id, int masterClusterNumber,
      int slaveClusterNumber) throws Exception {
    ReplicationAdmin replicationAdmin = null;
    try {
      replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
      replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey());
    } finally {
      close(replicationAdmin);
    }
  }

  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
      throws Exception {
    ReplicationAdmin replicationAdmin = null;
    try {
      replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
      replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey(), tableCfs);
    } finally {
      close(replicationAdmin);
    }
  }

  private void disablePeer(String id, int masterClusterNumber) throws Exception {
    ReplicationAdmin replicationAdmin = null;
    try {
      replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
      replicationAdmin.disablePeer(id);
    } finally {
      close(replicationAdmin);
    }
  }

  private void enablePeer(String id, int masterClusterNumber) throws Exception {
    ReplicationAdmin replicationAdmin = null;
    try {
      replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
      replicationAdmin.enablePeer(id);
    } finally {
      close(replicationAdmin);
    }
  }

  private void close(Closeable... closeables) {
    try {
      if (closeables != null) {
        for (Closeable closeable : closeables) {
          // Guard against nulls, e.g. a handle whose construction failed.
          if (closeable != null) {
            closeable.close();
          }
        }
      }
    } catch (Exception e) {
      LOG.warn("Exception occurred while closing the object:", e);
    }
  }

  @SuppressWarnings("resource")
  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
    int numClusters = utilities.length;
    Table[] htables = new Table[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Table htable = new HTable(configurations[i], tableName);
      htable.setWriteBufferSize(1024);
      htables[i] = htable;
    }
    return htables;
  }

  private void validateCounts(Table[] htables, byte[] type,
      int[] expectedCounts) throws IOException {
    for (int i = 0; i < htables.length; i++) {
      assertEquals(Bytes.toString(type) + " were replicated back",
        expectedCounts[i], getCount(htables[i], type));
    }
  }

  private int getCount(Table t, byte[] type) throws IOException {
    Get test = new Get(row);
    test.setAttribute("count", new byte[] {});
    Result res = t.get(test);
    return Bytes.toInt(res.getValue(count, type));
  }

  private void deleteAndWait(byte[] row, Table source, Table target)
      throws Exception {
    Delete del = new Delete(row);
    source.delete(del);
    wait(row, target, true);
  }

  private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
      throws Exception {
    Put put = new Put(row);
    put.add(fam, row, row);
    source.put(put);
    wait(row, target, false);
  }

  private void loadAndValidateHFileReplication(String testName, int masterNumber,
      int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
      int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
    HBaseTestingUtility util = utilities[masterNumber];

    Path dir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    // Create one HFile per key range under the family directory.
    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(util.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    // Bulk load the HFiles into the master cluster.
    Table source = tables[masterNumber];
    final TableName tableName = source.getName();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);

    // Optionally wait until each slave cluster has the expected row count.
    if (toValidate) {
      for (int slaveClusterNumber : slaveNumbers) {
        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
      }
    }
  }

  private void wait(int slaveNumber, Table target, int expectedCount)
      throws IOException, InterruptedException {
    int count = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for bulkloaded data replication. Current count=" + count
            + ", expected count=" + expectedCount);
      }
      count = utilities[slaveNumber].countRows(target);
      if (count != expectedCount) {
        LOG.info("Waiting more time for bulkloaded data replication.");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for replication. Row:" + Bytes.toString(row)
            + ". IsDeleteReplication:" + isDeleted);
      }
      Result res = target.get(get);
      boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
      if (sleep) {
        LOG.info("Waiting for more time for replication. Row:" + Bytes.toString(row)
            + ". IsDeleteReplication:" + isDeleted);
        Thread.sleep(SLEEP_TIME);
      } else {
        if (!isDeleted) {
          assertArrayEquals(row, res.value());
        }
        LOG.info("Obtained row:" + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
        break;
      }
    }
  }

  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
      final byte[] row) throws IOException {
    final Admin admin = utility.getHBaseAdmin();
    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

    // Find the region that corresponds to the given row.
    HRegion region = null;
    for (HRegion candidate : cluster.getRegions(table)) {
      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
        region = candidate;
        break;
      }
    }
    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

    final CountDownLatch latch = new CountDownLatch(1);

    // Listen for successful log rolls.
    final WALActionsListener listener = new WALActionsListener.Base() {
      @Override
      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
        latch.countDown();
      }
    };
    region.getWAL().registerWALActionsListener(listener);

    // Request a roll on the server holding the region.
    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
      region.getRegionInfo().getRegionName()));

    // Wait for the roll to complete.
    try {
      latch.await();
    } catch (InterruptedException exception) {
      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
          "replication tests fail, it's probably because we should still be waiting.");
      Thread.currentThread().interrupt();
    }
    region.getWAL().unregisterWALActionsListener(listener);
  }

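  /**
   * Uses a coprocessor to count puts and deletes. Since KVs are replicated
   * with their original timestamps, there is otherwise no way to count them.
   */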
  public static class CoprocessorCounter extends BaseRegionObserver {
    private int nCount = 0;
    private int nDelete = 0;

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      nCount++;
    }

    @Override
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
      nDelete++;
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Get get, final List<Cell> result) throws IOException {
      if (get.getAttribute("count") != null) {
        result.clear();
        // Return the current counters, keyed by mutation type, instead of the row.
        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
        c.bypass();
      }
    }
  }

}