View Javadoc

1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InterProcessLock;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
68  
69  /**
70   * Tests the default table lock manager
71   */
72  @Category(LargeTests.class)
73  public class TestTableLockManager {
74  
75    private static final Log LOG =
76      LogFactory.getLog(TestTableLockManager.class);
77  
78    private static final TableName TABLE_NAME =
79        TableName.valueOf("TestTableLevelLocks");
80  
81    private static final byte[] FAMILY = Bytes.toBytes("f1");
82  
83    private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
84  
85    private final HBaseTestingUtility TEST_UTIL =
86      new HBaseTestingUtility();
87  
88    private static final CountDownLatch deleteColumn = new CountDownLatch(1);
89    private static final CountDownLatch addColumn = new CountDownLatch(1);
90  
91    public void prepareMiniCluster() throws Exception {
92      TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
93      TEST_UTIL.startMiniCluster(2);
94      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
95    }
96  
97    public void prepareMiniZkCluster() throws Exception {
98      TEST_UTIL.startMiniZKCluster(1);
99    }
100 
101   @After
102   public void tearDown() throws Exception {
103     TEST_UTIL.shutdownMiniCluster();
104   }
105 
106   public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
107     @Override
108     public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
109         TableName tableName, byte[] c) throws IOException {
110       deleteColumn.countDown();
111     }
112     @Override
113     public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
114         TableName tableName, byte[] c) throws IOException {
115       Threads.sleep(10000);
116     }
117 
118     @Override
119     public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
120         TableName tableName, HColumnDescriptor column) throws IOException {
121       fail("Add column should have timeouted out for acquiring the table lock");
122     }
123   }
124 
125   @Test(timeout = 600000)
126   public void testAlterAndDisable() throws Exception {
127     prepareMiniCluster();
128     // Send a request to alter a table, then sleep during
129     // the alteration phase. In the mean time, from another
130     // thread, send a request to disable, and then delete a table.
131 
132     HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
133     master.getMasterCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
134             0, TEST_UTIL.getConfiguration());
135 
136     ExecutorService executor = Executors.newFixedThreadPool(2);
137     Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
138       @Override
139       public Object call() throws Exception {
140         Admin admin = TEST_UTIL.getHBaseAdmin();
141         admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
142         LOG.info("Added new column family");
143         HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
144         assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
145         return null;
146       }
147     });
148     Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
149       @Override
150       public Object call() throws Exception {
151         Admin admin = TEST_UTIL.getHBaseAdmin();
152         admin.disableTable(TABLE_NAME);
153         assertTrue(admin.isTableDisabled(TABLE_NAME));
154         admin.deleteTable(TABLE_NAME);
155         assertFalse(admin.tableExists(TABLE_NAME));
156         return null;
157       }
158     });
159 
160     try {
161       disableTableFuture.get();
162       alterTableFuture.get();
163     } catch (ExecutionException e) {
164       if (e.getCause() instanceof AssertionError) {
165         throw (AssertionError) e.getCause();
166       }
167       throw e;
168     }
169   }
170 
171   public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
172     @Override
173     public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
174         TableName tableName, HColumnDescriptor column) throws IOException {
175       LOG.debug("addColumn called");
176       addColumn.countDown();
177     }
178 
179     @Override
180     public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
181         TableName tableName, HColumnDescriptor column) throws IOException {
182       Threads.sleep(6000);
183       try {
184         ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
185       } catch(TableNotDisabledException expected) {
186         //pass
187         return;
188       } catch(IOException ex) {
189       }
190       fail("was expecting the table to be enabled");
191     }
192 
193     @Override
194     public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
195                                 TableName tableName) throws IOException {
196       try {
197         LOG.debug("Waiting for addColumn to be processed first");
198         //wait for addColumn to be processed first
199         addColumn.await();
200         LOG.debug("addColumn started, we can continue");
201       } catch (InterruptedException ex) {
202         LOG.warn("Sleep interrupted while waiting for addColumn countdown");
203       }
204     }
205 
206     @Override
207     public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
208                                         TableName tableName) throws IOException {
209       Threads.sleep(3000);
210     }
211   }
212 
213   @Test(timeout = 600000)
214   public void testDelete() throws Exception {
215     prepareMiniCluster();
216 
217     Admin admin = TEST_UTIL.getHBaseAdmin();
218     admin.disableTable(TABLE_NAME);
219     admin.deleteTable(TABLE_NAME);
220 
221     //ensure that znode for the table node has been deleted
222     final ZooKeeperWatcher zkWatcher = TEST_UTIL.getZooKeeperWatcher();
223     final String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString());
224 
225     TEST_UTIL.waitFor(5000, new Waiter.Predicate<Exception>() {
226       @Override
227       public boolean evaluate() throws Exception {
228         int ver = ZKUtil.checkExists(zkWatcher, znode);
229         return ver < 0;
230       }
231     });
232     int ver = ZKUtil.checkExists(zkWatcher,
233       ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString()));
234     assertTrue("Unexpected znode version " + ver, ver < 0);
235 
236   }
237 
238   public class TableLockCounter implements InterProcessLock.MetadataHandler {
239 
240     private int lockCount = 0;
241 
242     @Override
243     public void handleMetadata(byte[] metadata) {
244       lockCount++;
245     }
246 
247     public void reset() {
248       lockCount = 0;
249     }
250 
251     public int getLockCount() {
252       return lockCount;
253     }
254   }
255 
256   @Test(timeout = 600000)
257   public void testReapAllTableLocks() throws Exception {
258     prepareMiniZkCluster();
259     ServerName serverName = ServerName.valueOf("localhost:10000", 0);
260     final TableLockManager lockManager = TableLockManager.createTableLockManager(
261         TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
262 
263     String tables[] = {"table1", "table2", "table3", "table4"};
264     ExecutorService executor = Executors.newFixedThreadPool(6);
265 
266     final CountDownLatch writeLocksObtained = new CountDownLatch(4);
267     final CountDownLatch writeLocksAttempted = new CountDownLatch(10);
268     //TODO: read lock tables
269 
270     //6 threads will be stuck waiting for the table lock
271     for (int i = 0; i < tables.length; i++) {
272       final String table = tables[i];
273       for (int j = 0; j < i+1; j++) { //i+1 write locks attempted for table[i]
274         executor.submit(new Callable<Void>() {
275           @Override
276           public Void call() throws Exception {
277             writeLocksAttempted.countDown();
278             lockManager.writeLock(TableName.valueOf(table),
279                     "testReapAllTableLocks").acquire();
280             writeLocksObtained.countDown();
281             return null;
282           }
283         });
284       }
285     }
286 
287     writeLocksObtained.await();
288     writeLocksAttempted.await();
289 
290     TableLockCounter counter = new TableLockCounter();
291     do {
292       counter.reset();
293       lockManager.visitAllLocks(counter);
294       Thread.sleep(10);
295     } while (counter.getLockCount() != 10);
296 
297     //now reap all table locks
298     lockManager.reapWriteLocks();
299     TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
300     TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
301           TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
302 
303     //should not throw table lock timeout exception
304     zeroTimeoutLockManager.writeLock(
305         TableName.valueOf(tables[tables.length - 1]),
306         "zero timeout")
307       .acquire();
308 
309     executor.shutdownNow();
310   }
311 
312   @Test(timeout = 600000)
313   public void testTableReadLock() throws Exception {
314     // test plan: write some data to the table. Continuously alter the table and
315     // force splits
316     // concurrently until we have 5 regions. verify the data just in case.
317     // Every region should contain the same table descriptor
318     // This is not an exact test
319     prepareMiniCluster();
320     LoadTestTool loadTool = new LoadTestTool();
321     loadTool.setConf(TEST_UTIL.getConfiguration());
322     int numKeys = 10000;
323     final TableName tableName = TableName.valueOf("testTableReadLock");
324     final Admin admin = TEST_UTIL.getHBaseAdmin();
325     final HTableDescriptor desc = new HTableDescriptor(tableName);
326     final byte[] family = Bytes.toBytes("test_cf");
327     desc.addFamily(new HColumnDescriptor(family));
328     admin.createTable(desc); // create with one region
329 
330     // write some data, not much
331     int ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-write",
332         String.format("%d:%d:%d", 1, 10, 10), "-num_keys", String.valueOf(numKeys), "-skip_init" });
333     if (0 != ret) {
334       String errorMsg = "Load failed with error code " + ret;
335       LOG.error(errorMsg);
336       fail(errorMsg);
337     }
338 
339     int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
340     StoppableImplementation stopper = new StoppableImplementation();
341     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
342 
343     //alter table every 10 sec
344     ScheduledChore alterThread = new ScheduledChore("Alter Chore", stopper, 10000) {
345       @Override
346       protected void chore() {
347         Random random = new Random();
348         try {
349           HTableDescriptor htd = admin.getTableDescriptor(tableName);
350           String val = String.valueOf(random.nextInt());
351           htd.getFamily(family).setValue(val, val);
352           desc.getFamily(family).setValue(val, val); // save it for later
353                                                      // control
354           admin.modifyTable(tableName, htd);
355         } catch (Exception ex) {
356           LOG.warn("Caught exception", ex);
357           fail(ex.getMessage());
358         }
359       }
360     };
361 
362     //split table every 5 sec
363     ScheduledChore splitThread = new ScheduledChore("Split thread", stopper, 5000) {
364       @Override
365       public void chore() {
366         try {
367           HRegion region = TEST_UTIL.getSplittableRegion(tableName, -1);
368           if (region != null) {
369             byte[] regionName = region.getRegionInfo().getRegionName();
370             admin.flushRegion(regionName);
371             admin.compactRegion(regionName);
372             admin.splitRegion(regionName);
373           } else {
374             LOG.warn("Could not find suitable region for the table.  Possibly the " +
375               "region got closed and the attempts got over before " +
376               "the region could have got reassigned.");
377           }
378         } catch (NotServingRegionException nsre) {
379           // the region may be in transition
380           LOG.warn("Caught exception", nsre);
381         } catch (Exception ex) {
382           LOG.warn("Caught exception", ex);
383           fail(ex.getMessage());
384         }
385       }
386     };
387 
388     choreService.scheduleChore(alterThread);
389     choreService.scheduleChore(splitThread);
390     TEST_UTIL.waitTableEnabled(tableName);
391     while (true) {
392       List<HRegionInfo> regions = admin.getTableRegions(tableName);
393       LOG.info(String.format("Table #regions: %d regions: %s:", regions.size(), regions));
394       assertEquals(admin.getTableDescriptor(tableName), desc);
395       for (HRegion region : TEST_UTIL.getMiniHBaseCluster().getRegions(tableName)) {
396         assertEquals(desc, region.getTableDesc());
397       }
398       if (regions.size() >= 5) {
399         break;
400       }
401       Threads.sleep(1000);
402     }
403     stopper.stop("test finished");
404 
405     int newFamilyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
406     LOG.info(String.format("Altered the table %d times", newFamilyValues - familyValues));
407     assertTrue(newFamilyValues > familyValues); // at least one alter went
408                                                 // through
409 
410     ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-read", "100:10",
411         "-num_keys", String.valueOf(numKeys), "-skip_init" });
412     if (0 != ret) {
413       String errorMsg = "Verify failed with error code " + ret;
414       LOG.error(errorMsg);
415       fail(errorMsg);
416     }
417 
418     admin.close();
419     choreService.shutdown();
420   }
421 
422 }