/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import com.google.common.collect.ImmutableList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;

/**
 * Stripe store implementation of compaction policy.
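 *
 * <p>A rough usage sketch (the variables si, filesCompacting, isOffpeak, compactor and
 * throughputController are assumed to be supplied by the surrounding store engine):
 * <pre>
 * StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, storeConfigInfo, config);
 * if (policy.needsCompactions(si, filesCompacting)) {
 *   StripeCompactionPolicy.StripeCompactionRequest request =
 *       policy.selectCompaction(si, filesCompacting, isOffpeak);
 *   if (request != null) {
 *     request.execute(compactor, throughputController);
 *   }
 * }
 * </pre>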
 */
@InterfaceAudience.Private
public class StripeCompactionPolicy extends CompactionPolicy {
  private static final Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);
  // Policy used to compact individual stripes.
  private ExploringCompactionPolicy stripePolicy = null;

  private StripeStoreConfig config;

  public StripeCompactionPolicy(
      Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
    super(conf, storeConfigInfo);
    this.config = config;
    stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
  }

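  /**
   * Pre-selects the candidate files that a coprocessor may pick from when overriding
   * compaction selection; everything currently being compacted is excluded.
   * @param si Stripe information.
   * @param filesCompacting Files already being compacted.
   * @return All store files except those already being compacted.
   */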
  public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
      List<StoreFile> filesCompacting) {
    // We sincerely hope nobody is messing with us with their coprocessors.
    // If they do, they are very likely to shoot themselves in the foot.
    // We'll just exclude all the filesCompacting from the list.
    ArrayList<StoreFile> candidateFiles = new ArrayList<StoreFile>(si.getStorefiles());
    candidateFiles.removeAll(filesCompacting);
    return candidateFiles;
  }

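  /**
   * Creates a request for a fixed, externally chosen set of files (e.g. one altered by a
   * coprocessor). If stripes already exist, the files are compacted along the existing
   * boundaries; otherwise they are split into the initial stripe count.
   * @param si Stripe information.
   * @param request Original request containing the files to compact.
   * @return The resulting stripe compaction request.
   */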
  public StripeCompactionRequest createEmptyRequest(
      StripeInformationProvider si, CompactionRequest request) {
    // Treat as L0-ish compaction with fixed set of files, and hope for the best.
    if (si.getStripeCount() > 0) {
      return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
    }
    Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
        request.getFiles(), this.config.getInitialCount());
    return new SplitStripeCompactionRequest(
        request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
  }

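  /**
   * Selects the flush strategy for a memstore flush: a plain flush into L0 if L0 is
   * enabled, a size-based split into the initial stripe count if no stripes exist yet,
   * or a flush along the existing stripe boundaries.
   * @param comparator Comparator for the KVs being flushed.
   * @param si Stripe information.
   * @param kvCount Number of KVs to flush.
   * @return The flush request for the stripe store flusher.
   */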
  public StripeStoreFlusher.StripeFlushRequest selectFlush(KVComparator comparator,
      StripeInformationProvider si, int kvCount) {
    if (this.config.isUsingL0Flush()) {
      // L0 is used, return a dumb request.
      return new StripeStoreFlusher.StripeFlushRequest(comparator);
    }
    if (si.getStripeCount() == 0) {
      // No stripes - start with the requisite count, derive KVs per stripe.
      int initialCount = this.config.getInitialCount();
      return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount,
          kvCount / initialCount);
    }
    // There are stripes - flush according to the boundaries.
    return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries());
  }

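  /**
   * Selects the compaction to run, trying the variants in strict order of preference:
   * compact everything if reference files are present (after a region split); compact L0
   * into existing stripes, or into new stripes if none exist, once it has accumulated
   * enough files; merge fully expired stripes; otherwise compact a single stripe,
   * splitting it if it has grown too large.
   * @param si Stripe information.
   * @param filesCompacting Files already being compacted.
   * @param isOffpeak Whether the compaction would run at off-peak hours.
   * @return The selected request, or null if no compaction should run right now.
   */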
  public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
      List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
    // TODO: first cut - no parallel compactions. To have more fine-grained control we
    //       probably need a structure more sophisticated than a list.
    if (!filesCompacting.isEmpty()) {
      LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
      return null;
    }

    // We are going to do variations of compaction in strict order of preference.
    // A better/more advanced approach would be to use a heuristic to see which one is
    // "more necessary" at the current time.

    // References can be present due to a region split. We could skip this later; for now,
    // preserve the compact-all-things behavior.
    Collection<StoreFile> allFiles = si.getStorefiles();
    if (StoreUtils.hasReferences(allFiles)) {
      LOG.debug("There are references in the store; compacting all files");
      long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
      SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
          allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
      request.setMajorRangeFull();
      return request;
    }

    int stripeCount = si.getStripeCount();
    List<StoreFile> l0Files = si.getLevel0Files();

    // See if we need to make new stripes.
    boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
    if (stripeCount == 0) {
      if (!shouldCompactL0) return null; // nothing to do.
      return selectNewStripesCompaction(si);
    }

    boolean canDropDeletesNoL0 = l0Files.isEmpty();
    if (shouldCompactL0) {
      if (!canDropDeletesNoL0) {
        // If we need to compact L0, see if we can add something to it, and drop deletes.
        StripeCompactionRequest result = selectSingleStripeCompaction(
            si, true, canDropDeletesNoL0, isOffpeak);
        if (result != null) return result;
      }
      LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
      return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
    }

    // Try to delete fully expired stripes.
    StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
    if (result != null) return result;

    // Ok, nothing special here, let's see if we need to do a common compaction.
    // This will also split the stripes that are too big if needed.
    return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
  }

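  /**
   * Cheap check for whether the store may need a compaction: nothing is currently
   * compacting, and there are references, a full-enough L0, or a stripe with enough files.
   */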
  public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
    // Approximation on whether we need compaction.
    return filesCompacting.isEmpty()
        && (StoreUtils.hasReferences(si.getStorefiles())
          || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
          || needsSingleStripeCompaction(si));
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
    throws IOException {
    return false; // there's never a major compaction!
  }

  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  /**
   * @param si Stripe information.
   * @return Whether any stripe potentially needs compaction.
   */
  protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
    int minFiles = this.config.getStripeCompactMinFiles();
    for (List<StoreFile> stripe : si.getStripes()) {
      if (stripe.size() >= minFiles) return true;
    }
    return false;
  }

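  /**
   * Selects a compaction within a single stripe: runs the stripe policy over every stripe
   * and keeps the best selection, then decides whether the stripe should also be split,
   * whether L0 files should be pulled in, and whether deletes can be dropped.
   * @param si Stripe information.
   * @param includeL0 Whether L0 files should be included (only valid for whole-stripe
   *                  compactions, so that deletes can be dropped).
   * @param canDropDeletesWithoutL0 Whether deletes can be dropped without including L0.
   * @param isOffpeak Whether the compaction would run at off-peak hours.
   * @return The request, or null if no suitable selection was found in any stripe.
   */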
  protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
      boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
    ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();

    int bqIndex = -1;
    List<StoreFile> bqSelection = null;
    int stripeCount = stripes.size();
    long bqTotalSize = -1;
    for (int i = 0; i < stripeCount; ++i) {
      // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
      // So, request that via the allFilesOnly (2nd) parameter.
      List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
          !canDropDeletesWithoutL0 && includeL0, isOffpeak);
      if (selection.isEmpty()) continue;
      long size = 0;
      for (StoreFile sf : selection) {
        size += sf.getReader().length();
      }
      // Keep the best selection: the most files, with ties broken by smaller total size.
      if (bqSelection == null || selection.size() > bqSelection.size() ||
          (selection.size() == bqSelection.size() && size < bqTotalSize)) {
        bqSelection = selection;
        bqIndex = i;
        bqTotalSize = size;
      }
    }
    if (bqSelection == null) {
      LOG.debug("No good compaction is possible in any stripe");
      return null;
    }
    List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
    // See if we can, and need to, split this stripe.
    int targetCount = 1;
    long targetKvs = Long.MAX_VALUE;
    boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
    String splitString = "";
    if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
      if (includeL0) {
        // We want to avoid the scenario where we compact a stripe w/L0 and then split it.
        // So, if we might split, don't compact the stripe with L0.
        return null;
      }
      Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
      targetKvs = kvsAndCount.getFirst();
      targetCount = kvsAndCount.getSecond();
      splitString = "; the stripe will be split into at most "
          + targetCount + " stripes with " + targetKvs + " target KVs";
    }

    LOG.debug("Found compaction in a stripe with end key ["
        + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
        + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);

    // See if we can drop deletes.
    StripeCompactionRequest req;
    if (includeL0) {
      assert hasAllFiles;
      List<StoreFile> l0Files = si.getLevel0Files();
      LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
      ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
      sfs.addSublist(filesToCompact);
      sfs.addSublist(l0Files);
      req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
    } else {
      req = new SplitStripeCompactionRequest(
          filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
    }
    if (hasAllFiles && (canDropDeletesWithoutL0 || includeL0)) {
      req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
    }
    req.getRequest().setOffPeak(isOffpeak);
    return req;
  }

  /**
   * Selects the compaction of a single stripe using the default (exploring) policy.
   * @param sfs Files.
   * @param allFilesOnly Whether a compaction of all-or-none of the files is needed.
   * @param isOffpeak Whether the compaction would run at off-peak hours.
   * @return The resulting selection.
   */
  private List<StoreFile> selectSimpleCompaction(
      List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
    int minFilesLocal = Math.max(
        allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
    int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
    return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
  }

  /**
   * Selects the compaction that compacts all files (to be removed later).
   * @param si Stripe information.
   * @param targetStripeCount Target stripe count.
   * @param targetSize Target stripe size.
   * @return The compaction.
   */
  private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
      int targetStripeCount, long targetSize) {
    Collection<StoreFile> allFiles = si.getStorefiles();
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        allFiles, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
    request.setMajorRangeFull();
    LOG.debug("Selecting a compaction that includes all " + allFiles.size() + " files");
    return request;
  }

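  /**
   * Selects the compaction of L0 files into the initial set of stripes, for a store that
   * does not have any stripes yet.
   */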
  private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
    List<StoreFile> l0Files = si.getLevel0Files();
    Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
    LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
        + kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
    SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
        l0Files, OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
    request.setMajorRangeFull(); // L0 only, can drop deletes.
    return request;
  }

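  /**
   * Looks for the longest run of stripes in which every file has expired per the store
   * file TTL, and merges those stripes into one so that the expired data can be dropped.
   * @return The merge request, or null if the TTL is unbounded or nothing has expired.
   */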
  private StripeCompactionRequest selectExpiredMergeCompaction(
      StripeInformationProvider si, boolean canDropDeletesNoL0) {
    long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (cfTtl == Long.MAX_VALUE) {
      return null; // minVersions might be set; cannot delete old files.
    }
    long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl;
    // Merge the longest sequence of stripes where all files have expired, if any.
    int start = -1, bestStart = -1, length = 0, bestLength = 0;
    ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
    OUTER: for (int i = 0; i < stripes.size(); ++i) {
      for (StoreFile storeFile : stripes.get(i)) {
        if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
        // Found a non-expired file; this stripe has to stay.
        if (length > bestLength) {
          bestStart = start;
          bestLength = length;
        }
        start = -1;
        length = 0;
        continue OUTER;
      }
      if (start == -1) {
        start = i;
      }
      ++length;
    }
    if (length > bestLength) {
      bestStart = start;
      bestLength = length;
    }
    if (bestLength == 0) return null;
    if (bestLength == 1) {
      // This is currently inefficient. If only one stripe has expired, we will rewrite an
      // entire neighboring stripe just to delete the expired files, because we rely on
      // file metadata, which cannot simply be updated in an old file. When we either
      // determine stripes dynamically or move the metadata to a manifest, we can just drop
      // the expired stripes instead.
      if (bestStart == (stripes.size() - 1)) return null;
      ++bestLength;
    }
    LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
    int endIndex = bestStart + bestLength - 1;
    ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
    sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
    SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
        si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
    if (canDropDeletesNoL0) {
      result.setMajorRangeFull();
    }
    return result;
  }

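  /** @return The total number of KVs (entries) in the given files, as reported by their readers. */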
  private static long getTotalKvCount(final Collection<StoreFile> candidates) {
    long totalCount = 0;
    for (StoreFile storeFile : candidates) {
      totalCount += storeFile.getReader().getEntries();
    }
    return totalCount;
  }

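  /** @return The total size, in bytes, of the given files. */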
  public static long getTotalFileSize(final Collection<StoreFile> candidates) {
    long totalSize = 0;
    for (StoreFile storeFile : candidates) {
      totalSize += storeFile.getReader().length();
    }
    return totalSize;
  }

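  /**
   * Estimates the per-stripe KV count and stripe count to use when splitting the given
   * files; the count may exceed splitCount when the files have outgrown the target part size.
   * @param files Files to split.
   * @param splitCount Desired number of parts.
   * @return Pair of the target KV count per stripe and the number of stripes to create.
   */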
  private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
    // If the size is larger than what we target, we don't want to split into proportionally
    // larger parts and then have to split again very soon. So, we will increase the multiplier
    // by one until we get small enough parts. E.g. a 5Gb stripe that should have been split into
    // 2 parts when it was 3Gb will be split into 3x1.67Gb parts, rather than 2x2.5Gb parts.
    long totalSize = getTotalFileSize(files);
    long targetPartSize = config.getSplitPartSize();
    assert targetPartSize > 0 && splitCount > 0;
    double ratio = totalSize / (splitCount * targetPartSize); // ratio of real to desired size
    while (ratio > 1.0) {
      // Ratio of real to desired size if we increase the multiplier.
      double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
      if ((1.0 / newRatio) >= ratio) break; // New ratio is < 1.0, but further from 1.0 than the last one.
      ratio = newRatio;
      splitCount += 1.0;
    }
    long kvCount = (long)(getTotalKvCount(files) / splitCount);
    return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
  }

  /** Stripe compaction request wrapper. */
  public abstract static class StripeCompactionRequest {
    protected CompactionRequest request;
    protected byte[] majorRangeFromRow = null, majorRangeToRow = null;

    public List<Path> execute(StripeCompactor compactor,
      CompactionThroughputController throughputController) throws IOException {
      return execute(compactor, throughputController, null);
    }

    /**
     * Executes the request against the compactor (essentially, just calls the correct
     * overload of the compact method), to simulate more dynamic dispatch.
     * @param compactor Compactor.
     * @return The result of compact(...).
     */
    public abstract List<Path> execute(StripeCompactor compactor,
        CompactionThroughputController throughputController, User user) throws IOException;

    public StripeCompactionRequest(CompactionRequest request) {
      this.request = request;
    }

    /**
     * Sets the compaction "major range". The major range is the key range for which all
     * the files are included, so they can be treated like major-compacted files.
     * @param startRow Left boundary, inclusive.
     * @param endRow Right boundary, exclusive.
     */
    public void setMajorRange(byte[] startRow, byte[] endRow) {
      this.majorRangeFromRow = startRow;
      this.majorRangeToRow = endRow;
    }

    public CompactionRequest getRequest() {
      return this.request;
    }

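    /** Replaces the underlying request (e.g. after coprocessors have altered it); resets the
     * major range, since it may no longer be valid for the new file set. */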
    public void setRequest(CompactionRequest request) {
      assert request != null;
      this.request = request;
      this.majorRangeFromRow = this.majorRangeToRow = null;
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files at the provided boundaries.
   */
  private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
    private final List<byte[]> targetBoundaries;

    /**
     * @param request Original request.
     * @param targetBoundaries New files should be written with these boundaries.
     */
    public BoundaryStripeCompactionRequest(CompactionRequest request,
        List<byte[]> targetBoundaries) {
      super(request);
      this.targetBoundaries = targetBoundaries;
    }

    public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
        List<byte[]> targetBoundaries) {
      this(new CompactionRequest(files), targetBoundaries);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        CompactionThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
        this.majorRangeToRow, throughputController, user);
    }
  }

  /**
   * Request for stripe compactor that will cause it to split the source files into several
   * separate files based on key-value count, as well as a file count limit.
   * Most of the files will be roughly the same size. The last file may be smaller or larger
   * depending on the interplay of the amount of data and the maximum number of files allowed.
   */
  private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
    private final byte[] startRow, endRow;
    private final int targetCount;
    private final long targetKvs;

    /**
     * @param request Original request.
     * @param startRow Left boundary of the range to compact, inclusive.
     * @param endRow Right boundary of the range to compact, exclusive.
     * @param targetCount The maximum number of stripes to compact into.
     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
     *                  the total number of kvs, all the overflow data goes into the last stripe.
     */
    public SplitStripeCompactionRequest(CompactionRequest request,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      super(request);
      this.startRow = startRow;
      this.endRow = endRow;
      this.targetCount = targetCount;
      this.targetKvs = targetKvs;
    }

    public SplitStripeCompactionRequest(
        CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
      this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    public SplitStripeCompactionRequest(
        Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
      this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
    }

    public SplitStripeCompactionRequest(Collection<StoreFile> files,
        byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
      this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
    }

    @Override
    public List<Path> execute(StripeCompactor compactor,
        CompactionThroughputController throughputController, User user) throws IOException {
      return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
        this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
    }

    /**
     * Sets the major range of the compaction to the entire compaction range.
     * See {@link #setMajorRange(byte[], byte[])}.
     */
    public void setMajorRangeFull() {
      setMajorRange(this.startRow, this.endRow);
    }
  }

  /** The information about stripes that the policy needs to do its job. */
  public static interface StripeInformationProvider {
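    /** @return All the store files, including both striped and level 0 files. */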
    public Collection<StoreFile> getStorefiles();

    /**
     * Gets the start row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return Start row. May be an open key.
     */
    public byte[] getStartRow(int stripeIndex);

    /**
     * Gets the end row for a given stripe.
     * @param stripeIndex Stripe index.
     * @return End row. May be an open key.
     */
    public byte[] getEndRow(int stripeIndex);

    /**
     * @return Level 0 files.
     */
    public List<StoreFile> getLevel0Files();

    /**
     * @return All stripe boundaries, including the open ones on both ends.
     */
    public List<byte[]> getStripeBoundaries();

    /**
     * @return The stripes.
     */
    public ArrayList<ImmutableList<StoreFile>> getStripes();

    /**
     * @return Stripe count.
     */
    public int getStripeCount();
  }
}