/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;

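/**
 * Tests {@link HFileBlock} writing and reading: uncompressed and gzip-compressed blocks,
 * data block encoding, previous-offset bookkeeping, heap size accounting, and concurrent
 * block reads.
 */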
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlock {
  // change this value to activate more logs
  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  /** Set to true on Windows platforms */
  private static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  // Used to generate KeyValues
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses really well.
    for (int i = 0; i < 1000; ++i)
      dos.writeInt(i / 100);
  }

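  /**
   * Writes a deterministic, pseudo-random set of KeyValues (with optional tags and memstore
   * timestamps) to the given block writer and returns their total serialized size.
   */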
  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<KeyValue>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // generate keyValues
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // generate a fresh value or repeat an existing one; repeats should compress well
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = keyValues.get(0).getFamily();
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = keyValues.get(
            randomizer.nextInt(keyValues.size())).getQualifier();
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(
            randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value, new Tag[] { new Tag(
            (byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // sort the KeyValues and write them to the stream
    int totalSize = 0;
    Collections.sort(keyValues, KeyValue.COMPARATOR);

    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        kv.setSequenceId(memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
      hbw.write(kv);
    }
    return totalSize;
  }

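  /**
   * Creates the contents of a version 1 ("v1") block for testing: a META block magic record
   * followed by the test payload, passed through the given compression algorithm.
   */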
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

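  /**
   * Writes a single version 2 DATA block with the standard test contents and returns the
   * writer, so callers can inspect the on-disk and in-memory representations.
   */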
  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
                        .withCompression(algo)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    hbw.ensureBlockReady();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

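  /**
   * Returns the header and data of a freshly written v2 block as a binary-escaped string,
   * normalizing the gzip "OS" header byte so the result is comparable across platforms.
   */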
  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
      // variations across operating systems.
      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
      // We only make this change when the compressed block length matches.
      // Otherwise, there are obviously other inconsistencies.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);

    HFileBlock block =
      createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x01\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"  // Compression method: 8 = "deflate"
            + "\\x00"  // Flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"  // XFL (extra flags)
            // OS (0 = FAT filesystems, 3 = Unix). However, this field
            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
            // This appears to be a difference caused by the availability
            // (and use) of the native GZ codec.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00"; //  4 byte checksum (ignored)
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
      testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
                 ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(includesMemstoreTS)
                            .withIncludesTags(includesTag)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
               .withHBaseCheckSum(true)
               .withIncludesMvcc(includesMemstoreTS)
               .withIncludesTags(includesTag)
               .withCompression(algo).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
        is.close();
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
                                b.totalChecksumBytes(), -1, pread, false);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread, false);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "On-disk size without header provided is "
                + wrongCompressedSize + ", but block header contains "
                + b.getOnDiskSizeWithoutHeader() + ".";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Tests encoding and decoding of data blocks.
   * @throws IOException on a bug or a problem with temporary files.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
          HFileContext meta = new HFileContextBuilder()
                              .withCompression(algo)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<Integer>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
            byte[] encodedResultWithHeader = hbw.getUncompressedBufferWithHeader().array();
            final int encodedSize = encodedResultWithHeader.length - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // We need to account for the two-byte encoding algorithm ID that
              // comes after the 24-byte block header but before encoded KVs.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            byte[] encodedDataSection =
                new byte[encodedResultWithHeader.length - headerLen];
            System.arraycopy(encodedResultWithHeader, headerLen,
                encodedDataSection, 0, encodedDataSection.length);
            final ByteBuffer encodedBuf =
                ByteBuffer.wrap(encodedDataSection);
            encodedSizes.add(encodedSize);
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
                 .withHBaseCheckSum(true)
                 .withCompression(algo)
                 .withIncludesMvcc(includesMemstoreTS)
                 .withIncludesTags(includesTag)
                 .build();
          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemstoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, -1, pread, false);
            assertEquals(0, HFile.getChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
              blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapSize=" + blockUnpacked
                .heapSize());
              assertFalse(packedHeapsize == blockUnpacked.heapSize());
              assertTrue("Packed heapSize should be < unpacked heapSize",
                packedHeapsize < blockUnpacked.heapSize());
            }
            ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(
                "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // test if content matches, produce nice message
            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);

            // test serialized blocks
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized);
              HFileBlock deserialized =
                (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer);
              assertEquals(
                "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                blockFromHFile, deserialized);
              // intentional reference comparison
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block cannot be unpacked correctly.",
                  blockUnpacked, deserialized.unpack(meta, hbr));
              }
            }
          }
          is.close();
        }
      }
    }
  }

  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
      boolean pread) {
    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
  }

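  /**
   * Asserts that the two buffers hold identical contents; on mismatch, fails with the length
   * of the common prefix and the bytes that follow it in each buffer.
   */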
  static void assertBuffersEqual(ByteBuffer expectedBuffer,
      ByteBuffer actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
          buildMessageDetails(compression, encoding, pread), prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Converts the next few bytes of the given buffer, starting at the given position, to a
   * string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuffer buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
        numBytes) + (numBytes < maxBytes ? "..." : "");
  }

  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

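  /**
   * Writes a long sequence of blocks of random types and sizes, then reads them back and
   * verifies offsets, previous-block offsets per type, block types, and (when cache-on-write
   * content is captured) the unpacked bytes.
   */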
  protected void testPreviousOffsetInternals() throws IOException {
    // TODO: parameterize these nested loops.
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm: " + algo +
                   ", pread=" + pread +
                   ", cacheOnWrite=" + cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<Long>();
          List<Long> expectedPrevOffsets = new ArrayList<Long>();
          List<BlockType> expectedTypes = new ArrayList<BlockType>();
          List<ByteBuffer> expectedContents = cacheOnWrite
              ? new ArrayList<ByteBuffer>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
                              .withHBaseCheckSum(true)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withCompression(algo).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                  HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread, false);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of " + "type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Now re-load this block knowing the on-disk size. This tests a
            // different branch in the loader.
            HFileBlock b2 = hbr.readBlockData(curOffset,
                b.getOnDiskSizeWithHeader(), -1, pread, false);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                         b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
              // verifies that the unpacked value read back off disk matches the unpacked value
              // generated before writing to disk.
              b = b.unpack(meta, hbr);
              // b's buffer has header + data + checksum while
              // expectedContents has header + data only
              ByteBuffer bufRead = b.getBufferWithHeader();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Optimization: only construct an error message in case we
                // will need it.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                    bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                        bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                           HFileBlock.toStringHeader(bufExpected) +
                           "\nfound    header" +
                           HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                           " limit " + bufRead.limit() +
                           " expected offset " + bufExpected.arrayOffset() +
                           " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

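  /**
   * A worker used by the concurrent-reading test: it repeatedly reads random blocks from a
   * shared reader for about ten seconds, varying pread and on-disk-size hints, and checks each
   * block's type, size, and offset.
   */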
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        boolean pread = rand.nextBoolean();
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
          (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
              : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread, false);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at "
              + offset + ", pread=" + pread + ", withOnDiskSize=" +
              withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread)
          ++numPositionalRead;
        if (withOnDiskSize)
          ++numWithOnDiskSize;
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
        " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
        "specified: " + numWithOnDiskSize + ")");

      return true;
    }

  }

  @Test
  public void testConcurrentReading() throws Exception {
    // In the local file system, Hadoop has a bug where seeks do not go to the correct location
    // if checksum verification is disabled (see HBASE-5885). However, HBASE-11218 disables
    // checksum verification for the local file system, which caused this test to fail randomly
    // on Windows.
    // For now, disable this test when running on Windows. It should be re-enabled once Hadoop
    // (e.g. HADOOP-7844) fixes the issue or a good workaround is identified.
    Assume.assumeTrue(!WINDOWS);

    testConcurrentReadingInternals();
  }

  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path =
          new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<Long>();
      List<BlockType> types = new ArrayList<BlockType>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
                          .withHBaseCheckSum(true)
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withCompression(compressAlgo)
                          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, fileSize, meta);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs =
          new ExecutorCompletionService<Boolean>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
            + " reader threads finished successfully (algo=" + compressAlgo
            + ")");
        }
      }

      is.close();
    }
  }

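  /**
   * Writes {@link #NUM_TEST_BLOCKS} blocks of random types and random payload sizes to the
   * given path, recording offsets, previous offsets by type, block types, and (optionally)
   * uncompressed contents for later verification. Returns the total on-disk size written.
   */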
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
  ) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(true)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withCompression(compressAlgo)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null)
        expectedOffsets.add(os.getPos());

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite)
        expectedContents.add(hbw.getUncompressedBufferWithHeader());

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" +
        compressAlgo);
    return totalSize;
  }

  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

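  /**
   * Verifies {@link HFileBlock#heapSize()} for several data sizes: the expected value is the
   * aligned HFileBlock instance size, plus the aligned size of the wrapping ByteBuffer with
   * header and data, plus the aligned HFileContext instance size.
   */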
  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 64);
    } else {
      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 80);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withHBaseCheckSum(false)
                          .withCompression(Algorithm.NONE)
                          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1,
          0, meta);
      long byteBufferExpectedSize =
          ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected,
          block.heapSize());
    }
  }
}