/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;

@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlock {

  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  private static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  // Parameters used by writeTestKeyValues().
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

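  /**
   * Writes 1000 ints (4000 bytes) whose values repeat in runs of 100, so the
   * resulting payload compresses well.
   */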
  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses very well.
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i / 100);
    }
  }

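  /**
   * Writes {@link #NUM_KEYVALUES} random KeyValues to the given block writer.
   * Rows, qualifiers, values and timestamps of earlier KeyValues are reused
   * with probability {@link #CHANCE_TO_REPEAT} so that the data both encodes
   * and compresses well.
   * @return the total serialized size of the KeyValues written
   */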
  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<KeyValue>();
    Random randomizer = new Random(42L + seed); // just any fixed seed

    // Generate the KeyValues.
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // Either reuse a previously generated field or generate a fresh random
      // one; the repetition makes the block compress and encode well.
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = keyValues.get(0).getFamily();
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = keyValues.get(
            randomizer.nextInt(keyValues.size())).getQualifier();
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(
            randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new Tag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // Sort into KeyValue order and write to the block writer.
    int totalSize = 0;
    Collections.sort(keyValues, KeyValue.COMPARATOR);

    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        kv.setSequenceId(memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
      hbw.write(kv);
    }
    return totalSize;
  }

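  /**
   * Runs the block-type magic of a META block plus the standard test payload
   * through the given compression algorithm and returns the resulting bytes,
   * mimicking the old pre-v2 block layout (no v2 header, no checksums).
   */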
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

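  /**
   * Writes the standard test payload into a fresh {@link HFileBlock.Writer}
   * as a version-2 DATA block, verifies the uncompressed size, and returns
   * the writer so callers can inspect the finished block.
   */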
  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
        .withCompression(algo)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    hbw.ensureBlockReady();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

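  /**
   * Returns the header-plus-data bytes of a freshly written block as a
   * printable string. If the block has the expected length, the gzip "OS"
   * byte is normalized first so the result is platform-independent.
   */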
  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" byte of the gzip header to 3 (Unix) so the output
      // is the same on every platform and can be compared against the
      // expected string in testGzipCompression.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);

    HFileBlock block =
        createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x01\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip member header (RFC 1952):
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"       // compression method: 8 = deflate
            + "\\x00"       // flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"       // XFL (extra flags)
            // OS byte (3 = Unix); createTestBlockStr force-sets this byte so
            // the comparison does not depend on the platform.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00";
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // Only compare the leading part of the string representation: the trailing
    // block checksum can differ because createTestBlockStr rewrites the gzip
    // OS byte after the checksum was computed.
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
        testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

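  /**
   * Writes two DATA blocks per compression algorithm and reads the first one
   * back through {@link HFileBlock.FSReaderImpl}, checking its sizes and the
   * exception raised when a wrong on-disk size is supplied.
   */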
  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
        is.close();
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
              b.totalChecksumBytes(), -1, pread, false);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread, false);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "On-disk size without header provided is "
                + wrongCompressedSize + ", but block header contains "
                + b.getOnDiskSizeWithoutHeader() + ".";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

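  /**
   * Round-trips blocks through every {@link DataBlockEncoding}: verifies the
   * on-disk blocks match the encoder output, unpack correctly, and survive
   * block-cache serialization and deserialization.
   */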
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
          HFileContext meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<Integer>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
            byte[] encodedResultWithHeader = hbw.getUncompressedBufferWithHeader().array();
            final int encodedSize = encodedResultWithHeader.length - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // Account for the two-byte encoding ID that follows the block
              // header but precedes the encoded KeyValues.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            byte[] encodedDataSection =
                new byte[encodedResultWithHeader.length - headerLen];
            System.arraycopy(encodedResultWithHeader, headerLen,
                encodedDataSection, 0, encodedDataSection.length);
            final ByteBuffer encodedBuf =
                ByteBuffer.wrap(encodedDataSection);
            encodedSizes.add(encodedSize);
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .build();
          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemstoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, -1, pread, false);
            assertEquals(0, HFile.getChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
                blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize="
                  + blockUnpacked.heapSize());
              assertFalse(packedHeapsize == blockUnpacked.heapSize());
              assertTrue("Packed heapSize should be < unpacked heapSize",
                  packedHeapsize < blockUnpacked.heapSize());
            }
            ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // Encoded blocks carry a two-byte encoding ID; verify it, then
              // skip past it before comparing contents.
              assertEquals(
                  "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                  "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // The decoded block contents must match what the encoder produced.
            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);

            // Also exercise the cache serialization/deserialization path.
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized);
              HFileBlock deserialized =
                  (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer);
              assertEquals(
                  "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                  blockFromHFile, deserialized);
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block could not be unpacked correctly.",
                    blockUnpacked, deserialized.unpack(meta, hbr));
              }
            }
          }
          is.close();
        }
      }
    }
  }
481
482 static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
483 boolean pread) {
484 return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
485 }
486
487 static void assertBuffersEqual(ByteBuffer expectedBuffer,
488 ByteBuffer actualBuffer, Compression.Algorithm compression,
489 DataBlockEncoding encoding, boolean pread) {
490 if (!actualBuffer.equals(expectedBuffer)) {
491 int prefix = 0;
492 int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
493 while (prefix < minLimit &&
494 expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
495 prefix++;
496 }
497
498 fail(String.format(
499 "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
500 buildMessageDetails(compression, encoding, pread), prefix,
501 nextBytesToStr(expectedBuffer, prefix),
502 nextBytesToStr(actualBuffer, prefix)));
503 }
504 }
505
506
507
508
509
510 private static String nextBytesToStr(ByteBuffer buf, int pos) {
511 int maxBytes = buf.limit() - pos;
512 int numBytes = Math.min(16, maxBytes);
513 return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
514 numBytes) + (numBytes < maxBytes ? "..." : "");
515 }
516
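  /**
   * Writes {@link #NUM_TEST_BLOCKS} blocks of random types and re-reads them
   * sequentially, checking offsets, previous-block offsets, block types and,
   * in cache-on-write mode, the uncompressed block contents.
   */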
  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

  protected void testPreviousOffsetInternals() throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm: " + algo +
              ", pread=" + pread +
              ", cacheOnWrite=" + cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<Long>();
          List<Long> expectedPrevOffsets = new ArrayList<Long>();
          List<BlockType> expectedTypes = new ArrayList<BlockType>();
          List<ByteBuffer> expectedContents = cacheOnWrite
              ? new ArrayList<ByteBuffer>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                  HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread, false);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid type of block #" + i + ":",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Re-read the same block, this time passing in the known on-disk
            // size, and verify the two reads agree on every field.
            HFileBlock b2 = hbr.readBlockData(curOffset,
                b.getOnDiskSizeWithHeader(), -1, pread, false);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // In cache-on-write mode we compare against the uncompressed
              // buffer captured at write time, so unpack the block first.
              b = b.unpack(meta, hbr);

              ByteBuffer bufRead = b.getBufferWithHeader();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Only construct the (expensive) error message if the
                // assertion below is actually going to fail.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                    bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                        bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                      HFileBlock.toStringHeader(bufExpected) +
                      "\nfound header" +
                      HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                      " limit " + bufRead.limit() +
                      " expected offset " + bufExpected.arrayOffset() +
                      " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

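  /**
   * A reader task that, for ten seconds, reads randomly chosen blocks from a
   * shared file, alternating between positional and streaming reads and
   * between known and unknown on-disk sizes, and verifies each block read.
   */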
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        boolean pread = rand.nextBoolean();
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
                : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread, false);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at "
              + offset + ", pread=" + pread + ", withOnDiskSize=" +
              withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread) {
          ++numPositionalRead;
        }
        if (withOnDiskSize) {
          ++numWithOnDiskSize;
        }
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
          " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
          "specified: " + numWithOnDiskSize + ")");

      return true;
    }
  }

  @Test
  public void testConcurrentReading() throws Exception {
    // This concurrency test is only run on non-Windows platforms
    // (see the WINDOWS flag above).
    Assume.assumeTrue(!WINDOWS);
    testConcurrentReadingInternals();
  }

721
722 protected void testConcurrentReadingInternals() throws IOException,
723 InterruptedException, ExecutionException {
724 for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
725 Path path =
726 new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
727 Random rand = defaultRandom();
728 List<Long> offsets = new ArrayList<Long>();
729 List<BlockType> types = new ArrayList<BlockType>();
730 writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
731 FSDataInputStream is = fs.open(path);
732 long fileSize = fs.getFileStatus(path).getLen();
733 HFileContext meta = new HFileContextBuilder()
734 .withHBaseCheckSum(true)
735 .withIncludesMvcc(includesMemstoreTS)
736 .withIncludesTags(includesTag)
737 .withCompression(compressAlgo)
738 .build();
739 HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, fileSize, meta);
740
741 Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
742 ExecutorCompletionService<Boolean> ecs =
743 new ExecutorCompletionService<Boolean>(exec);
744
745 for (int i = 0; i < NUM_READER_THREADS; ++i) {
746 ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
747 offsets, types, fileSize));
748 }
749
750 for (int i = 0; i < NUM_READER_THREADS; ++i) {
751 Future<Boolean> result = ecs.take();
752 assertTrue(result.get());
753 if (detailedLogging) {
754 LOG.info(String.valueOf(i + 1)
755 + " reader threads finished successfully (algo=" + compressAlgo
756 + ")");
757 }
758 }
759
760 is.close();
761 }
762 }
763
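  /**
   * Writes {@link #NUM_TEST_BLOCKS} blocks of random types and sizes to the
   * given path, recording offsets, per-type previous offsets, block types
   * and, if requested, the uncompressed contents for later verification.
   * @return the total on-disk size of all blocks written, headers included
   */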
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents)
      throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(true)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withCompression(compressAlgo)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null) {
        expectedOffsets.add(os.getPos());
      }

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite) {
        expectedContents.add(hbw.getUncompressedBufferWithHeader());
      }

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" +
        compressAlgo);
    return totalSize;
  }

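  /**
   * Checks that {@link HFileBlock#heapSize()} matches the size estimated from
   * the class layouts of the block, its backing buffer, and its
   * {@link HFileContext}.
   */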
  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 64);
    } else {
      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 80);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withHBaseCheckSum(false)
          .withCompression(Algorithm.NONE)
          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1, 0, meta);
      long byteBufferExpectedSize =
          ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected,
          block.heapSize());
    }
  }
}