1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mob.compactions;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.IOException;
25 import java.security.Key;
26 import java.security.SecureRandom;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Random;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.RejectedExecutionException;
34 import java.util.concurrent.RejectedExecutionHandler;
35 import java.util.concurrent.SynchronousQueue;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38
39 import javax.crypto.spec.SecretKeySpec;
40
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.fs.FileStatus;
43 import org.apache.hadoop.fs.FileSystem;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.hbase.Cell;
46 import org.apache.hadoop.hbase.CellUtil;
47 import org.apache.hadoop.hbase.HBaseTestingUtility;
48 import org.apache.hadoop.hbase.HColumnDescriptor;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HTableDescriptor;
51 import org.apache.hadoop.hbase.NamespaceDescriptor;
52 import org.apache.hadoop.hbase.TableName;
53 import org.apache.hadoop.hbase.client.Admin;
54 import org.apache.hadoop.hbase.client.Admin.CompactType;
55 import org.apache.hadoop.hbase.client.BufferedMutator;
56 import org.apache.hadoop.hbase.client.Connection;
57 import org.apache.hadoop.hbase.client.ConnectionFactory;
58 import org.apache.hadoop.hbase.client.Delete;
59 import org.apache.hadoop.hbase.client.Durability;
60 import org.apache.hadoop.hbase.client.Get;
61 import org.apache.hadoop.hbase.client.Put;
62 import org.apache.hadoop.hbase.client.Result;
63 import org.apache.hadoop.hbase.client.ResultScanner;
64 import org.apache.hadoop.hbase.client.Scan;
65 import org.apache.hadoop.hbase.client.Table;
66 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
67 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
68 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
69 import org.apache.hadoop.hbase.io.HFileLink;
70 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
71 import org.apache.hadoop.hbase.io.crypto.aes.AES;
72 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
73 import org.apache.hadoop.hbase.io.hfile.HFile;
74 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
75 import org.apache.hadoop.hbase.mob.MobConstants;
76 import org.apache.hadoop.hbase.mob.MobUtils;
77 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
78 import org.apache.hadoop.hbase.regionserver.BloomType;
79 import org.apache.hadoop.hbase.regionserver.HRegion;
80 import org.apache.hadoop.hbase.regionserver.Store;
81 import org.apache.hadoop.hbase.regionserver.StoreFile;
82 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
83 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
84 import org.apache.hadoop.hbase.security.EncryptionUtil;
85 import org.apache.hadoop.hbase.security.User;
86 import org.apache.hadoop.hbase.testclassification.LargeTests;
87 import org.apache.hadoop.hbase.util.Bytes;
88 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
89 import org.apache.hadoop.hbase.util.Threads;
90 import org.junit.AfterClass;
91 import org.junit.Assert;
92 import org.junit.BeforeClass;
93 import org.junit.Test;
94 import org.junit.experimental.categories.Category;
95
96 @Category(LargeTests.class)
97 public class TestMobCompactor {
98 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
99 private static Configuration conf = null;
100 private TableName tableName;
101 private static Connection conn;
102 private BufferedMutator bufMut;
103 private Table table;
104 private static Admin admin;
105 private HTableDescriptor desc;
106 private HColumnDescriptor hcd1;
107 private HColumnDescriptor hcd2;
108 private static FileSystem fs;
109 private static final String family1 = "family1";
110 private static final String family2 = "family2";
111 private static final String qf1 = "qualifier1";
112 private static final String qf2 = "qualifier2";
113 private static byte[] KEYS = Bytes.toBytes("012");
114 private static int regionNum = KEYS.length;
115 private static int delRowNum = 1;
116 private static int delCellNum = 6;
117 private static int cellNumPerRow = 3;
118 private static int rowNumPerFile = 2;
119 private static ExecutorService pool;
120
121 @BeforeClass
122 public static void setUpBeforeClass() throws Exception {
123 TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
124 TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
125 TEST_UTIL.getConfiguration()
126 .setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000);
127 TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
128 KeyProviderForTesting.class.getName());
129 TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
130 TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
131 TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1);
132 TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100);
133 TEST_UTIL.startMiniCluster(1);
134 pool = createThreadPool(TEST_UTIL.getConfiguration());
135 conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
136 fs = TEST_UTIL.getTestFileSystem();
137 conf = TEST_UTIL.getConfiguration();
138 admin = TEST_UTIL.getHBaseAdmin();
139 }
140
141 @AfterClass
142 public static void tearDownAfterClass() throws Exception {
143 pool.shutdown();
144 conn.close();
145 TEST_UTIL.shutdownMiniCluster();
146 }
147
148 public void setUp(String tableNameAsString) throws IOException {
149 tableName = TableName.valueOf(tableNameAsString);
150 hcd1 = new HColumnDescriptor(family1);
151 hcd1.setMobEnabled(true);
152 hcd1.setMobThreshold(5);
153 hcd2 = new HColumnDescriptor(family2);
154 hcd2.setMobEnabled(true);
155 hcd2.setMobThreshold(5);
156 desc = new HTableDescriptor(tableName);
157 desc.addFamily(hcd1);
158 desc.addFamily(hcd2);
159 admin.createTable(desc, getSplitKeys());
160 table = conn.getTable(tableName);
161 bufMut = conn.getBufferedMutator(tableName);
162 }
163
164 @Test(timeout = 300000)
165 public void testMinorCompaction() throws Exception {
166 resetConf();
167 int mergeSize = 5000;
168
169 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
170
171
172 NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build();
173 String tableNameAsString = "ns:testMinorCompaction";
174 admin.createNamespace(namespaceDescriptor);
175 setUp(tableNameAsString);
176 int count = 4;
177
178 loadData(admin, bufMut, tableName, count, rowNumPerFile);
179 int rowNumPerRegion = count * rowNumPerFile;
180
181 assertEquals("Before deleting: mob rows count", regionNum * rowNumPerRegion,
182 countMobRows(table));
183 assertEquals("Before deleting: mob cells count", regionNum * cellNumPerRow * rowNumPerRegion,
184 countMobCells(table));
185 assertEquals("Before deleting: mob file count", regionNum * count,
186 countFiles(tableName, true, family1));
187
188 int largeFilesCount = countLargeFiles(mergeSize, tableName, family1);
189 createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
190
191 assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
192 countMobRows(table));
193 assertEquals("Before compaction: mob cells count", regionNum
194 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
195 assertEquals("Before compaction: family1 mob file count", regionNum * count,
196 countFiles(tableName, true, family1));
197 assertEquals("Before compaction: family2 mob file count", regionNum * count,
198 countFiles(tableName, true, family2));
199 assertEquals("Before compaction: family1 del file count", regionNum,
200 countFiles(tableName, false, family1));
201 assertEquals("Before compaction: family2 del file count", regionNum,
202 countFiles(tableName, false, family2));
203
204
205 MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
206 compactor.compact();
207
208 assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
209 countMobRows(table));
210 assertEquals("After compaction: mob cells count", regionNum
211 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
212
213
214 assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
215 countFiles(tableName, true, family1));
216 assertEquals("After compaction: family2 mob file count", regionNum * count,
217 countFiles(tableName, true, family2));
218 assertEquals("After compaction: family1 del file count", regionNum,
219 countFiles(tableName, false, family1));
220 assertEquals("After compaction: family2 del file count", regionNum,
221 countFiles(tableName, false, family2));
222 }
223
224 @Test(timeout = 300000)
225 public void testCompactionWithHFileLink() throws IOException, InterruptedException {
226 resetConf();
227 String tableNameAsString = "testCompactionWithHFileLink";
228 setUp(tableNameAsString);
229 int count = 4;
230
231 loadData(admin, bufMut, tableName, count, rowNumPerFile);
232 int rowNumPerRegion = count * rowNumPerFile;
233
234 long tid = System.currentTimeMillis();
235 byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
236
237 admin.snapshot(snapshotName1, tableName);
238
239 createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
240
241 assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
242 countMobRows(table));
243 assertEquals("Before compaction: mob cells count", regionNum
244 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
245 assertEquals("Before compaction: family1 mob file count", regionNum * count,
246 countFiles(tableName, true, family1));
247 assertEquals("Before compaction: family2 mob file count", regionNum * count,
248 countFiles(tableName, true, family2));
249 assertEquals("Before compaction: family1 del file count", regionNum,
250 countFiles(tableName, false, family1));
251 assertEquals("Before compaction: family2 del file count", regionNum,
252 countFiles(tableName, false, family2));
253
254
255 MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
256 compactor.compact();
257
258 assertEquals("After first compaction: mob rows count", regionNum
259 * (rowNumPerRegion - delRowNum), countMobRows(table));
260 assertEquals("After first compaction: mob cells count", regionNum
261 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
262 assertEquals("After first compaction: family1 mob file count", regionNum,
263 countFiles(tableName, true, family1));
264 assertEquals("After first compaction: family2 mob file count", regionNum * count,
265 countFiles(tableName, true, family2));
266 assertEquals("After first compaction: family1 del file count", 0,
267 countFiles(tableName, false, family1));
268 assertEquals("After first compaction: family2 del file count", regionNum,
269 countFiles(tableName, false, family2));
270 assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
271 assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
272
273 admin.disableTable(tableName);
274
275 admin.restoreSnapshot(snapshotName1);
276 admin.enableTable(tableName);
277
278 assertEquals("After restoring snapshot: mob rows count", regionNum * rowNumPerRegion,
279 countMobRows(table));
280 assertEquals("After restoring snapshot: mob cells count", regionNum * cellNumPerRow
281 * rowNumPerRegion, countMobCells(table));
282 assertEquals("After restoring snapshot: family1 mob file count", regionNum * count,
283 countFiles(tableName, true, family1));
284 assertEquals("After restoring snapshot: family2 mob file count", regionNum * count,
285 countFiles(tableName, true, family2));
286 assertEquals("After restoring snapshot: family1 del file count", 0,
287 countFiles(tableName, false, family1));
288 assertEquals("After restoring snapshot: family2 del file count", 0,
289 countFiles(tableName, false, family2));
290 assertEquals("After restoring snapshot: family1 hfilelink count", regionNum * count,
291 countHFileLinks(family1));
292 assertEquals("After restoring snapshot: family2 hfilelink count", 0, countHFileLinks(family2));
293
294 compactor.compact();
295
296 assertEquals("After second compaction: mob rows count", regionNum * rowNumPerRegion,
297 countMobRows(table));
298 assertEquals("After second compaction: mob cells count", regionNum * cellNumPerRow
299 * rowNumPerRegion, countMobCells(table));
300 assertEquals("After second compaction: family1 mob file count", regionNum,
301 countFiles(tableName, true, family1));
302 assertEquals("After second compaction: family2 mob file count", regionNum * count,
303 countFiles(tableName, true, family2));
304 assertEquals("After second compaction: family1 del file count", 0,
305 countFiles(tableName, false, family1));
306 assertEquals("After second compaction: family2 del file count", 0,
307 countFiles(tableName, false, family2));
308 assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
309 assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
310 assertRefFileNameEqual(family1);
311 }
312
313 @Test(timeout = 300000)
314 public void testMajorCompactionFromAdmin() throws Exception {
315 resetConf();
316 int mergeSize = 5000;
317
318 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
319 String tableNameAsString = "testMajorCompactionFromAdmin";
320 SecureRandom rng = new SecureRandom();
321 byte[] keyBytes = new byte[AES.KEY_LENGTH];
322 rng.nextBytes(keyBytes);
323 String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
324 Key cfKey = new SecretKeySpec(keyBytes, algorithm);
325 byte[] encryptionKey = EncryptionUtil.wrapKey(conf,
326 conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey);
327 TableName tableName = TableName.valueOf(tableNameAsString);
328 HTableDescriptor desc = new HTableDescriptor(tableName);
329 HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
330 hcd1.setMobEnabled(true);
331 hcd1.setMobThreshold(0);
332 hcd1.setEncryptionType(algorithm);
333 hcd1.setEncryptionKey(encryptionKey);
334 HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
335 hcd2.setMobEnabled(true);
336 hcd2.setMobThreshold(0);
337 desc.addFamily(hcd1);
338 desc.addFamily(hcd2);
339 admin.createTable(desc, getSplitKeys());
340 Table table = conn.getTable(tableName);
341 BufferedMutator bufMut = conn.getBufferedMutator(tableName);
342 int count = 4;
343
344 loadData(admin, bufMut, tableName, count, rowNumPerFile);
345 int rowNumPerRegion = count * rowNumPerFile;
346
347 assertEquals("Before deleting: mob rows count", regionNum * rowNumPerRegion,
348 countMobRows(table));
349 assertEquals("Before deleting: mob cells count", regionNum * cellNumPerRow * rowNumPerRegion,
350 countMobCells(table));
351 assertEquals("Before deleting: mob file count", regionNum * count,
352 countFiles(tableName, true, family1));
353
354 createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
355
356 assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
357 countMobRows(table));
358 assertEquals("Before compaction: mob cells count", regionNum
359 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
360 assertEquals("Before compaction: family1 mob file count", regionNum * count,
361 countFiles(tableName, true, family1));
362 assertEquals("Before compaction: family2 mob file count", regionNum * count,
363 countFiles(tableName, true, family2));
364 assertEquals("Before compaction: family1 del file count", regionNum,
365 countFiles(tableName, false, family1));
366 assertEquals("Before compaction: family2 del file count", regionNum,
367 countFiles(tableName, false, family2));
368
369
370 admin.majorCompact(tableName, hcd1.getName(), Admin.CompactType.MOB);
371
372 waitUntilMobCompactionFinished(tableName);
373 assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
374 countMobRows(table));
375 assertEquals("After compaction: mob cells count", regionNum
376 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
377 assertEquals("After compaction: family1 mob file count", regionNum,
378 countFiles(tableName, true, family1));
379 assertEquals("After compaction: family2 mob file count", regionNum * count,
380 countFiles(tableName, true, family2));
381 assertEquals("After compaction: family1 del file count", 0,
382 countFiles(tableName, false, family1));
383 assertEquals("After compaction: family2 del file count", regionNum,
384 countFiles(tableName, false, family2));
385 Assert.assertTrue(verifyEncryption(tableName, family1));
386 table.close();
387 }
388
389 @Test(timeout = 300000)
390 public void testScannerOnBulkLoadRefHFiles() throws Exception {
391 resetConf();
392 setUp("testScannerOnBulkLoadRefHFiles");
393 long ts = EnvironmentEdgeManager.currentTime();
394 byte[] key0 = Bytes.toBytes("k0");
395 byte[] key1 = Bytes.toBytes("k1");
396 String value0 = "mobValue0";
397 String value1 = "mobValue1";
398 String newValue0 = "new";
399 Put put0 = new Put(key0);
400 put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value0));
401 loadData(admin, bufMut, tableName, new Put[] { put0 });
402 put0 = new Put(key0);
403 put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue0));
404 Put put1 = new Put(key1);
405 put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value1));
406 loadData(admin, bufMut, tableName, new Put[] { put0, put1 });
407
408 Get get = new Get(key0);
409 Result result = table.get(get);
410 Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
411 assertEquals("Before compaction: mob value of k0", newValue0,
412 Bytes.toString(CellUtil.cloneValue(cell)));
413 admin.majorCompact(tableName, hcd1.getName(), Admin.CompactType.MOB);
414 waitUntilMobCompactionFinished(tableName);
415
416
417 result = table.get(get);
418 cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
419 assertEquals("After compaction: mob value of k0", newValue0,
420 Bytes.toString(CellUtil.cloneValue(cell)));
421
422 get = new Get(key1);
423 get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
424 result = table.get(get);
425 cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
426
427 Path mobFamilyPath =
428 MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, hcd1.getNameAsString());
429 List<Path> paths = new ArrayList<Path>();
430 if (fs.exists(mobFamilyPath)) {
431 FileStatus[] files = fs.listStatus(mobFamilyPath);
432 for (FileStatus file : files) {
433 if (!StoreFileInfo.isDelFile(file.getPath())) {
434 paths.add(file.getPath());
435 }
436 }
437 }
438 assertEquals("After compaction: number of mob files:", 1, paths.size());
439 assertEquals("After compaction: mob file name:", MobUtils.getMobFileName(cell), paths.get(0)
440 .getName());
441 }
442
443
444
445
446
447
448
449 @Test(timeout = 300000)
450 public void testGetAfterCompaction() throws Exception {
451 resetConf();
452 conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
453 String famStr = "f1";
454 byte[] fam = Bytes.toBytes(famStr);
455 byte[] qualifier = Bytes.toBytes("q1");
456 byte[] mobVal = Bytes.toBytes("01234567890");
457 HTableDescriptor hdt = new HTableDescriptor(TableName.valueOf("testGetAfterCompaction"));
458 hdt.addCoprocessor(CompactTwoLatestHfilesCopro.class.getName());
459 HColumnDescriptor hcd = new HColumnDescriptor(fam);
460 hcd.setMobEnabled(true);
461 hcd.setMobThreshold(10);
462 hcd.setMaxVersions(1);
463 hdt.addFamily(hcd);
464 try {
465 Table table = TEST_UTIL.createTable(hdt, null);
466 HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(hdt.getTableName()).get(0);
467 Put p = new Put(Bytes.toBytes("r1"));
468 p.addColumn(fam, qualifier, mobVal);
469 table.put(p);
470
471 TEST_UTIL.flush(table.getName());
472
473 FileSystem fs = r.getRegionFileSystem().getFileSystem();
474 Path path = r.getRegionFileSystem().getStoreDir(famStr);
475 waitUntilFilesShowup(fs, path, 1);
476
477 p = new Put(Bytes.toBytes("r2"));
478 p.addColumn(fam, qualifier, mobVal);
479 table.put(p);
480
481 TEST_UTIL.flush(table.getName());
482 waitUntilFilesShowup(fs, path, 2);
483
484 admin.compact(hdt.getTableName(), fam, CompactType.MOB);
485 waitUntilFilesShowup(fs, path, 3);
486
487
488 admin.compact(hdt.getTableName(), fam);
489 waitUntilFilesShowup(fs, path, 2);
490
491
492
493 Thread.sleep(100);
494
495 TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
496
497
498 Get get = new Get(Bytes.toBytes("r2"));
499 try {
500 Result result = table.get(get);
501 assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
502 } catch (IOException e) {
503 assertTrue("The MOB file doesn't exist", false);
504 }
505 } finally {
506 TEST_UTIL.deleteTable(hdt.getTableName());
507 }
508 }
509
510 private void waitUntilFilesShowup(final FileSystem fs, final Path path, final int num)
511 throws InterruptedException, IOException {
512 FileStatus[] fileList = fs.listStatus(path);
513 while (fileList.length != num) {
514 Thread.sleep(50);
515 fileList = fs.listStatus(path);
516 }
517 }
518
519
520
521
522
523 public static class CompactTwoLatestHfilesCopro extends BaseRegionObserver {
524 @Override
525 public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
526 final Store store, final List<StoreFile> candidates, final CompactionRequest request)
527 throws IOException {
528
529 int count = candidates.size();
530 if (count >= 2) {
531 for (int i = 0; i < count - 2; i++) {
532 candidates.remove(0);
533 }
534 c.bypass();
535 }
536 }
537 }
538 private void waitUntilCompactionFinished(TableName tableName) throws IOException,
539 InterruptedException {
540 long finished = EnvironmentEdgeManager.currentTime() + 60000;
541 CompactionState state = admin.getCompactionState(tableName);
542 while (EnvironmentEdgeManager.currentTime() < finished) {
543 if (state == CompactionState.NONE) {
544 break;
545 }
546 state = admin.getCompactionState(tableName);
547 Thread.sleep(10);
548 }
549 assertEquals(CompactionState.NONE, state);
550 }
551
552 private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
553 InterruptedException {
554 long finished = EnvironmentEdgeManager.currentTime() + 60000;
555 CompactionState state = admin.getCompactionState(tableName, Admin.CompactType.MOB);
556 while (EnvironmentEdgeManager.currentTime() < finished) {
557 if (state == CompactionState.NONE) {
558 break;
559 }
560 state = admin.getCompactionState(tableName, Admin.CompactType.MOB);
561 Thread.sleep(10);
562 }
563 assertEquals(CompactionState.NONE, state);
564 }
565
566
567
568
569
570
571 private int countMobRows(final Table table) throws IOException {
572 Scan scan = new Scan();
573
574 scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
575 return TEST_UTIL.countRows(table, scan);
576 }
577
578
579
580
581
582
583 private int countMobCells(final Table table) throws IOException {
584 Scan scan = new Scan();
585
586 scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
587 ResultScanner results = table.getScanner(scan);
588 int count = 0;
589 for (Result res : results) {
590 for (Cell cell : res.listCells()) {
591 count++;
592 }
593 }
594 results.close();
595 return count;
596 }
597
598
599
600
601
602
603
604 private int countFiles(TableName tableName, boolean isMobFile, String familyName)
605 throws IOException {
606 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
607 int count = 0;
608 if (fs.exists(mobDirPath)) {
609 FileStatus[] files = fs.listStatus(mobDirPath);
610 for (FileStatus file : files) {
611 if (isMobFile == true) {
612 if (!StoreFileInfo.isDelFile(file.getPath())) {
613 count++;
614 }
615 } else {
616 if (StoreFileInfo.isDelFile(file.getPath())) {
617 count++;
618 }
619 }
620 }
621 }
622 return count;
623 }
624
625 private boolean verifyEncryption(TableName tableName, String familyName) throws IOException {
626 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
627 boolean hasFiles = false;
628 if (fs.exists(mobDirPath)) {
629 FileStatus[] files = fs.listStatus(mobDirPath);
630 hasFiles = files != null && files.length > 0;
631 Assert.assertTrue(hasFiles);
632 Path path = files[0].getPath();
633 CacheConfig cacheConf = new CacheConfig(conf);
634 StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), path, conf, cacheConf,
635 BloomType.NONE);
636 HFile.Reader reader = sf.createReader().getHFileReader();
637 byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
638 Assert.assertTrue(null != encryptionKey);
639 Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
640 .equals(HConstants.CIPHER_AES));
641 }
642 return hasFiles;
643 }
644
645
646
647
648
649
650 private int countHFileLinks(String familyName) throws IOException {
651 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
652 int count = 0;
653 if (fs.exists(mobDirPath)) {
654 FileStatus[] files = fs.listStatus(mobDirPath);
655 for (FileStatus file : files) {
656 if (HFileLink.isHFileLink(file.getPath())) {
657 count++;
658 }
659 }
660 }
661 return count;
662 }
663
664
665
666
667
668
669
670
671 private int countLargeFiles(int size, TableName tableName, String familyName) throws IOException {
672 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
673 int count = 0;
674 if (fs.exists(mobDirPath)) {
675 FileStatus[] files = fs.listStatus(mobDirPath);
676 for (FileStatus file : files) {
677
678 if ((!StoreFileInfo.isDelFile(file.getPath())) && (file.getLen() > size)) {
679 count++;
680 }
681 }
682 }
683 return count;
684 }
685
686
687
688
689 private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum,
690 int rowNumPerFile) throws IOException, InterruptedException {
691 if (fileNum <= 0) {
692 throw new IllegalArgumentException();
693 }
694 for (int i = 0; i < fileNum * rowNumPerFile; i++) {
695 for (byte k0 : KEYS) {
696 byte[] k = new byte[] { k0 };
697 byte[] key = Bytes.add(k, Bytes.toBytes(i));
698 byte[] mobVal = makeDummyData(10 * (i + 1));
699 Put put = new Put(key);
700 put.setDurability(Durability.SKIP_WAL);
701 put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
702 put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
703 put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
704 table.mutate(put);
705 }
706 if ((i + 1) % rowNumPerFile == 0) {
707 table.flush();
708 admin.flush(tableName);
709 }
710 }
711 }
712
713 private void loadData(Admin admin, BufferedMutator table, TableName tableName, Put[] puts)
714 throws IOException {
715 table.mutate(Arrays.asList(puts));
716 table.flush();
717 admin.flush(tableName);
718 }
719
720
721
722
723 private void createDelFile(Table table, TableName tableName, byte[] family, byte[] qf)
724 throws IOException, InterruptedException {
725 for (byte k0 : KEYS) {
726 byte[] k = new byte[] { k0 };
727
728 byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
729 Delete delete1 = new Delete(key1);
730 delete1.addFamily(family);
731 table.delete(delete1);
732
733 byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
734 Delete delete2 = new Delete(key2);
735 table.delete(delete2);
736
737 byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
738 Delete delete3 = new Delete(key3);
739 delete3.addColumn(family, qf);
740 table.delete(delete3);
741 }
742 admin.flush(tableName);
743 List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
744 for (HRegion region : regions) {
745 region.waitForFlushesAndCompactions();
746 region.compact(true);
747 }
748 }
749
750
751
752
753
754 private byte[] makeDummyData(int size) {
755 byte[] dummyData = new byte[size];
756 new Random().nextBytes(dummyData);
757 return dummyData;
758 }
759
760
761
762
763 private byte[][] getSplitKeys() {
764 byte[][] splitKeys = new byte[KEYS.length - 1][];
765 for (int i = 0; i < splitKeys.length; ++i) {
766 splitKeys[i] = new byte[] { KEYS[i + 1] };
767 }
768 return splitKeys;
769 }
770
771 private static ExecutorService createThreadPool(Configuration conf) {
772 int maxThreads = 10;
773 long keepAliveTime = 60;
774 final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
775 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
776 keepAliveTime, TimeUnit.SECONDS, queue,
777 Threads.newDaemonThreadFactory("MobFileCompactionChore"),
778 new RejectedExecutionHandler() {
779 @Override
780 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
781 try {
782
783 queue.put(r);
784 } catch (InterruptedException e) {
785 throw new RejectedExecutionException(e);
786 }
787 }
788 });
789 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
790 return pool;
791 }
792
793 private void assertRefFileNameEqual(String familyName) throws IOException {
794 Scan scan = new Scan();
795 scan.addFamily(Bytes.toBytes(familyName));
796
797 scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
798 ResultScanner results = table.getScanner(scan);
799 Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
800 tableName, familyName);
801 List<Path> actualFilePaths = new ArrayList<>();
802 List<Path> expectFilePaths = new ArrayList<>();
803 for (Result res : results) {
804 for (Cell cell : res.listCells()) {
805 byte[] referenceValue = CellUtil.cloneValue(cell);
806 String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
807 referenceValue.length - Bytes.SIZEOF_INT);
808 Path targetPath = new Path(mobFamilyPath, fileName);
809 if(!actualFilePaths.contains(targetPath)) {
810 actualFilePaths.add(targetPath);
811 }
812 }
813 }
814 results.close();
815 if (fs.exists(mobFamilyPath)) {
816 FileStatus[] files = fs.listStatus(mobFamilyPath);
817 for (FileStatus file : files) {
818 if (!StoreFileInfo.isDelFile(file.getPath())) {
819 expectFilePaths.add(file.getPath());
820 }
821 }
822 }
823 Collections.sort(actualFilePaths);
824 Collections.sort(expectFilePaths);
825 assertEquals(expectFilePaths, actualFilePaths);
826 }
827
828
829
830
831 private void resetConf() {
832 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
833 MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
834 conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
835 MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
836 }
837 }