1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.io.compress.Compression;
32 import org.apache.hadoop.hbase.regionserver.InternalScanner;
33 import org.apache.hadoop.hbase.regionserver.ScanType;
34 import org.apache.hadoop.hbase.regionserver.Store;
35 import org.apache.hadoop.hbase.regionserver.StoreFile;
36 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
37 import org.apache.hadoop.hbase.regionserver.StoreScanner;
38 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
39 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
40 import org.apache.hadoop.hbase.security.User;
41 import org.apache.hadoop.hbase.util.Bytes;
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFileWriter> {
49 private static final Log LOG = LogFactory.getLog(StripeCompactor.class);
50 public StripeCompactor(Configuration conf, Store store) {
51 super(conf, store);
52 }
53
54 private final class StripeInternalScannerFactory implements InternalScannerFactory {
55
56 private final byte[] majorRangeFromRow;
57
58 private final byte[] majorRangeToRow;
59
60 public StripeInternalScannerFactory(byte[] majorRangeFromRow, byte[] majorRangeToRow) {
61 this.majorRangeFromRow = majorRangeFromRow;
62 this.majorRangeToRow = majorRangeToRow;
63 }
64
65 @Override
66 public ScanType getScanType(CompactionRequest request) {
67
68
69
70 return ScanType.COMPACT_RETAIN_DELETES;
71 }
72
73 @Override
74 public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
75 FileDetails fd, long smallestReadPoint) throws IOException {
76 return (majorRangeFromRow == null)
77 ? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
78 fd.earliestPutTs)
79 : StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
80 majorRangeFromRow, majorRangeToRow);
81 }
82 }
83
84 public List<Path> compact(CompactionRequest request, final List<byte[]> targetBoundaries,
85 final byte[] majorRangeFromRow, final byte[] majorRangeToRow,
86 CompactionThroughputController throughputController, User user) throws IOException {
87 if (LOG.isDebugEnabled()) {
88 StringBuilder sb = new StringBuilder();
89 sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
90 for (byte[] tb : targetBoundaries) {
91 sb.append(" [").append(Bytes.toString(tb)).append("]");
92 }
93 LOG.debug(sb.toString());
94 }
95 return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
96 new CellSinkFactory<StripeMultiFileWriter>() {
97
98 @Override
99 public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
100 boolean shouldDropBehind) throws IOException {
101 StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
102 store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
103 initMultiWriter(writer, scanner, fd, shouldDropBehind);
104 return writer;
105 }
106 }, throughputController, user);
107 }
108
109 public List<Path> compact(CompactionRequest request, final int targetCount, final long targetSize,
110 final byte[] left, final byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
111 CompactionThroughputController throughputController, User user) throws IOException {
112 if (LOG.isDebugEnabled()) {
113 LOG.debug(
114 "Executing compaction with " + targetSize + " target file size, no more than " + targetCount
115 + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
116 }
117 return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
118 new CellSinkFactory<StripeMultiFileWriter>() {
119
120 @Override
121 public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
122 boolean shouldDropBehind) throws IOException {
123 StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
124 store.getComparator(), targetCount, targetSize, left, right);
125 initMultiWriter(writer, scanner, fd, shouldDropBehind);
126 return writer;
127 }
128 }, throughputController, user);
129 }
130
131 @Override
132 protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
133 CompactionRequest request) throws IOException {
134 List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
135 assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
136 return newFiles;
137 }
138 }