1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.KeyValue.MetaComparator;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellComparator;
35 import org.apache.hadoop.hbase.CellUtil;
36 import org.apache.hadoop.hbase.DoNotRetryIOException;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.NotServingRegionException;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.UnknownScannerException;
43 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
44 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
45 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
48 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
49 import org.apache.hadoop.hbase.util.Bytes;
50
51 import com.google.common.annotations.VisibleForTesting;
52
53
54
55
56
57
58 @InterfaceAudience.Private
59 public class ClientScanner extends AbstractClientScanner {
60 private final Log LOG = LogFactory.getLog(this.getClass());
61
62
63 static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
64 protected Scan scan;
65 protected boolean closed = false;
66
67
68 protected HRegionInfo currentRegion = null;
69 protected ScannerCallableWithReplicas callable = null;
70 protected final LinkedList<Result> cache = new LinkedList<Result>();
71
72
73
74
75
76 protected final LinkedList<Result> partialResults = new LinkedList<Result>();
77
78
79
80
81
82 protected byte[] partialResultsRow = null;
83
84
85
86 protected Cell lastCellLoadedToCache = null;
87 protected final int caching;
88 protected long lastNext;
89
90 protected Result lastResult = null;
91 protected final long maxScannerResultSize;
92 private final ClusterConnection connection;
93 private final TableName tableName;
94 protected final int scannerTimeout;
95 protected boolean scanMetricsPublished = false;
96 protected RpcRetryingCaller<Result []> caller;
97 protected RpcControllerFactory rpcControllerFactory;
98 protected Configuration conf;
99
100
101
102
103
104 protected final int primaryOperationTimeout;
105 private int retries;
106 protected final ExecutorService pool;
107 private static MetaComparator metaComparator = new MetaComparator();
108
109
110
111
112
113
114
115
116
117
118 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
119 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
120 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
121 throws IOException {
122 if (LOG.isTraceEnabled()) {
123 LOG.trace("Scan table=" + tableName
124 + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
125 }
126 this.scan = scan;
127 this.tableName = tableName;
128 this.lastNext = System.currentTimeMillis();
129 this.connection = connection;
130 this.pool = pool;
131 this.primaryOperationTimeout = primaryOperationTimeout;
132 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
133 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
134 if (scan.getMaxResultSize() > 0) {
135 this.maxScannerResultSize = scan.getMaxResultSize();
136 } else {
137 this.maxScannerResultSize = conf.getLong(
138 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
139 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
140 }
141 this.scannerTimeout = HBaseConfiguration.getInt(conf,
142 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
143 HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
144 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
145
146
147 initScanMetrics(scan);
148
149
150 if (this.scan.getCaching() > 0) {
151 this.caching = this.scan.getCaching();
152 } else {
153 this.caching = conf.getInt(
154 HConstants.HBASE_CLIENT_SCANNER_CACHING,
155 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
156 }
157
158 this.caller = rpcFactory.<Result[]> newCaller();
159 this.rpcControllerFactory = controllerFactory;
160
161 this.conf = conf;
162 initializeScannerInConstruction();
163 }
164
165 protected void initializeScannerInConstruction() throws IOException{
166
167 nextScanner(this.caching, false);
168 }
169
170 protected ClusterConnection getConnection() {
171 return this.connection;
172 }
173
174
175
176
177
178
179
180 @Deprecated
181 protected byte [] getTableName() {
182 return this.tableName.getName();
183 }
184
185 protected TableName getTable() {
186 return this.tableName;
187 }
188
189 protected int getRetries() {
190 return this.retries;
191 }
192
193 protected int getScannerTimeout() {
194 return this.scannerTimeout;
195 }
196
197 protected Configuration getConf() {
198 return this.conf;
199 }
200
201 protected Scan getScan() {
202 return scan;
203 }
204
205 protected ExecutorService getPool() {
206 return pool;
207 }
208
209 protected int getPrimaryOperationTimeout() {
210 return primaryOperationTimeout;
211 }
212
213 protected int getCaching() {
214 return caching;
215 }
216
217 protected long getTimestamp() {
218 return lastNext;
219 }
220
221 @VisibleForTesting
222 protected long getMaxResultSize() {
223 return maxScannerResultSize;
224 }
225
226
227 protected boolean checkScanStopRow(final byte [] endKey) {
228 if (this.scan.getStopRow().length > 0) {
229
230 byte [] stopRow = scan.getStopRow();
231 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
232 endKey, 0, endKey.length);
233 if (cmp <= 0) {
234
235
236 return true;
237 }
238 }
239 return false;
240 }
241
242 private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
243
244
245
246
247 if (callable != null && callable.switchedToADifferentReplica()) return true;
248 return nextScanner(nbRows, done);
249 }
250
251
252
253
254
255
256
257
258
259
260 protected boolean nextScanner(int nbRows, final boolean done)
261 throws IOException {
262
263 if (this.callable != null) {
264 this.callable.setClose();
265 call(callable, caller, scannerTimeout);
266 this.callable = null;
267 }
268
269
270 byte [] localStartKey;
271
272
273 if (this.currentRegion != null) {
274 byte [] endKey = this.currentRegion.getEndKey();
275 if (endKey == null ||
276 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
277 checkScanStopRow(endKey) ||
278 done) {
279 close();
280 if (LOG.isTraceEnabled()) {
281 LOG.trace("Finished " + this.currentRegion);
282 }
283 return false;
284 }
285 localStartKey = endKey;
286 if (LOG.isTraceEnabled()) {
287 LOG.trace("Finished " + this.currentRegion);
288 }
289 } else {
290 localStartKey = this.scan.getStartRow();
291 }
292
293 if (LOG.isDebugEnabled() && this.currentRegion != null) {
294
295 LOG.debug("Advancing internal scanner to startKey at '" +
296 Bytes.toStringBinary(localStartKey) + "'");
297 }
298 try {
299 callable = getScannerCallable(localStartKey, nbRows);
300
301
302 call(callable, caller, scannerTimeout);
303 this.currentRegion = callable.getHRegionInfo();
304 if (this.scanMetrics != null) {
305 this.scanMetrics.countOfRegions.incrementAndGet();
306 }
307 } catch (IOException e) {
308 close();
309 throw e;
310 }
311 return true;
312 }
313
314 @VisibleForTesting
315 boolean isAnyRPCcancelled() {
316 return callable.isAnyRPCcancelled();
317 }
318
319 Result[] call(ScannerCallableWithReplicas callable,
320 RpcRetryingCaller<Result[]> caller, int scannerTimeout)
321 throws IOException, RuntimeException {
322 if (Thread.interrupted()) {
323 throw new InterruptedIOException();
324 }
325
326
327 return caller.callWithoutRetries(callable, scannerTimeout);
328 }
329
330 @InterfaceAudience.Private
331 protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
332 int nbRows) {
333 scan.setStartRow(localStartKey);
334 ScannerCallable s =
335 new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
336 this.rpcControllerFactory);
337 s.setCaching(nbRows);
338 ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
339 s, pool, primaryOperationTimeout, scan,
340 retries, scannerTimeout, caching, conf, caller);
341 return sr;
342 }
343
344
345
346
347
348
349
350
351
352
353
354
355 protected void writeScanMetrics() {
356 if (this.scanMetrics == null || scanMetricsPublished) {
357 return;
358 }
359 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
360 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
361 scanMetricsPublished = true;
362 }
363
364 @Override
365 public Result next() throws IOException {
366
367 if (cache.size() == 0 && this.closed) {
368 return null;
369 }
370 if (cache.size() == 0) {
371 loadCache();
372 }
373
374 if (cache.size() > 0) {
375 return cache.poll();
376 }
377
378
379 writeScanMetrics();
380 return null;
381 }
382
383 @VisibleForTesting
384 public int getCacheSize() {
385 return cache != null ? cache.size() : 0;
386 }
387
388
389
390
391 protected void loadCache() throws IOException {
392 Result[] values = null;
393 long remainingResultSize = maxScannerResultSize;
394 int countdown = this.caching;
395
396 callable.setCaching(this.caching);
397
398
399 boolean retryAfterOutOfOrderException = true;
400
401
402 boolean serverHasMoreResults = false;
403 boolean allResultsSkipped = false;
404
405
406 int retriesLeft = getRetries();
407 do {
408 allResultsSkipped = false;
409 try {
410
411
412
413 values = call(callable, caller, scannerTimeout);
414
415
416
417
418
419 if (values == null && callable.switchedToADifferentReplica()) {
420
421
422 clearPartialResults();
423 this.currentRegion = callable.getHRegionInfo();
424 continue;
425 }
426 retryAfterOutOfOrderException = true;
427 } catch (DoNotRetryIOException e) {
428
429
430 clearPartialResults();
431
432
433
434
435
436
437
438
439
440
441
442
443
444 if (e instanceof UnknownScannerException) {
445 long timeout = lastNext + scannerTimeout;
446
447
448
449 if (timeout < System.currentTimeMillis()) {
450 LOG.info("For hints related to the following exception, please try taking a look at: "
451 + "https://hbase.apache.org/book.html#trouble.client.scantimeout");
452 long elapsed = System.currentTimeMillis() - lastNext;
453 ScannerTimeoutException ex =
454 new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
455 + "timeout is currently set to " + scannerTimeout);
456 ex.initCause(e);
457 throw ex;
458 }
459 if (retriesLeft-- <= 0) {
460 throw e;
461 }
462 } else {
463
464
465 Throwable cause = e.getCause();
466 if ((cause != null && cause instanceof NotServingRegionException) ||
467 (cause != null && cause instanceof RegionServerStoppedException) ||
468 e instanceof OutOfOrderScannerNextException ||
469 e instanceof ScannerResetException) {
470
471
472 if (retriesLeft-- <= 0) {
473 throw e;
474 }
475 } else {
476 throw e;
477 }
478 }
479
480 if (this.lastResult != null) {
481
482
483
484
485 if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
486 if (scan.isReversed()) {
487 scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
488 } else {
489 scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
490 }
491 } else {
492
493 scan.setStartRow(lastResult.getRow());
494 }
495 }
496 if (e instanceof OutOfOrderScannerNextException) {
497 if (retryAfterOutOfOrderException) {
498 retryAfterOutOfOrderException = false;
499 } else {
500
501 throw new DoNotRetryIOException("Failed after retry of " +
502 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
503 }
504 }
505
506 this.currentRegion = null;
507
508
509 callable = null;
510
511 continue;
512 }
513 long currentTime = System.currentTimeMillis();
514 if (this.scanMetrics != null) {
515 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
516 }
517 lastNext = currentTime;
518
519
520
521 List<Result> resultsToAddToCache =
522 getResultsToAddToCache(values, callable.isHeartbeatMessage());
523 if (!resultsToAddToCache.isEmpty()) {
524 for (Result rs : resultsToAddToCache) {
525 rs = filterLoadedCell(rs);
526 if (rs == null) {
527 continue;
528 }
529 cache.add(rs);
530 for (Cell cell : rs.rawCells()) {
531 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
532 }
533 countdown--;
534 this.lastResult = rs;
535 if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
536 updateLastCellLoadedToCache(this.lastResult);
537 } else {
538 this.lastCellLoadedToCache = null;
539 }
540 }
541 if (cache.isEmpty()) {
542
543 allResultsSkipped = true;
544 continue;
545 }
546 }
547 if (callable.isHeartbeatMessage()) {
548 if (cache.size() > 0) {
549
550
551
552
553 if (LOG.isTraceEnabled()) {
554 LOG.trace("Heartbeat message received and cache contains Results."
555 + " Breaking out of scan loop");
556 }
557 break;
558 }
559 continue;
560 }
561
562
563
564
565
566 if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
567
568
569 serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
570 }
571
572
573
574
575 } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
576 || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
577 && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
578 }
579
580
581
582
583
584
585
586
587 private boolean doneWithRegion(long remainingResultSize, int remainingRows,
588 boolean regionHasMoreResults) {
589 return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
590 }
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605 protected List<Result>
606 getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
607 throws IOException {
608 int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
609 List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
610
611 final boolean isBatchSet = scan != null && scan.getBatch() > 0;
612 final boolean allowPartials = scan != null && scan.getAllowPartialResults();
613
614
615
616
617
618
619
620 if (allowPartials || isBatchSet) {
621 addResultsToList(resultsToAddToCache, resultsFromServer, 0,
622 (null == resultsFromServer ? 0 : resultsFromServer.length));
623 return resultsToAddToCache;
624 }
625
626
627
628
629 if (resultsFromServer == null || resultsFromServer.length == 0) {
630
631
632
633 if (!partialResults.isEmpty() && !heartbeatMessage) {
634 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
635 clearPartialResults();
636 }
637
638 return resultsToAddToCache;
639 }
640
641
642
643 Result last = resultsFromServer[resultsFromServer.length - 1];
644 Result partial = last.isPartial() ? last : null;
645
646 if (LOG.isTraceEnabled()) {
647 StringBuilder sb = new StringBuilder();
648 sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
649 sb.append("partial != null: ").append(partial != null).append(",");
650 sb.append("number of partials so far: ").append(partialResults.size());
651 LOG.trace(sb.toString());
652 }
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674 if (partial != null && partialResults.isEmpty()) {
675 addToPartialResults(partial);
676
677
678 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
679 } else if (!partialResults.isEmpty()) {
680 for (int i = 0; i < resultsFromServer.length; i++) {
681 Result result = resultsFromServer[i];
682
683
684
685 if (Bytes.equals(partialResultsRow, result.getRow())) {
686 addToPartialResults(result);
687
688
689
690 if (!result.isPartial()) {
691 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
692 clearPartialResults();
693 }
694 } else {
695
696
697
698 if (!partialResults.isEmpty()) {
699 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
700 clearPartialResults();
701 }
702
703
704
705
706 if (result.isPartial()) {
707 addToPartialResults(result);
708 } else {
709 resultsToAddToCache.add(result);
710 }
711 }
712 }
713 } else {
714 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
715 }
716
717 return resultsToAddToCache;
718 }
719
720
721
722
723
724
725
726 private void addToPartialResults(final Result result) throws IOException {
727 final byte[] row = result.getRow();
728 if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
729 throw new IOException("Partial result row does not match. All partial results must come "
730 + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
731 + Bytes.toString(row));
732 }
733 partialResultsRow = row;
734 partialResults.add(result);
735 }
736
737
738
739
740 private void clearPartialResults() {
741 partialResults.clear();
742 partialResultsRow = null;
743 }
744
745
746
747
748
749
750
751
752 private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
753 if (inputArray == null || start < 0 || end > inputArray.length) return;
754
755 for (int i = start; i < end; i++) {
756 outputList.add(inputArray[i]);
757 }
758 }
759
760 @Override
761 public void close() {
762 if (!scanMetricsPublished) writeScanMetrics();
763 if (callable != null) {
764 callable.setClose();
765 try {
766 call(callable, caller, scannerTimeout);
767 } catch (UnknownScannerException e) {
768
769
770
771 } catch (IOException e) {
772
773 LOG.warn("scanner failed to close. Exception follows: " + e);
774 }
775 callable = null;
776 }
777 closed = true;
778 }
779
780
781
782
783
784
785 protected static byte[] createClosestRowBefore(byte[] row) {
786 if (row == null) {
787 throw new IllegalArgumentException("The passed row is empty");
788 }
789 if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
790 return MAX_BYTE_ARRAY;
791 }
792 if (row[row.length - 1] == 0) {
793 return Arrays.copyOf(row, row.length - 1);
794 } else {
795 byte[] closestFrontRow = Arrays.copyOf(row, row.length);
796 closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
797 closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
798 return closestFrontRow;
799 }
800 }
801
802 @Override
803 public boolean renewLease() {
804 if (callable != null) {
805
806 callable.setRenew(true);
807 try {
808 this.caller.callWithoutRetries(callable, this.scannerTimeout);
809 } catch (Exception e) {
810 return false;
811 } finally {
812 callable.setRenew(false);
813 }
814 return true;
815 }
816 return false;
817 }
818
819 protected void updateLastCellLoadedToCache(Result result) {
820 if (result.rawCells().length == 0) {
821 return;
822 }
823 this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
824 }
825
826
827
828
829
830 private int compare(Cell a, Cell b) {
831 int r = 0;
832 if (currentRegion != null && currentRegion.isMetaRegion()) {
833 r = metaComparator.compareRows(a, b);
834 } else {
835 r = CellComparator.compareRows(a, b);
836 }
837 if (r != 0) {
838 return this.scan.isReversed() ? -r : r;
839 }
840 return CellComparator.compareWithoutRow(a, b);
841 }
842
843 private Result filterLoadedCell(Result result) {
844
845
846
847
848 if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
849 return result;
850 }
851 if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
852
853
854 return result;
855 }
856 if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
857
858 return null;
859 }
860
861
862 int index = 1;
863 while (index < result.rawCells().length) {
864 if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
865 break;
866 }
867 index++;
868 }
869 Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
870 return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
871 }
872 }