View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob;
20  
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.Key;
import java.security.KeyException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.hbase.Cell;
47  import org.apache.hadoop.hbase.CellComparator;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HColumnDescriptor;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.KeyValue;
54  import org.apache.hadoop.hbase.TableName;
55  import org.apache.hadoop.hbase.Tag;
56  import org.apache.hadoop.hbase.TagType;
57  import org.apache.hadoop.hbase.backup.HFileArchiver;
58  import org.apache.hadoop.hbase.client.Scan;
59  import org.apache.hadoop.hbase.io.HFileLink;
60  import org.apache.hadoop.hbase.io.compress.Compression;
61  import org.apache.hadoop.hbase.io.crypto.Cipher;
62  import org.apache.hadoop.hbase.io.crypto.Encryption;
63  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
64  import org.apache.hadoop.hbase.io.hfile.HFileContext;
65  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
66  import org.apache.hadoop.hbase.master.TableLockManager;
67  import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
68  import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
69  import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
70  import org.apache.hadoop.hbase.regionserver.BloomType;
71  import org.apache.hadoop.hbase.regionserver.HStore;
72  import org.apache.hadoop.hbase.regionserver.StoreFile;
73  import org.apache.hadoop.hbase.security.EncryptionUtil;
74  import org.apache.hadoop.hbase.security.User;
75  import org.apache.hadoop.hbase.util.Bytes;
76  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77  import org.apache.hadoop.hbase.util.FSUtils;
78  import org.apache.hadoop.hbase.util.ReflectionUtils;
79  import org.apache.hadoop.hbase.util.Threads;
80  
81  /**
82   * The mob utilities
83   */
84  @InterfaceAudience.Private
85  public class MobUtils {
86  
87    private static final Log LOG = LogFactory.getLog(MobUtils.class);
88  
89    private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
90        new ThreadLocal<SimpleDateFormat>() {
91      @Override
92      protected SimpleDateFormat initialValue() {
93        return new SimpleDateFormat("yyyyMMdd");
94      }
95    };
96  
97    /**
98     * Formats a date to a string.
99     * @param date The date.
100    * @return The string format of the date, it's yyyymmdd.
101    */
102   public static String formatDate(Date date) {
103     return LOCAL_FORMAT.get().format(date);
104   }
105 
106   /**
107    * Parses the string to a date.
108    * @param dateString The string format of a date, it's yyyymmdd.
109    * @return A date.
110    * @throws ParseException
111    */
112   public static Date parseDate(String dateString) throws ParseException {
113     return LOCAL_FORMAT.get().parse(dateString);
114   }
115 
116   /**
117    * Whether the current cell is a mob reference cell.
118    * @param cell The current cell.
119    * @return True if the cell has a mob reference tag, false if it doesn't.
120    */
121   public static boolean isMobReferenceCell(Cell cell) {
122     if (cell.getTagsLength() > 0) {
123       Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
124           TagType.MOB_REFERENCE_TAG_TYPE);
125       return tag != null;
126     }
127     return false;
128   }
129 
130   /**
131    * Gets the table name tag.
132    * @param cell The current cell.
133    * @return The table name tag.
134    */
135   public static Tag getTableNameTag(Cell cell) {
136     if (cell.getTagsLength() > 0) {
137       Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
138           TagType.MOB_TABLE_NAME_TAG_TYPE);
139       return tag;
140     }
141     return null;
142   }
143 
144   /**
145    * Whether the tag list has a mob reference tag.
146    * @param tags The tag list.
147    * @return True if the list has a mob reference tag, false if it doesn't.
148    */
149   public static boolean hasMobReferenceTag(List<Tag> tags) {
150     if (!tags.isEmpty()) {
151       for (Tag tag : tags) {
152         if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
153           return true;
154         }
155       }
156     }
157     return false;
158   }
159 
160   /**
161    * Indicates whether it's a raw scan.
162    * The information is set in the attribute "hbase.mob.scan.raw" of scan.
163    * For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file.
164    * In a raw scan, the scanner directly returns cell in HBase without retrieve the one in
165    * the mob file.
166    * @param scan The current scan.
167    * @return True if it's a raw scan.
168    */
169   public static boolean isRawMobScan(Scan scan) {
170     byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
171     try {
172       return raw != null && Bytes.toBoolean(raw);
173     } catch (IllegalArgumentException e) {
174       return false;
175     }
176   }
177 
178   /**
179    * Indicates whether it's a reference only scan.
180    * The information is set in the attribute "hbase.mob.scan.ref.only" of scan.
181    * If it's a ref only scan, only the cells with ref tag are returned.
182    * @param scan The current scan.
183    * @return True if it's a ref only scan.
184    */
185   public static boolean isRefOnlyScan(Scan scan) {
186     byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
187     try {
188       return refOnly != null && Bytes.toBoolean(refOnly);
189     } catch (IllegalArgumentException e) {
190       return false;
191     }
192   }
193 
194   /**
195    * Indicates whether the scan contains the information of caching blocks.
196    * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
197    * @param scan The current scan.
198    * @return True when the Scan attribute specifies to cache the MOB blocks.
199    */
200   public static boolean isCacheMobBlocks(Scan scan) {
201     byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
202     try {
203       return cache != null && Bytes.toBoolean(cache);
204     } catch (IllegalArgumentException e) {
205       return false;
206     }
207   }
208 
209   /**
210    * Sets the attribute of caching blocks in the scan.
211    *
212    * @param scan
213    *          The current scan.
214    * @param cacheBlocks
215    *          True, set the attribute of caching blocks into the scan, the scanner with this scan
216    *          caches blocks.
217    *          False, the scanner doesn't cache blocks for this scan.
218    */
219   public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
220     scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
221   }
222 
223   /**
224    * Cleans the expired mob files.
225    * Cleans the files whose creation date is older than (current - columnFamily.ttl), and
226    * the minVersions of that column family is 0.
227    * @param fs The current file system.
228    * @param conf The current configuration.
229    * @param tableName The current table name.
230    * @param columnDescriptor The descriptor of the current column family.
231    * @param cacheConfig The cacheConfig that disables the block cache.
232    * @param current The current time.
233    * @throws IOException
234    */
235   public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
236       HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
237       throws IOException {
238     long timeToLive = columnDescriptor.getTimeToLive();
239     if (Integer.MAX_VALUE == timeToLive) {
240       // no need to clean, because the TTL is not set.
241       return;
242     }
243 
244     Date expireDate = new Date(current - timeToLive * 1000);
245     expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
246     LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
247 
248     FileStatus[] stats = null;
249     Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
250     Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
251     try {
252       stats = fs.listStatus(path);
253     } catch (FileNotFoundException e) {
254       LOG.warn("Failed to find the mob file " + path, e);
255     }
256     if (null == stats) {
257       // no file found
258       return;
259     }
260     List<StoreFile> filesToClean = new ArrayList<StoreFile>();
261     int deletedFileCount = 0;
262     for (FileStatus file : stats) {
263       String fileName = file.getPath().getName();
264       try {
265         MobFileName mobFileName = null;
266         if (!HFileLink.isHFileLink(file.getPath())) {
267           mobFileName = MobFileName.create(fileName);
268         } else {
269           HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
270           mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
271         }
272         Date fileDate = parseDate(mobFileName.getDate());
273         if (LOG.isDebugEnabled()) {
274           LOG.debug("Checking file " + fileName);
275         }
276         if (fileDate.getTime() < expireDate.getTime()) {
277           if (LOG.isDebugEnabled()) {
278             LOG.debug(fileName + " is an expired file");
279           }
280           filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
281         }
282       } catch (Exception e) {
283         LOG.error("Cannot parse the fileName " + fileName, e);
284       }
285     }
286     if (!filesToClean.isEmpty()) {
287       try {
288         removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
289             filesToClean);
290         deletedFileCount = filesToClean.size();
291       } catch (IOException e) {
292         LOG.error("Failed to delete the mob files " + filesToClean, e);
293       }
294     }
295     LOG.info(deletedFileCount + " expired mob files are deleted");
296   }
297 
298   /**
299    * Gets the root dir of the mob files.
300    * It's {HBASE_DIR}/mobdir.
301    * @param conf The current configuration.
302    * @return the root dir of the mob file.
303    */
304   public static Path getMobHome(Configuration conf) {
305     Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
306     return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
307   }
308 
309   /**
310    * Gets the qualified root dir of the mob files.
311    * @param conf The current configuration.
312    * @return The qualified root dir.
313    * @throws IOException
314    */
315   public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
316     Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
317     Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
318     FileSystem fs = mobRootDir.getFileSystem(conf);
319     return mobRootDir.makeQualified(fs);
320   }
321 
322   /**
323    * Gets the region dir of the mob files.
324    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
325    * @param conf The current configuration.
326    * @param tableName The current table name.
327    * @return The region dir of the mob files.
328    */
329   public static Path getMobRegionPath(Configuration conf, TableName tableName) {
330     Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
331     HRegionInfo regionInfo = getMobRegionInfo(tableName);
332     return new Path(tablePath, regionInfo.getEncodedName());
333   }
334 
335   /**
336    * Gets the family dir of the mob files.
337    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
338    * @param conf The current configuration.
339    * @param tableName The current table name.
340    * @param familyName The current family name.
341    * @return The family dir of the mob files.
342    */
343   public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
344     return new Path(getMobRegionPath(conf, tableName), familyName);
345   }
346 
347   /**
348    * Gets the family dir of the mob files.
349    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
350    * @param regionPath The path of mob region which is a dummy one.
351    * @param familyName The current family name.
352    * @return The family dir of the mob files.
353    */
354   public static Path getMobFamilyPath(Path regionPath, String familyName) {
355     return new Path(regionPath, familyName);
356   }
357 
358   /**
359    * Gets the HRegionInfo of the mob files.
360    * This is a dummy region. The mob files are not saved in a region in HBase.
361    * This is only used in mob snapshot. It's internally used only.
362    * @param tableName
363    * @return A dummy mob region info.
364    */
365   public static HRegionInfo getMobRegionInfo(TableName tableName) {
366     HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
367         HConstants.EMPTY_END_ROW, false, 0);
368     return info;
369   }
370 
371   /**
372    * Gets whether the current HRegionInfo is a mob one.
373    * @param regionInfo The current HRegionInfo.
374    * @return If true, the current HRegionInfo is a mob one.
375    */
376   public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
377     return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
378         .equals(regionInfo.getEncodedName());
379   }
380 
381   /**
382    * Gets whether the current region name follows the pattern of a mob region name.
383    * @param tableName The current table name.
384    * @param regionName The current region name.
385    * @return True if the current region name follows the pattern of a mob region name.
386    */
387   public static boolean isMobRegionName(TableName tableName, byte[] regionName) {
388     return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName());
389   }
390 
391   /**
392    * Gets the working directory of the mob compaction.
393    * @param root The root directory of the mob compaction.
394    * @param jobName The current job name.
395    * @return The directory of the mob compaction for the current job.
396    */
397   public static Path getCompactionWorkingPath(Path root, String jobName) {
398     return new Path(root, jobName);
399   }
400 
401   /**
402    * Archives the mob files.
403    * @param conf The current configuration.
404    * @param fs The current file system.
405    * @param tableName The table name.
406    * @param tableDir The table directory.
407    * @param family The name of the column family.
408    * @param storeFiles The files to be deleted.
409    * @throws IOException
410    */
411   public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
412       Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
413     HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
414         storeFiles);
415   }
416 
417   /**
418    * Creates a mob reference KeyValue.
419    * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
420    * @param cell The original Cell.
421    * @param fileName The mob file name where the mob reference KeyValue is written.
422    * @param tableNameTag The tag of the current table name. It's very important in
423    *                        cloning the snapshot.
424    * @return The mob reference KeyValue.
425    */
426   public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag) {
427     // Append the tags to the KeyValue.
428     // The key is same, the value is the filename of the mob file
429     List<Tag> tags = new ArrayList<Tag>();
430     // Add the ref tag as the 1st one.
431     tags.add(MobConstants.MOB_REF_TAG);
432     // Add the tag of the source table name, this table is where this mob file is flushed
433     // from.
434     // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
435     // find the original mob files by this table name. For details please see cloning
436     // snapshot for mob files.
437     tags.add(tableNameTag);
438     return createMobRefCell(cell, fileName, tags);
439   }
440 
441   public static Cell createMobRefCell(Cell cell, byte[] fileName, List<Tag> refCellTags) {
442     byte[] refValue = Bytes.add(Bytes.toBytes(cell.getValueLength()), fileName);
443     List<Tag> tags = Tag.carryForwardTags(refCellTags, cell);
444     KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
445         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
446         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
447         cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
448     reference.setSequenceId(cell.getSequenceId());
449     return reference;
450   }
451 
452   /**
453    * Creates a writer for the mob file in temp directory.
454    * @param conf The current configuration.
455    * @param fs The current file system.
456    * @param family The descriptor of the current column family.
457    * @param date The date string, its format is yyyymmmdd.
458    * @param basePath The basic path for a temp directory.
459    * @param maxKeyCount The key count.
460    * @param compression The compression algorithm.
461    * @param startKey The hex string of the start key.
462    * @param cacheConfig The current cache config.
463    * @param cryptoContext The encryption context.
464    * @return The writer for the mob file.
465    * @throws IOException
466    */
467   public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
468       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
469       Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
470       Encryption.Context cryptoContext)
471       throws IOException {
472     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
473         .replaceAll("-", ""));
474     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
475       cacheConfig, cryptoContext);
476   }
477 
478   /**
479    * Creates a writer for the ref file in temp directory.
480    * @param conf The current configuration.
481    * @param fs The current file system.
482    * @param family The descriptor of the current column family.
483    * @param basePath The basic path for a temp directory.
484    * @param maxKeyCount The key count.
485    * @param cacheConfig The current cache config.
486    * @param cryptoContext The encryption context.
487    * @return The writer for the mob file.
488    * @throws IOException
489    */
490   public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
491     HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
492     Encryption.Context cryptoContext)
493     throws IOException {
494     HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
495       .withIncludesTags(true).withCompression(family.getCompactionCompression())
496       .withCompressTags(family.isCompressTags()).withChecksumType(HStore.getChecksumType(conf))
497       .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
498       .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
499       .withEncryptionContext(cryptoContext).withCreateTime(EnvironmentEdgeManager.currentTime())
500       .build();
501     Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
502     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
503       .withComparator(KeyValue.COMPARATOR).withBloomType(family.getBloomFilterType())
504       .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
505     return w;
506   }
507 
508   /**
509    * Creates a writer for the mob file in temp directory.
510    * @param conf The current configuration.
511    * @param fs The current file system.
512    * @param family The descriptor of the current column family.
513    * @param date The date string, its format is yyyymmmdd.
514    * @param basePath The basic path for a temp directory.
515    * @param maxKeyCount The key count.
516    * @param compression The compression algorithm.
517    * @param startKey The start key.
518    * @param cacheConfig The current cache config.
519    * @param cryptoContext The encryption context.
520    * @return The writer for the mob file.
521    * @throws IOException
522    */
523   public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
524       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
525       Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
526       Encryption.Context cryptoContext)
527       throws IOException {
528     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
529         .replaceAll("-", ""));
530     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
531       cacheConfig, cryptoContext);
532   }
533 
534   /**
535    * Creates a writer for the del file in temp directory.
536    * @param conf The current configuration.
537    * @param fs The current file system.
538    * @param family The descriptor of the current column family.
539    * @param date The date string, its format is yyyymmmdd.
540    * @param basePath The basic path for a temp directory.
541    * @param maxKeyCount The key count.
542    * @param compression The compression algorithm.
543    * @param startKey The start key.
544    * @param cacheConfig The current cache config.
545    * @param cryptoContext The encryption context.
546    * @return The writer for the del file.
547    * @throws IOException
548    */
549   public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
550       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
551       Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
552       Encryption.Context cryptoContext)
553       throws IOException {
554     String suffix = UUID
555       .randomUUID().toString().replaceAll("-", "") + "_del";
556     MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
557     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
558       cacheConfig, cryptoContext);
559   }
560 
561   /**
562    * Creates a writer for the mob file in temp directory.
563    * @param conf The current configuration.
564    * @param fs The current file system.
565    * @param family The descriptor of the current column family.
566    * @param mobFileName The mob file name.
567    * @param basePath The basic path for a temp directory.
568    * @param maxKeyCount The key count.
569    * @param compression The compression algorithm.
570    * @param cacheConfig The current cache config.
571    * @param cryptoContext The encryption context.
572    * @return The writer for the mob file.
573    * @throws IOException
574    */
575   private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
576     HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
577     Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext)
578     throws IOException {
579     HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
580       .withIncludesMvcc(true).withIncludesTags(true)
581       .withCompressTags(family.isCompressTags())
582       .withChecksumType(HStore.getChecksumType(conf))
583       .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
584       .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
585       .withEncryptionContext(cryptoContext)
586       .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
587 
588     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
589       .withFilePath(new Path(basePath, mobFileName.getFileName()))
590       .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
591       .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
592     return w;
593   }
594 
595   /**
596    * Commits the mob file.
597    * @param conf The current configuration.
598    * @param fs The current file system.
599    * @param sourceFile The path where the mob file is saved.
600    * @param targetPath The directory path where the source file is renamed to.
601    * @param cacheConfig The current cache config.
602    * @return The target file path the source file is renamed to.
603    * @throws IOException
604    */
605   public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
606       Path targetPath, CacheConfig cacheConfig) throws IOException {
607     if (sourceFile == null) {
608       return null;
609     }
610     Path dstPath = new Path(targetPath, sourceFile.getName());
611     validateMobFile(conf, fs, sourceFile, cacheConfig);
612     String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
613     LOG.info(msg);
614     Path parent = dstPath.getParent();
615     if (!fs.exists(parent)) {
616       fs.mkdirs(parent);
617     }
618     if (!fs.rename(sourceFile, dstPath)) {
619       throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
620     }
621     return dstPath;
622   }
623 
624   /**
625    * Validates a mob file by opening and closing it.
626    * @param conf The current configuration.
627    * @param fs The current file system.
628    * @param path The path where the mob file is saved.
629    * @param cacheConfig The current cache config.
630    */
631   private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
632       CacheConfig cacheConfig) throws IOException {
633     StoreFile storeFile = null;
634     try {
635       storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
636       storeFile.createReader();
637     } catch (IOException e) {
638       LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
639       throw e;
640     } finally {
641       if (storeFile != null) {
642         storeFile.closeReader(false);
643       }
644     }
645   }
646 
647   /**
648    * Indicates whether the current mob ref cell has a valid value.
649    * A mob ref cell has a mob reference tag.
650    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
651    * The real mob value length takes 4 bytes.
652    * The remaining part is the mob file name.
653    * @param cell The mob ref cell.
654    * @return True if the cell has a valid value.
655    */
656   public static boolean hasValidMobRefCellValue(Cell cell) {
657     return cell.getValueLength() > Bytes.SIZEOF_INT;
658   }
659 
660   /**
661    * Gets the mob value length from the mob ref cell.
662    * A mob ref cell has a mob reference tag.
663    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
664    * The real mob value length takes 4 bytes.
665    * The remaining part is the mob file name.
666    * @param cell The mob ref cell.
667    * @return The real mob value length.
668    */
669   public static int getMobValueLength(Cell cell) {
670     return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
671   }
672 
673   /**
674    * Gets the mob file name from the mob ref cell.
675    * A mob ref cell has a mob reference tag.
676    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
677    * The real mob value length takes 4 bytes.
678    * The remaining part is the mob file name.
679    * @param cell The mob ref cell.
680    * @return The mob file name.
681    */
682   public static String getMobFileName(Cell cell) {
683     return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
684         cell.getValueLength() - Bytes.SIZEOF_INT);
685   }
686 
687   /**
688    * Gets the table name used in the table lock.
689    * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock".
690    * @param tn The table name.
691    * @return The table name used in table lock.
692    */
693   public static TableName getTableLockName(TableName tn) {
694     byte[] tableName = tn.getName();
695     return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
696   }
697 
698   /**
699    * Performs the mob compaction.
700    * @param conf the Configuration
701    * @param fs the file system
702    * @param tableName the table the compact
703    * @param hcd the column descriptor
704    * @param pool the thread pool
705    * @param tableLockManager the tableLock manager
706    * @param allFiles Whether add all mob files into the compaction.
707    */
708   public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
709     HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager,
710     boolean allFiles) throws IOException {
711     String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
712       PartitionedMobCompactor.class.getName());
713     // instantiate the mob compactor.
714     MobCompactor compactor = null;
715     try {
716       compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
717         Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
718         ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool });
719     } catch (Exception e) {
720       throw new IOException("Unable to load configured mob file compactor '" + className + "'", e);
721     }
722     // compact only for mob-enabled column.
723     // obtain a write table lock before performing compaction to avoid race condition
724     // with major compaction in mob-enabled column.
725     boolean tableLocked = false;
726     TableLock lock = null;
727     try {
728       // the tableLockManager might be null in testing. In that case, it is lock-free.
729       if (tableLockManager != null) {
730         lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName),
731           "Run MobCompactor");
732         lock.acquire();
733       }
734       tableLocked = true;
735       compactor.compact(allFiles);
736     } catch (Exception e) {
737       LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
738         + " in the table " + tableName.getNameAsString(), e);
739     } finally {
740       if (lock != null && tableLocked) {
741         try {
742           lock.release();
743         } catch (IOException e) {
744           LOG.error(
745             "Failed to release the write lock for the table " + tableName.getNameAsString(), e);
746         }
747       }
748     }
749   }
750 
751   /**
752    * Creates a thread pool.
753    * @param conf the Configuration
754    * @return A thread pool.
755    */
756   public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
757     int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
758       MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
759     if (maxThreads == 0) {
760       maxThreads = 1;
761     }
762     final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
763     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
764       Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() {
765         @Override
766         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
767           try {
768             // waiting for a thread to pick up instead of throwing exceptions.
769             queue.put(r);
770           } catch (InterruptedException e) {
771             throw new RejectedExecutionException(e);
772           }
773         }
774       });
775     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
776     return pool;
777   }
778 
779   /**
780    * Creates the encyption context.
781    * @param conf The current configuration.
782    * @param family The current column descriptor.
783    * @return The encryption context.
784    * @throws IOException
785    */
786   public static Encryption.Context createEncryptionContext(Configuration conf,
787     HColumnDescriptor family) throws IOException {
788     // TODO the code is repeated, and needs to be unified.
789     Encryption.Context cryptoContext = Encryption.Context.NONE;
790     String cipherName = family.getEncryptionType();
791     if (cipherName != null) {
792       Cipher cipher;
793       Key key;
794       byte[] keyBytes = family.getEncryptionKey();
795       if (keyBytes != null) {
796         // Family provides specific key material
797         String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User
798           .getCurrent().getShortName());
799         try {
800           // First try the master key
801           key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
802         } catch (KeyException e) {
803           // If the current master key fails to unwrap, try the alternate, if
804           // one is configured
805           if (LOG.isDebugEnabled()) {
806             LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
807           }
808           String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
809           if (alternateKeyName != null) {
810             try {
811               key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
812             } catch (KeyException ex) {
813               throw new IOException(ex);
814             }
815           } else {
816             throw new IOException(e);
817           }
818         }
819         // Use the algorithm the key wants
820         cipher = Encryption.getCipher(conf, key.getAlgorithm());
821         if (cipher == null) {
822           throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
823         }
824         // Fail if misconfigured
825         // We use the encryption type specified in the column schema as a sanity check on
826         // what the wrapped key is telling us
827         if (!cipher.getName().equalsIgnoreCase(cipherName)) {
828           throw new RuntimeException("Encryption for family '" + family.getNameAsString()
829             + "' configured with type '" + cipherName + "' but key specifies algorithm '"
830             + cipher.getName() + "'");
831         }
832       } else {
833         // Family does not provide key material, create a random key
834         cipher = Encryption.getCipher(conf, cipherName);
835         if (cipher == null) {
836           throw new RuntimeException("Cipher '" + cipherName + "' is not available");
837         }
838         key = cipher.getRandomKey();
839       }
840       cryptoContext = Encryption.newContext(conf);
841       cryptoContext.setCipher(cipher);
842       cryptoContext.setKey(key);
843     }
844     return cryptoContext;
845   }
846 
847   /**
848    * Checks whether this table has mob-enabled columns.
849    * @param htd The current table descriptor.
850    * @return Whether this table has mob-enabled columns.
851    */
852   public static boolean hasMobColumns(HTableDescriptor htd) {
853     HColumnDescriptor[] hcds = htd.getColumnFamilies();
854     for (HColumnDescriptor hcd : hcds) {
855       if (hcd.isMobEnabled()) {
856         return true;
857       }
858     }
859     return false;
860   }
861 
862   /**
863    * Indicates whether return null value when the mob file is missing or corrupt.
864    * The information is set in the attribute "empty.value.on.mobcell.miss" of scan.
865    * @param scan The current scan.
866    * @return True if the readEmptyValueOnMobCellMiss is enabled.
867    */
868   public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) {
869     byte[] readEmptyValueOnMobCellMiss = scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS);
870     try {
871       return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss);
872     } catch (IllegalArgumentException e) {
873       return false;
874     }
875   }
876 
877   /**
878    * Archive mob store files
879    * @param conf The current configuration.
880    * @param fs The current file system.
881    * @param mobRegionInfo The mob family region info.
882    * @param mobFamilyDir The mob family directory.
883    * @param family The name of the column family.
884    * @throws IOException
885    */
886   public static void archiveMobStoreFiles(Configuration conf, FileSystem fs,
887       HRegionInfo mobRegionInfo, Path mobFamilyDir, byte[] family) throws IOException {
888     // disable the block cache.
889     Configuration copyOfConf = HBaseConfiguration.create(conf);
890     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
891     CacheConfig cacheConfig = new CacheConfig(copyOfConf);
892 
893     FileStatus[] fileStatus = FSUtils.listStatus(fs, mobFamilyDir);
894     List<StoreFile> storeFileList = new ArrayList<StoreFile>();
895     for (FileStatus file : fileStatus) {
896       storeFileList.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
897     }
898     HFileArchiver.archiveStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, family, storeFileList);
899   }
900   /**
901    * Creates a mob ref delete marker.
902    * @param cell The current delete marker.
903    * @return A delete marker with the ref tag.
904    */
905   public static Cell createMobRefDeleteMarker(Cell cell) {
906     List<Tag> refTag = new ArrayList<Tag>();
907     refTag.add(MobConstants.MOB_REF_TAG);
908     List<Tag> tags = Tag.carryForwardTags(refTag, cell);
909     KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
910         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
911         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
912         cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), cell.getValueArray(),
913         cell.getValueOffset(), cell.getValueLength(), tags);
914     reference.setSequenceId(cell.getSequenceId());
915     return reference;
916   }
917 
918   /**
919    * Checks if the mob file is expired.
920    * @param column The descriptor of the current column family.
921    * @param current The current time.
922    * @param fileDate The date string parsed from the mob file name.
923    * @return True if the mob file is expired.
924    */
925   public static boolean isMobFileExpired(HColumnDescriptor column, long current, String fileDate) {
926     if (column.getMinVersions() > 0) {
927       return false;
928     }
929     long timeToLive = column.getTimeToLive();
930     if (Integer.MAX_VALUE == timeToLive) {
931       return false;
932     }
933 
934     Date expireDate = new Date(current - timeToLive * 1000);
935     expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
936     try {
937       Date date = parseDate(fileDate);
938       if (date.getTime() < expireDate.getTime()) {
939         return true;
940       }
941     } catch (ParseException e) {
942       LOG.warn("Failed to parse the date " + fileDate, e);
943       return false;
944     }
945     return false;
946   }
947 }