1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
30 import org.apache.hadoop.hbase.ProcedureInfo;
31 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
32 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
33 import org.apache.hadoop.hbase.testclassification.SmallTests;
34 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
35
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.Test;
39 import org.junit.experimental.categories.Category;
40
41 import static org.junit.Assert.assertEquals;
42 import static org.junit.Assert.assertTrue;
43
44 @Category(SmallTests.class)
45 public class TestProcedureExecution {
46 private static final Log LOG = LogFactory.getLog(TestProcedureExecution.class);
47
48 private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
49 private static final Procedure NULL_PROC = null;
50
51 private ProcedureExecutor<Void> procExecutor;
52 private ProcedureStore procStore;
53
54 private HBaseCommonTestingUtility htu;
55 private FileSystem fs;
56 private Path testDir;
57 private Path logDir;
58
59 @Before
60 public void setUp() throws IOException {
61 htu = new HBaseCommonTestingUtility();
62 testDir = htu.getDataTestDir();
63 fs = testDir.getFileSystem(htu.getConfiguration());
64 assertTrue(testDir.depth() > 1);
65
66 logDir = new Path(testDir, "proc-logs");
67 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
68 procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
69 procStore.start(PROCEDURE_EXECUTOR_SLOTS);
70 procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
71 }
72
73 @After
74 public void tearDown() throws IOException {
75 procExecutor.stop();
76 procStore.stop(false);
77 fs.delete(logDir, true);
78 }
79
80 private static class TestProcedureException extends IOException {
81 public TestProcedureException(String msg) { super(msg); }
82 }
83
84 public static class TestSequentialProcedure extends SequentialProcedure<Void> {
85 private final Procedure[] subProcs;
86 private final List<String> state;
87 private final Exception failure;
88 private final String name;
89
90 public TestSequentialProcedure() {
91 throw new UnsupportedOperationException("recovery should not be triggered here");
92 }
93
94 public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
95 this.state = state;
96 this.subProcs = subProcs;
97 this.name = name;
98 this.failure = null;
99 }
100
101 public TestSequentialProcedure(String name, List<String> state, Exception failure) {
102 this.state = state;
103 this.subProcs = null;
104 this.name = name;
105 this.failure = failure;
106 }
107
108 @Override
109 protected Procedure[] execute(Void env) {
110 state.add(name + "-execute");
111 if (failure != null) {
112 setFailure(new RemoteProcedureException(name + "-failure", failure));
113 return null;
114 }
115 return subProcs;
116 }
117
118 @Override
119 protected void rollback(Void env) {
120 state.add(name + "-rollback");
121 }
122
123 @Override
124 protected boolean abort(Void env) {
125 state.add(name + "-abort");
126 return true;
127 }
128 }
129
130 @Test(timeout=30000)
131 public void testBadSubprocList() {
132 List<String> state = new ArrayList<String>();
133 Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
134 Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
135 Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
136 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
137
138
139
140 LOG.info(state);
141 ProcedureInfo result = procExecutor.getResult(rootId);
142 assertTrue(state.toString(), result.isFailed());
143 ProcedureTestingUtility.assertIsIllegalArgumentException(result);
144
145 assertEquals(state.toString(), 4, state.size());
146 assertEquals("rootProc-execute", state.get(0));
147 assertEquals("subProc1-execute", state.get(1));
148 assertEquals("subProc1-rollback", state.get(2));
149 assertEquals("rootProc-rollback", state.get(3));
150 }
151
152 @Test(timeout=30000)
153 public void testSingleSequentialProc() {
154 List<String> state = new ArrayList<String>();
155 Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
156 Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
157 Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
158 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
159
160
161 LOG.info(state);
162 ProcedureInfo result = procExecutor.getResult(rootId);
163 ProcedureTestingUtility.assertProcNotFailed(result);
164 assertEquals(state.toString(), 3, state.size());
165 }
166
167 @Test(timeout=30000)
168 public void testSingleSequentialProcRollback() {
169 List<String> state = new ArrayList<String>();
170 Procedure subProc2 = new TestSequentialProcedure("subProc2", state,
171 new TestProcedureException("fail test"));
172 Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
173 Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
174 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
175
176
177 LOG.info(state);
178 ProcedureInfo result = procExecutor.getResult(rootId);
179 assertTrue(state.toString(), result.isFailed());
180 LOG.info(result.getExceptionFullMessage());
181 Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
182 assertTrue("expected TestProcedureException, got " + cause,
183 cause instanceof TestProcedureException);
184
185 assertEquals(state.toString(), 6, state.size());
186 assertEquals("rootProc-execute", state.get(0));
187 assertEquals("subProc1-execute", state.get(1));
188 assertEquals("subProc2-execute", state.get(2));
189 assertEquals("subProc2-rollback", state.get(3));
190 assertEquals("subProc1-rollback", state.get(4));
191 assertEquals("rootProc-rollback", state.get(5));
192 }
193
194 public static class TestFaultyRollback extends SequentialProcedure<Void> {
195 private int retries = 0;
196
197 public TestFaultyRollback() { }
198
199 @Override
200 protected Procedure[] execute(Void env) {
201 setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
202 return null;
203 }
204
205 @Override
206 protected void rollback(Void env) throws IOException {
207 if (++retries < 3) {
208 LOG.info("inject rollback failure " + retries);
209 throw new IOException("injected failure number " + retries);
210 }
211 LOG.info("execute non faulty rollback step retries=" + retries);
212 }
213
214 @Override
215 protected boolean abort(Void env) { return false; }
216 }
217
218 @Test(timeout=30000)
219 public void testRollbackRetriableFailure() {
220 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());
221
222 ProcedureInfo result = procExecutor.getResult(procId);
223 assertTrue("expected a failure", result.isFailed());
224 LOG.info(result.getExceptionFullMessage());
225 Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
226 assertTrue("expected TestProcedureException, got " + cause,
227 cause instanceof TestProcedureException);
228 }
229
230 public static class TestWaitingProcedure extends SequentialProcedure<Void> {
231 private final List<String> state;
232 private final boolean hasChild;
233 private final String name;
234
235 public TestWaitingProcedure() {
236 throw new UnsupportedOperationException("recovery should not be triggered here");
237 }
238
239 public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
240 this.hasChild = hasChild;
241 this.state = state;
242 this.name = name;
243 }
244
245 @Override
246 protected Procedure[] execute(Void env) {
247 state.add(name + "-execute");
248 setState(ProcedureState.WAITING_TIMEOUT);
249 return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
250 }
251
252 @Override
253 protected void rollback(Void env) {
254 state.add(name + "-rollback");
255 }
256
257 @Override
258 protected boolean abort(Void env) {
259 state.add(name + "-abort");
260 return true;
261 }
262
263 public static class TestWaitChild extends SequentialProcedure<Void> {
264 private final List<String> state;
265 private final String name;
266
267 public TestWaitChild() {
268 throw new UnsupportedOperationException("recovery should not be triggered here");
269 }
270
271 public TestWaitChild(String name, List<String> state) {
272 this.name = name;
273 this.state = state;
274 }
275
276 @Override
277 protected Procedure[] execute(Void env) {
278 state.add(name + "-child-execute");
279 return null;
280 }
281
282 @Override
283 protected void rollback(Void env) {
284 state.add(name + "-child-rollback");
285 }
286
287 @Override
288 protected boolean abort(Void env) {
289 state.add(name + "-child-abort");
290 return true;
291 }
292 }
293 }
294
295 @Test(timeout=30000)
296 public void testAbortTimeout() {
297 final int PROC_TIMEOUT_MSEC = 2500;
298 List<String> state = new ArrayList<String>();
299 Procedure proc = new TestWaitingProcedure("wproc", state, false);
300 proc.setTimeout(PROC_TIMEOUT_MSEC);
301 long startTime = EnvironmentEdgeManager.currentTime();
302 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
303 long execTime = EnvironmentEdgeManager.currentTime() - startTime;
304 LOG.info(state);
305 assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
306 ProcedureInfo result = procExecutor.getResult(rootId);
307 assertTrue(state.toString(), result.isFailed());
308 ProcedureTestingUtility.assertIsTimeoutException(result);
309 assertEquals(state.toString(), 2, state.size());
310 assertEquals("wproc-execute", state.get(0));
311 assertEquals("wproc-rollback", state.get(1));
312 }
313
314 @Test(timeout=30000)
315 public void testAbortTimeoutWithChildren() {
316 List<String> state = new ArrayList<String>();
317 Procedure proc = new TestWaitingProcedure("wproc", state, true);
318 proc.setTimeout(2500);
319 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
320 LOG.info(state);
321 ProcedureInfo result = procExecutor.getResult(rootId);
322 assertTrue(state.toString(), result.isFailed());
323 ProcedureTestingUtility.assertIsTimeoutException(result);
324 assertEquals(state.toString(), 4, state.size());
325 assertEquals("wproc-execute", state.get(0));
326 assertEquals("wproc-child-execute", state.get(1));
327 assertEquals("wproc-child-rollback", state.get(2));
328 assertEquals("wproc-rollback", state.get(3));
329 }
330 }