View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.compactions;
19  
20  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNull;
24  import static org.junit.Assert.assertTrue;
25  import static org.mockito.AdditionalMatchers.aryEq;
26  import static org.mockito.Matchers.any;
27  import static org.mockito.Matchers.anyBoolean;
28  import static org.mockito.Matchers.anyInt;
29  import static org.mockito.Matchers.anyLong;
30  import static org.mockito.Matchers.argThat;
31  import static org.mockito.Matchers.eq;
32  import static org.mockito.Matchers.isNull;
33  import static org.mockito.Mockito.mock;
34  import static org.mockito.Mockito.only;
35  import static org.mockito.Mockito.times;
36  import static org.mockito.Mockito.verify;
37  import static org.mockito.Mockito.when;
38  
39  import java.io.IOException;
40  import java.util.ArrayList;
41  import java.util.Arrays;
42  import java.util.Collection;
43  import java.util.Iterator;
44  import java.util.List;
45  
46  import com.google.common.collect.ImmutableList;
47  import com.google.common.collect.Lists;
48  
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.Cell;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HColumnDescriptor;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.KeyValue;
56  import org.apache.hadoop.hbase.KeyValue.KVComparator;
57  import org.apache.hadoop.hbase.io.compress.Compression;
58  import org.apache.hadoop.hbase.io.hfile.HFile;
59  import org.apache.hadoop.hbase.regionserver.BloomType;
60  import org.apache.hadoop.hbase.regionserver.InternalScanner;
61  import org.apache.hadoop.hbase.regionserver.ScanType;
62  import org.apache.hadoop.hbase.regionserver.ScannerContext;
63  import org.apache.hadoop.hbase.regionserver.Store;
64  import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
65  import org.apache.hadoop.hbase.regionserver.StoreFile;
66  import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
67  import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
68  import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
69  import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
70  import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
71  import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
72  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
73  import org.apache.hadoop.hbase.security.User;
74  import org.apache.hadoop.hbase.testclassification.SmallTests;
75  import org.apache.hadoop.hbase.util.Bytes;
76  import org.apache.hadoop.hbase.util.ConcatenatedLists;
77  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
79  import org.junit.Test;
80  import org.junit.experimental.categories.Category;
81  import org.junit.runner.RunWith;
82  import org.junit.runners.Parameterized;
83  import org.junit.runners.Parameterized.Parameter;
84  import org.junit.runners.Parameterized.Parameters;
85  import org.mockito.ArgumentMatcher;
86  
87  @RunWith(Parameterized.class)
88  @Category(SmallTests.class)
89  public class TestStripeCompactionPolicy {
90    private static final byte[] KEY_A = Bytes.toBytes("aaa");
91    private static final byte[] KEY_B = Bytes.toBytes("bbb");
92    private static final byte[] KEY_C = Bytes.toBytes("ccc");
93    private static final byte[] KEY_D = Bytes.toBytes("ddd");
94    private static final byte[] KEY_E = Bytes.toBytes("eee");
95    private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
96    private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
97    private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
98    private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
99    private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
100 
101 
102   private static long defaultSplitSize = 18;
103   private static float defaultSplitCount = 1.8F;
104   private final static int defaultInitialCount = 1;
105   private static long defaultTtl = 1000 * 1000;
106 
107   @Parameters(name = "{index}: usePrivateReaders={0}")
108   public static Iterable<Object[]> data() {
109     return Arrays.asList(new Object[] { true }, new Object[] { false });
110   }
111 
112   @Parameter
113   public boolean usePrivateReaders;
114   @Test
115   public void testNoStripesFromFlush() throws Exception {
116     Configuration conf = HBaseConfiguration.create();
117     conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
118     StripeCompactionPolicy policy = createPolicy(conf);
119     StripeInformationProvider si = createStripesL0Only(0, 0);
120 
121     KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
122     KeyValue[][] expected = new KeyValue[][] { input };
123     verifyFlush(policy, si, input, expected, null);
124   }
125 
126   @Test
127   public void testOldStripesFromFlush() throws Exception {
128     StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
129     StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
130 
131     KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
132     KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
133         new KeyValue[] { KV_C, KV_C }, new KeyValue[] {  KV_D, KV_E } };
134     verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
135   }
136 
137   @Test
138   public void testNewStripesFromFlush() throws Exception {
139     StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
140     StripeInformationProvider si = createStripesL0Only(0, 0);
141     KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
142     // Starts with one stripe; unlike flush results, must have metadata
143     KeyValue[][] expected = new KeyValue[][] { input };
144     verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
145   }
146 
147   @Test
148   public void testSingleStripeCompaction() throws Exception {
149     // Create a special policy that only compacts single stripes, using standard methods.
150     Configuration conf = HBaseConfiguration.create();
151     conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
152     conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
153     conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
154     conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
155     StoreConfigInformation sci = mock(StoreConfigInformation.class);
156     StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
157     StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
158       @Override
159       public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
160           List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
161         if (!filesCompacting.isEmpty()) return null;
162         return selectSingleStripeCompaction(si, false, false, isOffpeak);
163       }
164 
165       @Override
166       public boolean needsCompactions(
167           StripeInformationProvider si, List<StoreFile> filesCompacting) {
168         if (!filesCompacting.isEmpty()) return false;
169         return needsSingleStripeCompaction(si);
170       }
171     };
172 
173     // No compaction due to min files or ratio
174     StripeInformationProvider si = createStripesWithSizes(0, 0,
175         new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
176     verifyNoCompaction(policy, si);
177     // No compaction due to min files or ratio - will report needed, but not do any.
178     si = createStripesWithSizes(0, 0,
179         new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
180     assertNull(policy.selectCompaction(si, al(), false));
181     assertTrue(policy.needsCompactions(si, al()));
182     // One stripe has possible compaction
183     si = createStripesWithSizes(0, 0,
184         new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
185     verifySingleStripeCompaction(policy, si, 2, null);
186     // Several stripes have possible compactions; choose best quality (removes most files)
187     si = createStripesWithSizes(0, 0,
188         new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
189     verifySingleStripeCompaction(policy, si, 2, null);
190     si = createStripesWithSizes(0, 0,
191         new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
192     verifySingleStripeCompaction(policy, si, 1, null);
193     // Or with smallest files, if the count is the same 
194     si = createStripesWithSizes(0, 0,
195         new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
196     verifySingleStripeCompaction(policy, si, 1, null);
197     // Verify max count is respected.
198     si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
199     List<StoreFile> sfs = si.getStripes().get(1).subList(1, 5);
200     verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
201     // Verify ratio is applied.
202     si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
203     sfs = si.getStripes().get(1).subList(1, 5);
204     verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
205   }
206 
207   @Test
208   public void testWithParallelCompaction() throws Exception {
209     // TODO: currently only one compaction at a time per store is allowed. If this changes,
210     //       the appropriate file exclusion testing would need to be done in respective tests.
211     assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
212         mock(StripeInformationProvider.class), al(createFile()), false));
213   }
214 
215   @Test
216   public void testWithReferences() throws Exception {
217     StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
218     StripeCompactor sc = mock(StripeCompactor.class);
219     StoreFile ref = createFile();
220     when(ref.isReference()).thenReturn(true);
221     StripeInformationProvider si = mock(StripeInformationProvider.class);
222     Collection<StoreFile> sfs = al(ref, createFile());
223     when(si.getStorefiles()).thenReturn(sfs);
224 
225     assertTrue(policy.needsCompactions(si, al()));
226     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
227     assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
228     scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
229     verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
230       aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
231       any(NoLimitCompactionThroughputController.class), any(User.class));
232   }
233 
234   @Test
235   public void testInitialCountFromL0() throws Exception {
236     Configuration conf = HBaseConfiguration.create();
237     conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
238     StripeCompactionPolicy policy = createPolicy(
239         conf, defaultSplitSize, defaultSplitCount, 2, false);
240     StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
241     verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
242     si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
243     verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
244     policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
245     verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
246   }
247 
248   @Test
249   public void testExistingStripesFromL0() throws Exception {
250     Configuration conf = HBaseConfiguration.create();
251     conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
252     StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
253     verifyCompaction(
254         createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
255   }
256 
257   @Test
258   public void testNothingToCompactFromL0() throws Exception {
259     Configuration conf = HBaseConfiguration.create();
260     conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
261     StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
262     StripeCompactionPolicy policy = createPolicy(conf);
263     verifyNoCompaction(policy, si);
264 
265     si = createStripes(3, KEY_A);
266     verifyNoCompaction(policy, si);
267   }
268 
269   @Test
270   public void testSplitOffStripe() throws Exception {
271     Configuration conf = HBaseConfiguration.create();
272     // First test everything with default split count of 2, then split into more.
273     conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
274     Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
275     Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
276     long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
277     // Don't split if not eligible for compaction.
278     StripeCompactionPolicy.StripeInformationProvider si =
279         createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
280     assertNull(createPolicy(conf).selectCompaction(si, al(), false));
281     // Make sure everything is eligible.
282     conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
283     StripeCompactionPolicy policy = createPolicy(conf);
284     verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
285     // Add some extra stripes...
286     si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
287     verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
288     // In the middle.
289     si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
290     verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
291     // No split-off with different config (larger split size).
292     // However, in this case some eligible stripe will just be compacted alone.
293     StripeCompactionPolicy specPolicy = createPolicy(
294         conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
295     verifySingleStripeCompaction(specPolicy, si, 1, null);
296   }
297 
298   @Test
299   public void testSplitOffStripeOffPeak() throws Exception {
300     // for HBASE-11439
301     Configuration conf = HBaseConfiguration.create();
302     conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
303     // Select the last 2 files.
304     StripeCompactionPolicy.StripeInformationProvider si =
305         createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
306     assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles()
307         .size());
308     // Make sure everything is eligible in offpeak.
309     conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
310     assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles()
311         .size());
312   }
313 
314   @Test
315   public void testSplitOffStripeDropDeletes() throws Exception {
316     Configuration conf = HBaseConfiguration.create();
317     conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
318     StripeCompactionPolicy policy = createPolicy(conf);
319     Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
320     Long[] noSplit = new Long[] { 1L };
321     long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
322 
323     // Verify the deletes can be dropped if there are no L0 files.
324     StripeCompactionPolicy.StripeInformationProvider si =
325         createStripesWithSizes(0, 0, noSplit, toSplit);
326     verifyWholeStripesCompaction(policy, si, 1, 1,    true, null, splitTargetSize);
327     // But cannot be dropped if there are.
328     si = createStripesWithSizes(2, 2, noSplit, toSplit);
329     verifyWholeStripesCompaction(policy, si, 1, 1,    false, null, splitTargetSize);
330   }
331 
332   @SuppressWarnings("unchecked")
333   @Test
334   public void testMergeExpiredFiles() throws Exception {
335     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
336     long now = defaultTtl + 2;
337     edge.setValue(now);
338     EnvironmentEdgeManager.injectEdge(edge);
339     try {
340       StoreFile expiredFile = createFile(), notExpiredFile = createFile();
341       when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
342       when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
343       List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
344       List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
345       List<StoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
346 
347       StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
348           defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
349       // Merge expired if there are eligible stripes.
350       StripeCompactionPolicy.StripeInformationProvider si =
351           createStripesWithFiles(expired, expired, expired);
352       verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
353       // Don't merge if nothing expired.
354       si = createStripesWithFiles(notExpired, notExpired, notExpired);
355       assertNull(policy.selectCompaction(si, al(), false));
356       // Merge one expired stripe with next.
357       si = createStripesWithFiles(notExpired, expired, notExpired);
358       verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
359       // Merge the biggest run out of multiple options.
360       // Merge one expired stripe with next.
361       si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
362       verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
363       // Stripe with a subset of expired files is not merged.
364       si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
365       verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
366     } finally {
367       EnvironmentEdgeManager.reset();
368     }
369   }
370 
371   @SuppressWarnings("unchecked")
372   @Test
373   public void testMergeExpiredStripes() throws Exception {
374     // HBASE-11397
375     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
376     long now = defaultTtl + 2;
377     edge.setValue(now);
378     EnvironmentEdgeManager.injectEdge(edge);
379     try {
380       StoreFile expiredFile = createFile(), notExpiredFile = createFile();
381       when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
382       when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
383       List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
384       List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
385 
386       StripeCompactionPolicy policy =
387           createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
388             defaultInitialCount, true);
389 
390       // Merge all three expired stripes into one.
391       StripeCompactionPolicy.StripeInformationProvider si =
392           createStripesWithFiles(expired, expired, expired);
393       verifyMergeCompatcion(policy, si, 0, 2);
394 
395       // Merge two adjacent expired stripes into one.
396       si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
397       verifyMergeCompatcion(policy, si, 3, 4);
398     } finally {
399       EnvironmentEdgeManager.reset();
400     }
401   }
402 
403   @SuppressWarnings("unchecked")
404   private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
405       List<StoreFile>... stripeFiles) throws Exception {
406     return createStripesWithFiles(createBoundaries(stripeFiles.length),
407         Lists.newArrayList(stripeFiles), new ArrayList<StoreFile>());
408   }
409 
410   @Test
411   public void testSingleStripeDropDeletes() throws Exception {
412     Configuration conf = HBaseConfiguration.create();
413     StripeCompactionPolicy policy = createPolicy(conf);
414     // Verify the deletes can be dropped if there are no L0 files.
415     Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
416     StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
417     verifySingleStripeCompaction(policy, si, 0, true);
418     // But cannot be dropped if there are.
419     si = createStripesWithSizes(2, 2, stripes);
420     verifySingleStripeCompaction(policy, si, 0, false);
421     // Unless there are enough to cause L0 compaction.
422     si = createStripesWithSizes(6, 2, stripes);
423     ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
424     sfs.addSublist(si.getLevel0Files());
425     sfs.addSublist(si.getStripes().get(0));
426     verifyCompaction(
427         policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
428     // If we cannot actually compact all files in some stripe, L0 is chosen.
429     si = createStripesWithSizes(6, 2,
430         new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
431     verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
432     // even if L0 has no file
433     // if all files of stripe aren't selected, delete must not be dropped.
434     stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
435     si = createStripesWithSizes(0, 0, stripes);
436     List<StoreFile> compact_file = new ArrayList<StoreFile>();
437     Iterator<StoreFile> iter = si.getStripes().get(0).listIterator(1);
438     while (iter.hasNext()) {
439         compact_file.add(iter.next());
440     }
441     verifyCompaction(policy, si, compact_file, false, 1, null, si.getStartRow(0), si.getEndRow(0), true);
442   }
443 
444   /********* HELPER METHODS ************/
445   private static StripeCompactionPolicy createPolicy(
446       Configuration conf) throws Exception {
447     return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
448   }
449 
450   private static StripeCompactionPolicy createPolicy(Configuration conf,
451       long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
452     conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
453     conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
454     conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
455     StoreConfigInformation sci = mock(StoreConfigInformation.class);
456     when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
457     StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
458     return new StripeCompactionPolicy(conf, sci, ssc);
459   }
460 
461   private static ArrayList<StoreFile> al(StoreFile... sfs) {
462     return new ArrayList<StoreFile>(Arrays.asList(sfs));
463   }
464 
465   private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
466       int from, int to) throws Exception {
467     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
468     Collection<StoreFile> sfs = getAllFiles(si, from, to);
469     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
470 
471     // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
472     // an empty file to preserve metadata
473     StripeCompactor sc = createCompactor();
474     List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
475     assertEquals(1, paths.size());
476   }
477 
478   /**
479    * Verify the compaction that includes several entire stripes.
480    * @param policy Policy to test.
481    * @param si Stripe information pre-set with stripes to test.
482    * @param from Starting stripe.
483    * @param to Ending stripe (inclusive).
484    * @param dropDeletes Whether to drop deletes from compaction range.
485    * @param count Expected # of resulting stripes, null if not checked.
486    * @param size Expected target stripe size, null if not checked.
487    */
488   private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
489       StripeInformationProvider si, int from, int to, Boolean dropDeletes,
490       Integer count, Long size, boolean needsCompaction) throws IOException {
491     verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
492         count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
493   }
494 
495   private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
496       StripeInformationProvider si, int from, int to, Boolean dropDeletes,
497       Integer count, Long size) throws IOException {
498     verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
499   }
500 
501   private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
502       StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
503     verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
504   }
505 
506   /**
507    * Verify no compaction is needed or selected.
508    * @param policy Policy to test.
509    * @param si Stripe information pre-set with stripes to test.
510    */
511   private void verifyNoCompaction(
512       StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
513     assertNull(policy.selectCompaction(si, al(), false));
514     assertFalse(policy.needsCompactions(si, al()));
515   }
516 
517   /**
518    * Verify arbitrary compaction.
519    * @param policy Policy to test.
520    * @param si Stripe information pre-set with stripes to test.
521    * @param sfs Files that should be compacted.
522    * @param dropDeletesFrom Row from which to drop deletes.
523    * @param dropDeletesTo Row to which to drop deletes.
524    * @param boundaries Expected target stripe boundaries.
525    */
526   private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
527       Collection<StoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
528       final List<byte[]> boundaries) throws Exception {
529     StripeCompactor sc = mock(StripeCompactor.class);
530     assertTrue(policy.needsCompactions(si, al()));
531     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
532     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
533     scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
534     verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
535       @Override
536       public boolean matches(Object argument) {
537         @SuppressWarnings("unchecked")
538         List<byte[]> other = (List<byte[]>) argument;
539         if (other.size() != boundaries.size()) return false;
540         for (int i = 0; i < other.size(); ++i) {
541           if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
542         }
543         return true;
544       }
545     }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
546       dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
547       any(NoLimitCompactionThroughputController.class), any(User.class));
548   }
549 
550   /**
551    * Verify arbitrary compaction.
552    * @param policy Policy to test.
553    * @param si Stripe information pre-set with stripes to test.
554    * @param sfs Files that should be compacted.
555    * @param dropDeletes Whether to drop deletes from compaction range.
556    * @param count Expected # of resulting stripes, null if not checked.
557    * @param size Expected target stripe size, null if not checked.
558    * @param start Left boundary of the compaction.
559    * @param righr Right boundary of the compaction.
560    */
561   private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
562       Collection<StoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
563       byte[] start, byte[] end, boolean needsCompaction) throws IOException {
564     StripeCompactor sc = mock(StripeCompactor.class);
565     assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
566     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
567     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
568     scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
569     verify(sc, times(1)).compact(eq(scr.getRequest()),
570       count == null ? anyInt() : eq(count.intValue()),
571       size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
572       dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
573       any(NoLimitCompactionThroughputController.class), any(User.class));
574   }
575 
576   /** Verify arbitrary flush. */
577   protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
578       KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
579     StoreFileWritersCapture writers = new StoreFileWritersCapture();
580     StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(new KVComparator(), si,
581       input.length);
582     StripeMultiFileWriter mw = req.createWriter();
583     mw.init(null, writers);
584     for (KeyValue kv : input) {
585       mw.append(kv);
586     }
587     boolean hasMetadata = boundaries != null;
588     mw.commitWriters(0, false);
589     writers.verifyKvs(expected, true, hasMetadata);
590     if (hasMetadata) {
591       writers.verifyBoundaries(boundaries);
592     }
593   }
594 
595 
596   private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
597     return dropDeletes == null ? any(byte[].class)
598             : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
599   }
600 
601   private void verifyCollectionsEqual(Collection<StoreFile> sfs, Collection<StoreFile> scr) {
602     // Dumb.
603     assertEquals(sfs.size(), scr.size());
604     assertTrue(scr.containsAll(sfs));
605   }
606 
607   private static List<StoreFile> getAllFiles(
608       StripeInformationProvider si, int fromStripe, int toStripe) {
609     ArrayList<StoreFile> expected = new ArrayList<StoreFile>();
610     for (int i = fromStripe; i <= toStripe; ++i) {
611       expected.addAll(si.getStripes().get(i));
612     }
613     return expected;
614   }
615 
616   /**
617    * @param l0Count Number of L0 files.
618    * @param boundaries Target boundaries.
619    * @return Mock stripes.
620    */
621   private static StripeInformationProvider createStripes(
622       int l0Count, byte[]... boundaries) throws Exception {
623     List<Long> l0Sizes = new ArrayList<Long>();
624     for (int i = 0; i < l0Count; ++i) {
625       l0Sizes.add(5L);
626     }
627     List<List<Long>> sizes = new ArrayList<List<Long>>();
628     for (int i = 0; i <= boundaries.length; ++i) {
629       sizes.add(Arrays.asList(Long.valueOf(5)));
630     }
631     return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
632   }
633 
634   /**
635    * @param l0Count Number of L0 files.
636    * @param l0Size Size of each file.
637    * @return Mock stripes.
638    */
639   private static StripeInformationProvider createStripesL0Only(
640       int l0Count, long l0Size) throws Exception {
641     List<Long> l0Sizes = new ArrayList<Long>();
642     for (int i = 0; i < l0Count; ++i) {
643       l0Sizes.add(l0Size);
644     }
645     return createStripes(null, new ArrayList<List<Long>>(), l0Sizes);
646   }
647 
648   /**
649    * @param l0Count Number of L0 files.
650    * @param l0Size Size of each file.
651    * @param sizes Sizes of the files; each sub-array representing a stripe.
652    * @return Mock stripes.
653    */
654   private static StripeInformationProvider createStripesWithSizes(
655       int l0Count, long l0Size, Long[]... sizes) throws Exception {
656     ArrayList<List<Long>> sizeList = new ArrayList<List<Long>>();
657     for (Long[] size : sizes) {
658       sizeList.add(Arrays.asList(size));
659     }
660     return createStripesWithSizes(l0Count, l0Size, sizeList);
661   }
662 
663   private static StripeInformationProvider createStripesWithSizes(
664       int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
665     List<byte[]> boundaries = createBoundaries(sizes.size());
666     List<Long> l0Sizes = new ArrayList<Long>();
667     for (int i = 0; i < l0Count; ++i) {
668       l0Sizes.add(l0Size);
669     }
670     return createStripes(boundaries, sizes, l0Sizes);
671   }
672 
673   private static List<byte[]> createBoundaries(int stripeCount) {
674     byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
675     assert stripeCount <= keys.length + 1;
676     List<byte[]> boundaries = new ArrayList<byte[]>();
677     boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
678     return boundaries;
679   }
680 
681   private static StripeInformationProvider createStripes(List<byte[]> boundaries,
682       List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
683     List<List<StoreFile>> stripeFiles = new ArrayList<List<StoreFile>>(stripeSizes.size());
684     for (List<Long> sizes : stripeSizes) {
685       List<StoreFile> sfs = new ArrayList<StoreFile>();
686       for (Long size : sizes) {
687         sfs.add(createFile(size));
688       }
689       stripeFiles.add(sfs);
690     }
691     List<StoreFile> l0Files = new ArrayList<StoreFile>();
692     for (Long size : l0Sizes) {
693       l0Files.add(createFile(size));
694     }
695     return createStripesWithFiles(boundaries, stripeFiles, l0Files);
696   }
697 
698   /**
699    * This method actually does all the work.
700    */
701   private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
702       List<List<StoreFile>> stripeFiles, List<StoreFile> l0Files) throws Exception {
703     ArrayList<ImmutableList<StoreFile>> stripes = new ArrayList<ImmutableList<StoreFile>>();
704     ArrayList<byte[]> boundariesList = new ArrayList<byte[]>();
705     StripeInformationProvider si = mock(StripeInformationProvider.class);
706     if (!stripeFiles.isEmpty()) {
707       assert stripeFiles.size() == (boundaries.size() + 1);
708       boundariesList.add(OPEN_KEY);
709       for (int i = 0; i <= boundaries.size(); ++i) {
710         byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
711         byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
712         boundariesList.add(endKey);
713         for (StoreFile sf : stripeFiles.get(i)) {
714           setFileStripe(sf, startKey, endKey);
715         }
716         stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
717         when(si.getStartRow(eq(i))).thenReturn(startKey);
718         when(si.getEndRow(eq(i))).thenReturn(endKey);
719       }
720     }
721     ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
722     sfs.addAllSublists(stripes);
723     sfs.addSublist(l0Files);
724     when(si.getStorefiles()).thenReturn(sfs);
725     when(si.getStripes()).thenReturn(stripes);
726     when(si.getStripeBoundaries()).thenReturn(boundariesList);
727     when(si.getStripeCount()).thenReturn(stripes.size());
728     when(si.getLevel0Files()).thenReturn(l0Files);
729     return si;
730   }
731 
732   private static StoreFile createFile(long size) throws Exception {
733     StoreFile sf = mock(StoreFile.class);
734     when(sf.getPath()).thenReturn(new Path("moo"));
735     StoreFile.Reader r = mock(StoreFile.Reader.class);
736     when(r.getEntries()).thenReturn(size);
737     when(r.length()).thenReturn(size);
738     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
739     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
740     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
741       mock(StoreFileScanner.class));
742     when(sf.getReader()).thenReturn(r);
743     when(sf.createReader()).thenReturn(r);
744     when(sf.cloneForReader()).thenReturn(sf);
745     return sf;
746   }
747 
748   private static StoreFile createFile() throws Exception {
749     return createFile(0);
750   }
751 
752   private static void setFileStripe(StoreFile sf, byte[] startKey, byte[] endKey) {
753     when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
754     when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
755   }
756 
757   private StripeCompactor createCompactor() throws Exception {
758     HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
759     StoreFileWritersCapture writers = new StoreFileWritersCapture();
760     Store store = mock(Store.class);
761     HRegionInfo info = mock(HRegionInfo.class);
762     when(info.getRegionNameAsString()).thenReturn("testRegion");
763     when(store.getFamily()).thenReturn(col);
764     when(store.getRegionInfo()).thenReturn(info);
765     when(
766       store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
767         anyBoolean(), anyBoolean())).thenAnswer(writers);
768 
769     Configuration conf = HBaseConfiguration.create();
770     conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
771     final Scanner scanner = new Scanner();
772     return new StripeCompactor(conf, store) {
773       @Override
774       protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
775           long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
776           byte[] dropDeletesToRow) throws IOException {
777         return scanner;
778       }
779 
780       @Override
781       protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
782           ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
783         return scanner;
784       }
785     };
786   }
787 
788   private static class Scanner implements InternalScanner {
789     private final ArrayList<KeyValue> kvs;
790 
791     public Scanner(KeyValue... kvs) {
792       this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
793     }
794 
795     @Override
796     public boolean next(List<Cell> results) throws IOException {
797       if (kvs.isEmpty()) return false;
798       results.add(kvs.remove(0));
799       return !kvs.isEmpty();
800     }
801 
802     @Override
803     public boolean next(List<Cell> result, ScannerContext scannerContext)
804         throws IOException {
805       return next(result);
806     }
807 
808     @Override
809     public void close() throws IOException {
810     }
811   }
812 }