View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.FileNotFoundException;
27  import java.io.IOException;
28  import java.lang.reflect.Method;
29  import java.security.PrivilegedExceptionAction;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableSet;
37  import java.util.Set;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.fs.FSDataInputStream;
46  import org.apache.hadoop.fs.FSDataOutputStream;
47  import org.apache.hadoop.fs.FileStatus;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.FileUtil;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.fs.PathFilter;
52  import org.apache.hadoop.hbase.Cell;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HBaseTestingUtility;
55  import org.apache.hadoop.hbase.HConstants;
56  import org.apache.hadoop.hbase.HRegionInfo;
57  import org.apache.hadoop.hbase.KeyValue;
58  import org.apache.hadoop.hbase.testclassification.LargeTests;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
61  import org.apache.hadoop.hbase.regionserver.HRegion;
62  import org.apache.hadoop.hbase.wal.WAL.Entry;
63  import org.apache.hadoop.hbase.wal.WAL.Reader;
64  import org.apache.hadoop.hbase.wal.WALProvider.Writer;
65  import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
66  import org.apache.hadoop.hbase.security.User;
67  import org.apache.hadoop.hbase.util.Bytes;
68  import org.apache.hadoop.hbase.util.CancelableProgressable;
69  import org.apache.hadoop.hbase.util.FSUtils;
70  import org.apache.hadoop.hbase.util.Threads;
71  import org.apache.hadoop.hdfs.DFSTestUtil;
72  import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
73  import org.apache.hadoop.ipc.RemoteException;
74  import org.junit.After;
75  import org.junit.AfterClass;
76  import org.junit.Before;
77  import org.junit.BeforeClass;
78  import org.junit.Rule;
79  import org.junit.rules.TestName;
80  import org.junit.Test;
81  import org.junit.experimental.categories.Category;
82  import org.mockito.Mockito;
83  import org.mockito.invocation.InvocationOnMock;
84  import org.mockito.stubbing.Answer;
85  
86  import com.google.common.base.Joiner;
87  import com.google.common.collect.ImmutableList;
88  
89  // imports for things that haven't moved from regionserver.wal yet.
90  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
91  import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
92  import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
93  import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader;
94  
95  /**
96   * Testing {@link WAL} splitting code.
97   */
98  @Category(LargeTests.class)
99  public class TestWALSplit {
{
  // Uncomment the following lines if more verbosity is needed for
  // debugging (see HBASE-12285 for details).
  //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
}
private static final Log LOG = LogFactory.getLog(TestWALSplit.class);

/** Configuration shared by all tests; assigned once in setUpBeforeClass(). */
private static Configuration conf;
/** Filesystem of the mini DFS cluster; refreshed in setUp(). */
private FileSystem fs;

protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

// Per-test directory layout, recomputed in setUp().
private Path HBASEDIR;
private Path HBASELOGDIR;
private Path WALDIR;
private Path OLDLOGDIR;
private Path CORRUPTDIR;
private Path TABLEDIR;

private static final int NUM_WRITERS = 10;
private static final int ENTRIES = 10; // entries per writer per region

private static final TableName TABLE_NAME =
    TableName.valueOf("t1");
// Bytes.toBytes always encodes as UTF-8, independent of the platform default charset.
private static final byte[] FAMILY = Bytes.toBytes("f1");
private static final byte[] QUALIFIER = Bytes.toBytes("q1");
private static final byte[] VALUE = Bytes.toBytes("v1");
private static final String WAL_FILE_PREFIX = "wal.dat.";
/** Regions edits are generated for; reset to {"bbb", "ccc"} in setUp(). */
private static List<String> REGIONS = new ArrayList<String>();
private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
/** Name of the user that performs the split in testLogCannotBeWrittenOnceParsed. */
private static String ROBBER;
/** Name of the user the zombie writer thread writes as. */
private static String ZOMBIE;
private static String [] GROUP = new String [] {"supergroup"};
private RecoveryMode mode;
136 
/**
 * Kinds of damage a test can inflict on a WAL file before splitting it.
 * The exact bytes added or removed for each kind are decided by corruptWAL.
 * Nested enums are implicitly static.
 */
enum Corruptions {
  INSERT_GARBAGE_ON_FIRST_LINE,
  INSERT_GARBAGE_IN_THE_MIDDLE,
  APPEND_GARBAGE,
  TRUNCATE,
  TRUNCATE_TRAILER
}
144 
145   @BeforeClass
146   public static void setUpBeforeClass() throws Exception {
147     conf = TEST_UTIL.getConfiguration();
148     conf.setClass("hbase.regionserver.hlog.writer.impl",
149       InstrumentedLogWriter.class, Writer.class);
150     conf.setBoolean("dfs.support.broken.append", true);
151     conf.setBoolean("dfs.support.append", true);
152     // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
153     System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
154     // Create fake maping user to group and set it to the conf.
155     Map<String, String []> u2g_map = new HashMap<String, String []>(2);
156     ROBBER = User.getCurrent().getName() + "-robber";
157     ZOMBIE = User.getCurrent().getName() + "-zombie";
158     u2g_map.put(ROBBER, GROUP);
159     u2g_map.put(ZOMBIE, GROUP);
160     DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
161     conf.setInt("dfs.heartbeat.interval", 1);
162     TEST_UTIL.startMiniDFSCluster(2);
163   }
164 
165   @AfterClass
166   public static void tearDownAfterClass() throws Exception {
167     TEST_UTIL.shutdownMiniDFSCluster();
168   }
169 
170   @Rule
171   public TestName name = new TestName();
172   private WALFactory wals = null;
173 
174   @Before
175   public void setUp() throws Exception {
176     LOG.info("Cleaning up cluster for new test.");
177     fs = TEST_UTIL.getDFSCluster().getFileSystem();
178     HBASEDIR = TEST_UTIL.createRootDir();
179     HBASELOGDIR = TEST_UTIL.createWALRootDir();
180     OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
181     CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
182     TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
183     REGIONS.clear();
184     Collections.addAll(REGIONS, "bbb", "ccc");
185     InstrumentedLogWriter.activateFailure = false;
186     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
187         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
188     wals = new WALFactory(conf, null, name.getMethodName());
189     WALDIR = new Path(HBASELOGDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName()));
190     //fs.mkdirs(WALDIR);
191   }
192 
193   @After
194   public void tearDown() throws Exception {
195     try {
196       wals.close();
197     } catch(IOException exception) {
198       // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
199       LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
200          " you see a failure look here.");
201       LOG.debug("exception details", exception);
202     } finally {
203       wals = null;
204       fs.delete(HBASEDIR, true);
205       fs.delete(HBASELOGDIR, true);
206     }
207   }
208 
209   /**
210    * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
211    * Ensures we do not lose edits.
212    * @throws IOException
213    * @throws InterruptedException
214    */
215   @Test (timeout=300000)
216   public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
217     final AtomicLong counter = new AtomicLong(0);
218     AtomicBoolean stop = new AtomicBoolean(false);
219     // Region we'll write edits too and then later examine to make sure they all made it in.
220     final String region = REGIONS.get(0);
221     final int numWriters = 3;
222     Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
223     try {
224       long startCount = counter.get();
225       zombie.start();
226       // Wait till writer starts going.
227       while (startCount == counter.get()) Threads.sleep(1);
228       // Give it a second to write a few appends.
229       Threads.sleep(1000);
230       final Configuration conf2 = HBaseConfiguration.create(this.conf);
231       final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
232       int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
233         @Override
234         public Integer run() throws Exception {
235           StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
236               .append("):\n");
237           for (FileStatus status : fs.listStatus(WALDIR)) {
238             ls.append("\t").append(status.toString()).append("\n");
239           }
240           LOG.debug(ls);
241           LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
242           WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
243           LOG.info("Finished splitting out from under zombie.");
244           Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
245           assertEquals("wrong number of split files for region", numWriters, logfiles.length);
246           int count = 0;
247           for (Path logfile: logfiles) {
248             count += countWAL(logfile);
249           }
250           return count;
251         }
252       });
253       LOG.info("zombie=" + counter.get() + ", robber=" + count);
254       assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
255         "Zombie could write " + counter.get() + " and logfile had only " + count,
256         counter.get() == count || counter.get() + 1 == count);
257     } finally {
258       stop.set(true);
259       zombie.interrupt();
260       Threads.threadDumpingIsAlive(zombie);
261     }
262   }
263 
264   /**
265    * This thread will keep writing to a 'wal' file even after the split process has started.
266    * It simulates a region server that was considered dead but woke up and wrote some more to the
267    * last log entry. Does its writing as an alternate user in another filesystem instance to
268    * simulate better it being a regionserver.
269    */
270   class ZombieLastLogWriterRegionServer extends Thread {
271     final AtomicLong editsCount;
272     final AtomicBoolean stop;
273     final int numOfWriters;
274     /**
275      * Region to write edits for.
276      */
277     final String region;
278     final User user;
279 
280     public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
281         final String region, final int writers)
282         throws IOException, InterruptedException {
283       super("ZombieLastLogWriterRegionServer");
284       setDaemon(true);
285       this.stop = stop;
286       this.editsCount = counter;
287       this.region = region;
288       this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
289       numOfWriters = writers;
290     }
291 
292     @Override
293     public void run() {
294       try {
295         doWriting();
296       } catch (IOException e) {
297         LOG.warn(getName() + " Writer exiting " + e);
298       } catch (InterruptedException e) {
299         LOG.warn(getName() + " Writer exiting " + e);
300       }
301     }
302 
303     private void doWriting() throws IOException, InterruptedException {
304       this.user.runAs(new PrivilegedExceptionAction<Object>() {
305         @Override
306         public Object run() throws Exception {
307           // Index of the WAL we want to keep open.  generateWALs will leave open the WAL whose
308           // index we supply here.
309           int walToKeepOpen = numOfWriters - 1;
310           // The below method writes numOfWriters files each with ENTRIES entries for a total of
311           // numOfWriters * ENTRIES added per column family in the region.
312           Writer writer = null;
313           try {
314             writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
315           } catch (IOException e1) {
316             throw new RuntimeException("Failed", e1);
317           }
318           // Update counter so has all edits written so far.
319           editsCount.addAndGet(numOfWriters * ENTRIES);
320           loop(writer);
321           // If we've been interruped, then things should have shifted out from under us.
322           // closing should error
323           try {
324             writer.close();
325             fail("Writing closing after parsing should give an error.");
326           } catch (IOException exception) {
327             LOG.debug("ignoring error when closing final writer.", exception);
328           }
329           return null;
330         }
331       });
332     }
333 
334     private void loop(final Writer writer) {
335       byte [] regionBytes = Bytes.toBytes(this.region);
336       while (!stop.get()) {
337         try {
338           long seq = appendEntry(writer, TABLE_NAME, regionBytes,
339               ("r" + editsCount.get()).getBytes(), regionBytes, QUALIFIER, VALUE, 0);
340           long count = editsCount.incrementAndGet();
341           LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
342           try {
343             Thread.sleep(1);
344           } catch (InterruptedException e) {
345             //
346           }
347         } catch (IOException ex) {
348           LOG.error(getName() + " ex " + ex.toString());
349           if (ex instanceof RemoteException) {
350             LOG.error("Juliet: got RemoteException " + ex.getMessage() +
351               " while writing " + (editsCount.get() + 1));
352           } else {
353             LOG.error(getName() + " failed to write....at " + editsCount.get());
354             fail("Failed to write " + editsCount.get());
355           }
356           break;
357         } catch (Throwable t) {
358           LOG.error(getName() + " HOW? " + t);
359           LOG.debug("exception details", t);
360           break;
361         }
362       }
363       LOG.info(getName() + " Writer exiting");
364     }
365   }
366 
367   /**
368    * @throws IOException
369    * @see https://issues.apache.org/jira/browse/HBASE-3020
370    */
371   @Test (timeout=300000)
372   public void testRecoveredEditsPathForMeta() throws IOException {
373     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
374     Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
375     Path regiondir = new Path(tdir,
376         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
377     fs.mkdirs(regiondir);
378     long now = System.currentTimeMillis();
379     Entry entry =
380         new Entry(new WALKey(encoded,
381             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
382       new WALEdit());
383     Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
384     String parentOfParent = p.getParent().getParent().getName();
385     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
386   }
387 
388   /**
389    * Test old recovered edits file doesn't break WALSplitter.
390    * This is useful in upgrading old instances.
391    */
392   @Test (timeout=300000)
393   public void testOldRecoveredEditsFileSidelined() throws IOException {
394     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
395     Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
396     Path regiondir = new Path(tdir,
397         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
398     fs.mkdirs(regiondir);
399     long now = System.currentTimeMillis();
400     Entry entry =
401         new Entry(new WALKey(encoded,
402             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
403       new WALEdit());
404     Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
405     assertEquals(parent.getName(), HConstants.RECOVERED_EDITS_DIR);
406     fs.createNewFile(parent); // create a recovered.edits file
407 
408     Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
409     String parentOfParent = p.getParent().getParent().getName();
410     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
411     WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
412   }
413 
414   private void useDifferentDFSClient() throws IOException {
415     // make fs act as a different client now
416     // initialize will create a new DFSClient with a new client ID
417     fs.initialize(fs.getUri(), conf);
418   }
419 
420   @Test (timeout=300000)
421   public void testSplitPreservesEdits() throws IOException{
422     final String REGION = "region__1";
423     REGIONS.clear();
424     REGIONS.add(REGION);
425 
426     generateWALs(1, 10, -1);
427     useDifferentDFSClient();
428     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
429     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
430     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
431     assertEquals(1, splitLog.length);
432 
433     assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
434   }
435 
436   /**
437    * @param expectedEntries -1 to not assert
438    * @return the count across all regions
439    */
440   private int splitAndCount(final int expectedFiles, final int expectedEntries)
441       throws IOException {
442     useDifferentDFSClient();
443     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
444     int result = 0;
445     for (String region : REGIONS) {
446       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
447       assertEquals(expectedFiles, logfiles.length);
448       int count = 0;
449       for (Path logfile: logfiles) {
450         count += countWAL(logfile);
451       }
452       if (-1 != expectedEntries) {
453         assertEquals(expectedEntries, count);
454       }
455       result += count;
456     }
457     return result;
458   }
459 
460   @Test (timeout=300000)
461   public void testEmptyLogFiles() throws IOException {
462     testEmptyLogFiles(true);
463   }
464 
465   @Test (timeout=300000)
466   public void testEmptyOpenLogFiles() throws IOException {
467     testEmptyLogFiles(false);
468   }
469 
470   private void testEmptyLogFiles(final boolean close) throws IOException {
471     injectEmptyFile(".empty", close);
472     generateWALs(Integer.MAX_VALUE);
473     injectEmptyFile("empty", close);
474     splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
475   }
476 
477   @Test (timeout=300000)
478   public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
479     // generate logs but leave wal.dat.5 open.
480     generateWALs(5);
481     splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
482   }
483 
484   @Test (timeout=300000)
485   public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
486     conf.setBoolean(HBASE_SKIP_ERRORS, true);
487     generateWALs(Integer.MAX_VALUE);
488     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
489             Corruptions.APPEND_GARBAGE, true);
490     splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
491   }
492 
493   @Test (timeout=300000)
494   public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
495     conf.setBoolean(HBASE_SKIP_ERRORS, true);
496     generateWALs(Integer.MAX_VALUE);
497     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
498             Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
499     splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt
500   }
501 
502   @Test (timeout=300000)
503   public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
504     conf.setBoolean(HBASE_SKIP_ERRORS, true);
505     generateWALs(Integer.MAX_VALUE);
506     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
507             Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
508     // the entries in the original logs are alternating regions
509     // considering the sequence file header, the middle corruption should
510     // affect at least half of the entries
511     int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
512     int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
513     int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
514     assertTrue("The file up to the corrupted area hasn't been parsed",
515         REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
516   }
517 
518   @Test (timeout=300000)
519   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
520     conf.setBoolean(HBASE_SKIP_ERRORS, true);
521     for (FaultySequenceFileLogReader.FailureType  failureType :
522         FaultySequenceFileLogReader.FailureType.values()) {
523       final Set<String> walDirContents = splitCorruptWALs(failureType);
524       final Set<String> archivedLogs = new HashSet<String>();
525       final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
526       for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
527         archived.append("\n\t").append(log.toString());
528         archivedLogs.add(log.getPath().getName());
529       }
530       LOG.debug(archived.toString());
531       assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
532           walDirContents, archivedLogs);
533     }
534   }
535 
536   /**
537    * @return set of wal names present prior to split attempt.
538    * @throws IOException if the split process fails
539    */
540   private Set<String> splitCorruptWALs(final FaultySequenceFileLogReader.FailureType failureType)
541       throws IOException {
542     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
543         Reader.class);
544     InstrumentedLogWriter.activateFailure = false;
545 
546     try {
547       conf.setClass("hbase.regionserver.hlog.reader.impl",
548           FaultySequenceFileLogReader.class, Reader.class);
549       conf.set("faultysequencefilelogreader.failuretype", failureType.name());
550       // Clean up from previous tests or previous loop
551       try {
552         wals.shutdown();
553       } catch (IOException exception) {
554         // since we're splitting out from under the factory, we should expect some closing failures.
555         LOG.debug("Ignoring problem closing WALFactory.", exception);
556       }
557       wals.close();
558       try {
559         for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
560           fs.delete(log.getPath(), true);
561         }
562       } catch (FileNotFoundException exception) {
563         LOG.debug("no previous CORRUPTDIR to clean.");
564       }
565       // change to the faulty reader
566       wals = new WALFactory(conf, null, name.getMethodName());
567       generateWALs(-1);
568       // Our reader will render all of these files corrupt.
569       final Set<String> walDirContents = new HashSet<String>();
570       for (FileStatus status : fs.listStatus(WALDIR)) {
571         walDirContents.add(status.getPath().getName());
572       }
573       useDifferentDFSClient();
574       WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
575       return walDirContents;
576     } finally {
577       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
578           Reader.class);
579     }
580   }
581 
582   @Test (timeout=300000, expected = IOException.class)
583   public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
584       throws IOException {
585     conf.setBoolean(HBASE_SKIP_ERRORS, false);
586     splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
587   }
588 
589   @Test (timeout=300000)
590   public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
591       throws IOException {
592     conf.setBoolean(HBASE_SKIP_ERRORS, false);
593     try {
594       splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
595     } catch (IOException e) {
596       LOG.debug("split with 'skip errors' set to 'false' correctly threw");
597     }
598     assertEquals("if skip.errors is false all files should remain in place",
599         NUM_WRITERS + 1 /* Factory WAL */, fs.listStatus(WALDIR).length);
600   }
601 
602   private void ignoreCorruption(final Corruptions corruption, final int entryCount,
603       final int expectedCount) throws IOException {
604     conf.setBoolean(HBASE_SKIP_ERRORS, false);
605 
606     final String REGION = "region__1";
607     REGIONS.clear();
608     REGIONS.add(REGION);
609 
610     Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
611     generateWALs(1, entryCount, -1);
612     corruptWAL(c1, corruption, true);
613 
614     useDifferentDFSClient();
615     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
616 
617     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
618     assertEquals(1, splitLog.length);
619 
620     int actualCount = 0;
621     Reader in = wals.createReader(fs, splitLog[0]);
622     @SuppressWarnings("unused")
623     Entry entry;
624     while ((entry = in.next()) != null) ++actualCount;
625     assertEquals(expectedCount, actualCount);
626     in.close();
627 
628     // should not have stored the EOF files as corrupt
629     FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
630     assertEquals(archivedLogs.length, 0);
631 
632   }
633 
634   @Test (timeout=300000)
635   public void testEOFisIgnored() throws IOException {
636     int entryCount = 10;
637     ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1);
638   }
639 
640   @Test (timeout=300000)
641   public void testCorruptWALTrailer() throws IOException {
642     int entryCount = 10;
643     ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
644   }
645 
646   @Test (timeout=300000)
647   public void testLogsGetArchivedAfterSplit() throws IOException {
648     conf.setBoolean(HBASE_SKIP_ERRORS, false);
649     generateWALs(-1);
650     useDifferentDFSClient();
651     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
652     FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
653     assertEquals("wrong number of files in the archive log", NUM_WRITERS + 1 /* wal from factory */,
654         archivedLogs.length);
655   }
656 
657   @Test (timeout=300000)
658   public void testSplit() throws IOException {
659     generateWALs(-1);
660     splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
661   }
662 
663   @Test (timeout=300000)
664   public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
665   throws IOException {
666     generateWALs(-1);
667     useDifferentDFSClient();
668     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
669     FileStatus [] statuses = null;
670     try {
671       statuses = fs.listStatus(WALDIR);
672       if (statuses != null) {
673         fail("Files left in log dir: " +
674             Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
675       }
676     } catch (FileNotFoundException e) {
677       // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
678     }
679   }
680 
681   @Test(timeout=300000, expected = IOException.class)
682   public void testSplitWillFailIfWritingToRegionFails() throws Exception {
683     //leave 5th log open so we could append the "trap"
684     Writer writer = generateWALs(4);
685     useDifferentDFSClient();
686 
687     String region = "break";
688     Path regiondir = new Path(TABLEDIR, region);
689     fs.mkdirs(regiondir);
690 
691     InstrumentedLogWriter.activateFailure = false;
692     appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
693         ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
694     writer.close();
695 
696     try {
697       InstrumentedLogWriter.activateFailure = true;
698       WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
699     } catch (IOException e) {
700       assertTrue(e.getMessage().
701         contains("This exception is instrumented and should only be thrown for testing"));
702       throw e;
703     } finally {
704       InstrumentedLogWriter.activateFailure = false;
705     }
706   }
707 
708   @Test (timeout=300000)
709   public void testSplitDeletedRegion() throws IOException {
710     REGIONS.clear();
711     String region = "region_that_splits";
712     REGIONS.add(region);
713 
714     generateWALs(1);
715     useDifferentDFSClient();
716 
717     Path regiondir = new Path(TABLEDIR, region);
718     fs.delete(regiondir, true);
719     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
720     assertFalse(fs.exists(regiondir));
721   }
722 
723   @Test (timeout=300000)
724   public void testIOEOnOutputThread() throws Exception {
725     conf.setBoolean(HBASE_SKIP_ERRORS, false);
726 
727     generateWALs(-1);
728     useDifferentDFSClient();
729     FileStatus[] logfiles = fs.listStatus(WALDIR);
730     assertTrue("There should be some log file",
731       logfiles != null && logfiles.length > 0);
732     // wals with no entries (like the one we don't use in the factory)
733     // won't cause a failure since nothing will ever be written.
734     // pick the largest one since it's most likely to have entries.
735     int largestLogFile = 0;
736     long largestSize = 0;
737     for (int i = 0; i < logfiles.length; i++) {
738       if (logfiles[i].getLen() > largestSize) {
739         largestLogFile = i;
740         largestSize = logfiles[i].getLen();
741       }
742     }
743     assertTrue("There should be some log greater than size 0.", 0 < largestSize);
744     // Set up a splitter that will throw an IOE on the output side
745     WALSplitter logSplitter = new WALSplitter(wals,
746         conf, HBASEDIR, fs, null, null, this.mode) {
747       @Override
748       protected Writer createWriter(Path logfile) throws IOException {
749         Writer mockWriter = Mockito.mock(Writer.class);
750         Mockito.doThrow(new IOException("Injected")).when(
751           mockWriter).append(Mockito.<Entry>any());
752         return mockWriter;
753       }
754     };
755     // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
756     // the thread dumping in a background thread so it does not hold up the test.
757     final AtomicBoolean stop = new AtomicBoolean(false);
758     final Thread someOldThread = new Thread("Some-old-thread") {
759       @Override
760       public void run() {
761         while(!stop.get()) Threads.sleep(10);
762       }
763     };
764     someOldThread.setDaemon(true);
765     someOldThread.start();
766     final Thread t = new Thread("Background-thread-dumper") {
767       public void run() {
768         try {
769           Threads.threadDumpingIsAlive(someOldThread);
770         } catch (InterruptedException e) {
771           e.printStackTrace();
772         }
773       }
774     };
775     t.setDaemon(true);
776     t.start();
777     try {
778       logSplitter.splitLogFile(logfiles[largestLogFile], null);
779       fail("Didn't throw!");
780     } catch (IOException ioe) {
781       assertTrue(ioe.toString().contains("Injected"));
782     } finally {
783       // Setting this to true will turn off the background thread dumper.
784       stop.set(true);
785     }
786   }
787 
788   /**
789    * @param spiedFs should be instrumented for failure.
790    */
791   private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
792     generateWALs(-1);
793     useDifferentDFSClient();
794 
795     try {
796       WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
797       assertEquals(NUM_WRITERS + 1 /* wal created by factory */, fs.listStatus(OLDLOGDIR).length);
798       assertFalse(fs.exists(WALDIR));
799     } catch (IOException e) {
800       fail("There shouldn't be any exception but: " + e.toString());
801     }
802   }
803 
804   // Test for HBASE-3412
805   @Test (timeout=300000)
806   public void testMovedWALDuringRecovery() throws Exception {
807     // This partial mock will throw LEE for every file simulating
808     // files that were moved
809     FileSystem spiedFs = Mockito.spy(fs);
810     // The "File does not exist" part is very important,
811     // that's how it comes out of HDFS
812     Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
813         when(spiedFs).append(Mockito.<Path>any());
814     retryOverHdfsProblem(spiedFs);
815   }
816 
817   @Test (timeout=300000)
818   public void testRetryOpenDuringRecovery() throws Exception {
819     FileSystem spiedFs = Mockito.spy(fs);
820     // The "Cannot obtain block length", "Could not obtain the last block",
821     // and "Blocklist for [^ ]* has changed.*" part is very important,
822     // that's how it comes out of HDFS. If HDFS changes the exception
823     // message, this test needs to be adjusted accordingly.
824     //
825     // When DFSClient tries to open a file, HDFS needs to locate
826     // the last block of the file and get its length. However, if the
827     // last block is under recovery, HDFS may have problem to obtain
828     // the block length, in which case, retry may help.
829     Mockito.doAnswer(new Answer<FSDataInputStream>() {
830       private final String[] errors = new String[] {
831         "Cannot obtain block length", "Could not obtain the last block",
832         "Blocklist for " + OLDLOGDIR + " has changed"};
833       private int count = 0;
834 
835       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
836             if (count < 3) {
837                 throw new IOException(errors[count++]);
838             }
839             return (FSDataInputStream)invocation.callRealMethod();
840         }
841     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
842     retryOverHdfsProblem(spiedFs);
843   }
844 
845   @Test (timeout=300000)
846   public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
847     generateWALs(1, 10, -1);
848     FileStatus logfile = fs.listStatus(WALDIR)[0];
849     useDifferentDFSClient();
850 
851     final AtomicInteger count = new AtomicInteger();
852 
853     CancelableProgressable localReporter
854       = new CancelableProgressable() {
855         @Override
856         public boolean progress() {
857           count.getAndIncrement();
858           return false;
859         }
860       };
861 
862     FileSystem spiedFs = Mockito.spy(fs);
863     Mockito.doAnswer(new Answer<FSDataInputStream>() {
864       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
865         Thread.sleep(1500); // Sleep a while and wait report status invoked
866         return (FSDataInputStream)invocation.callRealMethod();
867       }
868     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
869 
870     try {
871       conf.setInt("hbase.splitlog.report.period", 1000);
872       boolean ret = WALSplitter.splitLogFile(
873         HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode, wals);
874       assertFalse("Log splitting should failed", ret);
875       assertTrue(count.get() > 0);
876     } catch (IOException e) {
877       fail("There shouldn't be any exception but: " + e.toString());
878     } finally {
879       // reset it back to its default value
880       conf.setInt("hbase.splitlog.report.period", 59000);
881     }
882   }
883 
884   /**
885    * Test log split process with fake data and lots of edits to trigger threading
886    * issues.
887    */
888   @Test (timeout=300000)
889   public void testThreading() throws Exception {
890     doTestThreading(20000, 128*1024*1024, 0);
891   }
892 
893   /**
894    * Test blocking behavior of the log split process if writers are writing slower
895    * than the reader is reading.
896    */
897   @Test (timeout=300000)
898   public void testThreadingSlowWriterSmallBuffer() throws Exception {
899     doTestThreading(200, 1024, 50);
900   }
901 
902   /**
903    * Sets up a log splitter with a mock reader and writer. The mock reader generates
904    * a specified number of edits spread across 5 regions. The mock writer optionally
905    * sleeps for each edit it is fed.
906    * *
907    * After the split is complete, verifies that the statistics show the correct number
908    * of edits output into each region.
909    *
910    * @param numFakeEdits number of fake edits to push through pipeline
911    * @param bufferSize size of in-memory buffer
912    * @param writerSlowness writer threads will sleep this many ms per edit
913    */
914   private void doTestThreading(final int numFakeEdits,
915       final int bufferSize,
916       final int writerSlowness) throws Exception {
917 
918     Configuration localConf = new Configuration(conf);
919     localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
920 
921     // Create a fake log file (we'll override the reader to produce a stream of edits)
922     Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
923     FSDataOutputStream out = fs.create(logPath);
924     out.close();
925 
926     // Make region dirs for our destination regions so the output doesn't get skipped
927     final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
928     makeRegionDirs(regions);
929 
930     // Create a splitter that reads and writes the data without touching disk
931     WALSplitter logSplitter = new WALSplitter(wals,
932         localConf, HBASEDIR, fs, null, null, this.mode) {
933 
934       /* Produce a mock writer that doesn't write anywhere */
935       @Override
936       protected Writer createWriter(Path logfile) throws IOException {
937         Writer mockWriter = Mockito.mock(Writer.class);
938         Mockito.doAnswer(new Answer<Void>() {
939           int expectedIndex = 0;
940 
941           @Override
942           public Void answer(InvocationOnMock invocation) {
943             if (writerSlowness > 0) {
944               try {
945                 Thread.sleep(writerSlowness);
946               } catch (InterruptedException ie) {
947                 Thread.currentThread().interrupt();
948               }
949             }
950             Entry entry = (Entry) invocation.getArguments()[0];
951             WALEdit edit = entry.getEdit();
952             List<Cell> cells = edit.getCells();
953             assertEquals(1, cells.size());
954             Cell cell = cells.get(0);
955 
956             // Check that the edits come in the right order.
957             assertEquals(expectedIndex, Bytes.toInt(cell.getRow()));
958             expectedIndex++;
959             return null;
960           }
961         }).when(mockWriter).append(Mockito.<Entry>any());
962         return mockWriter;
963       }
964 
965       /* Produce a mock reader that generates fake entries */
966       @Override
967       protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
968           throws IOException {
969         Reader mockReader = Mockito.mock(Reader.class);
970         Mockito.doAnswer(new Answer<Entry>() {
971           int index = 0;
972 
973           @Override
974           public Entry answer(InvocationOnMock invocation) throws Throwable {
975             if (index >= numFakeEdits) return null;
976 
977             // Generate r0 through r4 in round robin fashion
978             int regionIdx = index % regions.size();
979             byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
980 
981             Entry ret = createTestEntry(TABLE_NAME, region,
982                 Bytes.toBytes((int)(index / regions.size())),
983                 FAMILY, QUALIFIER, VALUE, index);
984             index++;
985             return ret;
986           }
987         }).when(mockReader).next();
988         return mockReader;
989       }
990     };
991 
992     logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
993 
994     // Verify number of written edits per region
995     Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
996     for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
997       LOG.info("Got " + entry.getValue() + " output edits for region " +
998           Bytes.toString(entry.getKey()));
999       assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
1000     }
1001     assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
1002   }
1003 
1004   // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
1005   @Test (timeout=300000)
1006   public void testSplitLogFileDeletedRegionDir() throws IOException {
1007     LOG.info("testSplitLogFileDeletedRegionDir");
1008     final String REGION = "region__1";
1009     REGIONS.clear();
1010     REGIONS.add(REGION);
1011 
1012     generateWALs(1, 10, -1);
1013     useDifferentDFSClient();
1014 
1015     Path regiondir = new Path(TABLEDIR, REGION);
1016     LOG.info("Region directory is" + regiondir);
1017     fs.delete(regiondir, true);
1018     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1019     assertFalse(fs.exists(regiondir));
1020   }
1021 
1022   @Test (timeout=300000)
1023   public void testSplitLogFileEmpty() throws IOException {
1024     LOG.info("testSplitLogFileEmpty");
1025     injectEmptyFile(".empty", true);
1026     useDifferentDFSClient();
1027 
1028     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1029     Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1030     assertFalse(fs.exists(tdir));
1031 
1032     assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
1033   }
1034 
1035   @Test (timeout=300000)
1036   public void testSplitLogFileMultipleRegions() throws IOException {
1037     LOG.info("testSplitLogFileMultipleRegions");
1038     generateWALs(1, 10, -1);
1039     splitAndCount(1, 10);
1040   }
1041 
1042   @Test (timeout=300000)
1043   public void testSplitLogFileFirstLineCorruptionLog()
1044   throws IOException {
1045     conf.setBoolean(HBASE_SKIP_ERRORS, true);
1046     generateWALs(1, 10, -1);
1047     FileStatus logfile = fs.listStatus(WALDIR)[0];
1048 
1049     corruptWAL(logfile.getPath(),
1050         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
1051 
1052     useDifferentDFSClient();
1053     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1054 
1055     final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), conf.get(
1056         "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
1057     assertEquals(1, fs.listStatus(corruptDir).length);
1058   }
1059 
1060   /**
1061    * @throws IOException
1062    * @see https://issues.apache.org/jira/browse/HBASE-4862
1063    */
1064   @Test (timeout=300000)
1065   public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1066     LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1067     // Generate wals for our destination region
1068     String regionName = "r0";
1069     final Path regiondir = new Path(TABLEDIR, regionName);
1070     REGIONS.clear();
1071     REGIONS.add(regionName);
1072     generateWALs(-1);
1073 
1074     wals.getWAL(Bytes.toBytes(regionName));
1075     FileStatus[] logfiles = fs.listStatus(WALDIR);
1076     assertTrue("There should be some log file",
1077       logfiles != null && logfiles.length > 0);
1078 
1079     WALSplitter logSplitter = new WALSplitter(wals,
1080         conf, HBASEDIR, fs, null, null, this.mode) {
1081       @Override
1082       protected Writer createWriter(Path logfile)
1083       throws IOException {
1084         Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
1085         // After creating writer, simulate region's
1086         // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
1087         // region and delete them, excluding files with '.temp' suffix.
1088         NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
1089         if (files != null && !files.isEmpty()) {
1090           for (Path file : files) {
1091             if (!this.fs.delete(file, false)) {
1092               LOG.error("Failed delete of " + file);
1093             } else {
1094               LOG.debug("Deleted recovered.edits file=" + file);
1095             }
1096           }
1097         }
1098         return writer;
1099       }
1100     };
1101     try{
1102       logSplitter.splitLogFile(logfiles[0], null);
1103     } catch (IOException e) {
1104       LOG.info(e);
1105       fail("Throws IOException when spliting "
1106           + "log, it is most likely because writing file does not "
1107           + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1108     }
1109     if (fs.exists(CORRUPTDIR)) {
1110       if (fs.listStatus(CORRUPTDIR).length > 0) {
1111         fail("There are some corrupt logs, "
1112                 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1113       }
1114     }
1115   }
1116 
1117   private Writer generateWALs(int leaveOpen) throws IOException {
1118     return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
1119   }
1120 
1121   private void makeRegionDirs(List<String> regions) throws IOException {
1122     for (String region : regions) {
1123       LOG.debug("Creating dir for region " + region);
1124       fs.mkdirs(new Path(TABLEDIR, region));
1125     }
1126   }
1127 
1128   /**
1129    * @param leaveOpen index to leave un-closed. -1 to close all.
1130    * @return the writer that's still open, or null if all were closed.
1131    */
1132   private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
1133     makeRegionDirs(REGIONS);
1134     fs.mkdirs(WALDIR);
1135     Writer [] ws = new Writer[writers];
1136     int seq = 0;
1137     for (int i = 0; i < writers; i++) {
1138       ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
1139       for (int j = 0; j < entries; j++) {
1140         int prefix = 0;
1141         for (String region : REGIONS) {
1142           String row_key = region + prefix++ + i + j;
1143           appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER,
1144               VALUE, seq++);
1145         }
1146       }
1147       if (i != leaveOpen) {
1148         ws[i].close();
1149         LOG.info("Closing writer " + i);
1150       }
1151     }
1152     if (leaveOpen < 0 || leaveOpen >= writers) {
1153       return null;
1154     }
1155     return ws[leaveOpen];
1156   }
1157 
1158   private Path[] getLogForRegion(Path rootdir, TableName table, String region)
1159   throws IOException {
1160     Path tdir = FSUtils.getTableDir(rootdir, table);
1161     @SuppressWarnings("deprecation")
1162     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
1163       Bytes.toString(region.getBytes())));
1164     FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1165       @Override
1166       public boolean accept(Path p) {
1167         if (WALSplitter.isSequenceIdFile(p)) {
1168           return false;
1169         }
1170         return true;
1171       }
1172     });
1173     Path[] paths = new Path[files.length];
1174     for (int i = 0; i < files.length; i++) {
1175       paths[i] = files[i].getPath();
1176     }
1177     return paths;
1178   }
1179 
1180   private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
1181     FSDataOutputStream out;
1182     int fileSize = (int) fs.listStatus(path)[0].getLen();
1183 
1184     FSDataInputStream in = fs.open(path);
1185     byte[] corrupted_bytes = new byte[fileSize];
1186     in.readFully(0, corrupted_bytes, 0, fileSize);
1187     in.close();
1188 
1189     switch (corruption) {
1190       case APPEND_GARBAGE:
1191         fs.delete(path, false);
1192         out = fs.create(path);
1193         out.write(corrupted_bytes);
1194         out.write("-----".getBytes());
1195         closeOrFlush(close, out);
1196         break;
1197 
1198       case INSERT_GARBAGE_ON_FIRST_LINE:
1199         fs.delete(path, false);
1200         out = fs.create(path);
1201         out.write(0);
1202         out.write(corrupted_bytes);
1203         closeOrFlush(close, out);
1204         break;
1205 
1206       case INSERT_GARBAGE_IN_THE_MIDDLE:
1207         fs.delete(path, false);
1208         out = fs.create(path);
1209         int middle = (int) Math.floor(corrupted_bytes.length / 2);
1210         out.write(corrupted_bytes, 0, middle);
1211         out.write(0);
1212         out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1213         closeOrFlush(close, out);
1214         break;
1215 
1216       case TRUNCATE:
1217         fs.delete(path, false);
1218         out = fs.create(path);
1219         out.write(corrupted_bytes, 0, fileSize
1220           - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
1221         closeOrFlush(close, out);
1222         break;
1223 
1224       case TRUNCATE_TRAILER:
1225         fs.delete(path, false);
1226         out = fs.create(path);
1227         out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
1228         closeOrFlush(close, out);
1229         break;
1230     }
1231   }
1232 
1233   private void closeOrFlush(boolean close, FSDataOutputStream out)
1234   throws IOException {
1235     if (close) {
1236       out.close();
1237     } else {
1238       Method syncMethod = null;
1239       try {
1240         syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
1241       } catch (NoSuchMethodException e) {
1242         try {
1243           syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
1244         } catch (NoSuchMethodException ex) {
1245           throw new IOException("This version of Hadoop supports " +
1246               "neither Syncable.sync() nor Syncable.hflush().");
1247         }
1248       }
1249       try {
1250         syncMethod.invoke(out, new Object[]{});
1251       } catch (Exception e) {
1252         throw new IOException(e);
1253       }
1254       // Not in 0out.hflush();
1255     }
1256   }
1257 
1258   private int countWAL(Path log) throws IOException {
1259     int count = 0;
1260     Reader in = wals.createReader(fs, log);
1261     while (in.next() != null) {
1262       count++;
1263     }
1264     in.close();
1265     return count;
1266   }
1267 
1268   public static long appendEntry(Writer writer, TableName table, byte[] region,
1269                           byte[] row, byte[] family, byte[] qualifier,
1270                           byte[] value, long seq)
1271           throws IOException {
1272     LOG.info(Thread.currentThread().getName() + " append");
1273     writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1274     LOG.info(Thread.currentThread().getName() + " sync");
1275     writer.sync();
1276     return seq;
1277   }
1278 
1279   private static Entry createTestEntry(
1280       TableName table, byte[] region,
1281       byte[] row, byte[] family, byte[] qualifier,
1282       byte[] value, long seq) {
1283     long time = System.nanoTime();
1284     WALEdit edit = new WALEdit();
1285     seq++;
1286     edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
1287     return new Entry(new WALKey(region, table, seq, time,
1288         HConstants.DEFAULT_CLUSTER_ID), edit);
1289   }
1290 
1291   private void injectEmptyFile(String suffix, boolean closeFile)
1292           throws IOException {
1293     Writer writer = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix),
1294         conf);
1295     if (closeFile) writer.close();
1296   }
1297 
1298   private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1299     Reader in1, in2;
1300     in1 = wals.createReader(fs, p1);
1301     in2 = wals.createReader(fs, p2);
1302     Entry entry1;
1303     Entry entry2;
1304     while ((entry1 = in1.next()) != null) {
1305       entry2 = in2.next();
1306       if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
1307               (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
1308         return false;
1309       }
1310     }
1311     in1.close();
1312     in2.close();
1313     return true;
1314   }
1315 }