1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.NavigableSet;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellUtil;
35 import org.apache.hadoop.hbase.DoNotRetryIOException;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.KeyValue.KVComparator;
39 import org.apache.hadoop.hbase.KeyValueUtil;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.client.IsolationLevel;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.executor.ExecutorService;
44 import org.apache.hadoop.hbase.filter.Filter;
45 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51
52
53
54
55
56 @InterfaceAudience.Private
57 public class StoreScanner extends NonReversedNonLazyKeyValueScanner
58 implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
/** Store being scanned; null when the scanner is built directly from a scanner list. */
protected Store store;
/** Decides, cell by cell, whether to include, skip or seek (see the switch in next()). */
protected ScanQueryMatcher matcher;
/** Heap over the underlying scanners; set to null by close() and updateReaders(). */
protected KeyValueHeap heap;
/** Passed through to store.getScanners() when the underlying scanners are opened. */
protected boolean cacheBlocks;

/** Number of cells included so far for the row currently being processed. */
protected int countPerRow = 0;
/** Taken from scan.getMaxResultsPerColumnFamily(); values of -1 or lower disable the limit. */
protected int storeLimit = -1;
/** Taken from scan.getRowOffsetPerColumnFamily(); that many included cells are skipped per row. */
protected int storeOffset = 0;

/** Set to true by close(); updateReaders() becomes a no-op afterwards. */
protected boolean closing = false;
/** Result of scan.isGetScan(). */
protected final boolean isGet;
/** True when at least one explicit column was requested. */
protected final boolean explicitColumnQuery;
/** Passed to heap.requestSeek() by reseek(); true for multi-column queries and single-column scans. */
protected final boolean useRowColBloom;

/** True when the region server configuration enables parallel seeking and the store has several files. */
protected boolean isParallelSeekEnabled = false;
/** Executor used by parallelSeek(); only assigned when parallel seek is enabled. */
protected ExecutorService executor;
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
/** Computed as now - ttl. */
protected final long oldestUnexpiredTS;
/** EnvironmentEdgeManager.currentTime() captured at construction. */
protected final long now;
protected final int minVersions;
/** Upper bound (in estimated serialized bytes) before a RowTooBigException is thrown. */
protected final long maxRowSize;
/** How many scanned cells pass between two time-limit checks in next(). */
protected final long cellsPerHeartbeatCheck;

/** Count of distinct cells seen by next(); exposed via getEstimatedNumberOfKvsScanned(). */
private long kvsScanned = 0;
/** Cell seen by the previous loop iteration of next(); used for order checking and counting. */
private Cell prevCell = null;

static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
/** Configuration key that turns on parallel seeking of store file scanners. */
public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

/** Global switch for lazy seeks; changed through enableLazySeekGlobally(). */
protected static boolean lazySeekEnabledGlobally =
    LAZY_SEEK_ENABLED_BY_DEFAULT;

/** Configuration key for cellsPerHeartbeatCheck. */
public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    "hbase.cells.scanned.per.heartbeat.check";

/** Default for cellsPerHeartbeatCheck; also used when the configured value is not positive. */
public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

/** Top cell saved by updateReaders() before the heap is dropped; consumed by checkReseek(). */
protected Cell lastTop = null;

/** Whether non-get scans should use pread; OR-ed with isGet when scanners are opened. */
private boolean scanUsePread = false;
/** Guards heap/lastTop against concurrent updateReaders() and scanner calls. */
protected ReentrantLock lock = new ReentrantLock();

/** Read point handed to store.getScanners() and to the parallel seek handlers. */
protected final long readPt;
125
126
/**
 * Named points around a seek relative to a compaction. Not referenced anywhere in this
 * class; presumably used by tests to inject races -- TODO confirm against callers.
 */
enum StoreScannerCompactionRace {
  BEFORE_SEEK,
  AFTER_SEEK,
  COMPACT_COMPLETE
}
132
133
134 protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
135 final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
136 this.readPt = readPt;
137 this.store = store;
138 this.cacheBlocks = cacheBlocks;
139 isGet = scan.isGetScan();
140 int numCol = columns == null ? 0 : columns.size();
141 explicitColumnQuery = numCol > 0;
142 this.scan = scan;
143 this.columns = columns;
144 this.now = EnvironmentEdgeManager.currentTime();
145 this.oldestUnexpiredTS = now - ttl;
146 this.minVersions = minVersions;
147
148 if (store != null && ((HStore)store).getHRegion() != null
149 && ((HStore)store).getHRegion().getBaseConf() != null) {
150 Configuration conf = ((HStore) store).getHRegion().getBaseConf();
151 this.maxRowSize =
152 conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
153 this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
154
155 long tmpCellsPerTimeoutCheck =
156 conf.getLong(HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
157 DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
158 this.cellsPerHeartbeatCheck =
159 tmpCellsPerTimeoutCheck > 0 ? tmpCellsPerTimeoutCheck
160 : DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
161 } else {
162 this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
163 this.scanUsePread = scan.isSmall();
164 this.cellsPerHeartbeatCheck = DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
165 }
166
167
168
169
170
171 useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
172
173
174
175
176 if (store != null && ((HStore)store).getHRegion() != null
177 && store.getStorefilesCount() > 1) {
178 RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
179 if (rsService == null || !rsService.getConfiguration().getBoolean(
180 STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
181 isParallelSeekEnabled = true;
182 executor = rsService.getExecutorService();
183 }
184 }
185
186
187
188
189
190
191
192
193
194
/**
 * Creates a user scanner over the scanners the store returns for this scan.
 * The new scanner registers itself with the store as a changed-readers observer.
 * If opening or seeking the underlying scanners fails, the registration is undone
 * before the exception propagates, so the store does not keep a reference to a
 * scanner the caller never received.
 *
 * @param store store to scan; must not be null
 * @param scanInfo supplies the TTL and minimum versions
 * @param scan the scan being served
 * @param columns columns to retrieve, or null; must be null for raw scans
 * @param readPt read point used when opening scanners
 * @throws DoNotRetryIOException if columns are specified for a raw scan
 * @throws IOException if opening or seeking the underlying scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
    long readPt)
    throws IOException {
  this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
      scanInfo.getMinVersions(), readPt);
  if (columns != null && scan.isRaw()) {
    throw new DoNotRetryIOException(
        "Cannot specify any column for a raw scan");
  }
  matcher = new ScanQueryMatcher(scan, scanInfo, columns,
      ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
      oldestUnexpiredTS, now, store.getCoprocessorHost());

  this.store.addChangedReaderObserver(this);

  try {
    List<KeyValueScanner> scanners = getScannersNoCompaction();

    // Lazy seeks are only used for explicit-column queries.
    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
        && lazySeekEnabledGlobally, isParallelSeekEnabled);

    this.storeLimit = scan.getMaxResultsPerColumnFamily();
    this.storeOffset = scan.getRowOffsetPerColumnFamily();

    resetKVHeap(scanners, store.getComparator());
  } catch (IOException e) {
    // Nobody will ever call close() on this instance, so deregister here;
    // otherwise the store keeps the observer (and this scanner) alive.
    this.store.deleteChangedReaderObserver(this);
    throw e;
  }
}
229
230
231
232
233
234
235
236
237
238
239
/**
 * Creates a scanner over a caller-supplied list of scanners with the given scan type.
 * Delegates to the private constructor with no drop-deletes row range (both bounds null).
 * Presumably used for flushes and compactions -- TODO confirm against callers.
 *
 * @param store store the scanners belong to
 * @param scanInfo supplies the TTL and minimum versions
 * @param scan the scan to apply
 * @param scanners candidate scanners; they are filtered before being sought
 * @param scanType scan type handed to the query matcher
 * @param smallestReadPoint handed to the query matcher
 * @param earliestPutTs handed to the query matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, ScanType scanType,
    long smallestReadPoint, long earliestPutTs) throws IOException {
  this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
}
245
246
247
248
249
250
251
252
253
254
255
256
/**
 * Creates a scanner over a caller-supplied list of scanners with a drop-deletes row range.
 * Delegates to the private constructor with {@code ScanType.COMPACT_RETAIN_DELETES}.
 *
 * @param store store the scanners belong to
 * @param scanInfo supplies the TTL and minimum versions
 * @param scan the scan to apply
 * @param scanners candidate scanners; they are filtered before being sought
 * @param smallestReadPoint handed to the query matcher
 * @param earliestPutTs handed to the query matcher
 * @param dropDeletesFromRow start of the drop-deletes range, handed to the query matcher
 * @param dropDeletesToRow end of the drop-deletes range, handed to the query matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
  this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
263
264 private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
265 List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
266 long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
267 this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
268 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
269 if (dropDeletesFromRow == null) {
270 matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
271 earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
272 } else {
273 matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
274 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
275 }
276
277
278 scanners = selectScannersFrom(scanners);
279
280
281 seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
282
283
284 resetKVHeap(scanners, store.getComparator());
285 }
286
287
/**
 * Package-private convenience constructor without a store. Uses
 * {@code HConstants.LATEST_TIMESTAMP} as the earliest put timestamp and 0 as the read point.
 * Presumably intended for tests -- TODO confirm.
 */
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  this(scan, scanInfo, scanType, columns, scanners,
      HConstants.LATEST_TIMESTAMP,
      // 0 is passed as readPt
      0);
}
296
297
/**
 * Package-private convenience constructor without a store that lets the caller choose
 * the earliest put timestamp; the read point is 0.
 * Presumably intended for tests -- TODO confirm.
 */
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
    throws IOException {
  this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
      // 0 is passed as readPt
      0);
}
306
307 public StoreScanner(final Scan scan, ScanInfo scanInfo,
308 ScanType scanType, final NavigableSet<byte[]> columns,
309 final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
310 throws IOException {
311 this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
312 scanInfo.getMinVersions(), readPt);
313 this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
314 Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
315
316
317 if (this.store != null) {
318 this.store.addChangedReaderObserver(this);
319 }
320
321 seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
322 resetKVHeap(scanners, scanInfo.getComparator());
323 }
324
325
326
327
328
329 protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
330 final boolean isCompaction = false;
331 boolean usePread = isGet || scanUsePread;
332 return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
333 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
334 }
335
336
337
338
339
340
341
342
343
344 protected void seekScanners(List<? extends KeyValueScanner> scanners,
345 Cell seekKey, boolean isLazy, boolean isParallelSeek)
346 throws IOException {
347
348
349
350
351 if (isLazy) {
352 for (KeyValueScanner scanner : scanners) {
353 scanner.requestSeek(seekKey, false, true);
354 }
355 } else {
356 if (!isParallelSeek) {
357 long totalScannersSoughtBytes = 0;
358 for (KeyValueScanner scanner : scanners) {
359 if (totalScannersSoughtBytes >= maxRowSize) {
360 throw new RowTooBigException("Max row size allowed: " + maxRowSize
361 + ", but row is bigger than that");
362 }
363 scanner.seek(seekKey);
364 Cell c = scanner.peek();
365 if (c != null) {
366 totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
367 }
368 }
369 } else {
370 parallelSeek(scanners, seekKey);
371 }
372 }
373 }
374
/**
 * Replaces {@link #heap} with a new KeyValueHeap over the given scanners.
 *
 * @param scanners already-positioned scanners to merge
 * @param comparator ordering used by the heap
 * @throws IOException if the heap cannot be constructed
 */
protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
    KVComparator comparator) throws IOException {
  // Combine all sought scanners into a single heap.
  heap = new KeyValueHeap(scanners, comparator);
}
380
381
382
383
384
385 protected List<KeyValueScanner> selectScannersFrom(
386 final List<? extends KeyValueScanner> allScanners) {
387 boolean memOnly;
388 boolean filesOnly;
389 if (scan instanceof InternalScan) {
390 InternalScan iscan = (InternalScan)scan;
391 memOnly = iscan.isCheckOnlyMemStore();
392 filesOnly = iscan.isCheckOnlyStoreFiles();
393 } else {
394 memOnly = false;
395 filesOnly = false;
396 }
397
398 List<KeyValueScanner> scanners =
399 new ArrayList<KeyValueScanner>(allScanners.size());
400
401
402
403 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
404 Long.MIN_VALUE;
405
406
407 for (KeyValueScanner kvs : allScanners) {
408 boolean isFile = kvs.isFileScanner();
409 if ((!isFile && filesOnly) || (isFile && memOnly)) {
410 continue;
411 }
412
413 if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
414 scanners.add(kvs);
415 }
416 }
417 return scanners;
418 }
419
420 @Override
421 public Cell peek() {
422 lock.lock();
423 try {
424 if (this.heap == null) {
425 return this.lastTop;
426 }
427 return this.heap.peek();
428 } finally {
429 lock.unlock();
430 }
431 }
432
/**
 * Not supported on this scanner; use {@link #next(List)} instead.
 *
 * @throws RuntimeException always
 */
@Override
public KeyValue next() {
  // Single-cell iteration is deliberately unsupported here.
  throw new RuntimeException("Never call StoreScanner.next()");
}
438
439 @Override
440 public void close() {
441 lock.lock();
442 try {
443 if (this.closing) return;
444 this.closing = true;
445
446 if (this.store != null)
447 this.store.deleteChangedReaderObserver(this);
448 if (this.heap != null)
449 this.heap.close();
450 this.heap = null;
451 this.lastTop = null;
452 } finally {
453 lock.unlock();
454 }
455 }
456
457 @Override
458 public boolean seek(Cell key) throws IOException {
459 lock.lock();
460 try {
461
462 checkReseek();
463 return this.heap.seek(key);
464 } finally {
465 lock.unlock();
466 }
467 }
468
469 @Override
470 public boolean next(List<Cell> outResult) throws IOException {
471 return next(outResult, NoLimitScannerContext.getInstance());
472 }
473
474
475
476
477
478
479
/**
 * Reads cells from the heap, asks the matcher what to do with each one, and appends the
 * included cells to {@code outResult} until the matcher reports the row as done, a limit
 * in {@code scannerContext} is reached, or the heap is exhausted.
 *
 * @param outResult list that receives the included cells
 * @param scannerContext carries the limits and progress; must not be null
 * @return {@code hasMoreValues()} of the scanner state recorded in {@code scannerContext}
 * @throws IllegalArgumentException if {@code scannerContext} is null
 * @throws RowTooBigException if the cells added by this call exceed {@code maxRowSize}
 * @throws IOException if reading or seeking fails
 */
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
  lock.lock();

  try {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    // The heap was rebuilt and its top row differs from the remembered one:
    // return without adding anything and let the caller call again.
    if (checkReseek()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // No heap at this point means there is nothing left to read.
    if (this.heap == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    Cell peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // Row coordinates of the cell at the top of the heap.
    byte[] row = peeked.getRowArray();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();

    // Start a new row in the matcher unless a between-cells limit is in effect and the
    // top cell is still on the matcher's current row (presumably a call that resumes a
    // row which an earlier call left unfinished).
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null ||
        !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
    }

    // Progress accumulated by earlier calls is discarded unless the context keeps it.
    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();

    Cell cell;

    // Only used for the ordering assertion; null when there is no store.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    int count = 0;
    long totalBytesRead = 0;

    LOOP: while((cell = this.heap.peek()) != null) {
      // Check the time limit once every cellsPerHeartbeatCheck scanned cells.
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }

      // Count a cell only once even if the heap presents the same instance again.
      if (prevCell != cell) ++kvsScanned;
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;

      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      // May downgrade a seek to a plain skip/include; see optimize().
      qcode = optimize(qcode, cell);
      switch(qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          if (f != null) {
            // Give the filter the chance to transform the cell before it is returned.
            cell = f.transformCell(cell);
          }

          this.countPerRow++;
          if (storeLimit > -1 &&
              this.countPerRow > (storeLimit + storeOffset)) {
            // The per-row limit for this store is used up; move on to the next row.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
            break LOOP;
          }

          // Cells up to storeOffset are counted but not returned.
          if (this.countPerRow > storeOffset) {
            outResult.add(cell);

            // Track what this call has produced.
            count++;
            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);

            // Report progress to the context so that its limits can be enforced.
            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
            scannerContext.incrementBatchProgress(1);

            if (totalBytesRead > maxRowSize) {
              throw new RowTooBigException("Max row size allowed: " + maxRowSize
                  + ", but the row is bigger than that.");
            }
          }

          // Advance according to the match code.
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(cell)) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            seekToNextRow(cell);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            seekAsDirection(matcher.getKeyForNextColumn(cell));
          } else {
            this.heap.next();
          }

          // Stop after this cell when a batch or size limit has been reached.
          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
            break LOOP;
          }
          continue;

        case DONE:
          // The matcher is finished with the current row.
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

        case DONE_SCAN:
          // The matcher is finished with the whole scan.
          close();
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

        case SEEK_NEXT_ROW:
          // Nothing more to take from this row; stop if no further rows can match.
          if (!matcher.moreRowsMayExistAfter(cell)) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          seekToNextRow(cell);
          break;

        case SEEK_NEXT_COL:
          seekAsDirection(matcher.getKeyForNextColumn(cell));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          // Seek to the hinted key when one is provided, otherwise just step forward.
          Cell nextKV = matcher.getNextKeyHint(cell);
          if (nextKV != null) {
            seekAsDirection(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
      }
    }

    // Reached by heap exhaustion or a labelled break; report more values only if
    // this call produced at least one cell.
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // Nothing was produced: close the scanner and report the end.
    close();
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  } finally {
    lock.unlock();
  }
}
658
659
660
661
662
663 private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
664 Cell nextIndexedKey = getNextIndexedKey();
665 if (nextIndexedKey == null || nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
666 store == null) {
667 return qcode;
668 }
669 switch(qcode) {
670 case INCLUDE_AND_SEEK_NEXT_COL:
671 case SEEK_NEXT_COL:
672 {
673 if (matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
674 return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
675 }
676 break;
677 }
678 case INCLUDE_AND_SEEK_NEXT_ROW:
679 case SEEK_NEXT_ROW:
680 {
681 if (matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
682 return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
683 }
684 break;
685 }
686 default:
687 break;
688 }
689 return qcode;
690 }
691
692
693 @Override
694 public void updateReaders() throws IOException {
695 lock.lock();
696 try {
697 if (this.closing) return;
698
699
700
701
702
703
704 if (this.heap == null) return;
705
706
707 this.lastTop = this.peek();
708
709
710
711
712 this.heap.close();
713 this.heap = null;
714
715
716 } finally {
717 lock.unlock();
718 }
719 }
720
721
722
723
724
725
726 protected boolean checkReseek() throws IOException {
727 if (this.heap == null && this.lastTop != null) {
728 resetScannerStack(this.lastTop);
729 if (this.heap.peek() == null
730 || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
731 LOG.debug("Storescanner.peek() is changed where before = "
732 + this.lastTop.toString() + ",and after = " + this.heap.peek());
733 this.lastTop = null;
734 return true;
735 }
736 this.lastTop = null;
737 }
738
739 return false;
740 }
741
742 protected void resetScannerStack(Cell lastTopKey) throws IOException {
743 if (heap != null) {
744 throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
745 }
746
747
748
749
750 List<KeyValueScanner> scanners = getScannersNoCompaction();
751
752
753 seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
754
755
756 resetKVHeap(scanners, store.getComparator());
757
758
759
760
761 Cell kv = heap.peek();
762 if (kv == null) {
763 kv = lastTopKey;
764 }
765 byte[] row = kv.getRowArray();
766 int offset = kv.getRowOffset();
767 short length = kv.getRowLength();
768 if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
769 matcher.rowOffset, matcher.rowLength)) {
770 this.countPerRow = 0;
771 matcher.reset();
772 matcher.setRow(row, offset, length);
773 }
774 }
775
776
777
778
779
780
781
782
783 protected void checkScanOrder(Cell prevKV, Cell kv,
784 KeyValue.KVComparator comparator) throws IOException {
785
786 assert prevKV == null || comparator == null
787 || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
788 + " followed by a " + "smaller key " + kv + " in cf " + store;
789 }
790
791 protected boolean seekToNextRow(Cell kv) throws IOException {
792 return reseek(KeyValueUtil.createLastOnRow(kv));
793 }
794
795
796
797
798
799
800
/**
 * Seeks in the scanner's direction of travel. This implementation simply calls
 * {@link #reseek(Cell)}; presumably overridden by scanners that travel the other
 * way -- TODO confirm.
 *
 * @param kv key to move to
 * @return the result of {@link #reseek(Cell)}
 * @throws IOException if the reseek fails
 */
protected boolean seekAsDirection(Cell kv)
    throws IOException {
  return reseek(kv);
}
805
806 @Override
807 public boolean reseek(Cell kv) throws IOException {
808 lock.lock();
809 try {
810
811
812
813 checkReseek();
814 if (explicitColumnQuery && lazySeekEnabledGlobally) {
815 return heap.requestSeek(kv, true, useRowColBloom);
816 }
817 return heap.reseek(kv);
818 } finally {
819 lock.unlock();
820 }
821 }
822
/**
 * @return always 0 for this scanner
 */
@Override
public long getSequenceID() {
  return 0;
}
827
828
829
830
831
832
833
834 private void parallelSeek(final List<? extends KeyValueScanner>
835 scanners, final Cell kv) throws IOException {
836 if (scanners.isEmpty()) return;
837 int storeFileScannerCount = scanners.size();
838 CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
839 List<ParallelSeekHandler> handlers =
840 new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
841 for (KeyValueScanner scanner : scanners) {
842 if (scanner instanceof StoreFileScanner) {
843 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
844 this.readPt, latch);
845 executor.submit(seekHandler);
846 handlers.add(seekHandler);
847 } else {
848 scanner.seek(kv);
849 latch.countDown();
850 }
851 }
852
853 try {
854 latch.await();
855 } catch (InterruptedException ie) {
856 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
857 }
858
859 for (ParallelSeekHandler handler : handlers) {
860 if (handler.getErr() != null) {
861 throw new IOException(handler.getErr());
862 }
863 }
864 }
865
866
867
868
869
870 List<KeyValueScanner> getAllScannersForTesting() {
871 List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
872 KeyValueScanner current = heap.getCurrentForTesting();
873 if (current != null)
874 allScanners.add(current);
875 for (KeyValueScanner scanner : heap.getHeap())
876 allScanners.add(scanner);
877 return allScanners;
878 }
879
/**
 * Sets the static lazy-seek switch that every StoreScanner consults.
 *
 * @param enable new value for {@link #lazySeekEnabledGlobally}
 */
static void enableLazySeekGlobally(boolean enable) {
  lazySeekEnabledGlobally = enable;
}
883
884
885
886
/**
 * @return the number of cells counted so far by {@link #next(List, ScannerContext)}
 */
public long getEstimatedNumberOfKvsScanned() {
  return this.kvsScanned;
}
890
/**
 * @return the next indexed key as reported by the heap; requires a non-null heap
 */
@Override
public Cell getNextIndexedKey() {
  return this.heap.getNextIndexedKey();
}
895 }
896