1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
22 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
23 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Map.Entry;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.CellUtil;
42 import org.apache.hadoop.hbase.HBaseTestCase;
43 import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
44 import org.apache.hadoop.hbase.HBaseTestingUtility;
45 import org.apache.hadoop.hbase.HConstants;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.client.Delete;
48 import org.apache.hadoop.hbase.client.Get;
49 import org.apache.hadoop.hbase.client.Result;
50 import org.apache.hadoop.hbase.client.Scan;
51 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
52 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
53 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
54 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
55 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
56 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
57 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
58 import org.apache.hadoop.hbase.testclassification.MediumTests;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.hbase.wal.WAL;
61 import org.junit.After;
62 import org.junit.Before;
63 import org.junit.Rule;
64 import org.junit.Test;
65 import org.junit.experimental.categories.Category;
66 import org.junit.rules.TestName;
67
68
69
70
71
72 @Category(MediumTests.class)
73 public class TestMajorCompaction {
74 @Rule public TestName name = new TestName();
75 static final Log LOG = LogFactory.getLog(TestMajorCompaction.class.getName());
76 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
77 protected Configuration conf = UTIL.getConfiguration();
78
79 private Region r = null;
80 private HTableDescriptor htd = null;
81 private static final byte [] COLUMN_FAMILY = fam1;
82 private final byte [] STARTROW = Bytes.toBytes(START_KEY);
83 private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
84 private int compactionThreshold;
85 private byte[] secondRowBytes, thirdRowBytes;
86 private static final long MAX_FILES_TO_COMPACT = 10;
87
88
89 public TestMajorCompaction() {
90 super();
91
92
93 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
94 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
95 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
96
97 secondRowBytes = START_KEY_BYTES.clone();
98
99 secondRowBytes[START_KEY_BYTES.length - 1]++;
100 thirdRowBytes = START_KEY_BYTES.clone();
101 thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
102 }
103
104 @Before
105 public void setUp() throws Exception {
106 this.htd = UTIL.createTableDescriptor(name.getMethodName());
107 this.r = UTIL.createLocalHRegion(htd, null, null);
108 }
109
110 @After
111 public void tearDown() throws Exception {
112 WAL wal = ((HRegion)r).getWAL();
113 ((HRegion)r).close();
114 wal.close();
115 }
116
117
118
119
120
121
122
123 @Test
124 public void testMajorCompactingToNoOutput() throws IOException {
125 createStoreFile(r);
126 for (int i = 0; i < compactionThreshold; i++) {
127 createStoreFile(r);
128 }
129
130 InternalScanner s = r.getScanner(new Scan());
131 do {
132 List<Cell> results = new ArrayList<Cell>();
133 boolean result = s.next(results);
134 r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
135 if (!result) break;
136 } while(true);
137 s.close();
138
139 r.flush(true);
140
141 r.compact(true);
142 s = r.getScanner(new Scan());
143 int counter = 0;
144 do {
145 List<Cell> results = new ArrayList<Cell>();
146 boolean result = s.next(results);
147 if (!result) break;
148 counter++;
149 } while(true);
150 assertEquals(0, counter);
151 }
152
153
154
155
156
157
158 @Test
159 public void testMajorCompaction() throws Exception {
160 majorCompaction();
161 }
162
163 @Test
164 public void testDataBlockEncodingInCacheOnly() throws Exception {
165 majorCompactionWithDataBlockEncoding(true);
166 }
167
168 @Test
169 public void testDataBlockEncodingEverywhere() throws Exception {
170 majorCompactionWithDataBlockEncoding(false);
171 }
172
173 public void majorCompactionWithDataBlockEncoding(boolean inCacheOnly)
174 throws Exception {
175 Map<Store, HFileDataBlockEncoder> replaceBlockCache =
176 new HashMap<Store, HFileDataBlockEncoder>();
177 for (Store store : r.getStores()) {
178 HFileDataBlockEncoder blockEncoder = store.getDataBlockEncoder();
179 replaceBlockCache.put(store, blockEncoder);
180 final DataBlockEncoding inCache = DataBlockEncoding.PREFIX;
181 final DataBlockEncoding onDisk = inCacheOnly ? DataBlockEncoding.NONE :
182 inCache;
183 ((HStore)store).setDataBlockEncoderInTest(new HFileDataBlockEncoderImpl(onDisk));
184 }
185
186 majorCompaction();
187
188
189 for (Entry<Store, HFileDataBlockEncoder> entry : replaceBlockCache.entrySet()) {
190 ((HStore)entry.getKey()).setDataBlockEncoderInTest(entry.getValue());
191 }
192 }
193
194 private void majorCompaction() throws Exception {
195 createStoreFile(r);
196 for (int i = 0; i < compactionThreshold; i++) {
197 createStoreFile(r);
198 }
199
200 HBaseTestCase.addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));
201
202
203
204
205
206 Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
207 assertEquals(compactionThreshold, result.size());
208
209
210 for (Store store : r.getStores()) {
211 assertNull(store.getCompactionProgress());
212 }
213
214 r.flush(true);
215 r.compact(true);
216
217
218 int storeCount = 0;
219 for (Store store : r.getStores()) {
220 CompactionProgress progress = store.getCompactionProgress();
221 if( progress != null ) {
222 ++storeCount;
223 assertTrue(progress.currentCompactedKVs > 0);
224 assertTrue(progress.totalCompactingKVs > 0);
225 }
226 assertTrue(storeCount > 0);
227 }
228
229
230
231 byte [] secondRowBytes = START_KEY_BYTES.clone();
232 secondRowBytes[START_KEY_BYTES.length - 1]++;
233
234
235 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).
236 setMaxVersions(100));
237 LOG.debug("Row " + Bytes.toStringBinary(secondRowBytes) + " after " +
238 "initial compaction: " + result);
239 assertEquals("Invalid number of versions of row "
240 + Bytes.toStringBinary(secondRowBytes) + ".", compactionThreshold,
241 result.size());
242
243
244
245
246
247
248 LOG.debug("Adding deletes to memstore and flushing");
249 Delete delete = new Delete(secondRowBytes, System.currentTimeMillis());
250 byte [][] famAndQf = {COLUMN_FAMILY, null};
251 delete.deleteFamily(famAndQf[0]);
252 r.delete(delete);
253
254
255 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
256 assertTrue("Second row should have been deleted", result.isEmpty());
257
258 r.flush(true);
259
260 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
261 assertTrue("Second row should have been deleted", result.isEmpty());
262
263
264 createSmallerStoreFile(this.r);
265 r.flush(true);
266
267 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
268 assertTrue("Second row should still be deleted", result.isEmpty());
269
270
271 r.compact(true);
272 assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
273
274 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
275 assertTrue("Second row should still be deleted", result.isEmpty());
276
277
278
279
280 verifyCounts(3,0);
281
282
283
284 final int ttl = 1000;
285 for (Store hstore : r.getStores()) {
286 HStore store = ((HStore) hstore);
287 ScanInfo old = store.getScanInfo();
288 ScanInfo si = new ScanInfo(old.getFamily(),
289 old.getMinVersions(), old.getMaxVersions(), ttl,
290 old.getKeepDeletedCells(), 0, old.getComparator());
291 store.setScanInfo(si);
292 }
293 Thread.sleep(1000);
294
295 r.compact(true);
296 int count = count();
297 assertEquals("Should not see anything after TTL has expired", 0, count);
298 }
299
300 @Test
301 public void testTimeBasedMajorCompaction() throws Exception {
302
303 int delay = 10 * 1000;
304 float jitterPct = 0.20f;
305 conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
306 conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
307
308 HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
309 s.storeEngine.getCompactionPolicy().setConf(conf);
310 try {
311 createStoreFile(r);
312 createStoreFile(r);
313 r.compact(true);
314
315
316 createStoreFile(r);
317 r.compact(false);
318 assertEquals(2, s.getStorefilesCount());
319
320
321 RatioBasedCompactionPolicy
322 c = (RatioBasedCompactionPolicy)s.storeEngine.getCompactionPolicy();
323 Collection<StoreFile> storeFiles = s.getStorefiles();
324 long mcTime = c.getNextMajorCompactTime(storeFiles);
325 for (int i = 0; i < 10; ++i) {
326 assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
327 }
328
329
330 long jitter = Math.round(delay * jitterPct);
331 assertTrue(delay - jitter <= mcTime && mcTime <= delay + jitter);
332
333
334 Thread.sleep(mcTime);
335
336
337 r.compact(false);
338 assertEquals(1, s.getStorefilesCount());
339 } finally {
340
341 conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
342 conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
343
344 createStoreFile(r);
345 r.compact(true);
346 assertEquals(1, s.getStorefilesCount());
347 }
348 }
349
350 private void verifyCounts(int countRow1, int countRow2) throws Exception {
351 int count1 = 0;
352 int count2 = 0;
353 for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) {
354 HFileScanner scanner = f.getReader().getScanner(false, false);
355 scanner.seekTo();
356 do {
357 byte [] row = scanner.getKeyValue().getRow();
358 if (Bytes.equals(row, STARTROW)) {
359 count1++;
360 } else if(Bytes.equals(row, secondRowBytes)) {
361 count2++;
362 }
363 } while(scanner.next());
364 }
365 assertEquals(countRow1,count1);
366 assertEquals(countRow2,count2);
367 }
368
369
370 private int count() throws IOException {
371 int count = 0;
372 for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) {
373 HFileScanner scanner = f.getReader().getScanner(false, false);
374 if (!scanner.seekTo()) {
375 continue;
376 }
377 do {
378 count++;
379 } while(scanner.next());
380 }
381 return count;
382 }
383
384 private void createStoreFile(final Region region) throws IOException {
385 createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
386 }
387
388 private void createStoreFile(final Region region, String family) throws IOException {
389 HRegionIncommon loader = new HRegionIncommon(region);
390 HBaseTestCase.addContent(loader, family);
391 loader.flushcache();
392 }
393
394 private void createSmallerStoreFile(final Region region) throws IOException {
395 HRegionIncommon loader = new HRegionIncommon(region);
396 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), ("" +
397 "bbb").getBytes(), null);
398 loader.flushcache();
399 }
400
401
402
403
404 @Test
405 public void testNonUserMajorCompactionRequest() throws Exception {
406 Store store = r.getStore(COLUMN_FAMILY);
407 createStoreFile(r);
408 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
409 createStoreFile(r);
410 }
411 store.triggerMajorCompaction();
412
413 CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
414 assertNotNull("Expected to receive a compaction request", request);
415 assertEquals(
416 "System-requested major compaction should not occur if there are too many store files",
417 false,
418 request.isMajor());
419 }
420
421
422
423
424 @Test
425 public void testUserMajorCompactionRequest() throws IOException{
426 Store store = r.getStore(COLUMN_FAMILY);
427 createStoreFile(r);
428 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
429 createStoreFile(r);
430 }
431 store.triggerMajorCompaction();
432 CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
433 assertNotNull("Expected to receive a compaction request", request);
434 assertEquals(
435 "User-requested major compaction should always occur, even if there are too many store files",
436 true,
437 request.isMajor());
438 }
439
440
441
442
443
444
445
446 public void testMajorCompactingToNoOutputWithReverseScan() throws IOException {
447 createStoreFile(r);
448 for (int i = 0; i < compactionThreshold; i++) {
449 createStoreFile(r);
450 }
451
452 Scan scan = new Scan();
453 scan.setReversed(true);
454 InternalScanner s = r.getScanner(scan);
455 do {
456 List<Cell> results = new ArrayList<Cell>();
457 boolean result = s.next(results);
458 assertTrue(!results.isEmpty());
459 r.delete(new Delete(results.get(0).getRow()));
460 if (!result) break;
461 } while (true);
462 s.close();
463
464 r.flush(true);
465
466 r.compact(true);
467 scan = new Scan();
468 scan.setReversed(true);
469 s = r.getScanner(scan);
470 int counter = 0;
471 do {
472 List<Cell> results = new ArrayList<Cell>();
473 boolean result = s.next(results);
474 if (!result) break;
475 counter++;
476 } while (true);
477 s.close();
478 assertEquals(0, counter);
479 }
480 }