View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotEquals;
24  import static org.junit.Assert.assertNotNull;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.lang.reflect.Field;
30  import java.util.ArrayList;
31  import java.util.Comparator;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.UUID;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicLong;
39  import java.util.concurrent.CountDownLatch;
40  import java.util.concurrent.ExecutorService;
41  import java.util.concurrent.Executors;
42  
43  import org.apache.commons.lang.mutable.MutableBoolean;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileStatus;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.CellScanner;
51  import org.apache.hadoop.hbase.Coprocessor;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseTestingUtility;
54  import org.apache.hadoop.hbase.HColumnDescriptor;
55  import org.apache.hadoop.hbase.HConstants;
56  import org.apache.hadoop.hbase.HRegionInfo;
57  import org.apache.hadoop.hbase.HTableDescriptor;
58  import org.apache.hadoop.hbase.KeyValue;
59  import org.apache.hadoop.hbase.testclassification.MediumTests;
60  import org.apache.hadoop.hbase.TableName;
61  import org.apache.hadoop.hbase.client.Get;
62  import org.apache.hadoop.hbase.client.Put;
63  import org.apache.hadoop.hbase.client.Result;
64  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
65  import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
66  import org.apache.hadoop.hbase.regionserver.HRegion;
67  import org.apache.hadoop.hbase.regionserver.Region;
68  import org.apache.hadoop.hbase.testclassification.MediumTests;
69  import org.apache.hadoop.hbase.util.Bytes;
70  import org.apache.hadoop.hbase.util.EnvironmentEdge;
71  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
72  import org.apache.hadoop.hbase.util.FSUtils;
73  import org.apache.hadoop.hbase.util.Threads;
74  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
75  import org.apache.hadoop.hbase.wal.WAL;
76  import org.apache.hadoop.hbase.wal.WALKey;
77  import org.junit.After;
78  import org.junit.AfterClass;
79  import org.junit.Before;
80  import org.junit.BeforeClass;
81  import org.junit.Rule;
82  import org.junit.Test;
83  import org.junit.experimental.categories.Category;
84  import org.junit.rules.TestName;
85  
86  /**
87   * Provides FSHLog test cases.
88   */
89  @Category(MediumTests.class)
90  public class TestFSHLog {
91    protected static final Log LOG = LogFactory.getLog(TestFSHLog.class);
92  
93    protected static Configuration conf;
94    protected static FileSystem fs;
95    protected static Path dir;
96    protected static Path rootDir;
97    protected static Path walRootDir;
98    protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
99  
100   @Rule
101   public final TestName currentTest = new TestName();
102 
103   @Before
104   public void setUp() throws Exception {
105     FileStatus[] entries = fs.listStatus(new Path("/"));
106     for (FileStatus dir : entries) {
107       fs.delete(dir.getPath(), true);
108     }
109     rootDir = TEST_UTIL.createRootDir();
110     walRootDir = TEST_UTIL.createWALRootDir();
111     dir = new Path(walRootDir, currentTest.getMethodName());
112     assertNotEquals(rootDir, walRootDir);
113   }
114 
115   @After
116   public void tearDown() throws Exception {
117   }
118 
119   @BeforeClass
120   public static void setUpBeforeClass() throws Exception {
121     // Make block sizes small.
122     TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
123     // quicker heartbeat interval for faster DN death notification
124     TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
125     TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
126     TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
127 
128     // faster failover with cluster.shutdown();fs.close() idiom
129     TEST_UTIL.getConfiguration()
130         .setInt("hbase.ipc.client.connect.max.retries", 1);
131     TEST_UTIL.getConfiguration().setInt(
132         "dfs.client.block.recovery.retries", 1);
133     TEST_UTIL.getConfiguration().setInt(
134       "hbase.ipc.client.connection.maxidletime", 500);
135     TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
136         SampleRegionWALObserver.class.getName());
137     TEST_UTIL.startMiniDFSCluster(3);
138 
139     conf = TEST_UTIL.getConfiguration();
140     fs = TEST_UTIL.getDFSCluster().getFileSystem();
141   }
142 
143   @AfterClass
144   public static void tearDownAfterClass() throws Exception {
145     fs.delete(rootDir, true);
146     fs.delete(walRootDir, true);
147     TEST_UTIL.shutdownMiniCluster();
148   }
149 
150   /**
151    * A loaded WAL coprocessor won't break existing WAL test cases.
152    */
153   @Test
154   public void testWALCoprocessorLoaded() throws Exception {
155     // test to see whether the coprocessor is loaded or not.
156     FSHLog log = null;
157     try {
158       log = new FSHLog(fs, walRootDir, dir.toString(),
159           HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
160       WALCoprocessorHost host = log.getCoprocessorHost();
161       Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
162       assertNotNull(c);
163     } finally {
164       if (log != null) {
165         log.close();
166       }
167     }
168   }
169 
170   protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
171       AtomicLong sequenceId) throws IOException {
172     final byte[] row = Bytes.toBytes("row");
173     for (int i = 0; i < times; i++) {
174       long timestamp = System.currentTimeMillis();
175       WALEdit cols = new WALEdit();
176       cols.add(new KeyValue(row, row, row, timestamp, row));
177       log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
178         cols, sequenceId, true, null);
179     }
180     log.sync();
181   }
182 
183   /**
184    * helper method to simulate region flush for a WAL.
185    * @param wal
186    * @param regionEncodedName
187    */
188   protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
189     wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
190     wal.completeCacheFlush(regionEncodedName);
191   }
192 
193   /**
194    * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
195    * exception if we do). Comparison is based on the timestamp present in the wal name.
196    * @throws Exception
197    */
198   @Test 
199   public void testWALComparator() throws Exception {
200     FSHLog wal1 = null;
201     FSHLog walMeta = null;
202     try {
203       wal1 = new FSHLog(fs, walRootDir, dir.toString(),
204           HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
205       LOG.debug("Log obtained is: " + wal1);
206       Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
207       Path p1 = wal1.computeFilename(11);
208       Path p2 = wal1.computeFilename(12);
209       // comparing with itself returns 0
210       assertTrue(comp.compare(p1, p1) == 0);
211       // comparing with different filenum.
212       assertTrue(comp.compare(p1, p2) < 0);
213       walMeta = new FSHLog(fs, walRootDir, dir.toString(),
214           HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
215           DefaultWALProvider.META_WAL_PROVIDER_ID);
216       Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
217 
218       Path p1WithMeta = walMeta.computeFilename(11);
219       Path p2WithMeta = walMeta.computeFilename(12);
220       assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
221       assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
222       // mixing meta and non-meta logs gives error
223       boolean ex = false;
224       try {
225         comp.compare(p1WithMeta, p2);
226       } catch (IllegalArgumentException e) {
227         ex = true;
228       }
229       assertTrue("Comparator doesn't complain while checking meta log files", ex);
230       boolean exMeta = false;
231       try {
232         compMeta.compare(p1WithMeta, p2);
233       } catch (IllegalArgumentException e) {
234         exMeta = true;
235       }
236       assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
237     } finally {
238       if (wal1 != null) {
239         wal1.close();
240       }
241       if (walMeta != null) {
242         walMeta.close();
243       }
244     }
245   }
246 
247   /**
248    * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the
249    * list of regions which should be flushed in order to archive the oldest wal file.
250    * <p>
251    * This method tests this behavior by inserting edits and rolling the wal enough times to reach
252    * the max number of logs threshold. It checks whether we get the "right regions" for flush on
253    * rolling the wal.
254    * @throws Exception
255    */
256   @Test 
257   public void testFindMemStoresEligibleForFlush() throws Exception {
258     LOG.debug("testFindMemStoresEligibleForFlush");
259     Configuration conf1 = HBaseConfiguration.create(conf);
260     conf1.setInt("hbase.regionserver.maxlogs", 1);
261     FSHLog wal = new FSHLog(fs, walRootDir, dir.toString(),
262         HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
263     HTableDescriptor t1 =
264         new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
265     HTableDescriptor t2 =
266         new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
267     HRegionInfo hri1 =
268         new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
269     HRegionInfo hri2 =
270         new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
271     // variables to mock region sequenceIds
272     final AtomicLong sequenceId1 = new AtomicLong(1);
273     final AtomicLong sequenceId2 = new AtomicLong(1);
274     // add edits and roll the wal
275     try {
276       addEdits(wal, hri1, t1, 2, sequenceId1);
277       wal.rollWriter();
278       // add some more edits and roll the wal. This would reach the log number threshold
279       addEdits(wal, hri1, t1, 2, sequenceId1);
280       wal.rollWriter();
281       // with above rollWriter call, the max logs limit is reached.
282       assertTrue(wal.getNumRolledLogFiles() == 2);
283 
284       // get the regions to flush; since there is only one region in the oldest wal, it should
285       // return only one region.
286       byte[][] regionsToFlush = wal.findRegionsToForceFlush();
287       assertEquals(1, regionsToFlush.length);
288       assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
289       // insert edits in second region
290       addEdits(wal, hri2, t2, 2, sequenceId2);
291       // get the regions to flush, it should still read region1.
292       regionsToFlush = wal.findRegionsToForceFlush();
293       assertEquals(regionsToFlush.length, 1);
294       assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
295       // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
296       // remain.
297       flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
298       wal.rollWriter();
299       // only one wal should remain now (that is for the second region).
300       assertEquals(1, wal.getNumRolledLogFiles());
301       // flush the second region
302       flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
303       wal.rollWriter(true);
304       // no wal should remain now.
305       assertEquals(0, wal.getNumRolledLogFiles());
306       // add edits both to region 1 and region 2, and roll.
307       addEdits(wal, hri1, t1, 2, sequenceId1);
308       addEdits(wal, hri2, t2, 2, sequenceId2);
309       wal.rollWriter();
310       // add edits and roll the writer, to reach the max logs limit.
311       assertEquals(1, wal.getNumRolledLogFiles());
312       addEdits(wal, hri1, t1, 2, sequenceId1);
313       wal.rollWriter();
314       // it should return two regions to flush, as the oldest wal file has entries
315       // for both regions.
316       regionsToFlush = wal.findRegionsToForceFlush();
317       assertEquals(2, regionsToFlush.length);
318       // flush both regions
319       flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
320       flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
321       wal.rollWriter(true);
322       assertEquals(0, wal.getNumRolledLogFiles());
323       // Add an edit to region1, and roll the wal.
324       addEdits(wal, hri1, t1, 2, sequenceId1);
325       // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
326       wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
327       wal.rollWriter();
328       wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
329       assertEquals(1, wal.getNumRolledLogFiles());
330     } finally {
331       if (wal != null) {
332         wal.close();
333       }
334     }
335   }
336 
337   /**
338    * Simulates WAL append ops for a region and tests
339    * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
340    * It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries.
341    * If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then the
342    * region should be flushed before archiving this WAL.
343   */
344   @Test
345   public void testAllRegionsFlushed() {
346     LOG.debug("testAllRegionsFlushed");
347     Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
348     Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
349     Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
350     // create a table
351     TableName t1 = TableName.valueOf("t1");
352     // create a region
353     HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
354     // variables to mock region sequenceIds
355     final AtomicLong sequenceId1 = new AtomicLong(1);
356     // test empty map
357     assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
358     // add entries in the region
359     seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
360     oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
361     // should say region1 is not flushed.
362     assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
363     // test with entries in oldestFlushing map.
364     oldestUnFlushedSeqNo.clear();
365     oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
366     assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
367     // simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps
368     oldestFlushingSeqNo.clear();
369     oldestUnFlushedSeqNo.clear();
370     assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
371     // insert some large values for region1
372     oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l);
373     seqNo.put(hri1.getEncodedNameAsBytes(), 1500l);
374     assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
375 
376     // tests when oldestUnFlushed/oldestFlushing contains larger value.
377     // It means region is flushed.
378     oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l);
379     oldestUnFlushedSeqNo.clear();
380     seqNo.put(hri1.getEncodedNameAsBytes(), 1199l);
381     assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
382   }
383 
384   @Test(expected=IOException.class)
385   public void testFailedToCreateWALIfParentRenamed() throws IOException {
386     final String name = "testFailedToCreateWALIfParentRenamed";
387     FSHLog log = new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME,
388         conf, null, true, null, null);
389     long filenum = System.currentTimeMillis();
390     Path path = log.computeFilename(filenum);
391     log.createWriterInstance(path);
392     Path parent = path.getParent();
393     path = log.computeFilename(filenum + 1);
394     Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
395     fs.rename(parent, newPath);
396     log.createWriterInstance(path);
397     fail("It should fail to create the new WAL");
398   }
399 
400   /**
401    * Test flush for sure has a sequence id that is beyond the last edit appended.  We do this
402    * by slowing appends in the background ring buffer thread while in foreground we call
403    * flush.  The addition of the sync over HRegion in flush should fix an issue where flush was
404    * returning before all of its appends had made it out to the WAL (HBASE-11109).
405    * see HBASE-11109
406    * @throws IOException
407    */
408   @Test
409   public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
410     String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
411     final TableName tableName = TableName.valueOf(testName);
412     final HRegionInfo hri = new HRegionInfo(tableName);
413     final byte[] rowName = tableName.getName();
414     final HTableDescriptor htd = new HTableDescriptor(tableName);
415     htd.addFamily(new HColumnDescriptor("f"));
416     HRegion r = HRegion.createHRegion(hri, rootDir,
417       TEST_UTIL.getConfiguration(), htd);
418     HRegion.closeHRegion(r);
419     final int countPerFamily = 10;
420     final MutableBoolean goslow = new MutableBoolean(false);
421     // subclass and doctor a method.
422     FSHLog wal = new FSHLog(FileSystem.get(conf), walRootDir,
423         testName, conf) {
424       @Override
425       void atHeadOfRingBufferEventHandlerAppend() {
426         if (goslow.isTrue()) {
427           Threads.sleep(100);
428           LOG.debug("Sleeping before appending 100ms");
429         }
430         super.atHeadOfRingBufferEventHandlerAppend();
431       }
432     };
433     HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
434       TEST_UTIL.getTestFileSystem(), rootDir, hri, htd, wal);
435     EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
436     try {
437       List<Put> puts = null;
438       for (HColumnDescriptor hcd: htd.getFamilies()) {
439         puts =
440           TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
441       }
442 
443       // Now assert edits made it in.
444       final Get g = new Get(rowName);
445       Result result = region.get(g);
446       assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
447 
448       // Construct a WALEdit and add it a few times to the WAL.
449       WALEdit edits = new WALEdit();
450       for (Put p: puts) {
451         CellScanner cs = p.cellScanner();
452         while (cs.advance()) {
453           edits.add(cs.current());
454         }
455       }
456       // Add any old cluster id.
457       List<UUID> clusterIds = new ArrayList<UUID>();
458       clusterIds.add(UUID.randomUUID());
459       // Now make appends run slow.
460       goslow.setValue(true);
461       for (int i = 0; i < countPerFamily; i++) {
462         final HRegionInfo info = region.getRegionInfo();
463         final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
464             System.currentTimeMillis(), clusterIds, -1, -1);
465         wal.append(htd, info, logkey, edits, region.getSequenceId(), true, null);
466       }
467       region.flush(true);
468       // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
469       long currentSequenceId = region.getSequenceId().get();
470       // Now release the appends
471       goslow.setValue(false);
472       synchronized (goslow) {
473         goslow.notifyAll();
474       }
475       assertTrue(currentSequenceId >= region.getSequenceId().get());
476     } finally {
477       region.close(true);
478       wal.close();
479     }
480   }
481 
482   @Test
483   public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
484       SecurityException, IllegalArgumentException, IllegalAccessException {
485     final String name = "testSyncRunnerIndexOverflow";
486     FSHLog log =
487         new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
488             null, true, null, null);
489     try {
490       Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
491       ringBufferEventHandlerField.setAccessible(true);
492       FSHLog.RingBufferEventHandler ringBufferEventHandler =
493           (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
494       Field syncRunnerIndexField =
495           FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
496       syncRunnerIndexField.setAccessible(true);
497       syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
498       HTableDescriptor htd =
499           new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
500       HRegionInfo hri =
501           new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
502       final AtomicLong sequenceId1 = new AtomicLong(1);
503       for (int i = 0; i < 10; i++) {
504         addEdits(log, hri, htd, 1, sequenceId1);
505       }
506     } finally {
507       log.close();
508     }
509   }
510 
511   /**
512    * Test case for https://issues.apache.org/jira/browse/HBASE-16721
513    */
514   @Test (timeout = 30000)
515   public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
516     final String name = "testSyncRunnerIndexOverflow";
517     final byte[] b = Bytes.toBytes("b");
518 
519     final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
520     final CountDownLatch holdAppend = new CountDownLatch(1);
521     final CountDownLatch flushFinished = new CountDownLatch(1);
522     final CountDownLatch putFinished = new CountDownLatch(1);
523 
524     try (FSHLog log =
525         new FSHLog(fs, walRootDir, name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
526             null, true, null, null)) {
527 
528       log.registerWALActionsListener(new WALActionsListener.Base() {
529         @Override
530         public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
531           if (startHoldingForAppend.get()) {
532             try {
533               holdAppend.await();
534             } catch (InterruptedException e) {
535               LOG.error(e);
536             }
537           }
538         }
539       });
540 
541       // open a new region which uses this WAL
542       HTableDescriptor htd =
543           new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
544       HRegionInfo hri =
545           new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
546 
547       final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
548       ExecutorService exec = Executors.newFixedThreadPool(2);
549 
550       // do a regular write first because of memstore size calculation.
551       region.put(new Put(b).addColumn(b, b,b));
552 
553       startHoldingForAppend.set(true);
554       exec.submit(new Runnable() {
555         @Override
556         public void run() {
557           try {
558             region.put(new Put(b).addColumn(b, b,b));
559             putFinished.countDown();
560           } catch (IOException e) {
561             LOG.error(e);
562           }
563         }
564       });
565 
566       // give the put a chance to start
567       Threads.sleep(3000);
568 
569       exec.submit(new Runnable() {
570         @Override
571         public void run() {
572           try {
573             Region.FlushResult flushResult = region.flush(true);
574             LOG.info("Flush result:" +  flushResult.getResult());
575             LOG.info("Flush succeeded:" +  flushResult.isFlushSucceeded());
576             flushFinished.countDown();
577           } catch (IOException e) {
578             LOG.error(e);
579           }
580         }
581       });
582 
583       // give the flush a chance to start. Flush should have got the region lock, and
584       // should have been waiting on the mvcc complete after this.
585       Threads.sleep(3000);
586 
587       // let the append to WAL go through now that the flush already started
588       holdAppend.countDown();
589       putFinished.await();
590       flushFinished.await();
591 
592       // check whether flush went through
593       assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
594 
595       // now check the region's unflushed seqIds.
596       long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
597       assertEquals("Found seqId for the region which is already flushed",
598           HConstants.NO_SEQNUM, seqId);
599 
600       region.close();
601     }
602   }
603 }