/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.SequenceInputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.MetricsIO;
import org.apache.hadoop.hbase.io.MetricsIOWrapperImpl;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import com.google.common.base.Preconditions;

/**
 * File format for HBase: a file of sorted key/value pairs. Both keys and
 * values are byte arrays.
 * <p>
 * The physical layout is a sequence of blocks followed by a fixed file
 * trailer: a "scanned block" section (data blocks, interleaved with leaf
 * index and Bloom chunk blocks), a "non-scanned block" section (meta and
 * intermediate index blocks), a "load-on-open" section (root data index,
 * meta index, file info, and Bloom filter metadata), and finally the
 * {@link FixedFileTrailer}, which records the offsets of the other sections
 * and the format version. The trailer is read first when a file is opened;
 * everything else is reached from it.
 * <p>
 * HFile is based on Hadoop's TFile format (HADOOP-3315) and plays the role
 * of Bigtable's SSTable. Readers are obtained through
 * {@link #createReader} and writers through a version-specific
 * {@link WriterFactory} returned by {@link #getWriterFactory}.
 */
@InterfaceAudience.Private
public class HFile {
  static final Log LOG = LogFactory.getLog(HFile.class);

  /**
   * Maximum length of key in HFile.
   */
  public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;

  /**
   * Default compression: none.
   */
  public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM =
    Compression.Algorithm.NONE;

  /** Minimum supported HFile format version */
  public static final int MIN_FORMAT_VERSION = 2;

  /** Maximum supported HFile format version */
  public static final int MAX_FORMAT_VERSION = 3;

  /**
   * Minimum HFile format version with support for persisting cell tags
   */
  public static final int MIN_FORMAT_VERSION_WITH_TAGS = 3;

  /** Default compression name: none. */
  public final static String DEFAULT_COMPRESSION =
    DEFAULT_COMPRESSION_ALGORITHM.getName();

  /** Meta data block name for bloom filter bits. */
  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";

  /**
   * We assume that HFile path ends with
   * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
   * many levels of nesting. This is needed for identifying table and CF name
   * from an HFile path.
   */
  public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;

  /**
   * The number of bytes per checksum.
   */
  public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;

  public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;

  // For measuring number of checksum failures
  static final Counter checksumFailures = new Counter();

  // For tests. Gets incremented when we read a block whether from HDFS or from cache.
  public static final Counter dataBlockReadCnt = new Counter();

  /** Static instance for the metrics so that HFileReaders access the same instance */
  static final MetricsIO metrics = new MetricsIO(new MetricsIOWrapperImpl());

  /**
   * Number of checksum verification failures. It also clears the counter.
   */
  public static final long getAndResetChecksumFailuresCount() {
    long count = checksumFailures.get();
    checksumFailures.set(0);
    return count;
  }

  /**
   * Number of checksum verification failures, without resetting the counter.
   */
  public static final long getChecksumFailuresCount() {
    return checksumFailures.get();
  }

  public static final void updateReadLatency(long latencyMillis, boolean pread) {
    if (pread) {
      metrics.updateFsPreadTime(latencyMillis);
    } else {
      metrics.updateFsReadTime(latencyMillis);
    }
  }

  public static final void updateWriteLatency(long latencyMillis) {
    metrics.updateFsWriteTime(latencyMillis);
  }
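
  // Illustrative sketch, not part of the original source: how a caller might
  // time a positional read and feed it into the metrics above. "stream",
  // "offset", and "buf" are hypothetical locals.
  //
  //   long startMs = System.currentTimeMillis();
  //   stream.readFully(offset, buf, 0, buf.length);  // positional read (pread)
  //   HFile.updateReadLatency(System.currentTimeMillis() - startMs, true /* pread */);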

  /** API required to write an {@link HFile} */
  public interface Writer extends Closeable {

    /** Add an element to the file info map. */
    void appendFileInfo(byte[] key, byte[] value) throws IOException;

    void append(Cell cell) throws IOException;

    /** @return the path to this {@link HFile} */
    Path getPath();

    /**
     * Adds an inline block writer such as a multi-level block index writer or
     * a compound Bloom filter writer.
     */
    void addInlineBlockWriter(InlineBlockWriter bloomWriter);

    /**
     * Append a meta block to the file under the given key. The Writable is
     * serialized out when the writer is closed.
     */
    void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter);

    /**
     * Store general Bloom filter in the file. This does not deal with Bloom
     * filter internals but is necessary, since Bloom filters are stored
     * differently in HFile version 1 and version 2.
     */
    void addGeneralBloomFilter(BloomFilterWriter bfw);

    /**
     * Store delete family Bloom filter in the file, which is only supported
     * in HFile V2.
     */
    void addDeleteFamilyBloomFilter(BloomFilterWriter bfw) throws IOException;

    /**
     * Return the file context for the HFile this writer belongs to
     */
    HFileContext getFileContext();
  }

  /**
   * This variety of ways to construct writers is used throughout the code, and
   * we want to be able to swap writer implementations.
   */
  public static abstract class WriterFactory {
    protected final Configuration conf;
    protected final CacheConfig cacheConf;
    protected FileSystem fs;
    protected Path path;
    protected FSDataOutputStream ostream;
    protected KVComparator comparator = KeyValue.COMPARATOR;
    protected InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;

    WriterFactory(Configuration conf, CacheConfig cacheConf) {
      this.conf = conf;
      this.cacheConf = cacheConf;
    }

    public WriterFactory withPath(FileSystem fs, Path path) {
      Preconditions.checkNotNull(fs);
      Preconditions.checkNotNull(path);
      this.fs = fs;
      this.path = path;
      return this;
    }

    public WriterFactory withOutputStream(FSDataOutputStream ostream) {
      Preconditions.checkNotNull(ostream);
      this.ostream = ostream;
      return this;
    }

    public WriterFactory withComparator(KVComparator comparator) {
      Preconditions.checkNotNull(comparator);
      this.comparator = comparator;
      return this;
    }

    public WriterFactory withFavoredNodes(InetSocketAddress[] favoredNodes) {
      // Deliberately not checking for null here; null means no favored nodes.
      this.favoredNodes = favoredNodes;
      return this;
    }

    public WriterFactory withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Writer create() throws IOException {
      if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
        throw new AssertionError("Please specify exactly one of " +
            "filesystem/path or output stream");
      }
      if (path != null) {
        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
      }
      return createWriter(fs, path, ostream, comparator, fileContext);
    }

    protected abstract Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
        KVComparator comparator, HFileContext fileContext) throws IOException;
  }

  /** The configuration key for HFile version to use for new files */
  public static final String FORMAT_VERSION_KEY = "hfile.format.version";

  public static int getFormatVersion(Configuration conf) {
    int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
    checkFormatVersion(version);
    return version;
  }

  /**
   * Returns the factory to be used to create {@link HFile} writers.
   * Disables block cache access for all writers created through the
   * returned factory.
   */
  public static final WriterFactory getWriterFactoryNoCache(Configuration conf) {
    Configuration tempConf = new Configuration(conf);
    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    return HFile.getWriterFactory(conf, new CacheConfig(tempConf));
  }

  /**
   * Returns the factory to be used to create {@link HFile} writers,
   * according to the configured HFile format version.
   */
  public static final WriterFactory getWriterFactory(Configuration conf,
      CacheConfig cacheConf) {
    int version = getFormatVersion(conf);
    switch (version) {
    case 2:
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    case 3:
      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
    default:
      throw new IllegalArgumentException("Cannot create writer for HFile " +
          "format version " + version);
    }
  }
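
  // Illustrative usage sketch, not part of the original source: creating a
  // writer through the factory and appending sorted cells. "conf", "fs",
  // "path", and "cells" are hypothetical; the block size is an example value.
  //
  //   HFileContext context = new HFileContextBuilder()
  //       .withBlockSize(64 * 1024)
  //       .withCompression(DEFAULT_COMPRESSION_ALGORITHM)
  //       .build();
  //   HFile.Writer w = HFile.getWriterFactoryNoCache(conf)
  //       .withPath(fs, path)
  //       .withFileContext(context)
  //       .create();
  //   try {
  //     for (Cell cell : cells) {  // must already be in KeyValue.COMPARATOR order
  //       w.append(cell);
  //     }
  //   } finally {
  //     w.close();
  //   }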

  /**
   * An abstraction used by the block index. Implementations will check cache
   * for any asked-for block and return a cached block if found. Otherwise,
   * after reading from the filesystem, will try and put the block into cache
   * before returning.
   */
  public interface CachingBlockReader {
    /**
     * Read in a file block.
     *
     * @param offset offset to read
     * @param onDiskBlockSize size of the block on disk
     * @param isCompaction is this block being read as part of a compaction
     * @param expectedBlockType the block type we are expecting to read with
     *          this read operation, or null to read whatever block type is
     *          available and avoid checking (that might reduce caching
     *          efficiency of encoded data blocks)
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting data blocks to be in, or null to not perform this
     *          check and return the block irrespective of the encoding. This
     *          check only applies to data blocks and can be set to null when
     *          the caller is expecting to read a non-data block and has set
     *          expectedBlockType accordingly.
     */
    HFileBlock readBlock(long offset, long onDiskBlockSize,
        boolean cacheBlock, final boolean pread, final boolean isCompaction,
        final boolean updateCacheMetrics, BlockType expectedBlockType,
        DataBlockEncoding expectedDataBlockEncoding)
        throws IOException;
  }
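
  // Illustrative call, not part of the original source: fetching the data
  // block at "offset" during a compaction without updating cache metrics.
  // "reader", "offset", and "onDiskSize" are hypothetical.
  //
  //   HFileBlock block = reader.readBlock(offset, onDiskSize,
  //       false /* cacheBlock */, true /* pread */, true /* isCompaction */,
  //       false /* updateCacheMetrics */, BlockType.DATA,
  //       null /* expectedDataBlockEncoding: skip the encoding check */);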

  /** An interface used by clients to open and iterate an {@link HFile}. */
  public interface Reader extends Closeable, CachingBlockReader {
    /**
     * Returns this reader's "name". Usually the last component of the path.
     * Needs to be constant as the file is being moved to support caching on
     * write.
     */
    String getName();

    KVComparator getComparator();

    HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction);

    ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;

    Map<byte[], byte[]> loadFileInfo() throws IOException;

    byte[] getLastKey();

    byte[] midkey() throws IOException;

    long length();

    long getEntries();

    byte[] getFirstKey();

    long indexSize();

    byte[] getFirstRowKey();

    byte[] getLastRowKey();

    FixedFileTrailer getTrailer();

    HFileBlockIndex.BlockIndexReader getDataBlockIndexReader();

    HFileScanner getScanner(boolean cacheBlocks, boolean pread);

    Compression.Algorithm getCompressionAlgorithm();

    /**
     * Retrieves general Bloom filter metadata as appropriate for each
     * {@link HFile} version. Knows nothing about how that metadata is
     * structured.
     */
    DataInput getGeneralBloomFilterMetadata() throws IOException;

    /**
     * Retrieves delete family Bloom filter metadata as appropriate for each
     * {@link HFile} version. Knows nothing about how that metadata is
     * structured.
     */
    DataInput getDeleteBloomFilterMetadata() throws IOException;

    Path getPath();

    /** Close method with optional evictOnClose */
    void close(boolean evictOnClose) throws IOException;

    DataBlockEncoding getDataBlockEncoding();

    boolean hasMVCCInfo();

    /**
     * Return the file context of the HFile this reader belongs to
     */
    HFileContext getFileContext();

    boolean isPrimaryReplicaReader();

    void setPrimaryReplicaReader(boolean isPrimaryReplicaReader);

    /**
     * To close the stream's socket. Note: this can be concurrently called
     * from multiple threads and implementations should take care of thread
     * safety.
     */
    void unbufferStream();
  }
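
  // Illustrative usage sketch, not part of the original source: opening a
  // reader and scanning from the first cell. "fs", "path", and "conf" are
  // hypothetical.
  //
  //   HFile.Reader reader = HFile.createReader(fs, path, conf);
  //   try {
  //     reader.loadFileInfo();
  //     HFileScanner scanner = reader.getScanner(true /* cacheBlocks */, false /* pread */);
  //     if (scanner.seekTo()) {        // position at the first cell, if any
  //       do {
  //         Cell cell = scanner.getKeyValue();
  //         // ... consume cell ...
  //       } while (scanner.next());
  //     }
  //   } finally {
  //     reader.close();
  //   }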

  /**
   * Opens a reader appropriate for the HFile version recorded in the trailer.
   *
   * @param path hfile's path
   * @param fsdis stream of path's file
   * @param size max size of the trailer
   * @param cacheConf cache configuration values, cannot be null
   * @param hfs the {@link HFileSystem} the file lives on, or null
   * @param conf configuration
   * @return an appropriate instance of HFile.Reader
   * @throws IOException if the file is invalid; will throw a
   *           CorruptHFileException (a flavour of IOException) if the trailer
   *           cannot be read or the version is not supported
   */
  private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis,
      long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
    FixedFileTrailer trailer = null;
    try {
      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
      assert !isHBaseChecksum; // Initially we must read with FS checksum.
      trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
      switch (trailer.getMajorVersion()) {
      case 2:
        return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf);
      case 3:
        return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf);
      default:
        throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
      }
    } catch (Throwable t) {
      try {
        fsdis.close();
      } catch (Throwable t2) {
        LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
      }
      throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
    } finally {
      fsdis.unbuffer();
    }
  }

  /**
   * @param fs a file system
   * @param path path to HFile
   * @param fsdis a stream of path's file
   * @param size max size of the trailer
   * @param cacheConf cache configuration for hfile's contents
   * @param conf configuration
   * @return a version-specific HFile reader
   * @throws IOException if the file is invalid; will throw a
   *           CorruptHFileException (a flavour of IOException) if the trailer
   *           cannot be read
   */
  public static Reader createReader(FileSystem fs, Path path,
      FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
      throws IOException {
    HFileSystem hfs = null;

    // If the fs is not an instance of HFileSystem, then create an
    // instance of HFileSystem that wraps over the specified fs.
    // In this case, we will not be able to avoid checksumming inside
    // the filesystem.
    if (!(fs instanceof HFileSystem)) {
      hfs = new HFileSystem(fs);
    } else {
      hfs = (HFileSystem) fs;
    }
    return openReader(path, fsdis, size, cacheConf, hfs, conf);
  }

  /**
   * Creates a reader with block caching disabled.
   *
   * @param fs filesystem
   * @param path path to file to read
   * @param conf configuration
   * @return an active Reader instance
   * @throws IOException if the file is invalid; will throw a
   *           CorruptHFileException (a flavour of IOException) if the trailer
   *           is corrupt or the version is invalid
   */
  public static Reader createReader(
      FileSystem fs, Path path, Configuration conf) throws IOException {
    return createReader(fs, path, CacheConfig.DISABLED, conf);
  }

  /**
   * @param fs filesystem
   * @param path path to file to read
   * @param cacheConf cache configuration; must not be null
   * @param conf configuration
   * @return an active Reader instance
   * @throws IOException if the file is invalid; will throw a
   *           CorruptHFileException (a flavour of IOException) if the trailer
   *           is corrupt or the version is invalid
   */
  public static Reader createReader(
      FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
    Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
    FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
    return openReader(path, stream, fs.getFileStatus(path).getLen(),
        cacheConf, stream.getHfs(), conf);
  }

  /**
   * This factory method is used only by unit tests.
   */
  static Reader createReaderFromStream(Path path,
      FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
      throws IOException {
    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
    return openReader(path, wrapper, size, cacheConf, null, conf);
  }

  /**
   * Returns true if the specified file has a valid HFile Trailer.
   * @param fs filesystem
   * @param path Path to file to verify
   * @return true if the file has a valid HFile Trailer, otherwise false
   * @throws IOException if failed to read from the underlying stream
   */
  public static boolean isHFileFormat(final FileSystem fs, final Path path) throws IOException {
    return isHFileFormat(fs, fs.getFileStatus(path));
  }

  /**
   * Returns true if the specified file has a valid HFile Trailer.
   * @param fs filesystem
   * @param fileStatus the file to verify
   * @return true if the file has a valid HFile Trailer, otherwise false
   * @throws IOException if failed to read from the underlying stream
   */
  public static boolean isHFileFormat(final FileSystem fs, final FileStatus fileStatus)
      throws IOException {
    final Path path = fileStatus.getPath();
    final long size = fileStatus.getLen();
    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
    try {
      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
      assert !isHBaseChecksum; // Initially we must read with FS checksum.
      FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
      return true;
    } catch (IllegalArgumentException e) {
      // An unparseable trailer means this is not an HFile; IOExceptions propagate.
      return false;
    } finally {
      try {
        fsdis.close();
      } catch (Throwable t) {
        LOG.warn("Error closing fsdis FSDataInputStreamWrapper: " + path, t);
      }
    }
  }
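
  // Illustrative use, not part of the original source: filtering a directory
  // listing down to HFiles, e.g. before a bulk load. "fs" and "dir" are
  // hypothetical.
  //
  //   for (FileStatus st : fs.listStatus(dir)) {
  //     if (!st.isDirectory() && HFile.isHFileFormat(fs, st)) {
  //       // st.getPath() points at a file with a valid HFile trailer
  //     }
  //   }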

  /**
   * Metadata for this file. Conjured by the writer. Read in by the reader.
   */
  public static class FileInfo implements SortedMap<byte[], byte[]> {
    static final String RESERVED_PREFIX = "hfile.";
    static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
    static final byte[] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
    static final byte[] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
    static final byte[] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
    static final byte[] CREATE_TIME_TS = Bytes.toBytes(RESERVED_PREFIX + "CREATE_TIME_TS");
    static final byte[] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
    static final byte[] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
    public static final byte[] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
    private final SortedMap<byte[], byte[]> map =
        new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);

    public FileInfo() {
      super();
    }

    /**
     * Append the given key/value pair to the file info, optionally checking
     * the key prefix.
     *
     * @param k key to add
     * @param v value to add
     * @param checkPrefix whether to check that the provided key does not start
     *          with the reserved prefix
     * @return this file info object
     * @throws IOException if the key or value is invalid
     */
    public FileInfo append(final byte[] k, final byte[] v,
        final boolean checkPrefix) throws IOException {
      if (k == null || v == null) {
        throw new NullPointerException("Neither key nor value may be null");
      }
      if (checkPrefix && isReservedFileInfoKey(k)) {
        throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX
            + " prefix are reserved");
      }
      put(k, v);
      return this;
    }

    @Override
    public void clear() {
      this.map.clear();
    }

    @Override
    public Comparator<? super byte[]> comparator() {
      return map.comparator();
    }

    @Override
    public boolean containsKey(Object key) {
      return map.containsKey(key);
    }

    @Override
    public boolean containsValue(Object value) {
      return map.containsValue(value);
    }

    @Override
    public Set<java.util.Map.Entry<byte[], byte[]>> entrySet() {
      return map.entrySet();
    }

    @Override
    public boolean equals(Object o) {
      return map.equals(o);
    }

    @Override
    public byte[] firstKey() {
      return map.firstKey();
    }

    @Override
    public byte[] get(Object key) {
      return map.get(key);
    }

    @Override
    public int hashCode() {
      return map.hashCode();
    }

    @Override
    public SortedMap<byte[], byte[]> headMap(byte[] toKey) {
      return this.map.headMap(toKey);
    }

    @Override
    public boolean isEmpty() {
      return map.isEmpty();
    }

    @Override
    public Set<byte[]> keySet() {
      return map.keySet();
    }

    @Override
    public byte[] lastKey() {
      return map.lastKey();
    }

    @Override
    public byte[] put(byte[] key, byte[] value) {
      return this.map.put(key, value);
    }

    @Override
    public void putAll(Map<? extends byte[], ? extends byte[]> m) {
      this.map.putAll(m);
    }

    @Override
    public byte[] remove(Object key) {
      return this.map.remove(key);
    }

    @Override
    public int size() {
      return map.size();
    }

    @Override
    public SortedMap<byte[], byte[]> subMap(byte[] fromKey, byte[] toKey) {
      return this.map.subMap(fromKey, toKey);
    }

    @Override
    public SortedMap<byte[], byte[]> tailMap(byte[] fromKey) {
      return this.map.tailMap(fromKey);
    }

    @Override
    public Collection<byte[]> values() {
      return map.values();
    }

    /**
     * Write out this instance on the passed in <code>out</code> stream.
     * We write it as a protobuf.
     * @see #read(DataInputStream)
     */
    void write(final DataOutputStream out) throws IOException {
      HFileProtos.FileInfoProto.Builder builder = HFileProtos.FileInfoProto.newBuilder();
      for (Map.Entry<byte[], byte[]> e : this.map.entrySet()) {
        HBaseProtos.BytesBytesPair.Builder bbpBuilder = HBaseProtos.BytesBytesPair.newBuilder();
        bbpBuilder.setFirst(ByteStringer.wrap(e.getKey()));
        bbpBuilder.setSecond(ByteStringer.wrap(e.getValue()));
        builder.addMapEntry(bbpBuilder.build());
      }
      out.write(ProtobufUtil.PB_MAGIC);
      builder.build().writeDelimitedTo(out);
    }
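
    // On-disk layout produced by write() above (sketch, not in the original
    // source): the PB_MAGIC bytes, then the FileInfoProto in standard
    // protobuf "delimited" form (a varint length prefix followed by the
    // serialized message).
    //
    //   +----------+----------------+--------------------+
    //   | PB_MAGIC | varint(length) | FileInfoProto body |
    //   +----------+----------------+--------------------+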

    /**
     * Populate this instance with what we find on the passed in <code>in</code>
     * stream. Can deserialize protobuf as well as the old Writable format.
     * @see #write(DataOutputStream)
     */
    void read(final DataInputStream in) throws IOException {
      // First check whether the stream starts with the protobuf magic.
      int pblen = ProtobufUtil.lengthOfPBMagic();
      byte[] pbuf = new byte[pblen];
      if (in.markSupported()) in.mark(pblen);
      int read = in.read(pbuf);
      if (read != pblen) throw new IOException("read=" + read + ", wanted=" + pblen);
      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
        parsePB(HFileProtos.FileInfoProto.parseDelimitedFrom(in));
      } else {
        if (in.markSupported()) {
          in.reset();
          parseWritable(in);
        } else {
          // The stream does not support mark/reset, so re-prepend the bytes we
          // already consumed before handing off to the Writable parser. We
          // cannot use BufferedInputStream: it consumes more than we read from
          // the underlying stream.
          ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
          SequenceInputStream sis = new SequenceInputStream(bais, in); // Concatenate input streams
          parseWritable(new DataInputStream(sis));
        }
      }
    }

    /**
     * Parse the old Writable format: an int count of entries followed by that
     * many key/value pairs, each a byte array. The old format wrote a code
     * byte before each value to identify its type; we know it was always a
     * byte array, so we read and discard it.
     */
    void parseWritable(final DataInputStream in) throws IOException {
      // First clear the map. Otherwise we will just accumulate entries every time this is called.
      this.map.clear();
      // Read the number of entries in the map.
      int entries = in.readInt();
      // Then read each key/value pair.
      for (int i = 0; i < entries; i++) {
        byte[] key = Bytes.readByteArray(in);
        // We used to serialize a code byte for the value's class; read and ignore it.
        in.readByte();
        byte[] value = Bytes.readByteArray(in);
        this.map.put(key, value);
      }
    }

    /**
     * Fill our map with the content of the protobuf message read off disk.
     * @param fip protobuf message to read
     */
    void parsePB(final HFileProtos.FileInfoProto fip) {
      this.map.clear();
      for (BytesBytesPair pair : fip.getMapEntryList()) {
        this.map.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
      }
    }
  }
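
  // Illustrative round trip, not part of the original source: populating a
  // FileInfo and serializing it. "out" and "in" are hypothetical streams; the
  // key name is an example (any key without the "hfile." prefix is legal).
  //
  //   FileInfo info = new FileInfo();
  //   info.append(Bytes.toBytes("MY_KEY"), Bytes.toBytes("my value"), true);
  //   info.write(out);   // PB_MAGIC + delimited FileInfoProto
  //
  //   FileInfo copy = new FileInfo();
  //   copy.read(in);     // accepts both the protobuf and the old Writable format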

  /** Return true if the given file info key is reserved for internal use. */
  public static boolean isReservedFileInfoKey(byte[] key) {
    return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
  }

  /**
   * Get names of supported compression algorithms. The names are acceptable
   * by HFile.Writer.
   *
   * @return Array of strings, each of which represents a supported
   *         compression algorithm, e.g. "none" (no compression) or "gz"
   *         (GZIP compression).
   */
  public static String[] getSupportedCompressionAlgorithms() {
    return Compression.getSupportedAlgorithms();
  }

  /*
   * Utility methods.
   */
  static int longToInt(final long l) {
    // Expecting the size() of a block not exceeding 4GB. Assuming the
    // size() will wrap to negative integer if it exceeds 2GB (from TFile).
    return (int) (l & 0x00000000ffffffffL);
  }
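
  // Worked example, not part of the original source: the mask keeps only the
  // low 32 bits, so values that fit in an int survive unchanged while larger
  // ones wrap.
  //
  //   longToInt(5L)             == 5
  //   longToInt(0x1_0000_0001L) == 1    (high bits discarded)
  //   longToInt(0xFFFF_FFFFL)   == -1   (wraps to negative int)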

  /**
   * Returns all HFiles belonging to the given region directory. Could return
   * an empty list.
   *
   * @param fs The file system reference.
   * @param regionDir The region directory to scan.
   * @return The list of files found.
   * @throws IOException When scanning the files fails.
   */
  static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
      throws IOException {
    List<Path> regionHFiles = new ArrayList<Path>();
    PathFilter dirFilter = new FSUtils.DirFilter(fs);
    FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
    for (FileStatus dir : familyDirs) {
      FileStatus[] files = fs.listStatus(dir.getPath());
      for (FileStatus file : files) {
        if (!file.isDirectory() &&
            (!file.getPath().toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) &&
            (!file.getPath().toString().contains(HConstants.RECOVERED_EDITS_DIR))) {
          regionHFiles.add(file.getPath());
        }
      }
    }
    return regionHFiles;
  }

  /**
   * Checks the given {@link HFile} format version, and throws an exception if
   * invalid. Note that if the version number comes from an input file and has
   * not been verified, the caller needs to re-throw an {@link IOException} to
   * indicate that this is not a software error, but corrupted input.
   *
   * @param version an HFile version
   * @throws IllegalArgumentException if the version is invalid
   */
  public static void checkFormatVersion(int version)
      throws IllegalArgumentException {
    if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
      throw new IllegalArgumentException("Invalid HFile version: " + version
          + " (expected to be between " + MIN_FORMAT_VERSION + " and "
          + MAX_FORMAT_VERSION + ")");
    }
  }

  public static void main(String[] args) throws Exception {
    // Delegate to the HFile pretty-printer command-line tool.
    HFilePrettyPrinter.main(args);
  }
}