1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
46
47 @Category(SmallTests.class)
48 public class TestProcedureRecovery {
49 private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class);
50
51 private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
52
53 private static ProcedureExecutor<Void> procExecutor;
54 private static ProcedureStore procStore;
55 private static int procSleepInterval;
56
57 private HBaseCommonTestingUtility htu;
58 private FileSystem fs;
59 private Path testDir;
60 private Path logDir;
61
62 @Before
63 public void setUp() throws IOException {
64 htu = new HBaseCommonTestingUtility();
65 testDir = htu.getDataTestDir();
66 fs = testDir.getFileSystem(htu.getConfiguration());
67 assertTrue(testDir.depth() > 1);
68
69 logDir = new Path(testDir, "proc-logs");
70 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
71 procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
72 procExecutor.testing = new ProcedureExecutor.Testing();
73 procStore.start(PROCEDURE_EXECUTOR_SLOTS);
74 procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
75 procSleepInterval = 0;
76
77 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
78 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
79 }
80
81 @After
82 public void tearDown() throws IOException {
83 procExecutor.stop();
84 procStore.stop(false);
85 fs.delete(logDir, true);
86 }
87
88 private void restart() throws Exception {
89 dumpLogDirState();
90 ProcedureTestingUtility.restart(procExecutor);
91 dumpLogDirState();
92 }
93
94 public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
95 private int step = 0;
96
97 public TestSingleStepProcedure() { }
98
99 @Override
100 protected Procedure[] execute(Void env) {
101 LOG.debug("execute procedure " + this + " step=" + step);
102 step++;
103 setResult(Bytes.toBytes(step));
104 return null;
105 }
106
107 @Override
108 protected void rollback(Void env) { }
109
110 @Override
111 protected boolean abort(Void env) { return true; }
112 }
113
114 public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
115 private AtomicBoolean abort = new AtomicBoolean(false);
116 private int step = 0;
117
118 @Override
119 protected Procedure[] execute(Void env) {
120 LOG.debug("execute procedure " + this + " step=" + step);
121 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
122 step++;
123 Threads.sleepWithoutInterrupt(procSleepInterval);
124 if (isAborted()) {
125 setFailure(new RemoteProcedureException(getClass().getName(),
126 new ProcedureAbortedException(
127 "got an abort at " + getClass().getName() + " step=" + step)));
128 return null;
129 }
130 return null;
131 }
132
133 @Override
134 protected void rollback(Void env) {
135 LOG.debug("rollback procedure " + this + " step=" + step);
136 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
137 step++;
138 }
139
140 @Override
141 protected boolean abort(Void env) {
142 abort.set(true);
143 return true;
144 }
145
146 private boolean isAborted() {
147 boolean aborted = abort.get();
148 BaseTestStepProcedure proc = this;
149 while (proc.hasParent() && !aborted) {
150 proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
151 aborted = proc.isAborted();
152 }
153 return aborted;
154 }
155 }
156
157 public static class TestMultiStepProcedure extends BaseTestStepProcedure {
158 public TestMultiStepProcedure() { }
159
160 @Override
161 public Procedure[] execute(Void env) {
162 super.execute(env);
163 return isFailed() ? null : new Procedure[] { new Step1Procedure() };
164 }
165
166 public static class Step1Procedure extends BaseTestStepProcedure {
167 public Step1Procedure() { }
168
169 @Override
170 protected Procedure[] execute(Void env) {
171 super.execute(env);
172 return isFailed() ? null : new Procedure[] { new Step2Procedure() };
173 }
174 }
175
176 public static class Step2Procedure extends BaseTestStepProcedure {
177 public Step2Procedure() { }
178 }
179 }
180
181 @Test
182 public void testNoopLoad() throws Exception {
183 restart();
184 }
185
186 @Test(timeout=30000)
187 public void testSingleStepProcRecovery() throws Exception {
188 Procedure proc = new TestSingleStepProcedure();
189 procExecutor.testing.killBeforeStoreUpdate = true;
190 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
191 assertFalse(procExecutor.isRunning());
192 procExecutor.testing.killBeforeStoreUpdate = false;
193
194
195 long restartTs = EnvironmentEdgeManager.currentTime();
196 restart();
197 waitProcedure(procId);
198 ProcedureInfo result = procExecutor.getResult(procId);
199 assertTrue(result.getLastUpdate() > restartTs);
200 ProcedureTestingUtility.assertProcNotFailed(result);
201 assertEquals(1, Bytes.toInt(result.getResult()));
202 long resultTs = result.getLastUpdate();
203
204
205 restart();
206 result = procExecutor.getResult(procId);
207 ProcedureTestingUtility.assertProcNotFailed(result);
208 assertEquals(resultTs, result.getLastUpdate());
209 assertEquals(1, Bytes.toInt(result.getResult()));
210 }
211
212 @Test(timeout=30000)
213 public void testMultiStepProcRecovery() throws Exception {
214
215 Procedure proc = new TestMultiStepProcedure();
216 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
217 assertFalse(procExecutor.isRunning());
218
219
220 restart();
221 waitProcedure(procId);
222 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
223 assertFalse(procExecutor.isRunning());
224
225
226 restart();
227 waitProcedure(procId);
228 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
229 assertFalse(procExecutor.isRunning());
230
231
232 restart();
233 waitProcedure(procId);
234 assertTrue(procExecutor.isRunning());
235
236
237 ProcedureInfo result = procExecutor.getResult(procId);
238 ProcedureTestingUtility.assertProcNotFailed(result);
239 }
240
241 @Test(timeout=30000)
242 public void testMultiStepRollbackRecovery() throws Exception {
243
244 Procedure proc = new TestMultiStepProcedure();
245 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
246 assertFalse(procExecutor.isRunning());
247
248
249 restart();
250 waitProcedure(procId);
251 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
252 assertFalse(procExecutor.isRunning());
253
254
255 restart();
256 waitProcedure(procId);
257 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
258 assertFalse(procExecutor.isRunning());
259
260
261 procSleepInterval = 2500;
262 restart();
263 assertTrue(procExecutor.abort(procId));
264 waitProcedure(procId);
265 assertFalse(procExecutor.isRunning());
266
267
268 restart();
269 waitProcedure(procId);
270 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
271 assertFalse(procExecutor.isRunning());
272
273
274 restart();
275 waitProcedure(procId);
276 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
277 assertFalse(procExecutor.isRunning());
278
279
280 restart();
281 waitProcedure(procId);
282
283
284 ProcedureInfo result = procExecutor.getResult(procId);
285 ProcedureTestingUtility.assertIsAbortException(result);
286 }
287
288 public static class TestStateMachineProcedure
289 extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
290 enum State { STATE_1, STATE_2, STATE_3, DONE }
291
292 public TestStateMachineProcedure() {}
293
294 private AtomicBoolean aborted = new AtomicBoolean(false);
295 private int iResult = 0;
296
297 @Override
298 protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
299 switch (state) {
300 case STATE_1:
301 LOG.info("execute step 1 " + this);
302 setNextState(State.STATE_2);
303 iResult += 3;
304 break;
305 case STATE_2:
306 LOG.info("execute step 2 " + this);
307 setNextState(State.STATE_3);
308 iResult += 5;
309 break;
310 case STATE_3:
311 LOG.info("execute step 3 " + this);
312 Threads.sleepWithoutInterrupt(procSleepInterval);
313 if (aborted.get()) {
314 LOG.info("aborted step 3 " + this);
315 setAbortFailure("test", "aborted");
316 break;
317 }
318 setNextState(State.DONE);
319 iResult += 7;
320 setResult(Bytes.toBytes(iResult));
321 return Flow.NO_MORE_STATE;
322 default:
323 throw new UnsupportedOperationException();
324 }
325 return Flow.HAS_MORE_STATE;
326 }
327
328 @Override
329 protected void rollbackState(Void env, final State state) {
330 switch (state) {
331 case STATE_1:
332 LOG.info("rollback step 1 " + this);
333 break;
334 case STATE_2:
335 LOG.info("rollback step 2 " + this);
336 break;
337 case STATE_3:
338 LOG.info("rollback step 3 " + this);
339 break;
340 default:
341 throw new UnsupportedOperationException();
342 }
343 }
344
345 @Override
346 protected State getState(final int stateId) {
347 return State.values()[stateId];
348 }
349
350 @Override
351 protected int getStateId(final State state) {
352 return state.ordinal();
353 }
354
355 @Override
356 protected State getInitialState() {
357 return State.STATE_1;
358 }
359
360 @Override
361 protected boolean abort(Void env) {
362 aborted.set(true);
363 return true;
364 }
365
366 @Override
367 protected void serializeStateData(final OutputStream stream) throws IOException {
368 super.serializeStateData(stream);
369 stream.write(Bytes.toBytes(iResult));
370 }
371
372 @Override
373 protected void deserializeStateData(final InputStream stream) throws IOException {
374 super.deserializeStateData(stream);
375 byte[] data = new byte[4];
376 stream.read(data);
377 iResult = Bytes.toInt(data);
378 }
379 }
380
381 @Test(timeout=30000)
382 public void testStateMachineRecovery() throws Exception {
383 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
384 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
385
386
387 Procedure proc = new TestStateMachineProcedure();
388 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
389 assertFalse(procExecutor.isRunning());
390
391
392 restart();
393 waitProcedure(procId);
394 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
395 assertFalse(procExecutor.isRunning());
396
397
398 restart();
399 waitProcedure(procId);
400 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
401 assertFalse(procExecutor.isRunning());
402
403
404 restart();
405 waitProcedure(procId);
406 assertTrue(procExecutor.isRunning());
407
408
409 ProcedureInfo result = procExecutor.getResult(procId);
410 ProcedureTestingUtility.assertProcNotFailed(result);
411 assertEquals(15, Bytes.toInt(result.getResult()));
412 }
413
414 @Test(timeout=30000)
415 public void testStateMachineRollbackRecovery() throws Exception {
416 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
417 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
418
419
420 Procedure proc = new TestStateMachineProcedure();
421 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
422 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
423 assertFalse(procExecutor.isRunning());
424
425
426 restart();
427 waitProcedure(procId);
428 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
429 assertFalse(procExecutor.isRunning());
430
431
432 restart();
433 waitProcedure(procId);
434 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
435 assertFalse(procExecutor.isRunning());
436
437
438 procSleepInterval = 2500;
439 restart();
440 assertTrue(procExecutor.abort(procId));
441 waitProcedure(procId);
442 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
443 assertFalse(procExecutor.isRunning());
444
445
446 restart();
447 waitProcedure(procId);
448 assertFalse(procExecutor.isRunning());
449 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
450
451
452 restart();
453 waitProcedure(procId);
454 assertFalse(procExecutor.isRunning());
455 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
456
457
458 restart();
459 waitProcedure(procId);
460 assertTrue(procExecutor.isRunning());
461
462
463 ProcedureInfo result = procExecutor.getResult(procId);
464 ProcedureTestingUtility.assertIsAbortException(result);
465 }
466
467 private void waitProcedure(final long procId) {
468 ProcedureTestingUtility.waitProcedure(procExecutor, procId);
469 dumpLogDirState();
470 }
471
472 private void dumpLogDirState() {
473 try {
474 FileStatus[] files = fs.listStatus(logDir);
475 if (files != null && files.length > 0) {
476 for (FileStatus file: files) {
477 assertTrue(file.toString(), file.isFile());
478 LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
479 }
480 } else {
481 LOG.debug("no files under: " + logDir);
482 }
483 } catch (IOException e) {
484 LOG.warn("Unable to dump " + logDir, e);
485 }
486 }
487 }