View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
22  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
23  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
24  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
25  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
26  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
27  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
28  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
29  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
30  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
31  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
32  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
33  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
34  import static org.junit.Assert.assertEquals;
35  import static org.junit.Assert.assertFalse;
36  import static org.junit.Assert.assertTrue;
37  
38  import java.io.IOException;
39  import java.util.List;
40  import java.util.Map;
41  import java.util.UUID;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.ChoreService;
50  import org.apache.hadoop.hbase.CoordinatedStateManager;
51  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
52  import org.apache.hadoop.hbase.HBaseTestingUtility;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.testclassification.MediumTests;
56  import org.apache.hadoop.hbase.Server;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.SplitLogCounters;
59  import org.apache.hadoop.hbase.SplitLogTask;
60  import org.apache.hadoop.hbase.Stoppable;
61  import org.apache.hadoop.hbase.Waiter;
62  import org.apache.hadoop.hbase.client.ClusterConnection;
63  import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
64  import org.apache.hadoop.hbase.master.SplitLogManager.Task;
65  import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
66  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
67  import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
68  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
69  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
72  import org.apache.log4j.Level;
73  import org.apache.log4j.Logger;
74  import org.apache.zookeeper.CreateMode;
75  import org.apache.zookeeper.KeeperException;
76  import org.apache.zookeeper.ZooDefs.Ids;
77  import org.junit.After;
78  import org.junit.Assert;
79  import org.junit.Before;
80  import org.junit.Test;
81  import org.junit.experimental.categories.Category;
82  import org.mockito.Mockito;
83  
84  @Category(MediumTests.class)
85  public class TestSplitLogManager {
86    private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
87    private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
88    private final ServerManager sm = Mockito.mock(ServerManager.class);
89    private final MasterServices master = Mockito.mock(MasterServices.class);
90  
91    static {
92      Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
93    }
94  
95    private ZooKeeperWatcher zkw;
96    private DummyServer ds;
97    private static boolean stopped = false;
98    private SplitLogManager slm;
99    private Configuration conf;
100   private int to;
101   private RecoveryMode mode;
102 
103   private static HBaseTestingUtility TEST_UTIL;
104 
105   class DummyServer implements Server {
106     private ZooKeeperWatcher zkw;
107     private Configuration conf;
108     private CoordinatedStateManager cm;
109 
110     public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
111       this.zkw = zkw;
112       this.conf = conf;
113       cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
114       cm.initialize(this);
115     }
116 
117     @Override
118     public void abort(String why, Throwable e) {
119     }
120 
121     @Override
122     public boolean isAborted() {
123       return false;
124     }
125 
126     @Override
127     public void stop(String why) {
128     }
129 
130     @Override
131     public boolean isStopped() {
132       return false;
133     }
134 
135     @Override
136     public Configuration getConfiguration() {
137       return conf;
138     }
139 
140     @Override
141     public ZooKeeperWatcher getZooKeeper() {
142       return zkw;
143     }
144 
145     @Override
146     public ServerName getServerName() {
147       return null;
148     }
149 
150     @Override
151     public CoordinatedStateManager getCoordinatedStateManager() {
152       return cm;
153     }
154 
155     @Override
156     public ClusterConnection getConnection() {
157       return null;
158     }
159 
160     @Override
161     public MetaTableLocator getMetaTableLocator() {
162       return null;
163     }
164 
165     @Override
166     public ChoreService getChoreService() {
167       return null;
168     }
169   }
170 
171   static Stoppable stopper = new Stoppable() {
172     @Override
173     public void stop(String why) {
174       stopped = true;
175     }
176 
177     @Override
178     public boolean isStopped() {
179       return stopped;
180     }
181   };
182 
183   @Before
184   public void setup() throws Exception {
185     TEST_UTIL = new HBaseTestingUtility();
186     TEST_UTIL.startMiniZKCluster();
187     conf = TEST_UTIL.getConfiguration();
188     // Use a different ZK wrapper instance for each tests.
189     zkw =
190         new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
191     ds = new DummyServer(zkw, conf);
192 
193     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
194     ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
195     assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
196     LOG.debug(zkw.baseZNode + " created");
197     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
198     assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
199     LOG.debug(zkw.splitLogZNode + " created");
200 
201     stopped = false;
202     resetCounters();
203 
204     // By default, we let the test manage the error as before, so the server
205     // does not appear as dead from the master point of view, only from the split log pov.
206     Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
207     Mockito.when(master.getServerManager()).thenReturn(sm);
208 
209     to = 12000;
210     conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
211     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
212 
213     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
214     to = to + 4 * 100;
215 
216     this.mode =
217         (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY
218             : RecoveryMode.LOG_SPLITTING);
219   }
220 
221   @After
222   public void teardown() throws IOException, KeeperException {
223     stopper.stop("");
224     if (slm != null) slm.stop();
225     TEST_UTIL.shutdownMiniZKCluster();
226   }
227 
228   private interface Expr {
229     long eval();
230   }
231 
232   private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
233       throws Exception {
234     Expr e = new Expr() {
235       @Override
236       public long eval() {
237         return ctr.get();
238       }
239     };
240     waitForCounter(e, oldval, newval, timems);
241     return;
242   }
243 
244   private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
245       throws Exception {
246 
247     TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
248       @Override
249       public boolean evaluate() throws Exception {
250         return (e.eval() != oldval);
251       }
252     });
253 
254     assertEquals(newval, e.eval());
255   }
256 
257   private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
258       InterruptedException {
259     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
260     NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
261     zkw.registerListener(listener);
262     ZKUtil.watchAndCheckExists(zkw, tasknode);
263 
264     slm.enqueueSplitTask(name, batch);
265     assertEquals(1, batch.installed);
266     assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
267     assertEquals(1L, tot_mgr_node_create_queued.get());
268 
269     LOG.debug("waiting for task node creation");
270     listener.waitForCreation();
271     LOG.debug("task created");
272     return tasknode;
273   }
274 
275   /**
276    * Test whether the splitlog correctly creates a task in zookeeper
277    * @throws Exception
278    */
279   @Test (timeout=180000)
280   public void testTaskCreation() throws Exception {
281 
282     LOG.info("TestTaskCreation - test the creation of a task in zk");
283     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
284     TaskBatch batch = new TaskBatch();
285 
286     String tasknode = submitTaskAndWait(batch, "foo/1");
287 
288     byte[] data = ZKUtil.getData(zkw, tasknode);
289     SplitLogTask slt = SplitLogTask.parseFrom(data);
290     LOG.info("Task node created " + slt.toString());
291     assertTrue(slt.isUnassigned(DUMMY_MASTER));
292   }
293 
294   @Test (timeout=180000)
295   public void testOrphanTaskAcquisition() throws Exception {
296     LOG.info("TestOrphanTaskAcquisition");
297 
298     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
299     SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
300     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
301         CreateMode.PERSISTENT);
302 
303     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
304     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
305     Task task = slm.findOrCreateOrphanTask(tasknode);
306     assertTrue(task.isOrphan());
307     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
308     assertFalse(task.isUnassigned());
309     long curt = System.currentTimeMillis();
310     assertTrue((task.last_update <= curt) &&
311         (task.last_update > (curt - 1000)));
312     LOG.info("waiting for manager to resubmit the orphan task");
313     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
314     assertTrue(task.isUnassigned());
315     waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
316   }
317 
318   @Test (timeout=180000)
319   public void testUnassignedOrphan() throws Exception {
320     LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
321         " startup");
322     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
323     //create an unassigned orphan task
324     SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
325     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
326         CreateMode.PERSISTENT);
327     int version = ZKUtil.checkExists(zkw, tasknode);
328 
329     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
330     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
331     Task task = slm.findOrCreateOrphanTask(tasknode);
332     assertTrue(task.isOrphan());
333     assertTrue(task.isUnassigned());
334     // wait for RESCAN node to be created
335     waitForCounter(tot_mgr_rescan, 0, 1, to/2);
336     Task task2 = slm.findOrCreateOrphanTask(tasknode);
337     assertTrue(task == task2);
338     LOG.debug("task = " + task);
339     assertEquals(1L, tot_mgr_resubmit.get());
340     assertEquals(1, task.incarnation);
341     assertEquals(0, task.unforcedResubmits.get());
342     assertTrue(task.isOrphan());
343     assertTrue(task.isUnassigned());
344     assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
345   }
346 
347   @Test (timeout=180000)
348   public void testMultipleResubmits() throws Exception {
349     LOG.info("TestMultipleResbmits - no indefinite resubmissions");
350     conf.setInt("hbase.splitlog.max.resubmit", 2);
351     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
352     TaskBatch batch = new TaskBatch();
353 
354     String tasknode = submitTaskAndWait(batch, "foo/1");
355     int version = ZKUtil.checkExists(zkw, tasknode);
356     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
357     final ServerName worker2 = ServerName.valueOf("worker2,1,1");
358     final ServerName worker3 = ServerName.valueOf("worker3,1,1");
359     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
360     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
361     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
362     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
363     int version1 = ZKUtil.checkExists(zkw, tasknode);
364     assertTrue(version1 > version);
365     slt = new SplitLogTask.Owned(worker2, this.mode);
366     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
367     waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
368     waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
369     int version2 = ZKUtil.checkExists(zkw, tasknode);
370     assertTrue(version2 > version1);
371     slt = new SplitLogTask.Owned(worker3, this.mode);
372     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
373     waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
374     waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
375     Thread.sleep(to + to/2);
376     assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get());
377   }
378 
379   @Test (timeout=180000)
380   public void testRescanCleanup() throws Exception {
381     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
382 
383     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
384     TaskBatch batch = new TaskBatch();
385 
386     String tasknode = submitTaskAndWait(batch, "foo/1");
387     int version = ZKUtil.checkExists(zkw, tasknode);
388     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
389     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
390     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
391     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
392     waitForCounter(new Expr() {
393       @Override
394       public long eval() {
395         return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
396       }
397     }, 0, 1, 5*60000); // wait long enough
398     Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
399     int version1 = ZKUtil.checkExists(zkw, tasknode);
400     assertTrue(version1 > version);
401     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
402     slt = SplitLogTask.parseFrom(taskstate);
403     assertTrue(slt.isUnassigned(DUMMY_MASTER));
404 
405     waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
406   }
407 
408   @Test (timeout=180000)
409   public void testTaskDone() throws Exception {
410     LOG.info("TestTaskDone - cleanup task node once in DONE state");
411 
412     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
413     TaskBatch batch = new TaskBatch();
414     String tasknode = submitTaskAndWait(batch, "foo/1");
415     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
416     SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
417     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
418     synchronized (batch) {
419       while (batch.installed != batch.done) {
420         batch.wait();
421       }
422     }
423     waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
424     assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
425   }
426 
427   @Test (timeout=180000)
428   public void testTaskErr() throws Exception {
429     LOG.info("TestTaskErr - cleanup task node once in ERR state");
430 
431     conf.setInt("hbase.splitlog.max.resubmit", 0);
432     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
433     TaskBatch batch = new TaskBatch();
434 
435     String tasknode = submitTaskAndWait(batch, "foo/1");
436     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
437     SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
438     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
439 
440     synchronized (batch) {
441       while (batch.installed != batch.error) {
442         batch.wait();
443       }
444     }
445     waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
446     assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
447     conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
448   }
449 
450   @Test (timeout=180000)
451   public void testTaskResigned() throws Exception {
452     LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
453     assertEquals(tot_mgr_resubmit.get(), 0);
454     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
455     assertEquals(tot_mgr_resubmit.get(), 0);
456     TaskBatch batch = new TaskBatch();
457     String tasknode = submitTaskAndWait(batch, "foo/1");
458     assertEquals(tot_mgr_resubmit.get(), 0);
459     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
460     assertEquals(tot_mgr_resubmit.get(), 0);
461     SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
462     assertEquals(tot_mgr_resubmit.get(), 0);
463     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
464     int version = ZKUtil.checkExists(zkw, tasknode);
465     // Could be small race here.
466     if (tot_mgr_resubmit.get() == 0) {
467       waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
468     }
469     assertEquals(tot_mgr_resubmit.get(), 1);
470 
471     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
472     slt = SplitLogTask.parseFrom(taskstate);
473     assertTrue(slt.isUnassigned(DUMMY_MASTER));
474   }
475 
476   @Test (timeout=180000)
477   public void testUnassignedTimeout() throws Exception {
478     LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
479         " resubmit");
480 
481     // create an orphan task in OWNED state
482     String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
483     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
484     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
485     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
486         CreateMode.PERSISTENT);
487 
488     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
489     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
490 
491     // submit another task which will stay in unassigned mode
492     TaskBatch batch = new TaskBatch();
493     submitTaskAndWait(batch, "foo/1");
494 
495     // keep updating the orphan owned node every to/2 seconds
496     for (int i = 0; i < (3 * to)/100; i++) {
497       Thread.sleep(100);
498       final ServerName worker2 = ServerName.valueOf("worker1,1,1");
499       slt = new SplitLogTask.Owned(worker2, this.mode);
500       ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
501     }
502 
503     // since we have stopped heartbeating the owned node therefore it should
504     // get resubmitted
505     LOG.info("waiting for manager to resubmit the orphan task");
506     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
507 
508     // now all the nodes are unassigned. manager should post another rescan
509     waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
510   }
511 
512   @Test (timeout=180000)
513   public void testDeadWorker() throws Exception {
514     LOG.info("testDeadWorker");
515 
516     conf.setLong("hbase.splitlog.max.resubmit", 0);
517     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
518     TaskBatch batch = new TaskBatch();
519 
520     String tasknode = submitTaskAndWait(batch, "foo/1");
521     int version = ZKUtil.checkExists(zkw, tasknode);
522     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
523     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
524     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
525     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
526     slm.handleDeadWorker(worker1);
527     if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
528     if (tot_mgr_resubmit_dead_server_task.get() == 0) {
529       waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
530     }
531 
532     int version1 = ZKUtil.checkExists(zkw, tasknode);
533     assertTrue(version1 > version);
534     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
535     slt = SplitLogTask.parseFrom(taskstate);
536     assertTrue(slt.isUnassigned(DUMMY_MASTER));
537     return;
538   }
539 
540   @Test (timeout=180000)
541   public void testWorkerCrash() throws Exception {
542     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
543     TaskBatch batch = new TaskBatch();
544 
545     String tasknode = submitTaskAndWait(batch, "foo/1");
546     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
547 
548     SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
549     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
550     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
551 
552     // Not yet resubmitted.
553     Assert.assertEquals(0, tot_mgr_resubmit.get());
554 
555     // This server becomes dead
556     Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
557 
558     Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
559 
560     // It has been resubmitted
561     Assert.assertEquals(1, tot_mgr_resubmit.get());
562   }
563 
564   @Test (timeout=180000)
565   public void testEmptyLogDir() throws Exception {
566     LOG.info("testEmptyLogDir");
567     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
568     FileSystem fs = TEST_UTIL.getTestFileSystem();
569     Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
570         UUID.randomUUID().toString());
571     fs.mkdirs(emptyLogDirPath);
572     slm.splitLogDistributed(emptyLogDirPath);
573     assertFalse(fs.exists(emptyLogDirPath));
574   }
575 
576   @Test (timeout = 60000)
577   public void testLogFilesAreArchived() throws Exception {
578     LOG.info("testLogFilesAreArchived");
579     final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
580     FileSystem fs = TEST_UTIL.getTestFileSystem();
581     Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
582     conf.set(HConstants.HBASE_DIR, dir.toString());
583     Path logDirPath = new Path(dir, UUID.randomUUID().toString());
584     fs.mkdirs(logDirPath);
585     // create an empty log file
586     String logFile = ServerName.valueOf("foo", 1, 1).toString();
587     fs.create(new Path(logDirPath, logFile)).close();
588 
589     // spin up a thread mocking split done.
590     new Thread() {
591       @Override
592       public void run() {
593         boolean done = false;
594         while (!done) {
595           for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
596             final ServerName worker1 = ServerName.valueOf("worker1,1,1");
597             SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING);
598             boolean encounteredZKException = false;
599             try {
600               ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
601             } catch (KeeperException e) {
602               LOG.warn(e);
603               encounteredZKException = true;
604             }
605             if (!encounteredZKException) {
606               done = true;
607             }
608           }
609         }
610       };
611     }.start();
612 
613     slm.splitLogDistributed(logDirPath);
614 
615     assertFalse(fs.exists(logDirPath));
616   }
617 
618   /**
619    * The following test case is aiming to test the situation when distributedLogReplay is turned off
620    * and restart a cluster there should no recovery regions in ZK left.
621    * @throws Exception
622    */
623   @Test(timeout = 300000)
624   public void testRecoveryRegionRemovedFromZK() throws Exception {
625     LOG.info("testRecoveryRegionRemovedFromZK");
626     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
627     String nodePath =
628         ZKUtil.joinZNode(zkw.recoveringRegionsZNode,
629           HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
630     ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
631 
632     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
633     slm.removeStaleRecoveringRegions(null);
634 
635     List<String> recoveringRegions =
636         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
637 
638     assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
639   }
640 
641   @Test(timeout=60000)
642   public void testGetPreviousRecoveryMode() throws Exception {
643     LOG.info("testGetPreviousRecoveryMode");
644     SplitLogCounters.resetCounters();
645     // Not actually enabling DLR for the cluster, just for the ZkCoordinatedStateManager to use.
646     // The test is just manipulating ZK manually anyways.
647     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
648 
649     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
650       new SplitLogTask.Unassigned(
651         ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
652         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
653 
654     slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
655     LOG.info("Mode1=" + slm.getRecoveryMode());
656     assertTrue(slm.isLogSplitting());
657     zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
658     LOG.info("Mode2=" + slm.getRecoveryMode());
659     slm.setRecoveryMode(false);
660     LOG.info("Mode3=" + slm.getRecoveryMode());
661     assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying());
662   }
663 }