1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.ByteBuffer;
27  import java.util.concurrent.atomic.AtomicReference;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.fs.FSDataInputStream;
34  import org.apache.hadoop.fs.FSDataOutputStream;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.fs.HFileSystem;
40  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
41  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
44  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
45  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
46  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
47  import org.apache.hadoop.hbase.util.ByteBufferUtils;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.ChecksumType;
50  import org.apache.hadoop.hbase.util.ClassSize;
51  import org.apache.hadoop.io.IOUtils;
52  
53  import com.google.common.annotations.VisibleForTesting;
54  import com.google.common.base.Preconditions;
55  
56  /**
57   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
58   * <ul>
59   * <li>In version 1 all blocks are always compressed or uncompressed, as
60   * specified by the {@link HFile}'s compression algorithm, with a type-specific
61   * magic record stored in the beginning of the compressed data (i.e. one needs
62   * to uncompress the compressed block to determine the block type). There is
63   * only a single compression algorithm setting for all blocks. Offset and size
64   * information from the block index are required to read a block.
65   * <li>In version 2 a block is structured as follows:
66   * <ul>
67   * <li>header (see Writer#finishBlock())
68   * <ul>
69   * <li>Magic record identifying the block type (8 bytes)
70   * <li>Compressed block size, excluding header, including checksum (4 bytes)
71   * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
72   * <li>The offset of the previous block of the same type (8 bytes). This is
73   * used to be able to navigate to the previous block without going to the block index.
74   * <li>For minorVersions >=1, the ordinal describing checksum type (1 byte)
75   * <li>For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
76   * <li>For minorVersions >=1, the size of data on disk, including header,
77   * excluding checksums (4 bytes)
78   * </ul>
79   * </li>
80   * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
81   * same for all the blocks in the {@link HFile}, similarly to what was done in
82   * version 1.
83   * <li>For minorVersions >=1, a series of 4 byte checksums, one each for
84   * the number of bytes specified by bytesPerChecksum.
85   * </ul>
86   * </ul>
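 * <p>
 * For illustration, a minimal sketch (mirroring what the
 * {@code HFileBlock(ByteBuffer, boolean)} constructor below does) of pulling the
 * fixed version-2 header fields out of a buffer positioned at the start of a block:
 * <pre>{@code
 * // given a ByteBuffer b positioned at the block's magic record:
 * BlockType blockType = BlockType.read(b);         // 8-byte magic record
 * int onDiskSizeWithoutHeader = b.getInt();        // header field 1
 * int uncompressedSizeWithoutHeader = b.getInt();  // header field 2
 * long prevBlockOffset = b.getLong();              // header field 3
 * // minorVersion >= 1 only:
 * byte checksumType = b.get();
 * int bytesPerChecksum = b.getInt();
 * int onDiskDataSizeWithHeader = b.getInt();
 * }</pre>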
87   */
88  @InterfaceAudience.Private
89  public class HFileBlock implements Cacheable {
90    private static final Log LOG = LogFactory.getLog(HFileBlock.class);
91  
92    /**
93     * On a checksum failure in a Reader, this many subsequent read
94     * requests fall back to HDFS checksums before HBase checksum
95     * verification is automatically re-enabled.
96     */
97    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
98  
99    public static final boolean FILL_HEADER = true;
100   public static final boolean DONT_FILL_HEADER = false;
101 
102   /**
103    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
104   * This extends the normal header by adding the id of the encoder.
105    */
106   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
107       + DataBlockEncoding.ID_SIZE;
108 
109   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
110      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
111 
112   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
113       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
114 
115   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
116   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
117       + Bytes.SIZEOF_LONG;
118 
119   /**
120    * Each checksum value is an integer that can be stored in 4 bytes.
121    */
122   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
123 
124   static final CacheableDeserializer<Cacheable> blockDeserializer =
125       new CacheableDeserializer<Cacheable>() {
126         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
127           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
128           ByteBuffer newByteBuffer;
129           if (reuse) {
130             newByteBuffer = buf.slice();
131           } else {
132            newByteBuffer = ByteBuffer.allocate(buf.limit());
133            newByteBuffer.put(buf);
134           }
135           buf.position(buf.limit());
136           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
137           boolean usesChecksum = buf.get() == (byte)1;
138           HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
139           hFileBlock.offset = buf.getLong();
140           hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
141           if (hFileBlock.hasNextBlockHeader()) {
142             hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
143           }
144           return hFileBlock;
145         }
146 
147         @Override
148         public int getDeserialiserIdentifier() {
149           return deserializerIdentifier;
150         }
151 
152         @Override
153         public HFileBlock deserialize(ByteBuffer b) throws IOException {
154           return deserialize(b, false);
155         }
156       };
157   private static final int deserializerIdentifier;
158   static {
159     deserializerIdentifier = CacheableDeserializerIdManager
160         .registerDeserializer(blockDeserializer);
161   }
162 
163   /** Type of block. Header field 0. */
164   private BlockType blockType;
165 
166   /** Size on disk excluding header, including checksum. Header field 1. */
167   private int onDiskSizeWithoutHeader;
168 
169   /** Size of pure data. Does not include header or checksums. Header field 2. */
170   private final int uncompressedSizeWithoutHeader;
171 
172   /** The offset of the previous block on disk. Header field 3. */
173   private final long prevBlockOffset;
174 
175   /**
176    * Size on disk of header + data. Excludes checksum. Header field 6,
177    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
178    */
179   private final int onDiskDataSizeWithHeader;
180 
181   /** The in-memory representation of the hfile block */
182   private ByteBuffer buf;
183 
184   /** Metadata that holds information about this HFile block */
185   private HFileContext fileContext;
186 
187   /**
188    * The offset of this block in the file. Populated by the reader for
189    * convenience of access. This offset is not part of the block header.
190    */
191   private long offset = -1;
192 
193   /**
194    * The on-disk size of the next block, including the header, obtained by
195    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
196    * header, or -1 if unknown.
197    */
198   private int nextBlockOnDiskSizeWithHeader = -1;
199 
200   /**
201    * Creates a new {@link HFile} block from the given fields. This constructor
202    * is mostly used when the block data has already been read and uncompressed,
203    * and is sitting in a byte buffer.
204    *
205    * @param blockType the type of this block, see {@link BlockType}
206    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
207    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
208    * @param prevBlockOffset see {@link #prevBlockOffset}
209    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
210    *          uncompressed data
211    * @param fillHeader when true, write the header fields into the beginning of {@code buf}
212    * @param offset the file offset the block was read from
213    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
214    * @param fileContext HFile meta data
215    */
216   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
217       long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
218       int onDiskDataSizeWithHeader, HFileContext fileContext) {
219     this.blockType = blockType;
220     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
221     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
222     this.prevBlockOffset = prevBlockOffset;
223     this.buf = buf;
224     this.offset = offset;
225     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
226     this.fileContext = fileContext;
227     if (fillHeader)
228       overwriteHeader();
229     this.buf.rewind();
230   }
231 
232   /**
233    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
234    */
235   HFileBlock(HFileBlock that) {
236     this.blockType = that.blockType;
237     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
238     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
239     this.prevBlockOffset = that.prevBlockOffset;
240     this.buf = that.buf.duplicate();
241     this.offset = that.offset;
242     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
243     this.fileContext = that.fileContext;
244     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
245   }
246 
247   /**
248    * Creates a block from an existing buffer starting with a header. Rewinds
249    * and takes ownership of the buffer. By definition of rewind, ignores the
250    * buffer position, but if you slice the buffer beforehand, it will rewind
251    * to that point. The reason this has a minorNumber and not a majorNumber is
252    * because majorNumbers indicate the format of an HFile whereas minorNumbers
253    * indicate the format inside an HFileBlock.
254    */
255   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
256     b.rewind();
257     blockType = BlockType.read(b);
258     onDiskSizeWithoutHeader = b.getInt();
259     uncompressedSizeWithoutHeader = b.getInt();
260     prevBlockOffset = b.getLong();
261     HFileContextBuilder contextBuilder = new HFileContextBuilder();
262     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
263     if (usesHBaseChecksum) {
264       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
265       contextBuilder.withBytesPerCheckSum(b.getInt());
266       this.onDiskDataSizeWithHeader = b.getInt();
267     } else {
268       contextBuilder.withChecksumType(ChecksumType.NULL);
269       contextBuilder.withBytesPerCheckSum(0);
270       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
271                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
272     }
273     this.fileContext = contextBuilder.build();
274     buf = b;
275     buf.rewind();
276   }
277 
278   public BlockType getBlockType() {
279     return blockType;
280   }
281 
282   /** @return the data block encoding id that was used to encode this block */
283   public short getDataBlockEncodingId() {
284     if (blockType != BlockType.ENCODED_DATA) {
285       throw new IllegalArgumentException("Querying encoder ID of a block " +
286           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
287     }
288     return buf.getShort(headerSize());
289   }
290 
291   /**
292    * @return the on-disk size of header + data part + checksum.
293    */
294   public int getOnDiskSizeWithHeader() {
295     return onDiskSizeWithoutHeader + headerSize();
296   }
297 
298   /**
299    * @return the on-disk size of the data part + checksum (header excluded).
300    */
301   public int getOnDiskSizeWithoutHeader() {
302     return onDiskSizeWithoutHeader;
303   }
304 
305   /**
306    * @return the uncompressed size of data part (header and checksum excluded).
307    */
308    public int getUncompressedSizeWithoutHeader() {
309     return uncompressedSizeWithoutHeader;
310   }
311 
312   /**
313    * @return the offset of the previous block of the same type in the file, or
314    *         -1 if unknown
315    */
316   public long getPrevBlockOffset() {
317     return prevBlockOffset;
318   }
319 
320   /**
321    * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
322    * is modified as side-effect.
323    */
324   private void overwriteHeader() {
325     buf.rewind();
326     blockType.write(buf);
327     buf.putInt(onDiskSizeWithoutHeader);
328     buf.putInt(uncompressedSizeWithoutHeader);
329     buf.putLong(prevBlockOffset);
330     if (this.fileContext.isUseHBaseChecksum()) {
331       buf.put(fileContext.getChecksumType().getCode());
332       buf.putInt(fileContext.getBytesPerChecksum());
333       buf.putInt(onDiskDataSizeWithHeader);
334     }
335   }
336 
337   /**
338    * Returns a buffer that does not include the header or checksum.
339    *
340    * @return the buffer with header skipped and checksum omitted.
341    */
342   public ByteBuffer getBufferWithoutHeader() {
343     ByteBuffer dup = this.buf.duplicate();
344     dup.position(headerSize());
345     dup.limit(buf.limit() - totalChecksumBytes());
346     return dup.slice();
347   }
348 
349   /**
350    * Returns the buffer this block stores internally. The clients must not
351    * modify the buffer object. This method has to be public because it is
352    * used in {@link org.apache.hadoop.hbase.util.CompoundBloomFilter} 
353    * to avoid object creation on every Bloom filter lookup, but has to 
354    * be used with caution. Checksum data is not included in the returned 
355    * buffer but header data is.
356    *
357    * @return the buffer of this block for read-only operations
358    */
359   public ByteBuffer getBufferReadOnly() {
360     ByteBuffer dup = this.buf.duplicate();
361     dup.limit(buf.limit() - totalChecksumBytes());
362     return dup.slice();
363   }
364 
365   /**
366    * Returns the buffer of this block, including header data. The clients must
367    * not modify the buffer object. This method has to be public because it is
368    * used in {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} to avoid buffer copy.
369    *
370    * @return the buffer with header and checksum included for read-only operations
371    */
372   public ByteBuffer getBufferReadOnlyWithHeader() {
373     ByteBuffer dup = this.buf.duplicate();
374     return dup.slice();
375   }
376 
377   /**
378    * Returns a byte buffer of this block, including header data and checksum, positioned at
379    * the beginning of header. The underlying data array is not copied.
380    *
381    * @return the byte buffer with header and checksum included
382    */
383   ByteBuffer getBufferWithHeader() {
384     ByteBuffer dupBuf = buf.duplicate();
385     dupBuf.rewind();
386     return dupBuf;
387   }
388 
389   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
390       String fieldName) throws IOException {
391     if (valueFromBuf != valueFromField) {
392       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
393           + ") is different from that in the field (" + valueFromField + ")");
394     }
395   }
396 
397   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
398       throws IOException {
399     if (valueFromBuf != valueFromField) {
400       throw new IOException("Block type stored in the buffer: " +
401         valueFromBuf + ", block type field: " + valueFromField);
402     }
403   }
404 
405   /**
406    * Checks if the block is internally consistent, i.e. the first
407    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
408    * valid header consistent with the fields. Assumes a packed block structure.
409    * This function is primarily for testing and debugging, and is not
410    * thread-safe, because it alters the internal buffer pointer.
411    */
412   void sanityCheck() throws IOException {
413     buf.rewind();
414 
415     sanityCheckAssertion(BlockType.read(buf), blockType);
416 
417     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
418         "onDiskSizeWithoutHeader");
419 
420     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
421         "uncompressedSizeWithoutHeader");
422 
423     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
424     if (this.fileContext.isUseHBaseChecksum()) {
425       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
426       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
427       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
428     }
429 
430     int cksumBytes = totalChecksumBytes();
431     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
432     if (buf.limit() != expectedBufLimit) {
433       throw new AssertionError("Expected buffer limit " + expectedBufLimit
434           + ", got " + buf.limit());
435     }
436 
437     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
438     // block's header, so there are two sensible values for buffer capacity.
439     int hdrSize = headerSize();
440     if (buf.capacity() != expectedBufLimit &&
441         buf.capacity() != expectedBufLimit + hdrSize) {
442       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
443           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
444     }
445   }
446 
447   @Override
448   public String toString() {
449     StringBuilder sb = new StringBuilder()
450       .append("HFileBlock [")
451       .append(" fileOffset=").append(offset)
452       .append(" headerSize()=").append(headerSize())
453       .append(" blockType=").append(blockType)
454       .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
455       .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
456       .append(" prevBlockOffset=").append(prevBlockOffset)
457       .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
458     if (fileContext.isUseHBaseChecksum()) {
459       sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
460         .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
461         .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
462     } else {
463       sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
464         .append("(").append(onDiskSizeWithoutHeader)
465         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
466     }
467     String dataBegin = null;
468     if (buf.hasArray()) {
469       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
470           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
471     } else {
472       ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
473       byte[] dataBeginBytes = new byte[Math.min(32,
474           bufWithoutHeader.limit() - bufWithoutHeader.position())];
475       bufWithoutHeader.get(dataBeginBytes);
476       dataBegin = Bytes.toStringBinary(dataBeginBytes);
477     }
478     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
479       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
480       .append(" isUnpacked()=").append(isUnpacked())
481       .append(" buf=[ ").append(buf).append(" ]")
482       .append(" dataBeginsWith=").append(dataBegin)
483       .append(" fileContext=").append(fileContext)
484       .append(" ]");
485     return sb.toString();
486   }
487 
488   /**
489    * Called after reading a block with provided onDiskSizeWithHeader.
490    */
491   private void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader)
492   throws IOException {
493     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
494       String dataBegin = null;
495       if (buf.hasArray()) {
496         dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
497       } else {
498         ByteBuffer bufDup = getBufferReadOnly();
499         byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
500         bufDup.get(dataBeginBytes);
501         dataBegin = Bytes.toStringBinary(dataBeginBytes);
502       }
503       String blockInfoMsg =
504         "Block offset: " + offset + ", data starts with: " + dataBegin;
505       throw new IOException("On-disk size without header provided is "
506           + expectedOnDiskSizeWithoutHeader + ", but block "
507           + "header contains " + onDiskSizeWithoutHeader + ". " +
508           blockInfoMsg);
509     }
510   }
511 
512   /**
513    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
514    * encoded structure. Internal structures are shared between instances where applicable.
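   * <p>
   * Illustrative call pattern only ({@code packed}, {@code fileContext} and
   * {@code fsReader} are assumed names):
   * <pre>{@code
   * HFileBlock readable = packed.unpack(fileContext, fsReader);
   * }</pre>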
515    */
516   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
517     if (!fileContext.isCompressedOrEncrypted()) {
518       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
519       // which is used for block serialization to L2 cache, does not preserve encoding and
520       // encryption details.
521       return this;
522     }
523 
524     HFileBlock unpacked = new HFileBlock(this);
525     unpacked.allocateBuffer(); // allocates space for the decompressed block
526 
527     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
528       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
529 
530     ByteBuffer dup = this.buf.duplicate();
531     dup.position(this.headerSize());
532     dup = dup.slice();
533     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
534       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
535       dup);
536 
537     // Preserve the next block's header bytes in the new block if we have them.
538     if (unpacked.hasNextBlockHeader()) {
539       // Both the buffers are limited till checksum bytes and avoid the next block's header.
540       // Below call to copyFromBufferToBuffer() will try positional read/write from/to buffers when
541       // any of the buffer is DBB. So we change the limit on a dup buffer. No copying just create
542       // new BB objects
543       ByteBuffer inDup = this.buf.duplicate();
544       inDup.limit(inDup.limit() + headerSize());
545       ByteBuffer outDup = unpacked.buf.duplicate();
546       outDup.limit(outDup.limit() + unpacked.headerSize());
547       ByteBufferUtils.copyFromBufferToBuffer(
548           outDup,
549           inDup,
550           this.onDiskDataSizeWithHeader,
551           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
552               + unpacked.totalChecksumBytes(), unpacked.headerSize());
553     }
554     return unpacked;
555   }
556 
557   /**
558    * Returns true when this buffer includes the next block's header.
559    */
560   private boolean hasNextBlockHeader() {
561     return nextBlockOnDiskSizeWithHeader > 0;
562   }
563 
564   /**
565    * Always allocates a new buffer of the correct size. Copies header bytes
566    * from the existing buffer. Does not change header fields.
567    * Reserves room for the checksum bytes as well.
568    */
569   private void allocateBuffer() {
570     int cksumBytes = totalChecksumBytes();
571     int headerSize = headerSize();
572     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
573         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
574 
575     // TODO: should we consider allocating this buffer off-heap?
576     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
577 
578     // Copy header bytes into newBuf.
579     // newBuf is HBB so no issue in calling array()
580     ByteBuffer dup = buf.duplicate();
581     dup.position(0);
582     dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
583 
584     buf = newBuf;
585     // set limit to exclude next block's header
586     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
587   }
588 
589   /**
590    * Returns true when this block's buffer has been unpacked, false otherwise. Note this is a
591    * calculated heuristic, not a tracked attribute of the block.
592    */
593   public boolean isUnpacked() {
594     final int cksumBytes = totalChecksumBytes();
595     final int headerSize = headerSize();
596     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
597     final int bufCapacity = buf.capacity();
598     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
599   }
600 
601   /** An additional sanity-check in case no compression or encryption is being used. */
602   public void assumeUncompressed() throws IOException {
603     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
604         totalChecksumBytes()) {
605       throw new IOException("Using no compression but "
606           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
607           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
608           + ", numChecksumBytes=" + totalChecksumBytes());
609     }
610   }
611 
612   /**
613    * @param expectedType the expected type of this block
614    * @throws IOException if this block's type is different than expected
615    */
616   public void expectType(BlockType expectedType) throws IOException {
617     if (blockType != expectedType) {
618       throw new IOException("Invalid block type: expected=" + expectedType
619           + ", actual=" + blockType);
620     }
621   }
622 
623   /** @return the offset of this block in the file it was read from */
624   public long getOffset() {
625     if (offset < 0) {
626       throw new IllegalStateException(
627           "HFile block offset not initialized properly");
628     }
629     return offset;
630   }
631 
632   /**
633    * @return a byte stream reading the data + checksum of this block
634    */
635   public DataInputStream getByteStream() {
636     ByteBuffer dup = this.buf.duplicate();
637     dup.position(this.headerSize());
638     return new DataInputStream(new ByteBufferInputStream(dup));
639   }
640 
641   @Override
642   public long heapSize() {
643     long size = ClassSize.align(
644         ClassSize.OBJECT +
645         // Block type, byte buffer and meta references
646         3 * ClassSize.REFERENCE +
647         // On-disk size, uncompressed size, and next block's on-disk size
648         // bytePerChecksum and onDiskDataSize
649         4 * Bytes.SIZEOF_INT +
650         // This and previous block offset
651         2 * Bytes.SIZEOF_LONG +
652         // Heap size of the meta object. meta is never null.
653         fileContext.heapSize()
654     );
655 
656     if (buf != null) {
657       // Deep overhead of the byte buffer. Needs to be aligned separately.
658       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
659     }
660 
661     return ClassSize.align(size);
662   }
663 
664   /**
665    * Read from an input stream. Analogous to
666    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
667    * number of "extra" bytes that would be desirable but not absolutely
668    * necessary to read.
669    *
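   * A usage sketch ({@code in}, {@code necessaryLen} and {@code extraLen} are
   * assumed to be in scope):
   * <pre>{@code
   * byte[] dest = new byte[necessaryLen + extraLen];
   * boolean gotExtra = HFileBlock.readWithExtra(in, dest, 0, necessaryLen, extraLen);
   * // gotExtra is true only if the optional extra bytes were read as well
   * }</pre>
   *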
670    * @param in the input stream to read from
671    * @param buf the buffer to read into
672    * @param bufOffset the destination offset in the buffer
673    * @param necessaryLen the number of bytes that are absolutely necessary to
674    *          read
675    * @param extraLen the number of extra bytes that would be nice to read
676    * @return true if succeeded reading the extra bytes
677    * @throws IOException if failed to read the necessary bytes
678    */
679   public static boolean readWithExtra(InputStream in, byte[] buf,
680       int bufOffset, int necessaryLen, int extraLen) throws IOException {
681     int bytesRemaining = necessaryLen + extraLen;
682     while (bytesRemaining > 0) {
683       int ret = in.read(buf, bufOffset, bytesRemaining);
684       if (ret == -1 && bytesRemaining <= extraLen) {
685         // We could not read the "extra data", but that is OK.
686         break;
687       }
688 
689       if (ret < 0) {
690         throw new IOException("Premature EOF from inputStream (read "
691             + "returned " + ret + ", was trying to read " + necessaryLen
692             + " necessary bytes and " + extraLen + " extra bytes, "
693             + "successfully read "
694             + (necessaryLen + extraLen - bytesRemaining));
695       }
696       bufOffset += ret;
697       bytesRemaining -= ret;
698     }
699     return bytesRemaining <= 0;
700   }
701 
702   /**
703    * Read from an input stream. Analogous to
704    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
705    * positional read and specifies a number of "extra" bytes that would be
706    * desirable but not absolutely necessary to read.
707    *
708    * @param in the input stream to read from
709    * @param position the position within the stream from which to start reading
710    * @param buf the buffer to read into
711    * @param bufOffset the destination offset in the buffer
712    * @param necessaryLen the number of bytes that are absolutely necessary to
713    *     read
714    * @param extraLen the number of extra bytes that would be nice to read
715    * @return true if and only if extraLen is > 0 and reading those extra bytes
716    *     was successful
717    * @throws IOException if failed to read the necessary bytes
718    */
719   @VisibleForTesting
720   static boolean positionalReadWithExtra(FSDataInputStream in,
721       long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
722       throws IOException {
723     int bytesRemaining = necessaryLen + extraLen;
724     int bytesRead = 0;
725     while (bytesRead < necessaryLen) {
726       int ret = in.read(position, buf, bufOffset, bytesRemaining);
727       if (ret < 0) {
728         throw new IOException("Premature EOF from inputStream (positional read "
729             + "returned " + ret + ", was trying to read " + necessaryLen
730             + " necessary bytes and " + extraLen + " extra bytes, "
731             + "successfully read " + bytesRead);
732       }
733       position += ret;
734       bufOffset += ret;
735       bytesRemaining -= ret;
736       bytesRead += ret;
737     }
738     return bytesRead != necessaryLen && bytesRemaining <= 0;
739   }
740 
741   /**
742    * @return the on-disk size of the next block (including the header size)
743    *         that was read by peeking into the next block's header
744    */
745   public int getNextBlockOnDiskSizeWithHeader() {
746     return nextBlockOnDiskSizeWithHeader;
747   }
748 
749   /**
750    * Unified version 2 {@link HFile} block writer. The intended usage pattern
751    * is as follows:
752    * <ol>
753    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
754    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
755    * <li>Write your data into the stream.
756    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
757    * store the serialized block into an external stream.
758    * <li>Repeat to write more blocks.
759    * </ol>
760    * <p>
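   * A minimal sketch of that pattern (the {@code fileContext} and the output stream
   * {@code out} are assumed to be set up elsewhere; a null encoder falls back to
   * {@link NoOpDataBlockEncoder#INSTANCE}):
   * <pre>{@code
   * HFileBlock.Writer hbw = new HFileBlock.Writer(null, fileContext);
   * DataOutputStream dos = hbw.startWriting(BlockType.META);
   * dos.write(Bytes.toBytes("example payload"));   // write your data into the stream
   * hbw.writeHeaderAndData(out);                   // out is the HFile's FSDataOutputStream
   * // repeat startWriting()/writeHeaderAndData() for further blocks
   * }</pre>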
761    */
762   public static class Writer {
763 
764     private enum State {
765       INIT,
766       WRITING,
767       BLOCK_READY
768     };
769 
770     /** Writer state. Used to ensure the correct usage protocol. */
771     private State state = State.INIT;
772 
773     /** Data block encoder used for data blocks */
774     private final HFileDataBlockEncoder dataBlockEncoder;
775 
776     private HFileBlockEncodingContext dataBlockEncodingCtx;
777 
778     /** block encoding context for non-data blocks */
779     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
780 
781     /**
782      * The stream we use to accumulate data in uncompressed format for each
783      * block. We reset this stream at the end of each block and reuse it. The
784      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
785      * stream.
786      */
787     private ByteArrayOutputStream baosInMemory;
788 
789     /**
790      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
791      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
792      * to {@link BlockType#ENCODED_DATA}.
793      */
794     private BlockType blockType;
795 
796     /**
797      * A stream that we write uncompressed bytes to, which compresses them and
798      * writes them to {@link #baosInMemory}.
799      */
800     private DataOutputStream userDataStream;
801 
802     // Size of actual data being written. Not considering the block encoding/compression. This
803     // includes the header size also.
804     private int unencodedDataSizeWritten;
805 
806     /**
807      * Bytes to be written to the file system, including the header. Compressed
808      * if compression is turned on. It also includes the checksum data that
809      * immediately follows the block data. (header + data + checksums)
810      */
811     private byte[] onDiskBytesWithHeader;
812 
813     /**
814      * The size of the checksum data on disk. It is used only if data is
815      * not compressed. If data is compressed, then the checksums are already
816      * part of onDiskBytesWithHeader. If data is uncompressed, then this
817      * variable stores the checksum data for this block.
818      */
819     private byte[] onDiskChecksum;
820 
821     /**
822      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
823      * potentially encoded, if this is a data block) bytes, so the length is
824      * {@link #uncompressedSizeWithoutHeader} +
825      * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
826      * Does not store checksums.
827      */
828     private byte[] uncompressedBytesWithHeader;
829 
830     /**
831      * Current block's start offset in the {@link HFile}. Set in
832      * {@link #writeHeaderAndData(FSDataOutputStream)}.
833      */
834     private long startOffset;
835 
836     /**
837      * Offset of previous block by block type. Updated when the next block is
838      * started.
839      */
840     private long[] prevOffsetByType;
841 
842     /** The offset of the previous block of the same type */
843     private long prevOffset;
844     /** Meta data that holds information about the hfileblock */
845     private HFileContext fileContext;
846 
847     /**
848      * @param dataBlockEncoder data block encoding algorithm to use
849      */
850     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
851       this.dataBlockEncoder = dataBlockEncoder != null
852           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
853       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
854           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
855       dataBlockEncodingCtx = this.dataBlockEncoder
856           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
857 
858       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
859         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
860             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
861             fileContext.getBytesPerChecksum());
862       }
863 
864       baosInMemory = new ByteArrayOutputStream();
865 
866       prevOffsetByType = new long[BlockType.values().length];
867       for (int i = 0; i < prevOffsetByType.length; ++i)
868         prevOffsetByType[i] = -1;
869 
870       this.fileContext = fileContext;
871     }
872 
873     /**
874      * Starts writing into the block. The previous block's data is discarded.
875      *
876      * @return the stream the user can write their data into
877      * @throws IOException
878      */
879     public DataOutputStream startWriting(BlockType newBlockType)
880         throws IOException {
881       if (state == State.BLOCK_READY && startOffset != -1) {
882         // We had a previous block that was written to a stream at a specific
883         // offset. Save that offset as the last offset of a block of that type.
884         prevOffsetByType[blockType.getId()] = startOffset;
885       }
886 
887       startOffset = -1;
888       blockType = newBlockType;
889 
890       baosInMemory.reset();
891       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
892 
893       state = State.WRITING;
894 
895       // We will compress it later in finishBlock()
896       userDataStream = new DataOutputStream(baosInMemory);
897       if (newBlockType == BlockType.DATA) {
898         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
899       }
900       this.unencodedDataSizeWritten = 0;
901       return userDataStream;
902     }
903 
904     /**
905      * Writes the Cell to this block
906      * @param cell
907      * @throws IOException
908      */
909     public void write(Cell cell) throws IOException{
910       expectState(State.WRITING);
911       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
912           this.userDataStream);
913     }
914 
915     /**
916      * Returns the stream for the user to write to. The block writer takes care
917      * of handling compression and buffering for caching on write. Can only be
918      * called in the "writing" state.
919      *
920      * @return the data output stream for the user to write to
921      */
922     DataOutputStream getUserDataStream() {
923       expectState(State.WRITING);
924       return userDataStream;
925     }
926 
927     /**
928      * Transitions the block writer from the "writing" state to the "block
929      * ready" state.  Does nothing if a block is already finished.
930      */
931     void ensureBlockReady() throws IOException {
932       Preconditions.checkState(state != State.INIT,
933           "Unexpected state: " + state);
934 
935       if (state == State.BLOCK_READY)
936         return;
937 
938       // This will set state to BLOCK_READY.
939       finishBlock();
940     }
941 
942     /**
943      * An internal method that flushes the compressing stream (if using
944      * compression), serializes the header, and takes care of the separate
945      * uncompressed stream for caching on write, if applicable. Sets block
946      * write state to "block ready".
947      */
948     private void finishBlock() throws IOException {
949       if (blockType == BlockType.DATA) {
950         BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
951             new BufferGrabbingByteArrayOutputStream();
952         baosInMemory.writeTo(baosInMemoryCopy);
953         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
954             baosInMemoryCopy.buf, blockType);
955         blockType = dataBlockEncodingCtx.getBlockType();
956       }
957       userDataStream.flush();
958       // This does an array copy, so it is safe to cache this byte array.
959       uncompressedBytesWithHeader = baosInMemory.toByteArray();
960       prevOffset = prevOffsetByType[blockType.getId()];
961 
962       // We need to set state before we can package the block up for
963       // cache-on-write. In a way, the block is ready, but not yet encoded or
964       // compressed.
965       state = State.BLOCK_READY;
966       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
967         onDiskBytesWithHeader = dataBlockEncodingCtx
968             .compressAndEncrypt(uncompressedBytesWithHeader);
969       } else {
970         onDiskBytesWithHeader = defaultBlockEncodingCtx
971             .compressAndEncrypt(uncompressedBytesWithHeader);
972       }
973       int numBytes = (int) ChecksumUtil.numBytes(
974           onDiskBytesWithHeader.length,
975           fileContext.getBytesPerChecksum());
976 
977       // put the header for on disk bytes
978       putHeader(onDiskBytesWithHeader, 0,
979           onDiskBytesWithHeader.length + numBytes,
980           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
981       // set the header for the uncompressed bytes (for cache-on-write)
982       putHeader(uncompressedBytesWithHeader, 0,
983           onDiskBytesWithHeader.length + numBytes,
984           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
985 
986       onDiskChecksum = new byte[numBytes];
987       ChecksumUtil.generateChecksums(
988           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
989           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
990     }
991 
992     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
993       private byte[] buf;
994 
995       @Override
996       public void write(byte[] b, int off, int len) {
997         this.buf = b;
998       }
999 
1000       public byte[] getBuffer() {
1001         return this.buf;
1002       }
1003     }
1004 
1005     /**
1006      * Put the header into the given byte array at the given offset.
1007      * @param onDiskSize size of the block on disk header + data + checksum
1008      * @param uncompressedSize size of the block after decompression (but
1009      *          before optional data block decoding) including header
1010      * @param onDiskDataSize size of the block on disk with header
1011      *        and data but not including the checksums
1012      */
1013     private void putHeader(byte[] dest, int offset, int onDiskSize,
1014         int uncompressedSize, int onDiskDataSize) {
1015       offset = blockType.put(dest, offset);
1016       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1017       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1018       offset = Bytes.putLong(dest, offset, prevOffset);
1019       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1020       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1021       Bytes.putInt(dest, offset, onDiskDataSize);
1022     }
1023 
1024     /**
1025      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
1026      * the offset of this block so that it can be referenced in the next block
1027      * of the same type.
1028      *
1029      * @param out
1030      * @throws IOException
1031      */
1032     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1033       long offset = out.getPos();
1034       if (startOffset != -1 && offset != startOffset) {
1035         throw new IOException("A " + blockType + " block written to a "
1036             + "stream twice, first at offset " + startOffset + ", then at "
1037             + offset);
1038       }
1039       startOffset = offset;
1040 
1041       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1042     }
1043 
1044     /**
1045      * Writes the header and the compressed data of this block (or uncompressed
1046      * data when not using compression) into the given stream. Can be called in
1047      * the "writing" state or in the "block ready" state. If called in the
1048      * "writing" state, transitions the writer to the "block ready" state.
1049      *
1050      * @param out the output stream to write the block to
1051      * @throws IOException
1052      */
1053     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1054       throws IOException {
1055       ensureBlockReady();
1056       long startTime = System.currentTimeMillis();
1057       out.write(onDiskBytesWithHeader);
1058       out.write(onDiskChecksum);
1059       HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
1060     }
1061 
1062     /**
1063      * Returns the header or the compressed data (or uncompressed data when not
1064      * using compression) as a byte array. Can be called in the "writing" state
1065      * or in the "block ready" state. If called in the "writing" state,
1066      * transitions the writer to the "block ready" state. This returns
1067      * the header + data + checksums stored on disk.
1068      *
1069      * @return header and data as they would be stored on disk in a byte array
1070      * @throws IOException
1071      */
1072     byte[] getHeaderAndDataForTest() throws IOException {
1073       ensureBlockReady();
1074       // This is not very optimal, because we are doing an extra copy.
1075       // But this method is used only by unit tests.
1076       byte[] output =
1077           new byte[onDiskBytesWithHeader.length
1078               + onDiskChecksum.length];
1079       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1080           onDiskBytesWithHeader.length);
1081       System.arraycopy(onDiskChecksum, 0, output,
1082           onDiskBytesWithHeader.length, onDiskChecksum.length);
1083       return output;
1084     }
1085 
1086     /**
1087      * Releases resources used by this writer.
1088      */
1089     public void release() {
1090       if (dataBlockEncodingCtx != null) {
1091         dataBlockEncodingCtx.close();
1092         dataBlockEncodingCtx = null;
1093       }
1094       if (defaultBlockEncodingCtx != null) {
1095         defaultBlockEncodingCtx.close();
1096         defaultBlockEncodingCtx = null;
1097       }
1098     }
1099 
1100     /**
1101      * Returns the on-disk size of the data portion of the block. This is the
1102      * compressed size if compression is enabled. Can only be called in the
1103      * "block ready" state. Header is not compressed, and its size is not
1104      * included in the return value.
1105      *
1106      * @return the on-disk size of the block, not including the header.
1107      */
1108     int getOnDiskSizeWithoutHeader() {
1109       expectState(State.BLOCK_READY);
1110       return onDiskBytesWithHeader.length
1111           + onDiskChecksum.length
1112           - HConstants.HFILEBLOCK_HEADER_SIZE;
1113     }
1114 
1115     /**
1116      * Returns the on-disk size of the block. Can only be called in the
1117      * "block ready" state.
1118      *
1119      * @return the on-disk size of the block ready to be written, including the
1120      *         header size, the data and the checksum data.
1121      */
1122     int getOnDiskSizeWithHeader() {
1123       expectState(State.BLOCK_READY);
1124       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1125     }
1126 
1127     /**
1128      * The uncompressed size of the block data. Does not include header size.
1129      */
1130     int getUncompressedSizeWithoutHeader() {
1131       expectState(State.BLOCK_READY);
1132       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1133     }
1134 
1135     /**
1136      * The uncompressed size of the block data, including header size.
1137      */
1138     int getUncompressedSizeWithHeader() {
1139       expectState(State.BLOCK_READY);
1140       return uncompressedBytesWithHeader.length;
1141     }
1142 
1143     /** @return true if a block is being written  */
1144     public boolean isWriting() {
1145       return state == State.WRITING;
1146     }
1147 
1148     /**
1149      * Returns the number of bytes written into the current block so far, or
1150      * zero if not writing the block at the moment. Note that this will return
1151      * zero in the "block ready" state as well.
1152      *
1153      * @return the number of bytes written
1154      */
1155     public int blockSizeWritten() {
1156       if (state != State.WRITING) return 0;
1157       return this.unencodedDataSizeWritten;
1158     }
1159 
1160     /**
1161      * Returns the header followed by the uncompressed data, even if using
1162      * compression. This is needed for storing uncompressed blocks in the block
1163      * cache. Can be called in the "writing" state or the "block ready" state.
1164      * Returns only the header and data, does not include checksum data.
1165      *
1166      * @return uncompressed block bytes for caching on write
1167      */
1168     ByteBuffer getUncompressedBufferWithHeader() {
1169       expectState(State.BLOCK_READY);
1170       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1171     }
1172 
1173     /**
1174      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1175      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1176      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data;
1177      * does not include checksum data.
1178      *
1179      * @return packed block bytes for caching on write
1180      */
1181     ByteBuffer getOnDiskBufferWithHeader() {
1182       expectState(State.BLOCK_READY);
1183       return ByteBuffer.wrap(onDiskBytesWithHeader);
1184     }
1185 
1186     private void expectState(State expectedState) {
1187       if (state != expectedState) {
1188         throw new IllegalStateException("Expected state: " + expectedState +
1189             ", actual state: " + state);
1190       }
1191     }
1192 
1193     /**
1194      * Takes the given {@link BlockWritable} instance, creates a new block of
1195      * its appropriate type, writes the writable into this block, and flushes
1196      * the block into the output stream. The writer is instructed not to buffer
1197      * uncompressed bytes for cache-on-write.
1198      *
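     * A hedged caller-side sketch (assuming an {@code HFileBlock.Writer writer} and an
     * open {@code FSDataOutputStream fsOut}; the writable shown is purely illustrative):
     * <pre>{@code
     * writer.writeBlock(new BlockWritable() {
     *   public BlockType getBlockType() {
     *     return BlockType.META;
     *   }
     *   public void writeToBlock(DataOutput out) throws IOException {
     *     out.writeUTF("example payload");   // no magic record, just the payload
     *   }
     * }, fsOut);
     * }</pre>
     *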
1199      * @param bw the block-writable object to write as a block
1200      * @param out the file system output stream
1201      * @throws IOException
1202      */
1203     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1204         throws IOException {
1205       bw.writeToBlock(startWriting(bw.getBlockType()));
1206       writeHeaderAndData(out);
1207     }
1208 
1209     /**
1210      * Creates a new HFileBlock. Checksums have already been validated, so
1211      * the byte buffer passed into the constructor of this newly created
1212      * block does not have checksum data even though the header minor
1213      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1214      * 0 value in bytesPerChecksum.
1215      */
1216     public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1217       HFileContext newContext = new HFileContextBuilder()
1218                                 .withBlockSize(fileContext.getBlocksize())
1219                                 .withBytesPerCheckSum(0)
1220                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1221                                 .withCompression(fileContext.getCompression())
1222                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1223                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1224                                 .withCompressTags(fileContext.isCompressTags())
1225                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1226                                 .withIncludesTags(fileContext.isIncludesTags())
1227                                 .build();
1228       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1229           getUncompressedSizeWithoutHeader(), prevOffset,
1230           cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1231             getOnDiskBufferWithHeader() :
1232             getUncompressedBufferWithHeader(),
1233           FILL_HEADER, startOffset,
1234           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1235     }
1236   }
1237 
1238   /** Something that can be written into a block. */
1239   public interface BlockWritable {
1240 
1241     /** The type of block this data should use. */
1242     BlockType getBlockType();
1243 
1244     /**
1245      * Writes the block to the provided stream. Must not write any magic
1246      * records.
1247      *
1248      * @param out a stream to write uncompressed data into
1249      */
1250     void writeToBlock(DataOutput out) throws IOException;
1251   }
1252 
1253   // Block readers and writers
1254 
1255   /** An interface for iterating over {@link HFileBlock}s. */
1256   public interface BlockIterator {
1257 
1258     /**
1259      * Get the next block, or null if there are no more blocks to iterate.
1260      */
1261     HFileBlock nextBlock() throws IOException;
1262 
1263     /**
1264      * Similar to {@link #nextBlock()} but checks block type, throws an
1265      * exception if incorrect, and returns the HFile block
1266      */
1267     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1268   }
1269 
1270   /** A full-fledged reader with iteration ability. */
1271   public interface FSReader {
1272 
1273     /**
1274      * Reads the block at the given offset in the file with the given on-disk
1275      * size and uncompressed size.
1276      *
1277      * @param offset
1278      * @param onDiskSize the on-disk size of the entire block, including all
1279      *          applicable headers, or -1 if unknown
1280      * @param uncompressedSize the uncompressed size of the compressed part of
1281      *          the block, or -1 if unknown
1282      * @return the newly read block
1283      */
1284     HFileBlock readBlockData(long offset, long onDiskSize,
1285         int uncompressedSize, boolean pread, boolean updateMetrics) throws IOException;
1286 
1287     /**
1288      * Creates a block iterator over the given portion of the {@link HFile}.
1289      * The iterator returns blocks whose start offsets satisfy startOffset <=
1290      * offset < endOffset. Returned blocks are always unpacked.
1291      *
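     * A usage sketch (the reader and the two offsets are assumed to be in scope):
     * <pre>{@code
     * HFileBlock.BlockIterator it = reader.blockRange(startOffset, endOffset);
     * for (HFileBlock block = it.nextBlock(); block != null; block = it.nextBlock()) {
     *   // process the (already unpacked) block
     * }
     * }</pre>
     *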
1292      * @param startOffset the offset of the block to start iteration with
1293      * @param endOffset the offset to end iteration at (exclusive)
1294      * @return an iterator of blocks between the two given offsets
1295      */
1296     BlockIterator blockRange(long startOffset, long endOffset);
1297 
1298     /** Closes the backing streams */
1299     void closeStreams() throws IOException;
1300 
1301     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1302     HFileBlockDecodingContext getBlockDecodingContext();
1303 
1304     /** Get the default decoder for blocks from this file. */
1305     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1306 
1307     /**
1308      * Unbuffers the underlying stream, releasing any buffers or sockets it holds. Note: this can be
1309      * called concurrently from multiple threads, and implementations should take care of thread safety.
1310      */
1311     void unbufferStream();
1312   }
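
       // A usage sketch for the FSReader / BlockIterator API above, assuming a reader
       // that has already been opened on an HFile; the method and variable names are
       // illustrative only, not part of the original API.
       static void exampleDumpBlockRange(FSReader reader, long startOffset, long endOffset)
           throws IOException {
         BlockIterator it = reader.blockRange(startOffset, endOffset);
         // nextBlock() returns null once iteration passes endOffset; returned blocks
         // are already unpacked.
         for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
           System.out.println(b.getBlockType() + " onDiskSizeWithHeader="
               + b.getOnDiskSizeWithHeader());
         }
       }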
1313 
1314   /**
1315    * A common implementation of some methods of {@link FSReader} and some
1316    * tools for implementing HFile format version-specific block readers.
1317    */
1318   private abstract static class AbstractFSReader implements FSReader {
1319     /** Compression algorithm used by the {@link HFile} */
1320 
1321     /** The size of the file we are reading from, or -1 if unknown. */
1322     protected long fileSize;
1323 
1324     /** The size of the header */
1325     protected final int hdrSize;
1326 
1327     /** The filesystem used to access data */
1328     protected HFileSystem hfs;
1329 
1330     protected final Lock streamLock = new ReentrantLock();
1331 
1332     /** The default buffer size for our buffered streams */
1333     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1334 
1335     protected HFileContext fileContext;
1336     // Cache the fileName
1337     protected String pathName;
1338 
1339     public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1340         throws IOException {
1341       this.fileSize = fileSize;
1342       this.hfs = hfs;
1343       if (path != null) {
1344         this.pathName = path.toString();
1345       }
1346       this.fileContext = fileContext;
1347       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1348     }
1349 
1350     @Override
1351     public BlockIterator blockRange(final long startOffset,
1352         final long endOffset) {
1353       final FSReader owner = this; // handle for inner class
1354       return new BlockIterator() {
1355         private long offset = startOffset;
1356         // Cache the length of the next block; reading the current block also peeks at it.
1357         private long length = -1;
1358 
1359         @Override
1360         public HFileBlock nextBlock() throws IOException {
1361           if (offset >= endOffset)
1362             return null;
1363           HFileBlock b = readBlockData(offset, length, -1, false, false);
1364           offset += b.getOnDiskSizeWithHeader();
1365           length = b.getNextBlockOnDiskSizeWithHeader();
1366           return b.unpack(fileContext, owner);
1367         }
1368 
1369         @Override
1370         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1371             throws IOException {
1372           HFileBlock blk = nextBlock();
1373           if (blk.getBlockType() != blockType) {
1374             throw new IOException("Expected block of type " + blockType
1375                 + " but found " + blk.getBlockType());
1376           }
1377           return blk;
1378         }
1379       };
1380     }
1381 
1382     /**
1383      * Does a positional read or a seek and read into the given buffer. Returns
1384      * the on-disk size of the next block, or -1 if it could not be determined.
1385      *
1386      * @param dest destination buffer
1387      * @param destOffset offset in the destination buffer
1388      * @param size size of the block to be read
1389      * @param peekIntoNextBlock whether to read the next block's on-disk size
1390      * @param fileOffset position in the stream to read at
1391      * @param pread whether we should do a positional read
1392      * @param istream The input source of data
1393      * @return the on-disk size of the next block with header size included, or
1394      *         -1 if it could not be determined
1395      * @throws IOException
1396      */
1397     protected int readAtOffset(FSDataInputStream istream,
1398         byte[] dest, int destOffset, int size,
1399         boolean peekIntoNextBlock, long fileOffset, boolean pread)
1400         throws IOException {
1401       if (peekIntoNextBlock &&
1402           destOffset + size + hdrSize > dest.length) {
1403         // We are asked to read the next block's header as well, but there is
1404         // not enough room in the array.
1405         throw new IOException("Attempted to read " + size + " bytes and " +
1406             hdrSize + " bytes of next header into a " + dest.length +
1407             "-byte array at offset " + destOffset);
1408       }
1409 
1410       if (!pread && streamLock.tryLock()) {
1411         // Seek + read. Better for scanning.
1412         try {
1413           HFileUtil.seekOnMultipleSources(istream, fileOffset);
1414           // TODO: do we need seek time latencies?
1415           long realOffset = istream.getPos();
1416           if (realOffset != fileOffset) {
1417             throw new IOException("Tried to seek to " + fileOffset + " to "
1418                 + "read " + size + " bytes, but pos=" + realOffset
1419                 + " after seek");
1420           }
1421 
1422           if (!peekIntoNextBlock) {
1423             IOUtils.readFully(istream, dest, destOffset, size);
1424             return -1;
1425           }
1426 
1427           // Try to read the next block header.
1428           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1429             return -1;
1430         } finally {
1431           streamLock.unlock();
1432         }
1433       } else {
1434         // Positional read. Better for random reads; or when the streamLock is already locked.
1435         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1436         if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1437             size, extraSize)) {
1438           return -1;
1439         }
1440       }
1441 
1442       assert peekIntoNextBlock;
1443       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1444     }
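
         // A minimal sketch of how the return value of readAtOffset() is derived when we
         // peek into the next block's header: the 4-byte on-disk-size-without-header field
         // follows the 8-byte magic record, and the header size is added back to obtain
         // the full on-disk size. The method name is illustrative only.
         protected int exampleNextBlockOnDiskSizeWithHeader(byte[] dest, int destOffset, int size) {
           int nextOnDiskSizeWithoutHeader =
               Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH);
           return nextOnDiskSizeWithoutHeader + hdrSize;
         }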
1445 
1446   }
1447 
1448   /**
1449    * We always prefetch the header of the next block, so that we know its
1450    * on-disk size in advance and can read it in one operation.
1451    */
1452   private static class PrefetchedHeader {
1453     long offset = -1;
1454     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1455     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1456   }
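
       // How the header prefetch is used by FSReaderImpl below (a summary of existing
       // logic, not new behavior): readAtOffset() peeks the next block's header into the
       // tail of the current read buffer, readBlockDataInternal() copies it into a fresh
       // PrefetchedHeader keyed by the next block's offset, and the following read
       // consumes it via prefetchedHeader.getAndSet(null), so the same header is not
       // read from disk twice.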
1457 
1458   /** Reads version 2 blocks from the filesystem. */
1459   static class FSReaderImpl extends AbstractFSReader {
1460     /** The file system stream of the underlying {@link HFile} that
1461      * does or doesn't do checksum validations in the filesystem */
1462     protected FSDataInputStreamWrapper streamWrapper;
1463 
1464     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1465 
1466     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1467     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1468 
1469     private AtomicReference<PrefetchedHeader> prefetchedHeader =
1470         new AtomicReference<PrefetchedHeader>(new PrefetchedHeader());
1471 
1472     public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1473         HFileContext fileContext) throws IOException {
1474       super(fileSize, hfs, path, fileContext);
1475       this.streamWrapper = stream;
1476       // Older versions of HBase didn't support checksum.
1477       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1478       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1479       encodedBlockDecodingCtx = defaultDecodingCtx;
1480     }
1481 
1482     /**
1483      * A constructor that reads files with the latest minor version.
1484      * This is used by unit tests only.
1485      */
1486     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1487     throws IOException {
1488       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1489     }
1490 
1491     /**
1492      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1493      * little memory allocation as possible, using the provided on-disk size.
1494      *
1495      * @param offset the offset in the stream to read at
1496      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1497      *          the header, or -1 if unknown
1498      * @param uncompressedSize the uncompressed size of the block. Always
1499      *          expected to be -1. This parameter is only used in version 1.
1500      * @param pread whether to use a positional read
1501      */
1502     @Override
1503     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1504         int uncompressedSize, boolean pread, boolean updateMetrics)
1505     throws IOException {
1506 
1507       // get a copy of the current state of whether to validate
1508       // hbase checksums or not for this read call. This is not
1509       // thread-safe, but the one constraint is that if we decide
1510       // to skip hbase checksum verification then we are
1511       // guaranteed to use hdfs checksum verification.
1512       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1513       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1514 
1515       HFileBlock blk = readBlockDataInternal(is, offset,
1516                          onDiskSizeWithHeaderL,
1517                          uncompressedSize, pread,
1518                          doVerificationThruHBaseChecksum, updateMetrics);
1519       if (blk == null) {
1520         HFile.LOG.warn("HBase checksum verification failed for file " +
1521                        pathName + " at offset " +
1522                        offset + " filesize " + fileSize +
1523                        ". Retrying read with HDFS checksums turned on...");
1524 
1525         if (!doVerificationThruHBaseChecksum) {
1526           String msg = "HBase checksum verification failed for file " +
1527                        pathName + " at offset " +
1528                        offset + " filesize " + fileSize +
1529                        " but this cannot happen because doVerify is " +
1530                        doVerificationThruHBaseChecksum;
1531           HFile.LOG.warn(msg);
1532           throw new IOException(msg); // cannot happen case here
1533         }
1534         HFile.checksumFailures.increment(); // update metrics
1535 
1536         // If we have a checksum failure, we fall back into a mode where
1537         // the next few reads use HDFS level checksums. We aim to make the
1538         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1539         // hbase checksum verification, but since this value is set without
1540         // holding any locks, it can so happen that we might actually do
1541         // a few more than precisely this number.
1542         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1543         doVerificationThruHBaseChecksum = false;
1544         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1545                                     uncompressedSize, pread,
1546                                     doVerificationThruHBaseChecksum, updateMetrics);
1547         if (blk != null) {
1548           HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1549                          pathName + " at offset " +
1550                          offset + " filesize " + fileSize);
1551         }
1552       }
1553       if (blk == null && !doVerificationThruHBaseChecksum) {
1554         String msg = "readBlockData failed, possibly due to a " +
1555                      "checksum verification failure for file " + pathName +
1556                      " at offset " + offset + " filesize " + fileSize;
1557         HFile.LOG.warn(msg);
1558         throw new IOException(msg);
1559       }
1560 
1561       // If there is a checksum mismatch earlier, then retry with
1562       // HBase checksums switched off and use HDFS checksum verification.
1563       // This triggers HDFS to detect and fix corrupt replicas. The
1564       // next checksumOffCount read requests will use HDFS checksums.
1565       // The decrementing of this.checksumOffCount is not thread-safe,
1566       // but it is harmless because eventually checksumOffCount will be
1567       // a negative number.
1568       streamWrapper.checksumOk();
1569       return blk;
1570     }
1571 
1572     /**
1573      * Reads a version 2 block.
1574      *
1575      * @param offset the offset in the stream to read at
1576      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1577      *          the header, or -1 if unknown
1578      * @param uncompressedSize the uncompressed size of the block. Always
1579      *          expected to be -1. This parameter is only used in version 1.
1580      * @param pread whether to use a positional read
1581      * @param verifyChecksum Whether to use HBase checksums.
1582      *        If HBase checksum is switched off, then use HDFS checksum.
1583      * @return the HFileBlock or null if there is a HBase checksum mismatch
1584      */
1585     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1586         long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
1587         boolean verifyChecksum, boolean updateMetrics)
1588     throws IOException {
1589       if (offset < 0) {
1590         throw new IOException("Invalid offset=" + offset + " trying to read "
1591             + "block (onDiskSize=" + onDiskSizeWithHeaderL
1592             + ", uncompressedSize=" + uncompressedSize + ")");
1593       }
1594 
1595       if (uncompressedSize != -1) {
1596         throw new IOException("Version 2 block reader API does not need " +
1597             "the uncompressed size parameter");
1598       }
1599 
1600       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1601           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1602         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1603             + ": expected to be at least " + hdrSize
1604             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1605             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1606       }
1607 
1608       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1609       // See if we can avoid reading the header. This is desirable, because
1610       // we will not incur a backward seek operation if we have already
1611       // read this block's header as part of the previous read's look-ahead.
1612       // And we also want to skip reading the header again if it has already
1613       // been read.
1614       PrefetchedHeader ph = prefetchedHeader.getAndSet(null); // be multithread safe
1615       ByteBuffer headerBuf = null;
1616       if (ph != null) {
1617         if (ph.offset == offset) {
1618           headerBuf = ph.buf;       // our previous read, use the cached buffer
1619         } else {
1620           prefetchedHeader.set(ph); // not our previous read, put back
1621         }
1622       }
1623 
1624       // Allocate enough space to fit the next block's header too.
1625       int nextBlockOnDiskSize = 0;
1626       byte[] onDiskBlock = null;
1627 
1628       long startTime = System.currentTimeMillis();
1629       HFileBlock b = null;
1630       if (onDiskSizeWithHeader > 0) {
1631         // We know the total on-disk size. Read the entire block into memory,
1632         // then parse the header. This code path is used when
1633         // doing a random read operation relying on the block index, as well as
1634         // when the client knows the on-disk size from peeking into the next
1635         // block's header (e.g. this block's header) when reading the previous
1636         // block. This is the faster and more preferable case.
1637 
1638         // Size that we have to skip in case we have already read the header.
1639         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1640         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
1641                                                                 // next block's header
1642         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1643             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1644             true, offset + preReadHeaderSize, pread);
1645         if (headerBuf != null) {
1646           // the header has been read when reading the previous block, copy
1647           // to this block's header
1648           // headerBuf is HBB
1649           assert headerBuf.hasArray();
1650           System.arraycopy(headerBuf.array(),
1651               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1652         } else {
1653           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1654         }
1655         // We know the total on-disk size but not the uncompressed size. Parse the header.
1656         try {
1657           // TODO: FIX!!! Expensive parse just to get a length
1658           b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1659         } catch (IOException ex) {
1660           // Seen in load testing. Provide comprehensive debug info.
1661           throw new IOException("Failed to read compressed block at "
1662               + offset
1663               + ", onDiskSizeWithHeader="
1664               + onDiskSizeWithHeader
1665               + ", preReadHeaderSize="
1666               + preReadHeaderSize
1667               + ", header.length="
1668               + headerBuf.limit()
1669               + ", header bytes: "
1670               + Bytes.toStringBinary(headerBuf.array(),
1671                   headerBuf.arrayOffset(), hdrSize), ex);
1672         }
1673         // if the caller specifies a onDiskSizeWithHeader, validate it.
1674         int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1675         assert onDiskSizeWithoutHeader >= 0;
1676         b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1677       } else {
1678         // Check headerBuf to see if we have read this block's header as part of
1679         // reading the previous block. This is an optimization of peeking into
1680         // the next block's header (e.g. this block's header) when reading the
1681         // previous block. This is the faster and more preferable case. If the
1682         // header is already there, don't read the header again.
1683 
1684         // Unfortunately, we still have to do a separate read operation to
1685         // read the header.
1686         if (headerBuf == null) {
1687           // From the header, determine the on-disk size of the given hfile
1688           // block, and read the remaining data, thereby incurring two read
1689           // operations. This might happen when we are doing the first read
1690           // in a series of reads or a random read, and we don't have access
1691           // to the block index. This is costly and should happen very rarely.
1692           headerBuf = ByteBuffer.allocate(hdrSize);
1693           // headerBuf is HBB
1694           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1695               hdrSize, false, offset, pread);
1696         }
1697         // TODO: FIX!!! Expensive parse just to get a length
1698         b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1699         onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
1700         // headerBuf is HBB
1701         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1702         nextBlockOnDiskSize =
1703           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
1704               - hdrSize, true, offset + hdrSize, pread);
1705         onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
1706       }
1707 
1708       if (!fileContext.isCompressedOrEncrypted()) {
1709         b.assumeUncompressed();
1710       }
1711 
1712       if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
1713         return null;             // checksum mismatch
1714       }
1715 
1716       long duration = System.currentTimeMillis() - startTime;
1717       if (updateMetrics) {
1718         HFile.updateReadLatency(duration, pread);
1719       }
1720 
1721       // The onDiskBlock will become the headerAndDataBuffer for this block.
1722       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1723       // contains the header of the next block, so there is no need to set the
1724       // next block's header in it.
1725       b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
1726         this.fileContext.isUseHBaseChecksum());
1727 
1728       if (LOG.isTraceEnabled()) {
1729         LOG.trace("Read " + b + " in " + duration + " ms");
1730       }
1731 
1732       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1733 
1734       // Set prefetched header
1735       if (b.hasNextBlockHeader()) {
1736         ph = new PrefetchedHeader();
1737         ph.offset = offset + b.getOnDiskSizeWithHeader();
1738         System.arraycopy(onDiskBlock, onDiskSizeWithHeader, ph.header, 0, hdrSize);
1739         prefetchedHeader.set(ph);
1740       }
1741 
1742       b.offset = offset;
1743       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1744       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1745       return b;
1746     }
1747 
1748     void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1749       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1750     }
1751 
1752     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1753       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1754     }
1755 
1756     @Override
1757     public HFileBlockDecodingContext getBlockDecodingContext() {
1758       return this.encodedBlockDecodingCtx;
1759     }
1760 
1761     @Override
1762     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1763       return this.defaultDecodingCtx;
1764     }
1765 
1766     /**
1767      * Generates checksums over the header and data, and then validates that they
1768      * match the checksum values stored after the data in the block.
1769      * If there is a checksum mismatch, then return false. Otherwise
1770      * return true.
1771      */
1772     protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize)
1773         throws IOException {
1774       return ChecksumUtil.validateBlockChecksum(pathName, block, data, hdrSize);
1775     }
1776 
1777     @Override
1778     public void closeStreams() throws IOException {
1779       streamWrapper.close();
1780     }
1781 
1782     @Override
1783     public void unbufferStream() {
1784       // To handle concurrent reads, ensure that no other client is accessing the streams while we
1785       // unbuffer it.
1786       if (streamLock.tryLock()) {
1787         try {
1788           this.streamWrapper.unbuffer();
1789         } finally {
1790           streamLock.unlock();
1791         }
1792       }
1793     }
1794 
1795     @Override
1796     public String toString() {
1797       return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1798     }
1799   }
1800 
1801   @Override
1802   public int getSerializedLength() {
1803     if (buf != null) {
1804       // include extra bytes for the next header when it's available.
1805       int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1806       return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1807     }
1808     return 0;
1809   }
1810 
1811   @Override
1812   public void serialize(ByteBuffer destination) {
1813     ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
1814         - EXTRA_SERIALIZATION_SPACE);
1815     serializeExtraInfo(destination);
1816   }
1817 
1818   public void serializeExtraInfo(ByteBuffer destination) {
1819     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1820     destination.putLong(this.offset);
1821     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1822     destination.rewind();
1823   }
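
       // Layout of the extra info written by serializeExtraInfo() above (summarizing the
       // writes it performs): a 1-byte HBase-checksum flag, the 8-byte block offset and
       // the 4-byte on-disk size (with header) of the next block, i.e. 1 + 8 + 4 = 13
       // bytes of metadata beyond the block buffer itself.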
1824 
1825   @Override
1826   public CacheableDeserializer<Cacheable> getDeserializer() {
1827     return HFileBlock.blockDeserializer;
1828   }
1829 
1830   @Override
1831   public boolean equals(Object comparison) {
1832     if (this == comparison) {
1833       return true;
1834     }
1835     if (comparison == null) {
1836       return false;
1837     }
1838     if (comparison.getClass() != this.getClass()) {
1839       return false;
1840     }
1841 
1842     HFileBlock castedComparison = (HFileBlock) comparison;
1843 
1844     if (castedComparison.blockType != this.blockType) {
1845       return false;
1846     }
1847     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1848       return false;
1849     }
1850     if (castedComparison.offset != this.offset) {
1851       return false;
1852     }
1853     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1854       return false;
1855     }
1856     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1857       return false;
1858     }
1859     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1860       return false;
1861     }
1862     if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1863         castedComparison.buf.limit()) != 0) {
1864       return false;
1865     }
1866     return true;
1867   }
1868 
1869   public DataBlockEncoding getDataBlockEncoding() {
1870     if (blockType == BlockType.ENCODED_DATA) {
1871       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1872     }
1873     return DataBlockEncoding.NONE;
1874   }
1875 
1876   byte getChecksumType() {
1877     return this.fileContext.getChecksumType().getCode();
1878   }
1879 
1880   int getBytesPerChecksum() {
1881     return this.fileContext.getBytesPerChecksum();
1882   }
1883 
1884   /** @return the size of data on disk + header. Excludes checksum. */
1885   int getOnDiskDataSizeWithHeader() {
1886     return this.onDiskDataSizeWithHeader;
1887   }
1888 
1889   /**
1890    * Calculate the number of bytes required to store all the checksums
1891    * for this block. Each checksum value is a 4 byte integer.
1892    */
1893   int totalChecksumBytes() {
1894     // If the hfile block has minorVersion 0, then there are no checksum
1895     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1896     // indicates that cached blocks do not have checksum data because
1897     // checksums were already validated when the block was read from disk.
1898     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1899       return 0;
1900     }
1901     return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1902         this.fileContext.getBytesPerChecksum());
1903   }
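
       // A minimal sketch of the arithmetic behind totalChecksumBytes(), assuming the
       // 4-byte-per-chunk checksum layout described above; the method name is
       // illustrative only.
       static int exampleChecksumBytes(int onDiskDataSizeWithHeader, int bytesPerChecksum) {
         // One 4-byte checksum is stored per (possibly partial) chunk of
         // bytesPerChecksum bytes of header-plus-data.
         int chunks = (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
         return chunks * Bytes.SIZEOF_INT;
       }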
1904 
1905   /**
1906    * Returns the size of this block header.
1907    */
1908   public int headerSize() {
1909     return headerSize(this.fileContext.isUseHBaseChecksum());
1910   }
1911 
1912   /**
1913    * Maps a minor version to the size of the header.
1914    */
1915   public static int headerSize(boolean usesHBaseChecksum) {
1916     if (usesHBaseChecksum) {
1917       return HConstants.HFILEBLOCK_HEADER_SIZE;
1918     }
1919     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1920   }
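
       // Header size recap, following the field order read back in toStringHeader() below:
       // without HBase checksums the header is 8 (magic) + 4 + 4 + 8 = 24 bytes; with
       // HBase checksums it adds a 1-byte checksum type, a 4-byte bytesPerChecksum and a
       // 4-byte onDiskDataSizeWithHeader, for 33 bytes in total.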
1921 
1922   /**
1923    * Return the appropriate DUMMY_HEADER for the minor version
1924    */
1925   public byte[] getDummyHeaderForVersion() {
1926     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1927   }
1928 
1929   /**
1930    * Return the appropriate DUMMY_HEADER for the minor version
1931    */
1932   private static byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1933     if (usesHBaseChecksum) {
1934       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1935     }
1936     return DUMMY_HEADER_NO_CHECKSUM;
1937   }
1938 
1939   /**
1940    * @return the HFileContext used to create this HFileBlock. Not necessarily the
1941    * fileContext of the file from which this block's data was originally read.
1942    */
1943   public HFileContext getHFileContext() {
1944     return this.fileContext;
1945   }
1946 
1947   /**
1948    * Convert the contents of the block header into a human readable string.
1949    * This is mostly helpful for debugging. This assumes that the block
1950    * has minor version > 0.
1951    */
1952   static String toStringHeader(ByteBuffer buf) throws IOException {
1953     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1954     buf.get(magicBuf);
1955     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1956     int compressedBlockSizeNoHeader = buf.getInt();
1957     int uncompressedBlockSizeNoHeader = buf.getInt();
1958     long prevBlockOffset = buf.getLong();
1959     byte cksumtype = buf.get();
1960     long bytesPerChecksum = buf.getInt();
1961     long onDiskDataSizeWithHeader = buf.getInt();
1962     return " Header dump: magic: " + Bytes.toString(magicBuf) +
1963                    " blockType " + bt +
1964                    " compressedBlockSizeNoHeader " +
1965                    compressedBlockSizeNoHeader +
1966                    " uncompressedBlockSizeNoHeader " +
1967                    uncompressedBlockSizeNoHeader +
1968                    " prevBlockOffset " + prevBlockOffset +
1969                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1970                    " bytesPerChecksum " + bytesPerChecksum +
1971                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1972   }
1973 }