/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder implements DataBlockEncoder {

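  /** Initial size of the key/tags scratch buffers used by SeekerState; grown on demand. */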
  private static int INITIAL_KEY_BUFFER_SIZE = 512;

  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
      HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
        (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // Reuse the existing TagCompressionContext rather than paying the overhead of creating
        // a new one for every block decoded.
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

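  /**
   * Mutable per-seeker state: the current key is kept in a private copy (keyBuffer), while the
   * value and tags are addressed by offset and length inside the block's currentBuffer.
   */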
  protected static class SeekerState implements Cell {
    protected ByteBuffer currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey = new KeyValue.KeyOnlyKeyValue();
      uncompressTags = true;
      currentBuffer = null;
    }

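    /** Doubles keyBuffer until it can hold keyLength bytes, keeping the existing contents. */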
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // rare case, but we need to handle arbitrary length of key
        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
        while (keyLength > newKeyBufferLength) {
          newKeyBufferLength *= 2;
        }
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        // rare case, but we need to handle arbitrary length of tags
        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
        while (tagsLength > newTagsBufferLength) {
          newTagsBufferLength *= 2;
        }
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
            nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey = nextState.currentKey;

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    @Override
    public byte[] getRowArray() {
      return currentKey.getRowArray();
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return currentKey.getRowLength();
    }

    @Override
    public byte[] getFamilyArray() {
      return currentKey.getFamilyArray();
    }

    @Override
    public int getFamilyOffset() {
      return currentKey.getFamilyOffset();
    }

    @Override
    public byte getFamilyLength() {
      return currentKey.getFamilyLength();
    }

    @Override
    public byte[] getQualifierArray() {
      return currentKey.getQualifierArray();
    }

    @Override
    public int getQualifierOffset() {
      return currentKey.getQualifierOffset();
    }

    @Override
    public int getQualifierLength() {
      return currentKey.getQualifierLength();
    }

    @Override
    public long getTimestamp() {
      return currentKey.getTimestamp();
    }

    @Override
    public byte getTypeByte() {
      return currentKey.getTypeByte();
    }

    @Override
    public long getMvccVersion() {
      return memstoreTS;
    }

    @Override
    public long getSequenceId() {
      return memstoreTS;
    }

    @Override
    public byte[] getValueArray() {
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      if (tagCompressionContext != null) {
        return tagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      throw new UnsupportedOperationException("getValue() not supported");
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      throw new UnsupportedOperationException("getFamily() not supported");
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      throw new UnsupportedOperationException("getQualifier() not supported");
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      throw new UnsupportedOperationException("getRow() not supported");
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + memstoreTS;
    }

    public Cell shallowCopy() {
      return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
          memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
    }
  }

  /**
   * Deep-copies only the key part of the key buffer and takes a clone of the relevant seeker
   * state members. Note that the value byte[] still points into currentBuffer and is addressed
   * by valueOffset and valueLength.
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of SettableSequenceId. SeekerState need not be
  // SettableSequenceId as we never return that to top layers. When we have to, we make
  // ClonedSeekerState from it.
  protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
    private byte[] keyOnlyBuffer;
    private ByteBuffer currentBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private int valueOffset;
    private int valueLength;
    private int tagsLength;
    private int tagsOffset;
    private byte[] cloneTagsBuffer;
    private long seqId;
    private TagCompressionContext tagCompressionContext;

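    /** Copies the key (and, when tag compression is in use, the uncompressed tags) out of the seeker state. */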
    protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
        int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
        long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
        int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
        byte[] tagsBuffer) {
      this.currentBuffer = currentBuffer;
      keyOnlyBuffer = new byte[keyLength];
      this.tagCompressionContext = tagCompressionContext;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueLength = valueLen;
      this.valueOffset = valueOffset;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
      if (tagCompressionContext != null) {
        this.cloneTagsBuffer = new byte[tagsLength];
        System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
      }
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    @Deprecated
    public long getMvccVersion() {
      return getSequenceId();
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      if (tagCompressionContext != null) {
        return cloneTagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      return CellUtil.cloneValue(this);
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      return CellUtil.cloneFamily(this);
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      return CellUtil.cloneQualifier(this);
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      return CellUtil.cloneRow(this);
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }
  }

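  /**
   * Seeker over a single encoded block. Subclasses supply decodeFirst() and decodeNext(); this
   * base class implements seeking, tag decoding and conversion of the current position back into
   * KeyValue form.
   */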
  protected abstract static class
      BufferedEncodedSeeker<STATE extends SeekerState>
      implements EncodedSeeker {
    protected HFileBlockDecodingContext decodingCtx;
    protected final KVComparator comparator;
    protected final SamePrefixComparator<byte[]> samePrefixComparator;
    protected ByteBuffer currentBuffer;
    protected STATE current = createSeekerState(); // always valid
    protected STATE previous = createSeekerState(); // may not be valid
    protected TagCompressionContext tagCompressionContext = null;

    public BufferedEncodedSeeker(KVComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      this.comparator = comparator;
      this.samePrefixComparator = comparator;
      this.decodingCtx = decodingCtx;
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
    }

    protected boolean includesMvcc() {
      return this.decodingCtx.getHFileContext().isIncludesMvcc();
    }

    protected boolean includesTags() {
      return this.decodingCtx.getHFileContext().isIncludesTags();
    }

    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return comparator.compareFlatKey(key, offset, length,
          current.keyBuffer, 0, current.keyLength);
    }

    @Override
    public int compareKey(KVComparator comparator, Cell key) {
      return comparator.compareOnlyKeyPortion(key,
          new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
    }

    @Override
    public void setCurrentBuffer(ByteBuffer buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public ByteBuffer getKeyDeepCopy() {
      ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
      keyBuffer.put(current.keyBuffer, 0, current.keyLength);
      keyBuffer.rewind();
      return keyBuffer;
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      ByteBuffer dup = currentBuffer.duplicate();
      dup.position(current.valueOffset);
      dup.limit(current.valueOffset + current.valueLength);
      return dup.slice();
    }

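    /**
     * Rebuilds the current cell in KeyValue byte layout: key length, value length, key bytes,
     * value bytes and, when present, the tags length (as an unsigned short) followed by the tags.
     */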
    @Override
    public ByteBuffer getKeyValueBuffer() {
      ByteBuffer kvBuffer = createKVBuffer();
      kvBuffer.putInt(current.keyLength);
      kvBuffer.putInt(current.valueLength);
      kvBuffer.put(current.keyBuffer, 0, current.keyLength);
      ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset,
          current.valueLength);
      if (current.tagsLength > 0) {
        // Put short as unsigned
        kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
        kvBuffer.put((byte) (current.tagsLength & 0xff));
        if (current.tagsOffset != -1) {
          // The offset of the tag bytes in the underlying buffer is marked, so the temp buffer
          // tagsBuffer has not been used.
          ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset,
              current.tagsLength);
        } else {
          // When tagsOffset is -1, tag compression was in use and the tags were uncompressed
          // into the temp buffer tagsBuffer. Copy them from there.
          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
        }
      }
      kvBuffer.rewind();
      return kvBuffer;
    }

    protected ByteBuffer createKVBuffer() {
      int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
          current.valueLength, current.tagsLength);
      ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
      return kvBuffer;
    }

    @Override
    public Cell getKeyValue() {
      return current.shallowCopy();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

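    /**
     * Reads the tags length from the block and either uncompresses the tags into tagsBuffer
     * (tag compression enabled) or just records their offset within currentBuffer.
     */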
    protected void decodeTags() {
      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is in use; uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
          current.uncompressTags = true; // Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not used, avoid copying the tag bytes into tagsBuffer.
        // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
      return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
    }

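    /**
     * Walks the block forward, comparing the seek key against each decoded key; when a
     * SamePrefixComparator is available, the comparison reuses the prefix already known to match
     * from the previous key instead of comparing from the beginning every time.
     */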
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
      do {
        int comp;
        if (samePrefixComparator != null) {
          currentCell.setKey(current.keyBuffer, 0, current.keyLength);
          if (current.lastCommonPrefix != 0) {
            // The KV format also carries the row key length in the byte array, and the common
            // prefix includes it. Subtract it to find the common prefix in the row part alone.
            rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
          }
          if (current.lastCommonPrefix <= 2) {
            rowCommonPrefix = 0;
          }
          rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
              rowCommonPrefix);
          comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
          if (comp == 0) {
            comp = compareTypeBytes(seekCell, currentCell);
            if (comp == 0) {
              // Subtract the fixed row key length and the family key fixed length
              familyCommonPrefix = Math.max(
                  0,
                  Math.min(familyCommonPrefix,
                      current.lastCommonPrefix - (3 + currentCell.getRowLength())));
              familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
                  currentCell, familyCommonPrefix);
              comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
                  familyCommonPrefix);
              if (comp == 0) {
                // Subtract the rowkey fixed length and the family key fixed length
                qualCommonPrefix = Math.max(
                    0,
                    Math.min(
                        qualCommonPrefix,
                        current.lastCommonPrefix
                            - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
                qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
                    currentCell, qualCommonPrefix);
                comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
                    qualCommonPrefix);
                if (comp == 0) {
                  comp = CellComparator.compareTimestamps(seekCell, currentCell);
                  if (comp == 0) {
                    // Compare types. Let the delete types sort ahead of puts; i.e. types of
                    // higher numbers sort before those of lesser numbers. Maximum (255) appears
                    // ahead of everything, and minimum (0) appears after everything.
                    comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
                  }
                }
              }
            }
          }
        } else {
          Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
          comp = comparator.compareOnlyKeyPortion(seekCell, r);
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if "
                  + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    private int compareTypeBytes(Cell key, Cell right) {
      if (key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()) {
        return -1;
      }
      return 0;
    }

    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can only move back once, and not from the first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // The tag bytes were already decoded, so we cache the tags and their total compressed
      // length in the current state. The next decodeNext() then does not need to decode the tags
      // again, which would pollute the data dictionary used for compression. When
      // current.uncompressTags is false, we simply reuse current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of the source stream. See decodeTags().
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState();
    }

    abstract protected void decodeFirst();
    abstract protected void decodeNext();
  }

  /**
   * Writes the tags and the memstore timestamp that follow the key/value, when the HFile context
   * includes them.
   * @param cell the cell that has just been encoded
   * @param out the stream the encoded block is being written to
   * @param encodingCtx the encoding context, carrying the HFileContext and the optional
   *          TagCompressionContext
   * @return unencoded size added
   * @throws IOException if writing to the output stream fails
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will be non-null. Write the
        // tags using dictionary compression in that case.
        if (tagCompressionContext != null) {
          tagCompressionContext
              .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
        } else {
          out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

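  /**
   * Counterpart of afterEncodingKeyValue(): reads the tags and the memstore timestamp that follow
   * a key/value from the source stream and appends them to the destination buffer.
   */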
  protected final void afterDecodingKeyValue(DataInputStream source,
      ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is used in this file, tagCompressionContext will be non-null.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte
        // buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException("Unable to copy memstore timestamp " +
            memstoreTS + " after decoding a key/value", ex);
      }
    }
  }

  @Override
  public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
      byte[] header, HFileContext meta) {
    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
  }

  @Override
  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
    return new HFileBlockDefaultDecodingContext(meta);
  }

  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;

  /**
   * Asserts that there is at least the given amount of unfilled space
   * remaining in the given buffer.
   * @param out typically, the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException if there is not enough space in the buffer
   */
  protected static void ensureSpace(ByteBuffer out, int length)
      throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException(
          "Buffer position=" + out.position() +
          ", buffer limit=" + out.limit() +
          ", length to be written=" + length);
    }
  }

  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
      throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
          "encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);
    if (encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // Reuse the existing TagCompressionContext rather than paying the overhead of creating
        // a new one for every block encoded.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
  }

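  /**
   * Per-block encoding state: tracks how many unencoded bytes have been written so far, which
   * endBlockEncoding() writes back into the block header.
   */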
  private static class BufferedDataBlockEncodingState extends EncodingState {
    int unencodedDataSizeWritten = 0;
  }

  @Override
  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException {
    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
        .getEncodingState();
    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.unencodedDataSizeWritten += encodedKvSize;
    return encodedKvSize;
  }

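  /**
   * Encodes a single Cell into the output stream; the returned size is accumulated as the
   * block's unencoded data size.
   */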
  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
      DataOutputStream out) throws IOException;

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
      byte[] uncompressedBytesWithHeader) throws IOException {
    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
        .getEncodingState();
    // Write the unencodedDataSizeWritten (with header size)
    Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
        + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten);
    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
    } else {
      encodingCtx.postEncoding(BlockType.DATA);
    }
  }
}