/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

@Category(LargeTests.class)
public class TestWALSplit {
  private final static Log LOG = LogFactory.getLog(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10;

  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = "f1".getBytes();
  private static final byte[] QUALIFIER = "q1".getBytes();
  private static final byte[] VALUE = "v1".getBytes();
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<String>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String [] GROUP = new String [] {"supergroup"};
  private RecoveryMode mode;

  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    conf.setBoolean("dfs.support.broken.append", true);
    conf.setBoolean("dfs.support.append", true);

    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    Map<String, String []> u2g_map = new HashMap<String, String []>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;

  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
    wals = new WALFactory(conf, null, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName()));
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }
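
  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to
   * write to it. Ensures we do not lose edits: the robber must see every edit the
   * zombie managed to write, give or take the last in-flight append.
   */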
  @Test (timeout=300000)
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);

    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();

      while (startCount == counter.get()) Threads.sleep(1);

      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(ls);
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile: logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
          "Zombie could write " + counter.get() + " and logfile had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }
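
  /**
   * This thread keeps writing to the last WAL file even after the split process starts,
   * imitating a region server that was presumed dead but is still appending. It does its
   * writing as an alternate user in another filesystem instance, to better simulate a
   * regionserver whose hold on the file gets revoked by the split.
   */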
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
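          // Index of the WAL to keep open: generateWALs() leaves open the writer whose
          // index equals its 'leaveOpen' argument.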
          int walToKeepOpen = numOfWriters - 1;

          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }

          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
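
          // The split has now parsed this WAL out from under us, so our hold on the file
          // should have been revoked; closing the still-open writer must fail.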
          try {
            writer.close();
            fail("Writing closing after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte [] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              ("r" + editsCount.get()).getBytes(), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }
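
  /**
   * The recovered.edits path computed for an edit against the meta table must sit under
   * the meta region's own directory.
   */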
  @Test (timeout=300000)
  public void testRecoveredEditsPathForMeta() throws IOException {
    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    long now = System.currentTimeMillis();
    Entry entry =
        new Entry(new WALKey(encoded,
            TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
            new WALEdit());
    Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
  }
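
  /**
   * An old recovered.edits file (a file where the recovered.edits directory should be)
   * must get sidelined rather than break computing the split-edits path.
   */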
  @Test (timeout=300000)
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    long now = System.currentTimeMillis();
    Entry entry =
        new Entry(new WALKey(encoded,
            TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
            new WALEdit());
    Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(parent.getName(), HConstants.RECOVERED_EDITS_DIR);
    fs.createNewFile(parent);

    Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  private void useDifferentDFSClient() throws IOException {
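    // Make fs act as a different client now; initialize() will create a new DFSClient
    // with a new client ID.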
    fs.initialize(fs.getUri(), conf);
  }

  @Test (timeout=300000)
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }
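
  /**
   * @param expectedEntries -1 to not assert on the per-region entry count
   * @return the number of entries written across all regions
   */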
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test (timeout=300000)
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test (timeout=300000)
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test (timeout=300000)
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
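    // Generate logs but leave wal.dat.5 open; its reported length will be zero even
    // though it contains data.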
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test (timeout=300000)
  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test (timeout=300000)
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES);
  }

  @Test (timeout=300000)
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
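
    // Entries in the original logs alternate regions; considering the sequence-file
    // header, at least the first half of each file's entries should still be readable.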
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }

  @Test (timeout=300000)
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    for (FaultySequenceFileLogReader.FailureType failureType :
        FaultySequenceFileLogReader.FailureType.values()) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<String>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
          walDirContents, archivedLogs);
    }
  }
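
  /**
   * Splits the WALs with a reader instrumented to fail in the given way.
   * @return the set of WAL names present in the log directory prior to the split attempt
   */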
  private Set<String> splitCorruptWALs(final FaultySequenceFileLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl",
          FaultySequenceFileLogReader.class, Reader.class);
      conf.set("faultysequencefilelogreader.failuretype", failureType.name());

      try {
        wals.shutdown();
      } catch (IOException exception) {
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }

      wals = new WALFactory(conf, null, name.getMethodName());
      generateWALs(-1);

      final Set<String> walDirContents = new HashSet<String>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }

  @Test (timeout=300000, expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
  }

  @Test (timeout=300000)
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS + 1, fs.listStatus(WALDIR).length);
  }

  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();
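
    // The file should not have been sidelined into CORRUPTDIR.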
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test (timeout=300000)
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test (timeout=300000)
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  @Test (timeout=300000)
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS + 1,
        archivedLogs.length);
  }

  @Test (timeout=300000)
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test (timeout=300000)
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus [] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
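      // Expected: the whole log directory was removed by the split.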
    }
  }

  @Test(timeout=300000, expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
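    // Leave the fifth log open so we can append one more entry, for the region named
    // "break", before the (instrumented) split runs.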
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test (timeout=300000)
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test (timeout=300000)
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
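    // Pick the largest log file, the one most likely to contain entries to split, so the
    // injected append failure below is actually exercised.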
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);

    WALSplitter logSplitter = new WALSplitter(wals,
        conf, HBASEDIR, fs, null, null, this.mode) {
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(
            mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }
    };
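    // Set up a background thread dumper; it helps diagnose the test if the split hangs.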
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while(!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      stop.set(true);
    }
  }
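
  /**
   * Splits WALs through the given filesystem spy; the split is expected to retry over
   * the injected transient failures and still archive every WAL.
   * @param spiedFs a spy of fs, instrumented by the caller to fail in a recoverable way
   */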
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS + 1, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  @Test (timeout=300000)
  public void testMovedWALDuringRecovery() throws Exception {
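    // This partial mock throws a LeaseExpiredException for every append(),
    // simulating WAL files that were moved out from under the splitter.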
    FileSystem spiedFs = Mockito.spy(fs);
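    // The "File does not exist" wording matters; it mirrors how this exception
    // surfaces from HDFS.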
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test (timeout=300000)
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
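    // HDFS can transiently fail an open() while a file's last block is under recovery,
    // surfacing messages like the three below; a retry is expected to succeed, so the
    // first three open() calls fail and the fourth goes through.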
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

  @Test (timeout=300000)
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter
        = new CancelableProgressable() {
          @Override
          public boolean progress() {
            count.getAndIncrement();
            return false;
          }
        };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500);
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(
          HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode, wals);
      assertFalse("Log splitting should fail", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }
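
  /**
   * Test log split process with fake data and lots of edits to trigger threading issues.
   */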
  @Test (timeout=300000)
  public void testThreading() throws Exception {
    doTestThreading(20000, 128*1024*1024, 0);
  }
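
  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */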
  @Test (timeout=300000)
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }
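
  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed. After the split is complete, verifies that the
   * statistics show the correct number of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through the pipeline
   * @param bufferSize size of the in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */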
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
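
    // Create a fake log file; the content does not matter since we mock the reader.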
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();
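
    // Make region dirs for our destination regions so the output doesn't get skipped.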
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);
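
    // Create a splitter that reads and writes the data without touching disk.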
    WALSplitter logSplitter = new WALSplitter(wals,
        localConf, HBASEDIR, fs, null, null, this.mode) {
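
      /* Produce a mock writer that doesn't write anywhere. */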
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArguments()[0];
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);
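
            // Check that the edits arrive in the expected order per region.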
            assertEquals(expectedIndex, Bytes.toInt(cell.getRow()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }
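
      /* Produce a mock reader that generates fake entries. */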
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            int regionIdx = index % regions.size();
            byte[] region = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes((int)(index / regions.size())),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
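
    // Verify the number of written edits per region.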
    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
          Bytes.toString(entry.getKey()));
      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }

  @Test (timeout=300000)
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test (timeout=300000)
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test (timeout=300000)
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test (timeout=300000)
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), conf.get(
        "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
    assertEquals(1, fs.listStatus(corruptDir).length);
  }
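
  /**
   * Tests splitting a log while a concurrent process (simulated inside createWriter)
   * deletes the region's recovered.edits files, as replayRecoveredEditsIfAny() would do
   * during region open. The split must neither throw nor leave logs flagged as corrupt.
   */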
  @Test (timeout=300000)
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");

    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(Bytes.toBytes(regionName));
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter = new WALSplitter(wals,
        conf, HBASEDIR, fs, null, null, this.mode) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
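        // After creating the writer, simulate a concurrent replayRecoveredEditsIfAny():
        // fetch this region's split-edit files and delete them.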
        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.fs.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e);
      fail("Throws IOException when splitting "
          + "log, it is most likely because writing file does not "
          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
  }

  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }
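
  /**
   * Generates WALs with entries for each region in REGIONS.
   * @param leaveOpen index of the writer to leave un-closed, or an out-of-range value
   *                  (e.g. -1) to close them all
   * @return the writer that is still open, or null if all writers were closed
   */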
  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer [] ws = new Writer[writers];
    int seq = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER,
              VALUE, seq++);
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }

  private Path[] getLogForRegion(Path rootdir, TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getTableDir(rootdir, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(region.getBytes())));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        if (WALSplitter.isSequenceIdFile(p)) {
          return false;
        }
        return true;
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write("-----".getBytes());
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);
        closeOrFlush(close, out);
        break;
    }
  }

  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }

  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync();
    return seq;
  }

  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();
    WALEdit edit = new WALEdit();
    seq++;
    edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
    return new Entry(new WALKey(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }

  private void injectEmptyFile(String suffix, boolean closeFile)
      throws IOException {
    Writer writer = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix),
        conf);
    if (closeFile) writer.close();
  }

  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}