1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.procedure;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.ArrayList;
28 import java.util.HashSet;
29 import java.util.Map;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseConfiguration;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.master.TableLockManager;
37 import org.apache.hadoop.hbase.procedure2.Procedure;
38 import org.apache.hadoop.hbase.testclassification.SmallTests;
39 import org.junit.After;
40 import org.junit.Assert;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44
45 import static org.junit.Assert.assertEquals;
46 import static org.junit.Assert.assertFalse;
47 import static org.junit.Assert.assertTrue;
48 import static org.junit.Assert.fail;
49
50 @Category(SmallTests.class)
51 public class TestMasterProcedureQueue {
52 private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class);
53
54 private MasterProcedureQueue queue;
55 private Configuration conf;
56
57 @Before
58 public void setUp() throws IOException {
59 conf = HBaseConfiguration.create();
60 queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager());
61 }
62
63 @After
64 public void tearDown() throws IOException {
65 assertEquals(0, queue.size());
66 }
67
68 @Test
69 public void testConcurrentCreateDelete() throws Exception {
70 final MasterProcedureQueue procQueue = queue;
71 final TableName table = TableName.valueOf("testtb");
72 final AtomicBoolean running = new AtomicBoolean(true);
73 final AtomicBoolean failure = new AtomicBoolean(false);
74 Thread createThread = new Thread() {
75 @Override
76 public void run() {
77 try {
78 while (running.get() && !failure.get()) {
79 if (procQueue.tryAcquireTableWrite(table, "create")) {
80 procQueue.releaseTableWrite(table);
81 }
82 }
83 } catch (Throwable e) {
84 LOG.error("create failed", e);
85 failure.set(true);
86 }
87 }
88 };
89
90 Thread deleteThread = new Thread() {
91 @Override
92 public void run() {
93 try {
94 while (running.get() && !failure.get()) {
95 if (procQueue.tryAcquireTableWrite(table, "delete")) {
96 procQueue.releaseTableWrite(table);
97 }
98 procQueue.markTableAsDeleted(table);
99 }
100 } catch (Throwable e) {
101 LOG.error("delete failed", e);
102 failure.set(true);
103 }
104 }
105 };
106
107 createThread.start();
108 deleteThread.start();
109 for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
110 Thread.sleep(100);
111 }
112 running.set(false);
113 createThread.join();
114 deleteThread.join();
115 assertEquals(false, failure.get());
116 }
117
118
119
120
121 @Test
122 public void testSimpleTableOpsQueues() throws Exception {
123 final int NUM_TABLES = 10;
124 final int NUM_ITEMS = 10;
125
126 int count = 0;
127 for (int i = 1; i <= NUM_TABLES; ++i) {
128 TableName tableName = TableName.valueOf(String.format("test-%04d", i));
129
130 for (int j = 1; j <= NUM_ITEMS; ++j) {
131 queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
132 TableProcedureInterface.TableOperationType.EDIT));
133 assertEquals(++count, queue.size());
134 }
135 }
136 assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
137
138 for (int j = 1; j <= NUM_ITEMS; ++j) {
139 for (int i = 1; i <= NUM_TABLES; ++i) {
140 Long procId = queue.poll();
141 assertEquals(--count, queue.size());
142 assertEquals(i * 1000 + j, procId.longValue());
143 }
144 }
145 assertEquals(0, queue.size());
146
147 for (int i = 1; i <= NUM_TABLES; ++i) {
148 TableName tableName = TableName.valueOf(String.format("test-%04d", i));
149
150 assertTrue(queue.markTableAsDeleted(tableName));
151 }
152 }
153
154
155
156
157
158 @Test
159 public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
160 TableName tableName = TableName.valueOf("testtb");
161
162 queue.addBack(new TestTableProcedure(1, tableName,
163 TableProcedureInterface.TableOperationType.EDIT));
164
165
166 assertFalse(queue.markTableAsDeleted(tableName));
167
168
169 assertEquals(1, queue.poll().longValue());
170
171 assertTrue(queue.tryAcquireTableWrite(tableName, "write"));
172
173 assertEquals(0, queue.size());
174 assertFalse(queue.markTableAsDeleted(tableName));
175
176 queue.releaseTableWrite(tableName);
177
178 assertTrue(queue.markTableAsDeleted(tableName));
179 }
180
181
182
183
184
185 @Test
186 public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
187 final TableName tableName = TableName.valueOf("testtb");
188 final int nitems = 2;
189
190 for (int i = 1; i <= nitems; ++i) {
191 queue.addBack(new TestTableProcedure(i, tableName,
192 TableProcedureInterface.TableOperationType.READ));
193 }
194
195
196 assertFalse(queue.markTableAsDeleted(tableName));
197
198 for (int i = 1; i <= nitems; ++i) {
199
200 assertEquals(i, queue.poll().longValue());
201
202 assertTrue(queue.tryAcquireTableRead(tableName, "read " + i));
203
204 assertFalse(queue.markTableAsDeleted(tableName));
205 }
206
207 for (int i = 1; i <= nitems; ++i) {
208
209 assertFalse(queue.markTableAsDeleted(tableName));
210
211 queue.releaseTableRead(tableName);
212 }
213
214
215 assertEquals(0, queue.size());
216
217 assertTrue(queue.markTableAsDeleted(tableName));
218 }
219
220
221
222
223 @Test
224 public void testVerifyRwLocks() throws Exception {
225 TableName tableName = TableName.valueOf("testtb");
226 queue.addBack(new TestTableProcedure(1, tableName,
227 TableProcedureInterface.TableOperationType.EDIT));
228 queue.addBack(new TestTableProcedure(2, tableName,
229 TableProcedureInterface.TableOperationType.READ));
230 queue.addBack(new TestTableProcedure(3, tableName,
231 TableProcedureInterface.TableOperationType.EDIT));
232 queue.addBack(new TestTableProcedure(4, tableName,
233 TableProcedureInterface.TableOperationType.READ));
234 queue.addBack(new TestTableProcedure(5, tableName,
235 TableProcedureInterface.TableOperationType.READ));
236
237
238 Long procId = queue.poll();
239 assertEquals(1, procId.longValue());
240 assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));
241
242
243 assertEquals(null, queue.poll());
244
245
246 queue.releaseTableWrite(tableName);
247
248
249 procId = queue.poll();
250 assertEquals(2, procId.longValue());
251 assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
252
253
254 procId = queue.poll();
255 assertEquals(3, procId.longValue());
256 assertEquals(false, queue.tryAcquireTableWrite(tableName, "write " + procId));
257
258
259 queue.releaseTableRead(tableName);
260 assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));
261
262
263 assertEquals(null, queue.poll());
264
265
266 queue.releaseTableWrite(tableName);
267
268
269 procId = queue.poll();
270 assertEquals(4, procId.longValue());
271 assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
272
273
274 procId = queue.poll();
275 assertEquals(5, procId.longValue());
276 assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
277
278
279 queue.releaseTableRead(tableName);
280 queue.releaseTableRead(tableName);
281
282
283 assertEquals(0, queue.size());
284 assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
285 }
286
287
288
289
290
291 @Test(timeout=90000)
292 public void testConcurrentWriteOps() throws Exception {
293 final TestTableProcSet procSet = new TestTableProcSet(queue);
294
295 final int NUM_ITEMS = 10;
296 final int NUM_TABLES = 4;
297 final AtomicInteger opsCount = new AtomicInteger(0);
298 for (int i = 0; i < NUM_TABLES; ++i) {
299 TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
300 for (int j = 1; j < NUM_ITEMS; ++j) {
301 procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
302 TableProcedureInterface.TableOperationType.EDIT));
303 opsCount.incrementAndGet();
304 }
305 }
306 assertEquals(opsCount.get(), queue.size());
307
308 final Thread[] threads = new Thread[NUM_TABLES * 2];
309 final HashSet<TableName> concurrentTables = new HashSet<TableName>();
310 final ArrayList<String> failures = new ArrayList<String>();
311 final AtomicInteger concurrentCount = new AtomicInteger(0);
312 for (int i = 0; i < threads.length; ++i) {
313 threads[i] = new Thread() {
314 @Override
315 public void run() {
316 while (opsCount.get() > 0) {
317 try {
318 TableProcedureInterface proc = procSet.acquire();
319 if (proc == null) {
320 queue.signalAll();
321 if (opsCount.get() > 0) {
322 continue;
323 }
324 break;
325 }
326 synchronized (concurrentTables) {
327 assertTrue("unexpected concurrency on " + proc.getTableName(),
328 concurrentTables.add(proc.getTableName()));
329 }
330 assertTrue(opsCount.decrementAndGet() >= 0);
331 try {
332 long procId = ((Procedure)proc).getProcId();
333 TableName tableId = proc.getTableName();
334 int concurrent = concurrentCount.incrementAndGet();
335 assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
336 concurrent >= 1 && concurrent <= NUM_TABLES);
337 LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
338 Thread.sleep(2000);
339 concurrent = concurrentCount.decrementAndGet();
340 LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
341 assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
342 } finally {
343 synchronized (concurrentTables) {
344 assertTrue(concurrentTables.remove(proc.getTableName()));
345 }
346 procSet.release(proc);
347 }
348 } catch (Throwable e) {
349 LOG.error("Failed " + e.getMessage(), e);
350 synchronized (failures) {
351 failures.add(e.getMessage());
352 }
353 } finally {
354 queue.signalAll();
355 }
356 }
357 }
358 };
359 threads[i].start();
360 }
361 for (int i = 0; i < threads.length; ++i) {
362 threads[i].join();
363 }
364 assertTrue(failures.toString(), failures.isEmpty());
365 assertEquals(0, opsCount.get());
366 assertEquals(0, queue.size());
367
368 for (int i = 1; i <= NUM_TABLES; ++i) {
369 TableName table = TableName.valueOf(String.format("testtb-%04d", i));
370 assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
371 }
372 }
373
374 public static class TestTableProcSet {
375 private final MasterProcedureQueue queue;
376 private Map<Long, TableProcedureInterface> procsMap =
377 new ConcurrentHashMap<Long, TableProcedureInterface>();
378
379 public TestTableProcSet(final MasterProcedureQueue queue) {
380 this.queue = queue;
381 }
382
383 public void addBack(TableProcedureInterface tableProc) {
384 Procedure proc = (Procedure)tableProc;
385 procsMap.put(proc.getProcId(), tableProc);
386 queue.addBack(proc);
387 }
388
389 public void addFront(TableProcedureInterface tableProc) {
390 Procedure proc = (Procedure)tableProc;
391 procsMap.put(proc.getProcId(), tableProc);
392 queue.addFront(proc);
393 }
394
395 public TableProcedureInterface acquire() {
396 TableProcedureInterface proc = null;
397 boolean avail = false;
398 while (!avail) {
399 Long procId = queue.poll();
400 proc = procId != null ? procsMap.remove(procId) : null;
401 if (proc == null) break;
402 switch (proc.getTableOperationType()) {
403 case CREATE:
404 case DELETE:
405 case EDIT:
406 avail = queue.tryAcquireTableWrite(proc.getTableName(),
407 "op="+ proc.getTableOperationType());
408 break;
409 case READ:
410 avail = queue.tryAcquireTableRead(proc.getTableName(),
411 "op="+ proc.getTableOperationType());
412 break;
413 }
414 if (!avail) {
415 addFront(proc);
416 LOG.debug("yield procId=" + procId);
417 }
418 }
419 return proc;
420 }
421
422 public void release(TableProcedureInterface proc) {
423 switch (proc.getTableOperationType()) {
424 case CREATE:
425 case DELETE:
426 case EDIT:
427 queue.releaseTableWrite(proc.getTableName());
428 break;
429 case READ:
430 queue.releaseTableRead(proc.getTableName());
431 break;
432 }
433 }
434 }
435
436 public static class TestTableProcedure extends Procedure<Void>
437 implements TableProcedureInterface {
438 private final TableOperationType opType;
439 private final TableName tableName;
440
441 public TestTableProcedure() {
442 throw new UnsupportedOperationException("recovery should not be triggered here");
443 }
444
445 public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
446 this.tableName = tableName;
447 this.opType = opType;
448 setProcId(procId);
449 }
450
451 @Override
452 public TableName getTableName() {
453 return tableName;
454 }
455
456 @Override
457 public TableOperationType getTableOperationType() {
458 return opType;
459 }
460
461 @Override
462 protected Procedure[] execute(Void env) {
463 return null;
464 }
465
466 @Override
467 protected void rollback(Void env) {
468 throw new UnsupportedOperationException();
469 }
470
471 @Override
472 protected boolean abort(Void env) {
473 throw new UnsupportedOperationException();
474 }
475
476 @Override
477 protected void serializeStateData(final OutputStream stream) throws IOException {}
478
479 @Override
480 protected void deserializeStateData(final InputStream stream) throws IOException {}
481 }
482 }