1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotEquals;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27
28 import java.io.IOException;
29 import java.lang.reflect.Field;
30 import java.util.ArrayList;
31 import java.util.Comparator;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.UUID;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42
43 import org.apache.commons.lang.mutable.MutableBoolean;
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.FileStatus;
48 import org.apache.hadoop.fs.FileSystem;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.hbase.CellScanner;
51 import org.apache.hadoop.hbase.Coprocessor;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HBaseTestingUtility;
54 import org.apache.hadoop.hbase.HColumnDescriptor;
55 import org.apache.hadoop.hbase.HConstants;
56 import org.apache.hadoop.hbase.HRegionInfo;
57 import org.apache.hadoop.hbase.HTableDescriptor;
58 import org.apache.hadoop.hbase.KeyValue;
59 import org.apache.hadoop.hbase.testclassification.MediumTests;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.client.Get;
62 import org.apache.hadoop.hbase.client.Put;
63 import org.apache.hadoop.hbase.client.Result;
64 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
65 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
66 import org.apache.hadoop.hbase.regionserver.HRegion;
67 import org.apache.hadoop.hbase.regionserver.Region;
68 import org.apache.hadoop.hbase.testclassification.MediumTests;
69 import org.apache.hadoop.hbase.util.Bytes;
70 import org.apache.hadoop.hbase.util.EnvironmentEdge;
71 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
72 import org.apache.hadoop.hbase.util.FSUtils;
73 import org.apache.hadoop.hbase.util.Threads;
74 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
75 import org.apache.hadoop.hbase.wal.WAL;
76 import org.apache.hadoop.hbase.wal.WALKey;
77 import org.junit.After;
78 import org.junit.AfterClass;
79 import org.junit.Before;
80 import org.junit.BeforeClass;
81 import org.junit.Rule;
82 import org.junit.Test;
83 import org.junit.experimental.categories.Category;
84 import org.junit.rules.TestName;
85
86
87
88
89 @Category(MediumTests.class)
90 public class TestFSHLog {
91 protected static final Log LOG = LogFactory.getLog(TestFSHLog.class);
92
93 protected static Configuration conf;
94 protected static FileSystem fs;
95 protected static Path dir;
96 protected static Path rootDir;
97 protected static Path walRootDir;
98 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
99
100 @Rule
101 public final TestName currentTest = new TestName();
102
103 @Before
104 public void setUp() throws Exception {
105 FileStatus[] entries = fs.listStatus(new Path("/"));
106 for (FileStatus dir : entries) {
107 fs.delete(dir.getPath(), true);
108 }
109 rootDir = TEST_UTIL.createRootDir();
110 walRootDir = TEST_UTIL.createWALRootDir();
111 dir = new Path(walRootDir, currentTest.getMethodName());
112 assertNotEquals(rootDir, walRootDir);
113 }
114
115 @After
116 public void tearDown() throws Exception {
117 }
118
119 @BeforeClass
120 public static void setUpBeforeClass() throws Exception {
121
122 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
123
124 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
125 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
126 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
127
128
129 TEST_UTIL.getConfiguration()
130 .setInt("hbase.ipc.client.connect.max.retries", 1);
131 TEST_UTIL.getConfiguration().setInt(
132 "dfs.client.block.recovery.retries", 1);
133 TEST_UTIL.getConfiguration().setInt(
134 "hbase.ipc.client.connection.maxidletime", 500);
135 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
136 SampleRegionWALObserver.class.getName());
137 TEST_UTIL.startMiniDFSCluster(3);
138
139 conf = TEST_UTIL.getConfiguration();
140 fs = TEST_UTIL.getDFSCluster().getFileSystem();
141 }
142
143 @AfterClass
144 public static void tearDownAfterClass() throws Exception {
145 fs.delete(rootDir, true);
146 fs.delete(walRootDir, true);
147 TEST_UTIL.shutdownMiniCluster();
148 }
149
150
151
152
153 @Test
154 public void testWALCoprocessorLoaded() throws Exception {
155
156 FSHLog log = null;
157 try {
158 log = new FSHLog(fs, walRootDir, dir.toString(),
159 HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
160 WALCoprocessorHost host = log.getCoprocessorHost();
161 Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
162 assertNotNull(c);
163 } finally {
164 if (log != null) {
165 log.close();
166 }
167 }
168 }
169
170 protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
171 AtomicLong sequenceId) throws IOException {
172 final byte[] row = Bytes.toBytes("row");
173 for (int i = 0; i < times; i++) {
174 long timestamp = System.currentTimeMillis();
175 WALEdit cols = new WALEdit();
176 cols.add(new KeyValue(row, row, row, timestamp, row));
177 log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
178 cols, sequenceId, true, null);
179 }
180 log.sync();
181 }
182
183
184
185
186
187
188 protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
189 wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
190 wal.completeCacheFlush(regionEncodedName);
191 }
192
193
194
195
196
197
198 @Test
199 public void testWALComparator() throws Exception {
200 FSHLog wal1 = null;
201 FSHLog walMeta = null;
202 try {
203 wal1 = new FSHLog(fs, walRootDir, dir.toString(),
204 HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
205 LOG.debug("Log obtained is: " + wal1);
206 Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
207 Path p1 = wal1.computeFilename(11);
208 Path p2 = wal1.computeFilename(12);
209
210 assertTrue(comp.compare(p1, p1) == 0);
211
212 assertTrue(comp.compare(p1, p2) < 0);
213 walMeta = new FSHLog(fs, walRootDir, dir.toString(),
214 HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
215 DefaultWALProvider.META_WAL_PROVIDER_ID);
216 Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
217
218 Path p1WithMeta = walMeta.computeFilename(11);
219 Path p2WithMeta = walMeta.computeFilename(12);
220 assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
221 assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
222
223 boolean ex = false;
224 try {
225 comp.compare(p1WithMeta, p2);
226 } catch (IllegalArgumentException e) {
227 ex = true;
228 }
229 assertTrue("Comparator doesn't complain while checking meta log files", ex);
230 boolean exMeta = false;
231 try {
232 compMeta.compare(p1WithMeta, p2);
233 } catch (IllegalArgumentException e) {
234 exMeta = true;
235 }
236 assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
237 } finally {
238 if (wal1 != null) {
239 wal1.close();
240 }
241 if (walMeta != null) {
242 walMeta.close();
243 }
244 }
245 }
246
247
248
249
250
251
252
253
254
255
256 @Test
257 public void testFindMemStoresEligibleForFlush() throws Exception {
258 LOG.debug("testFindMemStoresEligibleForFlush");
259 Configuration conf1 = HBaseConfiguration.create(conf);
260 conf1.setInt("hbase.regionserver.maxlogs", 1);
261 FSHLog wal = new FSHLog(fs, walRootDir, dir.toString(),
262 HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
263 HTableDescriptor t1 =
264 new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
265 HTableDescriptor t2 =
266 new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
267 HRegionInfo hri1 =
268 new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
269 HRegionInfo hri2 =
270 new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
271
272 final AtomicLong sequenceId1 = new AtomicLong(1);
273 final AtomicLong sequenceId2 = new AtomicLong(1);
274
275 try {
276 addEdits(wal, hri1, t1, 2, sequenceId1);
277 wal.rollWriter();
278
279 addEdits(wal, hri1, t1, 2, sequenceId1);
280 wal.rollWriter();
281
282 assertTrue(wal.getNumRolledLogFiles() == 2);
283
284
285
286 byte[][] regionsToFlush = wal.findRegionsToForceFlush();
287 assertEquals(1, regionsToFlush.length);
288 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
289
290 addEdits(wal, hri2, t2, 2, sequenceId2);
291
292 regionsToFlush = wal.findRegionsToForceFlush();
293 assertEquals(regionsToFlush.length, 1);
294 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
295
296
297 flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
298 wal.rollWriter();
299
300 assertEquals(1, wal.getNumRolledLogFiles());
301
302 flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
303 wal.rollWriter(true);
304
305 assertEquals(0, wal.getNumRolledLogFiles());
306
307 addEdits(wal, hri1, t1, 2, sequenceId1);
308 addEdits(wal, hri2, t2, 2, sequenceId2);
309 wal.rollWriter();
310
311 assertEquals(1, wal.getNumRolledLogFiles());
312 addEdits(wal, hri1, t1, 2, sequenceId1);
313 wal.rollWriter();
314
315
316 regionsToFlush = wal.findRegionsToForceFlush();
317 assertEquals(2, regionsToFlush.length);
318
319 flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
320 flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
321 wal.rollWriter(true);
322 assertEquals(0, wal.getNumRolledLogFiles());
323
324 addEdits(wal, hri1, t1, 2, sequenceId1);
325
326 wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
327 wal.rollWriter();
328 wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
329 assertEquals(1, wal.getNumRolledLogFiles());
330 } finally {
331 if (wal != null) {
332 wal.close();
333 }
334 }
335 }
336
337
338
339
340
341
342
343
344 @Test
345 public void testAllRegionsFlushed() {
346 LOG.debug("testAllRegionsFlushed");
347 Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
348 Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
349 Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
350
351 TableName t1 = TableName.valueOf("t1");
352
353 HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
354
355 final AtomicLong sequenceId1 = new AtomicLong(1);
356
357 assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
358
359 seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
360 oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
361
362 assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
363
364 oldestUnFlushedSeqNo.clear();
365 oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
366 assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
367
368 oldestFlushingSeqNo.clear();
369 oldestUnFlushedSeqNo.clear();
370 assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
371
372 oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l);
373 seqNo.put(hri1.getEncodedNameAsBytes(), 1500l);
374 assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
375
376
377
378 oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l);
379 oldestUnFlushedSeqNo.clear();
380 seqNo.put(hri1.getEncodedNameAsBytes(), 1199l);
381 assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
382 }
383
384 @Test(expected=IOException.class)
385 public void testFailedToCreateWALIfParentRenamed() throws IOException {
386 final String name = "testFailedToCreateWALIfParentRenamed";
387 FSHLog log = new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME,
388 conf, null, true, null, null);
389 long filenum = System.currentTimeMillis();
390 Path path = log.computeFilename(filenum);
391 log.createWriterInstance(path);
392 Path parent = path.getParent();
393 path = log.computeFilename(filenum + 1);
394 Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
395 fs.rename(parent, newPath);
396 log.createWriterInstance(path);
397 fail("It should fail to create the new WAL");
398 }
399
400
401
402
403
404
405
406
407
408 @Test
409 public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
410 String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
411 final TableName tableName = TableName.valueOf(testName);
412 final HRegionInfo hri = new HRegionInfo(tableName);
413 final byte[] rowName = tableName.getName();
414 final HTableDescriptor htd = new HTableDescriptor(tableName);
415 htd.addFamily(new HColumnDescriptor("f"));
416 HRegion r = HRegion.createHRegion(hri, rootDir,
417 TEST_UTIL.getConfiguration(), htd);
418 HRegion.closeHRegion(r);
419 final int countPerFamily = 10;
420 final MutableBoolean goslow = new MutableBoolean(false);
421
422 FSHLog wal = new FSHLog(FileSystem.get(conf), walRootDir,
423 testName, conf) {
424 @Override
425 void atHeadOfRingBufferEventHandlerAppend() {
426 if (goslow.isTrue()) {
427 Threads.sleep(100);
428 LOG.debug("Sleeping before appending 100ms");
429 }
430 super.atHeadOfRingBufferEventHandlerAppend();
431 }
432 };
433 HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
434 TEST_UTIL.getTestFileSystem(), rootDir, hri, htd, wal);
435 EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
436 try {
437 List<Put> puts = null;
438 for (HColumnDescriptor hcd: htd.getFamilies()) {
439 puts =
440 TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
441 }
442
443
444 final Get g = new Get(rowName);
445 Result result = region.get(g);
446 assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
447
448
449 WALEdit edits = new WALEdit();
450 for (Put p: puts) {
451 CellScanner cs = p.cellScanner();
452 while (cs.advance()) {
453 edits.add(cs.current());
454 }
455 }
456
457 List<UUID> clusterIds = new ArrayList<UUID>();
458 clusterIds.add(UUID.randomUUID());
459
460 goslow.setValue(true);
461 for (int i = 0; i < countPerFamily; i++) {
462 final HRegionInfo info = region.getRegionInfo();
463 final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
464 System.currentTimeMillis(), clusterIds, -1, -1);
465 wal.append(htd, info, logkey, edits, region.getSequenceId(), true, null);
466 }
467 region.flush(true);
468
469 long currentSequenceId = region.getSequenceId().get();
470
471 goslow.setValue(false);
472 synchronized (goslow) {
473 goslow.notifyAll();
474 }
475 assertTrue(currentSequenceId >= region.getSequenceId().get());
476 } finally {
477 region.close(true);
478 wal.close();
479 }
480 }
481
482 @Test
483 public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
484 SecurityException, IllegalArgumentException, IllegalAccessException {
485 final String name = "testSyncRunnerIndexOverflow";
486 FSHLog log =
487 new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
488 null, true, null, null);
489 try {
490 Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
491 ringBufferEventHandlerField.setAccessible(true);
492 FSHLog.RingBufferEventHandler ringBufferEventHandler =
493 (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
494 Field syncRunnerIndexField =
495 FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
496 syncRunnerIndexField.setAccessible(true);
497 syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
498 HTableDescriptor htd =
499 new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
500 HRegionInfo hri =
501 new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
502 final AtomicLong sequenceId1 = new AtomicLong(1);
503 for (int i = 0; i < 10; i++) {
504 addEdits(log, hri, htd, 1, sequenceId1);
505 }
506 } finally {
507 log.close();
508 }
509 }
510
511
512
513
514 @Test (timeout = 30000)
515 public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
516 final String name = "testSyncRunnerIndexOverflow";
517 final byte[] b = Bytes.toBytes("b");
518
519 final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
520 final CountDownLatch holdAppend = new CountDownLatch(1);
521 final CountDownLatch flushFinished = new CountDownLatch(1);
522 final CountDownLatch putFinished = new CountDownLatch(1);
523
524 try (FSHLog log =
525 new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
526 null, true, null, null)) {
527
528 log.registerWALActionsListener(new WALActionsListener.Base() {
529 @Override
530 public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
531 if (startHoldingForAppend.get()) {
532 try {
533 holdAppend.await();
534 } catch (InterruptedException e) {
535 LOG.error(e);
536 }
537 }
538 }
539 });
540
541
542 HTableDescriptor htd =
543 new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
544 HRegionInfo hri =
545 new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
546
547 final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
548 ExecutorService exec = Executors.newFixedThreadPool(2);
549
550
551 region.put(new Put(b).addColumn(b, b,b));
552
553 startHoldingForAppend.set(true);
554 exec.submit(new Runnable() {
555 @Override
556 public void run() {
557 try {
558 region.put(new Put(b).addColumn(b, b,b));
559 putFinished.countDown();
560 } catch (IOException e) {
561 LOG.error(e);
562 }
563 }
564 });
565
566
567 Threads.sleep(3000);
568
569 exec.submit(new Runnable() {
570 @Override
571 public void run() {
572 try {
573 Region.FlushResult flushResult = region.flush(true);
574 LOG.info("Flush result:" + flushResult.getResult());
575 LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
576 flushFinished.countDown();
577 } catch (IOException e) {
578 LOG.error(e);
579 }
580 }
581 });
582
583
584
585 Threads.sleep(3000);
586
587
588 holdAppend.countDown();
589 putFinished.await();
590 flushFinished.await();
591
592
593 assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
594
595
596 long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
597 assertEquals("Found seqId for the region which is already flushed",
598 HConstants.NO_SEQNUM, seqId);
599
600 region.close();
601 }
602 }
603 }