1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.KeyValue.KVComparator;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
32 import org.apache.hadoop.hbase.util.Bytes;
33
34
35
36
37
38 @InterfaceAudience.Private
39 public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
40
41 private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
42
43 protected final KVComparator comparator;
44 protected List<StoreFile.Writer> existingWriters;
45 protected List<byte[]> boundaries;
46
47
48 private boolean doWriteStripeMetadata = true;
49
50 public StripeMultiFileWriter(KVComparator comparator) {
51 this.comparator = comparator;
52 }
53
54 public void setNoStripeMetadata() {
55 this.doWriteStripeMetadata = false;
56 }
57
58 @Override
59 protected Collection<Writer> writers() {
60 return existingWriters;
61 }
62
63 protected abstract void preCommitWritersInternal() throws IOException;
64
65 @Override
66 protected final void preCommitWriters() throws IOException {
67
68 assert this.existingWriters != null;
69 preCommitWritersInternal();
70 assert this.boundaries.size() == (this.existingWriters.size() + 1);
71 }
72
73 @Override
74 protected void preCloseWriter(Writer writer) throws IOException {
75 if (doWriteStripeMetadata) {
76 if (LOG.isDebugEnabled()) {
77 LOG.debug("Write stripe metadata for " + writer.getPath().toString());
78 }
79 int index = existingWriters.indexOf(writer);
80 writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index));
81 writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1));
82 } else {
83 if (LOG.isDebugEnabled()) {
84 LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString());
85 }
86 }
87 }
88
89
90
91
92
93
94
95
96 protected void sanityCheckLeft(byte[] left, byte[] row, int rowOffset, int rowLength)
97 throws IOException {
98 if (StripeStoreFileManager.OPEN_KEY != left
99 && comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
100 String error =
101 "The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: ["
102 + Bytes.toString(row, rowOffset, rowLength) + "]";
103 LOG.error(error);
104 throw new IOException(error);
105 }
106 }
107
108
109
110
111
112
113
114
115 protected void sanityCheckRight(byte[] right, byte[] row, int rowOffset, int rowLength)
116 throws IOException {
117 if (StripeStoreFileManager.OPEN_KEY != right
118 && comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
119 String error =
120 "The last row is higher or equal than the right boundary of [" + Bytes.toString(right)
121 + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
122 LOG.error(error);
123 throw new IOException(error);
124 }
125 }
126
127
128
129
130
131
132 public static class BoundaryMultiWriter extends StripeMultiFileWriter {
133 private StoreFile.Writer currentWriter;
134 private byte[] currentWriterEndKey;
135
136 private Cell lastCell;
137 private long cellsInCurrentWriter = 0;
138 private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
139 private boolean hasAnyWriter = false;
140
141
142
143
144
145
146
147 public BoundaryMultiWriter(KVComparator comparator, List<byte[]> targetBoundaries,
148 byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
149 super(comparator);
150 this.boundaries = targetBoundaries;
151 this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
152
153
154 assert (majorRangeFrom == null) == (majorRangeTo == null);
155 if (majorRangeFrom != null) {
156 majorRangeFromIndex =
157 (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0 : Collections.binarySearch(
158 this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
159 majorRangeToIndex =
160 (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size() : Collections
161 .binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
162 if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
163 throw new IOException("Major range does not match writer boundaries: ["
164 + Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
165 + majorRangeFromIndex + " to " + majorRangeToIndex);
166 }
167 }
168 }
169
170 @Override
171 public void append(Cell cell) throws IOException {
172 if (currentWriter == null && existingWriters.isEmpty()) {
173
174 sanityCheckLeft(this.boundaries.get(0), cell.getRowArray(), cell.getRowOffset(),
175 cell.getRowLength());
176 }
177 prepareWriterFor(cell);
178 currentWriter.append(cell);
179 lastCell = cell;
180 ++cellsInCurrentWriter;
181 }
182
183 private boolean isCellAfterCurrentWriter(Cell cell) {
184 return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) && (comparator.compareRows(
185 cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), currentWriterEndKey, 0,
186 currentWriterEndKey.length) >= 0));
187 }
188
189 @Override
190 protected void preCommitWritersInternal() throws IOException {
191 stopUsingCurrentWriter();
192 while (existingWriters.size() < boundaries.size() - 1) {
193 createEmptyWriter();
194 }
195 if (lastCell != null) {
196 sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell.getRowArray(),
197 lastCell.getRowOffset(), lastCell.getRowLength());
198 }
199 }
200
201 private void prepareWriterFor(Cell cell) throws IOException {
202 if (currentWriter != null && !isCellAfterCurrentWriter(cell)) return;
203
204 stopUsingCurrentWriter();
205
206 while (isCellAfterCurrentWriter(cell)) {
207 checkCanCreateWriter();
208 createEmptyWriter();
209 }
210 checkCanCreateWriter();
211 hasAnyWriter = true;
212 currentWriter = writerFactory.createWriter();
213 existingWriters.add(currentWriter);
214 }
215
216
217
218
219
220
221
222
223
224
225 private void createEmptyWriter() throws IOException {
226 int index = existingWriters.size();
227 boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
228
229 boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
230 boolean needEmptyFile = isInMajorRange || isLastWriter;
231 existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
232 hasAnyWriter |= needEmptyFile;
233 currentWriterEndKey =
234 (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
235 .size() + 1);
236 }
237
238 private void checkCanCreateWriter() throws IOException {
239 int maxWriterCount = boundaries.size() - 1;
240 assert existingWriters.size() <= maxWriterCount;
241 if (existingWriters.size() >= maxWriterCount) {
242 throw new IOException("Cannot create any more writers (created " + existingWriters.size()
243 + " out of " + maxWriterCount + " - row might be out of range of all valid writers");
244 }
245 }
246
247 private void stopUsingCurrentWriter() {
248 if (currentWriter != null) {
249 if (LOG.isDebugEnabled()) {
250 LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
251 + "] row; wrote out " + cellsInCurrentWriter + " kvs");
252 }
253 cellsInCurrentWriter = 0;
254 }
255 currentWriter = null;
256 currentWriterEndKey =
257 (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
258 .size() + 1);
259 }
260 }
261
262
263
264
265
266
267 public static class SizeMultiWriter extends StripeMultiFileWriter {
268 private int targetCount;
269 private long targetCells;
270 private byte[] left;
271 private byte[] right;
272
273 private Cell lastCell;
274 private StoreFile.Writer currentWriter;
275 protected byte[] lastRowInCurrentWriter = null;
276 private long cellsInCurrentWriter = 0;
277 private long cellsSeen = 0;
278 private long cellsSeenInPrevious = 0;
279
280
281
282
283
284
285
286 public SizeMultiWriter(KVComparator comparator, int targetCount, long targetKvs, byte[] left,
287 byte[] right) {
288 super(comparator);
289 this.targetCount = targetCount;
290 this.targetCells = targetKvs;
291 this.left = left;
292 this.right = right;
293 int preallocate = Math.min(this.targetCount, 64);
294 this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
295 this.boundaries = new ArrayList<byte[]>(preallocate + 1);
296 }
297
298 @Override
299 public void append(Cell cell) throws IOException {
300
301
302 boolean doCreateWriter = false;
303 if (currentWriter == null) {
304
305 sanityCheckLeft(left, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
306 doCreateWriter = true;
307 } else if (lastRowInCurrentWriter != null
308 && !comparator.matchingRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
309 lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
310 if (LOG.isDebugEnabled()) {
311 LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
312 + "] row; wrote out " + cellsInCurrentWriter + " kvs");
313 }
314 lastRowInCurrentWriter = null;
315 cellsInCurrentWriter = 0;
316 cellsSeenInPrevious += cellsSeen;
317 doCreateWriter = true;
318 }
319 if (doCreateWriter) {
320 byte[] boundary = existingWriters.isEmpty() ? left : cell.getRow();
321 if (LOG.isDebugEnabled()) {
322 LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
323 }
324 currentWriter = writerFactory.createWriter();
325 boundaries.add(boundary);
326 existingWriters.add(currentWriter);
327 }
328
329 currentWriter.append(cell);
330 lastCell = cell;
331 ++cellsInCurrentWriter;
332 cellsSeen = cellsInCurrentWriter;
333 if (this.sourceScanner != null) {
334 cellsSeen =
335 Math.max(cellsSeen, this.sourceScanner.getEstimatedNumberOfKvsScanned()
336 - cellsSeenInPrevious);
337 }
338
339
340
341 if (lastRowInCurrentWriter == null && existingWriters.size() < targetCount
342 && cellsSeen >= targetCells) {
343 lastRowInCurrentWriter = cell.getRow();
344 if (LOG.isDebugEnabled()) {
345 LOG.debug("Preparing to start a new writer after ["
346 + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen
347 + " kvs and wrote out " + cellsInCurrentWriter + " kvs");
348 }
349 }
350 }
351
352 @Override
353 protected void preCommitWritersInternal() throws IOException {
354 if (LOG.isDebugEnabled()) {
355 LOG.debug("Stopping with "
356 + cellsInCurrentWriter
357 + " kvs in last writer"
358 + ((this.sourceScanner == null) ? "" : ("; observed estimated "
359 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
360 }
361 if (lastCell != null) {
362 sanityCheckRight(right, lastCell.getRowArray(), lastCell.getRowOffset(),
363 lastCell.getRowLength());
364 }
365
366
367
368 if (existingWriters.isEmpty() && 1 == targetCount) {
369 if (LOG.isDebugEnabled()) {
370 LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata.");
371 }
372 boundaries.add(left);
373 existingWriters.add(writerFactory.createWriter());
374 }
375
376 this.boundaries.add(right);
377 }
378 }
379 }