1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  import static org.mockito.Matchers.any;
26  import static org.mockito.Matchers.eq;
27  import static org.mockito.Mockito.doAnswer;
28  import static org.mockito.Mockito.spy;
29  import static org.mockito.Mockito.when;
30  
31  import java.io.FilterInputStream;
32  import java.io.IOException;
33  import java.lang.reflect.Field;
34  import java.security.PrivilegedExceptionAction;
35  import java.util.ArrayList;
36  import java.util.Collection;
37  import java.util.HashSet;
38  import java.util.List;
39  import java.util.Set;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FSDataInputStream;
48  import org.apache.hadoop.fs.FileStatus;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.fs.PathFilter;
52  import org.apache.hadoop.hbase.Cell;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HBaseTestingUtility;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.KeyValue;
60  import org.apache.hadoop.hbase.MasterNotRunningException;
61  import org.apache.hadoop.hbase.testclassification.MediumTests;
62  import org.apache.hadoop.hbase.MiniHBaseCluster;
63  import org.apache.hadoop.hbase.ServerName;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
66  import org.apache.hadoop.hbase.client.Delete;
67  import org.apache.hadoop.hbase.client.Get;
68  import org.apache.hadoop.hbase.client.HTable;
69  import org.apache.hadoop.hbase.client.Put;
70  import org.apache.hadoop.hbase.client.Result;
71  import org.apache.hadoop.hbase.client.ResultScanner;
72  import org.apache.hadoop.hbase.client.Scan;
73  import org.apache.hadoop.hbase.client.Table;
74  import org.apache.hadoop.hbase.fs.HFileSystem;
75  import org.apache.hadoop.hbase.master.HMaster;
76  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
77  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
78  import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
79  import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
80  import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
81  import org.apache.hadoop.hbase.regionserver.FlushRequester;
82  import org.apache.hadoop.hbase.regionserver.HRegion;
83  import org.apache.hadoop.hbase.regionserver.HRegionServer;
84  import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
85  import org.apache.hadoop.hbase.regionserver.Region;
86  import org.apache.hadoop.hbase.regionserver.RegionScanner;
87  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
88  import org.apache.hadoop.hbase.regionserver.Store;
89  import org.apache.hadoop.hbase.security.User;
90  import org.apache.hadoop.hbase.util.Bytes;
91  import org.apache.hadoop.hbase.util.EnvironmentEdge;
92  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
93  import org.apache.hadoop.hbase.util.FSUtils;
94  import org.apache.hadoop.hbase.util.HFileTestUtil;
95  import org.apache.hadoop.hbase.util.Pair;
96  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
97  import org.apache.hadoop.hbase.wal.WAL;
98  import org.apache.hadoop.hbase.wal.WALFactory;
99  import org.apache.hadoop.hbase.wal.WALKey;
100 import org.apache.hadoop.hbase.wal.WALSplitter;
101 import org.apache.hadoop.hdfs.DFSInputStream;
102 import org.junit.After;
103 import org.junit.AfterClass;
104 import org.junit.Before;
105 import org.junit.BeforeClass;
106 import org.junit.Rule;
107 import org.junit.Test;
108 import org.junit.experimental.categories.Category;
109 import org.junit.rules.TestName;
110 import org.mockito.Mockito;
111 import org.mockito.invocation.InvocationOnMock;
112 import org.mockito.stubbing.Answer;
113 
114 /**
115  * Test replay of edits out of a WAL split.
116  */
117 @Category(MediumTests.class)
118 public class TestWALReplay {
119   public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
120   static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
121   private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
122   private Path hbaseRootDir = null;
123   private Path hbaseWALRootDir = null;
124   private String logName;
125   private Path oldLogDir;
126   private Path logDir;
127   private FileSystem fs;
128   private Configuration conf;
129   private RecoveryMode mode;
130   private WALFactory wals;
131 
132   @Rule
133   public final TestName currentTest = new TestName();
134 
135 
136   @BeforeClass
137   public static void setUpBeforeClass() throws Exception {
138     Configuration conf = TEST_UTIL.getConfiguration();
139     conf.setBoolean("dfs.support.append", true);
140     // The below config is supported by 0.20-append and CDH3b2
141     conf.setInt("dfs.client.block.recovery.retries", 2);
142     TEST_UTIL.startMiniCluster(3);
143     Path hbaseRootDir =
144       TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
145     Path hbaseWALRootDir =
146         TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaselog"));
147     LOG.info(HConstants.HBASE_DIR + "=" + hbaseRootDir);
148     LOG.info(HFileSystem.HBASE_WAL_DIR + "=" + hbaseWALRootDir);
149     FSUtils.setRootDir(conf, hbaseRootDir);
150     FSUtils.setWALRootDir(conf, hbaseWALRootDir);
151   }
152 
153   @AfterClass
154   public static void tearDownAfterClass() throws Exception {
155     TEST_UTIL.shutdownMiniCluster();
156   }
157 
158   @Before
159   public void setUp() throws Exception {
160     this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
161     this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
162     this.hbaseRootDir = FSUtils.getRootDir(this.conf);
163     this.hbaseWALRootDir = FSUtils.getWALRootDir(this.conf);
164     this.oldLogDir = new Path(this.hbaseWALRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
165     this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual");
166     this.logDir = new Path(this.hbaseWALRootDir, logName);
167     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
168       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
169     }
170     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
171       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
172     }
173     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
174         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
175     this.wals = new WALFactory(conf, null, currentTest.getMethodName());
176   }
177 
178   @After
179   public void tearDown() throws Exception {
180     this.wals.close();
181     TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
182     TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
183   }
184 
185   /*
186    * @param p Directory to cleanup
187    */
188   private void deleteDir(final Path p) throws IOException {
189     if (this.fs.exists(p)) {
190       if (!this.fs.delete(p, true)) {
191         throw new IOException("Failed remove of " + p);
192       }
193     }
194   }
195 
196   /**
197    *
198    * @throws Exception
199    */
200   @Test
201   public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
202     final TableName tableName =
203         TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
204     byte[] family1 = Bytes.toBytes("cf1");
205     byte[] family2 = Bytes.toBytes("cf2");
206     byte[] qualifier = Bytes.toBytes("q");
207     byte[] value = Bytes.toBytes("testV");
208     byte[][] familys = { family1, family2 };
209     TEST_UTIL.createTable(tableName, familys);
210     Table htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
211     Put put = new Put(Bytes.toBytes("r1"));
212     put.add(family1, qualifier, value);
213     htable.put(put);
214     ResultScanner resultScanner = htable.getScanner(new Scan());
215     int count = 0;
216     while (resultScanner.next() != null) {
217       count++;
218     }
219     resultScanner.close();
220     assertEquals(1, count);
221 
222     MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
223     List<HRegion> regions = hbaseCluster.getRegions(tableName);
224     assertEquals(1, regions.size());
225 
226     // move region to another regionserver
227     Region destRegion = regions.get(0);
228     int originServerNum = hbaseCluster
229         .getServerWith(destRegion.getRegionInfo().getRegionName());
230     assertTrue("Please start more than 1 regionserver", hbaseCluster
231         .getRegionServerThreads().size() > 1);
232     int destServerNum = 0;
233     while (destServerNum == originServerNum) {
234       destServerNum++;
235     }
236     HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
237     HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
238     // move region to destination regionserver
239     moveRegionAndWait(destRegion, destServer);
240 
241     // delete the row
242     Delete del = new Delete(Bytes.toBytes("r1"));
243     htable.delete(del);
244     resultScanner = htable.getScanner(new Scan());
245     count = 0;
246     while (resultScanner.next() != null) {
247       count++;
248     }
249     resultScanner.close();
250     assertEquals(0, count);
251 
252     // flush region and make major compaction
253     Region region =  destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
254     region.flush(true);
255     // wait to complete major compaction
256     for (Store store : region.getStores()) {
257       store.triggerMajorCompaction();
258     }
259     region.compact(true);
260 
261     // move region to origin regionserver
262     moveRegionAndWait(destRegion, originServer);
263     // abort the origin regionserver
264     originServer.abort("testing");
265 
266     // see what we get
267     Result result = htable.get(new Get(Bytes.toBytes("r1")));
268     if (result != null) {
269       assertTrue("Row is deleted, but we get " + result.toString(),
270           (result == null) || result.isEmpty());
271     }
272     resultScanner.close();
273   }
274 
275   private void moveRegionAndWait(Region destRegion, HRegionServer destServer)
276       throws InterruptedException, MasterNotRunningException,
277       ZooKeeperConnectionException, IOException {
278     HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
279     TEST_UTIL.getHBaseAdmin().move(
280         destRegion.getRegionInfo().getEncodedNameAsBytes(),
281         Bytes.toBytes(destServer.getServerName().getServerName()));
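        // Wait until the assignment manager reports the region as online on the destination server.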
282     while (true) {
283       ServerName serverName = master.getAssignmentManager()
284         .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
285       if (serverName != null && serverName.equals(destServer.getServerName())) {
286         TEST_UTIL.assertRegionOnServer(
287           destRegion.getRegionInfo(), serverName, 200);
288         break;
289       }
290       Thread.sleep(10);
291     }
292   }
293 
294   /**
295    * Tests for hbase-2727.
296    * @throws Exception
297    * @see https://issues.apache.org/jira/browse/HBASE-2727
298    */
299   @Test
300   public void test2727() throws Exception {
301     // Test being able to have > 1 set of edits in the recovered.edits directory.
302     // Ensure edits are replayed properly.
303     final TableName tableName =
304         TableName.valueOf("test2727");
305     HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
306     Path basedir = FSUtils.getTableDir(hbaseWALRootDir, tableName);
307     deleteDir(basedir);
308 
309     HTableDescriptor htd = createBasic3FamilyHTD(tableName);
310     HRegion region2 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
311     HRegion.closeHRegion(region2);
312     final byte [] rowName = tableName.getName();
313 
314     WAL wal1 = createWAL(this.conf);
315     // Add 1k to each family.
316     final int countPerFamily = 1000;
317     final AtomicLong sequenceId = new AtomicLong(1);
318     for (HColumnDescriptor hcd: htd.getFamilies()) {
319       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
320           wal1, htd, sequenceId);
321     }
322     wal1.shutdown();
323     runWALSplit(this.conf);
324 
325     WAL wal2 = createWAL(this.conf);
326     // Add 1k to each family.
327     for (HColumnDescriptor hcd: htd.getFamilies()) {
328       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
329           ee, wal2, htd, sequenceId);
330     }
331     wal2.shutdown();
332     runWALSplit(this.conf);
333 
334     WAL wal3 = createWAL(this.conf);
335     try {
336       HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal3);
337       long seqid = region.getOpenSeqNum();
338       // The region opens with sequenceId 1. With 6k edits, its sequence number reaches 6k + 1:
339       // when opened, the region applies the 6k edits and increments the sequenceId by 1.
340       assertTrue(seqid > sequenceId.get());
341       assertEquals(seqid - 1, sequenceId.get());
342       LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
343           + sequenceId.get());
344 
345       // TODO: Scan all.
346       region.close();
347     } finally {
348       wal3.close();
349     }
350   }
351 
352   /**
353    * Test case of HRegion that is only made out of bulk loaded files.  Assert
354    * that we don't 'crash'.
355    * @throws IOException
356    * @throws IllegalAccessException
357    * @throws NoSuchFieldException
358    * @throws IllegalArgumentException
359    * @throws SecurityException
360    */
361   @Test
362   public void testRegionMadeOfBulkLoadedFilesOnly()
363   throws IOException, SecurityException, IllegalArgumentException,
364       NoSuchFieldException, IllegalAccessException, InterruptedException {
365     final TableName tableName =
366         TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
367     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
368     final Path basedir = new Path(this.hbaseWALRootDir, tableName.getNameAsString());
369     deleteDir(basedir);
370     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
371     HRegion region2 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
372     HRegion.closeHRegion(region2);
373     WAL wal = createWAL(this.conf);
374     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
375 
376     byte [] family = htd.getFamilies().iterator().next().getName();
377     Path f =  new Path(basedir, "hfile");
378     HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
379         Bytes.toBytes("z"), 10);
380     List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
381     hfs.add(Pair.newPair(family, f.toString()));
382     region.bulkLoadHFiles(hfs, true, null);
383 
384     // Add an edit so there is something in the WAL
385     byte [] row = tableName.getName();
386     region.put((new Put(row)).add(family, family, family));
387     wal.sync();
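        // 10 rows came from the bulk-loaded hfile, plus the single Put above.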
388     final int rowsInsertedCount = 11;
389 
390     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
391 
392     // Now 'crash' the region by stealing its wal
393     final Configuration newConf = HBaseConfiguration.create(this.conf);
394     User user = HBaseTestingUtility.getDifferentUser(newConf,
395         tableName.getNameAsString());
396     user.runAs(new PrivilegedExceptionAction() {
397       @Override
398       public Object run() throws Exception {
399         runWALSplit(newConf);
400         WAL wal2 = createWAL(newConf);
401 
402         HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
403           hbaseWALRootDir, hri, htd, wal2);
404         long seqid2 = region2.getOpenSeqNum();
405         assertTrue(seqid2 > -1);
406         assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
407 
408         // I can't close wal1.  It's been appropriated when we split.
409         region2.close();
410         wal2.close();
411         return null;
412       }
413     });
414   }
415 
416   /**
417    * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
418    * files) and an edit in the memstore.
419    * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
420    * from being replayed"
421    * @throws IOException
422    * @throws IllegalAccessException
423    * @throws NoSuchFieldException
424    * @throws IllegalArgumentException
425    * @throws SecurityException
426    */
427   @Test
428   public void testCompactedBulkLoadedFiles()
429       throws IOException, SecurityException, IllegalArgumentException,
430       NoSuchFieldException, IllegalAccessException, InterruptedException {
431     final TableName tableName =
432         TableName.valueOf("testCompactedBulkLoadedFiles");
433     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
434     final Path basedir = new Path(this.hbaseWALRootDir, tableName.getNameAsString());
435     deleteDir(basedir);
436     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
437     HRegion region2 = HRegion.createHRegion(hri,
438         hbaseWALRootDir, this.conf, htd);
439     HRegion.closeHRegion(region2);
440     WAL wal = createWAL(this.conf);
441     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
442 
443     // Add an edit so there is something in the WAL
444     byte [] row = tableName.getName();
445     byte [] family = htd.getFamilies().iterator().next().getName();
446     region.put((new Put(row)).add(family, family, family));
447     wal.sync();
448 
449     List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
450     for (int i = 0; i < 3; i++) {
451       Path f = new Path(basedir, "hfile"+i);
452       HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
453           Bytes.toBytes(i + "50"), 10);
454       hfs.add(Pair.newPair(family, f.toString()));
455     }
456     region.bulkLoadHFiles(hfs, true, null);
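        // 3 bulk-loaded hfiles of 10 rows each, plus the single Put above.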
457     final int rowsInsertedCount = 31;
458     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
459 
460     // major compact to turn all the bulk loaded files into one normal file
461     region.compact(true);
462     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
463 
464     // Now 'crash' the region by stealing its wal
465     final Configuration newConf = HBaseConfiguration.create(this.conf);
466     User user = HBaseTestingUtility.getDifferentUser(newConf,
467         tableName.getNameAsString());
468     user.runAs(new PrivilegedExceptionAction() {
469       @Override
470       public Object run() throws Exception {
471         runWALSplit(newConf);
472         WAL wal2 = createWAL(newConf);
473 
474         HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
475             hbaseWALRootDir, hri, htd, wal2);
476         long seqid2 = region2.getOpenSeqNum();
477         assertTrue(seqid2 > -1);
478         assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
479 
480         // I can't close wal1.  It's been appropriated when we split.
481         region2.close();
482         wal2.close();
483         return null;
484       }
485     });
486   }
487 
488 
489   /**
490    * Test writing edits into an HRegion, closing it, splitting logs, and opening
491    * the Region again.  Verify seqids.
492    * @throws IOException
493    * @throws IllegalAccessException
494    * @throws NoSuchFieldException
495    * @throws IllegalArgumentException
496    * @throws SecurityException
497    */
498   @Test
499   public void testReplayEditsWrittenViaHRegion()
500   throws IOException, SecurityException, IllegalArgumentException,
501       NoSuchFieldException, IllegalAccessException, InterruptedException {
502     final TableName tableName =
503         TableName.valueOf("testReplayEditsWrittenViaHRegion");
504     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
505     final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
506     deleteDir(basedir);
507     final byte[] rowName = tableName.getName();
508     final int countPerFamily = 10;
509     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
510     HRegion region3 = HRegion.createHRegion(hri,
511             hbaseWALRootDir, this.conf, htd);
512     HRegion.closeHRegion(region3);
513     // Write countPerFamily edits into the three families.  Do a flush on one
514     // of the families during the load of edits so its seqid is not the same as
515     // the others', to test we do the right thing when seqids differ.
516     WAL wal = createWAL(this.conf);
517     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
518     long seqid = region.getOpenSeqNum();
519     boolean first = true;
520     for (HColumnDescriptor hcd: htd.getFamilies()) {
521       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
522       if (first) {
523         // If first, flush so we have at least one family w/ a different seqid from the rest.
524         region.flush(true);
525         first = false;
526       }
527     }
528     // Now assert edits made it in.
529     final Get g = new Get(rowName);
530     Result result = region.get(g);
531     assertEquals(countPerFamily * htd.getFamilies().size(),
532       result.size());
533     // Now close the region (without flush), split the log, reopen the region and assert that
534     // replay of the log has the correct effect: our seqids are calculated correctly so
535     // all edits in the logs are seen as 'stale'/old.
536     region.close(true);
537     wal.shutdown();
538     runWALSplit(this.conf);
539     WAL wal2 = createWAL(this.conf);
540     HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseWALRootDir, hri, htd, wal2);
541     long seqid2 = region2.getOpenSeqNum();
542     assertTrue(seqid + result.size() < seqid2);
543     final Result result1b = region2.get(g);
544     assertEquals(result.size(), result1b.size());
545 
546     // Next test.  Add more edits, then 'crash' this region by stealing its wal
547     // out from under it and assert that replay of the log adds the edits back
548     // correctly when region is opened again.
549     for (HColumnDescriptor hcd: htd.getFamilies()) {
550       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
551     }
552     // Get count of edits.
553     final Result result2 = region2.get(g);
554     assertEquals(2 * result.size(), result2.size());
555     wal2.sync();
556     final Configuration newConf = HBaseConfiguration.create(this.conf);
557     User user = HBaseTestingUtility.getDifferentUser(newConf,
558       tableName.getNameAsString());
559     user.runAs(new PrivilegedExceptionAction() {
560       @Override
561       public Object run() throws Exception {
562         runWALSplit(newConf);
563         FileSystem newFS = FileSystem.get(newConf);
564         // Make a new wal for new region open.
565         WAL wal3 = createWAL(newConf);
566         final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
567         HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
568           @Override
569           protected boolean restoreEdit(Store s, Cell cell) {
570             boolean b = super.restoreEdit(s, cell);
571             countOfRestoredEdits.incrementAndGet();
572             return b;
573           }
574         };
575         long seqid3 = region3.initialize();
576         Result result3 = region3.get(g);
577         // Assert that count of cells is same as before crash.
578         assertEquals(result2.size(), result3.size());
579         assertEquals(htd.getFamilies().size() * countPerFamily,
580           countOfRestoredEdits.get());
581 
582         // I can't close wal1.  It's been appropriated when we split.
583         region3.close();
584         wal3.close();
585         return null;
586       }
587     });
588   }
589 
590   /**
591    * Test that we recover correctly when there is a failure in between the
592    * flushes, i.e. some stores got flushed but others did not.
593    *
594    * Unfortunately, there is no easy hook to flush at a store level. The way
595    * we get around this is by flushing at the region level, and then deleting
596    * the recently flushed store file for one of the Stores. This would put us
597    * back in the situation where all but that store got flushed and the region
598    * died.
599    *
600    * We restart Region again, and verify that the edits were replayed.
601    *
602    * @throws IOException
603    * @throws IllegalAccessException
604    * @throws NoSuchFieldException
605    * @throws IllegalArgumentException
606    * @throws SecurityException
607    */
608   @Test
609   public void testReplayEditsAfterPartialFlush()
610   throws IOException, SecurityException, IllegalArgumentException,
611       NoSuchFieldException, IllegalAccessException, InterruptedException {
612     final TableName tableName =
613         TableName.valueOf("testReplayEditsWrittenViaHRegion");
614     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
615     final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
616     deleteDir(basedir);
617     final byte[] rowName = tableName.getName();
618     final int countPerFamily = 10;
619     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
620     HRegion region3 = HRegion.createHRegion(hri,
621             hbaseWALRootDir, this.conf, htd);
622     HRegion.closeHRegion(region3);
623     // Write countPerFamily edits into the three families.  Do a flush on one
624     // of the families during the load of edits so its seqid is not the same as
625     // the others', to test we do the right thing when seqids differ.
626     WAL wal = createWAL(this.conf);
627     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
628     long seqid = region.getOpenSeqNum();
629     for (HColumnDescriptor hcd: htd.getFamilies()) {
630       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
631     }
632 
633     // Now assert edits made it in.
634     final Get g = new Get(rowName);
635     Result result = region.get(g);
636     assertEquals(countPerFamily * htd.getFamilies().size(),
637       result.size());
638 
639     // Let us flush the region
640     region.flush(true);
641     region.close(true);
642     wal.shutdown();
643 
644     // Delete the store files in the second column family to simulate a failure
645     // in the middle of the flushcache().
646     // We have 3 families. Killing the middle one ensures that taking the maximum
647     // flush seqid would make us fail.
648     int cf_count = 0;
649     for (HColumnDescriptor hcd: htd.getFamilies()) {
650       cf_count++;
651       if (cf_count == 2) {
652         region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
653       }
654     }
655 
656 
657     // Let us try to split and recover
658     runWALSplit(this.conf);
659     WAL wal2 = createWAL(this.conf);
660     HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal2);
661     long seqid2 = region2.getOpenSeqNum();
662     assertTrue(seqid + result.size() < seqid2);
663 
664     final Result result1b = region2.get(g);
665     assertEquals(result.size(), result1b.size());
666   }
667 
668 
669   // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
670   // Only throws an exception if throwExceptionWhenFlushing is set to true.
671   public static class CustomStoreFlusher extends DefaultStoreFlusher {
672     // Switch that controls whether flushSnapshot throws an exception
673     static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
674 
675     public CustomStoreFlusher(Configuration conf, Store store) {
676       super(conf, store);
677     }
678     @Override
679     public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
680         MonitoredTask status) throws IOException {
681       if (throwExceptionWhenFlushing.get()) {
682         throw new IOException("Simulated exception by tests");
683       }
684       return super.flushSnapshot(snapshot, cacheFlushId, status);
685     }
686 
687   };
688 
689   /**
690    * Test that we recover the data correctly after aborting a flush. In the
691    * test we first abort a flush after writing some data, then write more data
692    * and flush again, and finally verify the data.
693    * @throws IOException
694    */
695   @Test
696   public void testReplayEditsAfterAbortingFlush() throws IOException {
697     final TableName tableName =
698         TableName.valueOf("testReplayEditsAfterAbortingFlush");
699     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
700     final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
701     deleteDir(basedir);
702     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
703     HRegion region3 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
704     region3.close();
705     region3.getWAL().close();
706     // Write countPerFamily edits into the three families. Do a flush on one
707   // of the families during the load of edits so its seqid is not the same as
708   // the others', to test we do the right thing when seqids differ.
709     WAL wal = createWAL(this.conf);
710     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
711     Mockito.doReturn(false).when(rsServices).isAborted();
712     when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
713     Configuration customConf = new Configuration(this.conf);
714     customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
715         CustomStoreFlusher.class.getName());
716     HRegion region =
717       HRegion.openHRegion(this.hbaseWALRootDir, hri, htd, wal, customConf, rsServices, null);
718     int writtenRowCount = 10;
719     List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
720         htd.getFamilies());
721     for (int i = 0; i < writtenRowCount; i++) {
722       Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
723       put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
724           Bytes.toBytes("val"));
725       region.put(put);
726     }
727 
728     // Now assert edits made it in.
729     RegionScanner scanner = region.getScanner(new Scan());
730     assertEquals(writtenRowCount, getScannedCount(scanner));
731 
732     // Let us flush the region
733     CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
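        // The next flush should hit the injected exception; the edits must then be recovered from the WAL.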
734     try {
735       region.flush(true);
736       fail("Injected exception hasn't been thrown");
737     } catch (Throwable t) {
738       LOG.info("Expected simulated exception when flushing region, "
739           + t.getMessage());
740       // simulate a server abort
741       Mockito.doReturn(true).when(rsServices).isAborted();
742       region.setClosing(false); // region normally does not accept writes after
743       // DroppedSnapshotException. We mock around it for this test.
744     }
745     // writing more data
746     int moreRow = 10;
747     for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
748       Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
749       put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
750           Bytes.toBytes("val"));
751       region.put(put);
752     }
753     writtenRowCount += moreRow;
754     // call flush again
755     CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
756     try {
757       region.flush(true);
758     } catch (IOException t) {
759       LOG.info("Expected exception when flushing region because server is stopped, "
760           + t.getMessage());
761     }
762 
763     region.close(true);
764     wal.shutdown();
765 
766     // Let us try to split and recover
767     runWALSplit(this.conf);
768     WAL wal2 = createWAL(this.conf);
769     Mockito.doReturn(false).when(rsServices).isAborted();
770     HRegion region2 =
771       HRegion.openHRegion(this.hbaseWALRootDir, hri, htd, wal2, this.conf, rsServices, null);
772     scanner = region2.getScanner(new Scan());
773     assertEquals(writtenRowCount, getScannedCount(scanner));
774   }
775 
776   private int getScannedCount(RegionScanner scanner) throws IOException {
777     int scannedCount = 0;
778     List<Cell> results = new ArrayList<Cell>();
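        // Each next() call fills 'results' with the cells of one row; count the non-empty rows.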
779     while (true) {
780       boolean existMore = scanner.next(results);
781       if (!results.isEmpty())
782         scannedCount++;
783       if (!existMore)
784         break;
785       results.clear();
786     }
787     return scannedCount;
788   }
789 
790    * Create an HRegion with the result of a WAL split and test that we only see
791    * the good edits.
792    * good edits
793    * @throws Exception
794    */
795   @Test
796   public void testReplayEditsWrittenIntoWAL() throws Exception {
797     final TableName tableName =
798         TableName.valueOf("testReplayEditsWrittenIntoWAL");
799     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
800     final Path basedir = FSUtils.getTableDir(hbaseWALRootDir, tableName);
801     deleteDir(basedir);
802 
803     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
804     HRegion region2 = HRegion.createHRegion(hri,
805             hbaseWALRootDir, this.conf, htd);
806     HRegion.closeHRegion(region2);
807     final WAL wal = createWAL(this.conf);
808     final byte[] rowName = tableName.getName();
809     final byte[] regionName = hri.getEncodedNameAsBytes();
810     final AtomicLong sequenceId = new AtomicLong(1);
811 
812     // Add 1k to each family.
813     final int countPerFamily = 1000;
814     Set<byte[]> familyNames = new HashSet<byte[]>();
815     for (HColumnDescriptor hcd: htd.getFamilies()) {
816       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
817           ee, wal, htd, sequenceId);
818       familyNames.add(hcd.getName());
819     }
820 
821     // Add a cache flush, shouldn't have any effect
822     wal.startCacheFlush(regionName, familyNames);
823     wal.completeCacheFlush(regionName);
824 
825     // Add an edit to another family, should be skipped.
826     WALEdit edit = new WALEdit();
827     long now = ee.currentTime();
828     edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
829       now, rowName));
830     wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
831         true, null);
832 
833     // Delete the c family to verify deletes make it over.
834     edit = new WALEdit();
835     now = ee.currentTime();
836     edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
837       KeyValue.Type.DeleteFamily));
838     wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
839         true, null);
840 
841     // Sync.
842     wal.sync();
843     // Make a new conf and a new fs for the splitter to run on so we can take
844     // over the old wal.
845     final Configuration newConf = HBaseConfiguration.create(this.conf);
846     User user = HBaseTestingUtility.getDifferentUser(newConf,
847       ".replay.wal.secondtime");
848     user.runAs(new PrivilegedExceptionAction<Void>() {
849       @Override
850       public Void run() throws Exception {
851         runWALSplit(newConf);
852         FileSystem newFS = FileSystem.get(newConf);
853         // 100k seems to make for about 4 flushes during HRegion#initialize.
854         newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
855         // Make a new wal for new region.
856         WAL newWal = createWAL(newConf);
857         final AtomicInteger flushcount = new AtomicInteger(0);
858         try {
859           final HRegion region =
860               new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
861             @Override
862             protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
863                 final Collection<Store> storesToFlush, MonitoredTask status,
864                 boolean writeFlushWalMarker)
865                     throws IOException {
866               LOG.info("InternalFlushCache Invoked");
867               FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
868                   Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
869               flushcount.incrementAndGet();
870               return fs;
871             };
872           };
873           long seqid = region.initialize();
874           // We flushed during init.
875           assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
876           assertTrue(seqid - 1 == sequenceId.get());
877 
878           Get get = new Get(rowName);
879           Result result = region.get(get);
880           // Make sure we only see the good edits
881           assertEquals(countPerFamily * (htd.getFamilies().size() - 1),
882             result.size());
883           region.close();
884         } finally {
885           newWal.close();
886         }
887         return null;
888       }
889     });
890   }
891 
892   @Test
893   // the following test is for HBASE-6065
894   public void testSequentialEditLogSeqNum() throws IOException {
895     final TableName tableName = TableName.valueOf(currentTest.getMethodName());
896     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
897     final Path basedir =
898         FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
899     deleteDir(basedir);
900     final byte[] rowName = tableName.getName();
901     final int countPerFamily = 10;
902     final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
903 
904     // Mock the WAL
905     MockWAL wal = createMockWAL();
906 
907     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
908     for (HColumnDescriptor hcd : htd.getFamilies()) {
909       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
910     }
911 
912     // Let us flush the region
913     // But this time completeCacheFlush is not yet done
914     region.flush(true);
915     for (HColumnDescriptor hcd : htd.getFamilies()) {
916       addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
917     }
918     long latestSeqNumber = region.getSequenceId().get();
919     // get the current seq no
920     wal.doCompleteCacheFlush = true;
921     // allow the cache flush to complete with the seq number obtained after the first
922     // set of edits.
923     wal.completeCacheFlush(hri.getEncodedNameAsBytes());
924     wal.shutdown();
925     FileStatus[] listStatus = wal.getFiles();
926     assertNotNull(listStatus);
927     assertTrue(listStatus.length > 0);
928     WALSplitter.splitLogFile(hbaseWALRootDir, listStatus[0],
929         this.fs, this.conf, null, null, null, mode, wals);
930     FileStatus[] listStatus1 = this.fs.listStatus(
931       new Path(FSUtils.getTableDir(hbaseWALRootDir, tableName), new Path(hri.getEncodedName(),
932           "recovered.edits")), new PathFilter() {
933         @Override
934         public boolean accept(Path p) {
935           if (WALSplitter.isSequenceIdFile(p)) {
936             return false;
937           }
938           return true;
939         }
940       });
941     int editCount = 0;
942     for (FileStatus fileStatus : listStatus1) {
943       editCount = Integer.parseInt(fileStatus.getPath().getName());
944     }
945     // The sequence numbers should be the same
946     assertEquals(
947         "The sequence number of the recovered.edits and the current edit seq should be the same",
948         latestSeqNumber, editCount);
949   }
950 
951   /**
952    * testcase for https://issues.apache.org/jira/browse/HBASE-15252
953    */
954   @Test
955   public void testDatalossWhenInputError() throws IOException, InstantiationException,
956       IllegalAccessException {
957     final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
958     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
959     final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
960     deleteDir(basedir);
961     final byte[] rowName = tableName.getName();
962     final int countPerFamily = 10;
963     final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
964     HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseWALRootDir, this.hbaseWALRootDir, this.conf, htd);
965     Path regionDir = region1.getRegionFileSystem().getRegionDir();
966     HBaseTestingUtility.closeRegionAndWAL(region1);
967 
968     WAL wal = createWAL(this.conf);
969     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
970     for (HColumnDescriptor hcd : htd.getFamilies()) {
971       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
972     }
973     // Now assert edits made it in.
974     final Get g = new Get(rowName);
975     Result result = region.get(g);
976     assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
977     // Now close the region (without flush), split the log, reopen the region and assert that
978     // replay of log has the correct effect.
979     region.close(true);
980     wal.shutdown();
981 
982     runWALSplit(this.conf);
983 
984     // Here we make the DFSInputStream throw an IOException just after the WAL header.
985     Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
986     FSDataInputStream stream = fs.open(editFile);
987     stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
988     Class<? extends DefaultWALProvider.Reader> logReaderClass =
989         conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
990           DefaultWALProvider.Reader.class);
991     DefaultWALProvider.Reader reader = logReaderClass.newInstance();
992     reader.init(this.fs, editFile, conf, stream);
993     final long headerLength = stream.getPos();
994     reader.close();
995     FileSystem spyFs = spy(this.fs);
996     doAnswer(new Answer<FSDataInputStream>() {
997 
998       @Override
999       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
1000         FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
1001         Field field = FilterInputStream.class.getDeclaredField("in");
1002         field.setAccessible(true);
1003         final DFSInputStream in = (DFSInputStream) field.get(stream);
1004         DFSInputStream spyIn = spy(in);
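             // Make reads fail once the position passes the WAL header, simulating a broken input stream.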
1005         doAnswer(new Answer<Integer>() {
1006 
1007           private long pos;
1008 
1009           @Override
1010           public Integer answer(InvocationOnMock invocation) throws Throwable {
1011             if (pos >= headerLength) {
1012               throw new IOException("read over limit");
1013             }
1014             int b = (Integer) invocation.callRealMethod();
1015             if (b > 0) {
1016               pos += b;
1017             }
1018             return b;
1019           }
1020         }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
1021         doAnswer(new Answer<Void>() {
1022 
1023           @Override
1024           public Void answer(InvocationOnMock invocation) throws Throwable {
1025             invocation.callRealMethod();
1026             in.close();
1027             return null;
1028           }
1029         }).when(spyIn).close();
1030         field.set(stream, spyIn);
1031         return stream;
1032       }
1033     }).when(spyFs).open(eq(editFile));
1034 
1035     WAL wal2 = createWAL(this.conf);
1036     HRegion region2;
1037     try {
1038       // log replay should fail due to the IOException, otherwise we may lose data.
1039       region2 = HRegion.openHRegion(conf, spyFs, hbaseWALRootDir, hri, htd, wal2);
1040       assertEquals(result.size(), region2.get(g).size());
1041     } catch (IOException e) {
1042       assertEquals("read over limit", e.getMessage());
1043     }
1044     region2 = HRegion.openHRegion(conf, fs, hbaseWALRootDir, hri, htd, wal2);
1045     assertEquals(result.size(), region2.get(g).size());
1046   }
1047 
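       // WAL that skips completeCacheFlush() until doCompleteCacheFlush is set, so the
       // test can control when the cache flush is marked complete.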
1048   static class MockWAL extends FSHLog {
1049     boolean doCompleteCacheFlush = false;
1050 
1051     public MockWAL(FileSystem fs, Path walRootDir, String logName, Configuration conf)
1052         throws IOException {
1053       super(fs, walRootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1054     }
1055 
1056     @Override
1057     public void completeCacheFlush(byte[] encodedRegionName) {
1058       if (!doCompleteCacheFlush) {
1059         return;
1060       }
1061       super.completeCacheFlush(encodedRegionName);
1062     }
1063   }
1064 
1065   private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1066     HTableDescriptor htd = new HTableDescriptor(tableName);
1067     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1068     htd.addFamily(a);
1069     return htd;
1070   }
1071 
1072   private MockWAL createMockWAL() throws IOException {
1073     MockWAL wal = new MockWAL(fs, hbaseWALRootDir, logName, conf);
1074     // Set down the maximum recovery count so the dfsclient doesn't linger retrying
1075     // something long gone.
1076     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1077     return wal;
1078   }
1079 
1080   // Flusher used in this test.  Actually runs the flush inline on the wrapped
1081   // region when a flush is requested.
1082   class TestFlusher implements FlushRequester {
1083     private HRegion r;
1084 
1085     @Override
1086     public void requestFlush(Region region, boolean force) {
1087       try {
1088         r.flush(force);
1089       } catch (IOException e) {
1090         throw new RuntimeException("Exception flushing", e);
1091       }
1092     }
1093 
1094     @Override
1095     public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) {
1096       // TODO Auto-generated method stub
1097 
1098     }
1099 
1100     @Override
1101     public void registerFlushRequestListener(FlushRequestListener listener) {
1102 
1103     }
1104 
1105     @Override
1106     public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
1107       return false;
1108     }
1109 
1110     @Override
1111     public void setGlobalMemstoreLimit(long globalMemStoreSize) {
1112 
1113     }
1114   }
1115 
1116   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
1117       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1118       final HTableDescriptor htd, final AtomicLong sequenceId)
1119   throws IOException {
1120     String familyStr = Bytes.toString(family);
1121     for (int j = 0; j < count; j++) {
1122       byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
1123       byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
1124       WALEdit edit = new WALEdit();
1125       edit.add(new KeyValue(rowName, family, qualifierBytes,
1126         ee.currentTime(), columnBytes));
1127       wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime()),
1128           edit, sequenceId, true, null);
1129     }
1130     wal.sync();
1131   }
1132 
1133   static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
1134       final int count, EnvironmentEdge ee, final Region r,
1135       final String qualifierPrefix)
1136   throws IOException {
1137     List<Put> puts = new ArrayList<Put>();
1138     for (int j = 0; j < count; j++) {
1139       byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1140       Put p = new Put(rowName);
1141       p.add(family, qualifier, ee.currentTime(), rowName);
1142       r.put(p);
1143       puts.add(p);
1144     }
1145     return puts;
1146   }
1147 
1148   /*
1149    * Creates an HRI around an HTD that has <code>tableName</code> and three
1150    * column families named 'a','b', and 'c'.
1151    * @param tableName Name of table to use when we create HTableDescriptor.
1152    */
1153   private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1154     return new HRegionInfo(tableName, null, null, false);
1155   }
1156 
1157   /*
1158    * Run the split.  Verify only single split file made.
1159    * @param c
1160    * @return The single split file made
1161    * @throws IOException
1162    */
1163   private Path runWALSplit(final Configuration c) throws IOException {
1164     List<Path> splits = WALSplitter.split(
1165       hbaseWALRootDir, logDir, oldLogDir, FSUtils.getWALFileSystem(c), c, wals);
1166     // Split should generate only 1 file since there's only 1 region
1167     assertEquals("splits=" + splits, 1, splits.size());
1168     // Make sure the file exists
1169     assertTrue(fs.exists(splits.get(0)));
1170     LOG.info("Split file=" + splits.get(0));
1171     return splits.get(0);
1172   }
1173 
1174   /*
1175    * @param c
1176    * @return WAL with retries set down from 5 to 1 only.
1177    * @throws IOException
1178    */
1179   private WAL createWAL(final Configuration c) throws IOException {
1180     FSHLog wal = new FSHLog(FSUtils.getWALFileSystem(c), hbaseWALRootDir, logName, c);
1181     // Set down the maximum recovery count so the dfsclient doesn't linger retrying
1182     // something long gone.
1183     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1184     return wal;
1185   }
1186 
1187   private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1188     HTableDescriptor htd = new HTableDescriptor(tableName);
1189     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1190     htd.addFamily(a);
1191     HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
1192     htd.addFamily(b);
1193     HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
1194     htd.addFamily(c);
1195     return htd;
1196   }
1197 }