1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.KeyValue.KVComparator;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
34 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
35 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
36 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
37
38 import com.google.common.annotations.VisibleForTesting;
39
40
41
42
43
44 @InterfaceAudience.Private
45 public class StripeStoreFlusher extends StoreFlusher {
46 private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
47 private final Object flushLock = new Object();
48 private final StripeCompactionPolicy policy;
49 private final StripeCompactionPolicy.StripeInformationProvider stripes;
50
51 public StripeStoreFlusher(Configuration conf, Store store,
52 StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
53 super(conf, store);
54 this.policy = policy;
55 this.stripes = stripes;
56 }
57
58 @Override
59 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
60 MonitoredTask status) throws IOException {
61 List<Path> result = new ArrayList<Path>();
62 int cellsCount = snapshot.getCellsCount();
63 if (cellsCount == 0) return result;
64
65 long smallestReadPoint = store.getSmallestReadPoint();
66 InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
67 if (scanner == null) {
68 return result;
69 }
70
71
72 StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
73 cellsCount);
74
75 boolean success = false;
76 StripeMultiFileWriter mw = null;
77 try {
78 mw = req.createWriter();
79 StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
80 snapshot.getTimeRangeTracker(), cellsCount);
81 StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
82 mw.init(storeScanner, factory);
83
84 synchronized (flushLock) {
85 performFlush(scanner, mw, smallestReadPoint);
86 result = mw.commitWriters(cacheFlushSeqNum, false);
87 success = true;
88 }
89 } finally {
90 if (!success && (mw != null)) {
91 for (Path leftoverFile : mw.abortWriters()) {
92 try {
93 store.getFileSystem().delete(leftoverFile, false);
94 } catch (Exception e) {
95 LOG.error("Failed to delete a file after failed flush: " + e);
96 }
97 }
98 }
99 try {
100 scanner.close();
101 } catch (IOException ex) {
102 LOG.warn("Failed to close flush scanner, ignoring", ex);
103 }
104 }
105 return result;
106 }
107
108 private StripeMultiFileWriter.WriterFactory createWriterFactory(
109 final TimeRangeTracker tracker, final long kvCount) {
110 return new StripeMultiFileWriter.WriterFactory() {
111 @Override
112 public Writer createWriter() throws IOException {
113 StoreFile.Writer writer = store.createWriterInTmp(
114 kvCount, store.getFamily().getCompression(), false, true, true);
115 writer.setTimeRangeTracker(tracker);
116 return writer;
117 }
118 };
119 }
120
121
122 public static class StripeFlushRequest {
123
124 protected final KVComparator comparator;
125
126 public StripeFlushRequest(KVComparator comparator) {
127 this.comparator = comparator;
128 }
129
130 @VisibleForTesting
131 public StripeMultiFileWriter createWriter() throws IOException {
132 StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
133 Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
134 writer.setNoStripeMetadata();
135 return writer;
136 }
137 }
138
139
140 public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
141 private final List<byte[]> targetBoundaries;
142
143
144 public BoundaryStripeFlushRequest(KVComparator comparator, List<byte[]> targetBoundaries) {
145 super(comparator);
146 this.targetBoundaries = targetBoundaries;
147 }
148
149 @Override
150 public StripeMultiFileWriter createWriter() throws IOException {
151 return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
152 null);
153 }
154 }
155
156
157 public static class SizeStripeFlushRequest extends StripeFlushRequest {
158 private final int targetCount;
159 private final long targetKvs;
160
161
162
163
164
165
166 public SizeStripeFlushRequest(KVComparator comparator, int targetCount, long targetKvs) {
167 super(comparator);
168 this.targetCount = targetCount;
169 this.targetKvs = targetKvs;
170 }
171
172 @Override
173 public StripeMultiFileWriter createWriter() throws IOException {
174 return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
175 OPEN_KEY, OPEN_KEY);
176 }
177 }
178 }