1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import com.google.common.collect.Lists;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.regionserver.InternalScanner;
35 import org.apache.hadoop.hbase.regionserver.ScanType;
36 import org.apache.hadoop.hbase.regionserver.Store;
37 import org.apache.hadoop.hbase.regionserver.StoreFile;
38 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
39 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
40 import org.apache.hadoop.hbase.security.User;
41
42
43
44
45
46 @InterfaceAudience.Private
47 public class DefaultCompactor extends Compactor<Writer> {
48 private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
49
50 public DefaultCompactor(final Configuration conf, final Store store) {
51 super(conf, store);
52 }
53
54 private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
55
56 @Override
57 public Writer createWriter(InternalScanner scanner,
58 org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
59 boolean shouldDropBehind) throws IOException {
60 return createTmpWriter(fd, shouldDropBehind);
61 }
62 };
63
64
65
66
67 public List<Path> compact(final CompactionRequest request,
68 CompactionThroughputController throughputController, User user) throws IOException {
69 return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
70 }
71
72
73
74
75
76
77
78
79 protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
80
81
82
83
84 StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
85 true, fd.maxMVCCReadpoint > 0, fd.maxTagsLength >0);
86 return writer;
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100 public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
101 throws IOException {
102 CompactionRequest cr = new CompactionRequest(filesToCompact);
103 cr.setIsMajor(isMajor, isMajor);
104 return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
105 }
106
107 @Override
108 protected List<Path> commitWriter(Writer writer, FileDetails fd,
109 CompactionRequest request) throws IOException {
110 List<Path> newFiles = Lists.newArrayList(writer.getPath());
111 writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
112 writer.close();
113 return newFiles;
114 }
115
116 @Override
117 protected void abortWriter(Writer writer) throws IOException {
118 Path leftoverFile = writer.getPath();
119 try {
120 writer.close();
121 } catch (IOException e) {
122 LOG.warn("Failed to close the writer after an unfinished compaction.", e);
123 }
124 try {
125 store.getFileSystem().delete(leftoverFile, false);
126 } catch (IOException e) {
127 LOG.warn(
128 "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
129 e);
130 }
131 }
132 }