1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.util.List;
29 import java.util.Random;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.Future;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.ChoreService;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HRegionInfo;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.testclassification.LargeTests;
46 import org.apache.hadoop.hbase.InterProcessLock;
47 import org.apache.hadoop.hbase.NotServingRegionException;
48 import org.apache.hadoop.hbase.ScheduledChore;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.TableNotDisabledException;
52 import org.apache.hadoop.hbase.Waiter;
53 import org.apache.hadoop.hbase.client.Admin;
54 import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
55 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
56 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
57 import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
58 import org.apache.hadoop.hbase.regionserver.HRegion;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.hbase.util.LoadTestTool;
61 import org.apache.hadoop.hbase.util.StoppableImplementation;
62 import org.apache.hadoop.hbase.util.Threads;
63 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
64 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
65 import org.junit.After;
66 import org.junit.Test;
67 import org.junit.experimental.categories.Category;
68
69
70
71
72 @Category(LargeTests.class)
73 public class TestTableLockManager {
74
75 private static final Log LOG =
76 LogFactory.getLog(TestTableLockManager.class);
77
78 private static final TableName TABLE_NAME =
79 TableName.valueOf("TestTableLevelLocks");
80
81 private static final byte[] FAMILY = Bytes.toBytes("f1");
82
83 private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
84
85 private final HBaseTestingUtility TEST_UTIL =
86 new HBaseTestingUtility();
87
88 private static final CountDownLatch deleteColumn = new CountDownLatch(1);
89 private static final CountDownLatch addColumn = new CountDownLatch(1);
90
91 public void prepareMiniCluster() throws Exception {
92 TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
93 TEST_UTIL.startMiniCluster(2);
94 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
95 }
96
97 public void prepareMiniZkCluster() throws Exception {
98 TEST_UTIL.startMiniZKCluster(1);
99 }
100
101 @After
102 public void tearDown() throws Exception {
103 TEST_UTIL.shutdownMiniCluster();
104 }
105
106 public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
107 @Override
108 public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
109 TableName tableName, byte[] c) throws IOException {
110 deleteColumn.countDown();
111 }
112 @Override
113 public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
114 TableName tableName, byte[] c) throws IOException {
115 Threads.sleep(10000);
116 }
117
118 @Override
119 public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
120 TableName tableName, HColumnDescriptor column) throws IOException {
121 fail("Add column should have timeouted out for acquiring the table lock");
122 }
123 }
124
125 @Test(timeout = 600000)
126 public void testAlterAndDisable() throws Exception {
127 prepareMiniCluster();
128
129
130
131
132 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
133 master.getMasterCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
134 0, TEST_UTIL.getConfiguration());
135
136 ExecutorService executor = Executors.newFixedThreadPool(2);
137 Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
138 @Override
139 public Object call() throws Exception {
140 Admin admin = TEST_UTIL.getHBaseAdmin();
141 admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
142 LOG.info("Added new column family");
143 HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
144 assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
145 return null;
146 }
147 });
148 Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
149 @Override
150 public Object call() throws Exception {
151 Admin admin = TEST_UTIL.getHBaseAdmin();
152 admin.disableTable(TABLE_NAME);
153 assertTrue(admin.isTableDisabled(TABLE_NAME));
154 admin.deleteTable(TABLE_NAME);
155 assertFalse(admin.tableExists(TABLE_NAME));
156 return null;
157 }
158 });
159
160 try {
161 disableTableFuture.get();
162 alterTableFuture.get();
163 } catch (ExecutionException e) {
164 if (e.getCause() instanceof AssertionError) {
165 throw (AssertionError) e.getCause();
166 }
167 throw e;
168 }
169 }
170
171 public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
172 @Override
173 public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
174 TableName tableName, HColumnDescriptor column) throws IOException {
175 LOG.debug("addColumn called");
176 addColumn.countDown();
177 }
178
179 @Override
180 public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
181 TableName tableName, HColumnDescriptor column) throws IOException {
182 Threads.sleep(6000);
183 try {
184 ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
185 } catch(TableNotDisabledException expected) {
186
187 return;
188 } catch(IOException ex) {
189 }
190 fail("was expecting the table to be enabled");
191 }
192
193 @Override
194 public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
195 TableName tableName) throws IOException {
196 try {
197 LOG.debug("Waiting for addColumn to be processed first");
198
199 addColumn.await();
200 LOG.debug("addColumn started, we can continue");
201 } catch (InterruptedException ex) {
202 LOG.warn("Sleep interrupted while waiting for addColumn countdown");
203 }
204 }
205
206 @Override
207 public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
208 TableName tableName) throws IOException {
209 Threads.sleep(3000);
210 }
211 }
212
213 @Test(timeout = 600000)
214 public void testDelete() throws Exception {
215 prepareMiniCluster();
216
217 Admin admin = TEST_UTIL.getHBaseAdmin();
218 admin.disableTable(TABLE_NAME);
219 admin.deleteTable(TABLE_NAME);
220
221
222 final ZooKeeperWatcher zkWatcher = TEST_UTIL.getZooKeeperWatcher();
223 final String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString());
224
225 TEST_UTIL.waitFor(5000, new Waiter.Predicate<Exception>() {
226 @Override
227 public boolean evaluate() throws Exception {
228 int ver = ZKUtil.checkExists(zkWatcher, znode);
229 return ver < 0;
230 }
231 });
232 int ver = ZKUtil.checkExists(zkWatcher,
233 ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString()));
234 assertTrue("Unexpected znode version " + ver, ver < 0);
235
236 }
237
238 public class TableLockCounter implements InterProcessLock.MetadataHandler {
239
240 private int lockCount = 0;
241
242 @Override
243 public void handleMetadata(byte[] metadata) {
244 lockCount++;
245 }
246
247 public void reset() {
248 lockCount = 0;
249 }
250
251 public int getLockCount() {
252 return lockCount;
253 }
254 }
255
256 @Test(timeout = 600000)
257 public void testReapAllTableLocks() throws Exception {
258 prepareMiniZkCluster();
259 ServerName serverName = ServerName.valueOf("localhost:10000", 0);
260 final TableLockManager lockManager = TableLockManager.createTableLockManager(
261 TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
262
263 String tables[] = {"table1", "table2", "table3", "table4"};
264 ExecutorService executor = Executors.newFixedThreadPool(6);
265
266 final CountDownLatch writeLocksObtained = new CountDownLatch(4);
267 final CountDownLatch writeLocksAttempted = new CountDownLatch(10);
268
269
270
271 for (int i = 0; i < tables.length; i++) {
272 final String table = tables[i];
273 for (int j = 0; j < i+1; j++) {
274 executor.submit(new Callable<Void>() {
275 @Override
276 public Void call() throws Exception {
277 writeLocksAttempted.countDown();
278 lockManager.writeLock(TableName.valueOf(table),
279 "testReapAllTableLocks").acquire();
280 writeLocksObtained.countDown();
281 return null;
282 }
283 });
284 }
285 }
286
287 writeLocksObtained.await();
288 writeLocksAttempted.await();
289
290 TableLockCounter counter = new TableLockCounter();
291 do {
292 counter.reset();
293 lockManager.visitAllLocks(counter);
294 Thread.sleep(10);
295 } while (counter.getLockCount() != 10);
296
297
298 lockManager.reapWriteLocks();
299 TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
300 TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
301 TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
302
303
304 zeroTimeoutLockManager.writeLock(
305 TableName.valueOf(tables[tables.length - 1]),
306 "zero timeout")
307 .acquire();
308
309 executor.shutdownNow();
310 }
311
312 @Test(timeout = 600000)
313 public void testTableReadLock() throws Exception {
314
315
316
317
318
319 prepareMiniCluster();
320 LoadTestTool loadTool = new LoadTestTool();
321 loadTool.setConf(TEST_UTIL.getConfiguration());
322 int numKeys = 10000;
323 final TableName tableName = TableName.valueOf("testTableReadLock");
324 final Admin admin = TEST_UTIL.getHBaseAdmin();
325 final HTableDescriptor desc = new HTableDescriptor(tableName);
326 final byte[] family = Bytes.toBytes("test_cf");
327 desc.addFamily(new HColumnDescriptor(family));
328 admin.createTable(desc);
329
330
331 int ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-write",
332 String.format("%d:%d:%d", 1, 10, 10), "-num_keys", String.valueOf(numKeys), "-skip_init" });
333 if (0 != ret) {
334 String errorMsg = "Load failed with error code " + ret;
335 LOG.error(errorMsg);
336 fail(errorMsg);
337 }
338
339 int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
340 StoppableImplementation stopper = new StoppableImplementation();
341 final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
342
343
344 ScheduledChore alterThread = new ScheduledChore("Alter Chore", stopper, 10000) {
345 @Override
346 protected void chore() {
347 Random random = new Random();
348 try {
349 HTableDescriptor htd = admin.getTableDescriptor(tableName);
350 String val = String.valueOf(random.nextInt());
351 htd.getFamily(family).setValue(val, val);
352 desc.getFamily(family).setValue(val, val);
353
354 admin.modifyTable(tableName, htd);
355 } catch (Exception ex) {
356 LOG.warn("Caught exception", ex);
357 fail(ex.getMessage());
358 }
359 }
360 };
361
362
363 ScheduledChore splitThread = new ScheduledChore("Split thread", stopper, 5000) {
364 @Override
365 public void chore() {
366 try {
367 HRegion region = TEST_UTIL.getSplittableRegion(tableName, -1);
368 if (region != null) {
369 byte[] regionName = region.getRegionInfo().getRegionName();
370 admin.flushRegion(regionName);
371 admin.compactRegion(regionName);
372 admin.splitRegion(regionName);
373 } else {
374 LOG.warn("Could not find suitable region for the table. Possibly the " +
375 "region got closed and the attempts got over before " +
376 "the region could have got reassigned.");
377 }
378 } catch (NotServingRegionException nsre) {
379
380 LOG.warn("Caught exception", nsre);
381 } catch (Exception ex) {
382 LOG.warn("Caught exception", ex);
383 fail(ex.getMessage());
384 }
385 }
386 };
387
388 choreService.scheduleChore(alterThread);
389 choreService.scheduleChore(splitThread);
390 TEST_UTIL.waitTableEnabled(tableName);
391 while (true) {
392 List<HRegionInfo> regions = admin.getTableRegions(tableName);
393 LOG.info(String.format("Table #regions: %d regions: %s:", regions.size(), regions));
394 assertEquals(admin.getTableDescriptor(tableName), desc);
395 for (HRegion region : TEST_UTIL.getMiniHBaseCluster().getRegions(tableName)) {
396 assertEquals(desc, region.getTableDesc());
397 }
398 if (regions.size() >= 5) {
399 break;
400 }
401 Threads.sleep(1000);
402 }
403 stopper.stop("test finished");
404
405 int newFamilyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
406 LOG.info(String.format("Altered the table %d times", newFamilyValues - familyValues));
407 assertTrue(newFamilyValues > familyValues);
408
409
410 ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-read", "100:10",
411 "-num_keys", String.valueOf(numKeys), "-skip_init" });
412 if (0 != ret) {
413 String errorMsg = "Verify failed with error code " + ret;
414 LOG.error(errorMsg);
415 fail(errorMsg);
416 }
417
418 admin.close();
419 choreService.shutdown();
420 }
421
422 }