1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import com.google.common.io.Closeables;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.security.PrivilegedExceptionAction;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.KeyValueUtil;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.io.compress.Compression;
43 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
44 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
45 import org.apache.hadoop.hbase.regionserver.HStore;
46 import org.apache.hadoop.hbase.regionserver.InternalScanner;
47 import org.apache.hadoop.hbase.regionserver.ScanType;
48 import org.apache.hadoop.hbase.regionserver.ScannerContext;
49 import org.apache.hadoop.hbase.regionserver.Store;
50 import org.apache.hadoop.hbase.regionserver.StoreFile;
51 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
52 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
53 import org.apache.hadoop.hbase.regionserver.StoreScanner;
54 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
55 import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
56 import org.apache.hadoop.hbase.security.User;
57 import org.apache.hadoop.hbase.util.Bytes;
58 import org.apache.hadoop.hbase.util.Writables;
59 import org.apache.hadoop.util.StringUtils;
60 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
61 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
62
63
64
65
66
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Log LOG = LogFactory.getLog(Compactor.class);

  // Progress of the compaction currently running; volatile so monitoring threads
  // observe the tracker installed by compact().
  protected volatile CompactionProgress progress;

  protected final Configuration conf;
  protected final Store store;

  // Max number of cells buffered per scanner.next() call (COMPACTION_KV_MAX).
  protected final int compactionKVMax;
  // Compression to use for compaction output; NONE when the store has no family.
  protected final Compression.Algorithm compactionCompression;

  // Number of days of MVCC/seqId history to keep; files older than this window may
  // have seqIds zeroed on a full compaction (see getFileDetails/minSeqIdToKeep).
  protected int keepSeqIdPeriod;
81
82
83 Compactor(final Configuration conf, final Store store) {
84 this.conf = conf;
85 this.store = store;
86 this.compactionKVMax =
87 this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
88 this.compactionCompression = (this.store.getFamily() == null) ?
89 Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
90 this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
91 HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
92 }
93
  /**
   * Sink that accepts the cells a compaction produces.
   */
  public interface CellSink {
    /**
     * Appends one cell to the sink.
     * @param cell the cell to write
     * @throws IOException if the underlying write fails
     */
    void append(Cell cell) throws IOException;
  }
97
  /**
   * Factory producing the sink (writer) a compaction run writes into.
   * @param <S> concrete sink type produced
   */
  protected interface CellSinkFactory<S> {
    /**
     * @param scanner the scanner the compaction will read input cells from
     * @param fd aggregated details of the files being compacted
     * @param shouldDropBehind passed through to writer creation; in compact() this is
     *     fed from {@code store.throttleCompaction(request.getSize())}
     */
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
        throws IOException;
  }
102
  /**
   * @return progress tracker of the currently running compaction; null until
   *     {@link #compact} installs one
   */
  public CompactionProgress getProgress() {
    return this.progress;
  }
106
107
  /** Aggregated metadata about the set of store files selected for compaction. */
  protected static class FileDetails {
    // Upper bound on the number of cells the compaction can emit (sum of file entries).
    public long maxKeyCount = 0;
    // Earliest put timestamp across the files; only computed when all files are compacted.
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    // Latest put timestamp found in the files' time-range metadata.
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    // Max sequence id across the files.
    public long maxSeqId = 0;
    // Max MVCC read point across the files.
    public long maxMVCCReadpoint = 0;
    // Max tags length seen in any file.
    public int maxTagsLength = 0;
    // Cells with seqId at or below this value may have their seqId zeroed during
    // compaction (see compact()/performCompaction cleanSeqId handling).
    public long minSeqIdToKeep = 0;
  }
124
125
126
127
128
129
130
131 protected FileDetails getFileDetails(
132 Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
133 FileDetails fd = new FileDetails();
134 long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
135 (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
136
137 for (StoreFile file : filesToCompact) {
138 if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
139
140
141 if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
142 fd.minSeqIdToKeep = file.getMaxMemstoreTS();
143 }
144 }
145 long seqNum = file.getMaxSequenceId();
146 fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
147 StoreFile.Reader r = file.getReader();
148 if (r == null) {
149 LOG.warn("Null reader for " + file.getPath());
150 continue;
151 }
152
153
154
155 long keyCount = r.getEntries();
156 fd.maxKeyCount += keyCount;
157
158 Map<byte[], byte[]> fileInfo = r.loadFileInfo();
159 byte[] tmp = null;
160
161
162 if (r.isBulkLoaded()) {
163 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
164 }
165 else {
166 tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
167 if (tmp != null) {
168 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
169 }
170 }
171 tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
172 if (tmp != null) {
173 fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
174 }
175
176
177 long earliestPutTs = 0;
178 if (allFiles) {
179 tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
180 if (tmp == null) {
181
182
183 fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
184 } else {
185 earliestPutTs = Bytes.toLong(tmp);
186 fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
187 }
188 }
189 tmp = fileInfo.get(StoreFile.TIMERANGE_KEY);
190 TimeRangeTracker trt = new TimeRangeTracker();
191 if (tmp == null) {
192 fd.latestPutTs = HConstants.LATEST_TIMESTAMP;
193 } else {
194 Writables.copyWritable(tmp, trt);
195 fd.latestPutTs = trt.getMaximumTimestamp();
196 }
197 if (LOG.isDebugEnabled()) {
198 LOG.debug("Compacting " + file +
199 ", keycount=" + keyCount +
200 ", bloomtype=" + r.getBloomFilterType().toString() +
201 ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
202 ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
203 ", seqNum=" + seqNum +
204 (allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
205 }
206 }
207 return fd;
208 }
209
210
211
212
213
214
215 protected List<StoreFileScanner> createFileScanners(
216 final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
217 return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
218 smallestReadPoint);
219 }
220
  /** @return the store's smallest read point (delegates to {@code store}). */
  protected long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }
224
  /** Factory producing the {@link InternalScanner} a compaction run reads its input from. */
  protected interface InternalScannerFactory {

    /** @return the scan type to use for the given compaction request */
    ScanType getScanType(CompactionRequest request);

    /**
     * @param scanners per-file scanners over the files being compacted
     * @param scanType scan type as chosen by {@link #getScanType}
     * @param fd aggregated details of the input files
     * @param smallestReadPoint smallest MVCC read point to honor
     */
    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
        FileDetails fd, long smallestReadPoint) throws IOException;
  }
232
233 protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
234
235 @Override
236 public ScanType getScanType(CompactionRequest request) {
237 return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
238 : ScanType.COMPACT_RETAIN_DELETES;
239 }
240
241 @Override
242 public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
243 FileDetails fd, long smallestReadPoint) throws IOException {
244 return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
245 fd.earliestPutTs);
246 }
247 };
248
249
250
251
252
253
254
255 protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
256
257
258 return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
259
260
261
262 }
263
  /**
   * Runs the compaction described by the request: opens scanners over the input files,
   * lets coprocessors wrap or replace the scanner, writes surviving cells through the
   * sink created by {@code sinkFactory}, then commits (or aborts) the output.
   * @param request files to compact plus compaction attributes
   * @param scannerFactory creates the InternalScanner the compaction reads from
   * @param sinkFactory creates the writer the compaction writes to
   * @param throughputController paces the compaction's write rate
   * @param user if non-null, coprocessor hooks run in this user's context
   * @return paths of the newly written files (empty if a coprocessor skipped the compaction)
   * @throws IOException on failure; InterruptedIOException if the compaction was interrupted
   */
  protected List<Path> compact(final CompactionRequest request,
      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
      CompactionThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the scanners.
    long smallestReadPoint = getSmallestReadPoint();

    List<StoreFileScanner> scanners;
    Collection<StoreFile> readersToClose;
    T writer = null;
    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
      // Clone all StoreFiles so the compaction runs on an independent copy of the
      // files and their readers.
      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
      for (StoreFile f : request.getFiles()) {
        StoreFile clonedStoreFile = f.cloneForReader();
        // Create the reader after the store file is cloned in case the sequence id
        // is used to create the scanner.
        clonedStoreFile.createReader();
        readersToClose.add(clonedStoreFile);
      }
      scanners = createFileScanners(readersToClose, smallestReadPoint);
    } else {
      // No private readers: nothing extra to close later.
      readersToClose = Collections.emptyList();
      scanners = createFileScanners(request.getFiles(), smallestReadPoint);
    }
    InternalScanner scanner = null;
    boolean finished = false;
    try {
      // Coprocessor pre-hook may supply the scanner; otherwise build the default one.
      ScanType scanType = scannerFactory.getScanType(request);
      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
      if (scanner == null) {
        scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
      }
      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
      if (scanner == null) {
        // A null scanner from the post-hook means skip the compaction entirely.
        return new ArrayList<Path>();
      }
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0) {
        // Sequence ids at or below this point may be zeroed out while writing.
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, store.throttleCompaction(request.getSize()));
      finished =
          performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
            throughputController, request.isAllFiles());
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      // Always release the scanner and any privately cloned readers, even on failure.
      Closeables.close(scanner, true);
      for (StoreFile f : readersToClose) {
        try {
          f.closeReader(true);
        } catch (IOException e) {
          // Best effort: log and keep closing the remaining readers.
          LOG.warn("Exception closing " + f, e);
        }
      }
      if (!finished && writer != null) {
        abortWriter(writer);
      }
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }
338
  /**
   * Commits the writer's output as the compaction result.
   * @return paths of the files written
   */
  protected abstract List<Path> commitWriter(T writer, FileDetails fd, CompactionRequest request)
      throws IOException;

  /** Aborts the writer after a failed or interrupted compaction. */
  protected abstract void abortWriter(T writer) throws IOException;
343
344
345
346
347
348
349
350
351
  /**
   * Convenience overload of
   * {@link #preCreateCoprocScanner(CompactionRequest, ScanType, long, List, User)}
   * that runs without a user context.
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
    return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
  }
356
  /**
   * Invokes the coprocessor's pre-compact scanner-open hook, if a coprocessor host exists.
   * When a user is supplied, the hook runs inside that user's context via doAs.
   * @return the scanner supplied by the coprocessor, or null when there is no coprocessor host
   *     (or the hook itself returned null)
   * @throws IOException including InterruptedIOException if the doAs call is interrupted
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
      User user) throws IOException {
    if (store.getCoprocessorHost() == null) return null;
    if (user == null) {
      return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
        earliestPutTs, request);
    } else {
      try {
        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
          @Override
          public InternalScanner run() throws Exception {
            return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
              scanType, earliestPutTs, request);
          }
        });
      } catch (InterruptedException ie) {
        // Surface the interruption as an IOException, keeping the original as the cause.
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }
380
381
382
383
384
385
386
387
  /**
   * Invokes the coprocessor's preCompact hook on an already-created scanner, allowing the
   * coprocessor to wrap or replace it. When a user is supplied, the hook runs via doAs.
   * @return the (possibly wrapped) scanner, the original scanner when there is no
   *     coprocessor host, or null if the hook chose to cancel normal processing
   * @throws IOException including InterruptedIOException if the doAs call is interrupted
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    if (user == null) {
      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
    } else {
      try {
        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
          @Override
          public InternalScanner run() throws Exception {
            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
          }
        });
      } catch (InterruptedException ie) {
        // Surface the interruption as an IOException, keeping the original as the cause.
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }
410
411
412
413
414
415 private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
416
417 private String generateCompactionName() {
418 int counter;
419 for (;;) {
420 counter = NAME_COUNTER.get();
421 int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
422 if (NAME_COUNTER.compareAndSet(counter, next)) {
423 break;
424 }
425 }
426 return store.getRegionInfo().getRegionNameAsString() + "#"
427 + store.getFamily().getNameAsString() + "#" + counter;
428 }
429
430
431
432
433
434
435
436
437
438
  /**
   * Performs the compaction: drains cells from the scanner into the writer while honoring
   * the throughput controller and periodically checking whether writes are still enabled.
   * @param fd aggregated details of the input files (unused here beyond the signature —
   *     NOTE(review): kept for subclass overrides; confirm)
   * @param scanner where to read cells from
   * @param writer where to write cells to
   * @param smallestReadPoint smallest MVCC read point; cells at/below it may have seqId zeroed
   * @param cleanSeqId when true, zero out seqIds that are {@code <= smallestReadPoint}
   * @param throughputController paces the write rate of this compaction
   * @param major whether this is a major (all-files) compaction
   * @return true if the compaction completed; false if it was cancelled because writes
   *     were disabled on the store
   * @throws IOException on scanner/writer failure, or InterruptedIOException if the
   *     throughput controller was interrupted
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId,
      CompactionThroughputController throughputController, boolean major) throws IOException {
    long bytesWritten = 0;
    long bytesWrittenProgress = 0;
    // scanner.next() can return false while still having delivered cells,
    // hence the do/while loop below.
    List<Cell> cells = new ArrayList<Cell>();
    long closeCheckInterval = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = generateCompactionName();
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // Output this batch to the writer.
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            // Zero out the seqId in place before appending.
            CellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = KeyValueUtil.length(c);
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgress += len;
          }
          throughputController.control(compactionName, len);
          // Check periodically whether a system stop has been requested.
          if (closeCheckInterval > 0) {
            bytesWritten += len;
            if (bytesWritten > closeCheckInterval) {
              bytesWritten = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
        }
        if (lastCleanCell != null) {
          // Restore the seqId we zeroed on the batch's last cell; the scanner may still
          // hold a reference to it — NOTE(review): presumably to avoid perturbing scan
          // ordering; confirm intent.
          CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log progress of long-running compactions at most once a minute
        // when logging at DEBUG level.
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= 60 * 1000) {
            LOG.debug("Compaction progress: "
                + compactionName
                + " "
                + progress
                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
                    / ((now - lastMillis) / 1000.0)) + ", throughputController is "
                + throughputController);
            lastMillis = now;
            bytesWrittenProgress = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException("Interrupted while control throughput of compacting "
          + compactionName);
    } finally {
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }
528
529
530
531
532
533
534
535
536
537 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
538 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
539 Scan scan = new Scan();
540 scan.setMaxVersions(store.getFamily().getMaxVersions());
541 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
542 scanType, smallestReadPoint, earliestPutTs);
543 }
544
545
546
547
548
549
550
551
552
553
554 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
555 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
556 byte[] dropDeletesToRow) throws IOException {
557 Scan scan = new Scan();
558 scan.setMaxVersions(store.getFamily().getMaxVersions());
559 return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
560 earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
561 }
562
563
564
565
566
567
568
569
  /**
   * Appends compaction metadata (max sequence id and the major-compaction flag) to the
   * writer and then closes it. Metadata must be appended before close.
   * @param writer store file writer to finalize
   * @param fd file details supplying the max sequence id
   * @param isMajor whether this was a major compaction
   * @throws IOException if appending metadata or closing fails
   */
  protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
      boolean isMajor) throws IOException {
    writer.appendMetadata(fd.maxSeqId, isMajor);
    writer.close();
  }
575 }