View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.master.procedure;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.ArrayList;
28  import java.util.HashSet;
29  import java.util.Map;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.master.TableLockManager;
37  import org.apache.hadoop.hbase.procedure2.Procedure;
38  import org.apache.hadoop.hbase.testclassification.SmallTests;
39  import org.junit.After;
40  import org.junit.Assert;
41  import org.junit.Before;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  
45  import static org.junit.Assert.assertEquals;
46  import static org.junit.Assert.assertFalse;
47  import static org.junit.Assert.assertTrue;
48  import static org.junit.Assert.fail;
49  
50  @Category(SmallTests.class)
51  public class TestMasterProcedureQueue {
52    private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class);
53  
54    private MasterProcedureQueue queue;
55    private Configuration conf;
56  
57    @Before
58    public void setUp() throws IOException {
59      conf = HBaseConfiguration.create();
60      queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager());
61    }
62  
63    @After
64    public void tearDown() throws IOException {
65      assertEquals(0, queue.size());
66    }
67  
68    @Test
69    public void testConcurrentCreateDelete() throws Exception {
70      final MasterProcedureQueue procQueue = queue;
71      final TableName table = TableName.valueOf("testtb");
72      final AtomicBoolean running = new AtomicBoolean(true);
73      final AtomicBoolean failure = new AtomicBoolean(false);
74      Thread createThread = new Thread() {
75        @Override
76        public void run() {
77          try {
78            while (running.get() && !failure.get()) {
79              if (procQueue.tryAcquireTableWrite(table, "create")) {
80                procQueue.releaseTableWrite(table);
81              }
82            }
83          } catch (Throwable e) {
84            LOG.error("create failed", e);
85            failure.set(true);
86          }
87        }
88      };
89  
90      Thread deleteThread = new Thread() {
91        @Override
92        public void run() {
93          try {
94            while (running.get() && !failure.get()) {
95              if (procQueue.tryAcquireTableWrite(table, "delete")) {
96                procQueue.releaseTableWrite(table);
97              }
98              procQueue.markTableAsDeleted(table);
99            }
100         } catch (Throwable e) {
101           LOG.error("delete failed", e);
102           failure.set(true);
103         }
104       }
105     };
106 
107     createThread.start();
108     deleteThread.start();
109     for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
110       Thread.sleep(100);
111     }
112     running.set(false);
113     createThread.join();
114     deleteThread.join();
115     assertEquals(false, failure.get());
116   }
117 
118   /**
119    * Verify simple create/insert/fetch/delete of the table queue.
120    */
121   @Test
122   public void testSimpleTableOpsQueues() throws Exception {
123     final int NUM_TABLES = 10;
124     final int NUM_ITEMS = 10;
125 
126     int count = 0;
127     for (int i = 1; i <= NUM_TABLES; ++i) {
128       TableName tableName = TableName.valueOf(String.format("test-%04d", i));
129       // insert items
130       for (int j = 1; j <= NUM_ITEMS; ++j) {
131         queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
132           TableProcedureInterface.TableOperationType.EDIT));
133         assertEquals(++count, queue.size());
134       }
135     }
136     assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
137 
138     for (int j = 1; j <= NUM_ITEMS; ++j) {
139       for (int i = 1; i <= NUM_TABLES; ++i) {
140         Long procId = queue.poll();
141         assertEquals(--count, queue.size());
142         assertEquals(i * 1000 + j, procId.longValue());
143       }
144     }
145     assertEquals(0, queue.size());
146 
147     for (int i = 1; i <= NUM_TABLES; ++i) {
148       TableName tableName = TableName.valueOf(String.format("test-%04d", i));
149       // complete the table deletion
150       assertTrue(queue.markTableAsDeleted(tableName));
151     }
152   }
153 
154   /**
155    * Check that the table queue is not deletable until every procedure
156    * in-progress is completed (this is a special case for write-locks).
157    */
158   @Test
159   public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
160     TableName tableName = TableName.valueOf("testtb");
161 
162     queue.addBack(new TestTableProcedure(1, tableName,
163           TableProcedureInterface.TableOperationType.EDIT));
164 
165     // table can't be deleted because one item is in the queue
166     assertFalse(queue.markTableAsDeleted(tableName));
167 
168     // fetch item and take a lock
169     assertEquals(1, queue.poll().longValue());
170     // take the xlock
171     assertTrue(queue.tryAcquireTableWrite(tableName, "write"));
172     // table can't be deleted because we have the lock
173     assertEquals(0, queue.size());
174     assertFalse(queue.markTableAsDeleted(tableName));
175     // release the xlock
176     queue.releaseTableWrite(tableName);
177     // complete the table deletion
178     assertTrue(queue.markTableAsDeleted(tableName));
179   }
180 
181   /**
182    * Check that the table queue is not deletable until every procedure
183    * in-progress is completed (this is a special case for read-locks).
184    */
185   @Test
186   public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
187     final TableName tableName = TableName.valueOf("testtb");
188     final int nitems = 2;
189 
190     for (int i = 1; i <= nitems; ++i) {
191       queue.addBack(new TestTableProcedure(i, tableName,
192             TableProcedureInterface.TableOperationType.READ));
193     }
194 
195     // table can't be deleted because one item is in the queue
196     assertFalse(queue.markTableAsDeleted(tableName));
197 
198     for (int i = 1; i <= nitems; ++i) {
199       // fetch item and take a lock
200       assertEquals(i, queue.poll().longValue());
201       // take the rlock
202       assertTrue(queue.tryAcquireTableRead(tableName, "read " + i));
203       // table can't be deleted because we have locks and/or items in the queue
204       assertFalse(queue.markTableAsDeleted(tableName));
205     }
206 
207     for (int i = 1; i <= nitems; ++i) {
208       // table can't be deleted because we have locks
209       assertFalse(queue.markTableAsDeleted(tableName));
210       // release the rlock
211       queue.releaseTableRead(tableName);
212     }
213 
214     // there are no items and no lock in the queeu
215     assertEquals(0, queue.size());
216     // complete the table deletion
217     assertTrue(queue.markTableAsDeleted(tableName));
218   }
219 
220   /**
221    * Verify the correct logic of RWLocks on the queue
222    */
223   @Test
224   public void testVerifyRwLocks() throws Exception {
225     TableName tableName = TableName.valueOf("testtb");
226     queue.addBack(new TestTableProcedure(1, tableName,
227           TableProcedureInterface.TableOperationType.EDIT));
228     queue.addBack(new TestTableProcedure(2, tableName,
229           TableProcedureInterface.TableOperationType.READ));
230     queue.addBack(new TestTableProcedure(3, tableName,
231           TableProcedureInterface.TableOperationType.EDIT));
232     queue.addBack(new TestTableProcedure(4, tableName,
233           TableProcedureInterface.TableOperationType.READ));
234     queue.addBack(new TestTableProcedure(5, tableName,
235           TableProcedureInterface.TableOperationType.READ));
236 
237     // Fetch the 1st item and take the write lock
238     Long procId = queue.poll();
239     assertEquals(1, procId.longValue());
240     assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));
241 
242     // Fetch the 2nd item and verify that the lock can't be acquired
243     assertEquals(null, queue.poll());
244 
245     // Release the write lock and acquire the read lock
246     queue.releaseTableWrite(tableName);
247 
248     // Fetch the 2nd item and take the read lock
249     procId = queue.poll();
250     assertEquals(2, procId.longValue());
251     assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
252 
253     // Fetch the 3rd item and verify that the lock can't be acquired
254     procId = queue.poll();
255     assertEquals(3, procId.longValue());
256     assertEquals(false, queue.tryAcquireTableWrite(tableName, "write " + procId));
257 
258     // release the rdlock of item 2 and take the wrlock for the 3d item
259     queue.releaseTableRead(tableName);
260     assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));
261 
262     // Fetch 4th item and verify that the lock can't be acquired
263     assertEquals(null, queue.poll());
264 
265     // Release the write lock and acquire the read lock
266     queue.releaseTableWrite(tableName);
267 
268     // Fetch the 4th item and take the read lock
269     procId = queue.poll();
270     assertEquals(4, procId.longValue());
271     assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
272 
273     // Fetch the 4th item and take the read lock
274     procId = queue.poll();
275     assertEquals(5, procId.longValue());
276     assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
277 
278     // Release 4th and 5th read-lock
279     queue.releaseTableRead(tableName);
280     queue.releaseTableRead(tableName);
281 
282     // remove table queue
283     assertEquals(0, queue.size());
284     assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
285   }
286 
287   /**
288    * Verify that "write" operations for a single table are serialized,
289    * but different tables can be executed in parallel.
290    */
291   @Test(timeout=90000)
292   public void testConcurrentWriteOps() throws Exception {
293     final TestTableProcSet procSet = new TestTableProcSet(queue);
294 
295     final int NUM_ITEMS = 10;
296     final int NUM_TABLES = 4;
297     final AtomicInteger opsCount = new AtomicInteger(0);
298     for (int i = 0; i < NUM_TABLES; ++i) {
299       TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
300       for (int j = 1; j < NUM_ITEMS; ++j) {
301         procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
302           TableProcedureInterface.TableOperationType.EDIT));
303         opsCount.incrementAndGet();
304       }
305     }
306     assertEquals(opsCount.get(), queue.size());
307 
308     final Thread[] threads = new Thread[NUM_TABLES * 2];
309     final HashSet<TableName> concurrentTables = new HashSet<TableName>();
310     final ArrayList<String> failures = new ArrayList<String>();
311     final AtomicInteger concurrentCount = new AtomicInteger(0);
312     for (int i = 0; i < threads.length; ++i) {
313       threads[i] = new Thread() {
314         @Override
315         public void run() {
316           while (opsCount.get() > 0) {
317             try {
318               TableProcedureInterface proc = procSet.acquire();
319               if (proc == null) {
320                 queue.signalAll();
321                 if (opsCount.get() > 0) {
322                   continue;
323                 }
324                 break;
325               }
326               synchronized (concurrentTables) {
327                 assertTrue("unexpected concurrency on " + proc.getTableName(),
328                   concurrentTables.add(proc.getTableName()));
329               }
330               assertTrue(opsCount.decrementAndGet() >= 0);
331               try {
332                 long procId = ((Procedure)proc).getProcId();
333                 TableName tableId = proc.getTableName();
334                 int concurrent = concurrentCount.incrementAndGet();
335                 assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
336                   concurrent >= 1 && concurrent <= NUM_TABLES);
337                 LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
338                 Thread.sleep(2000);
339                 concurrent = concurrentCount.decrementAndGet();
340                 LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
341                 assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
342               } finally {
343                 synchronized (concurrentTables) {
344                   assertTrue(concurrentTables.remove(proc.getTableName()));
345                 }
346                 procSet.release(proc);
347               }
348             } catch (Throwable e) {
349               LOG.error("Failed " + e.getMessage(), e);
350               synchronized (failures) {
351                 failures.add(e.getMessage());
352               }
353             } finally {
354               queue.signalAll();
355             }
356           }
357         }
358       };
359       threads[i].start();
360     }
361     for (int i = 0; i < threads.length; ++i) {
362       threads[i].join();
363     }
364     assertTrue(failures.toString(), failures.isEmpty());
365     assertEquals(0, opsCount.get());
366     assertEquals(0, queue.size());
367 
368     for (int i = 1; i <= NUM_TABLES; ++i) {
369       TableName table = TableName.valueOf(String.format("testtb-%04d", i));
370       assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
371     }
372   }
373 
374   public static class TestTableProcSet {
375     private final MasterProcedureQueue queue;
376     private Map<Long, TableProcedureInterface> procsMap =
377       new ConcurrentHashMap<Long, TableProcedureInterface>();
378 
379     public TestTableProcSet(final MasterProcedureQueue queue) {
380       this.queue = queue;
381     }
382 
383     public void addBack(TableProcedureInterface tableProc) {
384       Procedure proc = (Procedure)tableProc;
385       procsMap.put(proc.getProcId(), tableProc);
386       queue.addBack(proc);
387     }
388 
389     public void addFront(TableProcedureInterface tableProc) {
390       Procedure proc = (Procedure)tableProc;
391       procsMap.put(proc.getProcId(), tableProc);
392       queue.addFront(proc);
393     }
394 
395     public TableProcedureInterface acquire() {
396       TableProcedureInterface proc = null;
397       boolean avail = false;
398       while (!avail) {
399         Long procId = queue.poll();
400         proc = procId != null ? procsMap.remove(procId) : null;
401         if (proc == null) break;
402         switch (proc.getTableOperationType()) {
403           case CREATE:
404           case DELETE:
405           case EDIT:
406             avail = queue.tryAcquireTableWrite(proc.getTableName(),
407               "op="+ proc.getTableOperationType());
408             break;
409           case READ:
410             avail = queue.tryAcquireTableRead(proc.getTableName(),
411               "op="+ proc.getTableOperationType());
412             break;
413         }
414         if (!avail) {
415           addFront(proc);
416           LOG.debug("yield procId=" + procId);
417         }
418       }
419       return proc;
420     }
421 
422     public void release(TableProcedureInterface proc) {
423       switch (proc.getTableOperationType()) {
424         case CREATE:
425         case DELETE:
426         case EDIT:
427           queue.releaseTableWrite(proc.getTableName());
428           break;
429         case READ:
430           queue.releaseTableRead(proc.getTableName());
431           break;
432       }
433     }
434   }
435 
436   public static class TestTableProcedure extends Procedure<Void>
437       implements TableProcedureInterface {
438     private final TableOperationType opType;
439     private final TableName tableName;
440 
441     public TestTableProcedure() {
442       throw new UnsupportedOperationException("recovery should not be triggered here");
443     }
444 
445     public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
446       this.tableName = tableName;
447       this.opType = opType;
448       setProcId(procId);
449     }
450 
451     @Override
452     public TableName getTableName() {
453       return tableName;
454     }
455 
456     @Override
457     public TableOperationType getTableOperationType() {
458       return opType;
459     }
460 
461     @Override
462     protected Procedure[] execute(Void env) {
463       return null;
464     }
465 
466     @Override
467     protected void rollback(Void env) {
468       throw new UnsupportedOperationException();
469     }
470 
471     @Override
472     protected boolean abort(Void env) {
473       throw new UnsupportedOperationException();
474     }
475 
476     @Override
477     protected void serializeStateData(final OutputStream stream) throws IOException {}
478 
479     @Override
480     protected void deserializeStateData(final InputStream stream) throws IOException {}
481   }
482 }