/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.google.common.math.LongMath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;

/**
 * HBASE-15181 This is a simple implementation of date-based tiered compaction, similar to
 * Cassandra's, with the following benefits:
 * 1. Improves date-range-based scans by structuring store files in a date-based tiered layout.
 * 2. Reduces compaction overhead.
 * 3. Improves TTL efficiency.
 * It is a perfect fit for use cases that write and scan mostly date-based data and focus on the
 * most recent data.
 * Out-of-order writes are handled gracefully. Time range overlapping among store files is
 * tolerated and the performance impact is minimized. Configuration can be set in hbase-site.xml
 * or overridden at the per-table or per-column-family level via the hbase shell. The design spec
 * is at https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
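 *
 * For illustration only, a per-table override from the hbase shell might look like the snippet
 * below; the store engine class and property names shown here are assumptions of this sketch,
 * so consult CompactionConfiguration and the design spec above for the authoritative names:
 * <pre>
 *   alter 'my_table', CONFIGURATION => {
 *     'hbase.hstore.engine.class' => 'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine',
 *     'hbase.hstore.compaction.date.tiered.base.window.millis' => '21600000',
 *     'hbase.hstore.compaction.date.tiered.windows.per.tier' => '4'}
 * </pre>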
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
  private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class);

  private RatioBasedCompactionPolicy compactionPolicyPerWindow;

  public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
      throws IOException {
    super(conf, storeConfigInfo);
    try {
      compactionPolicyPerWindow =
          ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
            new Class[] { Configuration.class, StoreConfigInformation.class },
            new Object[] { conf, storeConfigInfo });
    } catch (Exception e) {
      throw new IOException("Unable to load configured compaction policy '"
          + comConf.getCompactionPolicyForTieredWindow() + "'", e);
    }
  }

  /**
   * Heuristics for guessing whether we need a minor compaction.
   */
  @Override
  @VisibleForTesting
  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
    try {
      // selectMinorCompaction never returns null (it returns an empty request instead),
      // so check whether any files were actually selected.
      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
    } catch (Exception e) {
      LOG.error("Cannot check for compaction: ", e);
      return false;
    }
  }

  public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
      throws IOException {
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || mcTime == 0) {
      return false;
    }

    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
      return false;
    }

    long cfTTL = this.storeConfigInfo.getStoreFileTtl();
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now);
    boolean[] filesInWindow = new boolean[boundaries.size()];

    for (StoreFile file: filesToCompact) {
      Long minTimestamp = file.getMinimumTimestamp();
      long oldest = (minTimestamp == null) ? (Long)Long.MIN_VALUE : now - minTimestamp.longValue();
      if (cfTTL != HConstants.FOREVER && oldest >= cfTTL) {
        LOG.debug("Major compaction triggered on store " + this
          + "; for TTL maintenance");
        return true;
      }
      if (!file.isMajorCompaction() || file.isBulkLoadResult()) {
        LOG.debug("Major compaction triggered on store " + this
          + ", because there are new files and the time since the last major compaction is "
          + (now - lowTimestamp) + "ms");
        return true;
      }
      int lowerWindowIndex = Collections.binarySearch(boundaries,
        minTimestamp == null ? (Long)Long.MAX_VALUE : minTimestamp);
      int upperWindowIndex = Collections.binarySearch(boundaries,
        file.getMaximumTimestamp() == null ? (Long)Long.MAX_VALUE : file.getMaximumTimestamp());
      // Handle boundary conditions and negative values of binarySearch
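      // binarySearch returns (-(insertionPoint) - 1) when the timestamp is not an exact boundary,
      // so Math.abs(index + 2) == insertionPoint - 1, i.e. the index of the last boundary at or
      // below the timestamp, which is the window containing it (boundaries start with
      // Long.MIN_VALUE, so insertionPoint >= 1 here).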
      lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
      upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
      if (lowerWindowIndex != upperWindowIndex) {
        LOG.debug("Major compaction triggered on store " + this + "; because file "
            + file.getPath() + " has data with timestamps crossing window boundaries");
        return true;
      } else if (filesInWindow[upperWindowIndex]) {
        LOG.debug("Major compaction triggered on store " + this +
            "; because there is more than one file in some windows");
        return true;
      } else {
        filesInWindow[upperWindowIndex] = true;
      }
      hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
    }

    float blockLocalityIndex = hdfsBlocksDistribution
        .getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));
    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
      LOG.debug("Major compaction triggered on store " + this
        + "; to make hdfs blocks local, current blockLocalityIndex is "
        + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
      return true;
    }

    LOG.debug("Skipping major compaction of " + this
        + ", because the files are already major compacted");
    return false;
  }

  @Override
  protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
      boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection)
      : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
    LOG.debug("Generated compaction request: " + result);
    return result;
  }

  public CompactionRequest selectMajorCompaction(ArrayList<StoreFile> candidateSelection) {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
    return new DateTieredCompactionRequest(candidateSelection,
      this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now));
  }

  /**
   * We receive store files sorted in ascending order by seqId, then scan the list of files. If the
   * current file has a maxTimestamp older than the last known maximum, treat this file as if it
   * carries the last known maximum. This way both seqId and timestamp are in the same order. If
   * files carry the same maxTimestamp, they are ordered by seqId. We then reverse the list so it
   * is ordered by seqId and maxTimestamp in descending order, and build the time windows. All
   * out-of-order data falls into the same compaction window, guaranteeing contiguous compaction
   * based on sequence id.
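   *
   * For example, if four files (in ascending seqId order) have max timestamps [10, 20, 15, 30],
   * they are treated as [10, 20, 20, 30], so the out-of-order third file lands in the same window
   * as the second file and the selection stays contiguous in seqId.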
   */
  public CompactionRequest selectMinorCompaction(ArrayList<StoreFile> candidateSelection,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);

    List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
      oldestToCompact));

    List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
        Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
    long maxTimestampSeen = Long.MIN_VALUE;
    for (StoreFile storeFile : storeFileList) {
      // If there is out-of-order data, it is assigned the running maximum timestamp seen so far,
      // so it falls into the same window as the newest earlier file (in ascending seqId order).
      maxTimestampSeen = Math.max(maxTimestampSeen,
        storeFile.getMaximumTimestamp() == null ? Long.MIN_VALUE : storeFile.getMaximumTimestamp());
      storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile, maxTimestampSeen));
    }
    Collections.reverse(storefileMaxTimestampPairs);

    Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
    int minThreshold = comConf.getIncomingWindowMin();
    PeekingIterator<Pair<StoreFile, Long>> it =
        Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
    while (it.hasNext()) {
      int compResult = window.compareToTimestamp(it.peek().getSecond());
      if (compResult > 0) {
        // If the file is too old for the window, switch to the next window
        window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact);
        minThreshold = comConf.getMinFilesToCompact();
      } else {
        // The file is within the target window
        ArrayList<StoreFile> fileList = Lists.newArrayList();
        // Add all files in the same window. For the incoming window
        // we tolerate files with future data, although it is sub-optimal.
        while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
          fileList.add(it.next().getFirst());
        }
        if (fileList.size() >= minThreshold) {
          LOG.debug("Processing files: " + fileList + " for window: " + window);
          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
            mayUseOffPeak, mayBeStuck, minThreshold);
          if (request != null) {
            return request;
          }
        }
      }
    }
    // A non-null file list is expected by HStore
    return new CompactionRequest(Collections.<StoreFile> emptyList());
  }

  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<StoreFile> storeFiles,
      Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
      throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for removeExcessFiles to exclude the youngest files.
    Collections.reverse(storeFiles);

    // Compact everything in the window if we have more files than comConf.maxBlockingFiles
    compactionPolicyPerWindow.setMinThreshold(minThreshold);
    ArrayList<StoreFile> storeFileSelection = mayBeStuck ? storeFiles
      : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
      // If any file in the window is excluded from compaction,
      // only one file will be output from compaction.
      boolean singleOutput = storeFiles.size() != storeFileSelection.size()
          || comConf.useSingleOutputForMinorCompaction();
      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
        boundaries);
      return result;
    }
    return null;
  }

  /**
   * Return a list of boundaries for multiple compaction outputs in ascending order.
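   *
   * For example, if the minimum timestamp across the files falls inside the third window back
   * from the incoming window, the result has the shape
   * [Long.MIN_VALUE, w2.startMillis(), w1.startMillis(), w0.startMillis()], where w0 is the
   * incoming window and w1, w2 are the two windows before it; the window containing the minimum
   * timestamp itself is absorbed by the Long.MIN_VALUE bucket.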
   */
  private List<Long> getCompactBoundariesForMajor(Collection<StoreFile> filesToCompact,
      long oldestToCompact, long now) {
    long minTimestamp = Long.MAX_VALUE;
    for (StoreFile file : filesToCompact) {
      minTimestamp = Math.min(minTimestamp,
        file.getMinimumTimestamp() == null ? Long.MAX_VALUE : file.getMinimumTimestamp());
    }

    List<Long> boundaries = new ArrayList<Long>();

    // Add startMillis of all windows between now and min timestamp
    for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
        window.compareToTimestamp(minTimestamp) > 0;
        window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) {
      boundaries.add(window.startMillis());
    }
    boundaries.add(Long.MIN_VALUE);
    Collections.reverse(boundaries);
    return boundaries;
  }

  /**
   * @return a list of boundaries for the compaction outputs within the window: Long.MIN_VALUE
   *   plus, unless a single output is requested, the start of the window.
   */
  private static List<Long> getCompactionBoundariesForMinor(Window window, boolean singleOutput) {
    List<Long> boundaries = new ArrayList<Long>();
    boundaries.add(Long.MIN_VALUE);
    if (!singleOutput) {
      boundaries.add(window.startMillis());
    }
    return boundaries;
  }

  /**
   * Removes all store files with a max timestamp older than the cutoff.
   * @param storeFiles all store files to consider
   * @param cutoff the cutoff timestamp (now minus the max store file age); store files whose max
   *   timestamp is older than this no longer participate in minor compaction.
   * @return an iterable over the store files, with files older than the cutoff excluded
   */
  private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles,
      final long cutoff) {
    return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
      @Override
      public boolean apply(StoreFile storeFile) {
        // Known findbugs issue with guava; neither SuppressWarnings nor a Nonnull annotation works.
        if (storeFile == null) {
          return false;
        }
        Long maxTimestamp = storeFile.getMaximumTimestamp();
        return maxTimestamp == null ? true : maxTimestamp >= cutoff;
      }
    });
  }

  private static Window getIncomingWindow(long now, long baseWindowMillis) {
    return new Window(baseWindowMillis, now / baseWindowMillis);
  }

  private static long getOldestToCompact(long maxAgeMillis, long now) {
    try {
      return LongMath.checkedSubtract(now, maxAgeMillis);
    } catch (ArithmeticException ae) {
      LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
        + ". All the files will be eligible for minor compaction.");
      return Long.MIN_VALUE;
    }
  }

  /**
   * This is the class we use to partition the time from epoch to now into tiers of windows whose
   * sizes grow exponentially from tier to tier.
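   *
   * For example (illustrative numbers only), with a base window of 6 hours and 4 windows per
   * tier, the newest data falls into 6-hour windows; walking back in time, older windows are
   * 24 hours wide (6 x 4), then 96 hours wide (24 x 4), and so on, up to the configured maximum
   * store file age.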
   */
  private static final class Window {
    /**
     * How big a range of timestamps fits inside the window, in milliseconds.
     */
    private final long windowMillis;

    /**
     * A timestamp t is within the window iff t / windowMillis == divPosition.
     */
    private final long divPosition;

    private Window(long baseWindowMillis, long divPosition) {
      windowMillis = baseWindowMillis;
      this.divPosition = divPosition;
    }

    /**
     * Compares the window to a timestamp.
     * @param timestamp the timestamp to compare.
     * @return a negative integer, zero, or a positive integer as the window lies before, covers,
     *         or lies after the timestamp.
     */
    public int compareToTimestamp(long timestamp) {
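      // Java integer division truncates toward zero, so a plain timestamp / windowMillis would
      // round negative timestamps up; shifting them down by (windowMillis - 1) first makes the
      // division below behave like floor division, so windows covering negative timestamps keep
      // the same width and alignment as the rest.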
      if (timestamp < 0) {
        try {
          timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1);
        } catch (ArithmeticException ae) {
          timestamp = Long.MIN_VALUE;
        }
      }
      long pos = timestamp / windowMillis;
      return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
    }

    /**
     * Move to the next window, which is either in the same tier or in the next tier and
     * represents an earlier time span.
     * @param windowsPerTier The number of contiguous windows that will have the same size. Windows
     *          following those will be <code>windowsPerTier</code> times as big.
     * @param oldestToCompact The oldest timestamp still eligible for compaction; the window is not
     *          promoted to the next tier if the promoted window would reach below this bound.
     * @return The next window
     */
    public Window nextWindow(int windowsPerTier, long oldestToCompact) {
      // Don't promote to the next tier if there is not even 1 window at current tier
      // or if the next window crosses the max age.
      if (divPosition % windowsPerTier > 0
          || startMillis() - windowMillis * windowsPerTier < oldestToCompact) {
        return new Window(windowMillis, divPosition - 1);
      } else {
        return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
      }
    }

    /**
     * Inclusive lower bound
     */
    public long startMillis() {
      try {
        return LongMath.checkedMultiply(windowMillis, divPosition);
      } catch (ArithmeticException ae) {
        return Long.MIN_VALUE;
      }
    }

    /**
     * Exclusive upper bound
     */
    public long endMillis() {
      try {
        return LongMath.checkedMultiply(windowMillis, (divPosition + 1));
      } catch (ArithmeticException ae) {
        return Long.MAX_VALUE;
      }
    }

    @Override
    public String toString() {
      return "[" + startMillis() + ", " + endMillis() + ")";
    }
  }
}