/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * The default algorithm for selecting files for compaction.
 * Combines the compaction configuration and the provisional file selection
 * it is given to produce the list of suitable candidates for compaction.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
  private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
                                    StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  /**
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  @Override
  public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
    throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      long cfTTL = this.storeConfigInfo.getStoreFileTtl();
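      // Once the major compaction window has elapsed:
      // - multiple files: always trigger a major compaction;
      // - a single file that is already a major compaction result and has not
      //   expired: re-compact only when its HDFS block locality on this server
      //   is below the configured minimum, so that reads become local again;
      // - a single file whose oldest data has outlived the TTL: compact it to
      //   drop expired data;
      // - otherwise, leave the single file alone.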
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.iterator().next();
        Long minTimestamp = sf.getMinimumTimestamp();
        long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
        if (sf.isMajorCompaction() && (cfTTL == HConstants.FOREVER || oldest < cfTTL)) {
          float blockLocalityIndex =
            sf.getHDFSBlockDistribution().getBlockLocalityIndex(
            RSRpcServices.getHostname(comConf.conf, false));
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            LOG.debug("Major compaction triggered on only store " + this
              + "; to make hdfs blocks local, current blockLocalityIndex is "
              + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
            result = true;
          } else {
            LOG.debug("Skipping major compaction of " + this
              + " because one (major) compacted file only, oldestTime " + oldest
              + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
              + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          }
        } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
          LOG.debug("Major compaction triggered on store " + this
            + ", because keyvalues outdated; time since last major compaction "
            + (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        LOG.debug("Major compaction triggered on store " + this
          + "; time since last major compaction " + (now - lowTimestamp) + "ms");
        result = true;
      }
    }
    return result;
  }

  @Override
  protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (!tryingMajor) {
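      // Minor compaction pipeline: drop bulk-load files first, run the
      // ratio-based selection, then check that enough files remain for the
      // compaction to be worthwhile.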
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection,
        comConf.getMinFilesToCompact());
    }
    return new CompactionRequest(candidateSelection);
  }

  /**
   * -- Default minor compaction selection algorithm:
   * choose CompactSelection from candidates --
   * First exclude bulk-load files if indicated in configuration.
   * Start at the oldest file and stop when you find the first file that
   * meets compaction criteria:
   * (1) a recently-flushed, small file (i.e. <= minCompactSize)
   * OR
   * (2) within the compactRatio of sum(newer_files)
   * Given normal skew, any newer files will also meet this criterion.
   * <p/>
   * Additional Note:
   * If fileSizes.size() >> maxFilesToCompact, we will recurse on
   * compact().  Consider the oldest files first to avoid a
   * situation where we always compact [end-threshold,end).  Then, the
   * last file becomes an aggregate of the previous compactions.
   *
   * normal skew:
   *
   *         older ----> newer (increasing seqID)
   *     _
   *    | |   _
   *    | |  | |   _
   *  --|-|- |-|- |-|---_-------_-------  minCompactSize
   *    | |  | |  | |  | |  _  | |
   *    | |  | |  | |  | | | | | |
   *    | |  | |  | |  | | | | | |
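   *
   * Worked example (illustrative numbers; assume ratio = 1.2, minCompactSize = 10,
   * and maxFilesToCompact large enough to span all files). For file sizes,
   * oldest to newest, of [100, 33, 12, 10, 10]:
   * 100 > max(10, 1.2 * (33 + 12 + 10 + 10) = 78.0), so the oldest file is skipped;
   * 33 <= max(10, 1.2 * (12 + 10 + 10) = 38.4), so the scan stops there and
   * [33, 12, 10, 10] is selected for compaction.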
   * @param candidates pre-filtered list of candidate files
   * @return filtered subset
   */
  protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }
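    // Illustrative trace (assume maxFilesToCompact = 3): each sumSize[i] covers
    // at most maxFilesToCompact - 1 = 2 files starting at i, so for
    // fileSizes = [8, 6, 4, 2], sumSize = [8+6, 6+4, 4+2, 2] = [14, 10, 6, 2].
    // The loop computes this right to left in O(n): add the new head and drop
    // the file that falls just outside the window.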
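    // Skip over older files that would be too expensive to compact: a file is
    // passed over when its size exceeds both minCompactSize and
    // ratio * (sum of the sizes of the newer files in its window).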
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
          (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
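      // e.g. with 10 candidates and minFilesToCompact = 3, start becomes 7 and
      // the 3 newest files are compacted, guaranteeing forward progress even
      // though the ratio check rejected every file.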
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

  /**
   * A heuristic method to decide whether to schedule a compaction request.
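   * (e.g. 7 files in the store with 2 already compacting and
   * minFilesToCompact = 3 gives 7 - 2 = 5 >= 3, so a request is scheduled.)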
   * @param storeFiles files in the store.
   * @param filesCompacting files being scheduled to compact.
   * @return true to schedule a request.
   */
  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }

  /**
   * Overwrite the min threshold for compaction.
   */
  public void setMinThreshold(int minThreshold) {
    comConf.setMinFilesToCompact(minThreshold);
  }
}