1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.encoding;
18
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.apache.hadoop.hbase.Cell;
25 import org.apache.hadoop.hbase.CellComparator;
26 import org.apache.hadoop.hbase.CellUtil;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.KeyValue.KVComparator;
30 import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
31 import org.apache.hadoop.hbase.KeyValue.Type;
32 import org.apache.hadoop.hbase.KeyValueUtil;
33 import org.apache.hadoop.hbase.SettableSequenceId;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.io.HeapSize;
36 import org.apache.hadoop.hbase.io.TagCompressionContext;
37 import org.apache.hadoop.hbase.io.hfile.BlockType;
38 import org.apache.hadoop.hbase.io.hfile.HFileContext;
39 import org.apache.hadoop.hbase.io.util.LRUDictionary;
40 import org.apache.hadoop.hbase.util.ByteBufferUtils;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.ClassSize;
43 import org.apache.hadoop.io.WritableUtils;
44
45
46
47
48 @InterfaceAudience.Private
49 abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
50
51 private static int INITIAL_KEY_BUFFER_SIZE = 512;
52
53 @Override
54 public ByteBuffer decodeKeyValues(DataInputStream source,
55 HFileBlockDecodingContext blkDecodingCtx) throws IOException {
56 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
57 throw new IOException(this.getClass().getName() + " only accepts "
58 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
59 }
60
61 HFileBlockDefaultDecodingContext decodingCtx =
62 (HFileBlockDefaultDecodingContext) blkDecodingCtx;
63 if (decodingCtx.getHFileContext().isIncludesTags()
64 && decodingCtx.getHFileContext().isCompressTags()) {
65 if (decodingCtx.getTagCompressionContext() != null) {
66
67
68 decodingCtx.getTagCompressionContext().clear();
69 } else {
70 try {
71 TagCompressionContext tagCompressionContext = new TagCompressionContext(
72 LRUDictionary.class, Byte.MAX_VALUE);
73 decodingCtx.setTagCompressionContext(tagCompressionContext);
74 } catch (Exception e) {
75 throw new IOException("Failed to initialize TagCompressionContext", e);
76 }
77 }
78 }
79 return internalDecodeKeyValues(source, 0, 0, decodingCtx);
80 }
81
82 protected static class SeekerState implements Cell {
83 protected ByteBuffer currentBuffer;
84 protected TagCompressionContext tagCompressionContext;
85 protected int valueOffset = -1;
86 protected int keyLength;
87 protected int valueLength;
88 protected int lastCommonPrefix;
89 protected int tagsLength = 0;
90 protected int tagsOffset = -1;
91 protected int tagsCompressedLength = 0;
92 protected boolean uncompressTags = true;
93
94
95 protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
96 protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
97
98 protected long memstoreTS;
99 protected int nextKvOffset;
100 protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
101
102 protected boolean isValid() {
103 return valueOffset != -1;
104 }
105
106 protected void invalidate() {
107 valueOffset = -1;
108 tagsCompressedLength = 0;
109 currentKey = new KeyValue.KeyOnlyKeyValue();
110 uncompressTags = true;
111 currentBuffer = null;
112 }
113
114 protected void ensureSpaceForKey() {
115 if (keyLength > keyBuffer.length) {
116
117 int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
118 while (keyLength > newKeyBufferLength) {
119 newKeyBufferLength *= 2;
120 }
121 byte[] newKeyBuffer = new byte[newKeyBufferLength];
122 System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
123 keyBuffer = newKeyBuffer;
124 }
125 }
126
127 protected void ensureSpaceForTags() {
128 if (tagsLength > tagsBuffer.length) {
129
130 int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
131 while (tagsLength > newTagsBufferLength) {
132 newTagsBufferLength *= 2;
133 }
134 byte[] newTagsBuffer = new byte[newTagsBufferLength];
135 System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
136 tagsBuffer = newTagsBuffer;
137 }
138 }
139
140 protected void setKey(byte[] keyBuffer, long memTS) {
141 currentKey.setKey(keyBuffer, 0, keyLength);
142 memstoreTS = memTS;
143 }
144
145
146
147
148
149
150 protected void copyFromNext(SeekerState nextState) {
151 if (keyBuffer.length != nextState.keyBuffer.length) {
152 keyBuffer = nextState.keyBuffer.clone();
153 } else if (!isValid()) {
154
155
156 System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
157 nextState.keyLength);
158 } else {
159
160 System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
161 keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
162 - nextState.lastCommonPrefix);
163 }
164 currentKey = nextState.currentKey;
165
166 valueOffset = nextState.valueOffset;
167 keyLength = nextState.keyLength;
168 valueLength = nextState.valueLength;
169 lastCommonPrefix = nextState.lastCommonPrefix;
170 nextKvOffset = nextState.nextKvOffset;
171 memstoreTS = nextState.memstoreTS;
172 currentBuffer = nextState.currentBuffer;
173 tagsOffset = nextState.tagsOffset;
174 tagsLength = nextState.tagsLength;
175 if (nextState.tagCompressionContext != null) {
176 tagCompressionContext = nextState.tagCompressionContext;
177 }
178 }
179
180 @Override
181 public byte[] getRowArray() {
182 return currentKey.getRowArray();
183 }
184
185 @Override
186 public int getRowOffset() {
187 return Bytes.SIZEOF_SHORT;
188 }
189
190 @Override
191 public short getRowLength() {
192 return currentKey.getRowLength();
193 }
194
195 @Override
196 public byte[] getFamilyArray() {
197 return currentKey.getFamilyArray();
198 }
199
200 @Override
201 public int getFamilyOffset() {
202 return currentKey.getFamilyOffset();
203 }
204
205 @Override
206 public byte getFamilyLength() {
207 return currentKey.getFamilyLength();
208 }
209
210 @Override
211 public byte[] getQualifierArray() {
212 return currentKey.getQualifierArray();
213 }
214
215 @Override
216 public int getQualifierOffset() {
217 return currentKey.getQualifierOffset();
218 }
219
220 @Override
221 public int getQualifierLength() {
222 return currentKey.getQualifierLength();
223 }
224
225 @Override
226 public long getTimestamp() {
227 return currentKey.getTimestamp();
228 }
229
230 @Override
231 public byte getTypeByte() {
232 return currentKey.getTypeByte();
233 }
234
235 @Override
236 public long getMvccVersion() {
237 return memstoreTS;
238 }
239
240 @Override
241 public long getSequenceId() {
242 return memstoreTS;
243 }
244
245 @Override
246 public byte[] getValueArray() {
247 return currentBuffer.array();
248 }
249
250 @Override
251 public int getValueOffset() {
252 return currentBuffer.arrayOffset() + valueOffset;
253 }
254
255 @Override
256 public int getValueLength() {
257 return valueLength;
258 }
259
260 @Override
261 public byte[] getTagsArray() {
262 if (tagCompressionContext != null) {
263 return tagsBuffer;
264 }
265 return currentBuffer.array();
266 }
267
268 @Override
269 public int getTagsOffset() {
270 if (tagCompressionContext != null) {
271 return 0;
272 }
273 return currentBuffer.arrayOffset() + tagsOffset;
274 }
275
276 @Override
277 public int getTagsLength() {
278 return tagsLength;
279 }
280
281 @Override
282 @Deprecated
283 public byte[] getValue() {
284 throw new UnsupportedOperationException("getValue() not supported");
285 }
286
287 @Override
288 @Deprecated
289 public byte[] getFamily() {
290 throw new UnsupportedOperationException("getFamily() not supported");
291 }
292
293 @Override
294 @Deprecated
295 public byte[] getQualifier() {
296 throw new UnsupportedOperationException("getQualifier() not supported");
297 }
298
299 @Override
300 @Deprecated
301 public byte[] getRow() {
302 throw new UnsupportedOperationException("getRow() not supported");
303 }
304
305 @Override
306 public String toString() {
307 return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
308 + getValueLength() + "/seqid=" + memstoreTS;
309 }
310
311 public Cell shallowCopy() {
312 return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
313 currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
314 currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
315 currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
316 memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
317 }
318 }
319
320
321
322
323
324
325
326
327
328
329
330 protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
331 private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
332 + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
333 + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
334 private byte[] keyOnlyBuffer;
335 private ByteBuffer currentBuffer;
336 private short rowLength;
337 private int familyOffset;
338 private byte familyLength;
339 private int qualifierOffset;
340 private int qualifierLength;
341 private long timestamp;
342 private byte typeByte;
343 private int valueOffset;
344 private int valueLength;
345 private int tagsLength;
346 private int tagsOffset;
347 private byte[] cloneTagsBuffer;
348 private long seqId;
349 private TagCompressionContext tagCompressionContext;
350
351 protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
352 int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
353 long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
354 int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
355 byte[] tagsBuffer) {
356 this.currentBuffer = currentBuffer;
357 keyOnlyBuffer = new byte[keyLength];
358 this.tagCompressionContext = tagCompressionContext;
359 this.rowLength = rowLength;
360 this.familyOffset = familyOffset;
361 this.familyLength = familyLength;
362 this.qualifierOffset = qualOffset;
363 this.qualifierLength = qualLength;
364 this.timestamp = timeStamp;
365 this.typeByte = typeByte;
366 this.valueLength = valueLen;
367 this.valueOffset = valueOffset;
368 this.tagsOffset = tagsOffset;
369 this.tagsLength = tagsLength;
370 System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
371 if (tagCompressionContext != null) {
372 this.cloneTagsBuffer = new byte[tagsLength];
373 System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
374 }
375 setSequenceId(seqId);
376 }
377
378 @Override
379 public byte[] getRowArray() {
380 return keyOnlyBuffer;
381 }
382
383 @Override
384 public byte[] getFamilyArray() {
385 return keyOnlyBuffer;
386 }
387
388 @Override
389 public byte[] getQualifierArray() {
390 return keyOnlyBuffer;
391 }
392
393 @Override
394 public int getRowOffset() {
395 return Bytes.SIZEOF_SHORT;
396 }
397
398 @Override
399 public short getRowLength() {
400 return rowLength;
401 }
402
403 @Override
404 public int getFamilyOffset() {
405 return familyOffset;
406 }
407
408 @Override
409 public byte getFamilyLength() {
410 return familyLength;
411 }
412
413 @Override
414 public int getQualifierOffset() {
415 return qualifierOffset;
416 }
417
418 @Override
419 public int getQualifierLength() {
420 return qualifierLength;
421 }
422
423 @Override
424 public long getTimestamp() {
425 return timestamp;
426 }
427
428 @Override
429 public byte getTypeByte() {
430 return typeByte;
431 }
432
433 @Override
434 @Deprecated
435 public long getMvccVersion() {
436 return getSequenceId();
437 }
438
439 @Override
440 public long getSequenceId() {
441 return seqId;
442 }
443
444 @Override
445 public byte[] getValueArray() {
446 return currentBuffer.array();
447 }
448
449 @Override
450 public int getValueOffset() {
451 return currentBuffer.arrayOffset() + valueOffset;
452 }
453
454 @Override
455 public int getValueLength() {
456 return valueLength;
457 }
458
459 @Override
460 public byte[] getTagsArray() {
461 if (tagCompressionContext != null) {
462 return cloneTagsBuffer;
463 }
464 return currentBuffer.array();
465 }
466
467 @Override
468 public int getTagsOffset() {
469 if (tagCompressionContext != null) {
470 return 0;
471 }
472 return currentBuffer.arrayOffset() + tagsOffset;
473 }
474
475 @Override
476 public int getTagsLength() {
477 return tagsLength;
478 }
479
480 @Override
481 @Deprecated
482 public byte[] getValue() {
483 return CellUtil.cloneValue(this);
484 }
485
486 @Override
487 @Deprecated
488 public byte[] getFamily() {
489 return CellUtil.cloneFamily(this);
490 }
491
492 @Override
493 @Deprecated
494 public byte[] getQualifier() {
495 return CellUtil.cloneQualifier(this);
496 }
497
498 @Override
499 @Deprecated
500 public byte[] getRow() {
501 return CellUtil.cloneRow(this);
502 }
503
504 @Override
505 public String toString() {
506 return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
507 + getValueLength() + "/seqid=" + seqId;
508 }
509
510 @Override
511 public void setSequenceId(long seqId) {
512 this.seqId = seqId;
513 }
514
515 @Override
516 public long heapSize() {
517 return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
518 }
519 }
520
521 protected abstract static class
522 BufferedEncodedSeeker<STATE extends SeekerState>
523 implements EncodedSeeker {
524 protected HFileBlockDecodingContext decodingCtx;
525 protected final KVComparator comparator;
526 protected final SamePrefixComparator<byte[]> samePrefixComparator;
527 protected ByteBuffer currentBuffer;
528 protected STATE current = createSeekerState();
529 protected STATE previous = createSeekerState();
530 protected TagCompressionContext tagCompressionContext = null;
531
532 public BufferedEncodedSeeker(KVComparator comparator,
533 HFileBlockDecodingContext decodingCtx) {
534 this.comparator = comparator;
535 this.samePrefixComparator = comparator;
536 this.decodingCtx = decodingCtx;
537 if (decodingCtx.getHFileContext().isCompressTags()) {
538 try {
539 tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
540 } catch (Exception e) {
541 throw new RuntimeException("Failed to initialize TagCompressionContext", e);
542 }
543 }
544 }
545
546 protected boolean includesMvcc() {
547 return this.decodingCtx.getHFileContext().isIncludesMvcc();
548 }
549
550 protected boolean includesTags() {
551 return this.decodingCtx.getHFileContext().isIncludesTags();
552 }
553
554 @Override
555 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
556 return comparator.compareFlatKey(key, offset, length,
557 current.keyBuffer, 0, current.keyLength);
558 }
559
560 @Override
561 public int compareKey(KVComparator comparator, Cell key) {
562 return comparator.compareOnlyKeyPortion(key,
563 new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
564 }
565
566 @Override
567 public void setCurrentBuffer(ByteBuffer buffer) {
568 if (this.tagCompressionContext != null) {
569 this.tagCompressionContext.clear();
570 }
571 currentBuffer = buffer;
572 current.currentBuffer = currentBuffer;
573 if(tagCompressionContext != null) {
574 current.tagCompressionContext = tagCompressionContext;
575 }
576 decodeFirst();
577 current.setKey(current.keyBuffer, current.memstoreTS);
578 previous.invalidate();
579 }
580
581 @Override
582 public ByteBuffer getKeyDeepCopy() {
583 ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
584 keyBuffer.put(current.keyBuffer, 0, current.keyLength);
585 keyBuffer.rewind();
586 return keyBuffer;
587 }
588
589 @Override
590 public ByteBuffer getValueShallowCopy() {
591 ByteBuffer dup = currentBuffer.duplicate();
592 dup.position(current.valueOffset);
593 dup.limit(current.valueOffset + current.valueLength);
594 return dup.slice();
595 }
596
597 @Override
598 public ByteBuffer getKeyValueBuffer() {
599 ByteBuffer kvBuffer = createKVBuffer();
600 kvBuffer.putInt(current.keyLength);
601 kvBuffer.putInt(current.valueLength);
602 kvBuffer.put(current.keyBuffer, 0, current.keyLength);
603 ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset,
604 current.valueLength);
605 if (current.tagsLength > 0) {
606
607 kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
608 kvBuffer.put((byte) (current.tagsLength & 0xff));
609 if (current.tagsOffset != -1) {
610
611
612 ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset,
613 current.tagsLength);
614 } else {
615
616
617 kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
618 }
619 }
620 kvBuffer.rewind();
621 return kvBuffer;
622 }
623
624 protected ByteBuffer createKVBuffer() {
625 int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
626 current.valueLength, current.tagsLength);
627 ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
628 return kvBuffer;
629 }
630
631 @Override
632 public Cell getKeyValue() {
633 return current.shallowCopy();
634 }
635
636 @Override
637 public void rewind() {
638 currentBuffer.rewind();
639 if (tagCompressionContext != null) {
640 tagCompressionContext.clear();
641 }
642 decodeFirst();
643 current.setKey(current.keyBuffer, current.memstoreTS);
644 previous.invalidate();
645 }
646
647 @Override
648 public boolean next() {
649 if (!currentBuffer.hasRemaining()) {
650 return false;
651 }
652 decodeNext();
653 current.setKey(current.keyBuffer, current.memstoreTS);
654 previous.invalidate();
655 return true;
656 }
657
658 protected void decodeTags() {
659 current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
660 if (tagCompressionContext != null) {
661 if (current.uncompressTags) {
662
663 current.ensureSpaceForTags();
664 try {
665 current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
666 current.tagsBuffer, 0, current.tagsLength);
667 } catch (IOException e) {
668 throw new RuntimeException("Exception while uncompressing tags", e);
669 }
670 } else {
671 ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
672 current.uncompressTags = true;
673 }
674 current.tagsOffset = -1;
675 } else {
676
677
678 current.tagsOffset = currentBuffer.position();
679 ByteBufferUtils.skip(currentBuffer, current.tagsLength);
680 }
681 }
682
683 @Override
684 public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
685 return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
686 }
687
688 @Override
689 public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
690 int rowCommonPrefix = 0;
691 int familyCommonPrefix = 0;
692 int qualCommonPrefix = 0;
693 previous.invalidate();
694 KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
695 do {
696 int comp;
697 if (samePrefixComparator != null) {
698 currentCell.setKey(current.keyBuffer, 0, current.keyLength);
699 if (current.lastCommonPrefix != 0) {
700
701
702
703
704
705 rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
706 }
707 if (current.lastCommonPrefix <= 2) {
708 rowCommonPrefix = 0;
709 }
710 rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
711 rowCommonPrefix);
712 comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
713 if (comp == 0) {
714 comp = compareTypeBytes(seekCell, currentCell);
715 if (comp == 0) {
716
717 familyCommonPrefix = Math.max(
718 0,
719 Math.min(familyCommonPrefix,
720 current.lastCommonPrefix - (3 + currentCell.getRowLength())));
721 familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
722 currentCell, familyCommonPrefix);
723 comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
724 familyCommonPrefix);
725 if (comp == 0) {
726
727
728 qualCommonPrefix = Math.max(
729 0,
730 Math.min(
731 qualCommonPrefix,
732 current.lastCommonPrefix
733 - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
734 qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
735 currentCell, qualCommonPrefix);
736 comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
737 qualCommonPrefix);
738 if (comp == 0) {
739 comp = CellComparator.compareTimestamps(seekCell, currentCell);
740 if (comp == 0) {
741
742
743
744
745
746
747
748
749 comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
750 }
751 }
752 }
753 }
754 }
755 } else {
756 Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
757 comp = comparator.compareOnlyKeyPortion(seekCell, r);
758 }
759 if (comp == 0) {
760 if (seekBefore) {
761 if (!previous.isValid()) {
762
763
764 throw new IllegalStateException("Cannot seekBefore if "
765 + "positioned at the first key in the block: key="
766 + Bytes.toStringBinary(seekCell.getRowArray()));
767 }
768 moveToPrevious();
769 return 1;
770 }
771 return 0;
772 }
773
774 if (comp < 0) {
775 if (previous.isValid()) {
776 moveToPrevious();
777 } else {
778 return HConstants.INDEX_KEY_MAGIC;
779 }
780 return 1;
781 }
782
783
784 if (currentBuffer.hasRemaining()) {
785 previous.copyFromNext(current);
786 decodeNext();
787 current.setKey(current.keyBuffer, current.memstoreTS);
788 } else {
789 break;
790 }
791 } while (true);
792
793
794 return 1;
795 }
796
797 private int compareTypeBytes(Cell key, Cell right) {
798 if (key.getFamilyLength() + key.getQualifierLength() == 0
799 && key.getTypeByte() == Type.Minimum.getCode()) {
800
801 return 1;
802 }
803 if (right.getFamilyLength() + right.getQualifierLength() == 0
804 && right.getTypeByte() == Type.Minimum.getCode()) {
805 return -1;
806 }
807 return 0;
808 }
809
810
811 private void moveToPrevious() {
812 if (!previous.isValid()) {
813 throw new IllegalStateException(
814 "Can move back only once and not in first key in the block.");
815 }
816
817 STATE tmp = previous;
818 previous = current;
819 current = tmp;
820
821
822 currentBuffer.position(current.nextKvOffset);
823
824
825
826
827
828
829 current.tagsBuffer = previous.tagsBuffer;
830 current.tagsCompressedLength = previous.tagsCompressedLength;
831 current.uncompressTags = false;
832 current.setKey(current.keyBuffer, current.memstoreTS);
833 previous.invalidate();
834 }
835
836 @SuppressWarnings("unchecked")
837 protected STATE createSeekerState() {
838
839
840 return (STATE) new SeekerState();
841 }
842
843 abstract protected void decodeFirst();
844 abstract protected void decodeNext();
845 }
846
847
848
849
850
851
852
853
854 protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
855 HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
856 int size = 0;
857 if (encodingCtx.getHFileContext().isIncludesTags()) {
858 int tagsLength = cell.getTagsLength();
859 ByteBufferUtils.putCompressedInt(out, tagsLength);
860
861 if (tagsLength > 0) {
862 TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
863
864
865 if (tagCompressionContext != null) {
866 tagCompressionContext
867 .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
868 } else {
869 out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
870 }
871 }
872 size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
873 }
874 if (encodingCtx.getHFileContext().isIncludesMvcc()) {
875
876 long memstoreTS = cell.getSequenceId();
877 WritableUtils.writeVLong(out, memstoreTS);
878
879
880 size += WritableUtils.getVIntSize(memstoreTS);
881 }
882 return size;
883 }
884
885 protected final void afterDecodingKeyValue(DataInputStream source,
886 ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
887 if (decodingCtx.getHFileContext().isIncludesTags()) {
888 int tagsLength = ByteBufferUtils.readCompressedInt(source);
889
890 dest.put((byte) ((tagsLength >> 8) & 0xff));
891 dest.put((byte) (tagsLength & 0xff));
892 if (tagsLength > 0) {
893 TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
894
895
896 if (tagCompressionContext != null) {
897 tagCompressionContext.uncompressTags(source, dest, tagsLength);
898 } else {
899 ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
900 }
901 }
902 }
903 if (decodingCtx.getHFileContext().isIncludesMvcc()) {
904 long memstoreTS = -1;
905 try {
906
907
908 memstoreTS = WritableUtils.readVLong(source);
909 ByteBufferUtils.writeVLong(dest, memstoreTS);
910 } catch (IOException ex) {
911 throw new RuntimeException("Unable to copy memstore timestamp " +
912 memstoreTS + " after decoding a key/value");
913 }
914 }
915 }
916
917 @Override
918 public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
919 byte[] header, HFileContext meta) {
920 return new HFileBlockDefaultEncodingContext(encoding, header, meta);
921 }
922
923 @Override
924 public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
925 return new HFileBlockDefaultDecodingContext(meta);
926 }
927
928 protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
929 int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
930 throws IOException;
931
932
933
934
935
936
937
938
939 protected static void ensureSpace(ByteBuffer out, int length)
940 throws EncoderBufferTooSmallException {
941 if (out.position() + length > out.limit()) {
942 throw new EncoderBufferTooSmallException(
943 "Buffer position=" + out.position() +
944 ", buffer limit=" + out.limit() +
945 ", length to be written=" + length);
946 }
947 }
948
949 @Override
950 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
951 throws IOException {
952 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
953 throw new IOException (this.getClass().getName() + " only accepts "
954 + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
955 "encoding context.");
956 }
957
958 HFileBlockDefaultEncodingContext encodingCtx =
959 (HFileBlockDefaultEncodingContext) blkEncodingCtx;
960 encodingCtx.prepareEncoding(out);
961 if (encodingCtx.getHFileContext().isIncludesTags()
962 && encodingCtx.getHFileContext().isCompressTags()) {
963 if (encodingCtx.getTagCompressionContext() != null) {
964
965
966 encodingCtx.getTagCompressionContext().clear();
967 } else {
968 try {
969 TagCompressionContext tagCompressionContext = new TagCompressionContext(
970 LRUDictionary.class, Byte.MAX_VALUE);
971 encodingCtx.setTagCompressionContext(tagCompressionContext);
972 } catch (Exception e) {
973 throw new IOException("Failed to initialize TagCompressionContext", e);
974 }
975 }
976 }
977 ByteBufferUtils.putInt(out, 0);
978 blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
979 }
980
981 private static class BufferedDataBlockEncodingState extends EncodingState {
982 int unencodedDataSizeWritten = 0;
983 }
984
985 @Override
986 public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
987 throws IOException {
988 BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
989 .getEncodingState();
990 int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
991 state.unencodedDataSizeWritten += encodedKvSize;
992 return encodedKvSize;
993 }
994
995 public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
996 DataOutputStream out) throws IOException;
997
998 @Override
999 public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1000 byte[] uncompressedBytesWithHeader) throws IOException {
1001 BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1002 .getEncodingState();
1003
1004 Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
1005 + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
1006 );
1007 if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
1008 encodingCtx.postEncoding(BlockType.ENCODED_DATA);
1009 } else {
1010 encodingCtx.postEncoding(BlockType.DATA);
1011 }
1012 }
1013 }