1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.EOFException;
21 import java.io.IOException;
22 import java.io.SyncFailedException;
23 import java.lang.reflect.UndeclaredThrowableException;
24 import java.net.ConnectException;
25 import java.net.SocketTimeoutException;
26 import java.nio.channels.ClosedChannelException;
27 import java.util.Map.Entry;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.TimeoutException;
31
32 import org.apache.commons.lang.mutable.MutableBoolean;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.DoNotRetryIOException;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
41 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
42 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
43 import org.apache.hadoop.hbase.ipc.FailedServerException;
44 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
45 import org.apache.hadoop.ipc.RemoteException;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.Private
75 class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
76
77 public static final Log LOG = LogFactory
78 .getLog(PreemptiveFastFailInterceptor.class);
79
80
81
82 protected final long fastFailThresholdMilliSec;
83
84
85
86 protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
87 new ConcurrentHashMap<ServerName, FailureInfo>();
88
89
90
91
92 protected final long failureMapCleanupIntervalMilliSec;
93
94 protected volatile long lastFailureMapCleanupTimeMilliSec;
95
96
97
98
99 private long fastFailClearingTimeMilliSec;
100
101 private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
102 new ThreadLocal<MutableBoolean>();
103
104 public PreemptiveFastFailInterceptor(Configuration conf) {
105 this.fastFailThresholdMilliSec = conf.getLong(
106 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
107 HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
108 this.failureMapCleanupIntervalMilliSec = conf.getLong(
109 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
110 HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
111 lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
112 }
113
114 public void intercept(FastFailInterceptorContext context)
115 throws PreemptiveFastFailException {
116 context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
117 if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
118
119
120 context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
121 .getFailureInfo()));
122 if (!context.isRetryDespiteFastFailMode()) {
123 LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
124 + context.getTries());
125 throw new PreemptiveFastFailException(
126 context.getFailureInfo().numConsecutiveFailures.get(),
127 context.getFailureInfo().timeOfFirstFailureMilliSec,
128 context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
129 }
130 }
131 context.setDidTry(true);
132 }
133
134 public void handleFailure(FastFailInterceptorContext context,
135 Throwable t) throws IOException {
136 handleThrowable(t, context.getServer(),
137 context.getCouldNotCommunicateWithServer());
138 }
139
140 public void updateFailureInfo(FastFailInterceptorContext context) {
141 updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
142 context.didTry(), context.getCouldNotCommunicateWithServer()
143 .booleanValue(), context.isRetryDespiteFastFailMode());
144 }
145
146
147
148
149
150
151
152
153
154
155
156
157 private void handleFailureToServer(ServerName serverName, Throwable t) {
158 if (serverName == null || t == null) {
159 return;
160 }
161 long currentTime = EnvironmentEdgeManager.currentTime();
162 FailureInfo fInfo = repeatedFailuresMap.get(serverName);
163 if (fInfo == null) {
164 fInfo = new FailureInfo(currentTime);
165 FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo);
166
167 if (oldfInfo != null) {
168 fInfo = oldfInfo;
169 }
170 }
171 fInfo.timeOfLatestAttemptMilliSec = currentTime;
172 fInfo.numConsecutiveFailures.incrementAndGet();
173 }
174
175 public void handleThrowable(Throwable t1, ServerName serverName,
176 MutableBoolean couldNotCommunicateWithServer) throws IOException {
177 Throwable t2 = translateException(t1);
178 boolean isLocalException = !(t2 instanceof RemoteException);
179 if ((isLocalException && isConnectionException(t2))) {
180 couldNotCommunicateWithServer.setValue(true);
181 handleFailureToServer(serverName, t2);
182 }
183 }
184
185 private Throwable translateException(Throwable t) throws IOException {
186 if (t instanceof NoSuchMethodError) {
187
188 LOG.error(t);
189 throw (NoSuchMethodError) t;
190 }
191
192 if (t instanceof NullPointerException) {
193
194 LOG.error(t.getMessage(), t);
195 throw (NullPointerException) t;
196 }
197
198 if (t instanceof UndeclaredThrowableException) {
199 t = t.getCause();
200 }
201 if (t instanceof RemoteException) {
202 t = ((RemoteException) t).unwrapRemoteException();
203 }
204 if (t instanceof DoNotRetryIOException) {
205 throw (DoNotRetryIOException) t;
206 }
207 if (t instanceof Error) {
208 throw (Error) t;
209 }
210 return t;
211 }
212
213
214
215
216
217
218
219
220 private boolean isConnectionException(Throwable e) {
221 if (e == null)
222 return false;
223
224
225
226 return (e instanceof SocketTimeoutException
227 || e instanceof ConnectException || e instanceof ClosedChannelException
228 || e instanceof SyncFailedException || e instanceof EOFException
229 || e instanceof TimeoutException
230 || e instanceof ConnectionClosingException || e instanceof FailedServerException);
231 }
232
233
234
235
236
237
238
239
240
241 protected void occasionallyCleanupFailureInformation() {
242 long now = System.currentTimeMillis();
243 if (!(now > lastFailureMapCleanupTimeMilliSec
244 + failureMapCleanupIntervalMilliSec))
245 return;
246
247
248
249
250 StringBuilder sb = new StringBuilder();
251 for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
252 if (now > entry.getValue().timeOfLatestAttemptMilliSec
253 + failureMapCleanupIntervalMilliSec) {
254 repeatedFailuresMap.remove(entry.getKey());
255 } else if (now > entry.getValue().timeOfFirstFailureMilliSec
256 + this.fastFailClearingTimeMilliSec) {
257
258 LOG.error(entry.getKey()
259 + " been failing for a long time. clearing out."
260 + entry.getValue().toString());
261 repeatedFailuresMap.remove(entry.getKey());
262 } else {
263 sb.append(entry.getKey().toString()).append(" failing ")
264 .append(entry.getValue().toString()).append("\n");
265 }
266 }
267 if (sb.length() > 0) {
268 LOG.warn("Preemptive failure enabled for : " + sb.toString());
269 }
270 lastFailureMapCleanupTimeMilliSec = now;
271 }
272
273
274
275
276
277
278
279
280
281
282 private boolean inFastFailMode(ServerName server) {
283 FailureInfo fInfo = repeatedFailuresMap.get(server);
284
285
286
287 return (fInfo != null &&
288 EnvironmentEdgeManager.currentTime() >
289 (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
290 }
291
292
293
294
295
296
297
298 private boolean currentThreadInFastFailMode() {
299 return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode
300 .get().booleanValue() == true));
301 }
302
303
304
305
306
307
308
309
310
311
312
313
314 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
315
316
317
318
319 if (fInfo != null
320 && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
321 MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
322 .get();
323 if (threadAlreadyInFF == null) {
324 threadAlreadyInFF = new MutableBoolean();
325 this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
326 }
327 threadAlreadyInFF.setValue(true);
328 return true;
329 } else {
330 return false;
331 }
332 }
333
334
335
336
337
338
339
340
341
342
343
344 private void updateFailureInfoForServer(ServerName server,
345 FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
346 boolean retryDespiteFastFailMode) {
347 if (server == null || fInfo == null || didTry == false)
348 return;
349
350
351
352 if (couldNotCommunicate == false) {
353 LOG.info("Clearing out PFFE for server " + server.getServerName());
354 repeatedFailuresMap.remove(server);
355 } else {
356
357 long currentTime = System.currentTimeMillis();
358 fInfo.timeOfLatestAttemptMilliSec = currentTime;
359
360
361 if (retryDespiteFastFailMode) {
362 fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
363 threadRetryingInFastFailMode.get().setValue(false);
364 }
365 }
366
367 occasionallyCleanupFailureInformation();
368 }
369
370 @Override
371 public void intercept(RetryingCallerInterceptorContext context)
372 throws PreemptiveFastFailException {
373 if (context instanceof FastFailInterceptorContext) {
374 intercept((FastFailInterceptorContext) context);
375 }
376 }
377
378 @Override
379 public void handleFailure(RetryingCallerInterceptorContext context,
380 Throwable t) throws IOException {
381 if (context instanceof FastFailInterceptorContext) {
382 handleFailure((FastFailInterceptorContext) context, t);
383 }
384 }
385
386 @Override
387 public void updateFailureInfo(RetryingCallerInterceptorContext context) {
388 if (context instanceof FastFailInterceptorContext) {
389 updateFailureInfo((FastFailInterceptorContext) context);
390 }
391 }
392
393 @Override
394 public RetryingCallerInterceptorContext createEmptyContext() {
395 return new FastFailInterceptorContext();
396 }
397
398 protected boolean isServerInFailureMap(ServerName serverName) {
399 return this.repeatedFailuresMap.containsKey(serverName);
400 }
401
402 @Override
403 public String toString() {
404 return "PreemptiveFastFailInterceptor";
405 }
406 }