/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.ChecksumType;
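
/**
 * Utility methods for exercising {@link BlockCache} implementations: simple
 * single-threaded checks, multi-threaded read/write workloads, eviction
 * hammering, and heap-size accounting.
 */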
public class CacheTestUtils {

  private static final boolean includesMemstoreTS = true;
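
  /**
   * Checks that the cache's reported heap size grows when a block is cached
   * and returns to its original value once that block is evicted.
   */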
  public static void testHeapSizeChanges(final BlockCache toBeTested,
      final int blockSize) {
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
    long heapSize = ((HeapSize) toBeTested).heapSize();
    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);

    // Caching a block should always increase the reported heap size.
    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());

    toBeTested.evictBlock(blocks[0].blockName);

    // After eviction, the heap size should return to its original value.
    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
  }
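
  /**
   * Drives the cache from {@code numThreads} threads. Each thread takes a
   * block from a shared queue, caches it, reads it back, verifies it, and
   * evicts it. Fails when the hit ratio drops below {@code passingScore}.
   */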
  public static void testCacheMultiThreaded(final BlockCache toBeTested,
      final int blockSize, final int numThreads, final int numQueries,
      final double passingScore) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
        conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<HFileBlockPair>();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicInteger miss = new AtomicInteger();

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
    blocksToTest.addAll(Arrays.asList(blocks));

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          if (!blocksToTest.isEmpty()) {
            HFileBlockPair ourBlock = blocksToTest.poll();
            // Another thread took the last block; wind the test down.
            if (ourBlock == null) {
              ctx.setStopFlag(true);
              return;
            }
            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
                false, false, true);
            if (retrievedBlock != null) {
              assertEquals(ourBlock.block, retrievedBlock);
              toBeTested.evictBlock(ourBlock.blockName);
              hits.incrementAndGet();
              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
            } else {
              miss.incrementAndGet();
            }
            totalQueries.incrementAndGet();
          }
        }
      };
      t.setDaemon(true);
      ctx.addThread(t);
    }
    ctx.startThreads();
    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
          + miss.get());
    }
  }
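
  /**
   * Single-threaded sanity check: the cache must start empty, cached blocks
   * must read back intact (or not at all, since the cache may evict at will),
   * and re-caching an already-cached block must be rejected, except by
   * {@link BucketCache}, which tolerates it.
   */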
  public static void testCacheSimple(BlockCache toBeTested, int blockSize,
      int numBlocks) throws Exception {

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);

    // The cache should start out empty.
    for (HFileBlockPair block : blocks) {
      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
    }

    // Add the blocks.
    for (HFileBlockPair block : blocks) {
      toBeTested.cacheBlock(block.blockName, block.block);
    }

    // Any block still cached must contain the right data. The cache may evict
    // at its discretion, so a null return is not a failure here.
    for (HFileBlockPair block : blocks) {
      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
      if (buf != null) {
        assertEquals(block.block, buf);
      }
    }

    // Re-caching an already-cached block should be rejected.
    for (HFileBlockPair block : blocks) {
      try {
        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
          toBeTested.cacheBlock(block.blockName, block.block);
          if (!(toBeTested instanceof BucketCache)) {
            // BucketCache accepts re-caching an already cached block without
            // complaint, so it is exempt from this assertion.
            fail("Cache should not allow re-caching a block");
          }
        }
      } catch (RuntimeException re) {
        // Expected: re-caching a cached block may throw.
      }
    }
  }
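
  /**
   * Hammers a single key from {@code numThreads} reader threads while one
   * writer thread repeatedly evicts and re-caches it, checking that readers
   * always see the block's original contents.
   */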
  public static void hammerSingleKey(final BlockCache toBeTested,
      int blockSize, int numThreads, int numQueries) throws Exception {
    final BlockCacheKey key = new BlockCacheKey("key", 0);
    final byte[] buf = new byte[5 * 1024];
    Arrays.fill(buf, (byte) 5);

    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
        conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    toBeTested.cacheBlock(key, bac);

    // Reader threads: fetch the key and verify its contents on every hit.
    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
              .getBlock(key, false, false, true);
          if (returned != null) {
            assertArrayEquals(buf, returned.buf);
          } else {
            Thread.sleep(10);
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    // Writer thread: periodically evict and re-cache the key so the readers
    // race against eviction.
    final long blockEvictPeriod = 50;
    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        toBeTested.evictBlock(key);
        toBeTested.cacheBlock(key, bac);
        Thread.sleep(blockEvictPeriod);
      }
    };
    t.setDaemon(true);
    ctx.addThread(t);

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
  }
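
  /**
   * Fills the cache with many distinct keys from several threads so that
   * evictions must occur, verifies each block read back against its expected
   * contents, and asserts that the eviction count ends up above zero.
   */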
  public static void hammerEviction(final BlockCache toBeTested, int blockSize,
      int numThreads, int numQueries) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
        conf);

    final AtomicInteger totalQueries = new AtomicInteger();

    for (int i = 0; i < numThreads; i++) {
      final int finalI = i;

      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          for (int j = 0; j < 100; j++) {
            BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
            // Give each cached block its own array: a block must not be
            // mutated after it has been handed to the cache. The fill value
            // depends only on (finalI, j), so re-reads are deterministic.
            final byte[] buf = new byte[5 * 1024];
            Arrays.fill(buf, (byte) (finalI * j));
            final ByteArrayCacheable bac = new ByteArrayCacheable(buf);

            ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
                .getBlock(key, true, false, true);
            if (gotBack != null) {
              assertArrayEquals(gotBack.buf, bac.buf);
            } else {
              toBeTested.cacheBlock(key, bac);
            }
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();

    assertTrue(toBeTested.getStats().getEvictedCount() > 0);
  }
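
  /**
   * A minimal {@link Cacheable} backed by a plain byte array. Serializes as a
   * 4-byte length prefix followed by the raw bytes. The {@code Thread.yield()}
   * calls deliberately widen race windows for the concurrency tests above.
   */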
  public static class ByteArrayCacheable implements Cacheable {

    static final CacheableDeserializer<Cacheable> blockDeserializer =
        new CacheableDeserializer<Cacheable>() {

          @Override
          public Cacheable deserialize(ByteBuffer b) throws IOException {
            int len = b.getInt();
            Thread.yield();
            byte[] buf = new byte[len];
            b.get(buf);
            return new ByteArrayCacheable(buf);
          }

          @Override
          public int getDeserialiserIdentifier() {
            return deserializerIdentifier;
          }

          @Override
          public Cacheable deserialize(ByteBuffer b, boolean reuse)
              throws IOException {
            return deserialize(b);
          }
        };

    final byte[] buf;

    public ByteArrayCacheable(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public long heapSize() {
      // 4-byte length prefix plus the payload: a rough but sufficient estimate.
      return 4 + buf.length;
    }

    @Override
    public int getSerializedLength() {
      return 4 + buf.length;
    }

    @Override
    public void serialize(ByteBuffer destination) {
      destination.putInt(buf.length);
      Thread.yield();
      destination.put(buf);
      destination.rewind();
    }

    @Override
    public CacheableDeserializer<Cacheable> getDeserializer() {
      return blockDeserializer;
    }

    private static final int deserializerIdentifier;
    static {
      deserializerIdentifier = CacheableDeserializerIdManager
          .registerDeserializer(blockDeserializer);
    }

    @Override
    public BlockType getBlockType() {
      return BlockType.DATA;
    }
  }
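
  /**
   * Generates {@code numBlocks} data blocks of roughly {@code blockSize} bytes
   * filled with random bytes, each paired with a unique random cache key.
   */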
  private static HFileBlockPair[] generateHFileBlocks(int blockSize,
      int numBlocks) {
    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
    Random rand = new Random();
    HashSet<String> usedStrings = new HashSet<String>();
    for (int i = 0; i < numBlocks; i++) {

      // Fill the buffer with random data, then overwrite its start with a
      // minimal block header: block type, on-disk and uncompressed sizes,
      // and the previous block's offset.
      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize
          - HFileBlock.EXTRA_SERIALIZATION_SPACE);
      rand.nextBytes(cachedBuffer.array());
      cachedBuffer.rewind();
      int onDiskSizeWithoutHeader = blockSize
          - HFileBlock.EXTRA_SERIALIZATION_SPACE;
      int uncompressedSizeWithoutHeader = blockSize
          - HFileBlock.EXTRA_SERIALIZATION_SPACE;
      long prevBlockOffset = rand.nextLong();
      BlockType.DATA.write(cachedBuffer);
      cachedBuffer.putInt(onDiskSizeWithoutHeader);
      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
      cachedBuffer.putLong(prevBlockOffset);
      cachedBuffer.rewind();
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(false)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(false)
          .withCompression(Compression.Algorithm.NONE)
          .withBytesPerCheckSum(0)
          .withChecksumType(ChecksumType.NULL)
          .build();
      HFileBlock generated = new HFileBlock(BlockType.DATA,
          onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
          prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
          blockSize,
          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta);

      // Pick a random key name that has not been used yet.
      String strKey = Long.toString(rand.nextLong());
      while (!usedStrings.add(strKey)) {
        strKey = Long.toString(rand.nextLong());
      }

      returnedBlocks[i] = new HFileBlockPair();
      returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
      returnedBlocks[i].block = generated;
    }
    return returnedBlocks;
  }

  private static class HFileBlockPair {
    BlockCacheKey blockName;
    HFileBlock block;
  }
}