/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
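
/**
 * Reads version-2 HFile blocks and writes data blocks to an output stream.
 * Each block carries a fixed-size header followed by the block payload and,
 * when HBase checksums are in use, per-chunk checksums. The header layout,
 * summarized here from the serialization logic in {@link #overwriteHeader()},
 * is: block type magic ({@link BlockType#MAGIC_LENGTH} bytes), then
 * onDiskSizeWithoutHeader (int), uncompressedSizeWithoutHeader (int),
 * prevBlockOffset (long), and, with HBase checksums enabled, checksum type
 * (byte), bytesPerChecksum (int) and onDiskDataSizeWithHeader (int).
 */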
@InterfaceAudience.Private
public class HFileBlock implements Cacheable {
  private static final Log LOG = LogFactory.getLog(HFileBlock.class);
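
  /**
   * After an HBase-checksum failure, do this many subsequent reads with HDFS
   * checksums before re-enabling HBase checksum verification; passed to
   * {@link FSDataInputStreamWrapper#fallbackToFsChecksum(int)} below.
   */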
  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;

  public static final boolean FILL_HEADER = true;
  public static final boolean DONT_FILL_HEADER = false;
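
  /** Size of an ENCODED_DATA block header: the regular block header plus the encoding id. */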
  public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
      + DataBlockEncoding.ID_SIZE;

  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

  public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
      ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
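
  /**
   * Extra bytes appended when serializing a block into the cache: a
   * checksum-support flag (byte), the block's file offset (long) and the next
   * block's on-disk size (int); see {@link #serializeExtraInfo(ByteBuffer)}.
   */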
  public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
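
  /** Deserializer used when reading a block back out of the block cache. */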
  static final CacheableDeserializer<Cacheable> blockDeserializer =
      new CacheableDeserializer<Cacheable>() {
        public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException {
          buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
          ByteBuffer newByteBuffer;
          if (reuse) {
            newByteBuffer = buf.slice();
          } else {
            newByteBuffer = ByteBuffer.allocate(buf.limit());
            newByteBuffer.put(buf);
          }
          buf.position(buf.limit());
          buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
          boolean usesChecksum = buf.get() == (byte) 1;
          HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
          hFileBlock.offset = buf.getLong();
          hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
          if (hFileBlock.hasNextBlockHeader()) {
            hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
          }
          return hFileBlock;
        }

        @Override
        public int getDeserialiserIdentifier() {
          return deserializerIdentifier;
        }

        @Override
        public HFileBlock deserialize(ByteBuffer b) throws IOException {
          return deserialize(b, false);
        }
      };
  private static final int deserializerIdentifier;
  static {
    deserializerIdentifier = CacheableDeserializerIdManager
        .registerDeserializer(blockDeserializer);
  }

  private BlockType blockType;

  private int onDiskSizeWithoutHeader;

  private final int uncompressedSizeWithoutHeader;

  private final long prevBlockOffset;

  private final int onDiskDataSizeWithHeader;

  private ByteBuffer buf;

  private HFileContext fileContext;

  private long offset = -1;

  private int nextBlockOnDiskSizeWithHeader = -1;
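
  /**
   * Creates a block from the supplied field values and buffer; the buffer is
   * expected to begin with the block header. When {@code fillHeader} is true,
   * the header bytes are (re)written from the fields via
   * {@link #overwriteHeader()}.
   */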
  HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
      long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
      int onDiskDataSizeWithHeader, HFileContext fileContext) {
    this.blockType = blockType;
    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
    this.prevBlockOffset = prevBlockOffset;
    this.buf = buf;
    this.offset = offset;
    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
    this.fileContext = fileContext;
    if (fillHeader) {
      overwriteHeader();
    }
    this.buf.rewind();
  }

  HFileBlock(HFileBlock that) {
    this.blockType = that.blockType;
    this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
    this.prevBlockOffset = that.prevBlockOffset;
    this.buf = that.buf.duplicate();
    this.offset = that.offset;
    this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
    this.fileContext = that.fileContext;
    this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
  }
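
  /**
   * Creates a block from an existing buffer that starts with a block header,
   * parsing the header fields out of the buffer. The buffer is retained by
   * reference, not copied.
   */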
  HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
    b.rewind();
    blockType = BlockType.read(b);
    onDiskSizeWithoutHeader = b.getInt();
    uncompressedSizeWithoutHeader = b.getInt();
    prevBlockOffset = b.getLong();
    HFileContextBuilder contextBuilder = new HFileContextBuilder();
    contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
    if (usesHBaseChecksum) {
      contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
      contextBuilder.withBytesPerCheckSum(b.getInt());
      this.onDiskDataSizeWithHeader = b.getInt();
    } else {
      contextBuilder.withChecksumType(ChecksumType.NULL);
      contextBuilder.withBytesPerCheckSum(0);
      this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
          HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
    }
    this.fileContext = contextBuilder.build();
    buf = b;
    buf.rewind();
  }

  public BlockType getBlockType() {
    return blockType;
  }
  public short getDataBlockEncodingId() {
    if (blockType != BlockType.ENCODED_DATA) {
      throw new IllegalArgumentException("Querying encoder ID of a block " +
          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
    }
    return buf.getShort(headerSize());
  }

  public int getOnDiskSizeWithHeader() {
    return onDiskSizeWithoutHeader + headerSize();
  }

  public int getOnDiskSizeWithoutHeader() {
    return onDiskSizeWithoutHeader;
  }

  public int getUncompressedSizeWithoutHeader() {
    return uncompressedSizeWithoutHeader;
  }

  public long getPrevBlockOffset() {
    return prevBlockOffset;
  }
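
  /** Rewrites the header fields into the first {@link #headerSize()} bytes of the buffer. */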
  private void overwriteHeader() {
    buf.rewind();
    blockType.write(buf);
    buf.putInt(onDiskSizeWithoutHeader);
    buf.putInt(uncompressedSizeWithoutHeader);
    buf.putLong(prevBlockOffset);
    if (this.fileContext.isUseHBaseChecksum()) {
      buf.put(fileContext.getChecksumType().getCode());
      buf.putInt(fileContext.getBytesPerChecksum());
      buf.putInt(onDiskDataSizeWithHeader);
    }
  }

  public ByteBuffer getBufferWithoutHeader() {
    ByteBuffer dup = this.buf.duplicate();
    dup.position(headerSize());
    dup.limit(buf.limit() - totalChecksumBytes());
    return dup.slice();
  }

  public ByteBuffer getBufferReadOnly() {
    ByteBuffer dup = this.buf.duplicate();
    dup.limit(buf.limit() - totalChecksumBytes());
    return dup.slice();
  }

  public ByteBuffer getBufferReadOnlyWithHeader() {
    ByteBuffer dup = this.buf.duplicate();
    return dup.slice();
  }

  ByteBuffer getBufferWithHeader() {
    ByteBuffer dupBuf = buf.duplicate();
    dupBuf.rewind();
    return dupBuf;
  }

  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
      String fieldName) throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
          + ") is different from that in the field (" + valueFromField + ")");
    }
  }

  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
      throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new IOException("Block type stored in the buffer: " +
          valueFromBuf + ", block type field: " + valueFromField);
    }
  }
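
  /**
   * Checks that the field values match what is serialized in the buffer's
   * header, and that the buffer's limit and capacity are consistent with the
   * block size; throws on any mismatch.
   */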
  void sanityCheck() throws IOException {
    buf.rewind();

    sanityCheckAssertion(BlockType.read(buf), blockType);

    sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
        "onDiskSizeWithoutHeader");

    sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
        "uncompressedSizeWithoutHeader");

    sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
    if (this.fileContext.isUseHBaseChecksum()) {
      sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
      sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
      sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
    }

    int cksumBytes = totalChecksumBytes();
    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
    if (buf.limit() != expectedBufLimit) {
      throw new AssertionError("Expected buffer limit " + expectedBufLimit
          + ", got " + buf.limit());
    }

    int hdrSize = headerSize();
    if (buf.capacity() != expectedBufLimit &&
        buf.capacity() != expectedBufLimit + hdrSize) {
      throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder()
        .append("HFileBlock [")
        .append(" fileOffset=").append(offset)
        .append(" headerSize()=").append(headerSize())
        .append(" blockType=").append(blockType)
        .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
        .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
        .append(" prevBlockOffset=").append(prevBlockOffset)
        .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
    if (fileContext.isUseHBaseChecksum()) {
      sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
          .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
          .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
    } else {
      sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
          .append("(").append(onDiskSizeWithoutHeader)
          .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
    }
    String dataBegin = null;
    if (buf.hasArray()) {
      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
    } else {
      ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
      byte[] dataBeginBytes = new byte[Math.min(32,
          bufWithoutHeader.limit() - bufWithoutHeader.position())];
      bufWithoutHeader.get(dataBeginBytes);
      dataBegin = Bytes.toStringBinary(dataBeginBytes);
    }
    sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
        .append(" totalChecksumBytes()=").append(totalChecksumBytes())
        .append(" isUnpacked()=").append(isUnpacked())
        .append(" buf=[ ").append(buf).append(" ]")
        .append(" dataBeginsWith=").append(dataBegin)
        .append(" fileContext=").append(fileContext)
        .append(" ]");
    return sb.toString();
  }

  private void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader)
      throws IOException {
    if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
      String dataBegin = null;
      if (buf.hasArray()) {
        dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(),
            Math.min(32, buf.limit()));
      } else {
        ByteBuffer bufDup = getBufferReadOnly();
        byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
        bufDup.get(dataBeginBytes);
        dataBegin = Bytes.toStringBinary(dataBeginBytes);
      }
      String blockInfoMsg =
          "Block offset: " + offset + ", data starts with: " + dataBegin;
      throw new IOException("On-disk size without header provided is "
          + expectedOnDiskSizeWithoutHeader + ", but block "
          + "header contains " + onDiskSizeWithoutHeader + ". " +
          blockInfoMsg);
    }
  }
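
  /**
   * Returns a decompressed/decrypted copy of this block, or this block itself
   * when the file is neither compressed nor encrypted. If the following
   * block's header was read along with this block, it is carried over into
   * the unpacked copy.
   */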
  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
    if (!fileContext.isCompressedOrEncrypted()) {
      return this;
    }

    HFileBlock unpacked = new HFileBlock(this);
    unpacked.allocateBuffer();

    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
        reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();

    ByteBuffer dup = this.buf.duplicate();
    dup.position(this.headerSize());
    dup = dup.slice();
    ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
        dup);

    if (unpacked.hasNextBlockHeader()) {
      ByteBuffer inDup = this.buf.duplicate();
      inDup.limit(inDup.limit() + headerSize());
      ByteBuffer outDup = unpacked.buf.duplicate();
      outDup.limit(outDup.limit() + unpacked.headerSize());
      ByteBufferUtils.copyFromBufferToBuffer(
          outDup,
          inDup,
          this.onDiskDataSizeWithHeader,
          unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
              + unpacked.totalChecksumBytes(), unpacked.headerSize());
    }
    return unpacked;
  }

  private boolean hasNextBlockHeader() {
    return nextBlockOnDiskSizeWithHeader > 0;
  }
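
  /**
   * Reallocates the backing buffer so it can hold the uncompressed payload,
   * the checksums and, if present, the cached header of the next block, then
   * copies the existing header into the new buffer.
   */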
  private void allocateBuffer() {
    int cksumBytes = totalChecksumBytes();
    int headerSize = headerSize();
    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
        cksumBytes + (hasNextBlockHeader() ? headerSize : 0);

    ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);

    ByteBuffer dup = buf.duplicate();
    dup.position(0);
    dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);

    buf = newBuf;
    buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
  }

  public boolean isUnpacked() {
    final int cksumBytes = totalChecksumBytes();
    final int headerSize = headerSize();
    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
    final int bufCapacity = buf.capacity();
    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
  }

  public void assumeUncompressed() throws IOException {
    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
        totalChecksumBytes()) {
      throw new IOException("Using no compression but "
          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
          + ", numChecksumBytes=" + totalChecksumBytes());
    }
  }

  public void expectType(BlockType expectedType) throws IOException {
    if (blockType != expectedType) {
      throw new IOException("Invalid block type: expected=" + expectedType
          + ", actual=" + blockType);
    }
  }

  public long getOffset() {
    if (offset < 0) {
      throw new IllegalStateException(
          "HFile block offset not initialized properly");
    }
    return offset;
  }

  public DataInputStream getByteStream() {
    ByteBuffer dup = this.buf.duplicate();
    dup.position(this.headerSize());
    return new DataInputStream(new ByteBufferInputStream(dup));
  }

  @Override
  public long heapSize() {
    long size = ClassSize.align(
        ClassSize.OBJECT +
        3 * ClassSize.REFERENCE +
        4 * Bytes.SIZEOF_INT +
        2 * Bytes.SIZEOF_LONG +
        fileContext.heapSize()
    );

    if (buf != null) {
      size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
    }

    return ClassSize.align(size);
  }
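
  /**
   * Reads up to {@code necessaryLen + extraLen} bytes from the stream.
   * Returns {@code true} only when all the extra bytes were also read;
   * hitting EOF after the necessary bytes is not an error.
   */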
  public static boolean readWithExtra(InputStream in, byte[] buf,
      int bufOffset, int necessaryLen, int extraLen) throws IOException {
    int bytesRemaining = necessaryLen + extraLen;
    while (bytesRemaining > 0) {
      int ret = in.read(buf, bufOffset, bytesRemaining);
      if (ret == -1 && bytesRemaining <= extraLen) {
        break;
      }
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (read "
            + "returned " + ret + ", was trying to read " + necessaryLen
            + " necessary bytes and " + extraLen + " extra bytes, "
            + "successfully read "
            + (necessaryLen + extraLen - bytesRemaining) + ")");
      }
      bufOffset += ret;
      bytesRemaining -= ret;
    }
    return bytesRemaining <= 0;
  }
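
  /**
   * Positional-read flavor of {@link #readWithExtra(InputStream, byte[], int, int, int)}:
   * reads at least the necessary bytes at the given file position and returns
   * {@code true} only if the extra bytes were read as well.
   */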
  @VisibleForTesting
  static boolean positionalReadWithExtra(FSDataInputStream in,
      long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
      throws IOException {
    int bytesRemaining = necessaryLen + extraLen;
    int bytesRead = 0;
    while (bytesRead < necessaryLen) {
      int ret = in.read(position, buf, bufOffset, bytesRemaining);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (positional read "
            + "returned " + ret + ", was trying to read " + necessaryLen
            + " necessary bytes and " + extraLen + " extra bytes, "
            + "successfully read " + bytesRead + ")");
      }
      position += ret;
      bufOffset += ret;
      bytesRemaining -= ret;
      bytesRead += ret;
    }
    return bytesRead != necessaryLen && bytesRemaining <= 0;
  }

  public int getNextBlockOnDiskSizeWithHeader() {
    return nextBlockOnDiskSizeWithHeader;
  }
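
  /**
   * Unified block writer: accumulates the block contents in memory, then
   * compresses/encrypts, checksums and writes them out. A simple state
   * machine (INIT, WRITING, BLOCK_READY) guards the call order.
   */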
  public static class Writer {
    private enum State {
      INIT,
      WRITING,
      BLOCK_READY
    };

    private State state = State.INIT;

    private final HFileDataBlockEncoder dataBlockEncoder;

    private HFileBlockEncodingContext dataBlockEncodingCtx;

    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

    private ByteArrayOutputStream baosInMemory;

    private BlockType blockType;

    private DataOutputStream userDataStream;

    private int unencodedDataSizeWritten;

    private byte[] onDiskBytesWithHeader;

    private byte[] onDiskChecksum;

    private byte[] uncompressedBytesWithHeader;

    private long startOffset;

    private long[] prevOffsetByType;

    private long prevOffset;

    private HFileContext fileContext;

    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
      this.dataBlockEncoder = dataBlockEncoder != null
          ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
      defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
      dataBlockEncodingCtx = this.dataBlockEncoder
          .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);

      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
            " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
            fileContext.getBytesPerChecksum());
      }

      baosInMemory = new ByteArrayOutputStream();

      prevOffsetByType = new long[BlockType.values().length];
      for (int i = 0; i < prevOffsetByType.length; ++i) {
        prevOffsetByType[i] = -1;
      }

      this.fileContext = fileContext;
    }

    public DataOutputStream startWriting(BlockType newBlockType)
        throws IOException {
      if (state == State.BLOCK_READY && startOffset != -1) {
        prevOffsetByType[blockType.getId()] = startOffset;
      }

      startOffset = -1;
      blockType = newBlockType;

      baosInMemory.reset();
      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);

      state = State.WRITING;

      userDataStream = new DataOutputStream(baosInMemory);
      if (newBlockType == BlockType.DATA) {
        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
      }
      this.unencodedDataSizeWritten = 0;
      return userDataStream;
    }

    public void write(Cell cell) throws IOException {
      expectState(State.WRITING);
      this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
          this.userDataStream);
    }

    DataOutputStream getUserDataStream() {
      expectState(State.WRITING);
      return userDataStream;
    }

    void ensureBlockReady() throws IOException {
      Preconditions.checkState(state != State.INIT,
          "Unexpected state: " + state);

      if (state == State.BLOCK_READY) {
        return;
      }

      finishBlock();
    }
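
    /**
     * Finishes the in-memory block: lets the encoder finalize an encoded data
     * block, compresses/encrypts the bytes, writes the header into both the
     * on-disk and uncompressed images, and computes the checksums.
     */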
    private void finishBlock() throws IOException {
      if (blockType == BlockType.DATA) {
        BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
            new BufferGrabbingByteArrayOutputStream();
        baosInMemory.writeTo(baosInMemoryCopy);
        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
            baosInMemoryCopy.buf, blockType);
        blockType = dataBlockEncodingCtx.getBlockType();
      }
      userDataStream.flush();

      uncompressedBytesWithHeader = baosInMemory.toByteArray();
      prevOffset = prevOffsetByType[blockType.getId()];

      state = State.BLOCK_READY;
      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
        onDiskBytesWithHeader = dataBlockEncodingCtx
            .compressAndEncrypt(uncompressedBytesWithHeader);
      } else {
        onDiskBytesWithHeader = defaultBlockEncodingCtx
            .compressAndEncrypt(uncompressedBytesWithHeader);
      }
      int numBytes = (int) ChecksumUtil.numBytes(
          onDiskBytesWithHeader.length,
          fileContext.getBytesPerChecksum());

      putHeader(onDiskBytesWithHeader, 0,
          onDiskBytesWithHeader.length + numBytes,
          uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);

      putHeader(uncompressedBytesWithHeader, 0,
          onDiskBytesWithHeader.length + numBytes,
          uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);

      onDiskChecksum = new byte[numBytes];
      ChecksumUtil.generateChecksums(
          onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
    }

    public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
      private byte[] buf;

      @Override
      public void write(byte[] b, int off, int len) {
        this.buf = b;
      }

      public byte[] getBuffer() {
        return this.buf;
      }
    }
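
    /** Writes the header fields into {@code dest} starting at {@code offset}. */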
    private void putHeader(byte[] dest, int offset, int onDiskSize,
        int uncompressedSize, int onDiskDataSize) {
      offset = blockType.put(dest, offset);
      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      offset = Bytes.putLong(dest, offset, prevOffset);
      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
      Bytes.putInt(dest, offset, onDiskDataSize);
    }

    public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
      long offset = out.getPos();
      if (startOffset != -1 && offset != startOffset) {
        throw new IOException("A " + blockType + " block written to a "
            + "stream twice, first at offset " + startOffset + ", then at "
            + offset);
      }
      startOffset = offset;

      finishBlockAndWriteHeaderAndData((DataOutputStream) out);
    }

    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
        throws IOException {
      ensureBlockReady();
      long startTime = System.currentTimeMillis();
      out.write(onDiskBytesWithHeader);
      out.write(onDiskChecksum);
      HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
    }

    byte[] getHeaderAndDataForTest() throws IOException {
      ensureBlockReady();
      byte[] output =
          new byte[onDiskBytesWithHeader.length
              + onDiskChecksum.length];
      System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
          onDiskBytesWithHeader.length);
      System.arraycopy(onDiskChecksum, 0, output,
          onDiskBytesWithHeader.length, onDiskChecksum.length);
      return output;
    }

    public void release() {
      if (dataBlockEncodingCtx != null) {
        dataBlockEncodingCtx.close();
        dataBlockEncodingCtx = null;
      }
      if (defaultBlockEncodingCtx != null) {
        defaultBlockEncodingCtx.close();
        defaultBlockEncodingCtx = null;
      }
    }

    int getOnDiskSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length
          + onDiskChecksum.length
          - HConstants.HFILEBLOCK_HEADER_SIZE;
    }

    int getOnDiskSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length + onDiskChecksum.length;
    }

    int getUncompressedSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
    }

    int getUncompressedSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length;
    }

    public boolean isWriting() {
      return state == State.WRITING;
    }

    public int blockSizeWritten() {
      if (state != State.WRITING) {
        return 0;
      }
      return this.unencodedDataSizeWritten;
    }

    ByteBuffer getUncompressedBufferWithHeader() {
      expectState(State.BLOCK_READY);
      return ByteBuffer.wrap(uncompressedBytesWithHeader);
    }

    ByteBuffer getOnDiskBufferWithHeader() {
      expectState(State.BLOCK_READY);
      return ByteBuffer.wrap(onDiskBytesWithHeader);
    }

    private void expectState(State expectedState) {
      if (state != expectedState) {
        throw new IllegalStateException("Expected state: " + expectedState +
            ", actual state: " + state);
      }
    }

    public void writeBlock(BlockWritable bw, FSDataOutputStream out)
        throws IOException {
      bw.writeToBlock(startWriting(bw.getBlockType()));
      writeHeaderAndData(out);
    }

    public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
      HFileContext newContext = new HFileContextBuilder()
          .withBlockSize(fileContext.getBlocksize())
          .withBytesPerCheckSum(0)
          .withChecksumType(ChecksumType.NULL)
          .withCompression(fileContext.getCompression())
          .withDataBlockEncoding(fileContext.getDataBlockEncoding())
          .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
          .withCompressTags(fileContext.isCompressTags())
          .withIncludesMvcc(fileContext.isIncludesMvcc())
          .withIncludesTags(fileContext.isIncludesTags())
          .build();
      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
          getUncompressedSizeWithoutHeader(), prevOffset,
          cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
              getOnDiskBufferWithHeader() :
              getUncompressedBufferWithHeader(),
          FILL_HEADER, startOffset,
          onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
    }
  }

  public interface BlockWritable {
    BlockType getBlockType();

    void writeToBlock(DataOutput out) throws IOException;
  }

  public interface BlockIterator {
    HFileBlock nextBlock() throws IOException;

    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
  }
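
  /** A full-fledged block reader with the ability to iterate over a range of blocks. */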
  public interface FSReader {
    HFileBlock readBlockData(long offset, long onDiskSize,
        int uncompressedSize, boolean pread, boolean updateMetrics) throws IOException;

    BlockIterator blockRange(long startOffset, long endOffset);

    void closeStreams() throws IOException;

    HFileBlockDecodingContext getBlockDecodingContext();

    HFileBlockDecodingContext getDefaultBlockDecodingContext();

    void unbufferStream();
  }

  private abstract static class AbstractFSReader implements FSReader {
    protected long fileSize;

    protected final int hdrSize;

    protected HFileSystem hfs;

    protected final Lock streamLock = new ReentrantLock();

    public static final int DEFAULT_BUFFER_SIZE = 1 << 20;

    protected HFileContext fileContext;

    protected String pathName;

    public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
        throws IOException {
      this.fileSize = fileSize;
      this.hfs = hfs;
      if (path != null) {
        this.pathName = path.toString();
      }
      this.fileContext = fileContext;
      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
    }

    @Override
    public BlockIterator blockRange(final long startOffset,
        final long endOffset) {
      final FSReader owner = this;
      return new BlockIterator() {
        private long offset = startOffset;

        private long length = -1;

        @Override
        public HFileBlock nextBlock() throws IOException {
          if (offset >= endOffset) {
            return null;
          }
          HFileBlock b = readBlockData(offset, length, -1, false, false);
          offset += b.getOnDiskSizeWithHeader();
          length = b.getNextBlockOnDiskSizeWithHeader();
          return b.unpack(fileContext, owner);
        }

        @Override
        public HFileBlock nextBlockWithBlockType(BlockType blockType)
            throws IOException {
          HFileBlock blk = nextBlock();
          if (blk.getBlockType() != blockType) {
            throw new IOException("Expected block of type " + blockType
                + " but found " + blk.getBlockType());
          }
          return blk;
        }
      };
    }

    protected int readAtOffset(FSDataInputStream istream,
        byte[] dest, int destOffset, int size,
        boolean peekIntoNextBlock, long fileOffset, boolean pread)
        throws IOException {
      if (peekIntoNextBlock &&
          destOffset + size + hdrSize > dest.length) {
        throw new IOException("Attempted to read " + size + " bytes and " +
            hdrSize + " bytes of next header into a " + dest.length +
            "-byte array at offset " + destOffset);
      }

      if (!pread && streamLock.tryLock()) {
        try {
          HFileUtil.seekOnMultipleSources(istream, fileOffset);

          long realOffset = istream.getPos();
          if (realOffset != fileOffset) {
            throw new IOException("Tried to seek to " + fileOffset + " to "
                + "read " + size + " bytes, but pos=" + realOffset
                + " after seek");
          }

          if (!peekIntoNextBlock) {
            IOUtils.readFully(istream, dest, destOffset, size);
            return -1;
          }

          if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
            return -1;
          }
        } finally {
          streamLock.unlock();
        }
      } else {
        int extraSize = peekIntoNextBlock ? hdrSize : 0;
        if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
            size, extraSize)) {
          return -1;
        }
      }

      assert peekIntoNextBlock;
      return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
    }
  }
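
  /**
   * Caches the header of the block that follows the one just read, so the
   * next read can skip re-fetching it; held in the {@code prefetchedHeader}
   * reference of {@link FSReaderImpl}.
   */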
  private static class PrefetchedHeader {
    long offset = -1;
    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
  }

  static class FSReaderImpl extends AbstractFSReader {
    protected FSDataInputStreamWrapper streamWrapper;

    private HFileBlockDecodingContext encodedBlockDecodingCtx;

    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;

    private AtomicReference<PrefetchedHeader> prefetchedHeader =
        new AtomicReference<PrefetchedHeader>(new PrefetchedHeader());

    public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
        HFileContext fileContext) throws IOException {
      super(fileSize, hfs, path, fileContext);
      this.streamWrapper = stream;
      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
      encodedBlockDecodingCtx = defaultDecodingCtx;
    }

    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
        throws IOException {
      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
    }

    @Override
    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
        int uncompressedSize, boolean pread, boolean updateMetrics)
        throws IOException {
      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);

      HFileBlock blk = readBlockDataInternal(is, offset,
          onDiskSizeWithHeaderL,
          uncompressedSize, pread,
          doVerificationThruHBaseChecksum, updateMetrics);
      if (blk == null) {
        HFile.LOG.warn("HBase checksum verification failed for file " +
            pathName + " at offset " +
            offset + " filesize " + fileSize +
            ". Retrying read with HDFS checksums turned on...");

        if (!doVerificationThruHBaseChecksum) {
          String msg = "HBase checksum verification failed for file " +
              pathName + " at offset " +
              offset + " filesize " + fileSize +
              " but this cannot happen because doVerify is " +
              doVerificationThruHBaseChecksum;
          HFile.LOG.warn(msg);
          throw new IOException(msg);
        }
        HFile.checksumFailures.increment();

        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
        doVerificationThruHBaseChecksum = false;
        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
            uncompressedSize, pread,
            doVerificationThruHBaseChecksum, updateMetrics);
        if (blk != null) {
          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
              pathName + " at offset " +
              offset + " filesize " + fileSize);
        }
      }
      if (blk == null && !doVerificationThruHBaseChecksum) {
        String msg = "readBlockData failed, possibly due to " +
            "checksum verification failed for file " + pathName +
            " at offset " + offset + " filesize " + fileSize;
        HFile.LOG.warn(msg);
        throw new IOException(msg);
      }

      streamWrapper.checksumOk();
      return blk;
    }
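
    /**
     * Reads a block's bytes from the stream, verifying checksums when asked.
     * Returns {@code null} on an HBase-checksum failure so the caller can
     * retry with HDFS checksums. When {@code onDiskSizeWithHeaderL} is -1 the
     * header is read first to learn the block size.
     */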
    private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
        long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
        boolean verifyChecksum, boolean updateMetrics)
        throws IOException {
      if (offset < 0) {
        throw new IOException("Invalid offset=" + offset + " trying to read "
            + "block (onDiskSize=" + onDiskSizeWithHeaderL
            + ", uncompressedSize=" + uncompressedSize + ")");
      }

      if (uncompressedSize != -1) {
        throw new IOException("Version 2 block reader API does not need " +
            "the uncompressed size parameter");
      }

      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
        throw new IOException("Invalid onDiskSize=" + onDiskSizeWithHeaderL
            + ": expected to be at least " + hdrSize
            + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
            + offset + ", uncompressedSize=" + uncompressedSize + ")");
      }

      int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;

      PrefetchedHeader ph = prefetchedHeader.getAndSet(null);
      ByteBuffer headerBuf = null;
      if (ph != null) {
        if (ph.offset == offset) {
          headerBuf = ph.buf;
        } else {
          prefetchedHeader.set(ph);
        }
      }

      int nextBlockOnDiskSize = 0;
      byte[] onDiskBlock = null;

      long startTime = System.currentTimeMillis();
      HFileBlock b = null;
      if (onDiskSizeWithHeader > 0) {
        int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
        onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];

        nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
            preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
            true, offset + preReadHeaderSize, pread);
        if (headerBuf != null) {
          assert headerBuf.hasArray();
          System.arraycopy(headerBuf.array(),
              headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
        } else {
          headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
        }

        try {
          b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
        } catch (IOException ex) {
          throw new IOException("Failed to read compressed block at "
              + offset
              + ", onDiskSizeWithHeader="
              + onDiskSizeWithHeader
              + ", hdrSize="
              + hdrSize
              + ", header.length="
              + prefetchedHeader.get().header.length
              + ", header bytes: "
              + Bytes.toStringBinary(prefetchedHeader.get().header, 0,
                  hdrSize), ex);
        }

        int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
        assert onDiskSizeWithoutHeader >= 0;
        b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
      } else {
        if (headerBuf == null) {
          headerBuf = ByteBuffer.allocate(hdrSize);
          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
              hdrSize, false, offset, pread);
        }

        b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
        onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
        System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
        nextBlockOnDiskSize =
            readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
                - hdrSize, true, offset + hdrSize, pread);
        onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
      }

      if (!fileContext.isCompressedOrEncrypted()) {
        b.assumeUncompressed();
      }

      if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
        return null;
      }

      long duration = System.currentTimeMillis() - startTime;
      if (updateMetrics) {
        HFile.updateReadLatency(duration, pread);
      }

      b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
          this.fileContext.isUseHBaseChecksum());

      if (LOG.isTraceEnabled()) {
        LOG.trace("Read " + b + " in " + duration + " ms");
      }

      b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;

      if (b.hasNextBlockHeader()) {
        ph = new PrefetchedHeader();
        ph.offset = offset + b.getOnDiskSizeWithHeader();
        System.arraycopy(onDiskBlock, onDiskSizeWithHeader, ph.header, 0, hdrSize);
        prefetchedHeader.set(ph);
      }

      b.offset = offset;
      b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
      b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
      return b;
    }

    void setIncludesMemstoreTS(boolean includesMemstoreTS) {
      this.fileContext.setIncludesMvcc(includesMemstoreTS);
    }

    void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
    }

    @Override
    public HFileBlockDecodingContext getBlockDecodingContext() {
      return this.encodedBlockDecodingCtx;
    }

    @Override
    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
      return this.defaultDecodingCtx;
    }

    protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize)
        throws IOException {
      return ChecksumUtil.validateBlockChecksum(pathName, block, data, hdrSize);
    }

    @Override
    public void closeStreams() throws IOException {
      streamWrapper.close();
    }

    @Override
    public void unbufferStream() {
      if (streamLock.tryLock()) {
        try {
          this.streamWrapper.unbuffer();
        } finally {
          streamLock.unlock();
        }
      }
    }

    @Override
    public String toString() {
      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
    }
  }

  @Override
  public int getSerializedLength() {
    if (buf != null) {
      int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
      return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
    }
    return 0;
  }

  @Override
  public void serialize(ByteBuffer destination) {
    ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
        - EXTRA_SERIALIZATION_SPACE);
    serializeExtraInfo(destination);
  }

  public void serializeExtraInfo(ByteBuffer destination) {
    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
    destination.putLong(this.offset);
    destination.putInt(this.nextBlockOnDiskSizeWithHeader);
    destination.rewind();
  }

  @Override
  public CacheableDeserializer<Cacheable> getDeserializer() {
    return HFileBlock.blockDeserializer;
  }

  @Override
  public boolean equals(Object comparison) {
    if (this == comparison) {
      return true;
    }
    if (comparison == null) {
      return false;
    }
    if (comparison.getClass() != this.getClass()) {
      return false;
    }

    HFileBlock castedComparison = (HFileBlock) comparison;

    if (castedComparison.blockType != this.blockType) {
      return false;
    }
    if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
      return false;
    }
    if (castedComparison.offset != this.offset) {
      return false;
    }
    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
      return false;
    }
    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
      return false;
    }
    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
      return false;
    }
    if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
        castedComparison.buf.limit()) != 0) {
      return false;
    }
    return true;
  }

  public DataBlockEncoding getDataBlockEncoding() {
    if (blockType == BlockType.ENCODED_DATA) {
      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
    }
    return DataBlockEncoding.NONE;
  }

  byte getChecksumType() {
    return this.fileContext.getChecksumType().getCode();
  }

  int getBytesPerChecksum() {
    return this.fileContext.getBytesPerChecksum();
  }

  int getOnDiskDataSizeWithHeader() {
    return this.onDiskDataSizeWithHeader;
  }

  int totalChecksumBytes() {
    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
      return 0;
    }
    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
        this.fileContext.getBytesPerChecksum());
  }

  public int headerSize() {
    return headerSize(this.fileContext.isUseHBaseChecksum());
  }

  public static int headerSize(boolean usesHBaseChecksum) {
    if (usesHBaseChecksum) {
      return HConstants.HFILEBLOCK_HEADER_SIZE;
    }
    return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
  }

  public byte[] getDummyHeaderForVersion() {
    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
  }

  private static byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
    if (usesHBaseChecksum) {
      return HConstants.HFILEBLOCK_DUMMY_HEADER;
    }
    return DUMMY_HEADER_NO_CHECKSUM;
  }

  public HFileContext getHFileContext() {
    return this.fileContext;
  }

  static String toStringHeader(ByteBuffer buf) throws IOException {
    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
    buf.get(magicBuf);
    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
    int compressedBlockSizeNoHeader = buf.getInt();
    int uncompressedBlockSizeNoHeader = buf.getInt();
    long prevBlockOffset = buf.getLong();
    byte cksumtype = buf.get();
    long bytesPerChecksum = buf.getInt();
    long onDiskDataSizeWithHeader = buf.getInt();
    return " Header dump: magic: " + Bytes.toString(magicBuf) +
        " blockType " + bt +
        " compressedBlockSizeNoHeader " +
        compressedBlockSizeNoHeader +
        " uncompressedBlockSizeNoHeader " +
        uncompressedBlockSizeNoHeader +
        " prevBlockOffset " + prevBlockOffset +
        " checksumType " + ChecksumType.codeToType(cksumtype) +
        " bytesPerChecksum " + bytesPerChecksum +
        " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
  }
}