View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob.compactions;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.IOException;
25  import java.security.Key;
26  import java.security.SecureRandom;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.Random;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.RejectedExecutionException;
34  import java.util.concurrent.RejectedExecutionHandler;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import javax.crypto.spec.SecretKeySpec;
40  
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.HBaseTestingUtility;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.NamespaceDescriptor;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.client.Admin;
54  import org.apache.hadoop.hbase.client.Admin.CompactType;
55  import org.apache.hadoop.hbase.client.BufferedMutator;
56  import org.apache.hadoop.hbase.client.Connection;
57  import org.apache.hadoop.hbase.client.ConnectionFactory;
58  import org.apache.hadoop.hbase.client.Delete;
59  import org.apache.hadoop.hbase.client.Durability;
60  import org.apache.hadoop.hbase.client.Get;
61  import org.apache.hadoop.hbase.client.Put;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.ResultScanner;
64  import org.apache.hadoop.hbase.client.Scan;
65  import org.apache.hadoop.hbase.client.Table;
66  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
67  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
68  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
69  import org.apache.hadoop.hbase.io.HFileLink;
70  import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
71  import org.apache.hadoop.hbase.io.crypto.aes.AES;
72  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
73  import org.apache.hadoop.hbase.io.hfile.HFile;
74  import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
75  import org.apache.hadoop.hbase.mob.MobConstants;
76  import org.apache.hadoop.hbase.mob.MobUtils;
77  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
78  import org.apache.hadoop.hbase.regionserver.BloomType;
79  import org.apache.hadoop.hbase.regionserver.HRegion;
80  import org.apache.hadoop.hbase.regionserver.Store;
81  import org.apache.hadoop.hbase.regionserver.StoreFile;
82  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
83  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
84  import org.apache.hadoop.hbase.security.EncryptionUtil;
85  import org.apache.hadoop.hbase.security.User;
86  import org.apache.hadoop.hbase.testclassification.LargeTests;
87  import org.apache.hadoop.hbase.util.Bytes;
88  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
89  import org.apache.hadoop.hbase.util.Threads;
90  import org.junit.AfterClass;
91  import org.junit.Assert;
92  import org.junit.BeforeClass;
93  import org.junit.Test;
94  import org.junit.experimental.categories.Category;
95  
96  @Category(LargeTests.class)
97  public class TestMobCompactor {
98    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
99    private static Configuration conf = null;
100   private TableName tableName;
101   private static Connection conn;
102   private BufferedMutator bufMut;
103   private Table table;
104   private static Admin admin;
105   private HTableDescriptor desc;
106   private HColumnDescriptor hcd1;
107   private HColumnDescriptor hcd2;
108   private static FileSystem fs;
109   private static final String family1 = "family1";
110   private static final String family2 = "family2";
111   private static final String qf1 = "qualifier1";
112   private static final String qf2 = "qualifier2";
113   private static byte[] KEYS = Bytes.toBytes("012");
114   private static int regionNum = KEYS.length;
115   private static int delRowNum = 1;
116   private static int delCellNum = 6;
117   private static int cellNumPerRow = 3;
118   private static int rowNumPerFile = 2;
119   private static ExecutorService pool;
120 
121   @BeforeClass
122   public static void setUpBeforeClass() throws Exception {
123     TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
124     TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
125     TEST_UTIL.getConfiguration()
126       .setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000);
127     TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
128       KeyProviderForTesting.class.getName());
129     TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
130     TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
131     TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1);
132     TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100);
133     TEST_UTIL.startMiniCluster(1);
134     pool = createThreadPool(TEST_UTIL.getConfiguration());
135     conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
136     fs = TEST_UTIL.getTestFileSystem();
137     conf = TEST_UTIL.getConfiguration();
138     admin = TEST_UTIL.getHBaseAdmin();
139   }
140 
141   @AfterClass
142   public static void tearDownAfterClass() throws Exception {
143     pool.shutdown();
144     conn.close();
145     TEST_UTIL.shutdownMiniCluster();
146   }
147 
148   public void setUp(String tableNameAsString) throws IOException {
149     tableName = TableName.valueOf(tableNameAsString);
150     hcd1 = new HColumnDescriptor(family1);
151     hcd1.setMobEnabled(true);
152     hcd1.setMobThreshold(5);
153     hcd2 = new HColumnDescriptor(family2);
154     hcd2.setMobEnabled(true);
155     hcd2.setMobThreshold(5);
156     desc = new HTableDescriptor(tableName);
157     desc.addFamily(hcd1);
158     desc.addFamily(hcd2);
159     admin.createTable(desc, getSplitKeys());
160     table = conn.getTable(tableName);
161     bufMut = conn.getBufferedMutator(tableName);
162   }
163 
164   @Test(timeout = 300000)
165   public void testMinorCompaction() throws Exception {
166     resetConf();
167     int mergeSize = 5000;
168     // change the mob compaction merge size
169     conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
170 
171     // create a table with namespace
172     NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build();
173     String tableNameAsString = "ns:testMinorCompaction";
174     admin.createNamespace(namespaceDescriptor);
175     setUp(tableNameAsString);
176     int count = 4;
177     // generate mob files
178     loadData(admin, bufMut, tableName, count, rowNumPerFile);
179     int rowNumPerRegion = count * rowNumPerFile;
180 
181     assertEquals("Before deleting: mob rows count", regionNum * rowNumPerRegion,
182       countMobRows(table));
183     assertEquals("Before deleting: mob cells count", regionNum * cellNumPerRow * rowNumPerRegion,
184       countMobCells(table));
185     assertEquals("Before deleting: mob file count", regionNum * count,
186       countFiles(tableName, true, family1));
187 
188     int largeFilesCount = countLargeFiles(mergeSize, tableName, family1);
189     createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
190 
191     assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
192       countMobRows(table));
193     assertEquals("Before compaction: mob cells count", regionNum
194       * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
195     assertEquals("Before compaction: family1 mob file count", regionNum * count,
196       countFiles(tableName, true, family1));
197     assertEquals("Before compaction: family2 mob file count", regionNum * count,
198       countFiles(tableName, true, family2));
199     assertEquals("Before compaction: family1 del file count", regionNum,
200       countFiles(tableName, false, family1));
201     assertEquals("Before compaction: family2 del file count", regionNum,
202       countFiles(tableName, false, family2));
203 
204     // do the mob file compaction
205     MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
206     compactor.compact();
207 
208     assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
209       countMobRows(table));
210     assertEquals("After compaction: mob cells count", regionNum
211       * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
212     // After the compaction, the files smaller than the mob compaction merge size
213     // is merge to one file
214     assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
215       countFiles(tableName, true, family1));
216     assertEquals("After compaction: family2 mob file count", regionNum * count,
217       countFiles(tableName, true, family2));
218     assertEquals("After compaction: family1 del file count", regionNum,
219       countFiles(tableName, false, family1));
220     assertEquals("After compaction: family2 del file count", regionNum,
221       countFiles(tableName, false, family2));
222   }
223 
224   @Test(timeout = 300000)
225   public void testCompactionWithHFileLink() throws IOException, InterruptedException {
226     resetConf();
227     String tableNameAsString = "testCompactionWithHFileLink";
228     setUp(tableNameAsString);
229     int count = 4;
230     // generate mob files
231     loadData(admin, bufMut, tableName, count, rowNumPerFile);
232     int rowNumPerRegion = count * rowNumPerFile;
233 
234     long tid = System.currentTimeMillis();
235     byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
236     // take a snapshot
237     admin.snapshot(snapshotName1, tableName);
238 
239     createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
240 
241     assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
242       countMobRows(table));
243     assertEquals("Before compaction: mob cells count", regionNum
244       * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
245     assertEquals("Before compaction: family1 mob file count", regionNum * count,
246       countFiles(tableName, true, family1));
247     assertEquals("Before compaction: family2 mob file count", regionNum * count,
248       countFiles(tableName, true, family2));
249     assertEquals("Before compaction: family1 del file count", regionNum,
250       countFiles(tableName, false, family1));
251     assertEquals("Before compaction: family2 del file count", regionNum,
252       countFiles(tableName, false, family2));
253 
254     // do the mob compaction
255     MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool);
256     compactor.compact();
257 
258     assertEquals("After first compaction: mob rows count", regionNum
259       * (rowNumPerRegion - delRowNum), countMobRows(table));
260     assertEquals("After first compaction: mob cells count", regionNum
261       * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
262     assertEquals("After first compaction: family1 mob file count", regionNum,
263       countFiles(tableName, true, family1));
264     assertEquals("After first compaction: family2 mob file count", regionNum * count,
265       countFiles(tableName, true, family2));
266     assertEquals("After first compaction: family1 del file count", 0,
267       countFiles(tableName, false, family1));
268     assertEquals("After first compaction: family2 del file count", regionNum,
269       countFiles(tableName, false, family2));
270     assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
271     assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
272 
273     admin.disableTable(tableName);
274     // Restore from snapshot, the hfilelink will exist in mob dir
275     admin.restoreSnapshot(snapshotName1);
276     admin.enableTable(tableName);
277 
278     assertEquals("After restoring snapshot: mob rows count", regionNum * rowNumPerRegion,
279       countMobRows(table));
280     assertEquals("After restoring snapshot: mob cells count", regionNum * cellNumPerRow
281       * rowNumPerRegion, countMobCells(table));
282     assertEquals("After restoring snapshot: family1 mob file count", regionNum * count,
283       countFiles(tableName, true, family1));
284     assertEquals("After restoring snapshot: family2 mob file count", regionNum * count,
285       countFiles(tableName, true, family2));
286     assertEquals("After restoring snapshot: family1 del file count", 0,
287       countFiles(tableName, false, family1));
288     assertEquals("After restoring snapshot: family2 del file count", 0,
289       countFiles(tableName, false, family2));
290     assertEquals("After restoring snapshot: family1 hfilelink count", regionNum * count,
291       countHFileLinks(family1));
292     assertEquals("After restoring snapshot: family2 hfilelink count", 0, countHFileLinks(family2));
293 
294     compactor.compact();
295 
296     assertEquals("After second compaction: mob rows count", regionNum * rowNumPerRegion,
297       countMobRows(table));
298     assertEquals("After second compaction: mob cells count", regionNum * cellNumPerRow
299       * rowNumPerRegion, countMobCells(table));
300     assertEquals("After second compaction: family1 mob file count", regionNum,
301       countFiles(tableName, true, family1));
302     assertEquals("After second compaction: family2 mob file count", regionNum * count,
303       countFiles(tableName, true, family2));
304     assertEquals("After second compaction: family1 del file count", 0,
305       countFiles(tableName, false, family1));
306     assertEquals("After second compaction: family2 del file count", 0,
307       countFiles(tableName, false, family2));
308     assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
309     assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
310     assertRefFileNameEqual(family1);
311   }
312 
313   @Test(timeout = 300000)
314   public void testMajorCompactionFromAdmin() throws Exception {
315     resetConf();
316     int mergeSize = 5000;
317     // change the mob compaction merge size
318     conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
319     String tableNameAsString = "testMajorCompactionFromAdmin";
320     SecureRandom rng = new SecureRandom();
321     byte[] keyBytes = new byte[AES.KEY_LENGTH];
322     rng.nextBytes(keyBytes);
323     String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
324     Key cfKey = new SecretKeySpec(keyBytes, algorithm);
325     byte[] encryptionKey = EncryptionUtil.wrapKey(conf,
326       conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey);
327     TableName tableName = TableName.valueOf(tableNameAsString);
328     HTableDescriptor desc = new HTableDescriptor(tableName);
329     HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
330     hcd1.setMobEnabled(true);
331     hcd1.setMobThreshold(0);
332     hcd1.setEncryptionType(algorithm);
333     hcd1.setEncryptionKey(encryptionKey);
334     HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
335     hcd2.setMobEnabled(true);
336     hcd2.setMobThreshold(0);
337     desc.addFamily(hcd1);
338     desc.addFamily(hcd2);
339     admin.createTable(desc, getSplitKeys());
340     Table table = conn.getTable(tableName);
341     BufferedMutator bufMut = conn.getBufferedMutator(tableName);
342     int count = 4;
343     // generate mob files
344     loadData(admin, bufMut, tableName, count, rowNumPerFile);
345     int rowNumPerRegion = count * rowNumPerFile;
346 
347     assertEquals("Before deleting: mob rows count", regionNum * rowNumPerRegion,
348       countMobRows(table));
349     assertEquals("Before deleting: mob cells count", regionNum * cellNumPerRow * rowNumPerRegion,
350       countMobCells(table));
351     assertEquals("Before deleting: mob file count", regionNum * count,
352       countFiles(tableName, true, family1));
353 
354     createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1));
355 
356     assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
357       countMobRows(table));
358     assertEquals("Before compaction: mob cells count", regionNum
359       * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
360     assertEquals("Before compaction: family1 mob file count", regionNum * count,
361       countFiles(tableName, true, family1));
362     assertEquals("Before compaction: family2 mob file count", regionNum * count,
363       countFiles(tableName, true, family2));
364     assertEquals("Before compaction: family1 del file count", regionNum,
365       countFiles(tableName, false, family1));
366     assertEquals("Before compaction: family2 del file count", regionNum,
367       countFiles(tableName, false, family2));
368 
369     // do the major mob compaction, it will force all files to compaction
370     admin.majorCompact(tableName, hcd1.getName(), Admin.CompactType.MOB);
371 
372     waitUntilMobCompactionFinished(tableName);
373     assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
374       countMobRows(table));
375     assertEquals("After compaction: mob cells count", regionNum
376       * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table));
377     assertEquals("After compaction: family1 mob file count", regionNum,
378       countFiles(tableName, true, family1));
379     assertEquals("After compaction: family2 mob file count", regionNum * count,
380       countFiles(tableName, true, family2));
381     assertEquals("After compaction: family1 del file count", 0,
382       countFiles(tableName, false, family1));
383     assertEquals("After compaction: family2 del file count", regionNum,
384       countFiles(tableName, false, family2));
385     Assert.assertTrue(verifyEncryption(tableName, family1));
386     table.close();
387   }
388 
389   @Test(timeout = 300000)
390   public void testScannerOnBulkLoadRefHFiles() throws Exception {
391     resetConf();
392     setUp("testScannerOnBulkLoadRefHFiles");
393     long ts = EnvironmentEdgeManager.currentTime();
394     byte[] key0 = Bytes.toBytes("k0");
395     byte[] key1 = Bytes.toBytes("k1");
396     String value0 = "mobValue0";
397     String value1 = "mobValue1";
398     String newValue0 = "new";
399     Put put0 = new Put(key0);
400     put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value0));
401     loadData(admin, bufMut, tableName, new Put[] { put0 });
402     put0 = new Put(key0);
403     put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue0));
404     Put put1 = new Put(key1);
405     put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value1));
406     loadData(admin, bufMut, tableName, new Put[] { put0, put1 });
407     // read the latest cell of key0.
408     Get get = new Get(key0);
409     Result result = table.get(get);
410     Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
411     assertEquals("Before compaction: mob value of k0", newValue0,
412       Bytes.toString(CellUtil.cloneValue(cell)));
413     admin.majorCompact(tableName, hcd1.getName(), Admin.CompactType.MOB);
414     waitUntilMobCompactionFinished(tableName);
415     // read the latest cell of key0, the cell seqId in bulk loaded file is not reset in the
416     // scanner. The cell that has "new" value is still visible.
417     result = table.get(get);
418     cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
419     assertEquals("After compaction: mob value of k0", newValue0,
420       Bytes.toString(CellUtil.cloneValue(cell)));
421     // read the ref cell, not read further to the mob cell.
422     get = new Get(key1);
423     get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
424     result = table.get(get);
425     cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
426     // the ref name is the new file
427     Path mobFamilyPath =
428       MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, hcd1.getNameAsString());
429     List<Path> paths = new ArrayList<Path>();
430     if (fs.exists(mobFamilyPath)) {
431       FileStatus[] files = fs.listStatus(mobFamilyPath);
432       for (FileStatus file : files) {
433         if (!StoreFileInfo.isDelFile(file.getPath())) {
434           paths.add(file.getPath());
435         }
436       }
437     }
438     assertEquals("After compaction: number of mob files:", 1, paths.size());
439     assertEquals("After compaction: mob file name:", MobUtils.getMobFileName(cell), paths.get(0)
440       .getName());
441   }
442 
443   /**
444    * This case tests the following mob compaction and normal compaction scenario,
445    * after mob compaction, the mob reference in new bulkloaded hfile will win even after it
446    * is compacted with some other normal hfiles. This is to make sure the mvcc is included
447    * after compaction for mob enabled store files.
448    */
449   @Test(timeout = 300000)
450   public void testGetAfterCompaction() throws Exception {
451     resetConf();
452     conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
453     String famStr = "f1";
454     byte[] fam = Bytes.toBytes(famStr);
455     byte[] qualifier = Bytes.toBytes("q1");
456     byte[] mobVal = Bytes.toBytes("01234567890");
457     HTableDescriptor hdt = new HTableDescriptor(TableName.valueOf("testGetAfterCompaction"));
458     hdt.addCoprocessor(CompactTwoLatestHfilesCopro.class.getName());
459     HColumnDescriptor hcd = new HColumnDescriptor(fam);
460     hcd.setMobEnabled(true);
461     hcd.setMobThreshold(10);
462     hcd.setMaxVersions(1);
463     hdt.addFamily(hcd);
464     try {
465       Table table = TEST_UTIL.createTable(hdt, null);
466       HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(hdt.getTableName()).get(0);
467       Put p = new Put(Bytes.toBytes("r1"));
468       p.addColumn(fam, qualifier, mobVal);
469       table.put(p);
470       // Create mob file mob1 and reference file ref1
471       TEST_UTIL.flush(table.getName());
472       // Make sure that it is flushed.
473       FileSystem fs = r.getRegionFileSystem().getFileSystem();
474       Path path = r.getRegionFileSystem().getStoreDir(famStr);
475       waitUntilFilesShowup(fs, path, 1);
476 
477       p = new Put(Bytes.toBytes("r2"));
478       p.addColumn(fam, qualifier, mobVal);
479       table.put(p);
480       // Create mob file mob2 and reference file ref2
481       TEST_UTIL.flush(table.getName());
482       waitUntilFilesShowup(fs, path, 2);
483       // Do mob compaction to create mob3 and ref3
484       admin.compact(hdt.getTableName(), fam, CompactType.MOB);
485       waitUntilFilesShowup(fs, path, 3);
486 
487       // Compact ref3 and ref2 into ref4
488       admin.compact(hdt.getTableName(), fam);
489       waitUntilFilesShowup(fs, path, 2);
490 
491       // Sleep for some time, since TimeToLiveHFileCleaner is 0, the next run of
492       // clean chore is guaranteed to clean up files in archive
493       Thread.sleep(100);
494       // Run cleaner to make sure that files in archive directory are cleaned up
495       TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
496 
497       // Get "r2"
498       Get get = new Get(Bytes.toBytes("r2"));
499       try {
500         Result result = table.get(get);
501         assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
502       } catch (IOException e) {
503         assertTrue("The MOB file doesn't exist", false);
504       }
505     } finally {
506       TEST_UTIL.deleteTable(hdt.getTableName());
507     }
508   }
509 
510   private void waitUntilFilesShowup(final FileSystem fs, final Path path, final int num)
511       throws InterruptedException, IOException {
512     FileStatus[] fileList = fs.listStatus(path);
513     while (fileList.length != num) {
514       Thread.sleep(50);
515       fileList = fs.listStatus(path);
516     }
517   }
518 
519   /**
520    * This copro overwrites the default compaction policy. It always chooses two latest
521    * hfiles and compacts them into a new one.
522    */
523   public static class CompactTwoLatestHfilesCopro extends BaseRegionObserver {
524     @Override
525     public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
526         final Store store, final List<StoreFile> candidates, final CompactionRequest request)
527             throws IOException {
528 
529       int count = candidates.size();
530       if (count >= 2) {
531         for (int i = 0; i < count - 2; i++) {
532           candidates.remove(0);
533         }
534         c.bypass();
535       }
536     }
537   }
538   private void waitUntilCompactionFinished(TableName tableName) throws IOException,
539     InterruptedException {
540     long finished = EnvironmentEdgeManager.currentTime() + 60000;
541     CompactionState state = admin.getCompactionState(tableName);
542     while (EnvironmentEdgeManager.currentTime() < finished) {
543       if (state == CompactionState.NONE) {
544         break;
545       }
546       state = admin.getCompactionState(tableName);
547       Thread.sleep(10);
548     }
549     assertEquals(CompactionState.NONE, state);
550   }
551 
552   private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
553     InterruptedException {
554     long finished = EnvironmentEdgeManager.currentTime() + 60000;
555     CompactionState state = admin.getCompactionState(tableName, Admin.CompactType.MOB);
556     while (EnvironmentEdgeManager.currentTime() < finished) {
557       if (state == CompactionState.NONE) {
558         break;
559       }
560       state = admin.getCompactionState(tableName, Admin.CompactType.MOB);
561       Thread.sleep(10);
562     }
563     assertEquals(CompactionState.NONE, state);
564   }
565 
566   /**
567    * Gets the number of rows in the given table.
568    * @param table to get the  scanner
569    * @return the number of rows
570    */
571   private int countMobRows(final Table table) throws IOException {
572     Scan scan = new Scan();
573     // Do not retrieve the mob data when scanning
574     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
575     return TEST_UTIL.countRows(table, scan);
576   }
577 
578   /**
579    * Gets the number of cells in the given table.
580    * @param table to get the  scanner
581    * @return the number of cells
582    */
583   private int countMobCells(final Table table) throws IOException {
584     Scan scan = new Scan();
585     // Do not retrieve the mob data when scanning
586     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
587     ResultScanner results = table.getScanner(scan);
588     int count = 0;
589     for (Result res : results) {
590       for (Cell cell : res.listCells()) {
591         count++;
592       }
593     }
594     results.close();
595     return count;
596   }
597 
598   /**
599    * Gets the number of files in the mob path.
600    * @param isMobFile gets number of the mob files or del files
601    * @param familyName the family name
602    * @return the number of the files
603    */
604   private int countFiles(TableName tableName, boolean isMobFile, String familyName)
605     throws IOException {
606     Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
607     int count = 0;
608     if (fs.exists(mobDirPath)) {
609       FileStatus[] files = fs.listStatus(mobDirPath);
610       for (FileStatus file : files) {
611         if (isMobFile == true) {
612           if (!StoreFileInfo.isDelFile(file.getPath())) {
613             count++;
614           }
615         } else {
616           if (StoreFileInfo.isDelFile(file.getPath())) {
617             count++;
618           }
619         }
620       }
621     }
622     return count;
623   }
624 
625   private boolean verifyEncryption(TableName tableName, String familyName) throws IOException {
626     Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
627     boolean hasFiles = false;
628     if (fs.exists(mobDirPath)) {
629       FileStatus[] files = fs.listStatus(mobDirPath);
630       hasFiles = files != null && files.length > 0;
631       Assert.assertTrue(hasFiles);
632       Path path = files[0].getPath();
633       CacheConfig cacheConf = new CacheConfig(conf);
634       StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), path, conf, cacheConf,
635         BloomType.NONE);
636       HFile.Reader reader = sf.createReader().getHFileReader();
637       byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
638       Assert.assertTrue(null != encryptionKey);
639       Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
640         .equals(HConstants.CIPHER_AES));
641     }
642     return hasFiles;
643   }
644 
645   /**
646    * Gets the number of HFileLink in the mob path.
647    * @param familyName the family name
648    * @return the number of the HFileLink
649    */
650   private int countHFileLinks(String familyName) throws IOException {
651     Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
652     int count = 0;
653     if (fs.exists(mobDirPath)) {
654       FileStatus[] files = fs.listStatus(mobDirPath);
655       for (FileStatus file : files) {
656         if (HFileLink.isHFileLink(file.getPath())) {
657           count++;
658         }
659       }
660     }
661     return count;
662   }
663 
664   /**
665    * Gets the number of files.
666    * @param size the size of the file
667    * @param tableName the current table name
668    * @param familyName the family name
669    * @return the number of files large than the size
670    */
671   private int countLargeFiles(int size, TableName tableName, String familyName) throws IOException {
672     Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName);
673     int count = 0;
674     if (fs.exists(mobDirPath)) {
675       FileStatus[] files = fs.listStatus(mobDirPath);
676       for (FileStatus file : files) {
677         // ignore the del files in the mob path
678         if ((!StoreFileInfo.isDelFile(file.getPath())) && (file.getLen() > size)) {
679           count++;
680         }
681       }
682     }
683     return count;
684   }
685 
686   /**
687    * loads some data to the table.
688    */
689   private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum,
690     int rowNumPerFile) throws IOException, InterruptedException {
691     if (fileNum <= 0) {
692       throw new IllegalArgumentException();
693     }
694     for (int i = 0; i < fileNum * rowNumPerFile; i++) {
695       for (byte k0 : KEYS) {
696         byte[] k = new byte[] { k0 };
697         byte[] key = Bytes.add(k, Bytes.toBytes(i));
698         byte[] mobVal = makeDummyData(10 * (i + 1));
699         Put put = new Put(key);
700         put.setDurability(Durability.SKIP_WAL);
701         put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
702         put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
703         put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
704         table.mutate(put);
705       }
706       if ((i + 1) % rowNumPerFile == 0) {
707         table.flush();
708         admin.flush(tableName);
709       }
710     }
711   }
712 
713   private void loadData(Admin admin, BufferedMutator table, TableName tableName, Put[] puts)
714     throws IOException {
715     table.mutate(Arrays.asList(puts));
716     table.flush();
717     admin.flush(tableName);
718   }
719 
720   /**
721    * delete the row, family and cell to create the del file
722    */
723   private void createDelFile(Table table, TableName tableName, byte[] family, byte[] qf)
724     throws IOException, InterruptedException {
725     for (byte k0 : KEYS) {
726       byte[] k = new byte[] { k0 };
727       // delete a family
728       byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
729       Delete delete1 = new Delete(key1);
730       delete1.addFamily(family);
731       table.delete(delete1);
732       // delete one row
733       byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
734       Delete delete2 = new Delete(key2);
735       table.delete(delete2);
736       // delete one cell
737       byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
738       Delete delete3 = new Delete(key3);
739       delete3.addColumn(family, qf);
740       table.delete(delete3);
741     }
742     admin.flush(tableName);
743     List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
744     for (HRegion region : regions) {
745       region.waitForFlushesAndCompactions();
746       region.compact(true);
747     }
748   }
749   /**
750    * Creates the dummy data with a specific size.
751    * @param size the size of value
752    * @return the dummy data
753    */
754   private byte[] makeDummyData(int size) {
755     byte[] dummyData = new byte[size];
756     new Random().nextBytes(dummyData);
757     return dummyData;
758   }
759 
760   /**
761    * Gets the split keys
762    */
763   private byte[][] getSplitKeys() {
764     byte[][] splitKeys = new byte[KEYS.length - 1][];
765     for (int i = 0; i < splitKeys.length; ++i) {
766       splitKeys[i] = new byte[] { KEYS[i + 1] };
767     }
768     return splitKeys;
769   }
770 
771   private static ExecutorService createThreadPool(Configuration conf) {
772     int maxThreads = 10;
773     long keepAliveTime = 60;
774     final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
775     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
776         keepAliveTime, TimeUnit.SECONDS, queue,
777         Threads.newDaemonThreadFactory("MobFileCompactionChore"),
778         new RejectedExecutionHandler() {
779           @Override
780           public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
781             try {
782               // waiting for a thread to pick up instead of throwing exceptions.
783               queue.put(r);
784             } catch (InterruptedException e) {
785               throw new RejectedExecutionException(e);
786             }
787           }
788         });
789     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
790     return pool;
791   }
792 
793   private void assertRefFileNameEqual(String familyName) throws IOException {
794     Scan scan = new Scan();
795     scan.addFamily(Bytes.toBytes(familyName));
796     // Do not retrieve the mob data when scanning
797     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
798     ResultScanner results = table.getScanner(scan);
799     Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
800         tableName, familyName);
801     List<Path> actualFilePaths = new ArrayList<>();
802     List<Path> expectFilePaths = new ArrayList<>();
803     for (Result res : results) {
804       for (Cell cell : res.listCells()) {
805         byte[] referenceValue = CellUtil.cloneValue(cell);
806         String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
807             referenceValue.length - Bytes.SIZEOF_INT);
808         Path targetPath = new Path(mobFamilyPath, fileName);
809         if(!actualFilePaths.contains(targetPath)) {
810           actualFilePaths.add(targetPath);
811         }
812       }
813     }
814     results.close();
815     if (fs.exists(mobFamilyPath)) {
816       FileStatus[] files = fs.listStatus(mobFamilyPath);
817       for (FileStatus file : files) {
818         if (!StoreFileInfo.isDelFile(file.getPath())) {
819           expectFilePaths.add(file.getPath());
820         }
821       }
822     }
823     Collections.sort(actualFilePaths);
824     Collections.sort(expectFilePaths);
825     assertEquals(expectFilePaths, actualFilePaths);
826   }
827 
828   /**
829    * Resets the configuration.
830    */
831   private void resetConf() {
832     conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
833       MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
834     conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
835       MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
836   }
837 }