View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.lang.management.ManagementFactory;
23  import java.lang.management.RuntimeMXBean;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.NavigableSet;
29  import java.util.SortedSet;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.KeyValueUtil;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.util.ByteRange;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.hadoop.hbase.util.ReflectionUtils;
50  import org.apache.htrace.Trace;
51  
52  /**
53   * The MemStore holds in-memory modifications to the Store.  Modifications
54   * are {@link Cell}s.  When asked to flush, current memstore is moved
55   * to snapshot and is cleared.  We continue to serve edits out of new memstore
56   * and backing snapshot until flusher reports in that the flush succeeded. At
57   * this point we let the snapshot go.
58   *  <p>
59   * The MemStore functions should not be called in parallel. Callers should hold
60   *  write and read locks. This is done in {@link HStore}.
61   *  </p>
62   *
63   * TODO: Adjust size of the memstore when we remove items because they have
64   * been deleted.
65   * TODO: With new KVSLS, need to make sure we update HeapSize with difference
66   * in KV size.
67   */
68  @InterfaceAudience.Private
69  public class DefaultMemStore implements MemStore {
70    private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
71    static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
72    private static final boolean USEMSLAB_DEFAULT = true;
73    static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
74  
75    private Configuration conf;
76  
  // MemStore.  Use a CellSkipListSet rather than a SkipListSet because of its
  // better semantics: the Map will overwrite the entry if passed a key it
  // already has, whereas the Set will not add the new Cell if the key is the
  // same, even though the value might be different.  The value is not
  // important -- just make sure the same reference is always passed.
82    volatile CellSkipListSet cellSet;
83  
84    // Snapshot of memstore.  Made for flusher.
85    volatile CellSkipListSet snapshot;
86  
87    final KeyValue.KVComparator comparator;
88  
89    // Used to track own heapSize
90    final AtomicLong size;
91    private volatile long snapshotSize;
92  
93    // Used to track when to flush
94    volatile long timeOfOldestEdit = Long.MAX_VALUE;
95  
96    TimeRangeTracker timeRangeTracker;
97    TimeRangeTracker snapshotTimeRangeTracker;
98  
99    volatile MemStoreLAB allocator;
100   volatile MemStoreLAB snapshotAllocator;
101   volatile long snapshotId;
102   volatile boolean tagsPresent;
103 
104   /**
105    * Default constructor. Used for tests.
106    */
107   public DefaultMemStore() {
108     this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
109   }
110 
111   /**
112    * Constructor.
113    * @param c Comparator
114    */
115   public DefaultMemStore(final Configuration conf,
116                   final KeyValue.KVComparator c) {
117     this.conf = conf;
118     this.comparator = c;
119     this.cellSet = new CellSkipListSet(c);
120     this.snapshot = new CellSkipListSet(c);
121     timeRangeTracker = new TimeRangeTracker();
122     snapshotTimeRangeTracker = new TimeRangeTracker();
123     this.size = new AtomicLong(DEEP_OVERHEAD);
124     this.snapshotSize = 0;
125     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
126       String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
127       this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
128           new Class[] { Configuration.class }, new Object[] { conf });
129     } else {
130       this.allocator = null;
131     }
132   }
133 
134   void dump() {
135     for (Cell cell: this.cellSet) {
136       LOG.info(cell);
137     }
138     for (Cell cell: this.snapshot) {
139       LOG.info(cell);
140     }
141   }
142 
143   /**
144    * Creates a snapshot of the current memstore.
145    * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
146    */
147   @Override
148   public MemStoreSnapshot snapshot() {
149     // If snapshot currently has entries, then flusher failed or didn't call
150     // cleanup.  Log a warning.
151     if (!this.snapshot.isEmpty()) {
152       LOG.warn("Snapshot called again without clearing previous. " +
153           "Doing nothing. Another ongoing flush or did we fail last attempt?");
154     } else {
155       this.snapshotId = EnvironmentEdgeManager.currentTime();
156       this.snapshotSize = keySize();
157       if (!this.cellSet.isEmpty()) {
158         this.snapshot = this.cellSet;
159         this.cellSet = new CellSkipListSet(this.comparator);
160         this.snapshotTimeRangeTracker = this.timeRangeTracker;
161         this.timeRangeTracker = new TimeRangeTracker();
162         // Reset heap to not include any keys
163         this.size.set(DEEP_OVERHEAD);
164         this.snapshotAllocator = this.allocator;
165         // Reset allocator so we get a fresh buffer for the new memstore
166         if (allocator != null) {
167           String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
168           this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
169               new Class[] { Configuration.class }, new Object[] { conf });
170         }
171         timeOfOldestEdit = Long.MAX_VALUE;
172       }
173     }
174     MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
175         this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
176         this.tagsPresent);
177     this.tagsPresent = false;
178     return memStoreSnapshot;
179   }
180 
181   /**
182    * The passed snapshot was successfully persisted; it can be let go.
183    * @param id Id of the snapshot to clean out.
184    * @throws UnexpectedStateException
185    * @see #snapshot()
186    */
187   @Override
188   public void clearSnapshot(long id) throws UnexpectedStateException {
189     MemStoreLAB tmpAllocator = null;
190     if (this.snapshotId == -1) return;  // already cleared
191     if (this.snapshotId != id) {
192       throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
193           + id);
194     }
195     // OK. Passed in snapshot is same as current snapshot. If not-empty,
196     // create a new snapshot and let the old one go.
197     if (!this.snapshot.isEmpty()) {
198       this.snapshot = new CellSkipListSet(this.comparator);
199       this.snapshotTimeRangeTracker = new TimeRangeTracker();
200     }
201     this.snapshotSize = 0;
202     this.snapshotId = -1;
203     if (this.snapshotAllocator != null) {
204       tmpAllocator = this.snapshotAllocator;
205       this.snapshotAllocator = null;
206     }
207     if (tmpAllocator != null) {
208       tmpAllocator.close();
209     }
210   }
211 
212   @Override
213   public long getFlushableSize() {
214     return this.snapshotSize > 0 ? this.snapshotSize : keySize();
215   }
216 
217   @Override
218   public long getSnapshotSize() {
219     return this.snapshotSize;
220   }
221 
222   /**
223    * Write an update
224    * @param cell
225    * @return approximate size of the passed KV & newly added KV which maybe different than the
226    *         passed-in KV
227    */
228   @Override
229   public Pair<Long, Cell> add(Cell cell) {
230     Cell toAdd = maybeCloneWithAllocator(cell);
231     return new Pair<Long, Cell>(internalAdd(toAdd), toAdd);
232   }
233 
234   @Override
235   public long timeOfOldestEdit() {
236     return timeOfOldestEdit;
237   }
238 
239   private boolean addToCellSet(Cell e) {
240     boolean b = this.cellSet.add(e);
241     // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
242     // When we use ACL CP or Visibility CP which deals with Tags during
243     // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
244     // parse the byte[] to identify the tags length.
245     if(e.getTagsLength() > 0) {
246       tagsPresent = true;
247     }
248     setOldestEditTimeToNow();
249     return b;
250   }
251 
252   private boolean removeFromCellSet(Cell e) {
253     boolean b = this.cellSet.remove(e);
254     setOldestEditTimeToNow();
255     return b;
256   }
257 
258   void setOldestEditTimeToNow() {
259     if (timeOfOldestEdit == Long.MAX_VALUE) {
260       timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
261     }
262   }
263 
264   /**
265    * Internal version of add() that doesn't clone Cells with the
266    * allocator, and doesn't take the lock.
267    *
268    * Callers should ensure they already have the read lock taken
269    */
270   private long internalAdd(final Cell toAdd) {
271     long s = heapSizeChange(toAdd, addToCellSet(toAdd));
272     timeRangeTracker.includeTimestamp(toAdd);
273     this.size.addAndGet(s);
274     return s;
275   }
276 
277   private Cell maybeCloneWithAllocator(Cell cell) {
278     if (allocator == null) {
279       return cell;
280     }
281 
282     int len = KeyValueUtil.length(cell);
283     ByteRange alloc = allocator.allocateBytes(len);
284     if (alloc == null) {
285       // The allocation was too large, allocator decided
286       // not to do anything with it.
287       return cell;
288     }
289     assert alloc.getBytes() != null;
290     KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
291     KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
292     newKv.setSequenceId(cell.getSequenceId());
293     return newKv;
294   }
295 
296   /**
297    * Remove n key from the memstore. Only cells that have the same key and the
298    * same memstoreTS are removed.  It is ok to not update timeRangeTracker
299    * in this call. It is possible that we can optimize this method by using
300    * tailMap/iterator, but since this method is called rarely (only for
301    * error recovery), we can leave those optimization for the future.
302    * @param cell
303    */
304   @Override
305   public void rollback(Cell cell) {
306     // If the key is in the snapshot, delete it. We should not update
307     // this.size, because that tracks the size of only the memstore and
308     // not the snapshot. The flush of this snapshot to disk has not
309     // yet started because Store.flush() waits for all rwcc transactions to
310     // commit before starting the flush to disk.
311     Cell found = this.snapshot.get(cell);
312     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
313       this.snapshot.remove(cell);
314       long sz = heapSizeChange(cell, true);
315       this.snapshotSize -= sz;
316     }
317     // If the key is in the memstore, delete it. Update this.size.
318     found = this.cellSet.get(cell);
319     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
320       removeFromCellSet(cell);
321       long s = heapSizeChange(cell, true);
322       this.size.addAndGet(-s);
323     }
324   }
325 
326   /**
327    * Write a delete
328    * @param deleteCell
329    * @return approximate size of the passed key and value.
330    */
331   @Override
332   public long delete(Cell deleteCell) {
333     long s = 0;
334     Cell toAdd = maybeCloneWithAllocator(deleteCell);
335     s += heapSizeChange(toAdd, addToCellSet(toAdd));
336     timeRangeTracker.includeTimestamp(toAdd);
337     this.size.addAndGet(s);
338     return s;
339   }
340 
341   /**
342    * @param cell Find the row that comes after this one.  If null, we return the
343    * first.
344    * @return Next row or null if none found.
345    */
346   Cell getNextRow(final Cell cell) {
347     return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
348   }
349 
350   /*
351    * @param a
352    * @param b
353    * @return Return lowest of a or b or null if both a and b are null
354    */
355   private Cell getLowest(final Cell a, final Cell b) {
356     if (a == null) {
357       return b;
358     }
359     if (b == null) {
360       return a;
361     }
362     return comparator.compareRows(a, b) <= 0? a: b;
363   }
364 
365   /*
366    * @param key Find row that follows this one.  If null, return first.
367    * @param map Set to look in for a row beyond <code>row</code>.
368    * @return Next row or null if none found.  If one found, will be a new
369    * KeyValue -- can be destroyed by subsequent calls to this method.
370    */
371   private Cell getNextRow(final Cell key,
372       final NavigableSet<Cell> set) {
373     Cell result = null;
374     SortedSet<Cell> tail = key == null? set: set.tailSet(key);
375     // Iterate until we fall into the next row; i.e. move off current row
376     for (Cell cell: tail) {
377       if (comparator.compareRows(cell, key) <= 0)
378         continue;
379       // Note: Not suppressing deletes or expired cells.  Needs to be handled
380       // by higher up functions.
381       result = cell;
382       break;
383     }
384     return result;
385   }
386 
387   /**
388    * @param state column/delete tracking state
389    */
390   @Override
391   public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
392     getRowKeyAtOrBefore(cellSet, state);
393     getRowKeyAtOrBefore(snapshot, state);
394   }
395 
396   /*
397    * @param set
398    * @param state Accumulates deletes and candidates.
399    */
400   private void getRowKeyAtOrBefore(final NavigableSet<Cell> set,
401       final GetClosestRowBeforeTracker state) {
402     if (set.isEmpty()) {
403       return;
404     }
405     if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
406       // Found nothing in row.  Try backing up.
407       getRowKeyBefore(set, state);
408     }
409   }
410 
411   /*
412    * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
413    * we have been passed the first possible key on a row.  As we walk forward
414    * we accumulate deletes until we hit a candidate on the row at which point
415    * we return.
416    * @param set
417    * @param firstOnRow First possible key on this row.
418    * @param state
419    * @return True if we found a candidate walking this row.
420    */
421   private boolean walkForwardInSingleRow(final SortedSet<Cell> set,
422       final Cell firstOnRow, final GetClosestRowBeforeTracker state) {
423     boolean foundCandidate = false;
424     SortedSet<Cell> tail = set.tailSet(firstOnRow);
425     if (tail.isEmpty()) return foundCandidate;
426     for (Iterator<Cell> i = tail.iterator(); i.hasNext();) {
427       Cell kv = i.next();
428       // Did we go beyond the target row? If so break.
429       if (state.isTooFar(kv, firstOnRow)) break;
430       if (state.isExpired(kv)) {
431         i.remove();
432         continue;
433       }
434       // If we added something, this row is a contender. break.
435       if (state.handle(kv)) {
436         foundCandidate = true;
437         break;
438       }
439     }
440     return foundCandidate;
441   }
442 
443   /*
444    * Walk backwards through the passed set a row at a time until we run out of
445    * set or until we get a candidate.
446    * @param set
447    * @param state
448    */
449   private void getRowKeyBefore(NavigableSet<Cell> set,
450       final GetClosestRowBeforeTracker state) {
451     Cell firstOnRow = state.getTargetKey();
452     for (Member p = memberOfPreviousRow(set, state, firstOnRow);
453         p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
454       // Make sure we don't fall out of our table.
455       if (!state.isTargetTable(p.cell)) break;
456       // Stop looking if we've exited the better candidate range.
457       if (!state.isBetterCandidate(p.cell)) break;
458       // Make into firstOnRow
459       firstOnRow = new KeyValue(p.cell.getRowArray(), p.cell.getRowOffset(), p.cell.getRowLength(),
460           HConstants.LATEST_TIMESTAMP);
461       // If we find something, break;
462       if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
463     }
464   }
465 
466   /**
467    * Only used by tests. TODO: Remove
468    *
469    * Given the specs of a column, update it, first by inserting a new record,
470    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
471    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
472    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
473    * get the new value, or the old value and all readers will eventually only see the new
474    * value after the old was removed.
475    *
476    * @param row
477    * @param family
478    * @param qualifier
479    * @param newValue
480    * @param now
481    * @return  Timestamp
482    */
483   @Override
484   public long updateColumnValue(byte[] row,
485                                 byte[] family,
486                                 byte[] qualifier,
487                                 long newValue,
488                                 long now) {
489     Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
490     // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
491     SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
492     if (!snSs.isEmpty()) {
493       Cell snc = snSs.first();
494       // is there a matching Cell in the snapshot?
495       if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
496         if (snc.getTimestamp() == now) {
497           // poop,
498           now += 1;
499         }
500       }
501     }
502 
503     // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
504     // But the timestamp should also be max(now, mostRecentTsInMemstore)
505 
506     // so we cant add the new Cell w/o knowing what's there already, but we also
507     // want to take this chance to delete some cells. So two loops (sad)
508 
509     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
510     for (Cell cell : ss) {
511       // if this isnt the row we are interested in, then bail:
512       if (!CellUtil.matchingColumn(cell, family, qualifier)
513           || !CellUtil.matchingRow(cell, firstCell)) {
514         break; // rows dont match, bail.
515       }
516 
517       // if the qualifier matches and it's a put, just RM it out of the cellSet.
518       if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
519           cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
520         now = cell.getTimestamp();
521       }
522     }
523 
524     // create or update (upsert) a new Cell with
525     // 'now' and a 0 memstoreTS == immediately visible
526     List<Cell> cells = new ArrayList<Cell>(1);
527     cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
528     return upsert(cells, 1L);
529   }
530 
531   /**
532    * Update or insert the specified KeyValues.
533    * <p>
534    * For each KeyValue, insert into MemStore.  This will atomically upsert the
535    * value for that row/family/qualifier.  If a KeyValue did already exist,
536    * it will then be removed.
537    * <p>
538    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
539    * be immediately visible.  May want to change this so it is atomic across
540    * all KeyValues.
541    * <p>
542    * This is called under row lock, so Get operations will still see updates
543    * atomically.  Scans will only see each KeyValue update as atomic.
544    *
545    * @param cells
546    * @param readpoint readpoint below which we can safely remove duplicate KVs
547    * @return change in memstore size
548    */
549   @Override
550   public long upsert(Iterable<Cell> cells, long readpoint) {
551     long size = 0;
552     for (Cell cell : cells) {
553       size += upsert(cell, readpoint);
554     }
555     return size;
556   }
557 
558   /**
559    * Inserts the specified KeyValue into MemStore and deletes any existing
560    * versions of the same row/family/qualifier as the specified KeyValue.
561    * <p>
562    * First, the specified KeyValue is inserted into the Memstore.
563    * <p>
564    * If there are any existing KeyValues in this MemStore with the same row,
565    * family, and qualifier, they are removed.
566    * <p>
567    * Callers must hold the read lock.
568    *
569    * @param cell
570    * @return change in size of MemStore
571    */
572   private long upsert(Cell cell, long readpoint) {
573     // Add the Cell to the MemStore
574     // Use the internalAdd method here since we (a) already have a lock
575     // and (b) cannot safely use the MSLAB here without potentially
576     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
577     // test that triggers the pathological case if we don't avoid MSLAB
578     // here.
579     long addedSize = internalAdd(cell);
580 
581     // Get the Cells for the row/family/qualifier regardless of timestamp.
582     // For this case we want to clean up any other puts
583     Cell firstCell = KeyValueUtil.createFirstOnRow(
584         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
585         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
586         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
587     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
588     Iterator<Cell> it = ss.iterator();
589     // versions visible to oldest scanner
590     int versionsVisible = 0;
591     while ( it.hasNext() ) {
592       Cell cur = it.next();
593 
594       if (cell == cur) {
595         // ignore the one just put in
596         continue;
597       }
598       // check that this is the row and column we are interested in, otherwise bail
599       if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
600         // only remove Puts that concurrent scanners cannot possibly see
601         if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
602             cur.getSequenceId() <= readpoint) {
603           if (versionsVisible >= 1) {
604             // if we get here we have seen at least one version visible to the oldest scanner,
605             // which means we can prove that no scanner will see this version
606 
607             // false means there was a change, so give us the size.
608             long delta = heapSizeChange(cur, true);
609             addedSize -= delta;
610             this.size.addAndGet(-delta);
611             it.remove();
612             setOldestEditTimeToNow();
613           } else {
614             versionsVisible++;
615           }
616         }
617       } else {
618         // past the row or column, done
619         break;
620       }
621     }
622     return addedSize;
623   }
624 
625   /*
626    * Immutable data structure to hold member found in set and the set it was
627    * found in. Include set because it is carrying context.
628    */
629   private static class Member {
630     final Cell cell;
631     final NavigableSet<Cell> set;
632     Member(final NavigableSet<Cell> s, final Cell kv) {
633       this.cell = kv;
634       this.set = s;
635     }
636   }
637 
638   /*
639    * @param set Set to walk back in.  Pass a first in row or we'll return
640    * same row (loop).
641    * @param state Utility and context.
642    * @param firstOnRow First item on the row after the one we want to find a
643    * member in.
644    * @return Null or member of row previous to <code>firstOnRow</code>
645    */
646   private Member memberOfPreviousRow(NavigableSet<Cell> set,
647       final GetClosestRowBeforeTracker state, final Cell firstOnRow) {
648     NavigableSet<Cell> head = set.headSet(firstOnRow, false);
649     if (head.isEmpty()) return null;
650     for (Iterator<Cell> i = head.descendingIterator(); i.hasNext();) {
651       Cell found = i.next();
652       if (state.isExpired(found)) {
653         i.remove();
654         continue;
655       }
656       return new Member(head, found);
657     }
658     return null;
659   }
660 
661   /**
662    * @return scanner on memstore and snapshot in this order.
663    */
664   @Override
665   public List<KeyValueScanner> getScanners(long readPt) {
666     return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
667   }
668 
669   /**
670    * Check if this memstore may contain the required keys
671    * @param scan
672    * @return False if the key definitely does not exist in this Memstore
673    */
674   public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
675     return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
676         snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
677         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
678                      snapshotTimeRangeTracker.getMaximumTimestamp()) >=
679             oldestUnexpiredTS);
680   }
681 
682   /*
683    * MemStoreScanner implements the KeyValueScanner.
684    * It lets the caller scan the contents of a memstore -- both current
685    * map and snapshot.
686    * This behaves as if it were a real scanner but does not maintain position.
687    */
688   protected class MemStoreScanner extends NonLazyKeyValueScanner {
689     // Next row information for either cellSet or snapshot
690     private Cell cellSetNextRow = null;
691     private Cell snapshotNextRow = null;
692 
693     // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
694     private Cell cellSetItRow = null;
695     private Cell snapshotItRow = null;
696     
697     // iterator based scanning.
698     private Iterator<Cell> cellSetIt;
699     private Iterator<Cell> snapshotIt;
700 
701     // The cellSet and snapshot at the time of creating this scanner
702     private CellSkipListSet cellSetAtCreation;
703     private CellSkipListSet snapshotAtCreation;
704 
705     // the pre-calculated Cell to be returned by peek() or next()
706     private Cell theNext;
707 
708     // The allocator and snapshot allocator at the time of creating this scanner
709     volatile MemStoreLAB allocatorAtCreation;
710     volatile MemStoreLAB snapshotAllocatorAtCreation;
711     
712     // A flag represents whether could stop skipping Cells for MVCC
713     // if have encountered the next row. Only used for reversed scan
714     private boolean stopSkippingCellsIfNextRow = false;
715 
716     private long readPoint;
717 
    /*
    Some notes...

      A MemStoreScanner is fixed at creation time; this includes its
      pointers/iterators into the existing cellSet/snapshot.  During a snapshot
      creation the cellSet is reset to a new, empty set and its previous
      contents are moved to the snapshot.  Since the cellSet is then empty there
      is no point in reseeking on both, and we can save ourselves the trouble.
      During the snapshot->hfile transition, the memstore scanner is re-created
      by StoreScanner#updateReaders().  StoreScanner should potentially do
      something smarter by adjusting the existing memstore scanner.

      But there is a greater problem here: once a scanner has progressed during
      a snapshot scenario, we currently iterate past the cellSet and then
      'finish' up.  If a scan lasts a little while, there is a chance for new
      entries in the cellSet to become available, but we will never see them.
      This needs to be handled at the StoreScanner level, in coordination with
      MemStoreScanner.

      Currently, this problem is only partly managed: during the small amount of
      time when the StoreScanner has not yet created a new MemStoreScanner, we
      will miss the adds to the cellSet in the MemStoreScanner.
    */
738 
739     MemStoreScanner(long readPoint) {
740       super();
741 
742       this.readPoint = readPoint;
743       cellSetAtCreation = cellSet;
744       snapshotAtCreation = snapshot;
745       if (allocator != null) {
746         this.allocatorAtCreation = allocator;
747         this.allocatorAtCreation.incScannerCount();
748       }
749       if (snapshotAllocator != null) {
750         this.snapshotAllocatorAtCreation = snapshotAllocator;
751         this.snapshotAllocatorAtCreation.incScannerCount();
752       }
753       if (Trace.isTracing() && Trace.currentSpan() != null) {
754         Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
755       }
756     }
757 
758     /**
759      * Lock on 'this' must be held by caller.
760      * @param it
761      * @return Next Cell
762      */
763     private Cell getNext(Iterator<Cell> it) {
764       Cell startCell = theNext;
765       Cell v = null;
766       try {
767         while (it.hasNext()) {
768           v = it.next();
769           if (v.getSequenceId() <= this.readPoint) {
770             return v;
771           }
772           if (stopSkippingCellsIfNextRow && startCell != null
773               && comparator.compareRows(v, startCell) > 0) {
774             return null;
775           }
776         }
777 
778         return null;
779       } finally {
780         if (v != null) {
781           // in all cases, remember the last Cell iterated to
782           if (it == snapshotIt) {
783             snapshotItRow = v;
784           } else {
785             cellSetItRow = v;
786           }
787         }
788       }
789     }
790 
791     /**
792      *  Set the scanner at the seek key.
793      *  Must be called only once: there is no thread safety between the scanner
794      *   and the memStore.
795      * @param key seek value
796      * @return false if the key is null or if there is no data
797      */
798     @Override
799     public synchronized boolean seek(Cell key) {
800       if (key == null) {
801         close();
802         return false;
803       }
804       // kvset and snapshot will never be null.
805       // if tailSet can't find anything, SortedSet is empty (not null).
806       cellSetIt = cellSetAtCreation.tailSet(key).iterator();
807       snapshotIt = snapshotAtCreation.tailSet(key).iterator();
808       cellSetItRow = null;
809       snapshotItRow = null;
810 
811       return seekInSubLists(key);
812     }
813 
814 
815     /**
816      * (Re)initialize the iterators after a seek or a reseek.
817      */
818     private synchronized boolean seekInSubLists(Cell key){
819       cellSetNextRow = getNext(cellSetIt);
820       snapshotNextRow = getNext(snapshotIt);
821 
822       // Calculate the next value
823       theNext = getLowest(cellSetNextRow, snapshotNextRow);
824 
825       // has data
826       return (theNext != null);
827     }
828 
829 
830     /**
831      * Move forward on the sub-lists set previously by seek.
832      * @param key seek value (should be non-null)
833      * @return true if there is at least one KV to read, false otherwise
834      */
835     @Override
836     public synchronized boolean reseek(Cell key) {
837       /*
838       See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
839       This code is executed concurrently with flush and puts, without locks.
840       Two points must be known when working on this code:
841       1) It's not possible to use the 'kvTail' and 'snapshot'
842        variables, as they are modified during a flush.
843       2) The ideal implementation for performance would use the sub skip list
844        implicitly pointed by the iterators 'kvsetIt' and
845        'snapshotIt'. Unfortunately the Java API does not offer a method to
846        get it. So we remember the last keys we iterated to and restore
847        the reseeked set to at least that point.
848        */
849       cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
850       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
851 
852       return seekInSubLists(key);
853     }
854 
855 
856     @Override
857     public synchronized Cell peek() {
858       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
859       return theNext;
860     }
861 
862     @Override
863     public synchronized Cell next() {
864       if (theNext == null) {
865           return null;
866       }
867 
868       final Cell ret = theNext;
869 
870       // Advance one of the iterators
871       if (theNext == cellSetNextRow) {
872         cellSetNextRow = getNext(cellSetIt);
873       } else {
874         snapshotNextRow = getNext(snapshotIt);
875       }
876 
877       // Calculate the next value
878       theNext = getLowest(cellSetNextRow, snapshotNextRow);
879 
880       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
881       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
882       //    getLowest() + " threadpoint=" + readpoint);
883       return ret;
884     }
885 
886     /*
887      * Returns the lower of the two key values, or null if they are both null.
888      * This uses comparator.compare() to compare the KeyValue using the memstore
889      * comparator.
890      */
891     private Cell getLowest(Cell first, Cell second) {
892       if (first == null && second == null) {
893         return null;
894       }
895       if (first != null && second != null) {
896         int compare = comparator.compare(first, second);
897         return (compare <= 0 ? first : second);
898       }
899       return (first != null ? first : second);
900     }
901 
902     /*
903      * Returns the higher of the two cells, or null if they are both null.
904      * This uses comparator.compare() to compare the Cell using the memstore
905      * comparator.
906      */
907     private Cell getHighest(Cell first, Cell second) {
908       if (first == null && second == null) {
909         return null;
910       }
911       if (first != null && second != null) {
912         int compare = comparator.compare(first, second);
913         return (compare > 0 ? first : second);
914       }
915       return (first != null ? first : second);
916     }
917 
918     public synchronized void close() {
919       this.cellSetNextRow = null;
920       this.snapshotNextRow = null;
921 
922       this.cellSetIt = null;
923       this.snapshotIt = null;
924       
925       if (allocatorAtCreation != null) {
926         this.allocatorAtCreation.decScannerCount();
927         this.allocatorAtCreation = null;
928       }
929       if (snapshotAllocatorAtCreation != null) {
930         this.snapshotAllocatorAtCreation.decScannerCount();
931         this.snapshotAllocatorAtCreation = null;
932       }
933 
934       this.cellSetItRow = null;
935       this.snapshotItRow = null;
936     }
937 
938     /**
939      * MemStoreScanner returns max value as sequence id because it will
940      * always have the latest data among all files.
941      */
942     @Override
943     public long getSequenceID() {
944       return Long.MAX_VALUE;
945     }
946 
947     @Override
948     public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
949         long oldestUnexpiredTS) {
950       return shouldSeek(scan, oldestUnexpiredTS);
951     }
952 
953     /**
954      * Seek scanner to the given key first. If it returns false(means
955      * peek()==null) or scanner's peek row is bigger than row of given key, seek
956      * the scanner to the previous row of given key
957      */
958     @Override
959     public synchronized boolean backwardSeek(Cell key) {
960       seek(key);
961       if (peek() == null || comparator.compareRows(peek(), key) > 0) {
962         return seekToPreviousRow(key);
963       }
964       return true;
965     }
966 
967     /**
968      * Separately get the KeyValue before the specified key from kvset and
969      * snapshotset, and use the row of higher one as the previous row of
970      * specified key, then seek to the first KeyValue of previous row
971      */
972     @Override
973     public synchronized boolean seekToPreviousRow(Cell originalKey) {
974       boolean keepSeeking = false;
975       Cell key = originalKey;
976       do {
977         Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
978             key.getRowLength());
979         SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
980         Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
981         SortedSet<Cell> snapshotHead = snapshotAtCreation
982             .headSet(firstKeyOnRow);
983         Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
984             .last();
985         Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
986         if (lastCellBeforeRow == null) {
987           theNext = null;
988           return false;
989         }
990         Cell firstKeyOnPreviousRow = KeyValueUtil.createFirstOnRow(lastCellBeforeRow.getRowArray(),
991             lastCellBeforeRow.getRowOffset(), lastCellBeforeRow.getRowLength());
992         this.stopSkippingCellsIfNextRow = true;
993         seek(firstKeyOnPreviousRow);
994         this.stopSkippingCellsIfNextRow = false;
995         if (peek() == null
996             || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
997           keepSeeking = true;
998           key = firstKeyOnPreviousRow;
999           continue;
1000         } else {
1001           keepSeeking = false;
1002         }
1003       } while (keepSeeking);
1004       return true;
1005     }
1006 
1007     @Override
1008     public synchronized boolean seekToLastRow() {
1009       Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
1010           .last();
1011       Cell second = snapshotAtCreation.isEmpty() ? null
1012           : snapshotAtCreation.last();
1013       Cell higherCell = getHighest(first, second);
1014       if (higherCell == null) {
1015         return false;
1016       }
1017       Cell firstCellOnLastRow = KeyValueUtil.createFirstOnRow(higherCell.getRowArray(),
1018           higherCell.getRowOffset(), higherCell.getRowLength());
1019       if (seek(firstCellOnLastRow)) {
1020         return true;
1021       } else {
1022         return seekToPreviousRow(higherCell);
1023       }
1024 
1025     }
1026   }
1027 
1028   public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
1029       + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
1030 
1031   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
1032       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
1033       (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
1034 
1035   /*
1036    * Calculate how the MemStore size has changed.  Includes overhead of the
1037    * backing Map.
1038    * @param cell
1039    * @param notpresent True if the cell was NOT present in the set.
1040    * @return Size
1041    */
1042   static long heapSizeChange(final Cell cell, final boolean notpresent) {
1043     return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
1044         + CellUtil.estimatedHeapSizeOf(cell)) : 0;
1045   }
1046 
1047   private long keySize() {
1048     return heapSize() - DEEP_OVERHEAD;
1049   }
1050 
1051   /**
1052    * Get the entire heap usage for this MemStore not including keys in the
1053    * snapshot.
1054    */
1055   @Override
1056   public long heapSize() {
1057     return size.get();
1058   }
1059 
1060   @Override
1061   public long size() {
1062     return heapSize();
1063   }
1064 
1065   /**
1066    * Code to help figure if our approximation of object heap sizes is close
1067    * enough.  See hbase-900.  Fills memstores then waits so user can heap
1068    * dump and bring up resultant hprof in something like jprofiler which
1069    * allows you get 'deep size' on objects.
1070    * @param args main args
1071    */
1072   public static void main(String [] args) {
1073     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
1074     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
1075       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
1076     LOG.info("vmInputArguments=" + runtime.getInputArguments());
1077     DefaultMemStore memstore1 = new DefaultMemStore();
1078     // TODO: x32 vs x64
1079     long size = 0;
1080     final int count = 10000;
1081     byte [] fam = Bytes.toBytes("col");
1082     byte [] qf = Bytes.toBytes("umn");
1083     byte [] empty = new byte[0];
1084     for (int i = 0; i < count; i++) {
1085       // Give each its own ts
1086       Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1087       size += ret.getFirst();
1088     }
1089     LOG.info("memstore1 estimated size=" + size);
1090     for (int i = 0; i < count; i++) {
1091       Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
1092       size += ret.getFirst();
1093     }
1094     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
1095     // Make a variably sized memstore.
1096     DefaultMemStore memstore2 = new DefaultMemStore();
1097     for (int i = 0; i < count; i++) {
1098       Pair<Long, Cell> ret = memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
1099         new byte[i]));
1100       size += ret.getFirst();
1101     }
1102     LOG.info("memstore2 estimated size=" + size);
1103     final int seconds = 30;
1104     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
1105     for (int i = 0; i < seconds; i++) {
1106       // Thread.sleep(1000);
1107     }
1108     LOG.info("Exiting.");
1109   }
1110 
1111 }