1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.compactions;
20
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27
28 import com.google.common.collect.ImmutableList;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.KeyValue.KVComparator;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
37 import org.apache.hadoop.hbase.regionserver.StoreFile;
38 import org.apache.hadoop.hbase.regionserver.StoreUtils;
39 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
40 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
41 import org.apache.hadoop.hbase.security.User;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.hbase.util.ConcatenatedLists;
44 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
45 import org.apache.hadoop.hbase.util.Pair;
46
47
48
49
50 @InterfaceAudience.Private
51 public class StripeCompactionPolicy extends CompactionPolicy {
52 private final static Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);
53
54 private ExploringCompactionPolicy stripePolicy = null;
55
56 private StripeStoreConfig config;
57
/**
 * Builds a stripe compaction policy.
 *
 * @param conf configuration handed to the parent policy and to the per-stripe
 *        {@link ExploringCompactionPolicy}
 * @param storeConfigInfo store-level configuration, handed to the same two recipients
 * @param config stripe-specific settings retained for later selection decisions
 */
public StripeCompactionPolicy(
    Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
  super(conf, storeConfigInfo);
  this.stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
  this.config = config;
}
64
65 public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
66 List<StoreFile> filesCompacting) {
67
68
69
70 ArrayList<StoreFile> candidateFiles = new ArrayList<StoreFile>(si.getStorefiles());
71 candidateFiles.removeAll(filesCompacting);
72 return candidateFiles;
73 }
74
75 public StripeCompactionRequest createEmptyRequest(
76 StripeInformationProvider si, CompactionRequest request) {
77
78 if (si.getStripeCount() > 0) {
79 return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
80 }
81 Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
82 request.getFiles(), this.config.getInitialCount());
83 return new SplitStripeCompactionRequest(
84 request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
85 }
86
87 public StripeStoreFlusher.StripeFlushRequest selectFlush(KVComparator comparator,
88 StripeInformationProvider si, int kvCount) {
89 if (this.config.isUsingL0Flush()) {
90
91 return new StripeStoreFlusher.StripeFlushRequest(comparator);
92 }
93 if (si.getStripeCount() == 0) {
94
95 int initialCount = this.config.getInitialCount();
96 return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount,
97 kvCount / initialCount);
98 }
99
100 return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries());
101 }
102
103 public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
104 List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
105
106
107 if (!filesCompacting.isEmpty()) {
108 LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
109 return null;
110 }
111
112
113
114
115
116
117
118 Collection<StoreFile> allFiles = si.getStorefiles();
119 if (StoreUtils.hasReferences(allFiles)) {
120 LOG.debug("There are references in the store; compacting all files");
121 long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
122 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
123 allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
124 request.setMajorRangeFull();
125 return request;
126 }
127
128 int stripeCount = si.getStripeCount();
129 List<StoreFile> l0Files = si.getLevel0Files();
130
131
132 boolean shouldCompactL0 = (this.config.getLevel0MinFiles() <= l0Files.size());
133 if (stripeCount == 0) {
134 if (!shouldCompactL0) return null;
135 return selectNewStripesCompaction(si);
136 }
137
138 boolean canDropDeletesNoL0 = l0Files.size() == 0;
139 if (shouldCompactL0) {
140 if (!canDropDeletesNoL0) {
141
142 StripeCompactionRequest result = selectSingleStripeCompaction(
143 si, true, canDropDeletesNoL0, isOffpeak);
144 if (result != null) return result;
145 }
146 LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
147 return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
148 }
149
150
151 StripeCompactionRequest result = selectExpiredMergeCompaction(si, canDropDeletesNoL0);
152 if (result != null) return result;
153
154
155
156 return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
157 }
158
159 public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
160
161 return filesCompacting.isEmpty()
162 && (StoreUtils.hasReferences(si.getStorefiles())
163 || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles())
164 || needsSingleStripeCompaction(si));
165 }
166
167 @Override
168 public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
169 throws IOException {
170 return false;
171 }
172
173 @Override
174 public boolean throttleCompaction(long compactionSize) {
175 return compactionSize > comConf.getThrottlePoint();
176 }
177
178
179
180
181
182 protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
183 int minFiles = this.config.getStripeCompactMinFiles();
184 for (List<StoreFile> stripe : si.getStripes()) {
185 if (stripe.size() >= minFiles) return true;
186 }
187 return false;
188 }
189
190 protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
191 boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
192 ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
193
194 int bqIndex = -1;
195 List<StoreFile> bqSelection = null;
196 int stripeCount = stripes.size();
197 long bqTotalSize = -1;
198 for (int i = 0; i < stripeCount; ++i) {
199
200
201 List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
202 !canDropDeletesWithoutL0 && includeL0, isOffpeak);
203 if (selection.isEmpty()) continue;
204 long size = 0;
205 for (StoreFile sf : selection) {
206 size += sf.getReader().length();
207 }
208 if (bqSelection == null || selection.size() > bqSelection.size() ||
209 (selection.size() == bqSelection.size() && size < bqTotalSize)) {
210 bqSelection = selection;
211 bqIndex = i;
212 bqTotalSize = size;
213 }
214 }
215 if (bqSelection == null) {
216 LOG.debug("No good compaction is possible in any stripe");
217 return null;
218 }
219 List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
220
221 int targetCount = 1;
222 long targetKvs = Long.MAX_VALUE;
223 boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
224 String splitString = "";
225 if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
226 if (includeL0) {
227
228
229 return null;
230 }
231 Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
232 targetKvs = kvsAndCount.getFirst();
233 targetCount = kvsAndCount.getSecond();
234 splitString = "; the stripe will be split into at most "
235 + targetCount + " stripes with " + targetKvs + " target KVs";
236 }
237
238 LOG.debug("Found compaction in a stripe with end key ["
239 + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
240 + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);
241
242
243 StripeCompactionRequest req;
244 if (includeL0) {
245 assert hasAllFiles;
246 List<StoreFile> l0Files = si.getLevel0Files();
247 LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
248 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
249 sfs.addSublist(filesToCompact);
250 sfs.addSublist(l0Files);
251 req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
252 } else {
253 req = new SplitStripeCompactionRequest(
254 filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
255 }
256 if (hasAllFiles && (canDropDeletesWithoutL0 || includeL0)) {
257 req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
258 }
259 req.getRequest().setOffPeak(isOffpeak);
260 return req;
261 }
262
263
264
265
266
267
268
269 private List<StoreFile> selectSimpleCompaction(
270 List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
271 int minFilesLocal = Math.max(
272 allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
273 int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
274 return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
275 }
276
277
278
279
280
281
282
283
284 private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
285 int targetStripeCount, long targetSize) {
286 Collection<StoreFile> allFiles = si.getStorefiles();
287 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
288 allFiles, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
289 request.setMajorRangeFull();
290 LOG.debug("Selecting a compaction that includes all " + allFiles.size() + " files");
291 return request;
292 }
293
294 private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
295 List<StoreFile> l0Files = si.getLevel0Files();
296 Pair<Long, Integer> kvsAndCount = estimateTargetKvs(l0Files, config.getInitialCount());
297 LOG.debug("Creating " + kvsAndCount.getSecond() + " initial stripes with "
298 + kvsAndCount.getFirst() + " kvs each via L0 compaction of " + l0Files.size() + " files");
299 SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
300 si.getLevel0Files(), OPEN_KEY, OPEN_KEY, kvsAndCount.getSecond(), kvsAndCount.getFirst());
301 request.setMajorRangeFull();
302 return request;
303 }
304
305 private StripeCompactionRequest selectExpiredMergeCompaction(
306 StripeInformationProvider si, boolean canDropDeletesNoL0) {
307 long cfTtl = this.storeConfigInfo.getStoreFileTtl();
308 if (cfTtl == Long.MAX_VALUE) {
309 return null;
310 }
311 long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl;
312
313 int start = -1, bestStart = -1, length = 0, bestLength = 0;
314 ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
315 OUTER: for (int i = 0; i < stripes.size(); ++i) {
316 for (StoreFile storeFile : stripes.get(i)) {
317 if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
318
319 if (length > bestLength) {
320 bestStart = start;
321 bestLength = length;
322 }
323 start = -1;
324 length = 0;
325 continue OUTER;
326 }
327 if (start == -1) {
328 start = i;
329 }
330 ++length;
331 }
332 if (length > bestLength) {
333 bestStart = start;
334 bestLength = length;
335 }
336 if (bestLength == 0) return null;
337 if (bestLength == 1) {
338
339
340
341
342 if (bestStart == (stripes.size() - 1)) return null;
343 ++bestLength;
344 }
345 LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
346 int endIndex = bestStart + bestLength - 1;
347 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
348 sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
349 SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
350 si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
351 if (canDropDeletesNoL0) {
352 result.setMajorRangeFull();
353 }
354 return result;
355 }
356
357 private static long getTotalKvCount(final Collection<StoreFile> candidates) {
358 long totalSize = 0;
359 for (StoreFile storeFile : candidates) {
360 totalSize += storeFile.getReader().getEntries();
361 }
362 return totalSize;
363 }
364
365 public static long getTotalFileSize(final Collection<StoreFile> candidates) {
366 long totalSize = 0;
367 for (StoreFile storeFile : candidates) {
368 totalSize += storeFile.getReader().length();
369 }
370 return totalSize;
371 }
372
373 private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
374
375
376
377
378 long totalSize = getTotalFileSize(files);
379 long targetPartSize = config.getSplitPartSize();
380 assert targetPartSize > 0 && splitCount > 0;
381 double ratio = totalSize / (splitCount * targetPartSize);
382 while (ratio > 1.0) {
383
384 double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
385 if ((1.0 / newRatio) >= ratio) break;
386 ratio = newRatio;
387 splitCount += 1.0;
388 }
389 long kvCount = (long)(getTotalKvCount(files) / splitCount);
390 return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
391 }
392
393
394 public abstract static class StripeCompactionRequest {
395 protected CompactionRequest request;
396 protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
397
398 public List<Path> execute(StripeCompactor compactor,
399 CompactionThroughputController throughputController) throws IOException {
400 return execute(compactor, throughputController, null);
401 }
402
403
404
405
406
407
408 public abstract List<Path> execute(StripeCompactor compactor,
409 CompactionThroughputController throughputController, User user) throws IOException;
410
411 public StripeCompactionRequest(CompactionRequest request) {
412 this.request = request;
413 }
414
415
416
417
418
419
420
421 public void setMajorRange(byte[] startRow, byte[] endRow) {
422 this.majorRangeFromRow = startRow;
423 this.majorRangeToRow = endRow;
424 }
425
426 public CompactionRequest getRequest() {
427 return this.request;
428 }
429
430 public void setRequest(CompactionRequest request) {
431 assert request != null;
432 this.request = request;
433 this.majorRangeFromRow = this.majorRangeToRow = null;
434 }
435 }
436
437
438
439
440
441 private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
442 private final List<byte[]> targetBoundaries;
443
444
445
446
447
448 public BoundaryStripeCompactionRequest(CompactionRequest request,
449 List<byte[]> targetBoundaries) {
450 super(request);
451 this.targetBoundaries = targetBoundaries;
452 }
453
454 public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
455 List<byte[]> targetBoundaries) {
456 this(new CompactionRequest(files), targetBoundaries);
457 }
458
459 @Override
460 public List<Path> execute(StripeCompactor compactor,
461 CompactionThroughputController throughputController, User user) throws IOException {
462 return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
463 this.majorRangeToRow, throughputController, user);
464 }
465 }
466
467
468
469
470
471
472
473 private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
474 private final byte[] startRow, endRow;
475 private final int targetCount;
476 private final long targetKvs;
477
478
479
480
481
482
483
484
485
486 public SplitStripeCompactionRequest(CompactionRequest request,
487 byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
488 super(request);
489 this.startRow = startRow;
490 this.endRow = endRow;
491 this.targetCount = targetCount;
492 this.targetKvs = targetKvs;
493 }
494
495 public SplitStripeCompactionRequest(
496 CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
497 this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
498 }
499
500 public SplitStripeCompactionRequest(
501 Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
502 this(files, startRow, endRow, Integer.MAX_VALUE, targetKvs);
503 }
504
505 public SplitStripeCompactionRequest(Collection<StoreFile> files,
506 byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
507 this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
508 }
509
510 @Override
511 public List<Path> execute(StripeCompactor compactor,
512 CompactionThroughputController throughputController, User user) throws IOException {
513 return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
514 this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
515 }
516
517
518
519 public void setMajorRangeFull() {
520 setMajorRange(this.startRow, this.endRow);
521 }
522 }
523
524
525 public static interface StripeInformationProvider {
526 public Collection<StoreFile> getStorefiles();
527
528
529
530
531
532
533 public byte[] getStartRow(int stripeIndex);
534
535
536
537
538
539
540 public byte[] getEndRow(int stripeIndex);
541
542
543
544
545 public List<StoreFile> getLevel0Files();
546
547
548
549
550 public List<byte[]> getStripeBoundaries();
551
552
553
554
555 public ArrayList<ImmutableList<StoreFile>> getStripes();
556
557
558
559
560 public int getStripeCount();
561 }
562 }