1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25
26 import com.google.common.collect.Sets;
27
28 import java.io.IOException;
29 import java.net.URLEncoder;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.NavigableMap;
36 import java.util.SortedMap;
37 import java.util.SortedSet;
38 import java.util.TreeSet;
39 import java.util.UUID;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.atomic.AtomicLong;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.fs.FileSystem;
47 import org.apache.hadoop.fs.Path;
48 import org.apache.hadoop.hbase.ChoreService;
49 import org.apache.hadoop.hbase.ClusterId;
50 import org.apache.hadoop.hbase.CoordinatedStateManager;
51 import org.apache.hadoop.hbase.HBaseConfiguration;
52 import org.apache.hadoop.hbase.HBaseTestingUtility;
53 import org.apache.hadoop.hbase.HColumnDescriptor;
54 import org.apache.hadoop.hbase.HConstants;
55 import org.apache.hadoop.hbase.HRegionInfo;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.KeyValue;
58 import org.apache.hadoop.hbase.Server;
59 import org.apache.hadoop.hbase.ServerName;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.client.ClusterConnection;
62 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
64 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
65 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
66 import org.apache.hadoop.hbase.replication.ReplicationFactory;
67 import org.apache.hadoop.hbase.replication.ReplicationPeers;
68 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
69 import org.apache.hadoop.hbase.replication.ReplicationQueues;
70 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
71 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
72 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
73 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
74 import org.apache.hadoop.hbase.testclassification.MediumTests;
75 import org.apache.hadoop.hbase.util.ByteStringer;
76 import org.apache.hadoop.hbase.util.Bytes;
77 import org.apache.hadoop.hbase.util.FSUtils;
78 import org.apache.hadoop.hbase.wal.WAL;
79 import org.apache.hadoop.hbase.wal.WALFactory;
80 import org.apache.hadoop.hbase.wal.WALKey;
81 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
82 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
83 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
84 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
85 import org.junit.After;
86 import org.junit.AfterClass;
87 import org.junit.Before;
88 import org.junit.BeforeClass;
89 import org.junit.Rule;
90 import org.junit.Test;
91 import org.junit.experimental.categories.Category;
92 import org.junit.rules.TestName;
93
94 @Category(MediumTests.class)
95 public class TestReplicationSourceManager {
96
97 private static final Log LOG =
98 LogFactory.getLog(TestReplicationSourceManager.class);
99
100 private static Configuration conf;
101
102 private static HBaseTestingUtility utility;
103
104 private static Replication replication;
105
106 private static ReplicationSourceManager manager;
107
108 private static ZooKeeperWatcher zkw;
109
110 private static HTableDescriptor htd;
111
112 private static HRegionInfo hri;
113
114 private static final byte[] r1 = Bytes.toBytes("r1");
115
116 private static final byte[] r2 = Bytes.toBytes("r2");
117
118 private static final byte[] f1 = Bytes.toBytes("f1");
119
120 private static final byte[] f2 = Bytes.toBytes("f2");
121
122 private static final TableName test =
123 TableName.valueOf("test");
124
125 private static final String slaveId = "1";
126
127 private static FileSystem fs;
128
129 private static Path oldLogDir;
130
131 private static Path logDir;
132
133 private static CountDownLatch latch;
134
135 private static List<String> files = new ArrayList<String>();
136
137 @BeforeClass
138 public static void setUpBeforeClass() throws Exception {
139
140 conf = HBaseConfiguration.create();
141 conf.set("replication.replicationsource.implementation",
142 ReplicationSourceDummy.class.getCanonicalName());
143 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
144 HConstants.REPLICATION_ENABLE_DEFAULT);
145 conf.setLong("replication.sleep.before.failover", 2000);
146 conf.setInt("replication.source.maxretriesmultiplier", 10);
147 utility = new HBaseTestingUtility(conf);
148 utility.startMiniZKCluster();
149
150 zkw = new ZooKeeperWatcher(conf, "test", null);
151 ZKUtil.createWithParents(zkw, "/hbase/replication");
152 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
153 ZKUtil.setData(zkw, "/hbase/replication/peers/1",
154 Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
155 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
156 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
157 ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
158 ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
159 ZKUtil.createWithParents(zkw, "/hbase/replication/state");
160 ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
161
162 ZKClusterId.setClusterId(zkw, new ClusterId());
163 FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
164 fs = FileSystem.get(conf);
165 oldLogDir = new Path(utility.getDataTestDir(),
166 HConstants.HREGION_OLDLOGDIR_NAME);
167 logDir = new Path(utility.getDataTestDir(),
168 HConstants.HREGION_LOGDIR_NAME);
169 replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
170 manager = replication.getReplicationManager();
171
172 manager.addSource(slaveId);
173
174 htd = new HTableDescriptor(test);
175 HColumnDescriptor col = new HColumnDescriptor(f1);
176 col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
177 htd.addFamily(col);
178 col = new HColumnDescriptor(f2);
179 col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
180 htd.addFamily(col);
181
182 hri = new HRegionInfo(htd.getTableName(), r1, r2);
183 }
184
185 @AfterClass
186 public static void tearDownAfterClass() throws Exception {
187 manager.join();
188 utility.shutdownMiniCluster();
189 }
190
191 @Rule
192 public TestName testName = new TestName();
193
194 private void cleanLogDir() throws IOException {
195 fs.delete(logDir, true);
196 fs.delete(oldLogDir, true);
197 }
198
199 @Before
200 public void setUp() throws Exception {
201 LOG.info("Start " + testName.getMethodName());
202 cleanLogDir();
203 }
204
205 @After
206 public void tearDown() throws Exception {
207 LOG.info("End " + testName.getMethodName());
208 cleanLogDir();
209 }
210
211 @Test
212 public void testLogRoll() throws Exception {
213 long seq = 0;
214 long baseline = 1000;
215 long time = baseline;
216 KeyValue kv = new KeyValue(r1, f1, r1);
217 WALEdit edit = new WALEdit();
218 edit.add(kv);
219
220 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
221 listeners.add(replication);
222 final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
223 URLEncoder.encode("regionserver:60020", "UTF8"));
224 final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes());
225 final AtomicLong sequenceId = new AtomicLong(1);
226 manager.init();
227 HTableDescriptor htd = new HTableDescriptor();
228 htd.addFamily(new HColumnDescriptor(f1));
229
230 for(long i = 1; i < 101; i++) {
231 if(i > 1 && i % 20 == 0) {
232 wal.rollWriter();
233 }
234 LOG.info(i);
235 final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
236 System.currentTimeMillis()), edit, sequenceId, true ,null);
237 wal.sync(txid);
238 }
239
240
241
242 LOG.info(baseline + " and " + time);
243 baseline += 101;
244 time = baseline;
245 LOG.info(baseline + " and " + time);
246
247 for (int i = 0; i < 3; i++) {
248 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
249 System.currentTimeMillis()), edit, sequenceId, true, null);
250 }
251 wal.sync();
252
253 assertEquals(6, manager.getWALs().get(slaveId).size());
254
255 wal.rollWriter();
256
257 manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
258 "1", 0, false, false);
259
260 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
261 System.currentTimeMillis()), edit, sequenceId, true, null);
262 wal.sync();
263
264 assertEquals(1, manager.getWALs().size());
265
266
267
268 }
269
270 @Test
271 public void testClaimQueues() throws Exception {
272 conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
273 final Server server = new DummyServer("hostname0.example.org");
274 ReplicationQueues rq =
275 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
276 server);
277 rq.init(server.getServerName().toString());
278
279 files.add("log1");
280 files.add("log2");
281 for (String file : files) {
282 rq.addLog("1", file);
283 }
284
285 Server s1 = new DummyServer("dummyserver1.example.org");
286 Server s2 = new DummyServer("dummyserver2.example.org");
287 Server s3 = new DummyServer("dummyserver3.example.org");
288
289
290 DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
291 server.getServerName().getServerName(), s1);
292 DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
293 server.getServerName().getServerName(), s2);
294 DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
295 server.getServerName().getServerName(), s3);
296
297 latch = new CountDownLatch(3);
298
299 w1.start();
300 w2.start();
301 w3.start();
302
303 int populatedMap = 0;
304
305 latch.await();
306 populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
307 + w3.isLogZnodesMapPopulated();
308 assertEquals(1, populatedMap);
309 server.abort("", null);
310 }
311
312 @Test
313 public void testCleanupFailoverQueues() throws Exception {
314 final Server server = new DummyServer("hostname1.example.org");
315 ReplicationQueues rq =
316 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
317 server);
318 rq.init(server.getServerName().toString());
319
320 SortedSet<String> files = new TreeSet<String>();
321 files.add("log1");
322 files.add("log2");
323 for (String file : files) {
324 rq.addLog("1", file);
325 }
326 Server s1 = new DummyServer("dummyserver1.example.org");
327 ReplicationQueues rq1 =
328 ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
329 rq1.init(s1.getServerName().toString());
330 ReplicationPeers rp1 =
331 ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
332 rp1.init();
333 NodeFailoverWorker w1 =
334 manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
335 new Long(1), new Long(2)));
336 w1.start();
337 w1.join(5000);
338 assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
339 String id = "1-" + server.getServerName().getServerName();
340 assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id));
341 manager.cleanOldLogs("log2", id, true);
342
343 assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id));
344 }
345
346 @Test
347 public void testNodeFailoverDeadServerParsing() throws Exception {
348 LOG.debug("testNodeFailoverDeadServerParsing");
349 conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
350 final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
351 ReplicationQueues repQueues =
352 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
353 repQueues.init(server.getServerName().toString());
354
355 files.add("log1");
356 files.add("log2");
357 for (String file : files) {
358 repQueues.addLog("1", file);
359 }
360
361
362 Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
363 Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
364 Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
365
366
367 ReplicationQueues rq1 =
368 ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
369 rq1.init(s1.getServerName().toString());
370 SortedMap<String, SortedSet<String>> testMap =
371 rq1.claimQueues(server.getServerName().getServerName());
372 ReplicationQueues rq2 =
373 ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
374 rq2.init(s2.getServerName().toString());
375 testMap = rq2.claimQueues(s1.getServerName().getServerName());
376 ReplicationQueues rq3 =
377 ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
378 rq3.init(s3.getServerName().toString());
379 testMap = rq3.claimQueues(s2.getServerName().getServerName());
380
381 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
382 List<String> result = replicationQueueInfo.getDeadRegionServers();
383
384
385 assertTrue(result.contains(server.getServerName().getServerName()));
386 assertTrue(result.contains(s1.getServerName().getServerName()));
387 assertTrue(result.contains(s2.getServerName().getServerName()));
388
389 server.abort("", null);
390 }
391
392 @Test
393 public void testFailoverDeadServerCversionChange() throws Exception {
394 LOG.debug("testFailoverDeadServerCversionChange");
395
396 conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
397 final Server s0 = new DummyServer("cversion-change0.example.org");
398 ReplicationQueues repQueues =
399 ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
400 repQueues.init(s0.getServerName().toString());
401
402 files.add("log1");
403 files.add("log2");
404 for (String file : files) {
405 repQueues.addLog("1", file);
406 }
407
408 Server s1 = new DummyServer("cversion-change1.example.org");
409 ReplicationQueues rq1 =
410 ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
411 rq1.init(s1.getServerName().toString());
412
413 ReplicationQueuesClient client =
414 ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
415
416 int v0 = client.getQueuesZNodeCversion();
417 rq1.claimQueues(s0.getServerName().getServerName());
418 int v1 = client.getQueuesZNodeCversion();
419
420 assertEquals(v0 + 1, v1);
421
422 s0.abort("", null);
423 }
424
425 @Test
426 public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
427
428 WALKey logKey = new WALKey();
429
430 WALEdit logEdit = getBulkLoadWALEdit();
431
432
433 Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager);
434
435
436 assertNull("No bulk load entries scope should be added if bulk load replication is diabled.",
437 logKey.getScopes());
438 }
439
440 @Test
441 public void testBulkLoadWALEdits() throws Exception {
442
443 WALKey logKey = new WALKey();
444
445 WALEdit logEdit = getBulkLoadWALEdit();
446
447 Configuration bulkLoadConf = HBaseConfiguration.create(conf);
448 bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
449
450
451 Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager);
452
453 NavigableMap<byte[], Integer> scopes = logKey.getScopes();
454
455 assertTrue("This family scope is set to global, should be part of replication key scopes.",
456 scopes.containsKey(f1));
457
458 assertFalse("This family scope is set to local, should not be part of replication key scopes",
459 scopes.containsKey(f2));
460 }
461
462 private WALEdit getBulkLoadWALEdit() {
463
464 Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
465 Map<String, Long> storeFilesSize = new HashMap<>(1);
466 List<Path> p = new ArrayList<>(1);
467 Path hfilePath1 = new Path(Bytes.toString(f1));
468 p.add(hfilePath1);
469 try {
470 storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
471 } catch (IOException e) {
472 LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
473 storeFilesSize.put(hfilePath1.getName(), 0L);
474 }
475 storeFiles.put(f1, p);
476
477 p = new ArrayList<>(1);
478 Path hfilePath2 = new Path(Bytes.toString(f2));
479 p.add(hfilePath2);
480 try {
481 storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
482 } catch (IOException e) {
483 LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
484 storeFilesSize.put(hfilePath2.getName(), 0L);
485 }
486 storeFiles.put(f2, p);
487
488
489 BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
490 ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
491
492
493 WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
494 return logEdit;
495 }
496
497 public void testCleanupUnknownPeerZNode() throws Exception {
498 final Server server = new DummyServer("hostname2.example.org");
499 ReplicationQueues rq =
500 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
501 server);
502 rq.init(server.getServerName().toString());
503
504
505 String group = "testgroup";
506 rq.addLog("2", group + ".log1");
507 rq.addLog("2", group + ".log2");
508
509 NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
510 w1.run();
511
512
513 for (String peer : manager.getAllQueues()) {
514 assertTrue(peer.startsWith("1"));
515 }
516 }
517
518 static class DummyNodeFailoverWorker extends Thread {
519 private SortedMap<String, SortedSet<String>> logZnodesMap;
520 Server server;
521 private String deadRsZnode;
522 ReplicationQueues rq;
523
524 public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
525 this.deadRsZnode = znode;
526 this.server = s;
527 this.rq =
528 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
529 server);
530 this.rq.init(this.server.getServerName().toString());
531 }
532
533 @Override
534 public void run() {
535 try {
536 logZnodesMap = rq.claimQueues(deadRsZnode);
537 server.abort("Done with testing", null);
538 } catch (Exception e) {
539 LOG.error("Got exception while running NodeFailoverWorker", e);
540 } finally {
541 latch.countDown();
542 }
543 }
544
545
546
547
548 private int isLogZnodesMapPopulated() {
549 Collection<SortedSet<String>> sets = logZnodesMap.values();
550 if (sets.size() > 1) {
551 throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
552 }
553 if (sets.size() == 1) {
554 SortedSet<String> s = sets.iterator().next();
555 for (String file : files) {
556
557 if (!s.contains(file)) {
558 return 0;
559 }
560 }
561 return 1;
562 }
563 return 0;
564 }
565 }
566
567 static class DummyServer implements Server {
568 String hostname;
569
570 DummyServer() {
571 hostname = "hostname.example.org";
572 }
573
574 DummyServer(String hostname) {
575 this.hostname = hostname;
576 }
577
578 @Override
579 public Configuration getConfiguration() {
580 return conf;
581 }
582
583 @Override
584 public ZooKeeperWatcher getZooKeeper() {
585 return zkw;
586 }
587
588 @Override
589 public CoordinatedStateManager getCoordinatedStateManager() {
590 return null;
591 }
592 @Override
593 public ClusterConnection getConnection() {
594 return null;
595 }
596
597 @Override
598 public MetaTableLocator getMetaTableLocator() {
599 return null;
600 }
601
602 @Override
603 public ServerName getServerName() {
604 return ServerName.valueOf(hostname, 1234, 1L);
605 }
606
607 @Override
608 public void abort(String why, Throwable e) {
609
610 }
611
612 @Override
613 public boolean isAborted() {
614 return false;
615 }
616
617 @Override
618 public void stop(String why) {
619
620 }
621
622 @Override
623 public boolean isStopped() {
624 return false;
625 }
626
627 @Override
628 public ChoreService getChoreService() {
629 return null;
630 }
631 }
632 }