
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob.compactions;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;

/**
 * An implementation of {@link MobCompactor} that compacts the mob files in partitions.
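 * <p>
 * A minimal usage sketch; the thread pool and candidate file list below are
 * illustrative, not prescribed by this class:
 * <pre>
 * ExecutorService pool = Executors.newFixedThreadPool(4);
 * PartitionedMobCompactor compactor =
 *   new PartitionedMobCompactor(conf, fs, tableName, column, pool);
 * // candidates: the FileStatus list of this column family's mob files
 * List&lt;Path&gt; newFiles = compactor.compact(candidates, true); // true: include all files
 * </pre>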
 */
@InterfaceAudience.Private
public class PartitionedMobCompactor extends MobCompactor {

  private static final Log LOG = LogFactory.getLog(PartitionedMobCompactor.class);
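  /** The size threshold under which a mob file is small enough to be merged */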
  protected long mergeableSize;
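  /** The maximum number of del files allowed before they are merged */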
  protected int delFileMaxCount;
  /** The number of files compacted in a batch */
  protected int compactionBatchSize;
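  /** The maximum number of cells read per scanner.next() call during compaction */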
  protected int compactionKVMax;

  private Path tempPath;
  private Path bulkloadPath;
  private CacheConfig compactionCacheConfig;
  private final List<Tag> refCellTags;
  private Encryption.Context cryptoContext = Encryption.Context.NONE;

  public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
    HColumnDescriptor column, ExecutorService pool) throws IOException {
    super(conf, fs, tableName, column, pool);
    mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
      MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
    delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
      MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
    // default is 100
    compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
      MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
    tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
    bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
      tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
    compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
      HConstants.COMPACTION_KV_MAX_DEFAULT);
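    // use a copy of the configuration with the block cache disabled so that
    // compaction reads do not pollute the block cache.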
    Configuration copyOfConf = new Configuration(conf);
    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
    compactionCacheConfig = new CacheConfig(copyOfConf);
    refCellTags = new ArrayList<>(2);
    refCellTags.add(MobConstants.MOB_REF_TAG);
    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
    refCellTags.add(tableNameTag);
    cryptoContext = MobUtils.createEncryptionContext(copyOfConf, column);
  }

  @Override
  public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException {
    if (files == null || files.isEmpty()) {
      LOG.info("No candidate mob files");
      return null;
    }
    LOG.info("allFiles: " + allFiles);
    // find the files to compact.
    PartitionedMobCompactionRequest request = select(files, allFiles);
    // compact the files.
    return performCompaction(request);
  }

  /**
   * Selects the mob files and del files to be compacted.
   * Iterates over the candidates to find all the del files and small mob files.
   * @param candidates All the candidates.
   * @param allFiles Whether to add all mob files into the compaction.
   * @return A compaction request.
   * @throws IOException
   */
  protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
    boolean allFiles) throws IOException {
    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
      new HashMap<CompactionPartitionId, CompactionPartition>();
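    // a single mutable id is reused as the lookup key for every candidate; a fresh
    // copy is only allocated when a new partition is created.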
    final CompactionPartitionId id = new CompactionPartitionId();
    int selectedFileCount = 0;
    int irrelevantFileCount = 0;
    for (FileStatus file : candidates) {
      if (!file.isFile()) {
        irrelevantFileCount++;
        continue;
      }
      // group the del files and small files.
      FileStatus linkedFile = file;
      if (HFileLink.isHFileLink(file.getPath())) {
        HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
        linkedFile = getLinkedFileStatus(link);
        if (linkedFile == null) {
          // if the linked file cannot be found, count it as an irrelevant file
          irrelevantFileCount++;
          continue;
        }
      }
      if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
        allDelFiles.add(file);
      } else if (allFiles || (linkedFile.getLen() < mergeableSize)) {
        // add all files if allFiles is true,
        // otherwise add the small files to the merge pool
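        // mob file names encode a digest of the region start key and the date,
        // so files sharing both are grouped into the same partition.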
        String fileName = linkedFile.getPath().getName();
        id.setStartKey(MobFileName.getStartKeyFromName(fileName));
        id.setDate(MobFileName.getDateFromName(fileName));
        CompactionPartition compactionPartition = filesToCompact.get(id);
        if (compactionPartition == null) {
          CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate());
          compactionPartition = new CompactionPartition(newId);

          compactionPartition.addFile(file);
          filesToCompact.put(newId, compactionPartition);
        } else {
          compactionPartition.addFile(file);
        }
        selectedFileCount++;
      }
    }

    /*
     * If this is not a major mob compaction but del files exist, and a partition contains
     * only one file, remove that partition from the filesToCompact map to avoid
     * re-compacting a file that has already been compacted with del files.
     */
    if (!allFiles && (allDelFiles.size() > 0)) {
      Iterator<Map.Entry<CompactionPartitionId, CompactionPartition>> it =
          filesToCompact.entrySet().iterator();

      while (it.hasNext()) {
        Map.Entry<CompactionPartitionId, CompactionPartition> entry = it.next();
        if (entry.getValue().getFileCount() == 1) {
          it.remove();
          --selectedFileCount;
        }
      }
    }

    PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest(
      filesToCompact.values(), allDelFiles);
    if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
      // all the files are selected
      request.setCompactionType(CompactionType.ALL_FILES);
    }
    LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
      + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
      + irrelevantFileCount + " irrelevant files");
    return request;
  }

  /**
   * Performs the compaction on the selected files.
   * <ol>
   * <li>Compacts the del files.</li>
   * <li>Compacts the selected small mob files and all the del files.</li>
   * <li>If all the candidates are selected, deletes the del files.</li>
   * </ol>
   * @param request The compaction request.
   * @return The paths of new mob files generated in the compaction.
   * @throws IOException
   */
  protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
    throws IOException {
    // merge the del files
    List<Path> delFilePaths = new ArrayList<Path>();
    for (FileStatus delFile : request.delFiles) {
      delFilePaths.add(delFile.getPath());
    }
    List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
    List<Path> paths = null;
    try {
      for (Path newDelPath : newDelPaths) {
        StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
        // pre-create the reader of a del file to avoid a race condition when opening
        // the reader in each partition.
        sf.createReader();
        newDelFiles.add(sf);
      }
      LOG.info("After merging, there are " + newDelFiles.size() + " del files");
      // compact the mob files by partitions.
      paths = compactMobFiles(request, newDelFiles);
      LOG.info("After compaction, there are " + paths.size() + " mob files");
    } finally {
      closeStoreFileReaders(newDelFiles);
    }
    // archive the del files if all the mob files are selected.
    if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
      LOG.info(
          "After a mob compaction with all files selected, archiving the del files " + newDelPaths);
      try {
        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
      } catch (IOException e) {
        LOG.error("Failed to archive the del files " + newDelPaths, e);
      }
    }
    return paths;
  }

  /**
   * Compacts the selected small mob files and all the del files.
   * @param request The compaction request.
   * @param delFiles The del files.
   * @return The paths of new mob files after compactions.
   * @throws IOException
   */
  protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
    final List<StoreFile> delFiles) throws IOException {
    Collection<CompactionPartition> partitions = request.compactionPartitions;
    if (partitions == null || partitions.isEmpty()) {
      LOG.info("No partitions of mob files");
      return Collections.emptyList();
    }
    List<Path> paths = new ArrayList<Path>();
    Connection c = ConnectionFactory.createConnection(conf);
    final Table table = c.getTable(tableName);
    try {
      Map<CompactionPartitionId, Future<List<Path>>> results =
        new HashMap<CompactionPartitionId, Future<List<Path>>>();
      // compact the mob files by partitions in parallel.
      for (final CompactionPartition partition : partitions) {
        results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
          @Override
          public List<Path> call() throws Exception {
            LOG.info("Compacting mob files for partition " + partition.getPartitionId());
            return compactMobFilePartition(request, partition, delFiles, table);
          }
        }));
      }
      // gather the results of the parallel partition compactions.
      List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>();
      for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
        try {
          paths.addAll(result.getValue().get());
        } catch (Exception e) {
          // just log the error
          LOG.error("Failed to compact the partition " + result.getKey(), e);
          failedPartitions.add(result.getKey());
        }
      }
      if (!failedPartitions.isEmpty()) {
        // if any partition fails in the compaction, directly throw an exception.
        throw new IOException("Failed to compact the partitions " + failedPartitions);
      }
    } finally {
      try {
        table.close();
      } catch (IOException e) {
        LOG.error("Failed to close the table " + tableName, e);
      }
    }
    return paths;
  }

  /**
   * Compacts a partition of selected small mob files and all the del files.
   * @param request The compaction request.
   * @param partition A compaction partition.
   * @param delFiles The del files.
   * @param table The current table.
   * @return The paths of new mob files after compactions.
   * @throws IOException
   */
  private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
    CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
    if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(),
        partition.getPartitionId().getDate())) {
      // if the files in the partition are expired, do not compact them and directly
      // return an empty list.
      return Collections.emptyList();
    }
    List<Path> newFiles = new ArrayList<Path>();
    List<FileStatus> files = partition.listFiles();
    int offset = 0;
    Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
    Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
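    // the bulkload layout is <mob tmp>/bulkload/<namespace>/<table>/<partition>/<family>,
    // which is the directory structure LoadIncrementalHFiles expects.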
    while (offset < files.size()) {
      int batch = compactionBatchSize;
      if (files.size() - offset < compactionBatchSize) {
        batch = files.size() - offset;
      }
      if (batch == 1 && delFiles.isEmpty()) {
        // only one file left and no del files, do not compact it,
        // and directly add it to the new files.
        newFiles.add(files.get(offset).getPath());
        offset++;
        continue;
      }
      // clean the bulkload directory to avoid loading old files.
      fs.delete(bulkloadPathOfPartition, true);
      // add the selected mob files and del files into filesToCompact
      List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
      for (int i = offset; i < batch + offset; i++) {
        StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
          BloomType.NONE);
        filesToCompact.add(sf);
      }
      filesToCompact.addAll(delFiles);
      // compact the mob files in a batch.
      compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
        bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
      // move to the next batch.
      offset += batch;
    }
    LOG.info("Compaction is finished. The number of mob files changed from " + files.size()
      + " to " + newFiles.size());
    return newFiles;
  }

  /**
   * Closes the readers of store files.
   * @param storeFiles The store files to be closed.
   */
  private void closeStoreFileReaders(List<StoreFile> storeFiles) {
    for (StoreFile storeFile : storeFiles) {
      try {
        storeFile.closeReader(true);
      } catch (IOException e) {
        LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
      }
    }
  }

  /**
   * Compacts a partition of selected small mob files and all the del files in a batch.
   * @param request The compaction request.
   * @param partition A compaction partition.
   * @param table The current table.
   * @param filesToCompact The files to be compacted.
   * @param batch The number of mob files to be compacted in a batch.
   * @param bulkloadPathOfPartition The directory where the bulkload column of the current
   *        partition is saved.
   * @param bulkloadColumnPath The directory where the bulkload files of current partition
   *        are saved.
   * @param newFiles The paths of new mob files after compactions.
   * @throws IOException
   */
  private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
    CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
    Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
    throws IOException {
    // open a scanner over the selected mob files and del files.
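    // COMPACT_DROP_DELETES is safe here because all the del files are included in the
    // scan, so any cell they mask is simply not rewritten to the new mob file.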
    StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
    // the mob files to be compacted, not including the del files.
    List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
    // Pair(maxSeqId, cellsCount)
    Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
    // open writers for the mob files and new ref store files.
    Writer writer = null;
    Writer refFileWriter = null;
    Path filePath = null;
    long mobCells = 0;
    boolean cleanupTmpMobFile = false;
    boolean cleanupBulkloadDirOfPartition = false;
    boolean cleanupCommittedMobFile = false;
    boolean closeReaders = true;

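    // the cleanup flags record how far the batch has progressed, so the finally block
    // below removes only the artifacts that were created but not yet committed.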
    try {
      try {
        writer = MobUtils
            .createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath,
                Long.MAX_VALUE, column.getCompactionCompressionType(),
                partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
        cleanupTmpMobFile = true;
        filePath = writer.getPath();
        byte[] fileName = Bytes.toBytes(filePath.getName());
        // create a temp file and open a writer for it in the bulkloadPath
        refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
            fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext);
        cleanupBulkloadDirOfPartition = true;
        List<Cell> cells = new ArrayList<>();
        boolean hasMore;
        ScannerContext scannerContext =
            ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
        do {
          hasMore = scanner.next(cells, scannerContext);
          for (Cell cell : cells) {
            // write the mob cell to the mob file.
            writer.append(cell);
            // write the new reference cell to the store file.
            Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags);
            refFileWriter.append(reference);
            mobCells++;
          }
          cells.clear();
        } while (hasMore);
      } finally {
        // close the scanner.
        scanner.close();

        if (cleanupTmpMobFile) {
          // append metadata to the mob file, and close the mob file writer.
          closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
        }

        if (cleanupBulkloadDirOfPartition) {
          // append metadata and bulkload info to the ref mob file, and close the writer.
          closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
        }
      }

      if (mobCells > 0) {
        // commit mob file
        MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
        cleanupTmpMobFile = false;
        cleanupCommittedMobFile = true;
        // bulkload the ref file
        bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
        cleanupCommittedMobFile = false;
        newFiles.add(new Path(mobFamilyDir, filePath.getName()));
      }

      // archive the old mob files, do not archive the del files.
      try {
        closeStoreFileReaders(mobFilesToCompact);
        closeReaders = false;
        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(),
          mobFilesToCompact);
      } catch (IOException e) {
        LOG.error("Failed to archive the files " + mobFilesToCompact, e);
      }
    } finally {
      if (closeReaders) {
        closeStoreFileReaders(mobFilesToCompact);
      }

      if (cleanupTmpMobFile) {
        deletePath(filePath);
      }

      if (cleanupBulkloadDirOfPartition) {
        // delete the bulkload files in bulkloadPath
        deletePath(bulkloadPathOfPartition);
      }

      if (cleanupCommittedMobFile) {
        deletePath(new Path(mobFamilyDir, filePath.getName()));
      }
    }
  }

  /**
   * Compacts the del files in batches, which avoids opening too many files at once.
   * @param request The compaction request.
   * @param delFilePaths The paths of the del files.
   * @return The paths of new del files after merging or the original files if no merging
   *         is necessary.
   * @throws IOException
   */
  protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
    List<Path> delFilePaths) throws IOException {
    if (delFilePaths.size() <= delFileMaxCount) {
      return delFilePaths;
    }
    // when there are more del files than allowed, merge them first.
    int offset = 0;
    List<Path> paths = new ArrayList<Path>();
    while (offset < delFilePaths.size()) {
      // get the batch
      int batch = compactionBatchSize;
      if (delFilePaths.size() - offset < compactionBatchSize) {
        batch = delFilePaths.size() - offset;
      }
      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
      if (batch == 1) {
        // only one file left, do not compact it, directly add it to the new files.
        paths.add(delFilePaths.get(offset));
        offset++;
        continue;
      }
      for (int i = offset; i < batch + offset; i++) {
        batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
          BloomType.NONE));
      }
      // compact the del files in a batch.
      paths.add(compactDelFilesInBatch(request, batchedDelFiles));
      // move to the next batch.
      offset += batch;
    }
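    // recurse on the merged output; each pass shrinks the file count by roughly a
    // factor of compactionBatchSize until it is no larger than delFileMaxCount.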
    return compactDelFiles(request, paths);
  }

  /**
   * Compacts a batch of del files.
   * @param request The compaction request.
   * @param delFiles The del files.
   * @return The path of the new del file after merging.
   * @throws IOException
   */
  private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
    List<StoreFile> delFiles) throws IOException {
    // create a scanner for the del files.
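    // COMPACT_RETAIN_DELETES keeps the delete markers so they can still mask cells in
    // mob files that are not part of this compaction.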
    StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
    Writer writer = null;
    Path filePath = null;
    try {
      writer = MobUtils.createDelFileWriter(conf, fs, column,
        MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
        column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
        cryptoContext);
      filePath = writer.getPath();
      List<Cell> cells = new ArrayList<Cell>();
      boolean hasMore = false;
      ScannerContext scannerContext =
          ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
      do {
        hasMore = scanner.next(cells, scannerContext);
        for (Cell cell : cells) {
          writer.append(cell);
        }
        cells.clear();
      } while (hasMore);
    } finally {
      scanner.close();
      if (writer != null) {
        try {
          writer.close();
        } catch (IOException e) {
          LOG.error("Failed to close the writer of the file " + filePath, e);
        }
      }
    }
    // commit the new del file
    Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
    // archive the old del files
    try {
      MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
    } catch (IOException e) {
      LOG.error("Failed to archive the old del files " + delFiles, e);
    }
    return path;
  }

  /**
   * Creates a store scanner.
   * @param filesToCompact The files to be compacted.
   * @param scanType The scan type.
   * @return The store scanner.
   * @throws IOException
   */
  private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
    throws IOException {
    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
      false, true, false, null, HConstants.LATEST_TIMESTAMP);
    Scan scan = new Scan();
    scan.setMaxVersions(column.getMaxVersions());
    long ttl = HStore.determineTTLFromFamily(column);
    ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
    StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
      HConstants.LATEST_TIMESTAMP);
    return scanner;
  }

  /**
   * Bulkloads the current file.
   * @param table The current table.
   * @param bulkloadDirectory The path of the bulkload directory.
   * @param fileName The current file name.
   * @throws IOException
   */
  private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
    throws IOException {
    // bulkload the ref file
    try {
      LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
      bulkload.doBulkLoad(bulkloadDirectory, (HTable) table);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Closes the mob file writer.
   * @param writer The mob file writer.
   * @param maxSeqId Maximum sequence id.
   * @param mobCellsCount The number of mob cells.
   * @throws IOException
   */
  private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
    throws IOException {
    if (writer != null) {
      writer.appendMetadata(maxSeqId, false, mobCellsCount);
      try {
        writer.close();
      } catch (IOException e) {
        LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
      }
    }
  }

  /**
   * Closes the ref file writer.
   * @param writer The ref file writer.
   * @param maxSeqId Maximum sequence id.
   * @param bulkloadTime The timestamp at which the bulk load file is created.
   * @throws IOException
   */
  private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
    throws IOException {
    if (writer != null) {
      writer.appendMetadata(maxSeqId, false);
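      // keep the cells' original sequence ids at bulkload time instead of resetting
      // them to the bulkload sequence id.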
      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
      writer.appendFileInfo(StoreFile.SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
      try {
        writer.close();
      } catch (IOException e) {
        LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
      }
    }
  }

  /**
   * Gets the max seqId and number of cells of the store files.
   * @param storeFiles The store files.
   * @return The pair of the max seqId and number of cells of the store files.
   * @throws IOException
   */
  private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
    long maxSeqId = 0;
    long maxKeyCount = 0;
    for (StoreFile sf : storeFiles) {
      // the readers will be closed later after the merge.
      maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
      byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
      if (count != null) {
        maxKeyCount += Bytes.toLong(count);
      }
    }
    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
  }

  /**
   * Deletes a file.
   * @param path The path of the file to be deleted.
   */
  private void deletePath(Path path) {
    try {
      if (path != null) {
        fs.delete(path, true);
      }
    } catch (IOException e) {
      LOG.error("Failed to delete the file " + path, e);
    }
  }

  private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
    Path[] locations = link.getLocations();
    for (Path location : locations) {
      FileStatus file = getFileStatus(location);
      if (file != null) {
        return file;
      }
    }
    return null;
  }

  private FileStatus getFileStatus(Path path) throws IOException {
    try {
      if (path != null) {
        FileStatus file = fs.getFileStatus(path);
        return file;
      }
    } catch (FileNotFoundException e) {
      LOG.warn("The file " + path + " cannot be found", e);
    }
    return null;
  }
}