View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.List;
26  import java.util.SortedSet;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
39  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
40  
41  /**
42   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
43   * bloom filter things.
44   */
45  @InterfaceAudience.LimitedPrivate("Coprocessor")
46  public class StoreFileScanner implements KeyValueScanner {
47    static final Log LOG = LogFactory.getLog(HStore.class);
48  
49    // the reader it comes from:
50    private final StoreFile.Reader reader;
51    private final HFileScanner hfs;
52    private Cell cur = null;
53    private boolean closed = false;
54  
55    private boolean realSeekDone;
56    private boolean delayedReseek;
57    private Cell delayedSeekKV;
58  
59    private boolean enforceMVCC = false;
60    private boolean hasMVCCInfo = false;
61    // A flag represents whether could stop skipping KeyValues for MVCC
62    // if have encountered the next row. Only used for reversed scan
63    private boolean stopSkippingKVsIfNextRow = false;
64  
65    private static AtomicLong seekCount;
66  
67    private ScanQueryMatcher matcher;
68    
69    private long readPt;
70  
71    /**
72     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
73     * @param hfs HFile scanner
74     */
75    public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
76        boolean hasMVCC, long readPt) {
77      this.readPt = readPt;
78      this.reader = reader;
79      this.hfs = hfs;
80      this.enforceMVCC = useMVCC;
81      this.hasMVCCInfo = hasMVCC;
82    }
83  
84    boolean isPrimaryReplica() {
85      return reader.isPrimaryReplicaReader();
86    }
87  
88    /**
89     * Return an array of scanners corresponding to the given
90     * set of store files.
91     */
92    public static List<StoreFileScanner> getScannersForStoreFiles(
93        Collection<StoreFile> files,
94        boolean cacheBlocks,
95        boolean usePread, long readPt) throws IOException {
96      return getScannersForStoreFiles(files, cacheBlocks,
97                                     usePread, false, readPt);
98    }
99  
100   /**
101    * Return an array of scanners corresponding to the given set of store files.
102    */
103   public static List<StoreFileScanner> getScannersForStoreFiles(
104       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
105       boolean isCompaction, long readPt) throws IOException {
106     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
107         null, readPt, true);
108   }
109 
110   /**
111    * Return an array of scanners corresponding to the given set of store files,
112    * And set the ScanQueryMatcher for each store file scanner for further
113    * optimization
114    */
115   public static List<StoreFileScanner> getScannersForStoreFiles(
116       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
117       boolean isCompaction, ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica)
118           throws IOException {
119     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
120         files.size());
121     for (StoreFile file : files) {
122       StoreFile.Reader r = file.createReader();
123       r.setReplicaStoreFile(isPrimaryReplica);
124       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
125           isCompaction, readPt);
126       scanner.setScanQueryMatcher(matcher);
127       scanners.add(scanner);
128     }
129     return scanners;
130   }
131 
132   public static List<StoreFileScanner> getScannersForStoreFiles(
133     Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
134     boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
135     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
136       matcher, readPt, true);
137   }
138 
139   public String toString() {
140     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
141   }
142 
143   public Cell peek() {
144     return cur;
145   }
146 
147   public Cell next() throws IOException {
148     Cell retKey = cur;
149 
150     try {
151       // only seek if we aren't at the end. cur == null implies 'end'.
152       if (cur != null) {
153         hfs.next();
154         setCurrentCell(hfs.getKeyValue());
155         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
156           skipKVsNewerThanReadpoint();
157         }
158       }
159     } catch(IOException e) {
160       throw new IOException("Could not iterate " + this, e);
161     }
162     return retKey;
163   }
164 
165   public boolean seek(Cell key) throws IOException {
166     if (seekCount != null) seekCount.incrementAndGet();
167 
168     try {
169       try {
170         if(!seekAtOrAfter(hfs, key)) {
171           close();
172           return false;
173         }
174 
175         setCurrentCell(hfs.getKeyValue());
176 
177         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
178           return skipKVsNewerThanReadpoint();
179         } else {
180           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
181         }
182       } finally {
183         realSeekDone = true;
184       }
185     } catch (IOException ioe) {
186       throw new IOException("Could not seek " + this + " to key " + key, ioe);
187     }
188   }
189 
190   public boolean reseek(Cell key) throws IOException {
191     if (seekCount != null) seekCount.incrementAndGet();
192 
193     try {
194       try {
195         if (!reseekAtOrAfter(hfs, key)) {
196           close();
197           return false;
198         }
199         setCurrentCell(hfs.getKeyValue());
200 
201         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
202           return skipKVsNewerThanReadpoint();
203         } else {
204           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
205         }
206       } finally {
207         realSeekDone = true;
208       }
209     } catch (IOException ioe) {
210       throw new IOException("Could not reseek " + this + " to key " + key,
211           ioe);
212     }
213   }
214 
215   protected void setCurrentCell(Cell newVal) throws IOException {
216     this.cur = newVal;
217     if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
218       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
219     }
220   }
221 
222   protected boolean skipKVsNewerThanReadpoint() throws IOException {
223     // We want to ignore all key-values that are newer than our current
224     // readPoint
225     Cell startKV = cur;
226     while(enforceMVCC
227         && cur != null
228         && (cur.getMvccVersion() > readPt)) {
229       boolean hasNext = hfs.next();
230       setCurrentCell(hfs.getKeyValue());
231       if (hasNext && this.stopSkippingKVsIfNextRow
232           && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
233               cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
234               startKV.getRowLength()) > 0) {
235         return false;
236       }
237     }
238 
239     if (cur == null) {
240       close();
241       return false;
242     }
243 
244     return true;
245   }
246 
247   public void close() {
248     // Nothing to close on HFileScanner?
249     cur = null;
250     if (closed) return;
251     closed = true;
252     this.hfs.close();
253   }
254 
255   /**
256    *
257    * @param s
258    * @param k
259    * @return false if not found or if k is after the end.
260    * @throws IOException
261    */
262   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
263   throws IOException {
264     int result = s.seekTo(k);
265     if(result < 0) {
266       if (result == HConstants.INDEX_KEY_MAGIC) {
267         // using faked key
268         return true;
269       }
270       // Passed KV is smaller than first KV in file, work from start of file
271       return s.seekTo();
272     } else if(result > 0) {
273       // Passed KV is larger than current KV in file, if there is a next
274       // it is the "after", if not then this scanner is done.
275       return s.next();
276     }
277     // Seeked to the exact key
278     return true;
279   }
280 
281   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
282   throws IOException {
283     //This function is similar to seekAtOrAfter function
284     int result = s.reseekTo(k);
285     if (result <= 0) {
286       if (result == HConstants.INDEX_KEY_MAGIC) {
287         // using faked key
288         return true;
289       }
290       // If up to now scanner is not seeked yet, this means passed KV is smaller
291       // than first KV in file, and it is the first time we seek on this file.
292       // So we also need to work from the start of file.
293       if (!s.isSeeked()) {
294         return  s.seekTo();
295       }
296       return true;
297     }
298     // passed KV is larger than current KV in file, if there is a next
299     // it is after, if not then this scanner is done.
300     return s.next();
301   }
302 
303   @Override
304   public long getSequenceID() {
305     return reader.getSequenceID();
306   }
307 
308   /**
309    * Pretend we have done a seek but don't do it yet, if possible. The hope is
310    * that we find requested columns in more recent files and won't have to seek
311    * in older files. Creates a fake key/value with the given row/column and the
312    * highest (most recent) possible timestamp we might get from this file. When
313    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
314    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
315    * <p>
316    * Note that this function does guarantee that the current KV of this scanner
317    * will be advanced to at least the given KV. Because of this, it does have
318    * to do a real seek in cases when the seek timestamp is older than the
319    * highest timestamp of the file, e.g. when we are trying to seek to the next
320    * row/column and use OLDEST_TIMESTAMP in the seek key.
321    */
322   @Override
323   public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
324       throws IOException {
325     if (kv.getFamilyLength() == 0) {
326       useBloom = false;
327     }
328 
329     boolean haveToSeek = true;
330     if (useBloom) {
331       // check ROWCOL Bloom filter first.
332       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
333         haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
334             kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
335             kv.getQualifierOffset(), kv.getQualifierLength());
336       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
337           ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
338         // if there is no such delete family kv in the store file,
339         // then no need to seek.
340         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
341             kv.getRowOffset(), kv.getRowLength());
342       }
343     }
344 
345     delayedReseek = forward;
346     delayedSeekKV = kv;
347 
348     if (haveToSeek) {
349       // This row/column might be in this store file (or we did not use the
350       // Bloom filter), so we still need to seek.
351       realSeekDone = false;
352       long maxTimestampInFile = reader.getMaxTimestamp();
353       long seekTimestamp = kv.getTimestamp();
354       if (seekTimestamp > maxTimestampInFile) {
355         // Create a fake key that is not greater than the real next key.
356         // (Lower timestamps correspond to higher KVs.)
357         // To understand this better, consider that we are asked to seek to
358         // a higher timestamp than the max timestamp in this file. We know that
359         // the next point when we have to consider this file again is when we
360         // pass the max timestamp of this file (with the same row/column).
361         setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
362       } else {
363         // This will be the case e.g. when we need to seek to the next
364         // row/column, and we don't know exactly what they are, so we set the
365         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
366         // row/column.
367         enforceSeek();
368       }
369       return cur != null;
370     }
371 
372     // Multi-column Bloom filter optimization.
373     // Create a fake key/value, so that this scanner only bubbles up to the top
374     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
375     // all other store files. The query matcher will then just skip this fake
376     // key/value and the store scanner will progress to the next column. This
377     // is obviously not a "real real" seek, but unlike the fake KV earlier in
378     // this method, we want this to be propagated to ScanQueryMatcher.
379     setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
380 
381     realSeekDone = true;
382     return true;
383   }
384 
385   Reader getReader() {
386     return reader;
387   }
388 
389   KeyValue.KVComparator getComparator() {
390     return reader.getComparator();
391   }
392 
393   @Override
394   public boolean realSeekDone() {
395     return realSeekDone;
396   }
397 
398   @Override
399   public void enforceSeek() throws IOException {
400     if (realSeekDone)
401       return;
402 
403     if (delayedReseek) {
404       reseek(delayedSeekKV);
405     } else {
406       seek(delayedSeekKV);
407     }
408   }
409 
410   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
411     this.matcher = matcher;
412   }
413 
414   @Override
415   public boolean isFileScanner() {
416     return true;
417   }
418 
419   // Test methods
420 
421   static final long getSeekCount() {
422     return seekCount.get();
423   }
424   static final void instrument() {
425     seekCount = new AtomicLong();
426   }
427 
428   @Override
429   public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
430     return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
431         && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
432   }
433 
434   @Override
435   @SuppressWarnings("deprecation")
436   public boolean seekToPreviousRow(Cell originalKey) throws IOException {
437     try {
438       try {
439         boolean keepSeeking = false;
440         Cell key = originalKey;
441         do {
442           KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
443               key.getRowLength());
444           if (seekCount != null) seekCount.incrementAndGet();
445           if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
446               seekKey.getKeyLength())) {
447             close();
448             return false;
449           }
450           KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
451               .getRowArray(), hfs.getKeyValue().getRowOffset(), hfs.getKeyValue().getRowLength());
452 
453           if (seekCount != null) seekCount.incrementAndGet();
454           if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
455             close();
456             return false;
457           }
458 
459           setCurrentCell(hfs.getKeyValue());
460           this.stopSkippingKVsIfNextRow = true;
461           boolean resultOfSkipKVs;
462           try {
463             resultOfSkipKVs = skipKVsNewerThanReadpoint();
464           } finally {
465             this.stopSkippingKVsIfNextRow = false;
466           }
467           if (!resultOfSkipKVs
468               || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
469             keepSeeking = true;
470             key = firstKeyOfPreviousRow;
471             continue;
472           } else {
473             keepSeeking = false;
474           }
475         } while (keepSeeking);
476         return true;
477       } finally {
478         realSeekDone = true;
479       }
480     } catch (IOException ioe) {
481       throw new IOException("Could not seekToPreviousRow " + this + " to key "
482           + originalKey, ioe);
483     }
484   }
485 
486   @Override
487   public boolean seekToLastRow() throws IOException {
488     byte[] lastRow = reader.getLastRowKey();
489     if (lastRow == null) {
490       return false;
491     }
492     KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
493     if (seek(seekKey)) {
494       return true;
495     } else {
496       return seekToPreviousRow(seekKey);
497     }
498   }
499 
500   @Override
501   public boolean backwardSeek(Cell key) throws IOException {
502     seek(key);
503     if (cur == null
504         || getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
505             cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
506             key.getRowLength()) > 0) {
507       return seekToPreviousRow(key);
508     }
509     return true;
510   }
511 
512   @Override
513   public Cell getNextIndexedKey() {
514     return hfs.getNextIndexedKey();
515   }
516 }