1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
22 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
23 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27 import static org.mockito.Matchers.any;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.hbase.ChoreService;
48 import org.apache.hadoop.hbase.HBaseConfiguration;
49 import org.apache.hadoop.hbase.HBaseTestCase;
50 import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
51 import org.apache.hadoop.hbase.HBaseTestingUtility;
52 import org.apache.hadoop.hbase.HColumnDescriptor;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HTableDescriptor;
55 import org.apache.hadoop.hbase.client.Delete;
56 import org.apache.hadoop.hbase.client.Durability;
57 import org.apache.hadoop.hbase.client.Put;
58 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
59 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
60 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
61 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
62 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
63 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
64 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
65 import org.apache.hadoop.hbase.security.User;
66 import org.apache.hadoop.hbase.testclassification.MediumTests;
67 import org.apache.hadoop.hbase.util.Bytes;
68 import org.apache.hadoop.hbase.util.Pair;
69 import org.apache.hadoop.hbase.util.Threads;
70 import org.apache.hadoop.hbase.wal.WAL;
71 import org.junit.After;
72 import org.junit.Assume;
73 import org.junit.Before;
74 import org.junit.Rule;
75 import org.junit.Test;
76 import org.junit.experimental.categories.Category;
77 import org.junit.rules.TestName;
78 import org.mockito.Mockito;
79 import org.mockito.invocation.InvocationOnMock;
80 import org.mockito.stubbing.Answer;
81
82
83
84
85
86 @Category(MediumTests.class)
87 public class TestCompaction {
88 @Rule public TestName name = new TestName();
89 static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
90 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
91 protected Configuration conf = UTIL.getConfiguration();
92
93 private HRegion r = null;
94 private HTableDescriptor htd = null;
95 private static final byte [] COLUMN_FAMILY = fam1;
96 private final byte [] STARTROW = Bytes.toBytes(START_KEY);
97 private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
98 private int compactionThreshold;
99 private byte[] secondRowBytes, thirdRowBytes;
100 private static final long MAX_FILES_TO_COMPACT = 10;
101 private final byte[] FAMILY = Bytes.toBytes("cf");
102
103
104 public TestCompaction() {
105 super();
106
107
108 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
109 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
110 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
111 NoLimitCompactionThroughputController.class.getName());
112 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
113
114 secondRowBytes = START_KEY_BYTES.clone();
115
116 secondRowBytes[START_KEY_BYTES.length - 1]++;
117 thirdRowBytes = START_KEY_BYTES.clone();
118 thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
119 }
120
121 @Before
122 public void setUp() throws Exception {
123 this.htd = UTIL.createTableDescriptor(name.getMethodName());
124 if (name.getMethodName().equals("testCompactionSeqId")) {
125 UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
126 UTIL.getConfiguration().set(
127 DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
128 DummyCompactor.class.getName());
129 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
130 hcd.setMaxVersions(65536);
131 this.htd.addFamily(hcd);
132 }
133 this.r = UTIL.createLocalHRegion(htd, null, null);
134 }
135
136 @After
137 public void tearDown() throws Exception {
138 WAL wal = r.getWAL();
139 this.r.close();
140 wal.close();
141 }
142
143
144
145
146
147
148 @Test
149 public void testInterruptCompaction() throws Exception {
150 assertEquals(0, count());
151
152
153 int origWI = HStore.closeCheckInterval;
154 HStore.closeCheckInterval = 10*1000;
155
156 try {
157
158 int jmax = (int) Math.ceil(15.0/compactionThreshold);
159 byte [] pad = new byte[1000];
160 for (int i = 0; i < compactionThreshold; i++) {
161 HRegionIncommon loader = new HRegionIncommon(r);
162 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
163 p.setDurability(Durability.SKIP_WAL);
164 for (int j = 0; j < jmax; j++) {
165 p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
166 }
167 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
168 loader.put(p);
169 loader.flushcache();
170 }
171
172 HRegion spyR = spy(r);
173 doAnswer(new Answer() {
174 @Override
175 public Object answer(InvocationOnMock invocation) throws Throwable {
176 r.writestate.writesEnabled = false;
177 return invocation.callRealMethod();
178 }
179 }).when(spyR).doRegionCompactionPrep();
180
181
182 spyR.compactStores();
183
184
185 Store s = r.stores.get(COLUMN_FAMILY);
186 assertEquals(compactionThreshold, s.getStorefilesCount());
187 assertTrue(s.getStorefilesSize() > 15*1000);
188
189 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
190 assertEquals(0, ls.length);
191
192 } finally {
193
194 r.writestate.writesEnabled = true;
195 HStore.closeCheckInterval = origWI;
196
197
198 for (int i = 0; i < compactionThreshold; i++) {
199 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
200 byte [][] famAndQf = {COLUMN_FAMILY, null};
201 delete.deleteFamily(famAndQf[0]);
202 r.delete(delete);
203 }
204 r.flush(true);
205
206
207
208 final int ttl = 1000;
209 for (Store hstore: this.r.stores.values()) {
210 HStore store = (HStore)hstore;
211 ScanInfo old = store.getScanInfo();
212 ScanInfo si = new ScanInfo(old.getFamily(),
213 old.getMinVersions(), old.getMaxVersions(), ttl,
214 old.getKeepDeletedCells(), 0, old.getComparator());
215 store.setScanInfo(si);
216 }
217 Thread.sleep(ttl);
218
219 r.compact(true);
220 assertEquals(0, count());
221 }
222 }
223
224 private int count() throws IOException {
225 int count = 0;
226 for (StoreFile f: this.r.stores.
227 get(COLUMN_FAMILY_TEXT).getStorefiles()) {
228 HFileScanner scanner = f.getReader().getScanner(false, false);
229 if (!scanner.seekTo()) {
230 continue;
231 }
232 do {
233 count++;
234 } while(scanner.next());
235 }
236 return count;
237 }
238
239 private void createStoreFile(final HRegion region) throws IOException {
240 createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
241 }
242
243 private void createStoreFile(final HRegion region, String family) throws IOException {
244 HRegionIncommon loader = new HRegionIncommon(region);
245 HBaseTestCase.addContent(loader, family);
246 loader.flushcache();
247 }
248
249 @Test
250 public void testCompactionWithCorruptResult() throws Exception {
251 int nfiles = 10;
252 for (int i = 0; i < nfiles; i++) {
253 createStoreFile(r);
254 }
255 HStore store = (HStore) r.getStore(COLUMN_FAMILY);
256
257 Collection<StoreFile> storeFiles = store.getStorefiles();
258 DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
259 tool.compactForTesting(storeFiles, false);
260
261
262 FileSystem fs = store.getFileSystem();
263
264 Path dstPath = store.getRegionFileSystem().createTempName();
265 FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, (long)1024, null);
266 stream.writeChars("CORRUPT FILE!!!!");
267 stream.close();
268 Path origPath = store.getRegionFileSystem().commitStoreFile(
269 Bytes.toString(COLUMN_FAMILY), dstPath);
270
271 try {
272 ((HStore)store).moveFileIntoPlace(origPath);
273 } catch (Exception e) {
274
275
276 assert (fs.exists(origPath));
277 assert (!fs.exists(dstPath));
278 System.out.println("testCompactionWithCorruptResult Passed");
279 return;
280 }
281 fail("testCompactionWithCorruptResult failed since no exception was" +
282 "thrown while completing a corrupt file");
283 }
284
285
286
287
288
289 @Test
290 public void testTrackingCompactionRequest() throws Exception {
291
292 HRegionServer mockServer = Mockito.mock(HRegionServer.class);
293 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
294 CompactSplitThread thread = new CompactSplitThread(mockServer);
295 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
296
297
298 Store store = r.getStore(COLUMN_FAMILY);
299 createStoreFile(r);
300 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
301 createStoreFile(r);
302 }
303
304 CountDownLatch latch = new CountDownLatch(1);
305 TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
306 thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request,null);
307
308 latch.await();
309
310 thread.interruptIfNecessary();
311 }
312
313
314
315
316
317
318 @Test
319 public void testMultipleCustomCompactionRequests() throws Exception {
320
321 HRegionServer mockServer = Mockito.mock(HRegionServer.class);
322 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
323 CompactSplitThread thread = new CompactSplitThread(mockServer);
324 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
325
326
327 int numStores = r.getStores().size();
328 List<Pair<CompactionRequest, Store>> requests =
329 new ArrayList<Pair<CompactionRequest, Store>>(numStores);
330 CountDownLatch latch = new CountDownLatch(numStores);
331
332
333 for (Store store : r.getStores()) {
334 createStoreFile(r, store.getColumnFamilyName());
335 createStoreFile(r, store.getColumnFamilyName());
336 createStoreFile(r, store.getColumnFamilyName());
337 requests
338 .add(new Pair<CompactionRequest, Store>(new TrackableCompactionRequest(latch), store));
339 }
340
341 thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
342 Collections.unmodifiableList(requests), null);
343
344
345 latch.await();
346
347 thread.interruptIfNecessary();
348 }
349
350 private class StoreMockMaker extends StatefulStoreMockMaker {
351 public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
352 public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
353 private ArrayList<Integer> results;
354
355 public StoreMockMaker(ArrayList<Integer> results) {
356 this.results = results;
357 }
358
359 public class TestCompactionContext extends CompactionContext {
360 private List<StoreFile> selectedFiles;
361 public TestCompactionContext(List<StoreFile> selectedFiles) {
362 super();
363 this.selectedFiles = selectedFiles;
364 }
365
366 @Override
367 public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
368 return new ArrayList<StoreFile>();
369 }
370
371 @Override
372 public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
373 boolean mayUseOffPeak, boolean forceMajor) throws IOException {
374 this.request = new CompactionRequest(selectedFiles);
375 this.request.setPriority(getPriority());
376 return true;
377 }
378
379 @Override
380 public List<Path> compact(CompactionThroughputController throughputController)
381 throws IOException {
382 return compact(throughputController, null);
383 }
384
385 @Override
386 public List<Path> compact(CompactionThroughputController throughputController, User user)
387 throws IOException {
388 finishCompaction(this.selectedFiles);
389 return new ArrayList<Path>();
390 }
391 }
392
393 @Override
394 public synchronized CompactionContext selectCompaction() {
395 CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
396 compacting.addAll(notCompacting);
397 notCompacting.clear();
398 try {
399 ctx.select(null, false, false, false);
400 } catch (IOException ex) {
401 fail("Shouldn't happen");
402 }
403 return ctx;
404 }
405
406 @Override
407 public synchronized void cancelCompaction(Object object) {
408 TestCompactionContext ctx = (TestCompactionContext)object;
409 compacting.removeAll(ctx.selectedFiles);
410 notCompacting.addAll(ctx.selectedFiles);
411 }
412
413 public synchronized void finishCompaction(List<StoreFile> sfs) {
414 if (sfs.isEmpty()) return;
415 synchronized (results) {
416 results.add(sfs.size());
417 }
418 compacting.removeAll(sfs);
419 }
420
421 @Override
422 public int getPriority() {
423 return 7 - compacting.size() - notCompacting.size();
424 }
425 }
426
427 public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
428 BlockingCompactionContext blocked = null;
429
430 public class BlockingCompactionContext extends CompactionContext {
431 public volatile boolean isInCompact = false;
432
433 public void unblock() {
434 synchronized (this) { this.notifyAll(); }
435 }
436
437 @Override
438 public List<Path> compact(CompactionThroughputController throughputController)
439 throws IOException {
440 return compact(throughputController, null);
441 }
442
443 @Override
444 public List<Path> compact(CompactionThroughputController throughputController, User user)
445 throws IOException {
446 try {
447 isInCompact = true;
448 synchronized (this) {
449 this.wait();
450 }
451 } catch (InterruptedException e) {
452 Assume.assumeNoException(e);
453 }
454 return new ArrayList<Path>();
455 }
456
457 @Override
458 public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
459 return new ArrayList<StoreFile>();
460 }
461
462 @Override
463 public boolean select(List<StoreFile> f, boolean i, boolean m, boolean e)
464 throws IOException {
465 this.request = new CompactionRequest(new ArrayList<StoreFile>());
466 return true;
467 }
468 }
469
470 @Override
471 public CompactionContext selectCompaction() {
472 this.blocked = new BlockingCompactionContext();
473 try {
474 this.blocked.select(null, false, false, false);
475 } catch (IOException ex) {
476 fail("Shouldn't happen");
477 }
478 return this.blocked;
479 }
480
481 @Override
482 public void cancelCompaction(Object object) {}
483
484 @Override
485 public int getPriority() {
486 return Integer.MIN_VALUE;
487 }
488
489 public BlockingCompactionContext waitForBlocking() {
490 while (this.blocked == null || !this.blocked.isInCompact) {
491 Threads.sleepWithoutInterrupt(50);
492 }
493 BlockingCompactionContext ctx = this.blocked;
494 this.blocked = null;
495 return ctx;
496 }
497
498 @Override
499 public Store createStoreMock(String name) throws Exception {
500 return createStoreMock(Integer.MIN_VALUE, name);
501 }
502
503 public Store createStoreMock(int priority, String name) throws Exception {
504
505 Store s = super.createStoreMock(name);
506 when(s.getCompactPriority()).thenReturn(priority);
507 return s;
508 }
509 }
510
511
512 @Test
513 public void testCompactionQueuePriorities() throws Exception {
514
515 final Configuration conf = HBaseConfiguration.create();
516 HRegionServer mockServer = mock(HRegionServer.class);
517 when(mockServer.isStopped()).thenReturn(false);
518 when(mockServer.getConfiguration()).thenReturn(conf);
519 when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
520 CompactSplitThread cst = new CompactSplitThread(mockServer);
521 when(mockServer.getCompactSplitThread()).thenReturn(cst);
522
523
524 HRegion r = mock(HRegion.class);
525 when(
526 r.compact(any(CompactionContext.class), any(Store.class),
527 any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
528 public Boolean answer(InvocationOnMock invocation) throws Throwable {
529 invocation.getArgumentAt(0, CompactionContext.class).compact(
530 invocation.getArgumentAt(2, CompactionThroughputController.class));
531 return true;
532 }
533 });
534
535
536 ArrayList<Integer> results = new ArrayList<Integer>();
537 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
538 Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
539 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
540
541
542 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
543 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
544
545
546 for (int i = 0; i < 4; ++i) {
547 sm.notCompacting.add(createFile());
548 }
549 cst.requestSystemCompaction(r, store, "s1-pri3");
550 for (int i = 0; i < 3; ++i) {
551 sm2.notCompacting.add(createFile());
552 }
553 cst.requestSystemCompaction(r, store2, "s2-pri4");
554
555 for (int i = 0; i < 2; ++i) {
556 sm.notCompacting.add(createFile());
557 }
558 cst.requestSystemCompaction(r, store, "s1-pri1");
559
560 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
561
562
563 currentBlock.unblock();
564 currentBlock = blocker.waitForBlocking();
565
566 assertEquals(1, results.size());
567 assertEquals(6, results.get(0).intValue());
568
569 for (int i = 0; i < 2; ++i) {
570 sm.notCompacting.add(createFile());
571 }
572
573
574 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
575 currentBlock.unblock();
576 currentBlock = blocker.waitForBlocking();
577 assertEquals(3, results.size());
578 assertEquals(3, results.get(1).intValue());
579 assertEquals(2, results.get(2).intValue());
580
581 currentBlock.unblock();
582 cst.interruptIfNecessary();
583 }
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601 @Test
602 public void testCompactionSeqId() throws Exception {
603 final byte[] ROW = Bytes.toBytes("row");
604 final byte[] QUALIFIER = Bytes.toBytes("qualifier");
605
606 long timestamp = 10000;
607
608
609
610
611
612
613
614
615
616
617
618 for (int i = 0; i < 10; i++) {
619 Put put = new Put(ROW);
620 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
621 r.put(put);
622 }
623 r.flush(true);
624
625
626
627
628
629
630
631
632
633
634
635 for (int i = 18; i > 8; i--) {
636 Put put = new Put(ROW);
637 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
638 r.put(put);
639 }
640 r.flush(true);
641 r.compact(true);
642 }
643
644 public static class DummyCompactor extends DefaultCompactor {
645 public DummyCompactor(Configuration conf, Store store) {
646 super(conf, store);
647 this.keepSeqIdPeriod = 0;
648 }
649 }
650
651 private static StoreFile createFile() throws Exception {
652 StoreFile sf = mock(StoreFile.class);
653 when(sf.getPath()).thenReturn(new Path("file"));
654 StoreFile.Reader r = mock(StoreFile.Reader.class);
655 when(r.length()).thenReturn(10L);
656 when(sf.getReader()).thenReturn(r);
657 return sf;
658 }
659
660
661
662
663 public static class TrackableCompactionRequest extends CompactionRequest {
664 private CountDownLatch done;
665
666
667
668
669
670 public TrackableCompactionRequest(CountDownLatch finished) {
671 super();
672 this.done = finished;
673 }
674
675 @Override
676 public void afterExecute() {
677 super.afterExecute();
678 this.done.countDown();
679 }
680 }
681 }