1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io.hfile;
19
20 import java.io.DataInput;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.KeyValue.KVComparator;
36 import org.apache.hadoop.hbase.NoTagsKeyValue;
37 import org.apache.hadoop.hbase.fs.HFileSystem;
38 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
40 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
41 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
42 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.IdLock;
45 import org.apache.hadoop.io.WritableUtils;
46 import org.apache.htrace.Trace;
47 import org.apache.htrace.TraceScope;
48
49 import com.google.common.annotations.VisibleForTesting;
50
51
52
53
54 @InterfaceAudience.Private
55 public class HFileReaderV2 extends AbstractHFileReader {
56
private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);

/** Files whose trailer minor version is at or above this value carry HBase-level checksums. */
public static final int MINOR_VERSION_WITH_CHECKSUM = 1;

/** Minor version of files written without HBase-level checksums. */
public static final int MINOR_VERSION_NO_CHECKSUM = 0;

/**
 * NOTE(review): presumably the first minor version whose trailer is protobuf-serialized
 * (inferred from the name only; not referenced in this part of the file).
 */
public static final int PBUF_TRAILER_MINOR_VERSION = 2;

/**
 * Size of the two ints (key length, then value length) that prefix every cell in an
 * unencoded data block.
 */
public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;

/**
 * True when each cell is followed by a memstore timestamp (mvcc) vlong. Set from the
 * KEY_VALUE_VERSION file-info entry in the constructor.
 */
private boolean includesMemstoreTS = false;
/**
 * True when mvcc values must actually be decoded. Only set when includesMemstoreTS is true,
 * and only when the file's MAX_MEMSTORE_TS_KEY is greater than zero.
 */
protected boolean decodeMemstoreTS = false;
75
76 protected boolean shouldIncludeMemstoreTS() {
77 return includesMemstoreTS;
78 }
79
80
/**
 * Reads blocks straight from the filesystem. readBlock uses it on a cache miss, and it is
 * exposed via getUncachedBlockReader().
 */
private HFileBlock.FSReader fsBlockReader;

/**
 * Per-offset lock taken by readBlock on its second pass after a cache miss (when
 * shouldLockOnCacheMiss allows it). This serializes threads that miss on the same block offset.
 */
private IdLock offsetLock = new IdLock();

/**
 * Blocks of the load-on-open section that follow the data index root, meta index root and
 * file info. Collected in the constructor.
 */
private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();

/** Lowest supported trailer minor version (presumably checked by validateMinorVersion). */
static final int MIN_MINOR_VERSION = 0;

/** Highest supported trailer minor version (presumably checked by validateMinorVersion). */
static final int MAX_MINOR_VERSION = 3;

/**
 * From this minor version on, ScannerV2.blockSeek may return HConstants.INDEX_KEY_MAGIC
 * when the sought key sorts before the first key of a block.
 */
static final int MINOR_VERSION_WITH_FAKED_KEY = 3;

/** File-level context (compression, checksum, mvcc flags) shared with the block reader. */
HFileContext hfileContext;
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
 * Opens a version-2 HFile for reading.
 *
 * <p>Steps performed:
 * <ol>
 *   <li>Checks the trailer version.</li>
 *   <li>Builds the {@link HFileContext} and the filesystem block reader.</li>
 *   <li>Reads the load-on-open section, i.e. the range from
 *   {@code trailer.getLoadOnOpenDataOffset()} up to the trailer: the data-index root, the
 *   meta-index root, the file-info block, and any remaining blocks (kept in
 *   {@code loadOnOpenBlocks}).</li>
 *   <li>Optionally schedules a background prefetch of the data section.</li>
 * </ol>
 *
 * @param path path of the file; also the key used with {@code PrefetchExecutor}
 * @param trailer fixed file trailer, already read by the caller
 * @param fsdis stream wrapper handed to the block reader
 * @param size file size in bytes, passed to the superclass (which presumably sets fileSize)
 * @param cacheConf block cache configuration
 * @param hfs filesystem handle handed to the block reader
 * @param conf configuration; also stored in {@code this.conf}
 * @throws IOException if the load-on-open section cannot be read, or (presumably) if the
 *     trailer version is not supported
 */
public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
    final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
    final HFileSystem hfs, final Configuration conf) throws IOException {
  super(path, trailer, size, cacheConf, hfs, conf);
  this.conf = conf;
  trailer.expectMajorVersion(getMajorVersion());
  validateMinorVersion(path, trailer.getMinorVersion());
  // includesMemstoreTS is still false here. It is read from file info further down and
  // pushed to the block reader via setIncludesMemstoreTS.
  this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
  HFileBlock.FSReaderImpl fsBlockReaderV2 =
      new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
  this.fsBlockReader = fsBlockReaderV2;

  // The data index uses the file's comparator; meta blocks are indexed by raw name bytes.
  comparator = trailer.createComparator();
  dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
      trailer.getNumDataIndexLevels(), this);
  metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
      KeyValue.RAW_COMPARATOR, 1);

  // Iterate over the load-on-open section: from its start up to the beginning of the trailer.
  HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
      trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

  // Root of the (possibly multi-level) data block index.
  dataBlockIndexReader.readMultiLevelIndexRoot(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getDataIndexCount());

  // Single-level meta block index.
  metaBlockIndexReader.readRootIndex(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getMetaIndexCount());

  // File info: creation time, last key, average sizes, key/value format version, etc.
  fileInfo = new FileInfo();
  fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
  byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS);
  this.hfileContext.setFileCreateTime(creationTimeBytes == null ? 0 :
      Bytes.toLong(creationTimeBytes));
  lastKey = fileInfo.get(FileInfo.LASTKEY);
  avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
  avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
  byte[] keyValueFormatVersion =
      fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
  includesMemstoreTS = keyValueFormatVersion != null &&
      Bytes.toInt(keyValueFormatVersion) ==
          HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
  fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
  if (includesMemstoreTS) {
    // Decoding is only needed if some cell was written with a non-zero memstore timestamp.
    decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
  }

  // The data block encoding is recorded in the file info.
  dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
  fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);

  // Keep whatever else is in the load-on-open section.
  HFileBlock b;
  while ((b = blockIter.nextBlock()) != null) {
    loadOnOpenBlocks.add(b);
  }

  // Optionally warm the block cache in the background.
  if (cacheConf.shouldPrefetchOnOpen()) {
    PrefetchExecutor.request(path, new Runnable() {
      public void run() {
        try {
          long offset = 0;
          // readBlock rejects every offset >= loadOnOpenDataOffset. Stopping there, instead
          // of at the trailer, ends the prefetch cleanly rather than through an IOException.
          long end = getTrailer().getLoadOnOpenDataOffset();
          HFileBlock prevBlock = null;
          while (offset < end) {
            if (Thread.interrupted()) {
              break;
            }
            // Use the size hint from the previous block when one is available.
            long onDiskSize = -1;
            if (prevBlock != null) {
              onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
            }
            HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
                null, null);
            prevBlock = block;
            offset += block.getOnDiskSizeWithHeader();
          }
        } catch (IOException e) {
          // Only logged at trace level (presumably expected when the file is closed mid-prefetch).
          if (LOG.isTraceEnabled()) {
            LOG.trace("Exception encountered while prefetching " + path + ":", e);
          }
        } catch (Exception e) {
          LOG.warn("Exception encountered while prefetching " + path + ":", e);
        } finally {
          PrefetchExecutor.complete(path);
        }
      }
    });
  }
}
223
224 protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
225 HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
226 return new HFileContextBuilder()
227 .withIncludesMvcc(this.includesMemstoreTS)
228 .withCompression(this.compressAlgo)
229 .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
230 .build();
231 }
232
233
234
235
236
237
238
239
240
241
242
243
244
245 @Override
246 public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
247 final boolean isCompaction) {
248 if (dataBlockEncoder.useEncodedScanner()) {
249 return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
250 hfileContext);
251 }
252
253 return new ScannerV2(this, cacheBlocks, pread, isCompaction);
254 }
255
256
257
258
259
260 private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
261 boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
262 DataBlockEncoding expectedDataBlockEncoding) throws IOException {
263
264 if (cacheConf.isBlockCacheEnabled()) {
265 BlockCache cache = cacheConf.getBlockCache();
266 HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
267 updateCacheMetrics);
268 if (cachedBlock != null) {
269 if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
270 cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
271 }
272 validateBlockType(cachedBlock, expectedBlockType);
273
274 if (expectedDataBlockEncoding == null) {
275 return cachedBlock;
276 }
277 DataBlockEncoding actualDataBlockEncoding =
278 cachedBlock.getDataBlockEncoding();
279
280
281
282 if (cachedBlock.getBlockType().isData() &&
283 !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) {
284
285
286
287
288
289
290
291
292
293
294 if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) &&
295 !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) {
296
297
298
299
300
301
302 LOG.info("Evicting cached block with key " + cacheKey +
303 " because of a data block encoding mismatch" +
304 "; expected: " + expectedDataBlockEncoding +
305 ", actual: " + actualDataBlockEncoding);
306 cache.evictBlock(cacheKey);
307 }
308 return null;
309 }
310 return cachedBlock;
311 }
312 }
313 return null;
314 }
315
316
317
318
319
320
321 @Override
322 public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
323 throws IOException {
324 if (trailer.getMetaIndexCount() == 0) {
325 return null;
326 }
327 if (metaBlockIndexReader == null) {
328 throw new IOException("Meta index not loaded");
329 }
330
331 byte[] mbname = Bytes.toBytes(metaBlockName);
332 int block = metaBlockIndexReader.rootBlockContainingKey(mbname,
333 0, mbname.length);
334 if (block == -1)
335 return null;
336 long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
337
338
339
340
341 synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
342
343 long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
344 BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
345 this.isPrimaryReplicaReader(), BlockType.META);
346
347 cacheBlock &= cacheConf.shouldCacheDataOnRead();
348 if (cacheConf.isBlockCacheEnabled()) {
349 HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true,
350 BlockType.META, null);
351 if (cachedBlock != null) {
352 assert cachedBlock.isUnpacked() : "Packed block leak.";
353
354
355 return cachedBlock.getBufferWithoutHeader();
356 }
357
358 }
359
360 HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
361 blockSize, -1, true, false).unpack(hfileContext, fsBlockReader);
362
363
364 if (cacheBlock) {
365 cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
366 cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
367 }
368
369 return metaBlock.getBufferWithoutHeader();
370 }
371 }
372
/**
 * Reads a block at the given offset, from the block cache if possible, otherwise from the
 * filesystem.
 *
 * <p>The cache is first checked without a lock. On a miss, when the block is to be cached
 * and {@code shouldLockOnCacheMiss} allows it, the loop retries once while holding a
 * per-offset lock. If another thread loaded the block meanwhile, this pass can find it.
 *
 * @param dataBlockOffset file offset of the block; must be before the load-on-open section
 * @param onDiskBlockSize on-disk size hint, or -1 if unknown
 * @param cacheBlock whether a block read from disk may be added to the cache
 * @param pread passed through to the filesystem reader
 * @param isCompaction when true, {@code !isCompaction} is passed to readBlockData
 *     (meaning not visible here)
 * @param updateCacheMetrics whether to bump HFile.dataBlockReadCnt for data blocks
 * @param expectedBlockType expected type, or null to accept any
 * @param expectedDataBlockEncoding expected encoding for cached data blocks, or null
 * @return the unpacked block
 * @throws IOException if the index is not loaded, the offset is out of range, a cached
 *     data block has an unexpected encoding, or the read/validation fails
 */
@Override
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
    final boolean cacheBlock, boolean pread, final boolean isCompaction,
    boolean updateCacheMetrics, BlockType expectedBlockType,
    DataBlockEncoding expectedDataBlockEncoding)
    throws IOException {
  if (dataBlockIndexReader == null) {
    throw new IOException("Block index not loaded");
  }
  // NOTE(review): the check uses loadOnOpenDataOffset, but the message reports
  // lastDataBlockOffset.
  if (dataBlockOffset < 0 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
    throw new IOException("Requested block is out of range: " + dataBlockOffset +
        ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset());
  }

  BlockCacheKey cacheKey =
      new BlockCacheKey(name, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType);
  boolean useLock = false;
  IdLock.Entry lockEntry = null;
  TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
  try {
    // Runs at most twice: once without the offset lock, then (optionally) once with it.
    while (true) {
      if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
        if (useLock) {
          // Held until the finally block below.
          lockEntry = offsetLock.getLockEntry(dataBlockOffset);
        }

        HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
            updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
        if (cachedBlock != null) {
          if (Trace.isTracing()) {
            traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
          }
          assert cachedBlock.isUnpacked() : "Packed block leak.";
          if (cachedBlock.getBlockType().isData()) {
            if (updateCacheMetrics) {
              HFile.dataBlockReadCnt.increment();
            }

            // A cached data block must match this reader's encoder, whatever the caller asked.
            if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
              throw new IOException("Cached block under key " + cacheKey + " "
                  + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
                  + dataBlockEncoder.getDataBlockEncoding() + ")");
            }
          }

          return cachedBlock;
        }
        // Cache miss: retry once under the per-offset lock if configured to.
        if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
          useLock = true;
          continue;
        }
      }

      if (Trace.isTracing()) {
        traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
      }

      // Load from the filesystem. Depending on configuration, either the packed block or the
      // unpacked block is cached.
      HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
          pread, !isCompaction);
      validateBlockType(hfileBlock, expectedBlockType);
      HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
      BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

      if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
        cacheConf.getBlockCache().cacheBlock(cacheKey,
            cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
            cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
      }

      if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
        HFile.dataBlockReadCnt.increment();
      }

      return unpacked;
    }
  } finally {
    traceScope.close();
    if (lockEntry != null) {
      offsetLock.releaseLockEntry(lockEntry);
    }
  }
}
465
466 @Override
467 public boolean hasMVCCInfo() {
468 return includesMemstoreTS && decodeMemstoreTS;
469 }
470
471
472
473
474
475
476
477
478
479
480 private void validateBlockType(HFileBlock block,
481 BlockType expectedBlockType) throws IOException {
482 if (expectedBlockType == null) {
483 return;
484 }
485 BlockType actualBlockType = block.getBlockType();
486 if (expectedBlockType.isData() && actualBlockType.isData()) {
487
488
489 return;
490 }
491 if (actualBlockType != expectedBlockType) {
492 throw new IOException("Expected block type " + expectedBlockType + ", " +
493 "but got " + actualBlockType + ": " + block);
494 }
495 }
496
497
498
499
500
501
502 @Override
503 public byte[] getLastKey() {
504 return dataBlockIndexReader.isEmpty() ? null : lastKey;
505 }
506
507
508
509
510
511
512 @Override
513 public byte[] midkey() throws IOException {
514 return dataBlockIndexReader.midkey();
515 }
516
517 @Override
518 public void close() throws IOException {
519 close(cacheConf.shouldEvictOnClose());
520 }
521
522 public void close(boolean evictOnClose) throws IOException {
523 PrefetchExecutor.cancel(path);
524 if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
525 int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
526 if (LOG.isTraceEnabled()) {
527 LOG.trace("On close, file=" + name + " evicted=" + numEvicted
528 + " block(s)");
529 }
530 }
531 fsBlockReader.closeStreams();
532 }
533
534 public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
535 return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction);
536 }
537
538
539 @Override
540 HFileBlock.FSReader getUncachedBlockReader() {
541 return fsBlockReader;
542 }
543
544
545 protected abstract static class AbstractScannerV2
546 extends AbstractHFileReader.Scanner {
547 protected HFileBlock block;
548
549 @Override
550 public Cell getNextIndexedKey() {
551 return nextIndexedKey;
552 }
553
554
555
556
557
558
559
560 protected Cell nextIndexedKey;
561
562 public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
563 final boolean pread, final boolean isCompaction) {
564 super(r, cacheBlocks, pread, isCompaction);
565 }
566
567 protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
568
569 protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
570 boolean rewind, Cell key, boolean seekBefore) throws IOException;
571
572 @Override
573 public int seekTo(byte[] key, int offset, int length) throws IOException {
574
575
576 return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
577 }
578
579 @Override
580 public int reseekTo(byte[] key, int offset, int length) throws IOException {
581 return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
582 }
583
584 @Override
585 public int seekTo(Cell key) throws IOException {
586 return seekTo(key, true);
587 }
588
589 @Override
590 public int reseekTo(Cell key) throws IOException {
591 int compared;
592 if (isSeeked()) {
593 compared = compareKey(reader.getComparator(), key);
594 if (compared < 1) {
595
596
597 return compared;
598 } else {
599
600 if (this.nextIndexedKey != null &&
601 (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader
602 .getComparator()
603 .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) {
604
605
606
607
608
609
610 return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
611 }
612 }
613 }
614
615
616 return seekTo(key, false);
617 }
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634 public int seekTo(Cell key, boolean rewind) throws IOException {
635 HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
636 BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
637 cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
638 if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
639
640 return -1;
641 }
642 return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
643 blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
644 }
645
646 @Override
647 public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
648 return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length));
649 }
650
651 @Override
652 public boolean seekBefore(Cell key) throws IOException {
653 HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
654 cacheBlocks, pread, isCompaction,
655 ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction));
656 if (seekToBlock == null) {
657 return false;
658 }
659 ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
660
661 if (reader.getComparator()
662 .compareOnlyKeyPortion(
663 new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(),
664 firstKey.limit()), key) >= 0) {
665 long previousBlockOffset = seekToBlock.getPrevBlockOffset();
666
667 if (previousBlockOffset == -1) {
668
669 return false;
670 }
671
672
673
674
675
676
677 int prevBlockSize = -1;
678 seekToBlock = reader.readBlock(previousBlockOffset,
679 prevBlockSize, cacheBlocks,
680 pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
681
682
683 }
684 Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey));
685 loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true);
686 return true;
687 }
688
689
690
691
692
693
694
695
696 protected HFileBlock readNextDataBlock() throws IOException {
697 long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
698 if (block == null)
699 return null;
700
701 HFileBlock curBlock = block;
702
703 do {
704 if (curBlock.getOffset() >= lastDataBlockOffset)
705 return null;
706
707 if (curBlock.getOffset() < 0) {
708 throw new IOException("Invalid block file offset: " + block);
709 }
710
711
712
713 curBlock = reader.readBlock(curBlock.getOffset()
714 + curBlock.getOnDiskSizeWithHeader(),
715 curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
716 isCompaction, true, null, getEffectiveDataBlockEncoding());
717 } while (!curBlock.getBlockType().isData());
718
719 return curBlock;
720 }
721
722 public DataBlockEncoding getEffectiveDataBlockEncoding() {
723 return ((HFileReaderV2)reader).getEffectiveEncodingInCache(isCompaction);
724 }
725
726
727
728
729
730
731
732
733 public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
734 int length);
735
736 public abstract int compareKey(KVComparator comparator, Cell kv);
737 }
738
739
740
741
/**
 * Scanner over unencoded data blocks.
 *
 * <p>Each cell in the block buffer is laid out as:
 * <ul>
 *   <li>a key-length int;</li>
 *   <li>a value-length int;</li>
 *   <li>the key bytes;</li>
 *   <li>the value bytes;</li>
 *   <li>when the file includes memstore timestamps, a trailing variable-length mvcc value.</li>
 * </ul>
 *
 * <p>blockBuffer's position always points at the start of the current cell.
 */
protected static class ScannerV2 extends AbstractScannerV2 {
  // Typed as HFileReaderV2 (shadowing the inherited reader field) so the memstore-TS flags
  // and trailer can be reached directly.
  private HFileReaderV2 reader;

  public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
      final boolean pread, final boolean isCompaction) {
    super(r, cacheBlocks, pread, isCompaction);
    this.reader = r;
  }

  /** @return the current cell, or null if the scanner is not seeked */
  @Override
  public Cell getKeyValue() {
    if (!isSeeked())
      return null;

    return formNoTagsKeyValue();
  }

  /**
   * Builds a NoTagsKeyValue over the current cell's bytes in the block array (presumably
   * without copying). The mvcc value is attached as the sequence id when present.
   */
  protected Cell formNoTagsKeyValue() {
    NoTagsKeyValue ret = new NoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
        + blockBuffer.position(), getCellBufSize());
    if (this.reader.shouldIncludeMemstoreTS()) {
      ret.setSequenceId(currMemstoreTS);
    }
    return ret;
  }

  /** Length of the current cell: both length ints, key and value (mvcc bytes excluded). */
  protected int getCellBufSize() {
    return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
  }

  /** @return a slice sharing the block's backing array, covering the current key */
  @Override
  public ByteBuffer getKey() {
    assertSeeked();
    return ByteBuffer.wrap(
        blockBuffer.array(),
        blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
  }

  /** Compares the given flat key (first) against the current key (second). */
  @Override
  public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
    return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
        blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
  }

  /** @return a slice sharing the block's backing array, covering the current value */
  @Override
  public ByteBuffer getValue() {
    assertSeeked();
    return ByteBuffer.wrap(
        blockBuffer.array(),
        blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
  }

  /** Drops the current block and resets all per-cell state. */
  protected void setNonSeekedState() {
    block = null;
    blockBuffer = null;
    currKeyLen = 0;
    currValueLen = 0;
    currMemstoreTS = 0;
    currMemstoreTSLen = 0;
  }

  /**
   * Moves blockBuffer's position past the current cell. If the computed position is invalid,
   * logs diagnostic state and rethrows.
   */
  private void positionThisBlockBuffer() {
    try {
      blockBuffer.position(getNextCellStartPosition());
    } catch (IllegalArgumentException e) {
      LOG.error("Current pos = " + blockBuffer.position()
          + "; currKeyLen = " + currKeyLen + "; currValLen = "
          + currValueLen + "; block limit = " + blockBuffer.limit()
          + "; HFile name = " + reader.getName()
          + "; currBlock currBlockOffset = " + block.getOffset());
      throw e;
    }
  }

  /**
   * Advances to the next data block.
   *
   * @return false (leaving the scanner unseeked) if the current block is the last data block
   */
  private boolean positionForNextBlock() throws IOException {
    long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
    if (block.getOffset() >= lastDataBlockOffset) {
      setNonSeekedState();
      return false;
    }
    return isNextBlock();
  }

  /**
   * Loads the next data block, if any, and makes it current.
   *
   * @return false (leaving the scanner unseeked) if there is none
   */
  private boolean isNextBlock() throws IOException {
    HFileBlock nextBlock = readNextDataBlock();
    if (nextBlock == null) {
      setNonSeekedState();
      return false;
    }
    updateCurrBlock(nextBlock);
    return true;
  }

  /**
   * With the buffer already positioned past the previous cell, either moves to the next block
   * (buffer exhausted) or parses the lengths of the cell at the current position.
   */
  private final boolean _next() throws IOException {
    if (blockBuffer.remaining() <= 0) {
      return positionForNextBlock();
    }
    readKeyValueLen();
    return true;
  }

  /**
   * Moves to the next cell, crossing into the next data block when needed.
   *
   * @return false when the end of the data section has been reached
   */
  @Override
  public boolean next() throws IOException {
    assertSeeked();
    positionThisBlockBuffer();
    return _next();
  }

  /** Buffer position of the cell after the current one (includes the mvcc bytes). */
  protected int getNextCellStartPosition() {
    return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
        + currMemstoreTSLen;
  }

  /**
   * Positions the scanner at the first cell of the file. If the first data block is already
   * current, it is simply rewound.
   *
   * @return false if there is no reader or the file has no entries
   */
  @Override
  public boolean seekTo() throws IOException {
    if (reader == null) {
      return false;
    }

    if (reader.getTrailer().getEntryCount() == 0) {
      return false;
    }

    long firstDataBlockOffset =
        reader.getTrailer().getFirstDataBlockOffset();
    if (block != null && block.getOffset() == firstDataBlockOffset) {
      blockBuffer.rewind();
      readKeyValueLen();
      return true;
    }

    block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
        isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
    if (block.getOffset() < 0) {
      throw new IOException("Invalid block offset: " + block.getOffset());
    }
    updateCurrBlock(block);
    return true;
  }

  /**
   * Makes {@code seekToBlock} current unless it already is. If it already is and
   * {@code rewind} is set, the buffer is rewound; otherwise the seek continues from the
   * current position. Then the block is searched for {@code key}.
   */
  @Override
  protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
      boolean rewind, Cell key, boolean seekBefore) throws IOException {
    if (block == null || block.getOffset() != seekToBlock.getOffset()) {
      updateCurrBlock(seekToBlock);
    } else if (rewind) {
      blockBuffer.rewind();
    }

    // Must come after updateCurrBlock, which clears nextIndexedKey.
    this.nextIndexedKey = nextIndexedKey;
    return blockSeek(key, seekBefore);
  }

  /**
   * Makes {@code newBlock} current and parses its first cell.
   *
   * @throws IllegalStateException if the block's type is not DATA
   */
  protected void updateCurrBlock(HFileBlock newBlock) {
    block = newBlock;

    if (block.getBlockType() != BlockType.DATA) {
      throw new IllegalStateException("ScannerV2 works only on data " +
          "blocks, got " + block.getBlockType() + "; " +
          "fileName=" + reader.name + ", " +
          "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
          "isCompaction=" + isCompaction);
    }

    blockBuffer = block.getBufferWithoutHeader();
    readKeyValueLen();
    blockFetches++;

    // The hint belonged to the previous block.
    this.nextIndexedKey = null;
  }

  /** @return true if {@code v} is not a plausible length for this block */
  protected final boolean checkLen(final int v) {
    return v < 0 || v > this.blockBuffer.limit();
  }

  /** Throws if the parsed key or value length is implausible (likely corruption). */
  protected final void checkKeyValueLen() {
    if (checkLen(this.currKeyLen) || checkLen(this.currValueLen)) {
      throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen +
          " or currValueLen " + this.currValueLen + ". Block offset: " + block.getOffset() +
          ", block length: " + this.blockBuffer.limit() + ", position: " +
          this.blockBuffer.position() + " (without header).");
    }
  }

  /**
   * Parses the key and value lengths of the cell at the current position, then its mvcc
   * value. The buffer position is not changed.
   */
  protected void readKeyValueLen() {
    // Absolute index into the backing array.
    int p = blockBuffer.position() + blockBuffer.arrayOffset();

    // Both length ints are read as one long: key length in the high 32 bits, value length in
    // the low 32 bits.
    long ll = Bytes.toLong(blockBuffer.array(), p);

    this.currKeyLen = (int)(ll >> Integer.SIZE);
    // Assuming MASK_FOR_LOWER_INT_IN_LONG only has high-order bits set, the XOR leaves the low
    // 32 bits intact and the int cast extracts the value length.
    this.currValueLen = (int)(Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
    checkKeyValueLen();

    // The mvcc value follows the key and value bytes.
    p += (Bytes.SIZEOF_LONG + currKeyLen + currValueLen);
    readMvccVersion(p);
  }

  /**
   * Sets currMemstoreTS/currMemstoreTSLen from the vlong at absolute array index
   * {@code position}.
   * <ul>
   *   <li>If the file has no memstore timestamps, nothing is changed.</li>
   *   <li>If they are present but not decoded, TS=0 with a 1-byte length is assumed.</li>
   * </ul>
   */
  protected void readMvccVersion(final int position) {
    if (!this.reader.shouldIncludeMemstoreTS()) return;
    if (!this.reader.decodeMemstoreTS) {
      currMemstoreTS = 0;
      currMemstoreTSLen = 1;
      return;
    }
    _readMvccVersion(position);
  }

  /**
   * Inline decode of a Hadoop variable-length long (WritableUtils vint/vlong encoding) at
   * absolute array index {@code position}.
   */
  private void _readMvccVersion(final int position) {
    byte firstByte = blockBuffer.array()[position];
    int len = WritableUtils.decodeVIntSize(firstByte);
    if (len == 1) {
      // Small values are stored in the first byte itself.
      this.currMemstoreTS = firstByte;
    } else {
      // Remaining len-1 bytes hold the magnitude, big-endian; sign comes from the first byte.
      long i = 0;
      for (int idx = 0; idx < len - 1; idx++) {
        byte b = blockBuffer.array()[position + 1 + idx];
        i = i << 8;
        i = i | (b & 0xFF);
      }
      currMemstoreTS = (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
    }
    this.currMemstoreTSLen = len;
  }

  /**
   * Reads the mvcc value at the buffer's current position. NOTE(review): callers presumably
   * position the buffer at the mvcc bytes first.
   */
  protected void readMvccVersion() {
    readMvccVersion(blockBuffer.arrayOffset() + blockBuffer.position());
  }

  /**
   * Linear search within the current block, starting from the buffer's current position.
   *
   * @param key the key to find
   * @param seekBefore if true, on an exact match position on the preceding cell instead
   * @return one of:
   *     <ul>
   *       <li>0 for an exact match (the scanner is on that cell);</li>
   *       <li>1 when positioned on the last cell before {@code key} (or on the block's last
   *       cell if every cell is smaller, or on the previous cell for seekBefore);</li>
   *       <li>HConstants.INDEX_KEY_MAGIC when {@code key} precedes the block's first cell and
   *       the file's minor version is at least MINOR_VERSION_WITH_FAKED_KEY.</li>
   *     </ul>
   * @throws IllegalStateException for seekBefore on the first cell of the block
   */
  protected int blockSeek(Cell key, boolean seekBefore) {
    int klen, vlen;
    long memstoreTS = 0;
    int memstoreTSLen = 0;
    // Size of the previously visited cell; -1 while still on the first visited cell.
    int lastKeyValueSize = -1;
    KeyValue.KeyOnlyKeyValue keyOnlykv = new KeyValue.KeyOnlyKeyValue();
    do {
      // Peek at the two length ints without moving the position.
      blockBuffer.mark();
      klen = blockBuffer.getInt();
      vlen = blockBuffer.getInt();
      blockBuffer.reset();
      if (this.reader.shouldIncludeMemstoreTS()) {
        if (this.reader.decodeMemstoreTS) {
          int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
              + KEY_VALUE_LEN_SIZE + klen + vlen;
          memstoreTS = Bytes.readAsVLong(blockBuffer.array(), memstoreTSOffset);
          memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
        } else {
          memstoreTS = 0;
          memstoreTSLen = 1;
        }
      }

      int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE;
      keyOnlykv.setKey(blockBuffer.array(), keyOffset, klen);
      int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlykv);

      if (comp == 0) {
        if (seekBefore) {
          if (lastKeyValueSize < 0) {
            throw new IllegalStateException("blockSeek with seekBefore "
                + "at the first key of the block: key="
                + CellUtil.getCellKeyAsString(key)
                + ", blockOffset=" + block.getOffset() + ", onDiskSize="
                + block.getOnDiskSizeWithHeader());
          }
          // Step back onto the previous cell.
          blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
          readKeyValueLen();
          return 1;
        }
        // Exact match: lengths/mvcc already parsed above, no need to re-read.
        currKeyLen = klen;
        currValueLen = vlen;
        if (this.reader.shouldIncludeMemstoreTS()) {
          currMemstoreTS = memstoreTS;
          currMemstoreTSLen = memstoreTSLen;
        }
        return 0;
      } else if (comp < 0) {
        // Passed the key: back up to the previous cell if there is one.
        if (lastKeyValueSize > 0)
          blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
        readKeyValueLen();
        if (lastKeyValueSize == -1 && blockBuffer.position() == 0
            && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
          return HConstants.INDEX_KEY_MAGIC;
        }
        return 1;
      }

      // key > current cell: advance to the next cell.
      lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
      blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
    } while (blockBuffer.remaining() > 0);

    // Every cell is smaller than key: settle on the last cell of the block.
    blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
    readKeyValueLen();
    return 1;
  }

  /**
   * Returns a slice over the first key of {@code curBlock}. The buffer obtained from
   * getBufferWithoutHeader() is rewound (presumably a fresh view on each call).
   */
  @Override
  protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
    ByteBuffer buffer = curBlock.getBufferWithoutHeader();
    buffer.rewind();
    int klen = buffer.getInt();
    // Skip the value length.
    buffer.getInt();
    ByteBuffer keyBuff = buffer.slice();
    keyBuff.limit(klen);
    keyBuff.rewind();
    return keyBuff;
  }

  @Override
  public String getKeyString() {
    return Bytes.toStringBinary(blockBuffer.array(),
        blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE, currKeyLen);
  }

  @Override
  public String getValueString() {
    return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
        + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
        currValueLen);
  }

  /** Compares {@code key} (first) against the current cell's key (second). */
  @Override
  public int compareKey(KVComparator comparator, Cell key) {
    return comparator.compareOnlyKeyPortion(
        key,
        new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
            + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
  }
}
1159
1160
1161
1162
1163 protected static class EncodedScannerV2 extends AbstractScannerV2 {
/** Decoding context created by the encoder from the file's HFileContext. */
private final HFileBlockDecodingContext decodingCtx;
/** Seeker that walks the cells of the current encoded block. */
private final DataBlockEncoder.EncodedSeeker seeker;
/** Encoder for the reader's data block encoding. */
private final DataBlockEncoder dataBlockEncoder;
/** File-level context supplied at construction. */
protected final HFileContext meta;
1168
/**
 * Creates a scanner for encoded data blocks. The encoder, decoding context and seeker are
 * derived from the reader's data block encoding.
 */
public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
    boolean pread, boolean isCompaction, HFileContext meta) {
  super(reader, cacheBlocks, pread, isCompaction);
  this.dataBlockEncoder = reader.dataBlockEncoder.getDataBlockEncoding().getEncoder();
  this.decodingCtx = this.dataBlockEncoder.newDataBlockDecodingContext(meta);
  this.seeker = this.dataBlockEncoder.createSeeker(reader.getComparator(), this.decodingCtx);
  this.meta = meta;
}
1179
1180 @Override
1181 public boolean isSeeked(){
1182 return this.block != null;
1183 }
1184
1185
1186
1187
1188
1189
1190
1191
1192 private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
1193 block = newBlock;
1194
1195
1196 if (block.getBlockType() != BlockType.ENCODED_DATA) {
1197 throw new IllegalStateException(
1198 "EncodedScanner works only on encoded data blocks");
1199 }
1200 short dataBlockEncoderId = block.getDataBlockEncodingId();
1201 if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
1202 String encoderCls = dataBlockEncoder.getClass().getName();
1203 throw new CorruptHFileException("Encoder " + encoderCls
1204 + " doesn't support data block encoding "
1205 + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
1206 }
1207
1208 seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
1209 blockFetches++;
1210
1211
1212 this.nextIndexedKey = null;
1213 }
1214
1215 private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
1216 ByteBuffer origBlock = newBlock.getBufferReadOnly();
1217 ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
1218 origBlock.arrayOffset() + newBlock.headerSize() +
1219 DataBlockEncoding.ID_SIZE,
1220 newBlock.getUncompressedSizeWithoutHeader() -
1221 DataBlockEncoding.ID_SIZE).slice();
1222 return encodedBlock;
1223 }
1224
1225 @Override
1226 public boolean seekTo() throws IOException {
1227 if (reader == null) {
1228 return false;
1229 }
1230
1231 if (reader.getTrailer().getEntryCount() == 0) {
1232
1233 return false;
1234 }
1235
1236 long firstDataBlockOffset =
1237 reader.getTrailer().getFirstDataBlockOffset();
1238 if (block != null && block.getOffset() == firstDataBlockOffset) {
1239 seeker.rewind();
1240 return true;
1241 }
1242
1243 block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1244 isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
1245 if (block.getOffset() < 0) {
1246 throw new IOException("Invalid block offset: " + block.getOffset());
1247 }
1248 updateCurrentBlock(block);
1249 return true;
1250 }
1251
1252 @Override
1253 public boolean next() throws IOException {
1254 boolean isValid = seeker.next();
1255 if (!isValid) {
1256 block = readNextDataBlock();
1257 isValid = block != null;
1258 if (isValid) {
1259 updateCurrentBlock(block);
1260 }
1261 }
1262 return isValid;
1263 }
1264
1265 @Override
1266 public ByteBuffer getKey() {
1267 assertValidSeek();
1268 return seeker.getKeyDeepCopy();
1269 }
1270
1271 @Override
1272 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
1273 return seeker.compareKey(comparator, key, offset, length);
1274 }
1275
1276 @Override
1277 public ByteBuffer getValue() {
1278 assertValidSeek();
1279 return seeker.getValueShallowCopy();
1280 }
1281
1282 @Override
1283 public Cell getKeyValue() {
1284 if (block == null) {
1285 return null;
1286 }
1287 return seeker.getKeyValue();
1288 }
1289
1290 @Override
1291 public String getKeyString() {
1292 ByteBuffer keyBuffer = getKey();
1293 return Bytes.toStringBinary(keyBuffer.array(),
1294 keyBuffer.arrayOffset(), keyBuffer.limit());
1295 }
1296
1297 @Override
1298 public String getValueString() {
1299 ByteBuffer valueBuffer = getValue();
1300 return Bytes.toStringBinary(valueBuffer.array(),
1301 valueBuffer.arrayOffset(), valueBuffer.limit());
1302 }
1303
1304 private void assertValidSeek() {
1305 if (block == null) {
1306 throw new NotSeekedException();
1307 }
1308 }
1309
1310 @Override
1311 protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1312 return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1313 }
1314
1315 @Override
1316 protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
1317 boolean rewind, Cell key, boolean seekBefore) throws IOException {
1318 if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1319 updateCurrentBlock(seekToBlock);
1320 } else if (rewind) {
1321 seeker.rewind();
1322 }
1323 this.nextIndexedKey = nextIndexedKey;
1324 return seeker.seekToKeyInBlock(key, seekBefore);
1325 }
1326
1327 @Override
1328 public int compareKey(KVComparator comparator, Cell key) {
1329 return seeker.compareKey(comparator, key);
1330 }
1331 }
1332
1333
1334
1335
1336
1337 @Override
1338 public DataInput getGeneralBloomFilterMetadata() throws IOException {
1339 return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1340 }
1341
1342 @Override
1343 public DataInput getDeleteBloomFilterMetadata() throws IOException {
1344 return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1345 }
1346
1347 private DataInput getBloomFilterMetadata(BlockType blockType)
1348 throws IOException {
1349 if (blockType != BlockType.GENERAL_BLOOM_META &&
1350 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1351 throw new RuntimeException("Block Type: " + blockType.toString() +
1352 " is not supported") ;
1353 }
1354
1355 for (HFileBlock b : loadOnOpenBlocks)
1356 if (b.getBlockType() == blockType)
1357 return b.getByteStream();
1358 return null;
1359 }
1360
1361 @Override
1362 public boolean isFileInfoLoaded() {
1363 return true;
1364 }
1365
1366
1367
1368
1369
1370 private void validateMinorVersion(Path path, int minorVersion) {
1371 if (minorVersion < MIN_MINOR_VERSION ||
1372 minorVersion > MAX_MINOR_VERSION) {
1373 String msg = "Minor version for path " + path +
1374 " is expected to be between " +
1375 MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1376 " but is found to be " + minorVersion;
1377 LOG.error(msg);
1378 throw new RuntimeException(msg);
1379 }
1380 }
1381
1382 @Override
1383 public int getMajorVersion() {
1384 return 2;
1385 }
1386
1387 @Override
1388 public HFileContext getFileContext() {
1389 return hfileContext;
1390 }
1391
1392
1393
1394
1395
1396 @VisibleForTesting
1397 boolean prefetchComplete() {
1398 return PrefetchExecutor.isCompleted(path);
1399 }
1400
1401 @Override
1402 public void unbufferStream() {
1403 fsBlockReader.unbufferStream();
1404 }
1405 }