1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.client;
22
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.DoNotRetryIOException;
28 import org.apache.hadoop.hbase.HBaseIOException;
29 import org.apache.hadoop.hbase.HRegionLocation;
30 import org.apache.hadoop.hbase.RegionLocations;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
35 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.RequestConverter;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40
41 import com.google.protobuf.ServiceException;
42
43
44 import java.io.IOException;
45 import java.io.InterruptedIOException;
46 import java.util.Collections;
47 import java.util.List;
48 import java.util.concurrent.CancellationException;
49 import java.util.concurrent.ExecutionException;
50 import java.util.concurrent.Executor;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Future;
53 import java.util.concurrent.RunnableFuture;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.TimeoutException;
56
57
58
59
60
61
62
63 @InterfaceAudience.Private
64 public class RpcRetryingCallerWithReadReplicas {
65 static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
66
67 protected final ExecutorService pool;
68 protected final ClusterConnection cConnection;
69 protected final Configuration conf;
70 protected final Get get;
71 protected final TableName tableName;
72 protected final int timeBeforeReplicas;
73 private final int callTimeout;
74 private final int retries;
75 private final RpcControllerFactory rpcControllerFactory;
76 private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
77
78 public RpcRetryingCallerWithReadReplicas(
79 RpcControllerFactory rpcControllerFactory, TableName tableName,
80 ClusterConnection cConnection, final Get get,
81 ExecutorService pool, int retries, int callTimeout,
82 int timeBeforeReplicas) {
83 this.rpcControllerFactory = rpcControllerFactory;
84 this.tableName = tableName;
85 this.cConnection = cConnection;
86 this.conf = cConnection.getConfiguration();
87 this.get = get;
88 this.pool = pool;
89 this.retries = retries;
90 this.callTimeout = callTimeout;
91 this.timeBeforeReplicas = timeBeforeReplicas;
92 this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
93 }
94
95
96
97
98
99
100
101 class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
102 final int id;
103 private final PayloadCarryingRpcController controller;
104
105 public ReplicaRegionServerCallable(int id, HRegionLocation location) {
106 super(RpcRetryingCallerWithReadReplicas.this.cConnection,
107 RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
108 this.id = id;
109 this.location = location;
110 this.controller = rpcControllerFactory.newController();
111 controller.setPriority(tableName);
112 }
113
114 @Override
115 public void cancel() {
116 controller.startCancel();
117 }
118
119
120
121
122
123
124 @Override
125 public void prepare(final boolean reload) throws IOException {
126 if (controller.isCanceled()) return;
127
128 if (Thread.interrupted()) {
129 throw new InterruptedIOException();
130 }
131
132 if (reload || location == null) {
133 RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
134 location = id < rl.size() ? rl.getRegionLocation(id) : null;
135 }
136
137 if (location == null || location.getServerName() == null) {
138
139
140 throw new HBaseIOException("There is no location for replica id #" + id);
141 }
142
143 ServerName dest = location.getServerName();
144
145 setStub(cConnection.getClient(dest));
146 }
147
148 @Override
149 public Result call(int callTimeout) throws Exception {
150 if (controller.isCanceled()) return null;
151
152 if (Thread.interrupted()) {
153 throw new InterruptedIOException();
154 }
155
156 byte[] reg = location.getRegionInfo().getRegionName();
157
158 ClientProtos.GetRequest request =
159 RequestConverter.buildGetRequest(reg, get);
160 controller.setCallTimeout(callTimeout);
161
162 try {
163 ClientProtos.GetResponse response = getStub().get(controller, request);
164 if (response == null) {
165 return null;
166 }
167 return ProtobufUtil.toResult(response.getResult());
168 } catch (ServiceException se) {
169 throw ProtobufUtil.getRemoteException(se);
170 }
171 }
172
173 @Override
174 public boolean isCancelled() {
175 return controller.isCanceled();
176 }
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196 public synchronized Result call(int operationTimeout)
197 throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
198 boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
199
200 RegionLocations rl = null;
201 boolean skipPrimary = false;
202 try {
203 rl = getRegionLocations(true,
204 (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
205 cConnection, tableName, get.getRow());
206 } catch (RetriesExhaustedException | DoNotRetryIOException e) {
207
208 if (isTargetReplicaSpecified) {
209 throw e;
210 } else {
211
212
213 if (cConnection instanceof ConnectionManager.HConnectionImplementation) {
214 rl = ((ConnectionManager.HConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
215 if (rl == null) {
216
217 throw e;
218 }
219
220
221 skipPrimary = true;
222 } else {
223
224 throw e;
225 }
226 }
227 }
228
229 final ResultBoundedCompletionService<Result> cs =
230 new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
231
232 int startIndex = 0;
233 int endIndex = rl.size();
234
235 if(isTargetReplicaSpecified) {
236 addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
237 endIndex = 1;
238 } else {
239 if (!skipPrimary) {
240 addCallsForReplica(cs, rl, 0, 0);
241 try {
242
243 Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS);
244 if (f != null) {
245 return f.get();
246 }
247 } catch (ExecutionException e) {
248
249 if (LOG.isDebugEnabled()) {
250 LOG.debug("Primary replica returns " + e.getCause());
251 }
252
253
254 startIndex = 1;
255 } catch (CancellationException e) {
256 throw new InterruptedIOException();
257 } catch (InterruptedException e) {
258 throw new InterruptedIOException();
259 }
260 }
261
262
263 addCallsForReplica(cs, rl, 1, rl.size() - 1);
264 }
265
266 try {
267 Future<Result> f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout,
268 TimeUnit.MILLISECONDS, startIndex, endIndex);
269 if (f == null) {
270 throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms");
271 }
272 return f.get();
273 } catch (ExecutionException e) {
274 throwEnrichedException(e, retries);
275 } catch (CancellationException e) {
276 throw new InterruptedIOException();
277 } catch (InterruptedException e) {
278 throw new InterruptedIOException();
279 } finally {
280
281
282 cs.cancelAll();
283 }
284
285 LOG.error("Imposible? Arrive at an unreachable line...");
286 return null;
287 }
288
289
290
291
292
293 static void throwEnrichedException(ExecutionException e, int retries)
294 throws RetriesExhaustedException, DoNotRetryIOException {
295 Throwable t = e.getCause();
296 assert t != null;
297
298 if (t instanceof RetriesExhaustedException) {
299 throw (RetriesExhaustedException) t;
300 }
301
302 if (t instanceof DoNotRetryIOException) {
303 throw (DoNotRetryIOException) t;
304 }
305
306 RetriesExhaustedException.ThrowableWithExtraContext qt =
307 new RetriesExhaustedException.ThrowableWithExtraContext(t,
308 EnvironmentEdgeManager.currentTime(), null);
309
310 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
311 Collections.singletonList(qt);
312
313 throw new RetriesExhaustedException(retries, exceptions);
314 }
315
316
317
318
319
320
321
322
323
324 private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
325 RegionLocations rl, int min, int max) {
326 for (int id = min; id <= max; id++) {
327 HRegionLocation hrl = rl.getRegionLocation(id);
328 ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
329 cs.submit(callOnReplica, callTimeout, id);
330 }
331 }
332
333 static RegionLocations getRegionLocations(boolean useCache, int replicaId,
334 ClusterConnection cConnection, TableName tableName, byte[] row)
335 throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
336
337 RegionLocations rl;
338 try {
339 if (useCache) {
340 rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
341 } else {
342 rl = cConnection.relocateRegion(tableName, row, replicaId);
343 }
344 } catch (DoNotRetryIOException e) {
345 throw e;
346 } catch (RetriesExhaustedException e) {
347 throw e;
348 } catch (InterruptedIOException e) {
349 throw e;
350 } catch (IOException e) {
351 throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e);
352 }
353 if (rl == null) {
354 throw new RetriesExhaustedException("Can't get the location for replica " + replicaId);
355 }
356
357 return rl;
358 }
359 }