1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.ProcedureInfo;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.util.Threads;
33 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
34 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
36 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
37 import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
38 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
39 import org.apache.hadoop.hbase.util.NonceKey;
40 import static org.junit.Assert.assertEquals;
41 import static org.junit.Assert.assertFalse;
42 import static org.junit.Assert.assertTrue;
43
44 public class ProcedureTestingUtility {
45 private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
46
47 private ProcedureTestingUtility() {
48 }
49
50 public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
51 final Path baseDir) throws IOException {
52 return createWalStore(conf, fs, baseDir);
53 }
54
55 public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
56 final Path walDir) throws IOException {
57 return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() {
58 @Override
59 public void recoverFileLease(FileSystem fs, Path path) throws IOException {
60
61 }
62 });
63 }
64
65 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
66 throws Exception {
67 restart(procExecutor, null);
68 }
69
70 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
71 Runnable beforeStartAction) throws Exception {
72 ProcedureStore procStore = procExecutor.getStore();
73 int storeThreads = procExecutor.getNumThreads();
74 int execThreads = procExecutor.getNumThreads();
75
76 procExecutor.stop();
77 procExecutor.join();
78 procStore.stop(false);
79
80 if (beforeStartAction != null) {
81 beforeStartAction.run();
82 }
83
84 procStore.start(storeThreads);
85 procExecutor.start(execThreads);
86 }
87
88 public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
89 boolean value) {
90 if (procExecutor.testing == null) {
91 procExecutor.testing = new ProcedureExecutor.Testing();
92 }
93 procExecutor.testing.killBeforeStoreUpdate = value;
94 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
95 }
96
97 public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
98 boolean value) {
99 if (procExecutor.testing == null) {
100 procExecutor.testing = new ProcedureExecutor.Testing();
101 }
102 procExecutor.testing.toggleKillBeforeStoreUpdate = value;
103 }
104
105 public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
106 if (procExecutor.testing == null) {
107 procExecutor.testing = new ProcedureExecutor.Testing();
108 }
109 procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
110 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
111 }
112
113 public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
114 boolean value) {
115 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
116 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
117 }
118
119 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
120 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
121 }
122
123 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
124 final long nonceGroup, final long nonce) {
125 long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
126 waitProcedure(procExecutor, procId);
127 return procId;
128 }
129
130 public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
131 final long nonceGroup, final long nonce) {
132 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
133 long procId = procExecutor.registerNonce(nonceKey);
134 assertFalse(procId >= 0);
135 return procExecutor.submitProcedure(proc, nonceKey);
136 }
137
138 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
139 while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
140 Threads.sleepWithoutInterrupt(250);
141 }
142 }
143
144 public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
145 int stableRuns = 0;
146 while (stableRuns < 10) {
147 if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) {
148 stableRuns = 0;
149 Threads.sleepWithoutInterrupt(100);
150 } else {
151 stableRuns++;
152 Threads.sleepWithoutInterrupt(25);
153 }
154 }
155 }
156
157 public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
158 long procId) {
159 assertFalse("expected a running proc", procExecutor.isFinished(procId));
160 assertEquals(null, procExecutor.getResult(procId));
161 }
162
163 public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
164 long procId) {
165 ProcedureInfo result = procExecutor.getResult(procId);
166 assertTrue("expected procedure result", result != null);
167 assertProcNotFailed(result);
168 }
169
170 public static void assertProcNotFailed(final ProcedureInfo result) {
171 ForeignExceptionMessage exception = result.getForeignExceptionMessage();
172 String msg = exception != null ? result.getExceptionFullMessage() : "no exception found";
173 assertFalse(msg, result.isFailed());
174 }
175
176 public static Throwable assertProcFailed(final ProcedureInfo result) {
177 assertEquals(true, result.isFailed());
178 LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage());
179 return getExceptionCause(result);
180 }
181
182 public static void assertIsAbortException(final ProcedureInfo result) {
183 assertEquals(true, result.isFailed());
184 LOG.info(result.getExceptionFullMessage());
185 Throwable cause = getExceptionCause(result);
186 assertTrue("expected abort exception, got " + cause,
187 cause instanceof ProcedureAbortedException);
188 }
189
190 public static void assertIsTimeoutException(final ProcedureInfo result) {
191 assertEquals(true, result.isFailed());
192 LOG.info(result.getExceptionFullMessage());
193 Throwable cause = getExceptionCause(result);
194 assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
195 }
196
197 public static void assertIsIllegalArgumentException(final ProcedureInfo result) {
198 assertEquals(true, result.isFailed());
199 LOG.info(result.getExceptionFullMessage());
200 Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
201 assertTrue("expected IllegalArgumentIOException, got " + cause,
202 cause instanceof IllegalArgumentIOException);
203 }
204
205 public static Throwable getExceptionCause(final ProcedureInfo procInfo) {
206 assert procInfo.getForeignExceptionMessage() != null;
207 return RemoteProcedureException.fromProto(procInfo.getForeignExceptionMessage()).getCause();
208 }
209
210 public static class TestProcedure extends Procedure<Void> {
211 public TestProcedure() {}
212
213 public TestProcedure(long procId) {
214 this(procId, 0);
215 }
216
217 public TestProcedure(long procId, long parentId) {
218 setProcId(procId);
219 if (parentId > 0) {
220 setParentProcId(parentId);
221 }
222 }
223
224 public void addStackId(final int index) {
225 addStackIndex(index);
226 }
227
228 @Override
229 protected Procedure[] execute(Void env) { return null; }
230
231 @Override
232 protected void rollback(Void env) { }
233
234 @Override
235 protected boolean abort(Void env) { return false; }
236
237 @Override
238 protected void serializeStateData(final OutputStream stream) throws IOException { }
239
240 @Override
241 protected void deserializeStateData(final InputStream stream) throws IOException { }
242 }
243 }