/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;


/**
 * Test compaction framework and common functions
 */
@Category(MediumTests.class)
public class TestCompaction {
  @Rule public TestName name = new TestName();
  static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private HTableDescriptor htd = null;
  private static final byte [] COLUMN_FAMILY = fam1;
  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
  private final byte[] FAMILY = Bytes.toBytes("cf");

  /** constructor */
  public TestCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitCompactionThroughputController.class.getName());
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
  }

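  /**
   * Creates a fresh local region for each test. testCompactionSeqId additionally
   * installs {@link DummyCompactor} and raises the family's max versions so that
   * all cells written by the test survive compaction.
   */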
  @Before
  public void setUp() throws Exception {
    this.htd = UTIL.createTableDescriptor(name.getMethodName());
    if (name.getMethodName().equals("testCompactionSeqId")) {
      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
      UTIL.getConfiguration().set(
          DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
          DummyCompactor.class.getName());
      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
      hcd.setMaxVersions(65536);
      this.htd.addFamily(hcd);
    }
    this.r = UTIL.createLocalHRegion(htd, null, null);
  }

  @After
  public void tearDown() throws Exception {
    WAL wal = r.getWAL();
    this.r.close();
    wal.close();
  }

  /**
   * Verify that you can stop a long-running compaction
   * (used during RS shutdown)
   * @throws Exception
   */
  @Test
  public void testInterruptCompaction() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    int origWI = HStore.closeCheckInterval;
    HStore.closeCheckInterval = 10*1000; // 10 KB

    try {
      // Create a couple of store files with ~15KB total (over the 10KB interval)
      int jmax = (int) Math.ceil(15.0/compactionThreshold);
      byte [] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        HRegionIncommon loader = new HRegionIncommon(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        loader.flushcache();
      }

      HRegion spyR = spy(r);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped and all old files are intact,
      Store s = r.stores.get(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15*1000);
      // and that no new store files persisted past compactStores()
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(0, ls.length);

    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      HStore.closeCheckInterval = origWI;

      // Delete all Store information once done using
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte [][] famAndQf = {COLUMN_FAMILY, null};
        delete.deleteFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions are allowed for an entry, so the delete isn't enough.
      // Lower the TTL and let entries expire to ensure all our entries have been wiped.
      final int ttl = 1000;
      for (Store hstore: this.r.stores.values()) {
        HStore store = (HStore)hstore;
        ScanInfo old = store.getScanInfo();
        ScanInfo si = new ScanInfo(old.getFamily(),
            old.getMinVersions(), old.getMaxVersions(), ttl,
            old.getKeepDeletedCells(), 0, old.getComparator());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }

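  /**
   * Counts the cells across all store files of the default column family by
   * scanning each file's HFile reader directly.
   */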
  private int count() throws IOException {
    int count = 0;
    for (StoreFile f: this.r.stores.
        get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while(scanner.next());
    }
    return count;
  }

  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  private void createStoreFile(final HRegion region, String family) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    HBaseTestCase.addContent(loader, family);
    loader.flushcache();
  }

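  /**
   * Verifies that completing a compaction whose result file is corrupt fails
   * with an exception rather than loading the corrupt file into the store.
   */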
  @Test
  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    HStore store = (HStore) r.getStore(COLUMN_FAMILY);

    Collection<StoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
    tool.compactForTesting(storeFiles, false);

    // Now let's corrupt the compacted file.
    FileSystem fs = store.getFileSystem();
    // default compaction policy created one and only one new compacted file
    Path dstPath = store.getRegionFileSystem().createTempName();
    FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, (long)1024, null);
    stream.writeChars("CORRUPT FILE!!!!");
    stream.close();
    Path origPath = store.getRegionFileSystem().commitStoreFile(
      Bytes.toString(COLUMN_FAMILY), dstPath);

    try {
      store.moveFileIntoPlace(origPath);
    } catch (Exception e) {
      // The complete compaction should fail and the corrupt file should remain
      // in the 'tmp' directory.
      assertTrue(fs.exists(origPath));
      assertFalse(fs.exists(dstPath));
      System.out.println("testCompactionWithCorruptResult Passed");
      return;
    }
    fail("testCompactionWithCorruptResult failed since no exception was " +
        "thrown while completing a corrupt file");
  }

  /**
   * Create a custom compaction request and be sure that we can track it through the queue, knowing
   * when the compaction is completed.
   */
  @Test
  public void testTrackingCompactionRequest() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplitThread thread = new CompactSplitThread(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    Store store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }

    CountDownLatch latch = new CountDownLatch(1);
    TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
    thread.requestCompaction(r, store, "test custom compaction", Store.PRIORITY_USER, request,
      null);
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }

  /**
   * Regression test for HBASE-7947: ensure requests are added to the correct list in the
   * {@link CompactSplitThread}
   * @throws Exception on failure
   */
  @Test
  public void testMultipleCustomCompactionRequests() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplitThread thread = new CompactSplitThread(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    int numStores = r.getStores().size();
    List<Pair<CompactionRequest, Store>> requests =
        new ArrayList<Pair<CompactionRequest, Store>>(numStores);
    CountDownLatch latch = new CountDownLatch(numStores);
    // create some store files and setup requests for each store on which we want to do a
    // compaction
    for (Store store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      requests
          .add(new Pair<CompactionRequest, Store>(new TrackableCompactionRequest(latch), store));
    }

    thread.requestCompaction(r, "test multiple custom compactions", Store.PRIORITY_USER,
      Collections.unmodifiableList(requests), null);

    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }

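  /**
   * Mock store whose compaction contexts move files between the compacting and
   * notCompacting lists and record the number of files "compacted" in the
   * shared results list.
   */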
  private class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
    public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
    private ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    public class TestCompactionContext extends CompactionContext {
      private List<StoreFile> selectedFiles;
      public TestCompactionContext(List<StoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
        return new ArrayList<StoreFile>();
      }

      @Override
      public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
          boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequest(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(CompactionThroughputController throughputController)
          throws IOException {
        return compact(throughputController, null);
      }

      @Override
      public List<Path> compact(CompactionThroughputController throughputController, User user)
          throws IOException {
        finishCompaction(this.selectedFiles);
        return new ArrayList<Path>();
      }
    }

    @Override
    public synchronized CompactionContext selectCompaction() {
      CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return ctx;
    }

    @Override
    public synchronized void cancelCompaction(Object object) {
      TestCompactionContext ctx = (TestCompactionContext)object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    public synchronized void finishCompaction(List<StoreFile> sfs) {
      if (sfs.isEmpty()) return;
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

    @Override
    public int getPriority() {
      return 7 - compacting.size() - notCompacting.size();
    }
  }

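  /**
   * Mock store whose compaction context blocks inside compact() until unblock()
   * is called, letting tests stall the compaction thread and inspect the queue.
   */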
  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
    BlockingCompactionContext blocked = null;

    public class BlockingCompactionContext extends CompactionContext {
      public volatile boolean isInCompact = false;

      public void unblock() {
        synchronized (this) { this.notifyAll(); }
      }

      @Override
      public List<Path> compact(CompactionThroughputController throughputController)
          throws IOException {
        return compact(throughputController, null);
      }

      @Override
      public List<Path> compact(CompactionThroughputController throughputController, User user)
          throws IOException {
        try {
          isInCompact = true;
          synchronized (this) {
            this.wait();
          }
        } catch (InterruptedException e) {
          Assume.assumeNoException(e);
        }
        return new ArrayList<Path>();
      }

      @Override
      public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
        return new ArrayList<StoreFile>();
      }

      @Override
      public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
          boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequest(new ArrayList<StoreFile>());
        return true;
      }
    }

    @Override
    public CompactionContext selectCompaction() {
      this.blocked = new BlockingCompactionContext();
      try {
        this.blocked.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return this.blocked;
    }

    @Override
    public void cancelCompaction(Object object) {}

    @Override
    public int getPriority() {
      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
    }

    public BlockingCompactionContext waitForBlocking() {
      while (this.blocked == null || !this.blocked.isInCompact) {
        Threads.sleepWithoutInterrupt(50);
      }
      BlockingCompactionContext ctx = this.blocked;
      this.blocked = null;
      return ctx;
    }

    @Override
    public Store createStoreMock(String name) throws Exception {
      return createStoreMock(Integer.MIN_VALUE, name);
    }

    public Store createStoreMock(int priority, String name) throws Exception {
      // Override the mock to always return the specified priority.
      Store s = super.createStoreMock(name);
      when(s.getCompactPriority()).thenReturn(priority);
      return s;
    }
  }

  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    when(mockServer.isStopped()).thenReturn(false);
    when(mockServer.getConfiguration()).thenReturn(conf);
    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
    CompactSplitThread cst = new CompactSplitThread(mockServer);
    when(mockServer.getCompactSplitThread()).thenReturn(cst);

    // Set up the region mock that redirects compactions.
    HRegion r = mock(HRegion.class);
    when(
      r.compact(any(CompactionContext.class), any(Store.class),
        any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        invocation.getArgumentAt(0, CompactionContext.class).compact(
          invocation.getArgumentAt(2, CompactionThroughputController.class));
        return true;
      }
    });

    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
    ArrayList<Integer> results = new ArrayList<Integer>();
    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
    Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

    // First, block the compaction thread so that we can muck with the queue.
    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();

    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
    for (int i = 0; i < 4; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri3");
    for (int i = 0; i < 3; ++i) {
      sm2.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store2, "s2-pri4");
    // Now add 2 more files to store1 and queue compaction - pri 1.
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri1");
    // Finally add blocking compaction with priority 2.
    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

    // Unblock the blocking compaction; pri1 should run and we should block again on pri2.
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    // Pri1 should have "compacted" all 6 files.
    assertEquals(1, results.size());
    assertEquals(6, results.get(0).intValue());
    // Add 2 files to store 1 (it has 2 files now).
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    // Now we have pri4 for store 2 in the queue, and pri3 for store1; store1's current priority
    // is 5, however, so it must not preempt store 2. Add a blocking compaction at the end.
    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    assertEquals(3, results.size());
    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
    assertEquals(2, results.get(2).intValue());

    currentBlock.unblock();
    cst.interruptIfNecessary();
  }

  /**
   * First write 10 cells (with different timestamps) to a qualifier and flush
   * to hfile1, then write 10 cells (with different timestamps) to the same
   * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the
   * oldest cell (cell-B) in hfile2 have the same timestamp but different
   * sequence ids, and will get scanned successively during compaction.
   * <p/>
   * We set compaction.kv.max to 10 so compaction will scan 10 versions each
   * round. Meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor}, so all
   * 10 versions of hfile2 will be written out with their seqIds cleaned (set to 0),
   * including cell-B. When the scanner then reaches cell-A, it triggers a scan
   * out-of-order assertion error without the HBASE-16931 fix.
   *
   * @throws Exception
   *           if error occurs during the test
   */
  @Test
  public void testCompactionSeqId() throws Exception {
    final byte[] ROW = Bytes.toBytes("row");
    final byte[] QUALIFIER = Bytes.toBytes("qualifier");

    long timestamp = 10000;

    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
    for (int i = 0; i < 10; i++) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);

    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
    for (int i = 18; i > 8; i--) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);
    r.compact(true);
  }

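  /**
   * Compactor that never preserves seqIds (keepSeqIdPeriod = 0); used by
   * testCompactionSeqId to reproduce the HBASE-16931 scan-order issue.
   */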
  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, Store store) {
      super(conf, store);
      this.keepSeqIdPeriod = 0;
    }
  }

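  /** Creates a mock StoreFile whose stub reader reports a 10-byte length. */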
  private static StoreFile createFile() throws Exception {
    StoreFile sf = mock(StoreFile.class);
    when(sf.getPath()).thenReturn(new Path("file"));
    StoreFile.Reader r = mock(StoreFile.Reader.class);
    when(r.length()).thenReturn(10L);
    when(sf.getReader()).thenReturn(r);
    return sf;
  }

  /**
   * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.
   */
  public static class TrackableCompactionRequest extends CompactionRequest {
    private CountDownLatch done;

    /**
     * Constructor for a custom compaction. Use the setXXX methods to update the state of the
     * compaction before it is used.
     */
    public TrackableCompactionRequest(CountDownLatch finished) {
      super();
      this.done = finished;
    }

    @Override
    public void afterExecute() {
      super.afterExecute();
      this.done.countDown();
    }
  }
}