/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org.apache.hadoop.hbase.replication;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.util.ArrayList;
27 import java.util.HashMap;
28 import java.util.List;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.ClusterStatus;
35 import org.apache.hadoop.hbase.HColumnDescriptor;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.HTableDescriptor;
39 import org.apache.hadoop.hbase.ServerLoad;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.client.HTable;
43 import org.apache.hadoop.hbase.client.Admin;
44 import org.apache.hadoop.hbase.client.Delete;
45 import org.apache.hadoop.hbase.client.Get;
46 import org.apache.hadoop.hbase.client.HBaseAdmin;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.client.Result;
49 import org.apache.hadoop.hbase.client.ResultScanner;
50 import org.apache.hadoop.hbase.client.Scan;
51 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
52 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
53 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
54 import org.apache.hadoop.hbase.testclassification.LargeTests;
55 import org.apache.hadoop.hbase.wal.WALKey;
56 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
57 import org.apache.hadoop.hbase.replication.regionserver.Replication;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60 import org.apache.hadoop.hbase.util.JVMClusterUtil;
61 import org.apache.hadoop.mapreduce.Job;
62 import org.junit.Before;
63 import org.junit.Test;
64 import org.junit.experimental.categories.Category;
65
@Category(LargeTests.class)
public class TestReplicationSmallTests extends TestReplicationBase {

  private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);

  /**
   * Resets state before each test: rolls the WAL on every region server of the
   * source cluster so the newest log is picked up by the replication queue,
   * truncates the source table, and then polls the slave table until the
   * replicated deletes have emptied it — so every test starts with both
   * tables empty.
   * @throws Exception if the wait for the truncate to replicate times out
   */
  @Before
  public void setUp() throws Exception {
    // Starting and stopping replication can make us miss new logs; rolling
    // the WAL writers makes sure the most recent log is added to the queue.
    for ( JVMClusterUtil.RegionServerThread r :
        utility1.getHBaseCluster().getRegionServerThreads()) {
      utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    utility1.deleteTableData(tableName);
    // The table data on the slave is deleted by replicated deletes arriving
    // asynchronously, so we cannot simply call deleteTableData on utility2:
    // late-arriving edits could repopulate it. Instead poll the slave table
    // until it is empty, failing after NB_RETRIES attempts.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i==NB_RETRIES-1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          // Row count shrank since the last poll, i.e. deletes are still
          // flowing through — grant an extra retry instead of timing out.
          i--;
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Verifies that deletes of specific cell versions replicate correctly:
   * writes three versions of the same cell, deletes a single version
   * (deleteColumn) and then all remaining versions (deleteColumns), and
   * checks after each step that the slave cluster converges to the same
   * state.
   * @throws Exception if any replication wait times out
   */
  @Test(timeout=300000)
  public void testDeleteTypes() throws Exception {
    LOG.info("testDeleteTypes");
    final byte[] v1 = Bytes.toBytes("v1");
    final byte[] v2 = Bytes.toBytes("v2");
    final byte[] v3 = Bytes.toBytes("v3");
    htable1 = new HTable(conf1, tableName);

    long t = EnvironmentEdgeManager.currentTime();
    // Insert three versions of the same cell at timestamps t, t+1, t+2.
    Put put = new Put(row);
    put.add(famName, row, t, v1);
    htable1.put(put);

    put = new Put(row);
    put.add(famName, row, t+1, v2);
    htable1.put(put);

    put = new Put(row);
    put.add(famName, row, t+2, v3);
    htable1.put(put);

    // Wait until all three versions are visible on the slave; versions come
    // back newest-first, so rawCells()[0] is v3.
    Get get = new Get(row);
    get.setMaxVersions();
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i==NB_RETRIES-1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() < 3) {
        LOG.info("Rows not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
        break;
      }
    }

    // Delete only the oldest version (timestamp t) and wait for the slave
    // to drop down to two versions.
    Delete d = new Delete(row);
    d.deleteColumn(famName, row, t);
    htable1.delete(d);

    get = new Get(row);
    get.setMaxVersions();
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i==NB_RETRIES-1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() > 2) {
        LOG.info("Version not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
        break;
      }
    }

    // Delete all versions up to and including t+2 (deleteColumns) and wait
    // for the row to disappear from the slave entirely.
    d = new Delete(row);
    d.deleteColumns(famName, row, t+2);
    htable1.delete(d);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i==NB_RETRIES-1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Rows not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * The simplest round trip: a single put followed by a single delete, each
   * verified to reach the slave cluster within the retry budget.
   * @throws Exception if either replication wait times out
   */
  @Test(timeout=300000)
  public void testSimplePutDelete() throws Exception {
    LOG.info("testSimplePutDelete");
    Put put = new Put(row);
    put.add(famName, row, row);

    htable1 = new HTable(conf1, tableName);
    htable1.put(put);

    // Poll the slave until the put shows up.
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i==NB_RETRIES-1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(res.value(), row);
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    // Poll the slave until the delete shows up (row disappears).
    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i==NB_RETRIES-1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Writes a small batch of NB_ROWS_IN_BATCH rows to the source table and
   * waits until the slave cluster holds the same number of rows.
   * @throws Exception if replication of the batch times out
   */
  @Test(timeout=300000)
  public void testSmallBatch() throws Exception {
    LOG.info("testSmallBatch");
    // Insert the rows in one client-side batch.
    List<Put> puts = new ArrayList<>();
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(i));
      put.add(famName, row, row);
      puts.add(put);
    }
    htable1.put(puts);

    Scan scan = new Scan();

    // Sanity check: all rows are present on the source before we start
    // waiting on the slave.
    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    // Poll the slave until the full batch has been replicated.
    for (int i = 0; i < NB_RETRIES; i++) {
      scan = new Scan();
      if (i==NB_RETRIES-1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BATCH) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Verifies that disabling a peer stops replication (the row must never
   * appear on the slave while disabled) and that re-enabling the peer lets
   * the queued edit through.
   * @throws Exception if the row does not replicate after re-enabling
   */
  @Test(timeout = 300000)
  public void testDisableEnable() throws Exception {
    // Disable the peer, then write a row on the source.
    admin.disablePeer("2");

    byte[] rowkey = Bytes.toBytes("disable enable");
    Put put = new Put(rowkey);
    put.add(famName, row, row);
    htable1.put(put);

    // While disabled, the row must never show up on the slave; keep checking
    // for the full retry window.
    Get get = new Get(rowkey);
    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Replication wasn't disabled");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    // Re-enable the peer and wait for the queued edit to replicate.
    admin.enablePeer("2");

    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(res.value(), row);
        return;
      }
    }
    fail("Waited too much time for put replication");
  }

  /**
   * Verifies that removing the peer stops replication entirely (a row
   * written while the peer is removed must never reach the slave) and that
   * adding the peer back restores replication for subsequent writes.
   * @throws Exception if a row replicates when it should not, or fails to
   *           replicate after the peer is re-added
   */
  @Test(timeout=300000)
  public void testAddAndRemoveClusters() throws Exception {
    LOG.info("testAddAndRemoveClusters");
    admin.removePeer("2");
    Thread.sleep(SLEEP_TIME);
    byte[] rowKey = Bytes.toBytes("Won't be replicated");
    Put put = new Put(rowKey);
    put.add(famName, row, row);
    htable1.put(put);

    // With the peer removed, the row must never appear on the slave; check
    // for the full retry window (minus one iteration).
    Get get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES-1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    // Re-add the peer and verify a fresh write does replicate.
    admin.addPeer("2", utility2.getClusterKey());
    Thread.sleep(SLEEP_TIME);
    rowKey = Bytes.toBytes("do rep");
    put = new Put(rowKey);
    put.add(famName, row, row);
    LOG.info("Adding new row");
    htable1.put(put);

    get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i==NB_RETRIES-1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        // Back off progressively: the sleep grows with the attempt number
        // (0 on the first pass, so the first re-check is immediate).
        Thread.sleep(SLEEP_TIME*i);
      } else {
        assertArrayEquals(res.value(), row);
        break;
      }
    }
  }

  /**
   * Load test: writes NB_ROWS_IN_BIG_BATCH rows through a small (1 KB) write
   * buffer to force many RPCs, then waits — with an extended retry budget —
   * for the full batch to replicate. On timeout, logs every missing row id
   * to aid debugging before failing.
   * @throws Exception if the big batch fails to replicate in time
   */
  @Test(timeout=300000)
  public void testLoading() throws Exception {
    LOG.info("Writing out rows to table1 in testLoading");
    List<Put> puts = new ArrayList<Put>();
    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(i));
      put.add(famName, row, row);
      puts.add(put);
    }
    // Use a tiny write buffer so the batch is flushed in many small RPCs.
    htable1.setWriteBufferSize(1024);
    htable1.put(puts);

    Scan scan = new Scan();

    // Sanity check: all rows landed on the source.
    ResultScanner scanner = htable1.getScanner(scan);
    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
    scanner.close();

    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);

    LOG.info("Looking in table2 for replicated rows in testLoading");
    long start = System.currentTimeMillis();
    // The big batch needs more time to replicate than the small-batch tests,
    // so allow 10x the usual number of retries.
    final long retries = NB_RETRIES * 10;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      scanner = htable2.getScanner(scan);
      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BIG_BATCH) {
        if (i == retries - 1) {
          // Out of retries: log exactly which row ids are missing (rows are
          // integer-keyed 0..N-1, so gaps in the sorted scan reveal them).
          int lastRow = -1;
          for (Result result : res) {
            int currentRow = Bytes.toInt(result.getRow());
            for (int row = lastRow+1; row < currentRow; row++) {
              LOG.error("Row missing: " + row);
            }
            lastRow = currentRow;
          }
          LOG.error("Last row: " + lastRow);
          fail("Waited too much time for normal batch replication, " +
            res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
            (System.currentTimeMillis() - start) + "ms");
        } else {
          LOG.info("Only got " + res.length + " rows... retrying");
          Thread.sleep(SLEEP_TIME);
        }
      } else {
        break;
      }
    }
  }

  /**
   * Runs the VerifyReplication MapReduce job twice: first against identical
   * tables (expects all GOODROWS, zero BADROWS), then after corrupting the
   * slave's data — every cell value rewritten and one row deleted — expects
   * zero GOODROWS and all BADROWS.
   * @throws Exception if a job cannot be created or fails to complete
   */
  @Test(timeout=300000)
  public void testVerifyRepJob() throws Exception {
    // Reuse testSmallBatch to populate both clusters with an identical,
    // fully-replicated data set.
    testSmallBatch();

    String[] args = new String[] {"2", tableName.getNameAsString()};
    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    // Tables match: every row is good, none bad.
    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
      findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(0, job.getCounters().
      findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

    // Corrupt the slave: overwrite every row's first cell with different
    // data, then delete the last row entirely.
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      Cell firstVal = result.rawCells()[0];
      put.add(CellUtil.cloneFamily(firstVal),
          CellUtil.cloneQualifier(firstVal), Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    // Tables now diverge on every row: none good, all bad.
    assertEquals(0, job.getCounters().
      findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
      findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }

  /**
   * Smoke test that Replication.scopeWALEdits handles a compaction WAL edit
   * without throwing — compaction markers are internal and must not be
   * treated as replicable user data.
   * @throws Exception if scoping the compaction edit fails
   */
  @Test(timeout=300000)
  public void testCompactionWALEdits() throws Exception {
    WALProtos.CompactionDescriptor compactionDescriptor =
        WALProtos.CompactionDescriptor.getDefaultInstance();
    HRegionInfo hri = new HRegionInfo(htable1.getName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
    Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit,
      htable1.getConfiguration(), null);
  }

  /**
   * Creates several tables whose column family has global replication scope
   * and verifies that ReplicationAdmin.listReplicated() reports each exactly
   * once. Cleans up the tables afterwards.
   * @throws Exception on table creation/deletion failure
   */
  @Test(timeout = 300000)
  public void testVerifyListReplicatedTable() throws Exception {
    LOG.info("testVerifyListReplicatedTable");

    final String tName = "VerifyListReplicated_";
    final String colFam = "cf1";
    final int numOfTables = 3;

    HBaseAdmin hadmin = new HBaseAdmin(conf1);

    // Create numOfTables tables, each with one globally-scoped family.
    for (int i = 0; i < numOfTables; i++) {
      HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
      HColumnDescriptor cfd = new HColumnDescriptor(colFam);
      cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
      ht.addFamily(cfd);
      hadmin.createTable(ht);
    }

    // Count how many times each of our tables appears in listReplicated();
    // the table index is recovered from the name's trailing digit.
    List<HashMap<String, String>> replicationColFams = admin.listReplicated();
    int[] match = new int[numOfTables];

    for (int i = 0; i < replicationColFams.size(); i++) {
      HashMap<String, String> replicationEntry = replicationColFams.get(i);
      String tn = replicationEntry.get(ReplicationAdmin.TNAME);
      if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
        int m = Integer.parseInt(tn.substring(tn.length() - 1));
        match[m]++;
      }
    }

    // Each table must be listed exactly once.
    for (int i = 0; i < match.length; i++) {
      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
    }

    // Clean up the tables created by this test.
    for (int i = 0; i < numOfTables; i++) {
      String ht = tName + i;
      hadmin.disableTable(ht);
      hadmin.deleteTable(ht);
    }

    hadmin.close();
  }

  /**
   * Writes a batch of rows, then checks that every region server in the
   * source cluster reports replication metrics through ClusterStatus: a
   * non-empty ReplicationLoadSource list and non-negative
   * ReplicationLoadSink timestamps.
   * @throws Exception on cluster access failure
   */
  @Test(timeout = 300000)
  public void testReplicationStatus() throws Exception {
    LOG.info("testReplicationStatus");

    try (Admin admin = utility1.getConnection().getAdmin()) {

      final byte[] qualName = Bytes.toBytes("q");
      Put p;

      // Generate some replication traffic so the metrics are populated.
      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
        p = new Put(Bytes.toBytes("row" + i));
        p.add(famName, qualName, Bytes.toBytes("val" + i));
        htable1.put(p);
      }

      ClusterStatus status = admin.getClusterStatus();

      for (ServerName server : status.getServers()) {
        ServerLoad sl = status.getLoad(server);
        List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
        ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();

        // Every server should expose at least one replication source.
        assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));

        // Sink metrics should be present and non-negative.
        assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
          (rLoadSink.getAgeOfLastAppliedOp() >= 0));
        assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
          (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
      }
    }
  }
}