View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication;
19  
20  import static org.junit.Assert.assertArrayEquals;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.fail;
24  
25  import java.io.Closeable;
26  import java.io.IOException;
27  import java.util.Arrays;
28  import java.util.List;
29  import java.util.Random;
30  import java.util.concurrent.CountDownLatch;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FileSystem;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.KeyValue;
44  import org.apache.hadoop.hbase.MiniHBaseCluster;
45  import org.apache.hadoop.hbase.testclassification.LargeTests;
46  import org.apache.hadoop.hbase.TableName;
47  import org.apache.hadoop.hbase.client.Admin;
48  import org.apache.hadoop.hbase.client.Delete;
49  import org.apache.hadoop.hbase.client.Durability;
50  import org.apache.hadoop.hbase.client.Get;
51  import org.apache.hadoop.hbase.client.HBaseAdmin;
52  import org.apache.hadoop.hbase.client.HTable;
53  import org.apache.hadoop.hbase.client.Put;
54  import org.apache.hadoop.hbase.client.Result;
55  import org.apache.hadoop.hbase.client.Table;
56  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
57  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
58  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
59  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
60  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
61  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
62  import org.apache.hadoop.hbase.regionserver.HRegion;
63  import org.apache.hadoop.hbase.regionserver.HRegionServer;
64  import org.apache.hadoop.hbase.regionserver.RSRpcServices;
65  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
66  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
67  import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.HFileTestUtil;
70  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
71  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
72  import org.junit.After;
73  import org.junit.Before;
74  import org.junit.Test;
75  import org.junit.experimental.categories.Category;
76  
77  import com.google.protobuf.ServiceException;
78  
79  @Category(LargeTests.class)
80  public class TestMasterReplication {
81  
82    private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
83  
84    private Configuration baseConfiguration;
85  
86    private HBaseTestingUtility[] utilities;
87    private Configuration[] configurations;
88    private MiniZooKeeperCluster miniZK;
89  
90    private static final long SLEEP_TIME = 500;
91    private static final int NB_RETRIES = 10;
92  
93    private static final TableName tableName = TableName.valueOf("test");
94    private static final byte[] famName = Bytes.toBytes("f");
95    private static final byte[] famName1 = Bytes.toBytes("f1");
96    private static final byte[] row = Bytes.toBytes("row");
97    private static final byte[] row1 = Bytes.toBytes("row1");
98    private static final byte[] row2 = Bytes.toBytes("row2");
99    private static final byte[] row3 = Bytes.toBytes("row3");
100   private static final byte[] row4 = Bytes.toBytes("row4");
101   private static final byte[] noRepfamName = Bytes.toBytes("norep");
102 
103   private static final byte[] count = Bytes.toBytes("count");
104   private static final byte[] put = Bytes.toBytes("put");
105   private static final byte[] delete = Bytes.toBytes("delete");
106 
107   private HTableDescriptor table;
108 
109   @Before
110   public void setUp() throws Exception {
111     baseConfiguration = HBaseConfiguration.create();
112     // smaller block size and capacity to trigger more operations
113     // and test them
114     baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
115     baseConfiguration.setInt("replication.source.size.capacity", 1024);
116     baseConfiguration.setLong("replication.source.sleepforretries", 100);
117     baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
118     baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
119     baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
120       HConstants.REPLICATION_ENABLE_DEFAULT);
121     baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
122     baseConfiguration.set("hbase.replication.source.fs.conf.provider",
123       TestSourceFSConfigurationProvider.class.getCanonicalName());
124     baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
125     baseConfiguration.setBoolean("dfs.support.append", true);
126     baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
127     baseConfiguration.setStrings(
128         CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
129         CoprocessorCounter.class.getName());
130 
131     table = new HTableDescriptor(tableName);
132     HColumnDescriptor fam = new HColumnDescriptor(famName);
133     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
134     table.addFamily(fam);
135     fam = new HColumnDescriptor(famName1);
136     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
137     table.addFamily(fam);
138     fam = new HColumnDescriptor(noRepfamName);
139     table.addFamily(fam);
140   }
141 
142   /**
143    * It tests the replication scenario involving 0 -> 1 -> 0. It does it by
144    * adding and deleting a row to a table in each cluster, checking if it's
145    * replicated. It also tests that the puts and deletes are not replicated back
146    * to the originating cluster.
147    */
148   @Test(timeout = 300000)
149   public void testCyclicReplication1() throws Exception {
150     LOG.info("testSimplePutDelete");
151     int numClusters = 2;
152     Table[] htables = null;
153     try {
154       htables = setUpClusterTablesAndPeers(numClusters);
155 
156       int[] expectedCounts = new int[] { 2, 2 };
157 
158       // add rows to both clusters,
159       // make sure they are both replication
160       putAndWait(row, famName, htables[0], htables[1]);
161       putAndWait(row1, famName, htables[1], htables[0]);
162       validateCounts(htables, put, expectedCounts);
163 
164       deleteAndWait(row, htables[0], htables[1]);
165       deleteAndWait(row1, htables[1], htables[0]);
166       validateCounts(htables, delete, expectedCounts);
167     } finally {
168       close(htables);
169       shutDownMiniClusters();
170     }
171   }
172 
173   /**
174    * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
175    * HFiles to a table in each cluster, checking if it's replicated.
176    */
177   @Test(timeout = 300000)
178   public void testHFileCyclicReplication() throws Exception {
179     LOG.info("testHFileCyclicReplication");
180     int numClusters = 2;
181     Table[] htables = null;
182     try {
183       htables = setUpClusterTablesAndPeers(numClusters);
184 
185       // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
186       // to cluster '1'.
187       byte[][][] hfileRanges =
188           new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
189               new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
190       int numOfRows = 100;
191       int[] expectedCounts =
192           new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
193 
194       loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
195         famName, htables, hfileRanges, numOfRows, expectedCounts, true);
196 
197       // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated
198       // to cluster '0'.
199       hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
200           new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
201       numOfRows = 200;
202       int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
203           hfileRanges.length * numOfRows + expectedCounts[1] };
204 
205       loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
206         famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
207 
208     } finally {
209       close(htables);
210       shutDownMiniClusters();
211     }
212   }
213 
214   private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
215     Table[] htables;
216     startMiniClusters(numClusters);
217     createTableOnClusters(table);
218 
219     htables = getHTablesOnClusters(tableName);
220     // Test the replication scenarios of 0 -> 1 -> 0
221     addPeer("1", 0, 1);
222     addPeer("1", 1, 0);
223     return htables;
224   }
225 
226   /**
227    * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a
228    * table in each clusters and ensuring that the each of these clusters get the appropriate
229    * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
230    * originating from itself and also the edits that it received using replication from a different
231    * cluster. The scenario is explained in HBASE-9158
232    */
233   @Test(timeout = 300000)
234   public void testCyclicReplication2() throws Exception {
235     LOG.info("testCyclicReplication1");
236     int numClusters = 3;
237     Table[] htables = null;
238     try {
239       startMiniClusters(numClusters);
240       createTableOnClusters(table);
241 
242       // Test the replication scenario of 0 -> 1 -> 2 -> 0
243       addPeer("1", 0, 1);
244       addPeer("1", 1, 2);
245       addPeer("1", 2, 0);
246 
247       htables = getHTablesOnClusters(tableName);
248 
249       // put "row" and wait 'til it got around
250       putAndWait(row, famName, htables[0], htables[2]);
251       putAndWait(row1, famName, htables[1], htables[0]);
252       putAndWait(row2, famName, htables[2], htables[1]);
253 
254       deleteAndWait(row, htables[0], htables[2]);
255       deleteAndWait(row1, htables[1], htables[0]);
256       deleteAndWait(row2, htables[2], htables[1]);
257 
258       int[] expectedCounts = new int[] { 3, 3, 3 };
259       validateCounts(htables, put, expectedCounts);
260       validateCounts(htables, delete, expectedCounts);
261 
262       // Test HBASE-9158
263       disablePeer("1", 2);
264       // we now have an edit that was replicated into cluster originating from
265       // cluster 0
266       putAndWait(row3, famName, htables[0], htables[1]);
267       // now add a local edit to cluster 1
268       htables[1].put(new Put(row4).add(famName, row4, row4));
269       // re-enable replication from cluster 2 to cluster 0
270       enablePeer("1", 2);
271       // without HBASE-9158 the edit for row4 would have been marked with
272       // cluster 0's id
273       // and hence not replicated to cluster 0
274       wait(row4, htables[0], true);
275     } finally {
276       close(htables);
277       shutDownMiniClusters();
278     }
279   }
280 
281   /**
282    * It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk
283    * loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers.
284    */
285   @Test(timeout = 300000)
286   public void testHFileMultiSlaveReplication() throws Exception {
287     LOG.info("testHFileMultiSlaveReplication");
288     int numClusters = 3;
289     Table[] htables = null;
290     try {
291       startMiniClusters(numClusters);
292       createTableOnClusters(table);
293 
294       // Add a slave, 0 -> 1
295       addPeer("1", 0, 1);
296 
297       htables = getHTablesOnClusters(tableName);
298 
299       // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
300       // to cluster '1'.
301       byte[][][] hfileRanges =
302           new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
303               new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
304       int numOfRows = 100;
305 
306       int[] expectedCounts =
307           new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
308 
309       loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
310         famName, htables, hfileRanges, numOfRows, expectedCounts, true);
311 
312       // Validate data is not replicated to cluster '2'.
313       assertEquals(0, utilities[2].countRows(htables[2]));
314 
315       rollWALAndWait(utilities[0], htables[0].getName(), row);
316 
317       // Add one more slave, 0 -> 2
318       addPeer("2", 0, 2);
319 
320       // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated
321       // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
322       hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
323           new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
324       numOfRows = 200;
325 
326       int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
327           hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
328 
329       loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
330         famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
331 
332     } finally {
333       close(htables);
334       shutDownMiniClusters();
335     }
336   }
337   
338   /**
339    * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
340    * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
341    * only one CF data to replicate.
342    */
343   @Test(timeout = 300000)
344   public void testHFileReplicationForConfiguredTableCfs() throws Exception {
345     LOG.info("testHFileReplicationForConfiguredTableCfs");
346     int numClusters = 2;
347     Table[] htables = null;
348     try {
349       startMiniClusters(numClusters);
350       createTableOnClusters(table);
351 
352       htables = getHTablesOnClusters(tableName);
353       // Test the replication scenarios only 'f' is configured for table data replication not 'f1'
354       addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
355 
356       // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
357       byte[][][] hfileRanges =
358           new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
359               new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
360       int numOfRows = 100;
361       int[] expectedCounts =
362           new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
363 
364       loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
365         hfileRanges, numOfRows, expectedCounts, true);
366 
367       // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
368       hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
369           new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
370       numOfRows = 100;
371 
372       int[] newExpectedCounts =
373           new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
374 
375       loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
376         hfileRanges, numOfRows, newExpectedCounts, false);
377 
378       // Validate data replication for CF 'f1'
379 
380       // Source cluster table should contain data for the families
381       wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
382 
383       // Sleep for enough time so that the data is still not replicated for the CF which is not
384       // configured for replication
385       Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
386       // Peer cluster should have only configured CF data
387       wait(1, htables[1], expectedCounts[1]);
388     } finally {
389       close(htables);
390       shutDownMiniClusters();
391     }
392   }
393 
394   /**
395    * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
396    */
397   @Test(timeout = 300000)
398   public void testCyclicReplication3() throws Exception {
399     LOG.info("testCyclicReplication2");
400     int numClusters = 3;
401     Table[] htables = null;
402     try {
403       startMiniClusters(numClusters);
404       createTableOnClusters(table);
405 
406       // Test the replication scenario of 0 -> 1 -> 2 -> 1
407       addPeer("1", 0, 1);
408       addPeer("1", 1, 2);
409       addPeer("1", 2, 1);
410 
411       htables = getHTablesOnClusters(tableName);
412 
413       // put "row" and wait 'til it got around
414       putAndWait(row, famName, htables[0], htables[2]);
415       putAndWait(row1, famName, htables[1], htables[2]);
416       putAndWait(row2, famName, htables[2], htables[1]);
417 
418       deleteAndWait(row, htables[0], htables[2]);
419       deleteAndWait(row1, htables[1], htables[2]);
420       deleteAndWait(row2, htables[2], htables[1]);
421 
422       int[] expectedCounts = new int[] { 1, 3, 3 };
423       validateCounts(htables, put, expectedCounts);
424       validateCounts(htables, delete, expectedCounts);
425     } finally {
426       close(htables);
427       shutDownMiniClusters();
428     }
429   }
430 
431   /*
432    * Test RSRpcServices#replicateWALEntry when replication is disabled. This is to simulate
433    * HBASE-14840
434    */
435   @Test(timeout = 180000, expected = ServiceException.class)
436   public void testReplicateWALEntryWhenReplicationIsDisabled() throws Exception {
437     LOG.info("testSimplePutDelete");
438     baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
439     Table[] htables = null;
440     try {
441       startMiniClusters(1);
442       createTableOnClusters(table);
443       htables = getHTablesOnClusters(tableName);
444 
445       HRegionServer rs = utilities[0].getRSForFirstRegionInTable(tableName);
446       RSRpcServices rsrpc = new RSRpcServices(rs);
447       rsrpc.replicateWALEntry(null, null);
448     } finally {
449       close(htables);
450       shutDownMiniClusters();
451     }
452   }
453 
454   @After
455   public void tearDown() throws IOException {
456     configurations = null;
457     utilities = null;
458   }
459 
460   @SuppressWarnings("resource")
461   private void startMiniClusters(int numClusters) throws Exception {
462     Random random = new Random();
463     utilities = new HBaseTestingUtility[numClusters];
464     configurations = new Configuration[numClusters];
465     for (int i = 0; i < numClusters; i++) {
466       Configuration conf = new Configuration(baseConfiguration);
467       conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
468       HBaseTestingUtility utility = new HBaseTestingUtility(conf);
469       if (i == 0) {
470         utility.startMiniZKCluster();
471         miniZK = utility.getZkCluster();
472       } else {
473         utility.setZkCluster(miniZK);
474       }
475       utility.startMiniCluster();
476       utilities[i] = utility;
477       configurations[i] = conf;
478       new ZooKeeperWatcher(conf, "cluster" + i, null, true);
479     }
480   }
481 
482   private void shutDownMiniClusters() throws Exception {
483     int numClusters = utilities.length;
484     for (int i = numClusters - 1; i >= 0; i--) {
485       if (utilities[i] != null) {
486         utilities[i].shutdownMiniCluster();
487       }
488     }
489     miniZK.shutdown();
490   }
491 
492   private void createTableOnClusters(HTableDescriptor table) throws Exception {
493     int numClusters = configurations.length;
494     for (int i = 0; i < numClusters; i++) {
495       Admin hbaseAdmin = null;
496       try {
497         hbaseAdmin = new HBaseAdmin(configurations[i]);
498         hbaseAdmin.createTable(table);
499       } finally {
500         close(hbaseAdmin);
501       }
502     }
503   }
504 
505   private void addPeer(String id, int masterClusterNumber,
506       int slaveClusterNumber) throws Exception {
507     ReplicationAdmin replicationAdmin = null;
508     try {
509       replicationAdmin = new ReplicationAdmin(
510           configurations[masterClusterNumber]);
511       replicationAdmin.addPeer(id,
512           utilities[slaveClusterNumber].getClusterKey());
513     } finally {
514       close(replicationAdmin);
515     }
516   }
517   
518   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
519       throws Exception {
520     ReplicationAdmin replicationAdmin = null;
521     try {
522       replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
523       replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey(), tableCfs);
524     } finally {
525       close(replicationAdmin);
526     }
527   }
528 
529   private void disablePeer(String id, int masterClusterNumber) throws Exception {
530     ReplicationAdmin replicationAdmin = null;
531     try {
532       replicationAdmin = new ReplicationAdmin(
533           configurations[masterClusterNumber]);
534       replicationAdmin.disablePeer(id);
535     } finally {
536       close(replicationAdmin);
537     }
538   }
539 
540   private void enablePeer(String id, int masterClusterNumber) throws Exception {
541     ReplicationAdmin replicationAdmin = null;
542     try {
543       replicationAdmin = new ReplicationAdmin(
544           configurations[masterClusterNumber]);
545       replicationAdmin.enablePeer(id);
546     } finally {
547       close(replicationAdmin);
548     }
549   }
550 
551   private void close(Closeable... closeables) {
552     try {
553       if (closeables != null) {
554         for (Closeable closeable : closeables) {
555           closeable.close();
556         }
557       }
558     } catch (Exception e) {
559       LOG.warn("Exception occured while closing the object:", e);
560     }
561   }
562 
563   @SuppressWarnings("resource")
564   private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
565     int numClusters = utilities.length;
566     Table[] htables = new Table[numClusters];
567     for (int i = 0; i < numClusters; i++) {
568       Table htable = new HTable(configurations[i], tableName);
569       htable.setWriteBufferSize(1024);
570       htables[i] = htable;
571     }
572     return htables;
573   }
574 
575   private void validateCounts(Table[] htables, byte[] type,
576       int[] expectedCounts) throws IOException {
577     for (int i = 0; i < htables.length; i++) {
578       assertEquals(Bytes.toString(type) + " were replicated back ",
579           expectedCounts[i], getCount(htables[i], type));
580     }
581   }
582 
583   private int getCount(Table t, byte[] type) throws IOException {
584     Get test = new Get(row);
585     test.setAttribute("count", new byte[] {});
586     Result res = t.get(test);
587     return Bytes.toInt(res.getValue(count, type));
588   }
589 
590   private void deleteAndWait(byte[] row, Table source, Table target)
591       throws Exception {
592     Delete del = new Delete(row);
593     source.delete(del);
594     wait(row, target, true);
595   }
596 
597   private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
598       throws Exception {
599     Put put = new Put(row);
600     put.add(fam, row, row);
601     source.put(put);
602     wait(row, target, false);
603   }
604 
605   private void loadAndValidateHFileReplication(String testName, int masterNumber,
606       int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
607       int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
608     HBaseTestingUtility util = utilities[masterNumber];
609 
610     Path dir = util.getDataTestDirOnTestFS(testName);
611     FileSystem fs = util.getTestFileSystem();
612     dir = dir.makeQualified(fs);
613     Path familyDir = new Path(dir, Bytes.toString(fam));
614 
615     int hfileIdx = 0;
616     for (byte[][] range : hfileRanges) {
617       byte[] from = range[0];
618       byte[] to = range[1];
619       HFileTestUtil.createHFile(util.getConfiguration(), fs,
620         new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
621     }
622 
623     Table source = tables[masterNumber];
624     final TableName tableName = source.getName();
625     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
626     String[] args = { dir.toString(), tableName.toString() };
627     loader.run(args);
628 
629     if (toValidate) {
630       for (int slaveClusterNumber : slaveNumbers) {
631         wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
632       }
633     }
634   }
635 
636   private void wait(int slaveNumber, Table target, int expectedCount)
637       throws IOException, InterruptedException {
638     int count = 0;
639     for (int i = 0; i < NB_RETRIES; i++) {
640       if (i == NB_RETRIES - 1) {
641         fail("Waited too much time for bulkloaded data replication. Current count=" + count
642             + ", expected count=" + expectedCount);
643       }
644       count = utilities[slaveNumber].countRows(target);
645       if (count != expectedCount) {
646         LOG.info("Waiting more time for bulkloaded data replication.");
647         Thread.sleep(SLEEP_TIME);
648       } else {
649         break;
650       }
651     }
652   }
653 
654   private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
655     Get get = new Get(row);
656     for (int i = 0; i < NB_RETRIES; i++) {
657       if (i == NB_RETRIES - 1) {
658         fail("Waited too much time for replication. Row:" + Bytes.toString(row)
659             + ". IsDeleteReplication:" + isDeleted);
660       }
661       Result res = target.get(get);
662       boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
663       if (sleep) {
664         LOG.info("Waiting for more time for replication. Row:"
665             + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
666         Thread.sleep(SLEEP_TIME);
667       } else {
668         if (!isDeleted) {
669           assertArrayEquals(res.value(), row);
670         }
671         LOG.info("Obtained row:"
672             + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
673         break;
674       }
675     }
676   }
677 
678   private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
679       final byte[] row) throws IOException {
680     final Admin admin = utility.getHBaseAdmin();
681     final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
682 
683     // find the region that corresponds to the given row.
684     HRegion region = null;
685     for (HRegion candidate : cluster.getRegions(table)) {
686       if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
687         region = candidate;
688         break;
689       }
690     }
691     assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
692 
693     final CountDownLatch latch = new CountDownLatch(1);
694 
695     // listen for successful log rolls
696     final WALActionsListener listener = new WALActionsListener.Base() {
697           @Override
698           public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
699             latch.countDown();
700           }
701         };
702     region.getWAL().registerWALActionsListener(listener);
703 
704     // request a roll
705     admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
706       region.getRegionInfo().getRegionName()));
707 
708     // wait
709     try {
710       latch.await();
711     } catch (InterruptedException exception) {
712       LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
713           "replication tests fail, it's probably because we should still be waiting.");
714       Thread.currentThread().interrupt();
715     }
716     region.getWAL().unregisterWALActionsListener(listener);
717   }
718 
719   /**
720    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
721    * timestamp there is otherwise no way to count them.
722    */
723   public static class CoprocessorCounter extends BaseRegionObserver {
724     private int nCount = 0;
725     private int nDelete = 0;
726 
727     @Override
728     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
729         final WALEdit edit, final Durability durability) throws IOException {
730       nCount++;
731     }
732 
733     @Override
734     public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
735         final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
736       nDelete++;
737     }
738 
739     @Override
740     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
741         final Get get, final List<Cell> result) throws IOException {
742       if (get.getAttribute("count") != null) {
743         result.clear();
744         // order is important!
745         result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
746         result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
747         c.bypass();
748       }
749     }
750   }
751 
752 }