/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * The default algorithm for selecting files for compaction.
 * Combines the compaction configuration and the provisional file selection that
 * it's given to produce the list of suitable candidates for compaction.
 */
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
  private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
      StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

  /**
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  @Override
  public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
      throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }

    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      long cfTTL = this.storeConfigInfo.getStoreFileTtl();
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.iterator().next();
        Long minTimestamp = sf.getMinimumTimestamp();
        long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
        if (sf.isMajorCompaction() && (cfTTL == HConstants.FOREVER || oldest < cfTTL)) {
          float blockLocalityIndex =
              sf.getHDFSBlockDistribution().getBlockLocalityIndex(
                  RSRpcServices.getHostname(comConf.conf, false));
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            LOG.debug("Major compaction triggered on only store " + this
                + "; to make hdfs blocks local, current blockLocalityIndex is "
                + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
            result = true;
          } else {
            LOG.debug("Skipping major compaction of " + this
                + " because one (major) compacted file only, oldestTime " + oldest
                + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
                + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          }
        } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
          LOG.debug("Major compaction triggered on store " + this
              + ", because keyvalues outdated; time since last major compaction "
              + (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        LOG.debug("Major compaction triggered on store " + this
            + "; time since last major compaction " + (now - lowTimestamp) + "ms");
        result = true;
      }
    }
    return result;
  }
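
  // Illustrative note (not part of the original source; the numbers are assumptions): with
  // hbase.hregion.majorcompaction set to e.g. 7 days, getNextMajorCompactTime() returns an
  // mcTime of roughly 604,800,000 ms plus jitter, so a store whose oldest file timestamp is
  // 8 days old satisfies lowTimestamp < now - mcTime and becomes a candidate above. A store
  // holding a single, already major-compacted file is only re-compacted when its HDFS block
  // locality drops below the configured minimum or its data has outlived the column family TTL.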

  @Override
  protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
      boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (!tryingMajor) {
      // Minor compaction: filter out bulk-loaded files, apply the ratio-based selection, and
      // make sure enough files remain to be worth compacting.
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection,
          comConf.getMinFilesToCompact());
    }
    return new CompactionRequest(candidateSelection);
  }

  /**
   * -- Default minor compaction selection algorithm:
   * choose CompactSelection from candidates --
   * Start at the oldest file and stop when you find the first file that
   * meets compaction criteria:
   * (1) a recently-flushed, small file (i.e. <= minCompactSize), OR
   * (2) within the compactRatio of sum(newer_files).
   * Given normal skew, any newer files will also meet this criteria.
   * <p>
   * Additionally, we are guaranteed that this compaction request is a
   * subset of the original request; i.e. we are safe returning a new
   * compaction request from this method.
   * @param candidates pre-filtrate, ordered from oldest to newest
   * @param mayUseOffPeak whether the off-peak compaction ratio may be applied
   * @param mayBeStuck true if the store has accumulated too many files and selection
   *   may fall back to compacting the newest files it can
   * @return filtered subset of candidates
   */
  protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // We're doing a minor compaction, let's see what files are applicable.
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // Get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // Calculate the sum of fileSizes[i, i + maxFilesToCompact - 1) for the algorithm.
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
          + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    // Skip the oldest files that are too large relative to the total size of the newer files.
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
        fileSizes[start] > Math.max(comConf.getMinCompactSize(),
            (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
          + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }
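
  // Worked example (illustrative numbers, not taken from this file): with the default ratio of
  // 1.2, a minimum compact size below these values, and candidate sizes, oldest to newest, of
  // 300, 200, 23, 12 and 12 MB, the loop above skips 300 (300 > 1.2 * (200 + 23 + 12 + 12))
  // and 200 (200 > 1.2 * (23 + 12 + 12)), then stops at 23 because 23 <= 1.2 * (12 + 12) = 28.8,
  // so the 23, 12 and 12 MB files are selected for the minor compaction.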

  /**
   * A heuristic method to decide whether to schedule a compaction request.
   * @param storeFiles files in the store.
   * @param filesCompacting files currently being compacted.
   * @return true to schedule a request.
   */
  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }
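
  // For example (hypothetical numbers): with the default hbase.hstore.compaction.min of 3, a
  // store with 7 files of which 2 are already being compacted leaves 5 eligible candidates,
  // so this returns true and a selection attempt is scheduled.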

  /**
   * Overwrite the minimum number of files required for a compaction.
   */
  public void setMinThreshold(int minThreshold) {
    comConf.setMinFilesToCompact(minThreshold);
  }
}