/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.google.common.math.LongMath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;

/**
 * A date-tiered compaction policy (HBASE-15181), similar in spirit to Cassandra's date-tiered
 * compaction strategy. Store files are grouped into time windows whose size grows exponentially
 * with age, so recent data stays in small, frequently compacted windows while older data settles
 * into large, rarely touched ones. This layout speeds up date-range scans that focus on recent
 * data, reduces compaction overhead, and lets TTL-expired data be dropped efficiently. It fits
 * workloads that write mostly time-ordered data, read mostly recent data, and rarely delete or
 * update old rows; out-of-order writes are tolerated but reduce the benefit.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
  private static final Log LOG = LogFactory.getLog(DateTieredCompactionPolicy.class);

  private RatioBasedCompactionPolicy compactionPolicyPerWindow;

  public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
      throws IOException {
    super(conf, storeConfigInfo);
    try {
      compactionPolicyPerWindow =
          ReflectionUtils.instantiateWithCustomCtor(comConf.getCompactionPolicyForTieredWindow(),
            new Class[] { Configuration.class, StoreConfigInformation.class },
            new Object[] { conf, storeConfigInfo });
    } catch (Exception e) {
      throw new IOException("Unable to load configured compaction policy '"
          + comConf.getCompactionPolicyForTieredWindow() + "'", e);
    }
  }

  /**
   * Heuristic for guessing whether a minor compaction is needed: run minor-compaction selection
   * over all the store files and report whether it picked anything.
   */
  @Override
  @VisibleForTesting
  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    ArrayList<StoreFile> candidates = new ArrayList<StoreFile>(storeFiles);
    try {
      return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
    } catch (Exception e) {
      LOG.error("Can not check for compaction: ", e);
      return false;
    }
  }

  public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
      throws IOException {
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || mcTime == 0) {
      return false;
    }

    // Use the lowest timestamp among the files as a proxy for the time of the last major
    // compaction; only proceed once the configured major compaction period has elapsed.
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = EnvironmentEdgeManager.currentTime();
    if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
      return false;
    }

    long cfTTL = this.storeConfigInfo.getStoreFileTtl();
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, oldestToCompact, now);
    boolean[] filesInWindow = new boolean[boundaries.size()];

    for (StoreFile file: filesToCompact) {
      Long minTimestamp = file.getMinimumTimestamp();
      long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
      if (cfTTL != HConstants.FOREVER && oldest >= cfTTL) {
        LOG.debug("Major compaction triggered on store " + this
          + "; for TTL maintenance");
        return true;
      }
      if (!file.isMajorCompaction() || file.isBulkLoadResult()) {
        LOG.debug("Major compaction triggered on store " + this
          + ", because there are new files and time since last major compaction "
          + (now - lowTimestamp) + "ms");
        return true;
      }
      // Map the file's min and max timestamps to window indexes; a negative binarySearch result
      // is converted to the index of the window that contains the timestamp.
      int lowerWindowIndex = Collections.binarySearch(boundaries,
        minTimestamp == null ? (Long) Long.MAX_VALUE : minTimestamp);
      int upperWindowIndex = Collections.binarySearch(boundaries,
        file.getMaximumTimestamp() == null ? (Long) Long.MAX_VALUE : file.getMaximumTimestamp());
      lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
      upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
      if (lowerWindowIndex != upperWindowIndex) {
        LOG.debug("Major compaction triggered on store " + this + "; because file "
          + file.getPath() + " has data with timestamps crossing window boundaries");
        return true;
      } else if (filesInWindow[upperWindowIndex]) {
        LOG.debug("Major compaction triggered on store " + this
          + "; because there is more than one file in some windows");
        return true;
      } else {
        filesInWindow[upperWindowIndex] = true;
      }
      hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
    }

    float blockLocalityIndex = hdfsBlocksDistribution
        .getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));
    if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
      LOG.debug("Major compaction triggered on store " + this
        + "; to make hdfs blocks local, current blockLocalityIndex is "
        + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
      return true;
    }

    LOG.debug("Skipping major compaction of " + this
      + ", because the files are already major compacted");
    return false;
  }

  @Override
  protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
      boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection)
      : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
    LOG.debug("Generated compaction request: " + result);
    return result;
  }

  public CompactionRequest selectMajorCompaction(ArrayList<StoreFile> candidateSelection) {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);
    return new DateTieredCompactionRequest(candidateSelection,
      this.getCompactBoundariesForMajor(candidateSelection, oldestToCompact, now));
  }

  /**
   * Selects a minor compaction. Store files arrive sorted by sequence id in ascending order;
   * each file is paired with the maximum timestamp seen so far, so that sequence id order and
   * timestamp order agree even when data arrived out of order. The list is then reversed
   * (newest first) and walked window by window, starting from the incoming window and moving
   * to progressively older and larger tiers. The first window holding at least the minimum
   * number of files produces the compaction request.
   */
  public CompactionRequest selectMinorCompaction(ArrayList<StoreFile> candidateSelection,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    long now = EnvironmentEdgeManager.currentTime();
    long oldestToCompact = getOldestToCompact(comConf.getMaxStoreFileAgeMillis(), now);

    List<StoreFile> storeFileList = Lists.newArrayList(filterOldStoreFiles(candidateSelection,
      oldestToCompact));

    List<Pair<StoreFile, Long>> storefileMaxTimestampPairs =
        Lists.newArrayListWithCapacity(Iterables.size(storeFileList));
    long maxTimestampSeen = Long.MIN_VALUE;
    for (StoreFile storeFile : storeFileList) {
      // If there is out-of-order data, put the file in the same window as the last file in
      // increasing order, so files in a window stay contiguous by sequence id.
      maxTimestampSeen = Math.max(maxTimestampSeen,
        storeFile.getMaximumTimestamp() == null ? Long.MIN_VALUE : storeFile.getMaximumTimestamp());
      storefileMaxTimestampPairs.add(new Pair<StoreFile, Long>(storeFile, maxTimestampSeen));
    }
    Collections.reverse(storefileMaxTimestampPairs);

    Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
    int minThreshold = comConf.getIncomingWindowMin();
    PeekingIterator<Pair<StoreFile, Long>> it =
        Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
    while (it.hasNext()) {
      int compResult = window.compareToTimestamp(it.peek().getSecond());
      if (compResult > 0) {
        // The file is older than the current window; move to the next (older, possibly larger)
        // window and switch from the incoming-window threshold to the regular minimum.
        window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact);
        minThreshold = comConf.getMinFilesToCompact();
      } else {
        // The file falls within (or is newer than) the current window.
        ArrayList<StoreFile> fileList = Lists.newArrayList();
        // Collect all files belonging to this window. For the incoming window we also tolerate
        // files carrying future timestamps, although that is sub-optimal.
        while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
          fileList.add(it.next().getFirst());
        }
        if (fileList.size() >= minThreshold) {
          LOG.debug("Processing files: " + fileList + " for window: " + window);
          DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
            mayUseOffPeak, mayBeStuck, minThreshold);
          if (request != null) {
            return request;
          }
        }
      }
    }

    // Nothing to compact.
    return new CompactionRequest(Collections.<StoreFile> emptyList());
  }

  private DateTieredCompactionRequest generateCompactionRequest(ArrayList<StoreFile> storeFiles,
      Window window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
      throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for exclusion of the youngest files to behave as expected.
    Collections.reverse(storeFiles);

    // If compactions may be stuck, compact everything in the window; otherwise let the
    // per-window ratio-based policy pick a subset.
    compactionPolicyPerWindow.setMinThreshold(minThreshold);
    ArrayList<StoreFile> storeFileSelection = mayBeStuck ? storeFiles
      : compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
    if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
      // If any file in the window was excluded from the selection, produce a single output
      // file so the window does not end up fragmented further.
      boolean singleOutput = storeFiles.size() != storeFileSelection.size()
        || comConf.useSingleOutputForMinorCompaction();
      List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
        boundaries);
      return result;
    }
    return null;
  }

  /**
   * Returns a list of boundaries for multiple compaction outputs in ascending order: the start
   * of every window between now and the oldest minimum timestamp among the files, preceded by
   * Long.MIN_VALUE so that all older data falls into the first output.
   */
  private List<Long> getCompactBoundariesForMajor(Collection<StoreFile> filesToCompact,
      long oldestToCompact, long now) {
    long minTimestamp = Long.MAX_VALUE;
    for (StoreFile file : filesToCompact) {
      minTimestamp = Math.min(minTimestamp,
        file.getMinimumTimestamp() == null ? Long.MAX_VALUE : file.getMinimumTimestamp());
    }

    List<Long> boundaries = new ArrayList<Long>();

    // Add the start of every window between now and the minimum timestamp, walking back from
    // the incoming window through progressively larger tiers.
    for (Window window = getIncomingWindow(now, comConf.getBaseWindowMillis());
        window.compareToTimestamp(minTimestamp) > 0;
        window = window.nextWindow(comConf.getWindowsPerTier(), oldestToCompact)) {
      boundaries.add(window.startMillis());
    }
    boundaries.add(Long.MIN_VALUE);
    Collections.reverse(boundaries);
    return boundaries;
  }
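
  // Illustrative sketch (not from the original source; all numbers are hypothetical): with a
  // base window of 10 ms, 2 windows per tier, no age limit, now = 55 (incoming window [50, 60))
  // and an oldest file minimum timestamp of 18, the loop above visits [50, 60), [40, 50) and the
  // promoted window [20, 40) before stopping at [0, 20), which contains the timestamp. The
  // returned boundaries are [Long.MIN_VALUE, 20, 40, 50].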

  /**
   * Returns the list of boundaries for the output files of a minor compaction of the
   * given window.
   */
  private static List<Long> getCompactionBoundariesForMinor(Window window, boolean singleOutput) {
    List<Long> boundaries = new ArrayList<Long>();
    boundaries.add(Long.MIN_VALUE);
    if (!singleOutput) {
      boundaries.add(window.startMillis());
    }
    return boundaries;
  }
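
  // Illustrative note (not from the original source; the window is hypothetical): for a window
  // [40, 50), a single-output minor compaction gets the boundary list [Long.MIN_VALUE] and
  // produces one file, while a multi-output one gets [Long.MIN_VALUE, 40], splitting data older
  // than the window from data inside it.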

  /**
   * Filters out store files whose maximum timestamp is older than the cutoff, i.e. files whose
   * newest data has already passed the maximum store file age for compaction.
   * @param storeFiles all store files to consider
   * @param cutoff files whose maximum timestamp is below this value are excluded
   * @return the store files that are still eligible for compaction
   */
  private static Iterable<StoreFile> filterOldStoreFiles(List<StoreFile> storeFiles,
      final long cutoff) {
    return Iterables.filter(storeFiles, new Predicate<StoreFile>() {
      @Override
      public boolean apply(StoreFile storeFile) {
        // Skip null entries; a file without a known maximum timestamp is kept to be safe.
        if (storeFile == null) {
          return false;
        }
        Long maxTimestamp = storeFile.getMaximumTimestamp();
        return maxTimestamp == null ? true : maxTimestamp >= cutoff;
      }
    });
  }

  private static Window getIncomingWindow(long now, long baseWindowMillis) {
    return new Window(baseWindowMillis, now / baseWindowMillis);
  }

  private static long getOldestToCompact(long maxAgeMillis, long now) {
    try {
      return LongMath.checkedSubtract(now, maxAgeMillis);
    } catch (ArithmeticException ae) {
      LOG.warn("Value for " + CompactionConfiguration.MAX_AGE_MILLIS_KEY + ": " + maxAgeMillis
        + ". All the files will be eligible for minor compaction.");
      return Long.MIN_VALUE;
    }
  }

  /**
   * A time window used to partition the timeline from epoch until now into tiers of
   * exponentially growing window sizes.
   */
  private static final class Window {
    /**
     * How big a range of timestamps fits inside the window, in milliseconds.
     */
    private final long windowMillis;

    /**
     * A timestamp t is within the window iff t / windowMillis == divPosition.
     */
    private final long divPosition;

    private Window(long baseWindowMillis, long divPosition) {
      windowMillis = baseWindowMillis;
      this.divPosition = divPosition;
    }

    /**
     * Compares the window to a timestamp.
     * @param timestamp the timestamp to compare
     * @return a negative integer, zero, or a positive integer as the window lies before,
     *         covers, or lies after the timestamp
     */
    public int compareToTimestamp(long timestamp) {
      // Shift negative timestamps so that truncating integer division still maps them to the
      // correct (earlier) window position.
      if (timestamp < 0) {
        try {
          timestamp = LongMath.checkedSubtract(timestamp, windowMillis - 1);
        } catch (ArithmeticException ae) {
          timestamp = Long.MIN_VALUE;
        }
      }
      long pos = timestamp / windowMillis;
      return divPosition == pos ? 0 : divPosition < pos ? -1 : 1;
    }
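
    // Illustrative sketch (not from the original source; numbers are hypothetical): for a window
    // with windowMillis = 10 and divPosition = 5, i.e. [50, 60), compareToTimestamp(55) returns 0,
    // compareToTimestamp(65) returns -1 (the window lies before the timestamp) and
    // compareToTimestamp(45) returns 1. The adjustment above maps a timestamp of -5 to position -1
    // (window [-10, 0)) instead of 0, which plain truncating division would give.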

    /**
     * Moves to the next window, which covers an earlier time span: either the previous window
     * of the same size, or, at a tier boundary, a window <code>windowsPerTier</code> times as
     * big.
     * @param windowsPerTier the number of contiguous windows that share the same size before
     *          the size is multiplied by <code>windowsPerTier</code>
     * @param oldestToCompact the oldest timestamp still eligible for compaction; windows are
     *          not promoted to a bigger tier once the promoted window would start before it
     * @return the next window
     */
    public Window nextWindow(int windowsPerTier, long oldestToCompact) {
      // Only promote to the next tier if the position is aligned with a tier boundary and the
      // promoted window would not start before oldestToCompact.
      if (divPosition % windowsPerTier > 0
          || startMillis() - windowMillis * windowsPerTier < oldestToCompact) {
        return new Window(windowMillis, divPosition - 1);
      } else {
        return new Window(windowMillis * windowsPerTier, divPosition / windowsPerTier - 1);
      }
    }
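
    // Illustrative sketch (not from the original source; parameters are hypothetical): with
    // windowMillis = 1, windowsPerTier = 2 and no age limit, starting from window [7, 8) the
    // successive calls yield [6, 7), then the promoted window [4, 6), then [0, 4), then [-8, 0),
    // so window sizes grow as 1, 2, 4, 8 while walking back in time.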

    /**
     * Inclusive lower bound of the window.
     */
    public long startMillis() {
      try {
        return LongMath.checkedMultiply(windowMillis, divPosition);
      } catch (ArithmeticException ae) {
        return Long.MIN_VALUE;
      }
    }

    /**
     * Exclusive upper bound of the window.
     */
    public long endMillis() {
      try {
        return LongMath.checkedMultiply(windowMillis, (divPosition + 1));
      } catch (ArithmeticException ae) {
        return Long.MAX_VALUE;
      }
    }

    @Override
    public String toString() {
      return "[" + startMillis() + ", " + endMillis() + ")";
    }
  }
}