1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.lang.reflect.UndeclaredThrowableException;
25 import java.net.SocketTimeoutException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.DoNotRetryIOException;
34 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
35 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36 import org.apache.hadoop.hbase.util.ExceptionUtil;
37 import org.apache.hadoop.ipc.RemoteException;
38
39 import com.google.protobuf.ServiceException;
40
41
42
43
44
45
46
47
48
49
50 @InterfaceAudience.Private
51 public class RpcRetryingCaller<T> {
52 public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
53
54
55
56 private long globalStartTime;
57
58
59
60 private final static int MIN_RPC_TIMEOUT = 2000;
61
62 private final int startLogErrorsCnt;
63
64 private final long pause;
65 private final int retries;
66 private final int rpcTimeout;
67 private final AtomicBoolean cancelled = new AtomicBoolean(false);
68 private final RetryingCallerInterceptor interceptor;
69 private final RetryingCallerInterceptorContext context;
70
71 public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
72 this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
73 }
74
75 public RpcRetryingCaller(long pause, int retries,
76 RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
77 this.pause = pause;
78 this.retries = retries;
79 this.interceptor = interceptor;
80 context = interceptor.createEmptyContext();
81 this.startLogErrorsCnt = startLogErrorsCnt;
82 this.rpcTimeout = rpcTimeout;
83 }
84
85 private int getRemainingTime(int callTimeout) {
86 if (callTimeout <= 0) {
87 return 0;
88 } else {
89 if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
90 int remainingTime = (int) (callTimeout -
91 (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
92 if (remainingTime < MIN_RPC_TIMEOUT) {
93
94
95
96 remainingTime = MIN_RPC_TIMEOUT;
97 }
98 return remainingTime;
99 }
100 }
101
102 private int getTimeout(int callTimeout){
103 int timeout = getRemainingTime(callTimeout);
104 if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){
105 timeout = rpcTimeout;
106 }
107 return timeout;
108 }
109
110 public void cancel(){
111 synchronized (cancelled){
112 cancelled.set(true);
113 cancelled.notifyAll();
114 }
115 }
116
117
118
119
120
121
122
123
124
125 public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
126 throws IOException, RuntimeException {
127 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
128 new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
129 this.globalStartTime = EnvironmentEdgeManager.currentTime();
130 context.clear();
131 for (int tries = 0;; tries++) {
132 long expectedSleep;
133 try {
134 callable.prepare(tries != 0);
135 interceptor.intercept(context.prepare(callable, tries));
136 return callable.call(getTimeout(callTimeout));
137 } catch (PreemptiveFastFailException e) {
138 throw e;
139 } catch (Throwable t) {
140 ExceptionUtil.rethrowIfInterrupt(t);
141
142
143 interceptor.handleFailure(context, t);
144 t = translateException(t);
145 if (tries > startLogErrorsCnt) {
146 LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
147 (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
148 + "cancelled=" + cancelled.get() + ", msg="
149 + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail());
150 }
151
152 callable.throwable(t, retries != 1);
153 RetriesExhaustedException.ThrowableWithExtraContext qt =
154 new RetriesExhaustedException.ThrowableWithExtraContext(t,
155 EnvironmentEdgeManager.currentTime(), toString());
156 exceptions.add(qt);
157 if (tries >= retries - 1) {
158 throw new RetriesExhaustedException(tries, exceptions);
159 }
160
161
162
163 expectedSleep = callable.sleep(pause, tries);
164
165
166 long duration = singleCallDuration(expectedSleep);
167 if (duration > callTimeout) {
168 String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
169 ": " + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
170 throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
171 }
172 } finally {
173 interceptor.updateFailureInfo(context);
174 }
175 try {
176 if (expectedSleep > 0) {
177 synchronized (cancelled) {
178 if (cancelled.get()) return null;
179 cancelled.wait(expectedSleep);
180 }
181 }
182 if (cancelled.get()) return null;
183 } catch (InterruptedException e) {
184 throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
185 }
186 }
187 }
188
189
190
191
192 private long singleCallDuration(final long expectedSleep) {
193 return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
194 }
195
196
197
198
199
200
201
202
203
204
205 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
206 throws IOException, RuntimeException {
207
208 this.globalStartTime = EnvironmentEdgeManager.currentTime();
209 try {
210 callable.prepare(false);
211 return callable.call(callTimeout);
212 } catch (Throwable t) {
213 Throwable t2 = translateException(t);
214 ExceptionUtil.rethrowIfInterrupt(t2);
215
216 if (t2 instanceof IOException) {
217 throw (IOException)t2;
218 } else {
219 throw new RuntimeException(t2);
220 }
221 }
222 }
223
224
225
226
227
228
229
230 static Throwable translateException(Throwable t) throws DoNotRetryIOException {
231 if (t instanceof UndeclaredThrowableException) {
232 if (t.getCause() != null) {
233 t = t.getCause();
234 }
235 }
236 if (t instanceof RemoteException) {
237 t = ((RemoteException)t).unwrapRemoteException();
238 }
239 if (t instanceof LinkageError) {
240 throw new DoNotRetryIOException(t);
241 }
242 if (t instanceof ServiceException) {
243 ServiceException se = (ServiceException)t;
244 Throwable cause = se.getCause();
245 if (cause != null && cause instanceof DoNotRetryIOException) {
246 throw (DoNotRetryIOException)cause;
247 }
248
249 t = cause;
250
251 translateException(t);
252 } else if (t instanceof DoNotRetryIOException) {
253 throw (DoNotRetryIOException)t;
254 }
255 return t;
256 }
257
258 @Override
259 public String toString() {
260 return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
261 ", pause=" + pause + ", retries=" + retries + '}';
262 }
263 }