/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(LargeTests.class)
public class TestReplicationSmallTests extends TestReplicationBase {

  private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);

  /**
   * Rolls the WALs on the master cluster and truncates the replicated table,
   * then waits for the resulting Deletes to reach the slave.
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r :
        utility1.getHBaseCluster().getRegionServerThreads()) {
      utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    utility1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster
    // asynchronously, which is why we cannot just call deleteTableData on
    // utility2: late writes could still make it to the slave. Instead, we
    // truncate the first table and wait for all the Deletes to reach the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment the retry count if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Verify that version and column delete marker types are replicated
   * correctly.
   * @throws Exception
   */
  @Test(timeout=300000)
  public void testDeleteTypes() throws Exception {
    LOG.info("testDeleteTypes");
    final byte[] v1 = Bytes.toBytes("v1");
    final byte[] v2 = Bytes.toBytes("v2");
    final byte[] v3 = Bytes.toBytes("v3");
    htable1 = new HTable(conf1, tableName);

    long t = EnvironmentEdgeManager.currentTime();
    // create three versions for "row"
    Put put = new Put(row);
    put.add(famName, row, t, v1);
    htable1.put(put);

    put = new Put(row);
    put.add(famName, row, t + 1, v2);
    htable1.put(put);

    put = new Put(row);
    put.add(famName, row, t + 2, v3);
    htable1.put(put);

    Get get = new Get(row);
    get.setMaxVersions();
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() < 3) {
        LOG.info("Rows not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
        break;
      }
    }
    // place a version delete marker (deletes only the version at timestamp t, the oldest)
    Delete d = new Delete(row);
    d.deleteColumn(famName, row, t);
    htable1.delete(d);

    get = new Get(row);
    get.setMaxVersions();
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for version delete replication");
      }
      Result res = htable2.get(get);
      if (res.size() > 2) {
        LOG.info("Version not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
        break;
      }
    }

    // place a column delete marker
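    // Unlike deleteColumn above, deleteColumns (plural) masks every version
    // of the column with a timestamp at or below t+2.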
    d = new Delete(row);
    d.deleteColumns(famName, row, t + 2);
    htable1.delete(d);

    // now *both* of the remaining versions should be deleted
    // at the replica
    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Rows not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Add a row, check it's replicated, delete it, check it's gone
   * @throws Exception
   */
  @Test(timeout=300000)
  public void testSimplePutDelete() throws Exception {
    LOG.info("testSimplePutDelete");
    Put put = new Put(row);
    put.add(famName, row, row);

    htable1 = new HTable(conf1, tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(res.value(), row);
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Try a small batch upload using the write buffer and check that it is replicated.
   * @throws Exception
   */
  @Test(timeout=300000)
  public void testSmallBatch() throws Exception {
    LOG.info("testSmallBatch");
    // normal Batch tests
    List<Put> puts = new ArrayList<>();
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(i));
      put.add(famName, row, row);
      puts.add(put);
    }
    htable1.put(puts);

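    // First make sure every row actually made it into the source table.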
    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    for (int i = 0; i < NB_RETRIES; i++) {
      scan = new Scan();
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BATCH) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Test disabling and enabling replication: insert while the peer is
   * disabled and make sure nothing is replicated, then re-enable the peer
   * and verify the insert is replicated.
   *
   * @throws Exception
   */
  @Test(timeout = 300000)
  public void testDisableEnable() throws Exception {

    // Test disabling replication
    admin.disablePeer("2");

    byte[] rowkey = Bytes.toBytes("disable enable");
    Put put = new Put(rowkey);
    put.add(famName, row, row);
    htable1.put(put);

    Get get = new Get(rowkey);
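    // Note the inverted polling logic: if the row never shows up, the loop
    // simply exhausts its retries, which is the success case here.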
    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Replication wasn't disabled");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    // Test enable replication
    admin.enablePeer("2");

    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(res.value(), row);
        return;
      }
    }
    fail("Waited too much time for put replication");
  }

  /**
   * Integration test for TestReplicationAdmin: removes and re-adds a peer
   * cluster.
   *
   * @throws Exception
   */
  @Test(timeout=300000)
  public void testAddAndRemoveClusters() throws Exception {
    LOG.info("testAddAndRemoveClusters");
    admin.removePeer("2");
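    // Give the region servers time to notice that the peer is gone.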
    Thread.sleep(SLEEP_TIME);
    byte[] rowKey = Bytes.toBytes("Won't be replicated");
    Put put = new Put(rowKey);
    put.add(famName, row, row);
    htable1.put(put);

    Get get = new Get(rowKey);
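    // Success here means the row never arrives: the loop breaks on the last
    // retry instead of failing.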
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    admin.addPeer("2", utility2.getClusterKey());
    Thread.sleep(SLEEP_TIME);
    rowKey = Bytes.toBytes("do rep");
    put = new Put(rowKey);
    put.add(famName, row, row);
    LOG.info("Adding new row");
    htable1.put(put);

    get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME * i);
      } else {
        assertArrayEquals(res.value(), row);
        break;
      }
    }
  }

  /**
   * Do a more intense version of testSmallBatch, one that will trigger
   * WAL rolling and other non-trivial code paths.
   * @throws Exception
   */
  @Test(timeout=300000)
  public void testLoading() throws Exception {
    LOG.info("Writing out rows to table1 in testLoading");
    List<Put> puts = new ArrayList<>();
    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(i));
      put.add(famName, row, row);
      puts.add(put);
    }
    htable1.setWriteBufferSize(1024);
    // The puts will be iterated through and flushed only when the buffer
    // size is reached.
    htable1.put(puts);

    Scan scan = new Scan();

    ResultScanner scanner = htable1.getScanner(scan);
    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
    scanner.close();

    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);

    LOG.info("Looking in table2 for replicated rows in testLoading");
    long start = System.currentTimeMillis();
    // Retry more than NB_RETRIES; previously all retries were exhausted within
    // five seconds and the test would fail intermittently.
    final long retries = NB_RETRIES * 10;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      scanner = htable2.getScanner(scan);
      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BIG_BATCH) {
        if (i == retries - 1) {
          int lastRow = -1;
          for (Result result : res) {
            int currentRow = Bytes.toInt(result.getRow());
            for (int row = lastRow + 1; row < currentRow; row++) {
              LOG.error("Row missing: " + row);
            }
            lastRow = currentRow;
          }
          LOG.error("Last row: " + lastRow);
          fail("Waited too much time for normal batch replication, " +
            res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
            (System.currentTimeMillis() - start) + "ms");
        } else {
          LOG.info("Only got " + res.length + " rows... retrying");
          Thread.sleep(SLEEP_TIME);
        }
      } else {
        break;
      }
    }
  }

  /**
   * Load a small batch of data into a table, make sure the data is really
   * the same on both clusters, then run the VerifyReplication job and check
   * the results. Do a second comparison where all the cells are different.
   * @throws Exception
   */
  @Test(timeout=300000)
  public void testVerifyRepJob() throws Exception {
    // Populate the tables; at the same time this guarantees that the tables
    // are identical since testSmallBatch does the check
    testSmallBatch();

    String[] args = new String[] {"2", tableName.getNameAsString()};
    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(0, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

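    // Overwrite the first cell of every row on the slave with different data
    // so the second VerifyReplication run flags every row as bad.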
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.add(CellUtil.cloneFamily(firstVal),
          CellUtil.cloneQualifier(firstVal), Bytes.toBytes("diff data"));
      htable2.put(put);
    }
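    // Also delete the last row on the slave entirely; a row missing from the
    // peer counts as a bad row as well, so BADROWS still equals NB_ROWS_IN_BATCH.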
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(0, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }

  /**
   * Test for HBASE-9038: Replication.scopeWALEdits used to throw a
   * NullPointerException when it did not filter out the compaction WALEdit.
   * @throws Exception
   */
  @Test(timeout=300000)
  public void testCompactionWALEdits() throws Exception {
    WALProtos.CompactionDescriptor compactionDescriptor =
        WALProtos.CompactionDescriptor.getDefaultInstance();
    HRegionInfo hri = new HRegionInfo(htable1.getName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
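    // The call below must not throw; before HBASE-9038 it would NPE on the
    // compaction edit.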
    Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit,
      htable1.getConfiguration(), null);
  }

  /**
   * Test for HBASE-8663.
   * Create three new tables with column families enabled for replication,
   * then run ReplicationAdmin.listReplicated(). Finally verify the
   * table:colfamily pairs. Note: TestReplicationAdmin is a better place for
   * this testing but it would need mocks.
   * @throws Exception
   */
  @Test(timeout = 300000)
  public void testVerifyListReplicatedTable() throws Exception {
    LOG.info("testVerifyListReplicatedTable");

    final String tName = "VerifyListReplicated_";
    final String colFam = "cf1";
    final int numOfTables = 3;

    HBaseAdmin hadmin = new HBaseAdmin(conf1);

    // Create Tables
    for (int i = 0; i < numOfTables; i++) {
      HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
      HColumnDescriptor cfd = new HColumnDescriptor(colFam);
      cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
      ht.addFamily(cfd);
      hadmin.createTable(ht);
    }

    // verify the result
    List<HashMap<String, String>> replicationColFams = admin.listReplicated();
    int[] match = new int[numOfTables]; // array of 3 with init value of zero

    for (int i = 0; i < replicationColFams.size(); i++) {
      HashMap<String, String> replicationEntry = replicationColFams.get(i);
      String tn = replicationEntry.get(ReplicationAdmin.TNAME);
      if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
        int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
        match[m]++; // should only increase once
      }
    }

    // check the matching result
    for (int i = 0; i < match.length; i++) {
      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
    }

    // drop tables
    for (int i = 0; i < numOfTables; i++) {
      String ht = tName + i;
      hadmin.disableTable(ht);
      hadmin.deleteTable(ht);
    }

    hadmin.close();
  }

  /**
   * Test for HBASE-9531.
   * Put a few rows into htable1, which should be replicated to htable2, then
   * create a ClusterStatus instance from Admin and verify
   * status.getLoad(server).getReplicationLoadSourceList() and
   * status.getLoad(server).getReplicationLoadSink().
   * @throws Exception
   */
  @Test(timeout = 300000)
  public void testReplicationStatus() throws Exception {
    LOG.info("testReplicationStatus");

    try (Admin admin = utility1.getConnection().getAdmin()) {

      final byte[] qualName = Bytes.toBytes("q");
      Put p;

      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
        p = new Put(Bytes.toBytes("row" + i));
        p.add(famName, qualName, Bytes.toBytes("val" + i));
        htable1.put(p);
      }

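      // ClusterStatus carries the per-server replication load metrics
      // introduced by HBASE-9531.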
      ClusterStatus status = admin.getClusterStatus();

      for (ServerName server : status.getServers()) {
        ServerLoad sl = status.getLoad(server);
        List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
        ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();

        // check that the SourceList has at least one entry
        assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));

        // only check that the Sink values exist, as it is difficult to verify
        // the exact values on the fly
        assertTrue("failed to get ReplicationLoadSink.AgeOfLastAppliedOp",
          (rLoadSink.getAgeOfLastAppliedOp() >= 0));
        assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp",
          (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
      }
    }
  }
}