View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22  
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.HashSet;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CancellationException;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.DoNotRetryIOException;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.RegionLocations;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.Pair;
46  
47  import com.google.common.annotations.VisibleForTesting;
48  
49  /**
50   * This class has the logic for handling scanners for regions with and without replicas.
51   * 1. A scan is attempted on the default (primary) region
52   * 2. The scanner sends all the RPCs to the default region until it is done, or, there
53   * is a timeout on the default (a timeout of zero is disallowed).
54   * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
55   * 4. The results from the first successful scanner are taken, and it is stored which server
56   * returned the results.
57   * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
58   * in which case, the other replicas are queried (as in (3) above).
59   *
60   */
61  @InterfaceAudience.Private
62  class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
63    private final Log LOG = LogFactory.getLog(this.getClass());
64    volatile ScannerCallable currentScannerCallable;
65    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
66    final ClusterConnection cConnection;
67    protected final ExecutorService pool;
68    protected final int timeBeforeReplicas;
69    private final Scan scan;
70    private final int retries;
71    private Result lastResult;
72    private final RpcRetryingCaller<Result[]> caller;
73    private final TableName tableName;
74    private Configuration conf;
75    private int scannerTimeout;
76    private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
77    private boolean someRPCcancelled = false; //required for testing purposes only
78  
79    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
80        ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
81        int retries, int scannerTimeout, int caching, Configuration conf,
82        RpcRetryingCaller<Result []> caller) {
83      this.currentScannerCallable = baseCallable;
84      this.cConnection = cConnection;
85      this.pool = pool;
86      if (timeBeforeReplicas < 0) {
87        throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
88      }
89      this.timeBeforeReplicas = timeBeforeReplicas;
90      this.scan = scan;
91      this.retries = retries;
92      this.tableName = tableName;
93      this.conf = conf;
94      this.scannerTimeout = scannerTimeout;
95      this.caller = caller;
96    }
97  
98    public void setClose() {
99      currentScannerCallable.setClose();
100   }
101 
102   public void setRenew(boolean val) {
103     currentScannerCallable.setRenew(val);
104   }
105 
106   public void setCaching(int caching) {
107     currentScannerCallable.setCaching(caching);
108   }
109 
110   public int getCaching() {
111     return currentScannerCallable.getCaching();
112   }
113 
114   public HRegionInfo getHRegionInfo() {
115     return currentScannerCallable.getHRegionInfo();
116   }
117 
118   public boolean getServerHasMoreResults() {
119     return currentScannerCallable.getServerHasMoreResults();
120   }
121 
122   public void setServerHasMoreResults(boolean serverHasMoreResults) {
123     currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
124   }
125 
126   public boolean hasMoreResultsContext() {
127     return currentScannerCallable.hasMoreResultsContext();
128   }
129 
130   public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
131     currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
132   }
133 
134   @Override
135   public Result [] call(int timeout) throws IOException {
136     // If the active replica callable was closed somewhere, invoke the RPC to
137     // really close it. In the case of regular scanners, this applies. We make couple
138     // of RPCs to a RegionServer, and when that region is exhausted, we set
139     // the closed flag. Then an RPC is required to actually close the scanner.
140     if (currentScannerCallable != null && currentScannerCallable.closed) {
141       // For closing we target that exact scanner (and not do replica fallback like in
142       // the case of normal reads)
143       if (LOG.isTraceEnabled()) {
144         LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
145       }
146       Result[] r = currentScannerCallable.call(timeout);
147       currentScannerCallable = null;
148       return r;
149     }
150     // We need to do the following:
151     //1. When a scan goes out to a certain replica (default or not), we need to
152     //   continue to hit that until there is a failure. So store the last successfully invoked
153     //   replica
154     //2. We should close the "losing" scanners (scanners other than the ones we hear back
155     //   from first)
156     //
157     RegionLocations rl = null;
158     try {
159       rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
160           RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
161           currentScannerCallable.getRow());
162     } catch (RetriesExhaustedException | DoNotRetryIOException e) {
163       // We cannot get the primary replica region location, it is possible that the region server
164       // hosting meta table is down, it needs to proceed to try cached replicas directly.
165       if (cConnection instanceof ConnectionManager.HConnectionImplementation) {
166         rl = ((ConnectionManager.HConnectionImplementation) cConnection)
167             .getCachedLocation(tableName, currentScannerCallable.getRow());
168         if (rl == null) {
169           throw e;
170         }
171       } else {
172         // For completeness
173         throw e;
174       }
175     }
176     // allocate a boundedcompletion pool of some multiple of number of replicas.
177     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
178     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
179         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
180             RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
181             rl.size() * 5);
182 
183     AtomicBoolean done = new AtomicBoolean(false);
184     replicaSwitched.set(false);
185     // submit call for the primary replica.
186     addCallsForCurrentReplica(cs, rl);
187     int startIndex = 0;
188 
189     try {
190       // wait for the timeout to see whether the primary responds back
191       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
192           TimeUnit.MICROSECONDS); // Yes, microseconds
193       if (f != null) {
194         // After poll, if f is not null, there must be a completed task
195         Pair<Result[], ScannerCallable> r = f.get();
196         if (r != null && r.getSecond() != null) {
197           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
198         }
199         return r == null ? null : r.getFirst(); //great we got a response
200       }
201     } catch (ExecutionException e) {
202       // We ignore the ExecutionException and continue with the replicas
203       if (LOG.isDebugEnabled()) {
204         LOG.debug("Scan with primary region returns " + e.getCause());
205       }
206 
207       // If rl's size is 1 or scan's consitency is strong, it needs to throw
208       // out the exception from the primary replica
209       if ((rl.size() == 1) || (scan.getConsistency() == Consistency.STRONG)) {
210         // Rethrow the first exception
211         RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
212       }
213 
214       startIndex = 1;
215     } catch (CancellationException e) {
216       throw new InterruptedIOException(e.getMessage());
217     } catch (InterruptedException e) {
218       throw new InterruptedIOException(e.getMessage());
219     }
220 
221     // submit call for the all of the secondaries at once
222     int endIndex = rl.size();
223     if (scan.getConsistency() == Consistency.STRONG) {
224       // When scan's consistency is strong, do not send to the secondaries
225       endIndex = 1;
226     } else {
227       // TODO: this may be an overkill for large region replication
228       addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
229     }
230 
231     try {
232       Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
233           TimeUnit.MILLISECONDS, startIndex, endIndex);
234 
235       if (f == null) {
236         throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
237       }
238       Pair<Result[], ScannerCallable> r = f.get();
239 
240       if (r != null && r.getSecond() != null) {
241         updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
242       }
243       return r == null ? null : r.getFirst(); // great we got an answer
244 
245     } catch (ExecutionException e) {
246       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
247     } catch (CancellationException e) {
248       throw new InterruptedIOException(e.getMessage());
249     } catch (InterruptedException e) {
250       throw new InterruptedIOException(e.getMessage());
251     } finally {
252       // We get there because we were interrupted or because one or more of the
253       // calls succeeded or failed. In all case, we stop all our tasks.
254       cs.cancelAll();
255     }
256     LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
257     throw new IOException("Imposible? Arrive at an unreachable line...");
258   }
259 
260   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
261       AtomicBoolean done, ExecutorService pool) {
262     if (done.compareAndSet(false, true)) {
263       if (currentScannerCallable != scanner) replicaSwitched.set(true);
264       currentScannerCallable = scanner;
265       // store where to start the replica scanner from if we need to.
266       if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
267       if (LOG.isTraceEnabled()) {
268         LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
269             " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
270       }
271       // close all outstanding replica scanners but the one we heard back from
272       outstandingCallables.remove(scanner);
273       for (ScannerCallable s : outstandingCallables) {
274         if (LOG.isTraceEnabled()) {
275           LOG.trace("Closing scanner id=" + s.scannerId +
276             ", replica=" + s.getHRegionInfo().getRegionId() +
277             " because slow and replica=" +
278             this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
279         }
280         // Submit the "close" to the pool since this might take time, and we don't
281         // want to wait for the "close" to happen yet. The "wait" will happen when
282         // the table is closed (when the awaitTermination of the underlying pool is called)
283         s.setClose();
284         final RetryingRPC r = new RetryingRPC(s);
285         pool.submit(new Callable<Void>(){
286           @Override
287           public Void call() throws Exception {
288             r.call(scannerTimeout);
289             return null;
290           }
291         });
292       }
293       // now clear outstandingCallables since we scheduled a close for all the contained scanners
294       outstandingCallables.clear();
295     }
296   }
297 
298   /**
299    * When a scanner switches in the middle of scanning (the 'next' call fails
300    * for example), the upper layer {@link ClientScanner} needs to know
301    * @return
302    */
303   public boolean switchedToADifferentReplica() {
304     return replicaSwitched.get();
305   }
306 
307   /**
308    * @return true when the most recent RPC response indicated that the response was a heartbeat
309    *         message. Heartbeat messages are sent back from the server when the processing of the
310    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
311    *         timeouts during long running scan operations.
312    */
313   public boolean isHeartbeatMessage() {
314     return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
315   }
316 
317   private void addCallsForCurrentReplica(
318       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
319     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
320     outstandingCallables.add(currentScannerCallable);
321     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
322   }
323 
324   private void addCallsForOtherReplicas(
325       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
326       int min, int max) {
327 
328     for (int id = min; id <= max; id++) {
329       if (currentScannerCallable.id == id) {
330         continue; //this was already scheduled earlier
331       }
332       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
333       setStartRowForReplicaCallable(s);
334       outstandingCallables.add(s);
335       RetryingRPC retryingOnReplica = new RetryingRPC(s);
336       cs.submit(retryingOnReplica, scannerTimeout, id);
337     }
338   }
339 
340   /**
341    * Set the start row for the replica callable based on the state of the last result received.
342    * @param callable The callable to set the start row on
343    */
344   private void setStartRowForReplicaCallable(ScannerCallable callable) {
345     if (this.lastResult == null || callable == null) return;
346 
347     if (this.lastResult.isPartial()) {
348       // The last result was a partial result which means we have not received all of the cells
349       // for this row. Thus, use the last result's row as the start row. If a replica switch
350       // occurs, the scanner will ensure that any accumulated partial results are cleared,
351       // and the scan can resume from this row.
352       callable.getScan().setStartRow(this.lastResult.getRow());
353     } else {
354       // The last result was not a partial result which means it contained all of the cells for
355       // that row (we no longer need any information from it). Set the start row to the next
356       // closest row that could be seen.
357       if (callable.getScan().isReversed()) {
358         callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
359       } else {
360         callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
361       }
362     }
363   }
364 
365   @VisibleForTesting
366   boolean isAnyRPCcancelled() {
367     return someRPCcancelled;
368   }
369 
370   class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
371     final ScannerCallable callable;
372     RpcRetryingCaller<Result[]> caller;
373     private volatile boolean cancelled = false;
374 
375     RetryingRPC(ScannerCallable callable) {
376       this.callable = callable;
377       // For the Consistency.STRONG (default case), we reuse the caller
378       // to keep compatibility with what is done in the past
379       // For the Consistency.TIMELINE case, we can't reuse the caller
380       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
381       // and we can't invoke it multiple times at the same time)
382       this.caller = ScannerCallableWithReplicas.this.caller;
383       if (scan.getConsistency() == Consistency.TIMELINE) {
384         this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
385             .<Result[]>newCaller();
386       }
387     }
388 
389     @Override
390     public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
391       // since the retries is done within the ResultBoundedCompletionService,
392       // we don't invoke callWithRetries here
393       if (cancelled) {
394         return null;
395       }
396       Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
397       return new Pair<Result[], ScannerCallable>(res, this.callable);
398     }
399 
400     @Override
401     public void prepare(boolean reload) throws IOException {
402       if (cancelled) return;
403 
404       if (Thread.interrupted()) {
405         throw new InterruptedIOException();
406       }
407 
408       callable.prepare(reload);
409     }
410 
411     @Override
412     public void throwable(Throwable t, boolean retrying) {
413       callable.throwable(t, retrying);
414     }
415 
416     @Override
417     public String getExceptionMessageAdditionalDetail() {
418       return callable.getExceptionMessageAdditionalDetail();
419     }
420 
421     @Override
422     public long sleep(long pause, int tries) {
423       return callable.sleep(pause, tries);
424     }
425 
426     @Override
427     public void cancel() {
428       cancelled = true;
429       caller.cancel();
430       if (callable.getController() != null) {
431         callable.getController().startCancel();
432       }
433       someRPCcancelled = true;
434     }
435 
436     @Override
437     public boolean isCancelled() {
438       return cancelled;
439     }
440   }
441 
442   @Override
443   public void prepare(boolean reload) throws IOException {
444   }
445 
446   @Override
447   public void throwable(Throwable t, boolean retrying) {
448     currentScannerCallable.throwable(t, retrying);
449   }
450 
451   @Override
452   public String getExceptionMessageAdditionalDetail() {
453     return currentScannerCallable.getExceptionMessageAdditionalDetail();
454   }
455 
456   @Override
457   public long sleep(long pause, int tries) {
458     return currentScannerCallable.sleep(pause, tries);
459   }
460 }