
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;

import java.io.DataInput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;

/**
 * A Store data file.  Stores usually have one or more of these files.  They
 * are produced by flushing the memstore to disk.  To create one, instantiate
 * a writer using {@link StoreFile.WriterBuilder} and append data. Be sure to
 * add any metadata before calling close on the Writer (use the appendMetadata
 * convenience methods). On close, a StoreFile sits in the filesystem.  To
 * refer to it, create a StoreFile instance passing filesystem and path.  To
 * read, call {@link #createReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this pattern, in which different instances are used for the
 * writer and the reader, is that we write once but read a lot more.
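 * <p>A minimal usage sketch; the path, context, and cell variables here are
 * illustrative placeholders:
 * <pre>
 * StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 *     .withFilePath(path)
 *     .withFileContext(fileContext)
 *     .build();
 * w.append(cell);
 * w.appendMetadata(maxSequenceId, false); // metadata must precede close()
 * w.close();
 *
 * StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
 * StoreFile.Reader r = sf.createReader();
 * </pre>
 */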
@InterfaceAudience.LimitedPrivate("Coprocessor")
public class StoreFile {
  static final Log LOG = LogFactory.getLog(StoreFile.class.getName());

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY =
      Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Flag in FileInfo to exclude the file from minor compactions */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY =
      Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT =
      Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  private final StoreFileInfo fileInfo;
  private final FileSystem fs;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Max sequence id read from the metadata of the backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // Max of the MemstoreTS in the KVs in this store file.
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  CacheConfig getCacheConf() {
    return cacheConf;
  }

  public long getMaxMemstoreTS() {
    return maxMemstoreTS;
  }

  public void setMaxMemstoreTS(long maxMemstoreTS) {
    this.maxMemstoreTS = maxMemstoreTS;
  }

  // If true, this file was a product of a major compaction.
  // It's set whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY =
    Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY =
    Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Map of the metadata entries in the corresponding HFile
   */
  private Map<byte[], byte[]> metadataMap;

  // StoreFile.Reader
  private volatile Reader reader;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Key for skipping resetting sequence id in metadata.
   * For bulk loaded hfiles, the scanner resets the cell seqId with the latest one;
   * if this metadata is set to true, the reset is skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a
   * substantial amount of RAM depending on the underlying files (10-20MB?).
   *
   * @param fs  The current file system to use.
   * @param p  The path of the file.
   * @param conf  The current configuration.
   * @param cacheConf  The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified
   *          by column family configuration. This may or may not be the same
   *          as the Bloom filter type actually present in the HFile, because
   *          column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @throws IOException When opening the reader fails.
   */
  public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
        final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
  }

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a
   * substantial amount of RAM depending on the underlying files (10-20MB?).
   *
   * @param fs  The current file system to use.
   * @param fileInfo  The store file information.
   * @param conf  The current configuration.
   * @param cacheConf  The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified
   *          by column family configuration. This may or may not be the same
   *          as the Bloom filter type actually present in the HFile, because
   *          column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @throws IOException When opening the reader fails.
   */
  public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
      final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
    this.fs = fs;
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;

    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
          "cfBloomType=" + cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
  }

  /**
   * Copy constructor.
   * @param other The StoreFile to clone from
   */
  public StoreFile(final StoreFile other) {
    this.fs = other.fs;
    this.fileInfo = other.fileInfo;
    this.cacheConf = other.cacheConf;
    this.cfBloomType = other.cfBloomType;
  }

  /**
   * Clone this StoreFile for opening a private reader.
   */
  public StoreFile cloneForReader() {
    return new StoreFile(this);
  }

  /**
   * @return the StoreFileInfo associated with this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  /**
   * @return Path or null if this StoreFile was made with a Stream.
   */
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  /**
   * @return Returns the qualified path of this StoreFile
   */
  public Path getQualifiedPath() {
    return this.fileInfo.getPath().makeQualified(fs);
  }

  /**
   * @return True if this is a StoreFile Reference; call after {@link #open()}
   * else you may get a wrong answer.
   */
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  /**
   * @return True if this is an HFile.
   */
  public boolean isHFile() {
    return this.fileInfo.isHFile(this.fileInfo.getPath());
  }

  /**
   * @return True if this file was made by a major compaction.
   */
  public boolean isMajorCompaction() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("This has not been set yet");
    }
    return this.majorCompaction.get();
  }

  /**
   * @return True if this file should not be part of a minor compaction.
   */
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  /**
   * @return This file's maximum edit sequence id.
   */
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  public long getModificationTimeStamp() throws IOException {
    return (fileInfo == null) ? 0 : fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy
   * @param key metadata key to look up
   * @return value associated with the metadata key
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  /**
   * Return the largest memstoreTS found across all storefiles in
   * the given list. Store files that were created by a mapreduce
   * bulk load are ignored, as they do not correspond to any specific
   * put operation, and thus do not have a memstoreTS associated with them.
   * @return 0 if no non-bulk-load files are provided or this is a Store
   * that does not yet have any store files.
   */
  public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
    long max = 0;
    for (StoreFile sf : sfs) {
      if (!sf.isBulkLoadResult()) {
        max = Math.max(max, sf.getMaxMemstoreTS());
      }
    }
    return max;
  }

  /**
   * Return the highest sequence ID found across all storefiles in
   * the given list.
   * @param sfs store files to inspect
   * @return 0 if no files are provided or this is a Store that does
   * not yet have any store files.
   */
  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
    long max = 0;
    for (StoreFile sf : sfs) {
      max = Math.max(max, sf.getMaxSequenceId());
    }
    return max;
  }

  /**
   * Check if this storefile was created by bulk load.
   * When an hfile is bulk loaded into HBase, we append
   * {@code '_SeqId_<id-when-loaded>'} to the hfile name, unless
   * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
   * explicitly turned off.
   * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
   * is turned off, fall back to BULKLOAD_TIME_KEY.
   * @return true if this storefile was created by bulk load.
   */
  public boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = false;
    String fileName = this.getPath().getName();
    int startPos = fileName.indexOf("SeqId_");
    if (startPos != -1) {
      bulkLoadedHFile = true;
    }
    return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
  }

  /**
   * Return the timestamp at which this bulk load file was generated.
   */
  public long getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is
   * calculated when store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens reader on this store file.  Called by Constructor.
   * @return Reader for the store file.
   * @throws IOException
   * @see #closeReader(boolean)
   */
  private Reader open() throws IOException {
    if (this.reader != null) {
      throw new IllegalAccessError("Already open");
    }

    // Open the StoreFile.Reader
    this.reader = fileInfo.open(this.fs, this.cacheConf);

    // Load up indices and fileinfo. This also loads the Bloom filter type.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

    // Read in our metadata.
    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if halfhfile, the top half has a sequence number greater
      // than the bottom half. That's why we add one below. It's done in case
      // the two halves are ever merged back together (rare). Without it, on
      // open of a store, since store files are distinguished by sequence id,
      // the one half would subsume the other.
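      // e.g. if both halves carry MAX_SEQ_ID_KEY=5, the top half reports 6.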
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // Generate the sequenceId from the fileName.
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
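      // e.g. a hypothetical "0c12ab34de_SeqId_4_" yields sequenceid 4.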
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only works in a bulk loaded file.
      // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
      // loaded to HBase, and these cells have the same seqIds as the old ones. We do not want
      // to reset new seqIds for them since this might make a mess of the visibility of cells that
      // have the same row key but different seqIds.
      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
      if (skipResetSeqId) {
        // Increase the seqId when it is a bulk loaded file from mob compaction.
        this.sequenceid += 1;
      }
      this.reader.setSkipResetSeqId(skipResetSeqId);
      this.reader.setBulkLoaded(true);
    }
    this.reader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = reader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for "
            + reader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + reader.getHFileReader().getName());
    }

    // Load the delete family bloom filter.
    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
      if (timerangeBytes != null) {
        this.reader.timeRangeTracker = new TimeRangeTracker();
        Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
      }
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.reader.timeRangeTracker = null;
    }
    return this.reader;
  }

  /**
   * @return Reader for StoreFile. Creates one if necessary.
   * @throws IOException
   */
  public Reader createReader() throws IOException {
    if (this.reader == null) {
      try {
        this.reader = open();
      } catch (IOException e) {
        try {
          boolean evictOnClose =
              cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
          this.closeReader(evictOnClose);
        } catch (IOException ee) {
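          // Ignored: best-effort cleanup before rethrowing the original exception.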
        }
        throw e;
      }
    }
    return this.reader;
  }

  /**
   * @return Current reader.  Must call createReader first else returns null.
   * @see #createReader()
   */
  public Reader getReader() {
    return this.reader;
  }

  /**
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException
   */
  public synchronized void closeReader(boolean evictOnClose)
      throws IOException {
    if (this.reader != null) {
      this.reader.close(evictOnClose);
      this.reader = null;
    }
  }

  /**
   * Close the reader, if open, and delete this file.
   * @throws IOException
   */
  public void deleteReader() throws IOException {
    boolean evictOnClose =
        cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
    closeReader(evictOnClose);
    this.fs.delete(getPath(), true);
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  /**
   * @return a lengthy description of this StoreFile, suitable for debug output
   */
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompaction());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId The serialized boolean (a one-byte array).
   * @return Whether to skip resetting the sequence id.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

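  /**
   * Builder for a StoreFile {@link Writer}. A sketch of a typical chained
   * invocation follows; the directory, key-count, and context values are
   * illustrative:
   * <pre>
   * StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
   *     .withOutputDir(familyDir)
   *     .withBloomType(BloomType.ROW)
   *     .withMaxKeyCount(estimatedKeys)
   *     .withFileContext(fileContext)
   *     .build();
   * </pre>
   */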
  public static class WriterBuilder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private KeyValue.KVComparator comparator = KeyValue.COMPARATOR;
    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;

    public WriterBuilder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if
     *          it does not exist. The file is given a unique name within this
     *          directory.
     * @return this (for chained invocation)
     */
    public WriterBuilder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public WriterBuilder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public WriterBuilder withComparator(KeyValue.KVComparator comparator) {
      Preconditions.checkNotNull(comparator);
      this.comparator = comparator;
      return this;
    }

    public WriterBuilder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public WriterBuilder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public WriterBuilder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    /**
     * Create a store file writer. The client is responsible for closing the
     * file when done. Add any metadata BEFORE closing, using
     * {@link Writer#appendMetadata}.
     */
    public Writer build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " +
            "or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      if (filePath == null) {
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (comparator == null) {
        comparator = KeyValue.COMPARATOR;
      }
      return new Writer(fs, filePath,
          conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext);
    }
  }

  /**
   * @param fs FileSystem to check the directory against
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir)
      throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() +
        " to be a directory");
    }
    return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
  }

  public Long getMinimumTimestamp() {
    return (getReader().timeRangeTracker == null) ?
        null :
        getReader().timeRangeTracker.getMinimumTimestamp();
  }

  public Long getMaximumTimestamp() {
    return (getReader().timeRangeTracker == null) ?
        null :
        getReader().timeRangeTracker.getMaximumTimestamp();
  }

  /**
   * Gets the approximate mid-point of this file that is optimal for use in splitting it.
   * @param comparator Comparator used to compare KVs.
   * @return The split point row, or null if splitting is not possible, or reader is null.
   */
  @SuppressWarnings("deprecation")
  byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
    if (this.reader == null) {
      LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
      return null;
    }
    // Get first, last, and mid keys.  The midkey is the key that starts the
    // block in the middle of the hfile.  It has column and timestamp; we need
    // to return just the row we want to split on as the midkey.
    byte [] midkey = this.reader.midkey();
    if (midkey != null) {
      KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
      byte [] fk = this.reader.getFirstKey();
      KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
      byte [] lk = this.reader.getLastKey();
      KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
      // If the midkey is the same as the first or last keys, we cannot (ever) split this region.
      if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("cannot split because midkey is the same as first or last row");
        }
        return null;
      }
      return mk.getRow();
    }
    return null;
  }

  /**
   * A StoreFile writer.  Use this to write HBase Store Files. It is an
   * implementation detail of the HBase regionserver.
   */
  public static class Writer implements Compactor.CellSink {
    private final BloomFilterWriter generalBloomFilterWriter;
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    private byte[] lastBloomKey;
    private int lastBloomKeyOffset, lastBloomKeyLen;
    private KVComparator kvComparator;
    private Cell lastCell = null;
    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    private Cell lastDeleteFamilyCell = null;
    private long deleteFamilyCnt = 0;

    /** Bytes per Checksum */
    protected int bytesPerChecksum;

    TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
    /* isTimeRangeTrackerSet keeps track of whether the timeRange has already
     * been set. When flushing a memstore, we set the TimeRange and use this
     * variable to indicate that it doesn't need to be calculated again while
     * appending KeyValues.
     * It is not set in cases of compactions, when it is recalculated using
     * only the appended KeyValues. */
    boolean isTimeRangeTrackerSet = false;

    protected HFile.Writer writer;

    /**
     * Creates an HFile.Writer that also writes helpful meta data.
     * @param fs file system to write to
     * @param path file name to create
     * @param conf user configuration
     * @param comparator key comparator
     * @param bloomType bloom filter setting
     * @param maxKeys the expected maximum number of keys to be added. Was used
     *        for Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes an array of favored nodes or possibly null
     * @param fileContext The HFile context
     * @throws IOException problem writing to FS
     */
    private Writer(FileSystem fs, Path path,
        final Configuration conf,
        CacheConfig cacheConf,
        final KVComparator comparator, BloomType bloomType, long maxKeys,
        InetSocketAddress[] favoredNodes, HFileContext fileContext)
            throws IOException {
      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .create();

      this.kvComparator = comparator;

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
          conf, cacheConf, bloomType,
          (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        if (LOG.isTraceEnabled()) {
          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType +
              ", " + generalBloomFilterWriter.getClass().getSimpleName());
        }
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // Initialize the delete family Bloom filter when there is no RowCol
      // Bloom filter.
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory
            .createDeleteBloomAtWrite(conf, cacheConf,
                (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Delete Family Bloom filter type for " + path + ": "
              + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
        }
      }
    }

    /**
     * Writes meta data.
     * Call before {@link #close()} since it's written as meta data to this file.
     * @param maxSequenceId Maximum sequence id.
     * @param majorCompaction True if this file is a product of a major compaction
     * @throws IOException problem writing to FS
     */
    public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
          Bytes.toBytes(majorCompaction));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Writes meta data.
     * Call before {@link #close()} since it's written as meta data to this file.
     * @param maxSequenceId Maximum sequence id.
     * @param majorCompaction True if this file is a product of a major compaction
     * @param mobCellsCount The number of mob cells.
     * @throws IOException problem writing to FS
     */
    public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
        final long mobCellsCount) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Add TimestampRange and earliest put timestamp to Metadata
     */
    public void appendTrackedTimestampsToMetadata() throws IOException {
      appendFileInfo(TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker));
      appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
    }

    /**
     * Set TimeRangeTracker
     * @param trt the TimeRangeTracker to use
     */
    public void setTimeRangeTracker(final TimeRangeTracker trt) {
      this.timeRangeTracker = trt;
      isTimeRangeTrackerSet = true;
    }

    /**
     * Record the earliest Put timestamp.
     *
     * If the timeRangeTracker is not set,
     * update the TimeRangeTracker to include the timestamp of this key.
     * @param cell the cell whose timestamp is tracked
     */
    public void trackTimestamps(final Cell cell) {
      if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
        earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
      }
      if (!isTimeRangeTrackerSet) {
        timeRangeTracker.includeTimestamp(cell);
      }
    }

    private void appendGeneralBloomfilter(final Cell cell) throws IOException {
      if (this.generalBloomFilterWriter != null) {
        // Only add to the bloom filter on a new, unique key.
        boolean newKey = true;
        if (this.lastCell != null) {
          switch(bloomType) {
          case ROW:
            newKey = !kvComparator.matchingRows(cell, lastCell);
            break;
          case ROWCOL:
            newKey = !kvComparator.matchingRowColumn(cell, lastCell);
            break;
          case NONE:
            newKey = false;
            break;
          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType +
                " (ROW or ROWCOL expected)");
          }
        }
        if (newKey) {
          /*
           * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
           * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
           *
           * 2 Types of Filtering:
           *  1. Row = Row
           *  2. RowCol = Row + Qualifier
           */
          byte[] bloomKey;
          int bloomKeyOffset, bloomKeyLen;

          switch (bloomType) {
          case ROW:
            bloomKey = cell.getRowArray();
            bloomKeyOffset = cell.getRowOffset();
            bloomKeyLen = cell.getRowLength();
            break;
          case ROWCOL:
            // merge(row, qualifier)
            // TODO: could save one buffer copy in case of compound Bloom
            // filters when this involves creating a KeyValue
            bloomKey = generalBloomFilterWriter.createBloomKey(cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength(), cell.getQualifierArray(),
                cell.getQualifierOffset(), cell.getQualifierLength());
            bloomKeyOffset = 0;
            bloomKeyLen = bloomKey.length;
            break;
          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType +
                " (ROW or ROWCOL expected)");
          }
          generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
          if (lastBloomKey != null
              && generalBloomFilterWriter.getComparator().compareFlatKey(bloomKey,
                  bloomKeyOffset, bloomKeyLen, lastBloomKey,
                  lastBloomKeyOffset, lastBloomKeyLen) <= 0) {
            throw new IOException("Non-increasing Bloom keys: "
                + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen)
                + " after "
                + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset,
                    lastBloomKeyLen));
          }
          lastBloomKey = bloomKey;
          lastBloomKeyOffset = bloomKeyOffset;
          lastBloomKeyLen = bloomKeyLen;
          this.lastCell = cell;
        }
      }
    }

    private void appendDeleteFamilyBloomFilter(final Cell cell)
        throws IOException {
      if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) {
        return;
      }

      // Increment the delete family count of the store file.
      deleteFamilyCnt++;
      if (this.deleteFamilyBloomFilterWriter != null) {
        boolean newKey = true;
        if (lastDeleteFamilyCell != null) {
          newKey = !kvComparator.matchingRows(cell, lastDeleteFamilyCell);
        }
        if (newKey) {
          this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(),
              cell.getRowOffset(), cell.getRowLength());
          this.lastDeleteFamilyCell = cell;
        }
      }
    }

    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }

    public Path getPath() {
      return this.writer.getPath();
    }

    public boolean hasGeneralBloom() {
      return this.generalBloomFilterWriter != null;
    }

    /**
     * For unit testing only.
     *
     * @return the Bloom filter used by this writer.
     */
    BloomFilterWriter getGeneralBloomWriter() {
      return generalBloomFilterWriter;
    }

    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
      if (haveBloom) {
        bfw.compactBloom();
      }
      return haveBloom;
    }

    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // Add the general Bloom filter writer and append file info.
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
            Bytes.toBytes(bloomType.toString()));
        if (lastBloomKey != null) {
          writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
              lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
                  + lastBloomKeyLen));
        }
      }
      return hasGeneralBloom;
    }

    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // Add the delete family Bloom filter writer.
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // Append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT,
          Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }

    public void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (StoreFile.LOG.isTraceEnabled()) {
        StoreFile.LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
          (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily was added to HFile " +
          getPath());
      }
    }

    public void appendFileInfo(byte[] key, byte[] value) throws IOException {
      writer.appendFileInfo(key, value);
    }

    /** For use in testing, e.g. {@link org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile}
     */
    HFile.Writer getHFileWriter() {
      return writer;
    }
  }

  /**
   * Reader for a StoreFile.
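   * <p>A minimal read sketch; obtain the reader via {@link StoreFile#createReader()}
   * and scan it (the flag values and read point here are illustrative):
   * <pre>
   * StoreFile.Reader r = sf.createReader();
   * StoreFileScanner scanner = r.getStoreFileScanner(true, false, false, readPt);
   * </pre>
   */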
  public static class Reader {
    static final Log LOG = LogFactory.getLog(Reader.class.getName());

    protected BloomFilter generalBloomFilter = null;
    protected BloomFilter deleteFamilyBloomFilter = null;
    protected BloomType bloomFilterType;
    private final HFile.Reader reader;
    protected TimeRangeTracker timeRangeTracker = null;
    protected long sequenceID = -1;
    private byte[] lastBloomKey;
    private long deleteFamilyCnt = -1;
    private boolean bulkLoadResult = false;
    private boolean skipResetSeqId = true;

    public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
        throws IOException {
      reader = HFile.createReader(fs, path, cacheConf, conf);
      bloomFilterType = BloomType.NONE;
    }

    public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
        CacheConfig cacheConf, Configuration conf) throws IOException {
      reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
      bloomFilterType = BloomType.NONE;
    }

    public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) {
      reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile);
    }

    public boolean isPrimaryReplicaReader() {
      return reader.isPrimaryReplicaReader();
    }

    /**
     * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
     */
    Reader() {
      this.reader = null;
    }

    public KVComparator getComparator() {
      return reader.getComparator();
    }

    /**
     * Get a scanner to scan over this StoreFile. Do not use
     * this overload if using this scanner for compactions.
     *
     * @param cacheBlocks should this scanner cache blocks?
     * @param pread use pread (for highly concurrent small readers)
     * @return a scanner
     */
    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                               boolean pread) {
      return getStoreFileScanner(cacheBlocks, pread, false,
        // 0 is passed as the readpoint because this method is only used by
        // tests where the StoreFile is directly operated upon
        0);
    }

    /**
     * Get a scanner to scan over this StoreFile.
     *
     * @param cacheBlocks should this scanner cache blocks?
     * @param pread use pread (for highly concurrent small readers)
     * @param isCompaction is scanner being used for compaction?
     * @param readPt the MVCC read point to enforce while scanning
     * @return a scanner
     */
    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                               boolean pread,
                                               boolean isCompaction, long readPt) {
      return new StoreFileScanner(this,
                                 getScanner(cacheBlocks, pread, isCompaction),
                                 !isCompaction, reader.hasMVCCInfo(), readPt);
    }

    /**
     * Warning: Do not write further code which depends on this call. Instead
     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
     * which is the preferred way to scan a store with higher level concepts.
     *
     * @param cacheBlocks should we cache the blocks?
     * @param pread use pread (for concurrent small readers)
     * @return the underlying HFileScanner
     */
    @Deprecated
    public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
      return getScanner(cacheBlocks, pread, false);
    }

    /**
     * Warning: Do not write further code which depends on this call. Instead
     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
     * which is the preferred way to scan a store with higher level concepts.
     *
     * @param cacheBlocks
     *          should we cache the blocks?
     * @param pread
     *          use pread (for concurrent small readers)
     * @param isCompaction
     *          is scanner being used for compaction?
     * @return the underlying HFileScanner
     */
    @Deprecated
    public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
        boolean isCompaction) {
      return reader.getScanner(cacheBlocks, pread, isCompaction);
    }

    public void close(boolean evictOnClose) throws IOException {
      reader.close(evictOnClose);
    }

    /**
     * Check if this storeFile may contain keys within the TimeRange that
     * have not expired (i.e. not older than oldestUnexpiredTS).
     * @param scan the current scan
     * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
     *          determined by the column family's TTL
     * @return false if queried keys definitely don't exist in this StoreFile
     */
    boolean passesTimerangeFilter(Scan scan, long oldestUnexpiredTS) {
      if (timeRangeTracker == null) {
        return true;
      } else {
        return timeRangeTracker.includesTimeRange(scan.getTimeRange()) &&
            timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
      }
    }

    /**
     * Checks whether the given scan passes the Bloom filter (if present). Only
     * checks Bloom filters for single-row or single-row-column scans. Bloom
     * filter checking for multi-gets is implemented as part of the store
     * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
     * the lower-level API {@link #passesGeneralBloomFilter(byte[], int, int, byte[],
     * int, int)}.
     *
     * @param scan the scan specification. Used to determine the row, and to
     *          check whether this is a single-row ("get") scan.
     * @param columns the set of columns. Only used for row-column Bloom
     *          filters.
     * @return true if the scan with the given column set passes the Bloom
     *         filter, or if the Bloom filter is not applicable for the scan.
     *         False if the Bloom filter is applicable and the scan fails it.
     */
    boolean passesBloomFilter(Scan scan,
        final SortedSet<byte[]> columns) {
      // Multi-column non-get scans will use Bloom filters through the
      // lower-level API function that this function calls.
      if (!scan.isGetScan()) {
        return true;
      }

      byte[] row = scan.getStartRow();
      switch (this.bloomFilterType) {
        case ROW:
          return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);

        case ROWCOL:
          if (columns != null && columns.size() == 1) {
            byte[] column = columns.first();
            return passesGeneralBloomFilter(row, 0, row.length, column, 0,
                column.length);
          }

          // For multi-column queries the Bloom filter is checked from the
          // seekExact operation.
          return true;

        default:
          return true;
      }
    }

    public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
        int rowLen) {
      // Cache the Bloom filter in a local variable in case it is set to null
      // by another thread on an IO error.
      BloomFilter bloomFilter = this.deleteFamilyBloomFilter;

      // Empty file or there is no delete family at all.
      if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
        return false;
      }

      if (bloomFilter == null) {
        return true;
      }

      try {
        if (!bloomFilter.supportsAutoLoading()) {
          return true;
        }
        return bloomFilter.contains(row, rowOffset, rowLen, null);
      } catch (IllegalArgumentException e) {
        LOG.error("Bad Delete Family bloom filter data -- proceeding without",
            e);
        setDeleteFamilyBloomFilterFaulty();
      }

      return true;
    }

    /**
     * A method for checking Bloom filters. Called directly from
     * StoreFileScanner in case of a multi-column query.
     *
     * @param row the row to check
     * @param rowOffset offset of the row within the array
     * @param rowLen length of the row
     * @param col the column to check, or null for row-only Bloom filters
     * @param colOffset offset of the column within the array
     * @param colLen length of the column
     * @return True if passes
     */
    public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
        int rowLen, byte[] col, int colOffset, int colLen) {
      // Cache the Bloom filter in a local variable in case it is set to null
      // by another thread on an IO error.
      BloomFilter bloomFilter = this.generalBloomFilter;
      if (bloomFilter == null) {
        return true;
      }

      byte[] key;
      switch (bloomFilterType) {
        case ROW:
          if (col != null) {
            throw new RuntimeException("Row-only Bloom filter called with " +
                "column specified");
          }
          if (rowOffset != 0 || rowLen != row.length) {
            throw new AssertionError("For row-only Bloom filters the row "
                + "must occupy the whole array");
          }
          key = row;
          break;

        case ROWCOL:
          key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
              colOffset, colLen);
          break;

        default:
          return true;
      }

      // Empty file
      if (reader.getTrailer().getEntryCount() == 0) {
        return false;
      }

      try {
        boolean shouldCheckBloom;
        ByteBuffer bloom;
        if (bloomFilter.supportsAutoLoading()) {
          bloom = null;
          shouldCheckBloom = true;
        } else {
          bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
              true);
          shouldCheckBloom = bloom != null;
        }

        if (shouldCheckBloom) {
          boolean exists;

          // Whether the primary Bloom key is greater than the last Bloom key
          // from the file info. For row-column Bloom filters this is not yet
          // a sufficient condition to return false.
          boolean keyIsAfterLast = lastBloomKey != null
              && bloomFilter.getComparator().compareFlatKey(key, lastBloomKey) > 0;

          if (bloomFilterType == BloomType.ROWCOL) {
            // Since a Row Delete is essentially a DeleteFamily applied to all
            // columns, a file might be skipped if using a row+col Bloom filter.
            // In order to ensure this file is included, an additional check is
            // required looking only for a row bloom.
            byte[] rowBloomKey = bloomFilter.createBloomKey(row, rowOffset, rowLen,
                null, 0, 0);

            if (keyIsAfterLast
                && bloomFilter.getComparator().compareFlatKey(rowBloomKey,
                    lastBloomKey) > 0) {
              exists = false;
            } else {
              exists =
                  bloomFilter.contains(key, 0, key.length, bloom) ||
                  bloomFilter.contains(rowBloomKey, 0, rowBloomKey.length,
                      bloom);
            }
          } else {
            exists = !keyIsAfterLast
                && bloomFilter.contains(key, 0, key.length, bloom);
          }

          return exists;
        }
      } catch (IOException e) {
        LOG.error("Error reading bloom filter data -- proceeding without",
            e);
        setGeneralBloomFilterFaulty();
      } catch (IllegalArgumentException e) {
        LOG.error("Bad bloom filter data -- proceeding without", e);
        setGeneralBloomFilterFaulty();
      }

      return true;
    }

    /**
     * Checks whether the given scan rowkey range overlaps with this storefile's
     * key range.
     * @param scan the scan specification. Used to determine the rowkey range.
     * @return true if there is overlap, false otherwise
     */
    public boolean passesKeyRangeFilter(Scan scan) {
      if (this.getFirstKey() == null || this.getLastKey() == null) {
        // The file is empty.
        return false;
      }
      if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
          && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
        return true;
      }
      KeyValue smallestScanKeyValue = scan.isReversed() ? KeyValueUtil
          .createFirstOnRow(scan.getStopRow()) : KeyValueUtil.createFirstOnRow(scan
          .getStartRow());
      KeyValue largestScanKeyValue = scan.isReversed() ? KeyValueUtil
          .createLastOnRow(scan.getStartRow()) : KeyValueUtil.createLastOnRow(scan
          .getStopRow());
1436       boolean nonOverLapping = (getComparator().compareFlatKey(
1437           this.getFirstKey(), largestScanKeyValue.getKey()) > 0 && !Bytes
1438           .equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
1439               HConstants.EMPTY_END_ROW))
1440           || getComparator().compareFlatKey(this.getLastKey(),
1441               smallestScanKeyValue.getKey()) < 0;
1442       return !nonOverLapping;
1443     }
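
         /*
          * Editor's note -- the test above is the standard interval-overlap
          * check, sketched here with hypothetical key names for a forward
          * scan (the empty stop-row case is handled separately above):
          *
          *   // Disjoint iff the file starts after the scan's last key, or
          *   // ends before the scan's first key.
          *   boolean disjoint =
          *       comparator.compareFlatKey(fileFirstKey, scanLastKey) > 0
          *           || comparator.compareFlatKey(fileLastKey, scanFirstKey) < 0;
          *   return !disjoint;
          */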
1444 
1445     public Map<byte[], byte[]> loadFileInfo() throws IOException {
1446       Map<byte [], byte []> fi = reader.loadFileInfo();
1447 
1448       byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1449       if (b != null) {
1450         bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1451       }
1452 
1453       lastBloomKey = fi.get(LAST_BLOOM_KEY);
1454       byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
1455       if (cnt != null) {
1456         deleteFamilyCnt = Bytes.toLong(cnt);
1457       }
1458 
1459       return fi;
1460     }
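
         // Editor's note: BLOOM_FILTER_TYPE_KEY, LAST_BLOOM_KEY and
         // DELETE_FAMILY_COUNT are entries of the HFile's file-info block,
         // written when the file is closed, which is why they can be cached
         // here as soon as the file info has been loaded.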
1461 
1462     public void loadBloomfilter() {
1463       this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
1464       this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
1465     }
1466 
1467     private void loadBloomfilter(BlockType blockType) {
1468       try {
1469         if (blockType == BlockType.GENERAL_BLOOM_META) {
1470           if (this.generalBloomFilter != null)
1471             return; // Bloom has been loaded
1472 
1473           DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
1474           if (bloomMeta != null) {
1475             // sanity check for NONE Bloom filter
1476             if (bloomFilterType == BloomType.NONE) {
1477               throw new IOException(
1478                   "valid bloom filter type not found in FileInfo");
1479             } else {
1480               generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
1481                   reader);
1482               if (LOG.isTraceEnabled()) {
1483                 LOG.trace("Loaded " + bloomFilterType.toString() + " "
1484                   + generalBloomFilter.getClass().getSimpleName()
1485                   + " metadata for " + reader.getName());
1486               }
1487             }
1488           }
1489         } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1490           if (this.deleteFamilyBloomFilter != null)
1491             return; // Bloom has been loaded
1492 
1493           DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
1494           if (bloomMeta != null) {
1495             deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
1496                 bloomMeta, reader);
1497             LOG.info("Loaded Delete Family Bloom ("
1498                 + deleteFamilyBloomFilter.getClass().getSimpleName()
1499                 + ") metadata for " + reader.getName());
1500           }
1501         } else {
1502           throw new RuntimeException("Block Type: " + blockType.toString()
1503               + " is not supported for Bloom filter");
1504         }
1505       } catch (IOException e) {
1506         LOG.error("Error reading bloom filter meta for " + blockType
1507             + " -- proceeding without", e);
1508         setBloomFilterFaulty(blockType);
1509       } catch (IllegalArgumentException e) {
1510         LOG.error("Bad bloom filter meta " + blockType
1511             + " -- proceeding without", e);
1512         setBloomFilterFaulty(blockType);
1513       }
1514     }
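
         // Editor's note -- typical call order, since the general-Bloom branch
         // above requires bloomFilterType to already be known from the file
         // info (hypothetical reader instance):
         //   reader.loadFileInfo();     // sets bloomFilterType, among others
         //   reader.loadBloomfilter();  // loads general + delete-family metas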
1515 
1516     private void setBloomFilterFaulty(BlockType blockType) {
1517       if (blockType == BlockType.GENERAL_BLOOM_META) {
1518         setGeneralBloomFilterFaulty();
1519       } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1520         setDeleteFamilyBloomFilterFaulty();
1521       }
1522     }
1523 
1524     /**
1525      * The number of Bloom filter entries in this store file, or an estimate
1526      * thereof, if the Bloom filter is not loaded. This always returns an upper
1527      * bound of the number of Bloom filter entries.
1528      *
1529      * @return an estimate of the number of Bloom filter entries in this file
1530      */
1531     public long getFilterEntries() {
1532       return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1533           : reader.getEntries();
1534     }
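
         // Editor's note: reader.getEntries() is a safe upper bound because
         // each appended cell adds at most one Bloom key (a row Bloom adds one
         // key per distinct row, never more than the cell count).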
1535 
1536     public void setGeneralBloomFilterFaulty() {
1537       generalBloomFilter = null;
1538     }
1539 
1540     public void setDeleteFamilyBloomFilterFaulty() {
1541       this.deleteFamilyBloomFilter = null;
1542     }
1543 
1544     public byte[] getLastKey() {
1545       return reader.getLastKey();
1546     }
1547 
1548     public byte[] getLastRowKey() {
1549       return reader.getLastRowKey();
1550     }
1551 
1552     public byte[] midkey() throws IOException {
1553       return reader.midkey();
1554     }
1555 
1556     public long length() {
1557       return reader.length();
1558     }
1559 
1560     public long getTotalUncompressedBytes() {
1561       return reader.getTrailer().getTotalUncompressedBytes();
1562     }
1563 
1564     public long getEntries() {
1565       return reader.getEntries();
1566     }
1567 
1568     public long getDeleteFamilyCnt() {
1569       return deleteFamilyCnt;
1570     }
1571 
1572     public byte[] getFirstKey() {
1573       return reader.getFirstKey();
1574     }
1575 
1576     public long indexSize() {
1577       return reader.indexSize();
1578     }
1579 
1580     public BloomType getBloomFilterType() {
1581       return this.bloomFilterType;
1582     }
1583 
1584     public long getSequenceID() {
1585       return sequenceID;
1586     }
1587 
1588     public void setSequenceID(long sequenceID) {
1589       this.sequenceID = sequenceID;
1590     }
1591 
1592     public void setBulkLoaded(boolean bulkLoadResult) {
1593       this.bulkLoadResult = bulkLoadResult;
1594     }
1595 
1596     public boolean isBulkLoaded() {
1597       return this.bulkLoadResult;
1598     }
1599 
1600     BloomFilter getGeneralBloomFilter() {
1601       return generalBloomFilter;
1602     }
1603 
1604     long getUncompressedDataIndexSize() {
1605       return reader.getTrailer().getUncompressedDataIndexSize();
1606     }
1607 
1608     public long getTotalBloomSize() {
1609       if (generalBloomFilter == null)
1610         return 0;
1611       return generalBloomFilter.getByteSize();
1612     }
1613 
1614     public int getHFileVersion() {
1615       return reader.getTrailer().getMajorVersion();
1616     }
1617 
1618     public int getHFileMinorVersion() {
1619       return reader.getTrailer().getMinorVersion();
1620     }
1621 
1622     public HFile.Reader getHFileReader() {
1623       return reader;
1624     }
1625 
1626     void disableBloomFilterForTesting() {
1627       generalBloomFilter = null;
1628       this.deleteFamilyBloomFilter = null;
1629     }
1630 
1631     public long getMaxTimestamp() {
1632       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
1633     }
1634 
1635     boolean isSkipResetSeqId() {
1636       return skipResetSeqId;
1637     }
1638 
1639     void setSkipResetSeqId(boolean skipResetSeqId) {
1640       this.skipResetSeqId = skipResetSeqId;
1641     }
1642   }
1643 
1644   /**
1645    * Useful comparators for comparing StoreFiles.
1646    */
1647   public abstract static class Comparators {
1648     /**
1649      * Comparator that compares based on the sequence ids of the
1650      * StoreFiles. Bulk loads that did not request a seq id are given
1651      * a seq id of -1; thus, they are placed before all non-bulk loads
1652      * and before bulk loads with an assigned sequence id. Among files
1653      * with equal seq ids, descending file size determines the ordering,
1654      * then bulk load time; remaining ties are broken by path name.
1655      */
1656     public static final Comparator<StoreFile> SEQ_ID =
1657       Ordering.compound(ImmutableList.of(
1658           Ordering.natural().onResultOf(new GetSeqId()),
1659           Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1660           Ordering.natural().onResultOf(new GetBulkTime()),
1661           Ordering.natural().onResultOf(new GetPathName())
1662       ));
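
         // Editor's note -- illustrative usage with a hypothetical list,
         // ordering store files oldest-first:
         //   List<StoreFile> candidates = ...;
         //   Collections.sort(candidates, Comparators.SEQ_ID);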
1663 
1664     /**
1665      * Comparator for time-aware compaction. SeqId is still the first
1666      * ordering criterion to maintain MVCC.
1667      */
1668     public static final Comparator<StoreFile> SEQ_ID_MAX_TIMESTAMP =
1669       Ordering.compound(ImmutableList.of(
1670         Ordering.natural().onResultOf(new GetSeqId()),
1671         Ordering.natural().onResultOf(new GetMaxTimestamp()),
1672         Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1673         Ordering.natural().onResultOf(new GetBulkTime()),
1674         Ordering.natural().onResultOf(new GetPathName())
1675       ));
1676 
1677     private static class GetSeqId implements Function<StoreFile, Long> {
1678       @Override
1679       public Long apply(StoreFile sf) {
1680         return sf.getMaxSequenceId();
1681       }
1682     }
1683 
1684     private static class GetFileSize implements Function<StoreFile, Long> {
1685       @Override
1686       public Long apply(StoreFile sf) {
1687         return sf.getReader().length();
1688       }
1689     }
1690 
1691     private static class GetBulkTime implements Function<StoreFile, Long> {
1692       @Override
1693       public Long apply(StoreFile sf) {
1694         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1695         return sf.getBulkLoadTimestamp();
1696       }
1697     }
1698 
1699     private static class GetPathName implements Function<StoreFile, String> {
1700       @Override
1701       public String apply(StoreFile sf) {
1702         return sf.getPath().getName();
1703       }
1704     }
1705 
1706     private static class GetMaxTimestamp implements Function<StoreFile, Long> {
1707       @Override
1708       public Long apply(StoreFile sf) {
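             // A file with no max-timestamp metadata sorts as if maximally
             // recent (Long.MAX_VALUE).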
1709         return sf.getMaximumTimestamp() == null ? (Long) Long.MAX_VALUE : sf.getMaximumTimestamp();
1710       }
1711     }
1712   }
1713 }