1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
28 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
29 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
30 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
31 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
32 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
33 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
34 import static org.junit.Assert.assertEquals;
35 import static org.junit.Assert.assertFalse;
36 import static org.junit.Assert.assertTrue;
37
38 import java.io.IOException;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.UUID;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.FileSystem;
48 import org.apache.hadoop.fs.Path;
49 import org.apache.hadoop.hbase.ChoreService;
50 import org.apache.hadoop.hbase.CoordinatedStateManager;
51 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
52 import org.apache.hadoop.hbase.HBaseTestingUtility;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.testclassification.MediumTests;
56 import org.apache.hadoop.hbase.Server;
57 import org.apache.hadoop.hbase.ServerName;
58 import org.apache.hadoop.hbase.SplitLogCounters;
59 import org.apache.hadoop.hbase.SplitLogTask;
60 import org.apache.hadoop.hbase.Stoppable;
61 import org.apache.hadoop.hbase.Waiter;
62 import org.apache.hadoop.hbase.client.ClusterConnection;
63 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
64 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
65 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
66 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
67 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
68 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
69 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
72 import org.apache.log4j.Level;
73 import org.apache.log4j.Logger;
74 import org.apache.zookeeper.CreateMode;
75 import org.apache.zookeeper.KeeperException;
76 import org.apache.zookeeper.ZooDefs.Ids;
77 import org.junit.After;
78 import org.junit.Assert;
79 import org.junit.Before;
80 import org.junit.Test;
81 import org.junit.experimental.categories.Category;
82 import org.mockito.Mockito;
83
84 @Category(MediumTests.class)
85 public class TestSplitLogManager {
/** Logger shared by every test in this class. */
private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);

static {
  // Switch everything under org.apache.hadoop.hbase to DEBUG for these tests.
  Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
}

/** Testing utility; setup() assigns a fresh instance before each test. */
private static HBaseTestingUtility TEST_UTIL;
/** Backing flag of the shared stopper: set by its stop(), returned by its isStopped(). */
private static boolean stopped;

/** Server name passed to every SplitLogManager the tests construct. */
private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
/** Mocked master; setup() wires it to return the mocked server manager. */
private final MasterServices master = Mockito.mock(MasterServices.class);
/** Mocked server manager; setup() stubs isServerOnline(...) to return true. */
private final ServerManager sm = Mockito.mock(ServerManager.class);

// Per-test state, (re)assigned in setup() or by the individual tests.
private Configuration conf;
private ZooKeeperWatcher zkw;
private DummyServer ds;
/** Manager under test; teardown() stops it when non-null. */
private SplitLogManager slm;
/** Recovery mode derived in setup() from HConstants.DISTRIBUTED_LOG_REPLAY_KEY. */
private RecoveryMode mode;
/** Base wait budget in ms used by the tests; computed in setup(). */
private int to;
104
105 class DummyServer implements Server {
106 private ZooKeeperWatcher zkw;
107 private Configuration conf;
108 private CoordinatedStateManager cm;
109
110 public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
111 this.zkw = zkw;
112 this.conf = conf;
113 cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
114 cm.initialize(this);
115 }
116
117 @Override
118 public void abort(String why, Throwable e) {
119 }
120
121 @Override
122 public boolean isAborted() {
123 return false;
124 }
125
126 @Override
127 public void stop(String why) {
128 }
129
130 @Override
131 public boolean isStopped() {
132 return false;
133 }
134
135 @Override
136 public Configuration getConfiguration() {
137 return conf;
138 }
139
140 @Override
141 public ZooKeeperWatcher getZooKeeper() {
142 return zkw;
143 }
144
145 @Override
146 public ServerName getServerName() {
147 return null;
148 }
149
150 @Override
151 public CoordinatedStateManager getCoordinatedStateManager() {
152 return cm;
153 }
154
155 @Override
156 public ClusterConnection getConnection() {
157 return null;
158 }
159
160 @Override
161 public MetaTableLocator getMetaTableLocator() {
162 return null;
163 }
164
165 @Override
166 public ChoreService getChoreService() {
167 return null;
168 }
169 }
170
171 static Stoppable stopper = new Stoppable() {
172 @Override
173 public void stop(String why) {
174 stopped = true;
175 }
176
177 @Override
178 public boolean isStopped() {
179 return stopped;
180 }
181 };
182
183 @Before
184 public void setup() throws Exception {
185 TEST_UTIL = new HBaseTestingUtility();
186 TEST_UTIL.startMiniZKCluster();
187 conf = TEST_UTIL.getConfiguration();
188
189 zkw =
190 new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
191 ds = new DummyServer(zkw, conf);
192
193 ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
194 ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
195 assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
196 LOG.debug(zkw.baseZNode + " created");
197 ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
198 assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
199 LOG.debug(zkw.splitLogZNode + " created");
200
201 stopped = false;
202 resetCounters();
203
204
205
206 Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
207 Mockito.when(master.getServerManager()).thenReturn(sm);
208
209 to = 12000;
210 conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
211 conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
212
213 conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
214 to = to + 4 * 100;
215
216 this.mode =
217 (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY
218 : RecoveryMode.LOG_SPLITTING);
219 }
220
221 @After
222 public void teardown() throws IOException, KeeperException {
223 stopper.stop("");
224 if (slm != null) slm.stop();
225 TEST_UTIL.shutdownMiniZKCluster();
226 }
227
/**
 * A value that waitForCounter can sample repeatedly. Tests use it to watch either
 * a single counter or an expression over several counters.
 */
private interface Expr {
  /** @return the current value of the watched quantity */
  long eval();
}
231
232 private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
233 throws Exception {
234 Expr e = new Expr() {
235 @Override
236 public long eval() {
237 return ctr.get();
238 }
239 };
240 waitForCounter(e, oldval, newval, timems);
241 return;
242 }
243
244 private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
245 throws Exception {
246
247 TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
248 @Override
249 public boolean evaluate() throws Exception {
250 return (e.eval() != oldval);
251 }
252 });
253
254 assertEquals(newval, e.eval());
255 }
256
257 private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
258 InterruptedException {
259 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
260 NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
261 zkw.registerListener(listener);
262 ZKUtil.watchAndCheckExists(zkw, tasknode);
263
264 slm.enqueueSplitTask(name, batch);
265 assertEquals(1, batch.installed);
266 assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
267 assertEquals(1L, tot_mgr_node_create_queued.get());
268
269 LOG.debug("waiting for task node creation");
270 listener.waitForCreation();
271 LOG.debug("task created");
272 return tasknode;
273 }
274
275
276
277
278
279 @Test (timeout=180000)
280 public void testTaskCreation() throws Exception {
281
282 LOG.info("TestTaskCreation - test the creation of a task in zk");
283 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
284 TaskBatch batch = new TaskBatch();
285
286 String tasknode = submitTaskAndWait(batch, "foo/1");
287
288 byte[] data = ZKUtil.getData(zkw, tasknode);
289 SplitLogTask slt = SplitLogTask.parseFrom(data);
290 LOG.info("Task node created " + slt.toString());
291 assertTrue(slt.isUnassigned(DUMMY_MASTER));
292 }
293
294 @Test (timeout=180000)
295 public void testOrphanTaskAcquisition() throws Exception {
296 LOG.info("TestOrphanTaskAcquisition");
297
298 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
299 SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
300 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
301 CreateMode.PERSISTENT);
302
303 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
304 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
305 Task task = slm.findOrCreateOrphanTask(tasknode);
306 assertTrue(task.isOrphan());
307 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
308 assertFalse(task.isUnassigned());
309 long curt = System.currentTimeMillis();
310 assertTrue((task.last_update <= curt) &&
311 (task.last_update > (curt - 1000)));
312 LOG.info("waiting for manager to resubmit the orphan task");
313 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
314 assertTrue(task.isUnassigned());
315 waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
316 }
317
318 @Test (timeout=180000)
319 public void testUnassignedOrphan() throws Exception {
320 LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
321 " startup");
322 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
323
324 SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
325 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
326 CreateMode.PERSISTENT);
327 int version = ZKUtil.checkExists(zkw, tasknode);
328
329 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
330 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
331 Task task = slm.findOrCreateOrphanTask(tasknode);
332 assertTrue(task.isOrphan());
333 assertTrue(task.isUnassigned());
334
335 waitForCounter(tot_mgr_rescan, 0, 1, to/2);
336 Task task2 = slm.findOrCreateOrphanTask(tasknode);
337 assertTrue(task == task2);
338 LOG.debug("task = " + task);
339 assertEquals(1L, tot_mgr_resubmit.get());
340 assertEquals(1, task.incarnation);
341 assertEquals(0, task.unforcedResubmits.get());
342 assertTrue(task.isOrphan());
343 assertTrue(task.isUnassigned());
344 assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
345 }
346
347 @Test (timeout=180000)
348 public void testMultipleResubmits() throws Exception {
349 LOG.info("TestMultipleResbmits - no indefinite resubmissions");
350 conf.setInt("hbase.splitlog.max.resubmit", 2);
351 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
352 TaskBatch batch = new TaskBatch();
353
354 String tasknode = submitTaskAndWait(batch, "foo/1");
355 int version = ZKUtil.checkExists(zkw, tasknode);
356 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
357 final ServerName worker2 = ServerName.valueOf("worker2,1,1");
358 final ServerName worker3 = ServerName.valueOf("worker3,1,1");
359 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
360 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
361 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
362 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
363 int version1 = ZKUtil.checkExists(zkw, tasknode);
364 assertTrue(version1 > version);
365 slt = new SplitLogTask.Owned(worker2, this.mode);
366 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
367 waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
368 waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
369 int version2 = ZKUtil.checkExists(zkw, tasknode);
370 assertTrue(version2 > version1);
371 slt = new SplitLogTask.Owned(worker3, this.mode);
372 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
373 waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
374 waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
375 Thread.sleep(to + to/2);
376 assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get());
377 }
378
379 @Test (timeout=180000)
380 public void testRescanCleanup() throws Exception {
381 LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
382
383 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
384 TaskBatch batch = new TaskBatch();
385
386 String tasknode = submitTaskAndWait(batch, "foo/1");
387 int version = ZKUtil.checkExists(zkw, tasknode);
388 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
389 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
390 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
391 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
392 waitForCounter(new Expr() {
393 @Override
394 public long eval() {
395 return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
396 }
397 }, 0, 1, 5*60000);
398 Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
399 int version1 = ZKUtil.checkExists(zkw, tasknode);
400 assertTrue(version1 > version);
401 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
402 slt = SplitLogTask.parseFrom(taskstate);
403 assertTrue(slt.isUnassigned(DUMMY_MASTER));
404
405 waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
406 }
407
408 @Test (timeout=180000)
409 public void testTaskDone() throws Exception {
410 LOG.info("TestTaskDone - cleanup task node once in DONE state");
411
412 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
413 TaskBatch batch = new TaskBatch();
414 String tasknode = submitTaskAndWait(batch, "foo/1");
415 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
416 SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
417 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
418 synchronized (batch) {
419 while (batch.installed != batch.done) {
420 batch.wait();
421 }
422 }
423 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
424 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
425 }
426
427 @Test (timeout=180000)
428 public void testTaskErr() throws Exception {
429 LOG.info("TestTaskErr - cleanup task node once in ERR state");
430
431 conf.setInt("hbase.splitlog.max.resubmit", 0);
432 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
433 TaskBatch batch = new TaskBatch();
434
435 String tasknode = submitTaskAndWait(batch, "foo/1");
436 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
437 SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
438 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
439
440 synchronized (batch) {
441 while (batch.installed != batch.error) {
442 batch.wait();
443 }
444 }
445 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
446 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
447 conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
448 }
449
450 @Test (timeout=180000)
451 public void testTaskResigned() throws Exception {
452 LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
453 assertEquals(tot_mgr_resubmit.get(), 0);
454 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
455 assertEquals(tot_mgr_resubmit.get(), 0);
456 TaskBatch batch = new TaskBatch();
457 String tasknode = submitTaskAndWait(batch, "foo/1");
458 assertEquals(tot_mgr_resubmit.get(), 0);
459 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
460 assertEquals(tot_mgr_resubmit.get(), 0);
461 SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
462 assertEquals(tot_mgr_resubmit.get(), 0);
463 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
464 int version = ZKUtil.checkExists(zkw, tasknode);
465
466 if (tot_mgr_resubmit.get() == 0) {
467 waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
468 }
469 assertEquals(tot_mgr_resubmit.get(), 1);
470
471 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
472 slt = SplitLogTask.parseFrom(taskstate);
473 assertTrue(slt.isUnassigned(DUMMY_MASTER));
474 }
475
476 @Test (timeout=180000)
477 public void testUnassignedTimeout() throws Exception {
478 LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
479 " resubmit");
480
481
482 String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
483 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
484 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
485 zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
486 CreateMode.PERSISTENT);
487
488 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
489 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
490
491
492 TaskBatch batch = new TaskBatch();
493 submitTaskAndWait(batch, "foo/1");
494
495
496 for (int i = 0; i < (3 * to)/100; i++) {
497 Thread.sleep(100);
498 final ServerName worker2 = ServerName.valueOf("worker1,1,1");
499 slt = new SplitLogTask.Owned(worker2, this.mode);
500 ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
501 }
502
503
504
505 LOG.info("waiting for manager to resubmit the orphan task");
506 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
507
508
509 waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
510 }
511
512 @Test (timeout=180000)
513 public void testDeadWorker() throws Exception {
514 LOG.info("testDeadWorker");
515
516 conf.setLong("hbase.splitlog.max.resubmit", 0);
517 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
518 TaskBatch batch = new TaskBatch();
519
520 String tasknode = submitTaskAndWait(batch, "foo/1");
521 int version = ZKUtil.checkExists(zkw, tasknode);
522 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
523 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
524 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
525 if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
526 slm.handleDeadWorker(worker1);
527 if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
528 if (tot_mgr_resubmit_dead_server_task.get() == 0) {
529 waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
530 }
531
532 int version1 = ZKUtil.checkExists(zkw, tasknode);
533 assertTrue(version1 > version);
534 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
535 slt = SplitLogTask.parseFrom(taskstate);
536 assertTrue(slt.isUnassigned(DUMMY_MASTER));
537 return;
538 }
539
540 @Test (timeout=180000)
541 public void testWorkerCrash() throws Exception {
542 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
543 TaskBatch batch = new TaskBatch();
544
545 String tasknode = submitTaskAndWait(batch, "foo/1");
546 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
547
548 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
549 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
550 if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
551
552
553 Assert.assertEquals(0, tot_mgr_resubmit.get());
554
555
556 Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
557
558 Thread.sleep(1300);
559
560
561 Assert.assertEquals(1, tot_mgr_resubmit.get());
562 }
563
564 @Test (timeout=180000)
565 public void testEmptyLogDir() throws Exception {
566 LOG.info("testEmptyLogDir");
567 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
568 FileSystem fs = TEST_UTIL.getTestFileSystem();
569 Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
570 UUID.randomUUID().toString());
571 fs.mkdirs(emptyLogDirPath);
572 slm.splitLogDistributed(emptyLogDirPath);
573 assertFalse(fs.exists(emptyLogDirPath));
574 }
575
576 @Test (timeout = 60000)
577 public void testLogFilesAreArchived() throws Exception {
578 LOG.info("testLogFilesAreArchived");
579 final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
580 FileSystem fs = TEST_UTIL.getTestFileSystem();
581 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
582 conf.set(HConstants.HBASE_DIR, dir.toString());
583 Path logDirPath = new Path(dir, UUID.randomUUID().toString());
584 fs.mkdirs(logDirPath);
585
586 String logFile = ServerName.valueOf("foo", 1, 1).toString();
587 fs.create(new Path(logDirPath, logFile)).close();
588
589
590 new Thread() {
591 @Override
592 public void run() {
593 boolean done = false;
594 while (!done) {
595 for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
596 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
597 SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING);
598 boolean encounteredZKException = false;
599 try {
600 ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
601 } catch (KeeperException e) {
602 LOG.warn(e);
603 encounteredZKException = true;
604 }
605 if (!encounteredZKException) {
606 done = true;
607 }
608 }
609 }
610 };
611 }.start();
612
613 slm.splitLogDistributed(logDirPath);
614
615 assertFalse(fs.exists(logDirPath));
616 }
617
618
619
620
621
622
623 @Test(timeout = 300000)
624 public void testRecoveryRegionRemovedFromZK() throws Exception {
625 LOG.info("testRecoveryRegionRemovedFromZK");
626 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
627 String nodePath =
628 ZKUtil.joinZNode(zkw.recoveringRegionsZNode,
629 HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
630 ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
631
632 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
633 slm.removeStaleRecoveringRegions(null);
634
635 List<String> recoveringRegions =
636 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
637
638 assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
639 }
640
641 @Test(timeout=60000)
642 public void testGetPreviousRecoveryMode() throws Exception {
643 LOG.info("testGetPreviousRecoveryMode");
644 SplitLogCounters.resetCounters();
645
646
647 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
648
649 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
650 new SplitLogTask.Unassigned(
651 ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
652 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
653
654 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
655 LOG.info("Mode1=" + slm.getRecoveryMode());
656 assertTrue(slm.isLogSplitting());
657 zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
658 LOG.info("Mode2=" + slm.getRecoveryMode());
659 slm.setRecoveryMode(false);
660 LOG.info("Mode3=" + slm.getRecoveryMode());
661 assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying());
662 }
663 }