/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.base.Preconditions;

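/**
 * Unit tests verifying that HFile blocks written without checksums (the
 * pre-checksum, minor-version-0 on-disk format) can still be read by the
 * current reader implementation.
 */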
@Category(SmallTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlockCompatibility {

  private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
  private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private HFileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = (HFileSystem) HFileSystem.get(TEST_UTIL.getConfiguration());
  }

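  /**
   * Creates a version-1-style block: the META block magic and test contents
   * pushed through the compression stream, with no version-2 header and no
   * checksums.
   */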
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos);
    TestHFileBlock.writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

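  /**
   * Writes a single version-2 data block (minor version 0, no checksums) and
   * returns the writer so callers can inspect the serialized bytes.
   */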
  private Writer createTestV2Block(Compression.Algorithm algo)
      throws IOException {
    final BlockType blockType = BlockType.DATA;
    Writer hbw = new Writer(algo, null,
        includesMemstoreTS, includesTag);
    DataOutputStream dos = hbw.startWriting(blockType);
    TestHFileBlock.writeTestBlockContents(dos);
    // Make sure the block is ready by calling hbw.getHeaderAndData().
    hbw.getHeaderAndData();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.releaseCompressor();
    return hbw;
  }

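  /**
   * Serializes a test block to a printable string, normalizing the gzip
   * header's OS field so the result is comparable across platforms.
   */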
  private String createTestBlockStr(Compression.Algorithm algo,
      int correctLength) throws IOException {
    Writer hbw = createTestV2Block(algo);
    byte[] testV2Block = hbw.getHeaderAndData();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix), because
      // some gzip implementations set it to 0 instead, and we want the
      // serialized bytes to be deterministic across platforms.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
        getUncompressedSizeWithoutHeader());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
        + "\\xFF\\xFF\\xFF\\xFF"
        // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
        + "\\x1F\\x8B"  // gzip magic signature
        + "\\x08"  // Compression method: 8 = "deflate"
        + "\\x00"  // Flags
        + "\\x00\\x00\\x00\\x00"  // mtime
        + "\\x00"  // XFL (extra flags)
        // OS (0 = FAT filesystems, 3 = Unix). This field sometimes gets set
        // to 0 on Linux and Mac, so createTestBlockStr resets it to 3.
        + "\\x03"
        + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
        + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
        + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
    final int correctGzipBlockLength = 82;

    String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
    assertEquals(correctTestBlockStr, returnedStr);
  }

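  /**
   * Writes blocks with each compression algorithm and reads them back,
   * verifying the reported sizes, and that providing a wrong on-disk size is
   * rejected with an IOException.
   */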
  @Test
  public void testReaderV2() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        Writer hbw = new Writer(algo, null,
            includesMemstoreTS, includesTag);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        HFileContext meta = new HFileContextBuilder()
            .withHBaseCheckSum(false)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo)
            .build();
        HFileBlock.FSReader hbr =
            new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
        is.close();

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path,
              meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
              b.totalChecksumBytes(), -1, pread, false);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread, false);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "On-disk size without header provided is "
                + wrongCompressedSize + ", but block header contains "
                + b.getOnDiskSizeWithoutHeader() + ".";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

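  /**
   * Test encoding/decoding data blocks.
   * @throws IOException a bug or a problem with temporary files.
   */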
  @Test
  public void testDataBlockEncoding() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    final int numBlocks = 5;
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding algo " + algo +
              " pread = " + pread +
              " encoding " + encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
          TestHFileBlockCompatibility.Writer hbw =
              new TestHFileBlockCompatibility.Writer(algo,
                  dataBlockEncoder, includesMemstoreTS, includesTag);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<Integer>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            TestHFileBlock.writeTestKeyValues(hbw, blockId, pread, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
            byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader();
            final int encodedSize = encodedResultWithHeader.length - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // We need to account for the two-byte encoding algorithm ID that
              // comes after the block header but before the encoded KVs.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            byte[] encodedDataSection =
                new byte[encodedResultWithHeader.length - headerLen];
            System.arraycopy(encodedResultWithHeader, headerLen,
                encodedDataSection, 0, encodedDataSection.length);
            final ByteBuffer encodedBuf =
                ByteBuffer.wrap(encodedDataSection);
            encodedSizes.add(encodedSize);
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(false)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo)
              .build();
          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is),
              totalSize, fs, path, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemstoreTS(includesMemstoreTS);

          HFileBlock b;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            b = hbr.readBlockData(pos, -1, -1, pread, false);
            b.sanityCheck();
            if (meta.isCompressedOrEncrypted()) {
              assertFalse(b.isUnpacked());
              b = b.unpack(meta, hbr);
            }
            pos += b.getOnDiskSizeWithHeader();

            assertEquals((int) encodedSizes.get(blockId),
                b.getUncompressedSizeWithoutHeader());
            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(0, actualBuffer.get(0));
              assertEquals(encoding.getId(), actualBuffer.get(1));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // Test if the content matches, producing a readable message.
            TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
                algo, encoding, pread);
          }
          is.close();
        }
      }
    }
  }

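  /**
   * This is the version of HFileBlock.Writer that is used to create
   * version-2 blocks with minor version 0. These blocks do not have
   * checksums in the header.
   */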
  public static final class Writer extends HFileBlock.Writer {

    // These constants are as they were in minor version 0.
    private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
    private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
    private static final byte[] DUMMY_HEADER =
        HFileBlock.DUMMY_HEADER_NO_CHECKSUM;

    /** Writer state. Used to ensure the correct usage protocol. */
    private enum State {
      INIT,
      WRITING,
      BLOCK_READY
    };

    /** The writer's current state. */
    private State state = State.INIT;

    /** Compression algorithm for all blocks this instance writes. */
    private final Compression.Algorithm compressAlgo;

    /** Data block encoder used for data blocks. */
    private final HFileDataBlockEncoder dataBlockEncoder;

    /** Encoding context used for data blocks. */
    private HFileBlockEncodingContext dataBlockEncodingCtx;

    /** Encoding context used for all non-data blocks. */
    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

    /**
     * The stream we use to accumulate data in uncompressed format for each
     * block. We reset this stream at the end of each block and reuse it. The
     * header is written as the first {@link #HEADER_SIZE} bytes into this
     * stream.
     */
    private ByteArrayOutputStream baosInMemory;

    /** Compressor, which is also reused between consecutive blocks. */
    private Compressor compressor;

    /**
     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
     * changed in {@link #finishBlock()} from {@link BlockType#DATA} to
     * {@link BlockType#ENCODED_DATA}.
     */
    private BlockType blockType;

    /**
     * A stream that we write uncompressed bytes to, which compresses them and
     * writes them to {@link #baosInMemory}.
     */
    private DataOutputStream userDataStream;

    /**
     * Bytes to be written to the file system, including the header. Compressed
     * if compression is turned on.
     */
    private byte[] onDiskBytesWithHeader;

    /**
     * Valid in the BLOCK_READY state. Contains the header and the uncompressed
     * (but potentially encoded, if this is a data block) bytes, so the length
     * is {@link #getUncompressedSizeWithoutHeader()} + {@link #HEADER_SIZE}.
     */
    private byte[] uncompressedBytesWithHeader;

    /**
     * Current block's start offset in the {@link HFile}. Set in
     * {@link #writeHeaderAndData(FSDataOutputStream)}.
     */
    private long startOffset;

    /**
     * Offset of previous block by block type. Updated when the next block is
     * started.
     */
    private long[] prevOffsetByType;

    /** The offset of the previous block of the same type. */
    private long prevOffset;

    /** Number of unencoded bytes written into the current block. */
    private int unencodedDataSizeWritten;

    public Writer(Compression.Algorithm compressionAlgorithm,
        HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
      this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)
          .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
          .withCompression(compressionAlgorithm).build());
    }

    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) {
      super(dataBlockEncoder, meta);
      compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression();
      this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder
          : NoOpDataBlockEncoder.INSTANCE;
      defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
      dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta);
      baosInMemory = new ByteArrayOutputStream();

      prevOffsetByType = new long[BlockType.values().length];
      for (int i = 0; i < prevOffsetByType.length; ++i) {
        prevOffsetByType[i] = -1;
      }
    }

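    /**
     * Starts writing into the block. The previous block's data is discarded.
     *
     * @return the stream the user can write their data into
     */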
    public DataOutputStream startWriting(BlockType newBlockType)
        throws IOException {
      if (state == State.BLOCK_READY && startOffset != -1) {
        // We had a previous block that was written to a stream at a specific
        // offset. Save that offset as the last offset of a block of that type.
        prevOffsetByType[blockType.getId()] = startOffset;
      }

      startOffset = -1;
      blockType = newBlockType;

      baosInMemory.reset();
      baosInMemory.write(DUMMY_HEADER);

      state = State.WRITING;

      // We will compress it later in finishBlock().
      userDataStream = new DataOutputStream(baosInMemory);
      if (newBlockType == BlockType.DATA) {
        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
      }
      this.unencodedDataSizeWritten = 0;
      return userDataStream;
    }

    @Override
    public void write(Cell c) throws IOException {
      KeyValue kv = KeyValueUtil.ensureKeyValue(c);
      expectState(State.WRITING);
      this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream);
      this.unencodedDataSizeWritten += kv.getLength();
      if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) {
        this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getMvccVersion());
      }
    }

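    /**
     * Returns the stream for the user to write to. The block writer takes care
     * of handling compression and buffering for caching on write. Can only be
     * called in the "writing" state.
     *
     * @return the data output stream for the user to write to
     */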
    DataOutputStream getUserDataStream() {
      expectState(State.WRITING);
      return userDataStream;
    }

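    /**
     * Transitions the block writer from the "writing" state to the "block
     * ready" state. Does nothing if the block is already finished.
     */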
    void ensureBlockReady() throws IOException {
      Preconditions.checkState(state != State.INIT,
          "Unexpected state: " + state);

      if (state == State.BLOCK_READY) {
        return;
      }

      // This will set state to BLOCK_READY.
      finishBlock();
    }

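    /**
     * An internal method that flushes the compressing stream (if using
     * compression), serializes the header, and takes care of the separate
     * uncompressed stream for caching on write, if applicable. Sets the block
     * write state to "block ready".
     */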
    void finishBlock() throws IOException {
      if (blockType == BlockType.DATA) {
        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
            baosInMemory.toByteArray(), blockType);
        blockType = dataBlockEncodingCtx.getBlockType();
      }
      userDataStream.flush();
      // This does an array copy, so it is safe to cache this byte array.
      uncompressedBytesWithHeader = baosInMemory.toByteArray();
      prevOffset = prevOffsetByType[blockType.getId()];

      // We need to set state before we can package the block up for
      // cache-on-write. In a way, the block is ready, but not yet encoded or
      // compressed.
      state = State.BLOCK_READY;
      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
        onDiskBytesWithHeader = dataBlockEncodingCtx
            .compressAndEncrypt(uncompressedBytesWithHeader);
      } else {
        onDiskBytesWithHeader = defaultBlockEncodingCtx
            .compressAndEncrypt(uncompressedBytesWithHeader);
      }

      // Put the header for the on-disk bytes.
      putHeader(onDiskBytesWithHeader, 0,
          onDiskBytesWithHeader.length,
          uncompressedBytesWithHeader.length);
      // Set the header for the uncompressed bytes (for cache-on-write).
      putHeader(uncompressedBytesWithHeader, 0,
          onDiskBytesWithHeader.length,
          uncompressedBytesWithHeader.length);
    }

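    /**
     * Puts the header into the given byte array at the given offset.
     * @param onDiskSize size of the block on disk, including the header
     * @param uncompressedSize size of the block after decompression (but
     *          before optional data block decoding), including the header
     */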
    private void putHeader(byte[] dest, int offset, int onDiskSize,
        int uncompressedSize) {
      offset = blockType.put(dest, offset);
      offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
      offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
      Bytes.putLong(dest, offset, prevOffset);
    }

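    /**
     * Similar to {@link #writeHeaderAndData(DataOutputStream)}, but records
     * the offset of this block so that it can be referenced in the next block
     * of the same type.
     */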
    public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
      long offset = out.getPos();
      if (startOffset != -1 && offset != startOffset) {
        throw new IOException("A " + blockType + " block written to a "
            + "stream twice, first at offset " + startOffset + ", then at "
            + offset);
      }
      startOffset = offset;

      writeHeaderAndData((DataOutputStream) out);
    }

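    /**
     * Writes the header and the compressed data of this block (or uncompressed
     * data when not using compression) into the given stream. Can be called in
     * the "writing" state or in the "block ready" state. If called in the
     * "writing" state, transitions the writer to the "block ready" state.
     *
     * @param out the output stream to write to
     */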
    private void writeHeaderAndData(DataOutputStream out) throws IOException {
      ensureBlockReady();
      out.write(onDiskBytesWithHeader);
    }

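    /**
     * Returns the header and the compressed data (or uncompressed data when
     * not using compression) as a byte array. Can be called in the "writing"
     * state or in the "block ready" state. If called in the "writing" state,
     * transitions the writer to the "block ready" state.
     *
     * @return header and data as they would be stored on disk
     */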
    public byte[] getHeaderAndData() throws IOException {
      ensureBlockReady();
      return onDiskBytesWithHeader;
    }

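    /**
     * Releases the compressor this writer uses to compress blocks back into
     * the compressor pool. Needs to be called before the writer is discarded.
     */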
    public void releaseCompressor() {
      if (compressor != null) {
        compressAlgo.returnCompressor(compressor);
        compressor = null;
      }
    }

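    /**
     * Returns the on-disk size of the data portion of the block. This is the
     * compressed size if compression is enabled. Can only be called in the
     * "block ready" state. The header is not compressed, and its size is not
     * included in the return value.
     *
     * @return the on-disk size of the block, not including the header
     */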
    public int getOnDiskSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length - HEADER_SIZE;
    }

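    /**
     * Returns the on-disk size of the block. Can only be called in the
     * "block ready" state.
     *
     * @return the on-disk size of the block ready to be written, including the
     *         header size
     */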
    public int getOnDiskSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length;
    }

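    /**
     * The uncompressed size of the block data. Does not include the header.
     */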
    public int getUncompressedSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length - HEADER_SIZE;
    }

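    /**
     * The uncompressed size of the block data, including the header.
     */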
    public int getUncompressedSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length;
    }

    /** @return true if a block is currently being written */
    public boolean isWriting() {
      return state == State.WRITING;
    }

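    /**
     * Returns the number of bytes written into the current block so far, or
     * zero if not writing a block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of bytes written
     */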
    public int blockSizeWritten() {
      if (state != State.WRITING) {
        return 0;
      }
      return this.unencodedDataSizeWritten;
    }

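    /**
     * Returns the header followed by the uncompressed data, even if using
     * compression. This is needed for storing uncompressed blocks in the block
     * cache. Can only be called in the "block ready" state.
     *
     * @return uncompressed block bytes for caching on write
     */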
    private byte[] getUncompressedDataWithHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader;
    }

    private void expectState(State expectedState) {
      if (state != expectedState) {
        throw new IllegalStateException("Expected state: " + expectedState +
            ", actual state: " + state);
      }
    }

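    /**
     * Returns the header followed by the uncompressed data, even if using
     * compression, wrapped in a {@link ByteBuffer}.
     *
     * @return uncompressed block bytes for caching on write
     */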
    public ByteBuffer getUncompressedBufferWithHeader() {
      byte[] b = getUncompressedDataWithHeader();
      return ByteBuffer.wrap(b, 0, b.length);
    }

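    /**
     * Takes the given {@link BlockWritable} instance, creates a new block of
     * its appropriate type, writes the writable into this block, and flushes
     * the block into the output stream.
     *
     * @param bw the block-writable object to write as a block
     * @param out the file system output stream
     */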
    public void writeBlock(BlockWritable bw, FSDataOutputStream out)
        throws IOException {
      bw.writeToBlock(startWriting(bw.getBlockType()));
      writeHeaderAndData(out);
    }

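    /**
     * Creates a new HFileBlock from the data written so far, suitable for
     * caching on write.
     */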
    public HFileBlock getBlockForCaching() {
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(false)
          .withChecksumType(ChecksumType.NULL)
          .withBytesPerCheckSum(0)
          .build();
      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
          getUncompressedSizeWithoutHeader(), prevOffset,
          getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
          getOnDiskSizeWithoutHeader(), meta);
    }
  }

}