1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.util.HashSet;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.CancellationException;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.DoNotRetryIOException;
41 import org.apache.hadoop.hbase.HRegionInfo;
42 import org.apache.hadoop.hbase.RegionLocations;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.Pair;
46
47 import com.google.common.annotations.VisibleForTesting;
48
49
50
51
52
53
54
55
56
57
58
59
60
61 @InterfaceAudience.Private
62 class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
63 private final Log LOG = LogFactory.getLog(this.getClass());
64 volatile ScannerCallable currentScannerCallable;
65 AtomicBoolean replicaSwitched = new AtomicBoolean(false);
66 final ClusterConnection cConnection;
67 protected final ExecutorService pool;
68 protected final int timeBeforeReplicas;
69 private final Scan scan;
70 private final int retries;
71 private Result lastResult;
72 private final RpcRetryingCaller<Result[]> caller;
73 private final TableName tableName;
74 private Configuration conf;
75 private int scannerTimeout;
76 private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
77 private boolean someRPCcancelled = false;
78
79 public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
80 ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
81 int retries, int scannerTimeout, int caching, Configuration conf,
82 RpcRetryingCaller<Result []> caller) {
83 this.currentScannerCallable = baseCallable;
84 this.cConnection = cConnection;
85 this.pool = pool;
86 if (timeBeforeReplicas < 0) {
87 throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
88 }
89 this.timeBeforeReplicas = timeBeforeReplicas;
90 this.scan = scan;
91 this.retries = retries;
92 this.tableName = tableName;
93 this.conf = conf;
94 this.scannerTimeout = scannerTimeout;
95 this.caller = caller;
96 }
97
98 public void setClose() {
99 currentScannerCallable.setClose();
100 }
101
102 public void setRenew(boolean val) {
103 currentScannerCallable.setRenew(val);
104 }
105
106 public void setCaching(int caching) {
107 currentScannerCallable.setCaching(caching);
108 }
109
110 public int getCaching() {
111 return currentScannerCallable.getCaching();
112 }
113
114 public HRegionInfo getHRegionInfo() {
115 return currentScannerCallable.getHRegionInfo();
116 }
117
118 public boolean getServerHasMoreResults() {
119 return currentScannerCallable.getServerHasMoreResults();
120 }
121
122 public void setServerHasMoreResults(boolean serverHasMoreResults) {
123 currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
124 }
125
126 public boolean hasMoreResultsContext() {
127 return currentScannerCallable.hasMoreResultsContext();
128 }
129
130 public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
131 currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
132 }
133
134 @Override
135 public Result [] call(int timeout) throws IOException {
136
137
138
139
140 if (currentScannerCallable != null && currentScannerCallable.closed) {
141
142
143 if (LOG.isTraceEnabled()) {
144 LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
145 }
146 Result[] r = currentScannerCallable.call(timeout);
147 currentScannerCallable = null;
148 return r;
149 }
150
151
152
153
154
155
156
157 RegionLocations rl = null;
158 try {
159 rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
160 RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
161 currentScannerCallable.getRow());
162 } catch (RetriesExhaustedException | DoNotRetryIOException e) {
163
164
165 if (cConnection instanceof ConnectionManager.HConnectionImplementation) {
166 rl = ((ConnectionManager.HConnectionImplementation) cConnection)
167 .getCachedLocation(tableName, currentScannerCallable.getRow());
168 if (rl == null) {
169 throw e;
170 }
171 } else {
172
173 throw e;
174 }
175 }
176
177
178 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
179 new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
180 RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
181 rl.size() * 5);
182
183 AtomicBoolean done = new AtomicBoolean(false);
184 replicaSwitched.set(false);
185
186 addCallsForCurrentReplica(cs, rl);
187 int startIndex = 0;
188
189 try {
190
191 Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
192 TimeUnit.MICROSECONDS);
193 if (f != null) {
194
195 Pair<Result[], ScannerCallable> r = f.get();
196 if (r != null && r.getSecond() != null) {
197 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
198 }
199 return r == null ? null : r.getFirst();
200 }
201 } catch (ExecutionException e) {
202
203 if (LOG.isDebugEnabled()) {
204 LOG.debug("Scan with primary region returns " + e.getCause());
205 }
206
207
208
209 if ((rl.size() == 1) || (scan.getConsistency() == Consistency.STRONG)) {
210
211 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
212 }
213
214 startIndex = 1;
215 } catch (CancellationException e) {
216 throw new InterruptedIOException(e.getMessage());
217 } catch (InterruptedException e) {
218 throw new InterruptedIOException(e.getMessage());
219 }
220
221
222 int endIndex = rl.size();
223 if (scan.getConsistency() == Consistency.STRONG) {
224
225 endIndex = 1;
226 } else {
227
228 addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
229 }
230
231 try {
232 Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
233 TimeUnit.MILLISECONDS, startIndex, endIndex);
234
235 if (f == null) {
236 throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
237 }
238 Pair<Result[], ScannerCallable> r = f.get();
239
240 if (r != null && r.getSecond() != null) {
241 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
242 }
243 return r == null ? null : r.getFirst();
244
245 } catch (ExecutionException e) {
246 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
247 } catch (CancellationException e) {
248 throw new InterruptedIOException(e.getMessage());
249 } catch (InterruptedException e) {
250 throw new InterruptedIOException(e.getMessage());
251 } finally {
252
253
254 cs.cancelAll();
255 }
256 LOG.error("Imposible? Arrive at an unreachable line...");
257 throw new IOException("Imposible? Arrive at an unreachable line...");
258 }
259
260 private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
261 AtomicBoolean done, ExecutorService pool) {
262 if (done.compareAndSet(false, true)) {
263 if (currentScannerCallable != scanner) replicaSwitched.set(true);
264 currentScannerCallable = scanner;
265
266 if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
267 if (LOG.isTraceEnabled()) {
268 LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
269 " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
270 }
271
272 outstandingCallables.remove(scanner);
273 for (ScannerCallable s : outstandingCallables) {
274 if (LOG.isTraceEnabled()) {
275 LOG.trace("Closing scanner id=" + s.scannerId +
276 ", replica=" + s.getHRegionInfo().getRegionId() +
277 " because slow and replica=" +
278 this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
279 }
280
281
282
283 s.setClose();
284 final RetryingRPC r = new RetryingRPC(s);
285 pool.submit(new Callable<Void>(){
286 @Override
287 public Void call() throws Exception {
288 r.call(scannerTimeout);
289 return null;
290 }
291 });
292 }
293
294 outstandingCallables.clear();
295 }
296 }
297
298
299
300
301
302
303 public boolean switchedToADifferentReplica() {
304 return replicaSwitched.get();
305 }
306
307
308
309
310
311
312
313 public boolean isHeartbeatMessage() {
314 return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
315 }
316
317 private void addCallsForCurrentReplica(
318 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
319 RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
320 outstandingCallables.add(currentScannerCallable);
321 cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
322 }
323
324 private void addCallsForOtherReplicas(
325 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
326 int min, int max) {
327
328 for (int id = min; id <= max; id++) {
329 if (currentScannerCallable.id == id) {
330 continue;
331 }
332 ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
333 setStartRowForReplicaCallable(s);
334 outstandingCallables.add(s);
335 RetryingRPC retryingOnReplica = new RetryingRPC(s);
336 cs.submit(retryingOnReplica, scannerTimeout, id);
337 }
338 }
339
340
341
342
343
344 private void setStartRowForReplicaCallable(ScannerCallable callable) {
345 if (this.lastResult == null || callable == null) return;
346
347 if (this.lastResult.isPartial()) {
348
349
350
351
352 callable.getScan().setStartRow(this.lastResult.getRow());
353 } else {
354
355
356
357 if (callable.getScan().isReversed()) {
358 callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
359 } else {
360 callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
361 }
362 }
363 }
364
365 @VisibleForTesting
366 boolean isAnyRPCcancelled() {
367 return someRPCcancelled;
368 }
369
370 class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
371 final ScannerCallable callable;
372 RpcRetryingCaller<Result[]> caller;
373 private volatile boolean cancelled = false;
374
375 RetryingRPC(ScannerCallable callable) {
376 this.callable = callable;
377
378
379
380
381
382 this.caller = ScannerCallableWithReplicas.this.caller;
383 if (scan.getConsistency() == Consistency.TIMELINE) {
384 this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
385 .<Result[]>newCaller();
386 }
387 }
388
389 @Override
390 public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
391
392
393 if (cancelled) {
394 return null;
395 }
396 Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
397 return new Pair<Result[], ScannerCallable>(res, this.callable);
398 }
399
400 @Override
401 public void prepare(boolean reload) throws IOException {
402 if (cancelled) return;
403
404 if (Thread.interrupted()) {
405 throw new InterruptedIOException();
406 }
407
408 callable.prepare(reload);
409 }
410
411 @Override
412 public void throwable(Throwable t, boolean retrying) {
413 callable.throwable(t, retrying);
414 }
415
416 @Override
417 public String getExceptionMessageAdditionalDetail() {
418 return callable.getExceptionMessageAdditionalDetail();
419 }
420
421 @Override
422 public long sleep(long pause, int tries) {
423 return callable.sleep(pause, tries);
424 }
425
426 @Override
427 public void cancel() {
428 cancelled = true;
429 caller.cancel();
430 if (callable.getController() != null) {
431 callable.getController().startCancel();
432 }
433 someRPCcancelled = true;
434 }
435
436 @Override
437 public boolean isCancelled() {
438 return cancelled;
439 }
440 }
441
442 @Override
443 public void prepare(boolean reload) throws IOException {
444 }
445
446 @Override
447 public void throwable(Throwable t, boolean retrying) {
448 currentScannerCallable.throwable(t, retrying);
449 }
450
451 @Override
452 public String getExceptionMessageAdditionalDetail() {
453 return currentScannerCallable.getExceptionMessageAdditionalDetail();
454 }
455
456 @Override
457 public long sleep(long pause, int tries) {
458 return currentScannerCallable.sleep(pause, tries);
459 }
460 }