View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.FileStatus;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
32  import org.apache.hadoop.hbase.ProcedureInfo;
33  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
34  import org.apache.hadoop.hbase.testclassification.SmallTests;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
37  import org.apache.hadoop.hbase.util.Threads;
38  import org.junit.After;
39  import org.junit.Before;
40  import org.junit.Test;
41  import org.junit.experimental.categories.Category;
42  
43  import static org.junit.Assert.assertEquals;
44  import static org.junit.Assert.assertFalse;
45  import static org.junit.Assert.assertTrue;
46  
47  @Category(SmallTests.class)
48  public class TestProcedureRecovery {
49    private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class);
50  
51    private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
52  
53    private static ProcedureExecutor<Void> procExecutor;
54    private static ProcedureStore procStore;
55    private static int procSleepInterval;
56  
57    private HBaseCommonTestingUtility htu;
58    private FileSystem fs;
59    private Path testDir;
60    private Path logDir;
61  
62    @Before
63    public void setUp() throws IOException {
64      htu = new HBaseCommonTestingUtility();
65      testDir = htu.getDataTestDir();
66      fs = testDir.getFileSystem(htu.getConfiguration());
67      assertTrue(testDir.depth() > 1);
68  
69      logDir = new Path(testDir, "proc-logs");
70      procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
71      procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
72      procExecutor.testing = new ProcedureExecutor.Testing();
73      procStore.start(PROCEDURE_EXECUTOR_SLOTS);
74      procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
75      procSleepInterval = 0;
76  
77      ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
78      ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
79    }
80  
81    @After
82    public void tearDown() throws IOException {
83      procExecutor.stop();
84      procStore.stop(false);
85      fs.delete(logDir, true);
86    }
87  
88    private void restart() throws Exception {
89      dumpLogDirState();
90      ProcedureTestingUtility.restart(procExecutor);
91      dumpLogDirState();
92    }
93  
94    public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
95      private int step = 0;
96  
97      public TestSingleStepProcedure() { }
98  
99      @Override
100     protected Procedure[] execute(Void env) {
101       LOG.debug("execute procedure " + this + " step=" + step);
102       step++;
103       setResult(Bytes.toBytes(step));
104       return null;
105     }
106 
107     @Override
108     protected void rollback(Void env) { }
109 
110     @Override
111     protected boolean abort(Void env) { return true; }
112   }
113 
114   public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
115     private AtomicBoolean abort = new AtomicBoolean(false);
116     private int step = 0;
117 
118     @Override
119     protected Procedure[] execute(Void env) {
120       LOG.debug("execute procedure " + this + " step=" + step);
121       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
122       step++;
123       Threads.sleepWithoutInterrupt(procSleepInterval);
124       if (isAborted()) {
125         setFailure(new RemoteProcedureException(getClass().getName(),
126           new ProcedureAbortedException(
127             "got an abort at " + getClass().getName() + " step=" + step)));
128         return null;
129       }
130       return null;
131     }
132 
133     @Override
134     protected void rollback(Void env) {
135       LOG.debug("rollback procedure " + this + " step=" + step);
136       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
137       step++;
138     }
139 
140     @Override
141     protected boolean abort(Void env) {
142       abort.set(true);
143       return true;
144     }
145 
146     private boolean isAborted() {
147       boolean aborted = abort.get();
148       BaseTestStepProcedure proc = this;
149       while (proc.hasParent() && !aborted) {
150         proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
151         aborted = proc.isAborted();
152       }
153       return aborted;
154     }
155   }
156 
157   public static class TestMultiStepProcedure extends BaseTestStepProcedure {
158     public TestMultiStepProcedure() { }
159 
160     @Override
161     public Procedure[] execute(Void env) {
162       super.execute(env);
163       return isFailed() ? null : new Procedure[] { new Step1Procedure() };
164     }
165 
166     public static class Step1Procedure extends BaseTestStepProcedure {
167       public Step1Procedure() { }
168 
169       @Override
170       protected Procedure[] execute(Void env) {
171         super.execute(env);
172         return isFailed() ? null : new Procedure[] { new Step2Procedure() };
173       }
174     }
175 
176     public static class Step2Procedure extends BaseTestStepProcedure {
177       public Step2Procedure() { }
178     }
179   }
180 
181   @Test
182   public void testNoopLoad() throws Exception {
183     restart();
184   }
185 
186   @Test(timeout=30000)
187   public void testSingleStepProcRecovery() throws Exception {
188     Procedure proc = new TestSingleStepProcedure();
189     procExecutor.testing.killBeforeStoreUpdate = true;
190     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
191     assertFalse(procExecutor.isRunning());
192     procExecutor.testing.killBeforeStoreUpdate = false;
193 
194     // Restart and verify that the procedures restart
195     long restartTs = EnvironmentEdgeManager.currentTime();
196     restart();
197     waitProcedure(procId);
198     ProcedureInfo result = procExecutor.getResult(procId);
199     assertTrue(result.getLastUpdate() > restartTs);
200     ProcedureTestingUtility.assertProcNotFailed(result);
201     assertEquals(1, Bytes.toInt(result.getResult()));
202     long resultTs = result.getLastUpdate();
203 
204     // Verify that after another restart the result is still there
205     restart();
206     result = procExecutor.getResult(procId);
207     ProcedureTestingUtility.assertProcNotFailed(result);
208     assertEquals(resultTs, result.getLastUpdate());
209     assertEquals(1, Bytes.toInt(result.getResult()));
210   }
211 
212   @Test(timeout=30000)
213   public void testMultiStepProcRecovery() throws Exception {
214     // Step 0 - kill
215     Procedure proc = new TestMultiStepProcedure();
216     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
217     assertFalse(procExecutor.isRunning());
218 
219     // Step 0 exec && Step 1 - kill
220     restart();
221     waitProcedure(procId);
222     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
223     assertFalse(procExecutor.isRunning());
224 
225     // Step 1 exec && step 2 - kill
226     restart();
227     waitProcedure(procId);
228     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
229     assertFalse(procExecutor.isRunning());
230 
231     // Step 2 exec
232     restart();
233     waitProcedure(procId);
234     assertTrue(procExecutor.isRunning());
235 
236     // The procedure is completed
237     ProcedureInfo result = procExecutor.getResult(procId);
238     ProcedureTestingUtility.assertProcNotFailed(result);
239   }
240 
241   @Test(timeout=30000)
242   public void testMultiStepRollbackRecovery() throws Exception {
243     // Step 0 - kill
244     Procedure proc = new TestMultiStepProcedure();
245     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
246     assertFalse(procExecutor.isRunning());
247 
248     // Step 0 exec && Step 1 - kill
249     restart();
250     waitProcedure(procId);
251     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
252     assertFalse(procExecutor.isRunning());
253 
254     // Step 1 exec && step 2 - kill
255     restart();
256     waitProcedure(procId);
257     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
258     assertFalse(procExecutor.isRunning());
259 
260     // Step 2 exec - rollback - kill
261     procSleepInterval = 2500;
262     restart();
263     assertTrue(procExecutor.abort(procId));
264     waitProcedure(procId);
265     assertFalse(procExecutor.isRunning());
266 
267     // rollback - kill
268     restart();
269     waitProcedure(procId);
270     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
271     assertFalse(procExecutor.isRunning());
272 
273     // rollback - complete
274     restart();
275     waitProcedure(procId);
276     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
277     assertFalse(procExecutor.isRunning());
278 
279     // Restart the executor and get the result
280     restart();
281     waitProcedure(procId);
282 
283     // The procedure is completed
284     ProcedureInfo result = procExecutor.getResult(procId);
285     ProcedureTestingUtility.assertIsAbortException(result);
286   }
287 
288   public static class TestStateMachineProcedure
289       extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
290     enum State { STATE_1, STATE_2, STATE_3, DONE }
291 
292     public TestStateMachineProcedure() {}
293 
294     private AtomicBoolean aborted = new AtomicBoolean(false);
295     private int iResult = 0;
296 
297     @Override
298     protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
299       switch (state) {
300         case STATE_1:
301           LOG.info("execute step 1 " + this);
302           setNextState(State.STATE_2);
303           iResult += 3;
304           break;
305         case STATE_2:
306           LOG.info("execute step 2 " + this);
307           setNextState(State.STATE_3);
308           iResult += 5;
309           break;
310         case STATE_3:
311           LOG.info("execute step 3 " + this);
312           Threads.sleepWithoutInterrupt(procSleepInterval);
313           if (aborted.get()) {
314             LOG.info("aborted step 3 " + this);
315             setAbortFailure("test", "aborted");
316             break;
317           }
318           setNextState(State.DONE);
319           iResult += 7;
320           setResult(Bytes.toBytes(iResult));
321           return Flow.NO_MORE_STATE;
322         default:
323           throw new UnsupportedOperationException();
324       }
325       return Flow.HAS_MORE_STATE;
326     }
327 
328     @Override
329     protected void rollbackState(Void env, final State state) {
330       switch (state) {
331         case STATE_1:
332           LOG.info("rollback step 1 " + this);
333           break;
334         case STATE_2:
335           LOG.info("rollback step 2 " + this);
336           break;
337         case STATE_3:
338           LOG.info("rollback step 3 " + this);
339           break;
340         default:
341           throw new UnsupportedOperationException();
342       }
343     }
344 
345     @Override
346     protected State getState(final int stateId) {
347       return State.values()[stateId];
348     }
349 
350     @Override
351     protected int getStateId(final State state) {
352       return state.ordinal();
353     }
354 
355     @Override
356     protected State getInitialState() {
357       return State.STATE_1;
358     }
359 
360     @Override
361     protected boolean abort(Void env) {
362       aborted.set(true);
363       return true;
364     }
365 
366     @Override
367     protected void serializeStateData(final OutputStream stream) throws IOException {
368       super.serializeStateData(stream);
369       stream.write(Bytes.toBytes(iResult));
370     }
371 
372     @Override
373     protected void deserializeStateData(final InputStream stream) throws IOException {
374       super.deserializeStateData(stream);
375       byte[] data = new byte[4];
376       stream.read(data);
377       iResult = Bytes.toInt(data);
378     }
379   }
380 
381   @Test(timeout=30000)
382   public void testStateMachineRecovery() throws Exception {
383     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
384     ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
385 
386     // Step 1 - kill
387     Procedure proc = new TestStateMachineProcedure();
388     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
389     assertFalse(procExecutor.isRunning());
390 
391     // Step 1 exec && Step 2 - kill
392     restart();
393     waitProcedure(procId);
394     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
395     assertFalse(procExecutor.isRunning());
396 
397     // Step 2 exec && step 3 - kill
398     restart();
399     waitProcedure(procId);
400     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
401     assertFalse(procExecutor.isRunning());
402 
403     // Step 3 exec
404     restart();
405     waitProcedure(procId);
406     assertTrue(procExecutor.isRunning());
407 
408     // The procedure is completed
409     ProcedureInfo result = procExecutor.getResult(procId);
410     ProcedureTestingUtility.assertProcNotFailed(result);
411     assertEquals(15, Bytes.toInt(result.getResult()));
412   }
413 
414   @Test(timeout=30000)
415   public void testStateMachineRollbackRecovery() throws Exception {
416     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
417     ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
418 
419     // Step 1 - kill
420     Procedure proc = new TestStateMachineProcedure();
421     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
422     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
423     assertFalse(procExecutor.isRunning());
424 
425     // Step 1 exec && Step 2 - kill
426     restart();
427     waitProcedure(procId);
428     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
429     assertFalse(procExecutor.isRunning());
430 
431     // Step 2 exec && step 3 - kill
432     restart();
433     waitProcedure(procId);
434     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
435     assertFalse(procExecutor.isRunning());
436 
437     // Step 3 exec - rollback step 3 - kill
438     procSleepInterval = 2500;
439     restart();
440     assertTrue(procExecutor.abort(procId));
441     waitProcedure(procId);
442     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
443     assertFalse(procExecutor.isRunning());
444 
445     // Rollback step 3 - rollback step 2 - kill
446     restart();
447     waitProcedure(procId);
448     assertFalse(procExecutor.isRunning());
449     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
450 
451     // Rollback step 2 - step 1 - kill
452     restart();
453     waitProcedure(procId);
454     assertFalse(procExecutor.isRunning());
455     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
456 
457     // Rollback step 1 - complete
458     restart();
459     waitProcedure(procId);
460     assertTrue(procExecutor.isRunning());
461 
462     // The procedure is completed
463     ProcedureInfo result = procExecutor.getResult(procId);
464     ProcedureTestingUtility.assertIsAbortException(result);
465   }
466 
467   private void waitProcedure(final long procId) {
468     ProcedureTestingUtility.waitProcedure(procExecutor, procId);
469     dumpLogDirState();
470   }
471 
472   private void dumpLogDirState() {
473     try {
474       FileStatus[] files = fs.listStatus(logDir);
475       if (files != null && files.length > 0) {
476         for (FileStatus file: files) {
477           assertTrue(file.toString(), file.isFile());
478           LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
479         }
480       } else {
481         LOG.debug("no files under: " + logDir);
482       }
483     } catch (IOException e) {
484       LOG.warn("Unable to dump " + logDir, e);
485     }
486   }
487 }