/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.collect.Lists;

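/**
 * Tests {@link HFile} cache-on-write functionality for data blocks,
 * non-root index blocks, and Bloom filter blocks.
 */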
@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestCacheOnWrite {

  private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);

  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private Random rand = new Random(12983177L);
  private Path storeFilePath;
  private BlockCache blockCache;
  private String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final BlockEncoderTestType encoderType;
  private final HFileDataBlockEncoder encoder;
  private final boolean cacheCompressedData;

  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  private static final int CKBYTES = 512;

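  /** All KeyValue types except Minimum and Maximum are valid in a store file. */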
  private static final int NUM_VALID_KEY_TYPES =
      KeyValue.Type.values().length - 2;

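  /**
   * Each value enables cache-on-write for exactly one block category via
   * modifyConf() and knows which block types should then show up in the cache.
   */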
  private static enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
        BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
        BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
        BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    private CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    private CacheOnWriteType(String confKey, BlockType blockType1,
        BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }
  }

  private static final DataBlockEncoding ENCODING_ALGO =
      DataBlockEncoding.PREFIX;

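  /**
   * Encoder setups under test: a no-op encoder, a real encoder with encoding
   * disabled, and PREFIX encoding applied to all data blocks.
   */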
  private static enum BlockEncoderTestType {
    NO_BLOCK_ENCODING_NOOP(true, false),
    NO_BLOCK_ENCODING(false, false),
    BLOCK_ENCODING_EVERYWHERE(false, true);

    private final boolean noop;
    private final boolean encode;

    BlockEncoderTestType(boolean noop, boolean encode) {
      this.encode = encode;
      this.noop = noop;
    }

    public HFileDataBlockEncoder getEncoder() {
      return noop ? NoOpDataBlockEncoder.INSTANCE : new HFileDataBlockEncoderImpl(
          encode ? ENCODING_ALGO : DataBlockEncoding.NONE);
    }
  }

  public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
      BlockEncoderTestType encoderType, boolean cacheCompressedData, BlockCache blockCache) {
    this.cowType = cowType;
    this.compress = compress;
    this.encoderType = encoderType;
    this.encoder = encoderType.getEncoder();
    this.cacheCompressedData = cacheCompressedData;
    this.blockCache = blockCache;
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
        ", encoderType=" + encoderType + ", cacheCompressedData=" + cacheCompressedData + "]";
    LOG.info(testDescription);
  }

  private static List<BlockCache> getBlockCaches() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    List<BlockCache> blockcaches = new ArrayList<BlockCache>();

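    // The default block cache, as built by CacheConfig.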
    blockcaches.add(new CacheConfig(conf).getBlockCache());

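    // A standalone on-heap LRU cache.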
    BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration());
    blockcaches.add(lru);

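    // An off-heap bucket cache with bucket sizes matching the block sizes used here.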
    FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
    int[] bucketSizes =
        { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
    BlockCache bucketcache =
        new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
    blockcaches.add(bucketcache);
    return blockcaches;
  }

  @Parameters
  public static Collection<Object[]> getParameters() throws IOException {
    List<Object[]> cowTypes = new ArrayList<Object[]>();
    for (BlockCache blockCache : getBlockCaches()) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        for (Compression.Algorithm compress : HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
          for (BlockEncoderTestType encoderType : BlockEncoderTestType.values()) {
            for (boolean cacheCompressedData : new boolean[] { false, true }) {
              cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData,
                  blockCache });
            }
          }
        }
      }
    }
    return cowTypes;
  }

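  /**
   * Evicts every block from the given cache. LruBlockCache can be cleared
   * directly; other caches (e.g. BucketCache) are drained one key at a time,
   * retrying because eviction may complete asynchronously.
   */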
  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
    if (blockCache instanceof LruBlockCache) {
      ((LruBlockCache) blockCache).clearCache();
    } else {
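      // Eviction may lag behind; keep sweeping until the cache reports empty.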
      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
        if (clearCount > 0) {
          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
              + blockCache.getBlockCount() + " blocks remaining");
          Thread.sleep(10);
        }
        for (CachedBlock block : Lists.newArrayList(blockCache)) {
          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
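          // A combined cache can require more than one eviction attempt per key.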
          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
            if (evictCount > 1) {
              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
                  + " times, maybe a bug here");
            }
          }
        }
      }
    }
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
        BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
    cowType.modifyConf(conf);
    fs = HFileSystem.get(conf);
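    // Point the global block cache singleton at the cache under test.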
    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = blockCache;
    cacheConf =
        new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
            cowType.shouldBeCached(BlockType.LEAF_INDEX),
            cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false);
  }

  @After
  public void tearDown() throws IOException, InterruptedException {
    clearBlockCache(blockCache);
  }

  @AfterClass
  public static void afterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

  private void readStoreFile(boolean useTags) throws IOException {
    AbstractHFileReader reader;
    if (useTags) {
      reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    } else {
      reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
    }
    LOG.info("HFile information: " + reader);
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
        .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
        .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    final boolean cacheBlocks = false;
    final boolean pread = false;
    HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
    assertTrue(testDescription, scanner.seekTo());

    long offset = 0;
    HFileBlock prevBlock = null;
    EnumMap<BlockType, Integer> blockCountByType =
        new EnumMap<BlockType, Integer>(BlockType.class);

    DataBlockEncoding encodingInCache =
        encoderType.getEncoder().getDataBlockEncoding();
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      long onDiskSize = -1;
      if (prevBlock != null) {
        onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
      }
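      // cacheBlock=false on read, so any cache hit below must come from cache-on-write.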
      HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
          false, true, null, encodingInCache);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
      boolean isCached = fromCache != null;
      boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
      assertTrue("shouldBeCached: " + shouldBeCached + "\n" +
          "isCached: " + isCached + "\n" +
          "Test description: " + testDescription + "\n" +
          "block: " + block + "\n" +
          "encodingInCache: " + encodingInCache + "\n" +
          "blockCacheKey: " + blockCacheKey,
          shouldBeCached == isCached);
      if (isCached) {
        if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
          if (compress != Compression.Algorithm.NONE) {
            assertFalse(fromCache.isUnpacked());
          }
          fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
        } else {
          assertTrue(fromCache.isUnpacked());
        }

        assertEquals(block.getChecksumType(), fromCache.getChecksumType());
        assertEquals(block.getBlockType(), fromCache.getBlockType());
        if (block.getBlockType() == BlockType.ENCODED_DATA) {
          assertEquals(block.getDataBlockEncodingId(), fromCache.getDataBlockEncodingId());
          assertEquals(block.getDataBlockEncoding(), fromCache.getDataBlockEncoding());
        }
        assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
        assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
        assertEquals(
            block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
      }
      prevBlock = block;
      offset += block.getOnDiskSizeWithHeader();
      BlockType bt = block.getBlockType();
      Integer count = blockCountByType.get(bt);
      blockCountByType.put(bt, (count == null ? 0 : count) + 1);
    }

    LOG.info("Block count by type: " + blockCountByType);
    String countByType = blockCountByType.toString();
    BlockType cachedDataBlockType =
        encoderType.encode ? BlockType.ENCODED_DATA : BlockType.DATA;
    if (useTags) {
      assertEquals("{" + cachedDataBlockType
          + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", countByType);
    } else {
      assertEquals("{" + cachedDataBlockType
          + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType);
    }

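    // Scan the rest of the file to verify every remaining block is readable.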
    while (scanner.next()) {
      scanner.getKeyValue();
    }
    reader.close();
  }

  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
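      // Make half of all generated KVs plain puts.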
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
            + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  private void writeStoreFile(boolean useTags) throws IOException {
    if (useTags) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    } else {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
    }
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
        "test_cache_on_write");
    HFileContext meta = new HFileContextBuilder().withCompression(compress)
        .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
        .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
        .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR)
        .withFileContext(meta)
        .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
    byte[] cf = Bytes.toBytes("fam");
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i);
      byte[] qualifier = TestHFileWriterV2.randomRowOrQualifier(rand);
      byte[] value = TestHFileWriterV2.randomValue(rand);
      KeyValue kv;
      if (useTags) {
        Tag t = new Tag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<Tag>();
        tagList.add(t);
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                rand.nextLong(), generateKeyType(rand), value, 0, value.length, tagList);
      } else {
        kv =
            new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
                rand.nextLong(), generateKeyType(rand), value, 0, value.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
      throws IOException, InterruptedException {
    if (useTags) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    } else {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
    }

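    // Fill a real region with several store files, compact them, and verify
    // that the compaction itself did not cache any data blocks on write.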
    final String table = "CompactionCacheOnWrite";
    final String cf = "myCF";
    final byte[] cfBytes = Bytes.toBytes(cf);
    final int maxVersions = 3;
    Region region = TEST_UTIL.createTestRegion(table,
        new HColumnDescriptor(cf)
            .setCompressionType(compress)
            .setBloomFilterType(BLOOM_TYPE)
            .setMaxVersions(maxVersions)
            .setDataBlockEncoding(encoder.getDataBlockEncoding())
    );
    int rowIdx = 0;
    long ts = EnvironmentEdgeManager.currentTime();
    for (int iFile = 0; iFile < 5; ++iFile) {
      for (int iRow = 0; iRow < 500; ++iRow) {
        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
        Put p = new Put(Bytes.toBytes(rowStr));
        ++rowIdx;
        for (int iCol = 0; iCol < 10; ++iCol) {
          String qualStr = "col" + iCol;
          String valueStr = "value_" + rowStr + "_" + qualStr;
          for (int iTS = 0; iTS < 5; ++iTS) {
            if (useTags) {
              Tag t = new Tag((byte) 1, "visibility");
              Tag[] tags = new Tag[1];
              tags[0] = t;
              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
              p.add(kv);
            } else {
              p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
            }
          }
        }
        p.setDurability(Durability.ASYNC_WAL);
        region.put(p);
      }
      region.flush(true);
    }
    clearBlockCache(blockCache);
    assertEquals(0, blockCache.getBlockCount());
    region.compact(false);
    LOG.debug("compact() returned");

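    // No DATA or ENCODED_DATA blocks should have been cached during the compaction.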
    for (CachedBlock block : blockCache) {
      assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
      assertNotEquals(BlockType.DATA, block.getBlockType());
    }
    ((HRegion) region).close();
  }

  @Test
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

  @Test
  public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
    testNotCachingDataBlocksDuringCompactionInternals(false);
    testNotCachingDataBlocksDuringCompactionInternals(true);
  }
}