/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.collect.Lists;

/**
 * Tests {@link HFile} cache-on-write functionality for the following block
 * types: data blocks, non-root index blocks, and Bloom filter blocks.
 */
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestCacheOnWrite {

  private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);

  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private Random rand = new Random(12983177L);
  private Path storeFilePath;
  private BlockCache blockCache;
  private String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final BlockEncoderTestType encoderType;
  private final HFileDataBlockEncoder encoder;
  private final boolean cacheCompressedData;

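  // Small data, index-chunk, and Bloom-chunk sizes so that a single store
  // file contains many blocks of each type under test.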
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  private static final int CKBYTES = 512;

  /** The number of valid key types possible in a store file */
  private static final int NUM_VALID_KEY_TYPES =
      KeyValue.Type.values().length - 2;

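  // Each cache-on-write type enables exactly one configuration key and knows
  // which block types that key is expected to cause to be cached.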
  private static enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
        BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    private CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    private CacheOnWriteType(String confKey, BlockType blockType1,
        BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }

  }

  private static final DataBlockEncoding ENCODING_ALGO =
      DataBlockEncoding.PREFIX;

  /** Provides fancy names for three combinations of two booleans */
  private static enum BlockEncoderTestType {
    NO_BLOCK_ENCODING_NOOP(true, false),
    NO_BLOCK_ENCODING(false, false),
    BLOCK_ENCODING_EVERYWHERE(false, true);

    private final boolean noop;
    private final boolean encode;

    BlockEncoderTestType(boolean noop, boolean encode) {
      this.encode = encode;
      this.noop = noop;
    }

    public HFileDataBlockEncoder getEncoder() {
      return noop ? NoOpDataBlockEncoder.INSTANCE : new HFileDataBlockEncoderImpl(
        encode ? ENCODING_ALGO : DataBlockEncoding.NONE);
    }
  }

  public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
      BlockEncoderTestType encoderType, boolean cacheCompressedData, BlockCache blockCache) {
    this.cowType = cowType;
    this.compress = compress;
    this.encoderType = encoderType;
    this.encoder = encoderType.getEncoder();
    this.cacheCompressedData = cacheCompressedData;
    this.blockCache = blockCache;
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
        ", encoderType=" + encoderType + ", cacheCompressedData=" + cacheCompressedData + "]";
    LOG.info(testDescription);
  }

  private static List<BlockCache> getBlockCaches() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    List<BlockCache> blockcaches = new ArrayList<BlockCache>();
    // The default block cache created by CacheConfig
    blockcaches.add(new CacheConfig(conf).getBlockCache());

    // An explicitly created on-heap LRU cache
    BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration());
    blockcaches.add(lru);

    // An off-heap bucket cache
    FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
    int[] bucketSizes =
        { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
    BlockCache bucketcache =
        new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
    blockcaches.add(bucketcache);
    return blockcaches;
  }

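  // The test matrix is the cross product of block cache implementation,
  // cache-on-write type, compression algorithm, encoder type, and whether
  // compressed blocks are cached.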
  @Parameters
  public static Collection<Object[]> getParameters() throws IOException {
    List<Object[]> cowTypes = new ArrayList<Object[]>();
    for (BlockCache blockCache : getBlockCaches()) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        for (Compression.Algorithm compress : HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
          for (BlockEncoderTestType encoderType : BlockEncoderTestType.values()) {
            for (boolean cacheCompressedData : new boolean[] { false, true }) {
              cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData,
                  blockCache });
            }
          }
        }
      }
    }
    return cowTypes;
  }

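  // Evicts every block from the given cache. LruBlockCache supports
  // clearCache() directly; other caches are drained by evicting blocks one
  // at a time until the reported block count reaches zero.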
  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
    if (blockCache instanceof LruBlockCache) {
      ((LruBlockCache) blockCache).clearCache();
    } else {
      // BucketCache may not return all cached blocks (some may still be in its write queue),
      // so keep retrying until the cache reports zero blocks.
      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
        if (clearCount > 0) {
          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
              + blockCache.getBlockCount() + " blocks remaining");
          Thread.sleep(10);
        }
        for (CachedBlock block : Lists.newArrayList(blockCache)) {
          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
          // CombinedBlockCache may need to evict the block twice.
          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
            if (evictCount > 1) {
              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
                  + " times, maybe a bug here");
            }
          }
        }
      }
    }
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
        BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
    cowType.modifyConf(conf);
    fs = HFileSystem.get(conf);
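    // Inject the parameterized block cache and build a CacheConfig whose
    // cache-on-write flags match the cache-on-write type under test.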
    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = blockCache;
    cacheConf =
        new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
        cowType.shouldBeCached(BlockType.LEAF_INDEX),
        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false);
  }

  @After
  public void tearDown() throws IOException, InterruptedException {
    clearBlockCache(blockCache);
  }

  @AfterClass
  public static void afterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

  private void readStoreFile(boolean useTags) throws IOException {
    AbstractHFileReader reader;
    if (useTags) {
      reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    } else {
      reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    }
    LOG.info("HFile information: " + reader);
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
      .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
      .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding())
      .withIncludesTags(useTags).build();
    final boolean cacheBlocks = false;
    final boolean pread = false;
    HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
    assertTrue(testDescription, scanner.seekTo());

    long offset = 0;
    HFileBlock prevBlock = null;
    EnumMap<BlockType, Integer> blockCountByType =
        new EnumMap<BlockType, Integer>(BlockType.class);

    DataBlockEncoding encodingInCache =
        encoderType.getEncoder().getDataBlockEncoding();
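    // Walk every block that precedes the load-on-open section and verify that
    // it is present in the block cache if and only if its type matches the
    // cache-on-write setting under test.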
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      long onDiskSize = -1;
      if (prevBlock != null) {
        onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
      }
      // Flags: don't cache the block, use pread, this is not a compaction.
      // Also, pass null for expected block type to avoid checking it.
      HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
        false, true, null, encodingInCache);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
          offset);
      HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
      boolean isCached = fromCache != null;
      boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
      assertTrue("shouldBeCached: " + shouldBeCached + "\n" +
          "isCached: " + isCached + "\n" +
          "Test description: " + testDescription + "\n" +
          "block: " + block + "\n" +
          "encodingInCache: " + encodingInCache + "\n" +
          "blockCacheKey: " + blockCacheKey,
        shouldBeCached == isCached);
      if (isCached) {
        if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
          if (compress != Compression.Algorithm.NONE) {
            assertFalse(fromCache.isUnpacked());
          }
          fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
        } else {
          assertTrue(fromCache.isUnpacked());
        }
        // block we cached at write-time and block read from file should be identical
        assertEquals(block.getChecksumType(), fromCache.getChecksumType());
        assertEquals(block.getBlockType(), fromCache.getBlockType());
        if (block.getBlockType() == BlockType.ENCODED_DATA) {
          assertEquals(block.getDataBlockEncodingId(), fromCache.getDataBlockEncodingId());
          assertEquals(block.getDataBlockEncoding(), fromCache.getDataBlockEncoding());
        }
        assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
        assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
        assertEquals(
          block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
      }
      prevBlock = block;
      offset += block.getOnDiskSizeWithHeader();
      BlockType bt = block.getBlockType();
      Integer count = blockCountByType.get(bt);
      blockCountByType.put(bt, (count == null ? 0 : count) + 1);
    }

    LOG.info("Block count by type: " + blockCountByType);
    String countByType = blockCountByType.toString();
    BlockType cachedDataBlockType =
        encoderType.encode ? BlockType.ENCODED_DATA : BlockType.DATA;
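    // The expected counts are deterministic because the keys and values are
    // generated from a fixed random seed.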
    if (useTags) {
      assertEquals("{" + cachedDataBlockType
          + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", countByType);
    } else {
      assertEquals("{" + cachedDataBlockType
          + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType);
    }

    // Iterate over all the KeyValues in the HFile.
    while (scanner.next()) {
      scanner.getKeyValue();
    }
    reader.close();
  }

  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
            + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

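  // Writes NUM_KV randomly generated KeyValues (optionally tagged) through a
  // StoreFile.Writer configured with the cache-on-write CacheConfig, so that
  // blocks of the enabled types are cached as they are written.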
  private void writeStoreFile(boolean useTags) throws IOException {
    if (useTags) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    } else {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
    }
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
        "test_cache_on_write");
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
        .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
        .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
        .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR)
        .withFileContext(meta)
        .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
    byte[] cf = Bytes.toBytes("fam");
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i);
      byte[] qualifier = TestHFileWriterV2.randomRowOrQualifier(rand);
      byte[] value = TestHFileWriterV2.randomValue(rand);
      KeyValue kv;
      if (useTags) {
        Tag t = new Tag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<Tag>();
        tagList.add(t);
        Tag[] tags = new Tag[1];
        tags[0] = t;
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                rand.nextLong(), generateKeyType(rand), value, 0, value.length, tagList);
      } else {
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                rand.nextLong(), generateKeyType(rand), value, 0, value.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

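  // Verifies that data blocks written during a compaction are never added to
  // the block cache, regardless of the cache-on-write settings.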
  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
      throws IOException, InterruptedException {
    if (useTags) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    } else {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
    }
    // TODO: need to change this test if we add a cache size threshold for
    // compactions, or if we implement some other kind of intelligent logic for
    // deciding what blocks to cache-on-write on compaction.
    final String table = "CompactionCacheOnWrite";
    final String cf = "myCF";
    final byte[] cfBytes = Bytes.toBytes(cf);
    final int maxVersions = 3;
    Region region = TEST_UTIL.createTestRegion(table,
        new HColumnDescriptor(cf)
            .setCompressionType(compress)
            .setBloomFilterType(BLOOM_TYPE)
            .setMaxVersions(maxVersions)
            .setDataBlockEncoding(encoder.getDataBlockEncoding())
    );
    int rowIdx = 0;
    long ts = EnvironmentEdgeManager.currentTime();
    for (int iFile = 0; iFile < 5; ++iFile) {
      for (int iRow = 0; iRow < 500; ++iRow) {
        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
            iRow;
        Put p = new Put(Bytes.toBytes(rowStr));
        ++rowIdx;
        for (int iCol = 0; iCol < 10; ++iCol) {
          String qualStr = "col" + iCol;
          String valueStr = "value_" + rowStr + "_" + qualStr;
          for (int iTS = 0; iTS < 5; ++iTS) {
            if (useTags) {
              Tag t = new Tag((byte) 1, "visibility");
              Tag[] tags = new Tag[1];
              tags[0] = t;
              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
              p.add(kv);
            } else {
              p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
            }
          }
        }
        p.setDurability(Durability.ASYNC_WAL);
        region.put(p);
      }
      region.flush(true);
    }
    clearBlockCache(blockCache);
    assertEquals(0, blockCache.getBlockCount());
    region.compact(false);
    LOG.debug("compact() returned");

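    // After the compaction, no data blocks (plain or encoded) should be in the cache.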
    for (CachedBlock block : blockCache) {
      assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
      assertNotEquals(BlockType.DATA, block.getBlockType());
    }
    ((HRegion) region).close();
  }

  @Test
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

  @Test
  public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
    testNotCachingDataBlocksDuringCompactionInternals(false);
    testNotCachingDataBlocksDuringCompactionInternals(true);
  }
}