1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import static org.mockito.Matchers.any;
21 import static org.mockito.Matchers.anyString;
22 import static org.mockito.Matchers.eq;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.doThrow;
25 import static org.mockito.Mockito.inOrder;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.never;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyZeroInteractions;
32 import static org.mockito.Mockito.when;
33
34 import java.io.IOException;
35 import java.util.concurrent.ThreadPoolExecutor;
36
37 import org.apache.hadoop.hbase.testclassification.SmallTests;
38 import org.apache.hadoop.hbase.errorhandling.ForeignException;
39 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
40 import org.apache.hadoop.hbase.errorhandling.TimeoutException;
41 import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
42 import org.junit.After;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45 import org.mockito.InOrder;
46 import org.mockito.Mockito;
47 import org.mockito.invocation.InvocationOnMock;
48 import org.mockito.stubbing.Answer;
49
50
51
52
53 @Category(SmallTests.class)
54 public class TestProcedureMember {
55 private static final long WAKE_FREQUENCY = 100;
56 private static final long TIMEOUT = 100000;
57 private static final long POOL_KEEP_ALIVE = 1;
58
59 private final String op = "some op";
60 private final byte[] data = new byte[0];
61 private final ForeignExceptionDispatcher mockListener = Mockito
62 .spy(new ForeignExceptionDispatcher());
63 private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
64 private final ProcedureMemberRpcs mockMemberComms = Mockito
65 .mock(ProcedureMemberRpcs.class);
66 private ProcedureMember member;
67 private ForeignExceptionDispatcher dispatcher;
68 Subprocedure spySub;
69
70
71
72
73 @After
74 public void resetTest() {
75 reset(mockListener, mockBuilder, mockMemberComms);
76 if (member != null)
77 try {
78 member.close();
79 } catch (IOException e) {
80 e.printStackTrace();
81 }
82 }
83
84
85
86
87
88 private ProcedureMember buildCohortMember() {
89 String name = "node";
90 ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
91 return new ProcedureMember(mockMemberComms, pool, mockBuilder);
92 }
93
94
95
96
97 private void buildCohortMemberPair() throws IOException {
98 dispatcher = new ForeignExceptionDispatcher();
99 String name = "node";
100 ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
101 member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
102 when(mockMemberComms.getMemberName()).thenReturn("membername");
103 Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
104 spySub = spy(subproc);
105 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
106 addCommitAnswer();
107 }
108
109
110
111
112
113 private void addCommitAnswer() throws IOException {
114 doAnswer(new Answer<Void>() {
115 @Override
116 public Void answer(InvocationOnMock invocation) throws Throwable {
117 member.receivedReachedGlobalBarrier(op);
118 return null;
119 }
120 }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
121 }
122
123
124
125
126 @Test(timeout = 500)
127 public void testSimpleRun() throws Exception {
128 member = buildCohortMember();
129 EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
130 EmptySubprocedure spy = spy(subproc);
131 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
132
133
134 addCommitAnswer();
135
136
137
138 Subprocedure subproc1 = member.createSubprocedure(op, data);
139 member.submitSubprocedure(subproc1);
140
141 subproc.waitForLocallyCompleted();
142
143
144 InOrder order = inOrder(mockMemberComms, spy);
145 order.verify(spy).acquireBarrier();
146 order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
147 order.verify(spy).insideBarrier();
148 order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data));
149 order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
150 any(ForeignException.class));
151 }
152
153
154
155
156
157 @Test(timeout = 60000)
158 public void testMemberPrepareException() throws Exception {
159 buildCohortMemberPair();
160
161
162 doAnswer(
163 new Answer<Void>() {
164 @Override
165 public Void answer(InvocationOnMock invocation) throws Throwable {
166 throw new IOException("Forced IOException in member acquireBarrier");
167 }
168 }).when(spySub).acquireBarrier();
169
170
171
172 Subprocedure subproc = member.createSubprocedure(op, data);
173 member.submitSubprocedure(subproc);
174
175 member.closeAndWait(TIMEOUT);
176
177
178 InOrder order = inOrder(mockMemberComms, spySub);
179 order.verify(spySub).acquireBarrier();
180
181 order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
182 order.verify(spySub, never()).insideBarrier();
183 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
184
185 order.verify(spySub).cancel(anyString(), any(Exception.class));
186 order.verify(spySub).cleanup(any(Exception.class));
187 }
188
189
190
191
192 @Test(timeout = 60000)
193 public void testSendMemberAcquiredCommsFailure() throws Exception {
194 buildCohortMemberPair();
195
196
197 doAnswer(
198 new Answer<Void>() {
199 @Override
200 public Void answer(InvocationOnMock invocation) throws Throwable {
201 throw new IOException("Forced IOException in memeber prepare");
202 }
203 }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
204
205
206
207 Subprocedure subproc = member.createSubprocedure(op, data);
208 member.submitSubprocedure(subproc);
209
210 member.closeAndWait(TIMEOUT);
211
212
213 InOrder order = inOrder(mockMemberComms, spySub);
214 order.verify(spySub).acquireBarrier();
215 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
216
217
218 order.verify(spySub, never()).insideBarrier();
219 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
220
221 order.verify(spySub).cancel(anyString(), any(Exception.class));
222 order.verify(spySub).cleanup(any(Exception.class));
223 }
224
225
226
227
228
229
230
231 @Test(timeout = 60000)
232 public void testCoordinatorAbort() throws Exception {
233 buildCohortMemberPair();
234
235
236 final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
237 doAnswer(
238 new Answer<Void>() {
239 @Override
240 public Void answer(InvocationOnMock invocation) throws Throwable {
241
242 spySub.cancel("bogus message", oate);
243
244 Thread.sleep(WAKE_FREQUENCY);
245 return null;
246 }
247 }).when(spySub).waitForReachedGlobalBarrier();
248
249
250
251 Subprocedure subproc = member.createSubprocedure(op, data);
252 member.submitSubprocedure(subproc);
253
254 member.closeAndWait(TIMEOUT);
255
256
257 InOrder order = inOrder(mockMemberComms, spySub);
258 order.verify(spySub).acquireBarrier();
259 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
260
261 order.verify(spySub, never()).insideBarrier();
262 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
263
264 order.verify(spySub).cancel(anyString(), any(Exception.class));
265 order.verify(spySub).cleanup(any(Exception.class));
266 }
267
268
269
270
271
272
273
274
275
276 @Test(timeout = 60000)
277 public void testMemberCommitException() throws Exception {
278 buildCohortMemberPair();
279
280
281 doAnswer(
282 new Answer<Void>() {
283 @Override
284 public Void answer(InvocationOnMock invocation) throws Throwable {
285 throw new IOException("Forced IOException in memeber prepare");
286 }
287 }).when(spySub).insideBarrier();
288
289
290
291 Subprocedure subproc = member.createSubprocedure(op, data);
292 member.submitSubprocedure(subproc);
293
294 member.closeAndWait(TIMEOUT);
295
296
297 InOrder order = inOrder(mockMemberComms, spySub);
298 order.verify(spySub).acquireBarrier();
299 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
300 order.verify(spySub).insideBarrier();
301
302
303 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
304
305 order.verify(spySub).cancel(anyString(), any(Exception.class));
306 order.verify(spySub).cleanup(any(Exception.class));
307 }
308
309
310
311
312
313
314
315
316
317 @Test(timeout = 60000)
318 public void testMemberCommitCommsFailure() throws Exception {
319 buildCohortMemberPair();
320 final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
321 doAnswer(
322 new Answer<Void>() {
323 @Override
324 public Void answer(InvocationOnMock invocation) throws Throwable {
325
326 spySub.cancel("commit comms fail", oate);
327
328 Thread.sleep(WAKE_FREQUENCY);
329 return null;
330 }
331 }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class), eq(data));
332
333
334
335 Subprocedure subproc = member.createSubprocedure(op, data);
336 member.submitSubprocedure(subproc);
337
338 member.closeAndWait(TIMEOUT);
339
340
341 InOrder order = inOrder(mockMemberComms, spySub);
342 order.verify(spySub).acquireBarrier();
343 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
344 order.verify(spySub).insideBarrier();
345 order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data));
346
347 order.verify(spySub).cancel(anyString(), any(Exception.class));
348 order.verify(spySub).cleanup(any(Exception.class));
349 }
350
351
352
353
354
355 @Test(timeout = 60000)
356 public void testPropagateConnectionErrorBackToManager() throws Exception {
357
358 member = buildCohortMember();
359 ProcedureMember memberSpy = spy(member);
360
361
362 final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
363 ForeignExceptionDispatcher dispSpy = spy(dispatcher);
364 Subprocedure commit = new EmptySubprocedure(member, dispatcher);
365 Subprocedure spy = spy(commit);
366 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
367
368
369 doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
370
371 doThrow(new IOException("Controller is down!")).when(mockMemberComms)
372 .sendMemberAborted(eq(spy), any(ForeignException.class));
373
374
375
376
377 Subprocedure subproc = memberSpy.createSubprocedure(op, data);
378 memberSpy.submitSubprocedure(subproc);
379
380 memberSpy.closeAndWait(TIMEOUT);
381
382
383 InOrder order = inOrder(mockMemberComms, spy, dispSpy);
384
385 order.verify(spy).acquireBarrier();
386 order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
387
388
389
390
391
392
393
394 }
395
396
397
398
399
400
401 @Test
402 public void testNoTaskToBeRunFromRequest() throws Exception {
403 ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
404 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
405 .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
406 member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
407
408
409 Subprocedure subproc = member.createSubprocedure(op, data);
410 member.submitSubprocedure(subproc);
411
412 try {
413
414 Subprocedure subproc2 = member.createSubprocedure(op, data);
415 member.submitSubprocedure(subproc2);
416 } catch (IllegalStateException ise) {
417 }
418
419 try {
420
421 Subprocedure subproc3 = member.createSubprocedure(op, data);
422 member.submitSubprocedure(subproc3);
423 } catch (IllegalArgumentException iae) {
424 }
425
426
427 verifyZeroInteractions(pool);
428
429
430
431 }
432
433
434
435
436 public class EmptySubprocedure extends SubprocedureImpl {
437 public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
438 super( member, op, dispatcher,
439
440 WAKE_FREQUENCY, TIMEOUT);
441 }
442 }
443 }