1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.AdditionalMatchers.aryEq;
26 import static org.mockito.Matchers.any;
27 import static org.mockito.Matchers.anyBoolean;
28 import static org.mockito.Matchers.anyInt;
29 import static org.mockito.Matchers.anyLong;
30 import static org.mockito.Matchers.argThat;
31 import static org.mockito.Matchers.eq;
32 import static org.mockito.Matchers.isNull;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.only;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.when;
38
39 import java.io.IOException;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Iterator;
44 import java.util.List;
45
46 import com.google.common.collect.ImmutableList;
47 import com.google.common.collect.Lists;
48
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.Cell;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HColumnDescriptor;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.KeyValue;
56 import org.apache.hadoop.hbase.KeyValue.KVComparator;
57 import org.apache.hadoop.hbase.io.compress.Compression;
58 import org.apache.hadoop.hbase.io.hfile.HFile;
59 import org.apache.hadoop.hbase.regionserver.BloomType;
60 import org.apache.hadoop.hbase.regionserver.InternalScanner;
61 import org.apache.hadoop.hbase.regionserver.ScanType;
62 import org.apache.hadoop.hbase.regionserver.ScannerContext;
63 import org.apache.hadoop.hbase.regionserver.Store;
64 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
65 import org.apache.hadoop.hbase.regionserver.StoreFile;
66 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
67 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
68 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
69 import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
70 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
71 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
72 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
73 import org.apache.hadoop.hbase.security.User;
74 import org.apache.hadoop.hbase.testclassification.SmallTests;
75 import org.apache.hadoop.hbase.util.Bytes;
76 import org.apache.hadoop.hbase.util.ConcatenatedLists;
77 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
79 import org.junit.Test;
80 import org.junit.experimental.categories.Category;
81 import org.junit.runner.RunWith;
82 import org.junit.runners.Parameterized;
83 import org.junit.runners.Parameterized.Parameter;
84 import org.junit.runners.Parameterized.Parameters;
85 import org.mockito.ArgumentMatcher;
86
87 @RunWith(Parameterized.class)
88 @Category(SmallTests.class)
89 public class TestStripeCompactionPolicy {
90 private static final byte[] KEY_A = Bytes.toBytes("aaa");
91 private static final byte[] KEY_B = Bytes.toBytes("bbb");
92 private static final byte[] KEY_C = Bytes.toBytes("ccc");
93 private static final byte[] KEY_D = Bytes.toBytes("ddd");
94 private static final byte[] KEY_E = Bytes.toBytes("eee");
95 private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
96 private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
97 private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
98 private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
99 private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
100
101
102 private static long defaultSplitSize = 18;
103 private static float defaultSplitCount = 1.8F;
104 private final static int defaultInitialCount = 1;
105 private static long defaultTtl = 1000 * 1000;
106
107 @Parameters(name = "{index}: usePrivateReaders={0}")
108 public static Iterable<Object[]> data() {
109 return Arrays.asList(new Object[] { true }, new Object[] { false });
110 }
111
/**
 * Value supplied by {@link #data()}; createCompactor() copies it into the compactor
 * configuration under "hbase.regionserver.compaction.private.readers".
 */
@Parameter
public boolean usePrivateReaders;
114 @Test
115 public void testNoStripesFromFlush() throws Exception {
116 Configuration conf = HBaseConfiguration.create();
117 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
118 StripeCompactionPolicy policy = createPolicy(conf);
119 StripeInformationProvider si = createStripesL0Only(0, 0);
120
121 KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
122 KeyValue[][] expected = new KeyValue[][] { input };
123 verifyFlush(policy, si, input, expected, null);
124 }
125
126 @Test
127 public void testOldStripesFromFlush() throws Exception {
128 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
129 StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
130
131 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
132 KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
133 new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
134 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
135 }
136
137 @Test
138 public void testNewStripesFromFlush() throws Exception {
139 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
140 StripeInformationProvider si = createStripesL0Only(0, 0);
141 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
142
143 KeyValue[][] expected = new KeyValue[][] { input };
144 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
145 }
146
147 @Test
148 public void testSingleStripeCompaction() throws Exception {
149
150 Configuration conf = HBaseConfiguration.create();
151 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
152 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
153 conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
154 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000);
155 StoreConfigInformation sci = mock(StoreConfigInformation.class);
156 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
157 StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
158 @Override
159 public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
160 List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
161 if (!filesCompacting.isEmpty()) return null;
162 return selectSingleStripeCompaction(si, false, false, isOffpeak);
163 }
164
165 @Override
166 public boolean needsCompactions(
167 StripeInformationProvider si, List<StoreFile> filesCompacting) {
168 if (!filesCompacting.isEmpty()) return false;
169 return needsSingleStripeCompaction(si);
170 }
171 };
172
173
174 StripeInformationProvider si = createStripesWithSizes(0, 0,
175 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
176 verifyNoCompaction(policy, si);
177
178 si = createStripesWithSizes(0, 0,
179 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
180 assertNull(policy.selectCompaction(si, al(), false));
181 assertTrue(policy.needsCompactions(si, al()));
182
183 si = createStripesWithSizes(0, 0,
184 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
185 verifySingleStripeCompaction(policy, si, 2, null);
186
187 si = createStripesWithSizes(0, 0,
188 new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
189 verifySingleStripeCompaction(policy, si, 2, null);
190 si = createStripesWithSizes(0, 0,
191 new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
192 verifySingleStripeCompaction(policy, si, 1, null);
193
194 si = createStripesWithSizes(0, 0,
195 new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
196 verifySingleStripeCompaction(policy, si, 1, null);
197
198 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
199 List<StoreFile> sfs = si.getStripes().get(1).subList(1, 5);
200 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
201
202 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
203 sfs = si.getStripes().get(1).subList(1, 5);
204 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
205 }
206
207 @Test
208 public void testWithParallelCompaction() throws Exception {
209
210
211 assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
212 mock(StripeInformationProvider.class), al(createFile()), false));
213 }
214
215 @Test
216 public void testWithReferences() throws Exception {
217 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
218 StripeCompactor sc = mock(StripeCompactor.class);
219 StoreFile ref = createFile();
220 when(ref.isReference()).thenReturn(true);
221 StripeInformationProvider si = mock(StripeInformationProvider.class);
222 Collection<StoreFile> sfs = al(ref, createFile());
223 when(si.getStorefiles()).thenReturn(sfs);
224
225 assertTrue(policy.needsCompactions(si, al()));
226 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
227 assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
228 scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
229 verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
230 aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
231 any(NoLimitCompactionThroughputController.class), any(User.class));
232 }
233
234 @Test
235 public void testInitialCountFromL0() throws Exception {
236 Configuration conf = HBaseConfiguration.create();
237 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
238 StripeCompactionPolicy policy = createPolicy(
239 conf, defaultSplitSize, defaultSplitCount, 2, false);
240 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
241 verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
242 si = createStripesL0Only(3, 10);
243 verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
244 policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
245 verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
246 }
247
248 @Test
249 public void testExistingStripesFromL0() throws Exception {
250 Configuration conf = HBaseConfiguration.create();
251 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
252 StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
253 verifyCompaction(
254 createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
255 }
256
257 @Test
258 public void testNothingToCompactFromL0() throws Exception {
259 Configuration conf = HBaseConfiguration.create();
260 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
261 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
262 StripeCompactionPolicy policy = createPolicy(conf);
263 verifyNoCompaction(policy, si);
264
265 si = createStripes(3, KEY_A);
266 verifyNoCompaction(policy, si);
267 }
268
269 @Test
270 public void testSplitOffStripe() throws Exception {
271 Configuration conf = HBaseConfiguration.create();
272
273 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
274 Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
275 Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
276 long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
277
278 StripeCompactionPolicy.StripeInformationProvider si =
279 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
280 assertNull(createPolicy(conf).selectCompaction(si, al(), false));
281
282 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
283 StripeCompactionPolicy policy = createPolicy(conf);
284 verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
285
286 si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
287 verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
288
289 si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
290 verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
291
292
293 StripeCompactionPolicy specPolicy = createPolicy(
294 conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
295 verifySingleStripeCompaction(specPolicy, si, 1, null);
296 }
297
298 @Test
299 public void testSplitOffStripeOffPeak() throws Exception {
300
301 Configuration conf = HBaseConfiguration.create();
302 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
303
304 StripeCompactionPolicy.StripeInformationProvider si =
305 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
306 assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles()
307 .size());
308
309 conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
310 assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles()
311 .size());
312 }
313
314 @Test
315 public void testSplitOffStripeDropDeletes() throws Exception {
316 Configuration conf = HBaseConfiguration.create();
317 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
318 StripeCompactionPolicy policy = createPolicy(conf);
319 Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
320 Long[] noSplit = new Long[] { 1L };
321 long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
322
323
324 StripeCompactionPolicy.StripeInformationProvider si =
325 createStripesWithSizes(0, 0, noSplit, toSplit);
326 verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize);
327
328 si = createStripesWithSizes(2, 2, noSplit, toSplit);
329 verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize);
330 }
331
332 @SuppressWarnings("unchecked")
333 @Test
334 public void testMergeExpiredFiles() throws Exception {
335 ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
336 long now = defaultTtl + 2;
337 edge.setValue(now);
338 EnvironmentEdgeManager.injectEdge(edge);
339 try {
340 StoreFile expiredFile = createFile(), notExpiredFile = createFile();
341 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
342 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
343 List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
344 List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
345 List<StoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
346
347 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
348 defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
349
350 StripeCompactionPolicy.StripeInformationProvider si =
351 createStripesWithFiles(expired, expired, expired);
352 verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
353
354 si = createStripesWithFiles(notExpired, notExpired, notExpired);
355 assertNull(policy.selectCompaction(si, al(), false));
356
357 si = createStripesWithFiles(notExpired, expired, notExpired);
358 verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
359
360
361 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
362 verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
363
364 si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
365 verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
366 } finally {
367 EnvironmentEdgeManager.reset();
368 }
369 }
370
371 @SuppressWarnings("unchecked")
372 @Test
373 public void testMergeExpiredStripes() throws Exception {
374
375 ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
376 long now = defaultTtl + 2;
377 edge.setValue(now);
378 EnvironmentEdgeManager.injectEdge(edge);
379 try {
380 StoreFile expiredFile = createFile(), notExpiredFile = createFile();
381 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
382 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
383 List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
384 List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
385
386 StripeCompactionPolicy policy =
387 createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
388 defaultInitialCount, true);
389
390
391 StripeCompactionPolicy.StripeInformationProvider si =
392 createStripesWithFiles(expired, expired, expired);
393 verifyMergeCompatcion(policy, si, 0, 2);
394
395
396 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
397 verifyMergeCompatcion(policy, si, 3, 4);
398 } finally {
399 EnvironmentEdgeManager.reset();
400 }
401 }
402
403 @SuppressWarnings("unchecked")
404 private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
405 List<StoreFile>... stripeFiles) throws Exception {
406 return createStripesWithFiles(createBoundaries(stripeFiles.length),
407 Lists.newArrayList(stripeFiles), new ArrayList<StoreFile>());
408 }
409
410 @Test
411 public void testSingleStripeDropDeletes() throws Exception {
412 Configuration conf = HBaseConfiguration.create();
413 StripeCompactionPolicy policy = createPolicy(conf);
414
415 Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
416 StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
417 verifySingleStripeCompaction(policy, si, 0, true);
418
419 si = createStripesWithSizes(2, 2, stripes);
420 verifySingleStripeCompaction(policy, si, 0, false);
421
422 si = createStripesWithSizes(6, 2, stripes);
423 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
424 sfs.addSublist(si.getLevel0Files());
425 sfs.addSublist(si.getStripes().get(0));
426 verifyCompaction(
427 policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
428
429 si = createStripesWithSizes(6, 2,
430 new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
431 verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
432
433
434 stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
435 si = createStripesWithSizes(0, 0, stripes);
436 List<StoreFile> compact_file = new ArrayList<StoreFile>();
437 Iterator<StoreFile> iter = si.getStripes().get(0).listIterator(1);
438 while (iter.hasNext()) {
439 compact_file.add(iter.next());
440 }
441 verifyCompaction(policy, si, compact_file, false, 1, null, si.getStartRow(0), si.getEndRow(0), true);
442 }
443
444
445 private static StripeCompactionPolicy createPolicy(
446 Configuration conf) throws Exception {
447 return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
448 }
449
450 private static StripeCompactionPolicy createPolicy(Configuration conf,
451 long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
452 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
453 conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
454 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
455 StoreConfigInformation sci = mock(StoreConfigInformation.class);
456 when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
457 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
458 return new StripeCompactionPolicy(conf, sci, ssc);
459 }
460
461 private static ArrayList<StoreFile> al(StoreFile... sfs) {
462 return new ArrayList<StoreFile>(Arrays.asList(sfs));
463 }
464
465 private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
466 int from, int to) throws Exception {
467 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
468 Collection<StoreFile> sfs = getAllFiles(si, from, to);
469 verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
470
471
472
473 StripeCompactor sc = createCompactor();
474 List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
475 assertEquals(1, paths.size());
476 }
477
478
479
480
481
482
483
484
485
486
487
488 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
489 StripeInformationProvider si, int from, int to, Boolean dropDeletes,
490 Integer count, Long size, boolean needsCompaction) throws IOException {
491 verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
492 count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
493 }
494
495 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
496 StripeInformationProvider si, int from, int to, Boolean dropDeletes,
497 Integer count, Long size) throws IOException {
498 verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
499 }
500
501 private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
502 StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
503 verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
504 }
505
506
507
508
509
510
511 private void verifyNoCompaction(
512 StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
513 assertNull(policy.selectCompaction(si, al(), false));
514 assertFalse(policy.needsCompactions(si, al()));
515 }
516
517
518
519
520
521
522
523
524
525
526 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
527 Collection<StoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
528 final List<byte[]> boundaries) throws Exception {
529 StripeCompactor sc = mock(StripeCompactor.class);
530 assertTrue(policy.needsCompactions(si, al()));
531 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
532 verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
533 scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
534 verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
535 @Override
536 public boolean matches(Object argument) {
537 @SuppressWarnings("unchecked")
538 List<byte[]> other = (List<byte[]>) argument;
539 if (other.size() != boundaries.size()) return false;
540 for (int i = 0; i < other.size(); ++i) {
541 if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
542 }
543 return true;
544 }
545 }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
546 dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
547 any(NoLimitCompactionThroughputController.class), any(User.class));
548 }
549
550
551
552
553
554
555
556
557
558
559
560
561 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
562 Collection<StoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
563 byte[] start, byte[] end, boolean needsCompaction) throws IOException {
564 StripeCompactor sc = mock(StripeCompactor.class);
565 assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
566 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
567 verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
568 scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
569 verify(sc, times(1)).compact(eq(scr.getRequest()),
570 count == null ? anyInt() : eq(count.intValue()),
571 size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
572 dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
573 any(NoLimitCompactionThroughputController.class), any(User.class));
574 }
575
576
577 protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
578 KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
579 StoreFileWritersCapture writers = new StoreFileWritersCapture();
580 StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(new KVComparator(), si,
581 input.length);
582 StripeMultiFileWriter mw = req.createWriter();
583 mw.init(null, writers);
584 for (KeyValue kv : input) {
585 mw.append(kv);
586 }
587 boolean hasMetadata = boundaries != null;
588 mw.commitWriters(0, false);
589 writers.verifyKvs(expected, true, hasMetadata);
590 if (hasMetadata) {
591 writers.verifyBoundaries(boundaries);
592 }
593 }
594
595
596 private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
597 return dropDeletes == null ? any(byte[].class)
598 : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
599 }
600
601 private void verifyCollectionsEqual(Collection<StoreFile> sfs, Collection<StoreFile> scr) {
602
603 assertEquals(sfs.size(), scr.size());
604 assertTrue(scr.containsAll(sfs));
605 }
606
607 private static List<StoreFile> getAllFiles(
608 StripeInformationProvider si, int fromStripe, int toStripe) {
609 ArrayList<StoreFile> expected = new ArrayList<StoreFile>();
610 for (int i = fromStripe; i <= toStripe; ++i) {
611 expected.addAll(si.getStripes().get(i));
612 }
613 return expected;
614 }
615
616
617
618
619
620
621 private static StripeInformationProvider createStripes(
622 int l0Count, byte[]... boundaries) throws Exception {
623 List<Long> l0Sizes = new ArrayList<Long>();
624 for (int i = 0; i < l0Count; ++i) {
625 l0Sizes.add(5L);
626 }
627 List<List<Long>> sizes = new ArrayList<List<Long>>();
628 for (int i = 0; i <= boundaries.length; ++i) {
629 sizes.add(Arrays.asList(Long.valueOf(5)));
630 }
631 return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
632 }
633
634
635
636
637
638
639 private static StripeInformationProvider createStripesL0Only(
640 int l0Count, long l0Size) throws Exception {
641 List<Long> l0Sizes = new ArrayList<Long>();
642 for (int i = 0; i < l0Count; ++i) {
643 l0Sizes.add(l0Size);
644 }
645 return createStripes(null, new ArrayList<List<Long>>(), l0Sizes);
646 }
647
648
649
650
651
652
653
654 private static StripeInformationProvider createStripesWithSizes(
655 int l0Count, long l0Size, Long[]... sizes) throws Exception {
656 ArrayList<List<Long>> sizeList = new ArrayList<List<Long>>();
657 for (Long[] size : sizes) {
658 sizeList.add(Arrays.asList(size));
659 }
660 return createStripesWithSizes(l0Count, l0Size, sizeList);
661 }
662
663 private static StripeInformationProvider createStripesWithSizes(
664 int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
665 List<byte[]> boundaries = createBoundaries(sizes.size());
666 List<Long> l0Sizes = new ArrayList<Long>();
667 for (int i = 0; i < l0Count; ++i) {
668 l0Sizes.add(l0Size);
669 }
670 return createStripes(boundaries, sizes, l0Sizes);
671 }
672
673 private static List<byte[]> createBoundaries(int stripeCount) {
674 byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
675 assert stripeCount <= keys.length + 1;
676 List<byte[]> boundaries = new ArrayList<byte[]>();
677 boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
678 return boundaries;
679 }
680
681 private static StripeInformationProvider createStripes(List<byte[]> boundaries,
682 List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
683 List<List<StoreFile>> stripeFiles = new ArrayList<List<StoreFile>>(stripeSizes.size());
684 for (List<Long> sizes : stripeSizes) {
685 List<StoreFile> sfs = new ArrayList<StoreFile>();
686 for (Long size : sizes) {
687 sfs.add(createFile(size));
688 }
689 stripeFiles.add(sfs);
690 }
691 List<StoreFile> l0Files = new ArrayList<StoreFile>();
692 for (Long size : l0Sizes) {
693 l0Files.add(createFile(size));
694 }
695 return createStripesWithFiles(boundaries, stripeFiles, l0Files);
696 }
697
698
699
700
701 private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
702 List<List<StoreFile>> stripeFiles, List<StoreFile> l0Files) throws Exception {
703 ArrayList<ImmutableList<StoreFile>> stripes = new ArrayList<ImmutableList<StoreFile>>();
704 ArrayList<byte[]> boundariesList = new ArrayList<byte[]>();
705 StripeInformationProvider si = mock(StripeInformationProvider.class);
706 if (!stripeFiles.isEmpty()) {
707 assert stripeFiles.size() == (boundaries.size() + 1);
708 boundariesList.add(OPEN_KEY);
709 for (int i = 0; i <= boundaries.size(); ++i) {
710 byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
711 byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
712 boundariesList.add(endKey);
713 for (StoreFile sf : stripeFiles.get(i)) {
714 setFileStripe(sf, startKey, endKey);
715 }
716 stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
717 when(si.getStartRow(eq(i))).thenReturn(startKey);
718 when(si.getEndRow(eq(i))).thenReturn(endKey);
719 }
720 }
721 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
722 sfs.addAllSublists(stripes);
723 sfs.addSublist(l0Files);
724 when(si.getStorefiles()).thenReturn(sfs);
725 when(si.getStripes()).thenReturn(stripes);
726 when(si.getStripeBoundaries()).thenReturn(boundariesList);
727 when(si.getStripeCount()).thenReturn(stripes.size());
728 when(si.getLevel0Files()).thenReturn(l0Files);
729 return si;
730 }
731
732 private static StoreFile createFile(long size) throws Exception {
733 StoreFile sf = mock(StoreFile.class);
734 when(sf.getPath()).thenReturn(new Path("moo"));
735 StoreFile.Reader r = mock(StoreFile.Reader.class);
736 when(r.getEntries()).thenReturn(size);
737 when(r.length()).thenReturn(size);
738 when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
739 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
740 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
741 mock(StoreFileScanner.class));
742 when(sf.getReader()).thenReturn(r);
743 when(sf.createReader()).thenReturn(r);
744 when(sf.cloneForReader()).thenReturn(sf);
745 return sf;
746 }
747
748 private static StoreFile createFile() throws Exception {
749 return createFile(0);
750 }
751
752 private static void setFileStripe(StoreFile sf, byte[] startKey, byte[] endKey) {
753 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
754 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
755 }
756
757 private StripeCompactor createCompactor() throws Exception {
758 HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
759 StoreFileWritersCapture writers = new StoreFileWritersCapture();
760 Store store = mock(Store.class);
761 HRegionInfo info = mock(HRegionInfo.class);
762 when(info.getRegionNameAsString()).thenReturn("testRegion");
763 when(store.getFamily()).thenReturn(col);
764 when(store.getRegionInfo()).thenReturn(info);
765 when(
766 store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
767 anyBoolean(), anyBoolean())).thenAnswer(writers);
768
769 Configuration conf = HBaseConfiguration.create();
770 conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
771 final Scanner scanner = new Scanner();
772 return new StripeCompactor(conf, store) {
773 @Override
774 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
775 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
776 byte[] dropDeletesToRow) throws IOException {
777 return scanner;
778 }
779
780 @Override
781 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
782 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
783 return scanner;
784 }
785 };
786 }
787
788 private static class Scanner implements InternalScanner {
789 private final ArrayList<KeyValue> kvs;
790
791 public Scanner(KeyValue... kvs) {
792 this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
793 }
794
795 @Override
796 public boolean next(List<Cell> results) throws IOException {
797 if (kvs.isEmpty()) return false;
798 results.add(kvs.remove(0));
799 return !kvs.isEmpty();
800 }
801
802 @Override
803 public boolean next(List<Cell> result, ScannerContext scannerContext)
804 throws IOException {
805 return next(result);
806 }
807
808 @Override
809 public void close() throws IOException {
810 }
811 }
812 }