View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  
21  package org.apache.hadoop.hbase.client;
22  
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.DoNotRetryIOException;
28  import org.apache.hadoop.hbase.HBaseIOException;
29  import org.apache.hadoop.hbase.HRegionLocation;
30  import org.apache.hadoop.hbase.RegionLocations;
31  import org.apache.hadoop.hbase.ServerName;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
35  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.RequestConverter;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  
41  import com.google.protobuf.ServiceException;
42  
43  
44  import java.io.IOException;
45  import java.io.InterruptedIOException;
46  import java.util.Collections;
47  import java.util.List;
48  import java.util.concurrent.CancellationException;
49  import java.util.concurrent.ExecutionException;
50  import java.util.concurrent.Executor;
51  import java.util.concurrent.ExecutorService;
52  import java.util.concurrent.Future;
53  import java.util.concurrent.RunnableFuture;
54  import java.util.concurrent.TimeUnit;
55  import java.util.concurrent.TimeoutException;
56  
57  /**
58   * Caller that goes to replica if the primary region does no answer within a configurable
59   * timeout. If the timeout is reached, it calls all the secondary replicas, and returns
60   * the first answer. If the answer comes from one of the secondary replica, it will
61   * be marked as stale.
62   */
63  @InterfaceAudience.Private
64  public class RpcRetryingCallerWithReadReplicas {
65    static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
66  
67    protected final ExecutorService pool;
68    protected final ClusterConnection cConnection;
69    protected final Configuration conf;
70    protected final Get get;
71    protected final TableName tableName;
72    protected final int timeBeforeReplicas;
73    private final int callTimeout;
74    private final int retries;
75    private final RpcControllerFactory rpcControllerFactory;
76    private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
77  
78    public RpcRetryingCallerWithReadReplicas(
79        RpcControllerFactory rpcControllerFactory, TableName tableName,
80        ClusterConnection cConnection, final Get get,
81        ExecutorService pool, int retries, int callTimeout,
82        int timeBeforeReplicas) {
83      this.rpcControllerFactory = rpcControllerFactory;
84      this.tableName = tableName;
85      this.cConnection = cConnection;
86      this.conf = cConnection.getConfiguration();
87      this.get = get;
88      this.pool = pool;
89      this.retries = retries;
90      this.callTimeout = callTimeout;
91      this.timeBeforeReplicas = timeBeforeReplicas;
92      this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
93    }
94  
95    /**
96     * A RegionServerCallable that takes into account the replicas, i.e.
97     * - the call can be on any replica
98     * - we need to stop retrying when the call is completed
99     * - we can be interrupted
100    */
101   class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
102     final int id;
103     private final PayloadCarryingRpcController controller;
104 
105     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
106       super(RpcRetryingCallerWithReadReplicas.this.cConnection,
107           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
108       this.id = id;
109       this.location = location;
110       this.controller = rpcControllerFactory.newController();
111       controller.setPriority(tableName);
112     }
113 
114     @Override
115     public void cancel() {
116       controller.startCancel();
117     }
118 
119     /**
120      * Two responsibilities
121      * - if the call is already completed (by another replica) stops the retries.
122      * - set the location to the right region, depending on the replica.
123      */
124     @Override
125     public void prepare(final boolean reload) throws IOException {
126       if (controller.isCanceled()) return;
127 
128       if (Thread.interrupted()) {
129         throw new InterruptedIOException();
130       }
131 
132       if (reload || location == null) {
133         RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
134         location = id < rl.size() ? rl.getRegionLocation(id) : null;
135       }
136 
137       if (location == null || location.getServerName() == null) {
138         // With this exception, there will be a retry. The location can be null for a replica
139         //  when the table is created or after a split.
140         throw new HBaseIOException("There is no location for replica id #" + id);
141       }
142 
143       ServerName dest = location.getServerName();
144 
145       setStub(cConnection.getClient(dest));
146     }
147 
148     @Override
149     public Result call(int callTimeout) throws Exception {
150       if (controller.isCanceled()) return null;
151 
152       if (Thread.interrupted()) {
153         throw new InterruptedIOException();
154       }
155 
156       byte[] reg = location.getRegionInfo().getRegionName();
157 
158       ClientProtos.GetRequest request =
159           RequestConverter.buildGetRequest(reg, get);
160       controller.setCallTimeout(callTimeout);
161 
162       try {
163         ClientProtos.GetResponse response = getStub().get(controller, request);
164         if (response == null) {
165           return null;
166         }
167         return ProtobufUtil.toResult(response.getResult());
168       } catch (ServiceException se) {
169         throw ProtobufUtil.getRemoteException(se);
170       }
171     }
172 
173     @Override
174     public boolean isCancelled() {
175       return controller.isCanceled();
176     }
177   }
178 
179   /**
180    * Algo:
181    * - we put the query into the execution pool.
182    * - after x ms, if we don't have a result, we add the queries for the secondary replicas
183    * - we take the first answer
184    * - when done, we cancel what's left. Cancelling means:
185    * - removing from the pool if the actual call was not started
186    * - interrupting the call if it has started
187    * Client side, we need to take into account
188    * - a call is not executed immediately after being put into the pool
189    * - a call is a thread. Let's not multiply the number of thread by the number of replicas.
190    * Server side, if we can cancel when it's still in the handler pool, it's much better, as a call
191    * can take some i/o.
192    * <p/>
193    * Globally, the number of retries, timeout and so on still applies, but it's per replica,
194    * not global. We continue until all retries are done, or all timeouts are exceeded.
195    */
196   public synchronized Result call(int operationTimeout)
197       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
198     boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
199 
200     RegionLocations rl = null;
201     boolean skipPrimary = false;
202     try {
203       rl = getRegionLocations(true,
204         (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
205         cConnection, tableName, get.getRow());
206     } catch (RetriesExhaustedException | DoNotRetryIOException e) {
207       // When there is no specific replica id specified. It just needs to load all replicas.
208       if (isTargetReplicaSpecified) {
209         throw e;
210       } else {
211         // We cannot get the primary replica location, it is possible that the region
212         // server hosting meta is down, it needs to proceed to try cached replicas.
213         if (cConnection instanceof ConnectionManager.HConnectionImplementation) {
214           rl = ((ConnectionManager.HConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
215           if (rl == null) {
216             // No cached locations
217             throw e;
218           }
219 
220           // Primary replica location is not known, skip primary replica
221           skipPrimary = true;
222         } else {
223           // For completeness
224           throw e;
225         }
226       }
227     }
228 
229     final ResultBoundedCompletionService<Result> cs =
230         new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
231 
232     int startIndex = 0;
233     int endIndex = rl.size();
234 
235     if(isTargetReplicaSpecified) {
236       addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
237       endIndex = 1;
238     } else {
239       if (!skipPrimary) {
240         addCallsForReplica(cs, rl, 0, 0);
241         try {
242           // wait for the timeout to see whether the primary responds back
243           Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
244           if (f != null) {
245             return f.get(); //great we got a response
246           }
247         } catch (ExecutionException e) {
248           // We ignore the ExecutionException and continue with the secondary replicas
249           if (LOG.isDebugEnabled()) {
250             LOG.debug("Primary replica returns " + e.getCause());
251           }
252 
253           // Skip the result from the primary as we know that there is something wrong
254           startIndex = 1;
255         } catch (CancellationException e) {
256           throw new InterruptedIOException();
257         } catch (InterruptedException e) {
258           throw new InterruptedIOException();
259         }
260       }
261 
262       // submit call for the all of the secondaries at once
263       addCallsForReplica(cs, rl, 1, rl.size() - 1);
264     }
265 
266     try {
267       Future<Result> f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout,
268           TimeUnit.MILLISECONDS, startIndex, endIndex);
269       if (f == null) {
270         throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms");
271       }
272       return f.get();
273     } catch (ExecutionException e) {
274       throwEnrichedException(e, retries);
275     } catch (CancellationException e) {
276       throw new InterruptedIOException();
277     } catch (InterruptedException e) {
278       throw new InterruptedIOException();
279     } finally {
280       // We get there because we were interrupted or because one or more of the
281       // calls succeeded or failed. In all case, we stop all our tasks.
282       cs.cancelAll();
283     }
284 
285     LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
286     return null; // unreachable
287   }
288 
289   /**
290    * Extract the real exception from the ExecutionException, and throws what makes more
291    * sense.
292    */
293   static void throwEnrichedException(ExecutionException e, int retries)
294       throws RetriesExhaustedException, DoNotRetryIOException {
295     Throwable t = e.getCause();
296     assert t != null; // That's what ExecutionException is about: holding an exception
297 
298     if (t instanceof RetriesExhaustedException) {
299       throw (RetriesExhaustedException) t;
300     }
301 
302     if (t instanceof DoNotRetryIOException) {
303       throw (DoNotRetryIOException) t;
304     }
305 
306     RetriesExhaustedException.ThrowableWithExtraContext qt =
307         new RetriesExhaustedException.ThrowableWithExtraContext(t,
308             EnvironmentEdgeManager.currentTime(), null);
309 
310     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
311         Collections.singletonList(qt);
312 
313     throw new RetriesExhaustedException(retries, exceptions);
314   }
315 
316   /**
317    * Creates the calls and submit them
318    *
319    * @param cs  - the completion service to use for submitting
320    * @param rl  - the region locations
321    * @param min - the id of the first replica, inclusive
322    * @param max - the id of the last replica, inclusive.
323    */
324   private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
325                                  RegionLocations rl, int min, int max) {
326     for (int id = min; id <= max; id++) {
327       HRegionLocation hrl = rl.getRegionLocation(id);
328       ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
329       cs.submit(callOnReplica, callTimeout, id);
330     }
331   }
332 
333   static RegionLocations getRegionLocations(boolean useCache, int replicaId,
334                  ClusterConnection cConnection, TableName tableName, byte[] row)
335       throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
336 
337     RegionLocations rl;
338     try {
339       if (useCache) {
340         rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
341       } else {
342         rl = cConnection.relocateRegion(tableName, row, replicaId);
343       }
344     } catch (DoNotRetryIOException e) {
345       throw e;
346     } catch (RetriesExhaustedException e) {
347       throw e;
348     } catch (InterruptedIOException e) {
349       throw e;
350     } catch (IOException e) {
351       throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e);
352     }
353     if (rl == null) {
354       throw new RetriesExhaustedException("Can't get the location for replica " + replicaId);
355     }
356 
357     return rl;
358   }
359 }