1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicReference;
23 import java.util.concurrent.CountDownLatch;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
30 import org.apache.hadoop.hbase.ProcedureInfo;
31 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
32 import org.apache.hadoop.hbase.testclassification.SmallTests;
33 import org.apache.hadoop.hbase.security.User;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.hbase.util.NonceKey;
36 import org.apache.hadoop.hbase.util.Threads;
37
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.junit.experimental.categories.Category;
42
43 import static org.junit.Assert.assertEquals;
44 import static org.junit.Assert.assertFalse;
45 import static org.junit.Assert.assertTrue;
46
47 @Category({SmallTests.class})
48 public class TestProcedureNonce {
49 private static final Log LOG = LogFactory.getLog(TestProcedureNonce.class);
50
51 private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
52
53 private static TestProcEnv procEnv;
54 private static ProcedureExecutor<TestProcEnv> procExecutor;
55 private static ProcedureStore procStore;
56
57 private HBaseCommonTestingUtility htu;
58 private FileSystem fs;
59 private Path logDir;
60
61 @Before
62 public void setUp() throws IOException {
63 htu = new HBaseCommonTestingUtility();
64 Path testDir = htu.getDataTestDir();
65 fs = testDir.getFileSystem(htu.getConfiguration());
66 assertTrue(testDir.depth() > 1);
67
68 logDir = new Path(testDir, "proc-logs");
69 procEnv = new TestProcEnv();
70 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
71 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
72 procExecutor.testing = new ProcedureExecutor.Testing();
73 procStore.start(PROCEDURE_EXECUTOR_SLOTS);
74 procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
75 }
76
77 @After
78 public void tearDown() throws IOException {
79 procExecutor.stop();
80 procStore.stop(false);
81 fs.delete(logDir, true);
82 }
83
84 @Test(timeout=30000)
85 public void testCompletedProcWithSameNonce() throws Exception {
86 final long nonceGroup = 123;
87 final long nonce = 2222;
88
89
90 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
91 assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
92
93
94 Procedure proc = new TestSingleStepProcedure();
95 long procId = procExecutor.submitProcedure(proc, nonceKey);
96 ProcedureTestingUtility.waitProcedure(procExecutor, procId);
97
98
99 ProcedureTestingUtility.restart(procExecutor);
100 ProcedureTestingUtility.waitProcedure(procExecutor, procId);
101
102
103
104 assertEquals(procId, procExecutor.registerNonce(nonceKey));
105
106 ProcedureInfo result = procExecutor.getResult(procId);
107 ProcedureTestingUtility.assertProcNotFailed(result);
108 }
109
110 @Test(timeout=30000)
111 public void testRunningProcWithSameNonce() throws Exception {
112 final long nonceGroup = 456;
113 final long nonce = 33333;
114
115
116 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
117 assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
118
119
120 CountDownLatch latch = new CountDownLatch(1);
121 TestSingleStepProcedure proc = new TestSingleStepProcedure();
122 procEnv.setWaitLatch(latch);
123 long procId = procExecutor.submitProcedure(proc, nonceKey);
124 while (proc.step != 1) Threads.sleep(25);
125
126
127
128 assertEquals(procId, procExecutor.registerNonce(nonceKey));
129
130
131 latch.countDown();
132
133
134 ProcedureTestingUtility.restart(procExecutor);
135 ProcedureTestingUtility.waitProcedure(procExecutor, procId);
136
137
138
139 assertEquals(procId, procExecutor.registerNonce(nonceKey));
140
141 ProcedureInfo result = procExecutor.getResult(procId);
142 ProcedureTestingUtility.assertProcNotFailed(result);
143 }
144
145 @Test
146 public void testSetFailureResultForNonce() throws IOException {
147 final long nonceGroup = 234;
148 final long nonce = 55555;
149
150
151 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
152 assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
153
154 procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(),
155 new IOException("test failure"));
156
157 final long procId = procExecutor.registerNonce(nonceKey);
158 ProcedureInfo result = procExecutor.getResult(procId);
159 ProcedureTestingUtility.assertProcFailed(result);
160 }
161
162 @Test(timeout=30000)
163 public void testConcurrentNonceRegistration() throws IOException {
164 testConcurrentNonceRegistration(true, 567, 44444);
165 }
166
167 @Test(timeout=30000)
168 public void testConcurrentNonceRegistrationWithRollback() throws IOException {
169 testConcurrentNonceRegistration(false, 890, 55555);
170 }
171
172 private void testConcurrentNonceRegistration(final boolean submitProcedure,
173 final long nonceGroup, final long nonce) throws IOException {
174
175 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
176
177 final AtomicReference<Throwable> t1Exception = new AtomicReference();
178 final AtomicReference<Throwable> t2Exception = new AtomicReference();
179
180 final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1);
181 final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1);
182 final Thread[] threads = new Thread[2];
183 threads[0] = new Thread() {
184 @Override
185 public void run() {
186 try {
187
188 assertFalse("unexpected already registered nonce",
189 procExecutor.registerNonce(nonceKey) >= 0);
190 t1NonceRegisteredLatch.countDown();
191
192
193 t2BeforeNonceRegisteredLatch.await();
194 Threads.sleep(1000);
195
196 if (submitProcedure) {
197 CountDownLatch latch = new CountDownLatch(1);
198 TestSingleStepProcedure proc = new TestSingleStepProcedure();
199 procEnv.setWaitLatch(latch);
200
201 procExecutor.submitProcedure(proc, nonceKey);
202 Threads.sleep(100);
203
204
205 latch.countDown();
206 } else {
207 procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey);
208 }
209 } catch (Throwable e) {
210 t1Exception.set(e);
211 } finally {
212 t1NonceRegisteredLatch.countDown();
213 t2BeforeNonceRegisteredLatch.countDown();
214 }
215 }
216 };
217
218 threads[1] = new Thread() {
219 @Override
220 public void run() {
221 try {
222
223 t1NonceRegisteredLatch.await();
224
225
226 t2BeforeNonceRegisteredLatch.countDown();
227 assertFalse("unexpected non registered nonce",
228 procExecutor.registerNonce(nonceKey) < 0);
229 } catch (Throwable e) {
230 t2Exception.set(e);
231 } finally {
232 t1NonceRegisteredLatch.countDown();
233 t2BeforeNonceRegisteredLatch.countDown();
234 }
235 }
236 };
237
238 for (int i = 0; i < threads.length; ++i) threads[i].start();
239 for (int i = 0; i < threads.length; ++i) Threads.shutdown(threads[i]);
240 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
241 assertEquals(null, t1Exception.get());
242 assertEquals(null, t2Exception.get());
243 }
244
245 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
246 private int step = 0;
247
248 public TestSingleStepProcedure() { }
249
250 @Override
251 protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
252 step++;
253 env.waitOnLatch();
254 LOG.debug("execute procedure " + this + " step=" + step);
255 step++;
256 setResult(Bytes.toBytes(step));
257 return null;
258 }
259
260 @Override
261 protected void rollback(TestProcEnv env) { }
262
263 @Override
264 protected boolean abort(TestProcEnv env) { return true; }
265 }
266
267 private static class TestProcEnv {
268 private CountDownLatch latch = null;
269
270
271
272
273 public void setWaitLatch(CountDownLatch latch) {
274 this.latch = latch;
275 }
276
277 public void waitOnLatch() throws InterruptedException {
278 if (latch != null) {
279 latch.await();
280 }
281 }
282 }
283 }