1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataOutputStream;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.CellComparator;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.KeyValue.KVComparator;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
40 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
41 import org.apache.hadoop.hbase.util.BloomFilterWriter;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.io.Writable;
44
45
46
47
48 @InterfaceAudience.Private
49 public class HFileWriterV2 extends AbstractHFileWriter {
50 static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
51
52
53 public static final byte [] MAX_MEMSTORE_TS_KEY =
54 Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
55
56
57 public static final byte [] KEY_VALUE_VERSION =
58 Bytes.toBytes("KEY_VALUE_VERSION");
59
60
61 public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
62
63
64 private List<InlineBlockWriter> inlineBlockWriters =
65 new ArrayList<InlineBlockWriter>();
66
67
68 protected HFileBlock.Writer fsBlockWriter;
69
70 private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
71 private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
72
73
74 private long firstDataBlockOffset = -1;
75
76
77 protected long lastDataBlockOffset;
78
79
80
81
82
83 private Cell lastCellOfPreviousBlock = null;
84
85
86 private List<BlockWritable> additionalLoadOnOpenData =
87 new ArrayList<BlockWritable>();
88
89 protected long maxMemstoreTS = 0;
90
91 static class WriterFactoryV2 extends HFile.WriterFactory {
92 WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
93 super(conf, cacheConf);
94 }
95
96 @Override
97 public Writer createWriter(FileSystem fs, Path path,
98 FSDataOutputStream ostream,
99 KVComparator comparator, HFileContext context) throws IOException {
100 context.setIncludesTags(false);
101 return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
102 comparator, context);
103 }
104 }
105
106
/**
 * Constructs a writer on top of {@code ostream}, or on a stream created for
 * {@code path} when {@code ostream} is null, and then runs {@link #finishInit}.
 *
 * @throws IOException if the output stream has to be created and that fails
 */
public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
    FileSystem fs, Path path, FSDataOutputStream ostream,
    final KVComparator comparator, final HFileContext context) throws IOException {
  super(cacheConf, streamOrCreate(conf, fs, path, ostream), path, comparator, context);
  finishInit(conf);
}

/** Returns {@code ostream} when it is non-null, otherwise a newly created stream for {@code path}. */
private static FSDataOutputStream streamOrCreate(Configuration conf, FileSystem fs,
    Path path, FSDataOutputStream ostream) throws IOException {
  if (ostream != null) {
    return ostream;
  }
  return createOutputStream(conf, fs, path, null);
}
115
116
117 protected void finishInit(final Configuration conf) {
118 if (fsBlockWriter != null)
119 throw new IllegalStateException("finishInit called twice");
120
121 fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
122
123
124 boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
125 dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
126 cacheIndexesOnWrite ? cacheConf : null,
127 cacheIndexesOnWrite ? name : null);
128 dataBlockIndexWriter.setMaxChunkSize(
129 HFileBlockIndex.getMaxChunkSize(conf));
130 dataBlockIndexWriter.setMinIndexNumEntries(
131 HFileBlockIndex.getMinIndexNumEntries(conf));
132 inlineBlockWriters.add(dataBlockIndexWriter);
133
134
135 metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
136 if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
137 }
138
139
140
141
142
143
144 protected void checkBlockBoundary() throws IOException {
145 if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
146 return;
147
148 finishBlock();
149 writeInlineBlocks(false);
150 newBlock();
151 }
152
153
154 private void finishBlock() throws IOException {
155 if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
156 return;
157
158
159 if (firstDataBlockOffset == -1) {
160 firstDataBlockOffset = outputStream.getPos();
161 }
162
163 lastDataBlockOffset = outputStream.getPos();
164 fsBlockWriter.writeHeaderAndData(outputStream);
165 int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
166
167 Cell indexEntry =
168 CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
169 dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
170 lastDataBlockOffset, onDiskSize);
171 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
172 if (cacheConf.shouldCacheDataOnWrite()) {
173 doCacheOnWrite(lastDataBlockOffset);
174 }
175 }
176
177
178 private void writeInlineBlocks(boolean closing) throws IOException {
179 for (InlineBlockWriter ibw : inlineBlockWriters) {
180 while (ibw.shouldWriteBlock(closing)) {
181 long offset = outputStream.getPos();
182 boolean cacheThisBlock = ibw.getCacheOnWrite();
183 ibw.writeInlineBlock(fsBlockWriter.startWriting(
184 ibw.getInlineBlockType()));
185 fsBlockWriter.writeHeaderAndData(outputStream);
186 ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
187 fsBlockWriter.getUncompressedSizeWithoutHeader());
188 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
189
190 if (cacheThisBlock) {
191 doCacheOnWrite(offset);
192 }
193 }
194 }
195 }
196
197
198
199
200
201
202 private void doCacheOnWrite(long offset) {
203 HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
204 cacheConf.getBlockCache().cacheBlock(
205 new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()), cacheFormatBlock);
206 }
207
208
209
210
211
212
213 protected void newBlock() throws IOException {
214
215 fsBlockWriter.startWriting(BlockType.DATA);
216 firstCellInBlock = null;
217 if (lastCell != null) {
218 lastCellOfPreviousBlock = lastCell;
219 }
220 }
221
222
223
224
225
226
227
228
229
230
231
232
233 @Override
234 public void appendMetaBlock(String metaBlockName, Writable content) {
235 byte[] key = Bytes.toBytes(metaBlockName);
236 int i;
237 for (i = 0; i < metaNames.size(); ++i) {
238
239 byte[] cur = metaNames.get(i);
240 if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
241 key.length) > 0) {
242 break;
243 }
244 }
245 metaNames.add(i, key);
246 metaData.add(i, content);
247 }
248
249
250
251
252
253
254
255
256 @Override
257 public void append(final Cell cell) throws IOException {
258 byte[] value = cell.getValueArray();
259 int voffset = cell.getValueOffset();
260 int vlength = cell.getValueLength();
261
262 boolean dupKey = checkKey(cell);
263 checkValue(value, voffset, vlength);
264 if (!dupKey) {
265 checkBlockBoundary();
266 }
267
268 if (!fsBlockWriter.isWriting()) {
269 newBlock();
270 }
271
272 fsBlockWriter.write(cell);
273
274 totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
275 totalValueLength += vlength;
276
277
278 if (firstCellInBlock == null) {
279
280
281 firstCellInBlock = cell;
282 }
283
284
285
286 lastCell = cell;
287 entryCount++;
288 this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
289 }
290
291 @Override
292 public void close() throws IOException {
293 if (outputStream == null) {
294 return;
295 }
296
297 blockEncoder.saveMetadata(this);
298
299
300
301 finishBlock();
302 writeInlineBlocks(true);
303
304 FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
305
306
307 if (!metaNames.isEmpty()) {
308 for (int i = 0; i < metaNames.size(); ++i) {
309
310 long offset = outputStream.getPos();
311
312 DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
313 metaData.get(i).write(dos);
314
315 fsBlockWriter.writeHeaderAndData(outputStream);
316 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
317
318
319 metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
320 fsBlockWriter.getOnDiskSizeWithHeader());
321 }
322 }
323
324
325
326
327
328
329
330
331
332
333 long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
334 trailer.setLoadOnOpenOffset(rootIndexOffset);
335
336
337 metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
338 BlockType.ROOT_INDEX), "meta");
339 fsBlockWriter.writeHeaderAndData(outputStream);
340 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
341
342 if (this.hFileContext.isIncludesMvcc()) {
343 appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
344 appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
345 }
346
347
348 writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
349 fsBlockWriter.writeHeaderAndData(outputStream);
350 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
351
352
353 for (BlockWritable w : additionalLoadOnOpenData){
354 fsBlockWriter.writeBlock(w, outputStream);
355 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
356 }
357
358
359 trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
360 trailer.setUncompressedDataIndexSize(
361 dataBlockIndexWriter.getTotalUncompressedSize());
362 trailer.setFirstDataBlockOffset(firstDataBlockOffset);
363 trailer.setLastDataBlockOffset(lastDataBlockOffset);
364 trailer.setComparatorClass(comparator.getClass());
365 trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
366
367
368 finishClose(trailer);
369
370 fsBlockWriter.release();
371 }
372
373 @Override
374 public void addInlineBlockWriter(InlineBlockWriter ibw) {
375 inlineBlockWriters.add(ibw);
376 }
377
378 @Override
379 public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
380 this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
381 }
382
383 @Override
384 public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
385 this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
386 }
387
388 private void addBloomFilter(final BloomFilterWriter bfw,
389 final BlockType blockType) {
390 if (bfw.getKeyCount() <= 0)
391 return;
392
393 if (blockType != BlockType.GENERAL_BLOOM_META &&
394 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
395 throw new RuntimeException("Block Type: " + blockType.toString() +
396 "is not supported");
397 }
398 additionalLoadOnOpenData.add(new BlockWritable() {
399 @Override
400 public BlockType getBlockType() {
401 return blockType;
402 }
403
404 @Override
405 public void writeToBlock(DataOutput out) throws IOException {
406 bfw.getMetaWriter().write(out);
407 Writable dataWriter = bfw.getDataWriter();
408 if (dataWriter != null)
409 dataWriter.write(out);
410 }
411 });
412 }
413
414 protected int getMajorVersion() {
415 return 2;
416 }
417
418 protected int getMinorVersion() {
419 return HFileReaderV2.MAX_MINOR_VERSION;
420 }
421
422 @Override
423 public HFileContext getFileContext() {
424 return hFileContext;
425 }
426 }