1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.SortedSet;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValueUtil;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
39 import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
40
41
42
43
44
45 @InterfaceAudience.LimitedPrivate("Coprocessor")
46 public class StoreFileScanner implements KeyValueScanner {
47 static final Log LOG = LogFactory.getLog(HStore.class);
48
49
50 private final StoreFile.Reader reader;
51 private final HFileScanner hfs;
52 private Cell cur = null;
53 private boolean closed = false;
54
55 private boolean realSeekDone;
56 private boolean delayedReseek;
57 private Cell delayedSeekKV;
58
59 private boolean enforceMVCC = false;
60 private boolean hasMVCCInfo = false;
61
62
63 private boolean stopSkippingKVsIfNextRow = false;
64
65 private static AtomicLong seekCount;
66
67 private ScanQueryMatcher matcher;
68
69 private long readPt;
70
71
72
73
74
75 public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
76 boolean hasMVCC, long readPt) {
77 this.readPt = readPt;
78 this.reader = reader;
79 this.hfs = hfs;
80 this.enforceMVCC = useMVCC;
81 this.hasMVCCInfo = hasMVCC;
82 }
83
84 boolean isPrimaryReplica() {
85 return reader.isPrimaryReplicaReader();
86 }
87
88
89
90
91
92 public static List<StoreFileScanner> getScannersForStoreFiles(
93 Collection<StoreFile> files,
94 boolean cacheBlocks,
95 boolean usePread, long readPt) throws IOException {
96 return getScannersForStoreFiles(files, cacheBlocks,
97 usePread, false, readPt);
98 }
99
100
101
102
103 public static List<StoreFileScanner> getScannersForStoreFiles(
104 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
105 boolean isCompaction, long readPt) throws IOException {
106 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
107 null, readPt, true);
108 }
109
110
111
112
113
114
115 public static List<StoreFileScanner> getScannersForStoreFiles(
116 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
117 boolean isCompaction, ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica)
118 throws IOException {
119 List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
120 files.size());
121 for (StoreFile file : files) {
122 StoreFile.Reader r = file.createReader();
123 r.setReplicaStoreFile(isPrimaryReplica);
124 StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
125 isCompaction, readPt);
126 scanner.setScanQueryMatcher(matcher);
127 scanners.add(scanner);
128 }
129 return scanners;
130 }
131
132 public static List<StoreFileScanner> getScannersForStoreFiles(
133 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
134 boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
135 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
136 matcher, readPt, true);
137 }
138
139 public String toString() {
140 return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
141 }
142
143 public Cell peek() {
144 return cur;
145 }
146
147 public Cell next() throws IOException {
148 Cell retKey = cur;
149
150 try {
151
152 if (cur != null) {
153 hfs.next();
154 setCurrentCell(hfs.getKeyValue());
155 if (hasMVCCInfo || this.reader.isBulkLoaded()) {
156 skipKVsNewerThanReadpoint();
157 }
158 }
159 } catch(IOException e) {
160 throw new IOException("Could not iterate " + this, e);
161 }
162 return retKey;
163 }
164
165 public boolean seek(Cell key) throws IOException {
166 if (seekCount != null) seekCount.incrementAndGet();
167
168 try {
169 try {
170 if(!seekAtOrAfter(hfs, key)) {
171 close();
172 return false;
173 }
174
175 setCurrentCell(hfs.getKeyValue());
176
177 if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
178 return skipKVsNewerThanReadpoint();
179 } else {
180 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
181 }
182 } finally {
183 realSeekDone = true;
184 }
185 } catch (IOException ioe) {
186 throw new IOException("Could not seek " + this + " to key " + key, ioe);
187 }
188 }
189
190 public boolean reseek(Cell key) throws IOException {
191 if (seekCount != null) seekCount.incrementAndGet();
192
193 try {
194 try {
195 if (!reseekAtOrAfter(hfs, key)) {
196 close();
197 return false;
198 }
199 setCurrentCell(hfs.getKeyValue());
200
201 if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
202 return skipKVsNewerThanReadpoint();
203 } else {
204 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
205 }
206 } finally {
207 realSeekDone = true;
208 }
209 } catch (IOException ioe) {
210 throw new IOException("Could not reseek " + this + " to key " + key,
211 ioe);
212 }
213 }
214
215 protected void setCurrentCell(Cell newVal) throws IOException {
216 this.cur = newVal;
217 if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
218 CellUtil.setSequenceId(cur, this.reader.getSequenceID());
219 }
220 }
221
222 protected boolean skipKVsNewerThanReadpoint() throws IOException {
223
224
225 Cell startKV = cur;
226 while(enforceMVCC
227 && cur != null
228 && (cur.getMvccVersion() > readPt)) {
229 boolean hasNext = hfs.next();
230 setCurrentCell(hfs.getKeyValue());
231 if (hasNext && this.stopSkippingKVsIfNextRow
232 && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
233 cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
234 startKV.getRowLength()) > 0) {
235 return false;
236 }
237 }
238
239 if (cur == null) {
240 close();
241 return false;
242 }
243
244 return true;
245 }
246
247 public void close() {
248
249 cur = null;
250 if (closed) return;
251 closed = true;
252 this.hfs.close();
253 }
254
255
256
257
258
259
260
261
262 public static boolean seekAtOrAfter(HFileScanner s, Cell k)
263 throws IOException {
264 int result = s.seekTo(k);
265 if(result < 0) {
266 if (result == HConstants.INDEX_KEY_MAGIC) {
267
268 return true;
269 }
270
271 return s.seekTo();
272 } else if(result > 0) {
273
274
275 return s.next();
276 }
277
278 return true;
279 }
280
281 static boolean reseekAtOrAfter(HFileScanner s, Cell k)
282 throws IOException {
283
284 int result = s.reseekTo(k);
285 if (result <= 0) {
286 if (result == HConstants.INDEX_KEY_MAGIC) {
287
288 return true;
289 }
290
291
292
293 if (!s.isSeeked()) {
294 return s.seekTo();
295 }
296 return true;
297 }
298
299
300 return s.next();
301 }
302
303 @Override
304 public long getSequenceID() {
305 return reader.getSequenceID();
306 }
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322 @Override
323 public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
324 throws IOException {
325 if (kv.getFamilyLength() == 0) {
326 useBloom = false;
327 }
328
329 boolean haveToSeek = true;
330 if (useBloom) {
331
332 if (reader.getBloomFilterType() == BloomType.ROWCOL) {
333 haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
334 kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
335 kv.getQualifierOffset(), kv.getQualifierLength());
336 } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
337 ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
338
339
340 haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
341 kv.getRowOffset(), kv.getRowLength());
342 }
343 }
344
345 delayedReseek = forward;
346 delayedSeekKV = kv;
347
348 if (haveToSeek) {
349
350
351 realSeekDone = false;
352 long maxTimestampInFile = reader.getMaxTimestamp();
353 long seekTimestamp = kv.getTimestamp();
354 if (seekTimestamp > maxTimestampInFile) {
355
356
357
358
359
360
361 setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
362 } else {
363
364
365
366
367 enforceSeek();
368 }
369 return cur != null;
370 }
371
372
373
374
375
376
377
378
379 setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
380
381 realSeekDone = true;
382 return true;
383 }
384
385 Reader getReader() {
386 return reader;
387 }
388
389 KeyValue.KVComparator getComparator() {
390 return reader.getComparator();
391 }
392
393 @Override
394 public boolean realSeekDone() {
395 return realSeekDone;
396 }
397
398 @Override
399 public void enforceSeek() throws IOException {
400 if (realSeekDone)
401 return;
402
403 if (delayedReseek) {
404 reseek(delayedSeekKV);
405 } else {
406 seek(delayedSeekKV);
407 }
408 }
409
410 public void setScanQueryMatcher(ScanQueryMatcher matcher) {
411 this.matcher = matcher;
412 }
413
414 @Override
415 public boolean isFileScanner() {
416 return true;
417 }
418
419
420
421 static final long getSeekCount() {
422 return seekCount.get();
423 }
424 static final void instrument() {
425 seekCount = new AtomicLong();
426 }
427
428 @Override
429 public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
430 return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
431 && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
432 }
433
434 @Override
435 @SuppressWarnings("deprecation")
436 public boolean seekToPreviousRow(Cell originalKey) throws IOException {
437 try {
438 try {
439 boolean keepSeeking = false;
440 Cell key = originalKey;
441 do {
442 KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
443 key.getRowLength());
444 if (seekCount != null) seekCount.incrementAndGet();
445 if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
446 seekKey.getKeyLength())) {
447 close();
448 return false;
449 }
450 KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
451 .getRowArray(), hfs.getKeyValue().getRowOffset(), hfs.getKeyValue().getRowLength());
452
453 if (seekCount != null) seekCount.incrementAndGet();
454 if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
455 close();
456 return false;
457 }
458
459 setCurrentCell(hfs.getKeyValue());
460 this.stopSkippingKVsIfNextRow = true;
461 boolean resultOfSkipKVs;
462 try {
463 resultOfSkipKVs = skipKVsNewerThanReadpoint();
464 } finally {
465 this.stopSkippingKVsIfNextRow = false;
466 }
467 if (!resultOfSkipKVs
468 || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
469 keepSeeking = true;
470 key = firstKeyOfPreviousRow;
471 continue;
472 } else {
473 keepSeeking = false;
474 }
475 } while (keepSeeking);
476 return true;
477 } finally {
478 realSeekDone = true;
479 }
480 } catch (IOException ioe) {
481 throw new IOException("Could not seekToPreviousRow " + this + " to key "
482 + originalKey, ioe);
483 }
484 }
485
486 @Override
487 public boolean seekToLastRow() throws IOException {
488 byte[] lastRow = reader.getLastRowKey();
489 if (lastRow == null) {
490 return false;
491 }
492 KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
493 if (seek(seekKey)) {
494 return true;
495 } else {
496 return seekToPreviousRow(seekKey);
497 }
498 }
499
500 @Override
501 public boolean backwardSeek(Cell key) throws IOException {
502 seek(key);
503 if (cur == null
504 || getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
505 cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
506 key.getRowLength()) > 0) {
507 return seekToPreviousRow(key);
508 }
509 return true;
510 }
511
512 @Override
513 public Cell getNextIndexedKey() {
514 return hfs.getNextIndexedKey();
515 }
516 }