View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob.compactions;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.Date;
26  import java.util.List;
27  import java.util.Random;
28  import java.util.UUID;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileStatus;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.*;
41  import org.apache.hadoop.hbase.KeyValue.Type;
42  import org.apache.hadoop.hbase.regionserver.*;
43  import org.apache.hadoop.hbase.testclassification.LargeTests;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
46  import org.apache.hadoop.hbase.io.hfile.HFileContext;
47  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
48  import org.apache.hadoop.hbase.mob.MobConstants;
49  import org.apache.hadoop.hbase.mob.MobFileName;
50  import org.apache.hadoop.hbase.mob.MobUtils;
51  import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
52  import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.apache.hadoop.hbase.util.FSUtils;
55  import org.apache.hadoop.hbase.util.Threads;
56  import org.apache.hadoop.hdfs.DistributedFileSystem;
57  import org.junit.AfterClass;
58  import org.junit.Assert;
59  import static org.junit.Assert.assertTrue;
60  import org.junit.BeforeClass;
61  import org.junit.Test;
62  import org.junit.experimental.categories.Category;
63  
64  @Category(LargeTests.class)
65  public class TestPartitionedMobCompactor {
66    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67    private final static String family = "family";
68    private final static String qf = "qf";
69    private HColumnDescriptor hcd = new HColumnDescriptor(family);
70    private Configuration conf = TEST_UTIL.getConfiguration();
71    private CacheConfig cacheConf = new CacheConfig(conf);
72    private FileSystem fs;
73    private List<FileStatus> mobFiles = new ArrayList<>();
74    private List<FileStatus> delFiles = new ArrayList<>();
75    private List<FileStatus> allFiles = new ArrayList<>();
76    private Path basePath;
77    private String mobSuffix;
78    private String delSuffix;
79    private static ExecutorService pool;
80  
81    @BeforeClass
82    public static void setUpBeforeClass() throws Exception {
83      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
84      // Inject our customized DistributedFileSystem
85      TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
86          DistributedFileSystem.class);
87      TEST_UTIL.startMiniCluster(1);
88      pool = createThreadPool();
89    }
90  
91    @AfterClass
92    public static void tearDownAfterClass() throws Exception {
93      pool.shutdown();
94      TEST_UTIL.shutdownMiniCluster();
95    }
96  
97    private void init(String tableName) throws Exception {
98      fs = FileSystem.get(conf);
99      Path testDir = FSUtils.getRootDir(conf);
100     Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
101     basePath = new Path(new Path(mobTestDir, tableName), family);
102     mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
103     delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
104   }
105 
106   @Test
107   public void testCompactionSelectWithAllFiles() throws Exception {
108     String tableName = "testCompactionSelectWithAllFiles";
109     // If there is only 1 file, it will not be compacted with _del files, so
110     // It wont be CompactionType.ALL_FILES in this case, do not create with _del files.
111     testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
112         CompactionType.ALL_FILES, false, false);
113   }
114 
115   @Test
116   public void testCompactionSelectToAvoidCompactOneFileWithDelete() throws Exception {
117     String tableName = "testCompactionSelectToAvoidCompactOneFileWithDelete";
118     // If there is only 1 file, it will not be compacted with _del files, so
119     // It wont be CompactionType.ALL_FILES in this case, and expected compact file count will be 0.
120     testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
121         CompactionType.PART_FILES, false);
122   }
123 
124 
125   @Test
126   public void testCompactionSelectWithPartFiles() throws Exception {
127     String tableName = "testCompactionSelectWithPartFiles";
128     testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false);
129   }
130 
131   @Test
132   public void testCompactionSelectWithForceAllFiles() throws Exception {
133     String tableName = "testCompactionSelectWithForceAllFiles";
134     testCompactionAtMergeSize(tableName, Long.MAX_VALUE, CompactionType.ALL_FILES, true);
135   }
136 
137   private void testCompactionAtMergeSize(final String tableName,
138       final long mergeSize, final CompactionType type, final boolean isForceAllFiles)
139       throws Exception {
140     testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, true);
141   }
142 
143   private void testCompactionAtMergeSize(final String tableName,
144       final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
145       final boolean createDelFiles)
146       throws Exception {
147     resetConf();
148     init(tableName);
149     int count = 10;
150     // create 10 mob files.
151     createStoreFiles(basePath, family, qf, count, Type.Put);
152 
153     if (createDelFiles) {
154       // create 10 del files
155       createStoreFiles(basePath, family, qf, count, Type.Delete);
156     }
157 
158     listFiles();
159     List<String> expectedStartKeys = new ArrayList<>();
160     for(FileStatus file : mobFiles) {
161       if(file.getLen() < mergeSize) {
162         String fileName = file.getPath().getName();
163         String startKey = fileName.substring(0, 32);
164 
165         // If it is not an major mob compaction and del files are there,
166         // these mob files wont be compacted.
167         if (isForceAllFiles || !createDelFiles) {
168           expectedStartKeys.add(startKey);
169         }
170       }
171     }
172     // set the mob compaction mergeable threshold
173     conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
174     testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys);
175   }
176 
177   @Test
178   public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
179     String tableName = "testCompactDelFilesWithDefaultBatchSize";
180     testCompactDelFilesAtBatchSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE,
181         MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
182   }
183 
184   @Test
185   public void testCompactDelFilesWithSmallBatchSize() throws Exception {
186     String tableName = "testCompactDelFilesWithSmallBatchSize";
187     testCompactDelFilesAtBatchSize(tableName, 4, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
188   }
189 
190   @Test
191   public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
192     String tableName = "testCompactDelFilesWithSmallBatchSize";
193     testCompactDelFilesAtBatchSize(tableName, 4, 2);
194   }
195 
196   @Test
197   public void testCompactFilesWithDstDirFull() throws Exception {
198     String tableName = "testCompactFilesWithDstDirFull";
199     fs = FileSystem.get(conf);
200     FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
201     Path testDir = FSUtils.getRootDir(conf);
202     Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
203     basePath = new Path(new Path(mobTestDir, tableName), family);
204 
205     try {
206       int count = 2;
207       // create 2 mob files.
208       createStoreFiles(basePath, family, qf, count, Type.Put, true);
209       listFiles();
210 
211       TableName tName = TableName.valueOf(tableName);
212       MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
213       faultyFs.setThrowException(true);
214       try {
215         compactor.compact(allFiles, true);
216       } catch (IOException e) {
217         System.out.println("Expected exception, ignore");
218       }
219 
220       // Verify that all the files in tmp directory are cleaned up
221       Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
222       FileStatus[] ls = faultyFs.listStatus(tempPath);
223 
224       // Only .bulkload under this directory
225       assertTrue(ls.length == 1);
226       assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
227 
228       Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
229           tName.getNamespaceAsString(), tName.getQualifierAsString())));
230 
231       // Nothing in bulkLoad directory
232       FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
233       assertTrue(lsBulkload.length == 0);
234 
235     } finally {
236       faultyFs.setThrowException(false);
237     }
238   }
239 
240 
241   private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
242       int delfileMaxCount)  throws Exception {
243     resetConf();
244     init(tableName);
245     // create 20 mob files.
246     createStoreFiles(basePath, family, qf, 20, Type.Put);
247     // create 13 del files
248     createStoreFiles(basePath, family, qf, 13, Type.Delete);
249     listFiles();
250 
251     // set the max del file count
252     conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, delfileMaxCount);
253     // set the mob compaction batch size
254     conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize);
255     testCompactDelFiles(tableName, 1, 13, false);
256   }
257 
258   /**
259    * Tests the selectFiles
260    * @param tableName the table name
261    * @param type the expected compaction type
262    * @param isForceAllFiles whether all the mob files are selected
263    * @param expected the expected start keys
264    */
265   private void testSelectFiles(String tableName, final CompactionType type,
266     final boolean isForceAllFiles, final List<String> expected) throws IOException {
267     PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
268       TableName.valueOf(tableName), hcd, pool) {
269       @Override
270       public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
271         throws IOException {
272         if (files == null || files.isEmpty()) {
273           return null;
274         }
275         PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
276         // assert the compaction type
277         Assert.assertEquals(type, request.type);
278         // assert get the right partitions
279         compareCompactedPartitions(expected, request.compactionPartitions);
280         // assert get the right del files
281         compareDelFiles(request.delFiles);
282         return null;
283       }
284     };
285     compactor.compact(allFiles, isForceAllFiles);
286   }
287 
288   /**
289    * Tests the compacteDelFile
290    * @param tableName the table name
291    * @param expectedFileCount the expected file count
292    * @param expectedCellCount the expected cell count
293    * @param isForceAllFiles whether all the mob files are selected
294    */
295   private void testCompactDelFiles(String tableName, final int expectedFileCount,
296       final int expectedCellCount, boolean isForceAllFiles) throws IOException {
297     PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
298       TableName.valueOf(tableName), hcd, pool) {
299       @Override
300       protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
301           throws IOException {
302         List<Path> delFilePaths = new ArrayList<Path>();
303         for (FileStatus delFile : request.delFiles) {
304           delFilePaths.add(delFile.getPath());
305         }
306         List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
307         // assert the del files are merged.
308         Assert.assertEquals(expectedFileCount, newDelPaths.size());
309         Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
310         return null;
311       }
312     };
313     compactor.compact(allFiles, isForceAllFiles);
314   }
315 
316   /**
317    * Lists the files in the path
318    */
319   private void listFiles() throws IOException {
320     for (FileStatus file : fs.listStatus(basePath)) {
321       allFiles.add(file);
322       if (file.getPath().getName().endsWith("_del")) {
323         delFiles.add(file);
324       } else {
325         mobFiles.add(file);
326       }
327     }
328   }
329 
330   /**
331    * Compares the compacted partitions.
332    * @param partitions the collection of CompactedPartitions
333    */
334   private void compareCompactedPartitions(List<String> expected,
335       Collection<CompactionPartition> partitions) {
336     List<String> actualKeys = new ArrayList<>();
337     for (CompactionPartition partition : partitions) {
338       actualKeys.add(partition.getPartitionId().getStartKey());
339     }
340     Collections.sort(expected);
341     Collections.sort(actualKeys);
342     Assert.assertEquals(expected.size(), actualKeys.size());
343     for (int i = 0; i < expected.size(); i++) {
344       Assert.assertEquals(expected.get(i), actualKeys.get(i));
345     }
346   }
347 
348   /**
349    * Compares the del files.
350    * @param allDelFiles all the del files
351    */
352   private void compareDelFiles(Collection<FileStatus> allDelFiles) {
353     int i = 0;
354     for (FileStatus file : allDelFiles) {
355       Assert.assertEquals(delFiles.get(i), file);
356       i++;
357     }
358   }
359 
360   /**
361    * Creates store files.
362    * @param basePath the path to create file
363    * @family the family name
364    * @qualifier the column qualifier
365    * @count the store file number
366    * @type the key type
367    */
368   private void createStoreFiles(Path basePath, String family, String qualifier, int count,
369       Type type) throws IOException {
370     createStoreFiles(basePath, family, qualifier, count, type, false);
371   }
372 
373   private void createStoreFiles(Path basePath, String family, String qualifier, int count,
374       Type type, boolean sameStartKey) throws IOException {
375     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
376     String startKey = "row_";
377     MobFileName mobFileName = null;
378     for (int i = 0; i < count; i++) {
379       byte[] startRow;
380       if (sameStartKey) {
381         // When creating multiple files under one partition, suffix needs to be different.
382         startRow = Bytes.toBytes(startKey);
383         mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
384         delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
385       } else {
386         startRow = Bytes.toBytes(startKey + i);
387       }
388       if(type.equals(Type.Delete)) {
389         mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
390             new Date()), delSuffix);
391       }
392       if(type.equals(Type.Put)){
393         mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
394             new Date()), mobSuffix);
395       }
396       StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
397       .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
398       writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
399           type, (i+1)*1000);
400     }
401   }
402 
403   /**
404    * Writes data to store file.
405    * @param writer the store file writer
406    * @param row the row key
407    * @param family the family name
408    * @param qualifier the column qualifier
409    * @param type the key type
410    * @param size the size of value
411    */
412   private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, byte[] family,
413       byte[] qualifier, Type type, int size) throws IOException {
414     long now = System.currentTimeMillis();
415     try {
416       byte[] dummyData = new byte[size];
417       new Random().nextBytes(dummyData);
418       writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
419     } finally {
420       writer.close();
421     }
422   }
423 
424   /**
425    * Gets the number of del cell in the del files
426    * @param paths the del file paths
427    * @return the cell size
428    */
429   private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
430     List<StoreFile> sfs = new ArrayList<StoreFile>();
431     int size = 0;
432     for(Path path : paths) {
433       StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
434       sfs.add(sf);
435     }
436     List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
437         false, null, HConstants.LATEST_TIMESTAMP);
438     Scan scan = new Scan();
439     scan.setMaxVersions(hcd.getMaxVersions());
440     long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
441     long ttl = HStore.determineTTLFromFamily(hcd);
442     ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
443     StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
444         scanners, 0L, HConstants.LATEST_TIMESTAMP);
445     List<Cell> results = new ArrayList<>();
446     boolean hasMore = true;
447 
448     while (hasMore) {
449       hasMore = scanner.next(results);
450       size += results.size();
451       results.clear();
452     }
453     scanner.close();
454     return size;
455   }
456 
457   private static ExecutorService createThreadPool() {
458     int maxThreads = 10;
459     long keepAliveTime = 60;
460     final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
461     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
462       TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
463       new RejectedExecutionHandler() {
464         @Override
465         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
466           try {
467             // waiting for a thread to pick up instead of throwing exceptions.
468             queue.put(r);
469           } catch (InterruptedException e) {
470             throw new RejectedExecutionException(e);
471           }
472         }
473       });
474     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
475     return pool;
476   }
477 
478   /**
479    * Resets the configuration.
480    */
481   private void resetConf() {
482     conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
483       MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
484     conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
485     conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
486       MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
487   }
488 
489   /**
490    * The customized Distributed File System Implementation
491    */
492   static class FaultyDistributedFileSystem extends DistributedFileSystem {
493     private volatile boolean throwException = false;
494 
495     public FaultyDistributedFileSystem() {
496       super();
497     }
498 
499     public void setThrowException(boolean throwException) {
500       this.throwException = throwException;
501     }
502 
503     @Override
504     public boolean rename(Path src, Path dst) throws IOException {
505       if (throwException) {
506         throw new IOException("No more files allowed");
507       }
508       return super.rename(src, dst);
509     }
510   }
511 }