1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mob;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Date;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.KeyValueUtil;
34 import org.apache.hadoop.hbase.Tag;
35 import org.apache.hadoop.hbase.TagType;
36 import org.apache.hadoop.hbase.client.Scan;
37 import org.apache.hadoop.hbase.regionserver.*;
38 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
39 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
40 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
41 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
42 import org.apache.hadoop.hbase.security.User;
43 import org.apache.hadoop.hbase.util.Bytes;
44
45
46
47
48 @InterfaceAudience.Private
49 public class DefaultMobStoreCompactor extends DefaultCompactor {
50
51 private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class);
52 private long mobSizeThreshold;
53 private HMobStore mobStore;
54
55 private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
56
57 @Override
58 public ScanType getScanType(CompactionRequest request) {
59 return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
60 : ScanType.COMPACT_RETAIN_DELETES;
61 }
62
63 @Override
64 public InternalScanner createScanner(List<StoreFileScanner> scanners,
65 ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
66 Scan scan = new Scan();
67 scan.setMaxVersions(store.getFamily().getMaxVersions());
68
69 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
70 ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs);
71 }
72 };
73
74 private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
75
76 @Override
77 public Writer createWriter(InternalScanner scanner,
78 org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
79 boolean shouldDropBehind) throws IOException {
80
81 return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true);
82 }
83 };
84
85 public DefaultMobStoreCompactor(Configuration conf, Store store) {
86 super(conf, store);
87
88
89
90
91 if (!(store instanceof HMobStore)) {
92 throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
93 }
94 mobStore = (HMobStore) store;
95 mobSizeThreshold = store.getFamily().getMobThreshold();
96 }
97
98 @Override
99 public List<Path> compact(CompactionRequest request,
100 CompactionThroughputController throughputController, User user) throws IOException {
101 return compact(request, scannerFactory, writerFactory, throughputController, user);
102 }
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 @Override
159 protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
160 long smallestReadPoint, boolean cleanSeqId,
161 CompactionThroughputController throughputController, boolean major) throws IOException {
162 int bytesWritten = 0;
163
164
165 List<Cell> cells = new ArrayList<Cell>();
166
167 int closeCheckInterval = HStore.getCloseCheckInterval();
168 boolean hasMore;
169 Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
170 byte[] fileName = null;
171 Writer mobFileWriter = null, delFileWriter = null;
172 long mobCells = 0;
173 long deleteMarkersCount = 0;
174 Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
175 .getName());
176 long cellsCountCompactedToMob = 0;
177 long cellsCountCompactedFromMob = 0;
178 long cellsSizeCompactedToMob = 0;
179 long cellsSizeCompactedFromMob = 0;
180 try {
181 try {
182
183 mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
184 store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
185 fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
186 } catch (IOException e) {
187 LOG.error(
188 "Failed to create mob writer, "
189 + "we will continue the compaction by writing MOB cells directly in store files",
190 e);
191 }
192 delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
193 store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
194 ScannerContext scannerContext =
195 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
196 do {
197 hasMore = scanner.next(cells, scannerContext);
198
199 for (Cell c : cells) {
200 if (major && CellUtil.isDelete(c)) {
201 if (MobUtils.isMobReferenceCell(c)) {
202
203 writer.append(c);
204 } else {
205
206 writer.append(MobUtils.createMobRefDeleteMarker(c));
207
208 delFileWriter.append(c);
209 deleteMarkersCount++;
210 }
211 } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
212
213
214 writer.append(c);
215 } else if (MobUtils.isMobReferenceCell(c)) {
216 if (MobUtils.hasValidMobRefCellValue(c)) {
217 int size = MobUtils.getMobValueLength(c);
218 if (size > mobSizeThreshold) {
219
220
221 writer.append(c);
222 } else {
223
224
225 Cell mobCell = mobStore.resolve(c, false);
226 if (mobCell.getValueLength() != 0) {
227
228 CellUtil.setSequenceId(mobCell, c.getSequenceId());
229 writer.append(mobCell);
230 cellsCountCompactedFromMob++;
231 cellsSizeCompactedFromMob += mobCell.getValueLength();
232 } else {
233
234
235
236 writer.append(c);
237 }
238 }
239 } else {
240 LOG.warn("The value format of the KeyValue " + c
241 + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
242 writer.append(c);
243 }
244 } else if (c.getValueLength() <= mobSizeThreshold) {
245
246
247 writer.append(c);
248 } else {
249
250
251 mobCells++;
252
253 mobFileWriter.append(c);
254 Cell reference = MobUtils.createMobRefCell(c, fileName, tableNameTag);
255
256 writer.append(reference);
257 cellsCountCompactedToMob++;
258 cellsSizeCompactedToMob += c.getValueLength();
259 }
260 ++progress.currentCompactedKVs;
261
262
263 if (closeCheckInterval > 0) {
264 bytesWritten += KeyValueUtil.length(c);
265 if (bytesWritten > closeCheckInterval) {
266 bytesWritten = 0;
267 if (!store.areWritesEnabled()) {
268 progress.cancel();
269 return false;
270 }
271 }
272 }
273 }
274 cells.clear();
275 } while (hasMore);
276 } finally {
277 if (mobFileWriter != null) {
278 mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
279 mobFileWriter.close();
280 }
281 if (delFileWriter != null) {
282 delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
283 delFileWriter.close();
284 }
285 }
286 if (mobFileWriter != null) {
287 if (mobCells > 0) {
288
289 mobStore.commitFile(mobFileWriter.getPath(), path);
290 } else {
291 try {
292
293 store.getFileSystem().delete(mobFileWriter.getPath(), true);
294 } catch (IOException e) {
295 LOG.error("Failed to delete the temp mob file", e);
296 }
297 }
298 }
299 if (delFileWriter != null) {
300 if (deleteMarkersCount > 0) {
301
302
303 mobStore.commitFile(delFileWriter.getPath(), path);
304 } else {
305 try {
306
307 store.getFileSystem().delete(delFileWriter.getPath(), true);
308 } catch (IOException e) {
309 LOG.error("Failed to delete the temp del file", e);
310 }
311 }
312 }
313 mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
314 mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
315 mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
316 mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
317 progress.complete();
318 return true;
319 }
320 }