View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.EOFException;
21  import java.io.IOException;
22  import java.io.SyncFailedException;
23  import java.lang.reflect.UndeclaredThrowableException;
24  import java.net.ConnectException;
25  import java.net.SocketTimeoutException;
26  import java.nio.channels.ClosedChannelException;
27  import java.util.Map.Entry;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.TimeoutException;
31  
32  import org.apache.commons.lang.mutable.MutableBoolean;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.DoNotRetryIOException;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
41  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
42  import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
43  import org.apache.hadoop.hbase.ipc.FailedServerException;
44  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
45  import org.apache.hadoop.ipc.RemoteException;
46  
47  /**
48   * 
49   * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
50   * feature.
51   * 
52   * The motivation is as follows : 
53   * In case where a large number of clients try and talk to a particular region server in hbase, if
54   * the region server goes down due to network problems, we might end up in a scenario where
55   * the clients would go into a state where they all start to retry.
56   * This behavior will set off many of the threads in pretty much the same path and they all would be
57   * sleeping giving rise to a state where the client either needs to create more threads to send new
58   * requests to other hbase machines or block because the client cannot create anymore threads.
59   * 
60   * In most cases the clients might prefer to have a bound on the number of threads that are created
61   * in order to send requests to hbase. This would mostly result in the client thread starvation.
62   * 
 *  To circumvent this problem, the approach taken here is to let exactly one of the many
 *  threads that are trying to contact the regionserver with connection problems keep retrying,
 *  while the other threads get a {@link PreemptiveFastFailException} so that they can move on
 *  and take other requests.
67   *  
 *  This gives the client more flexibility in choosing what to do when the regionserver is
 *  down. It can discard the requests and send a nack upstream faster, retry at the application
 *  level, or buffer the requests so as to send them down to hbase later.
72   *
73   */
74  @InterfaceAudience.Private
75  class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
76  
77    public static final Log LOG = LogFactory
78        .getLog(PreemptiveFastFailInterceptor.class);
79  
80    // amount of time to wait before we consider a server to be in fast fail
81    // mode
82    protected final long fastFailThresholdMilliSec;
83  
84    // Keeps track of failures when we cannot talk to a server. Helps in
85    // fast failing clients if the server is down for a long time.
86    protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
87        new ConcurrentHashMap<ServerName, FailureInfo>();
88  
89    // We populate repeatedFailuresMap every time there is a failure. So, to
90    // keep it from growing unbounded, we garbage collect the failure information
91    // every cleanupInterval.
92    protected final long failureMapCleanupIntervalMilliSec;
93  
94    protected volatile long lastFailureMapCleanupTimeMilliSec;
95  
96    // clear failure Info. Used to clean out all entries.
97    // A safety valve, in case the client does not exit the
98    // fast fail mode for any reason.
99    private long fastFailClearingTimeMilliSec;
100 
101   private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
102       new ThreadLocal<MutableBoolean>();
103 
104   public PreemptiveFastFailInterceptor(Configuration conf) {
105     this.fastFailThresholdMilliSec = conf.getLong(
106         HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
107         HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
108     this.failureMapCleanupIntervalMilliSec = conf.getLong(
109         HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
110         HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
111     lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
112   }
113 
114   public void intercept(FastFailInterceptorContext context)
115       throws PreemptiveFastFailException {
116     context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
117     if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
118       // In Fast-fail mode, all but one thread will fast fail. Check
119       // if we are that one chosen thread.
120       context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
121           .getFailureInfo()));
122       if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
123         LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
124             + context.getTries());
125         throw new PreemptiveFastFailException(
126             context.getFailureInfo().numConsecutiveFailures.get(),
127             context.getFailureInfo().timeOfFirstFailureMilliSec,
128             context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
129       }
130     }
131     context.setDidTry(true);
132   }
133 
134   public void handleFailure(FastFailInterceptorContext context,
135       Throwable t) throws IOException {
136     handleThrowable(t, context.getServer(),
137         context.getCouldNotCommunicateWithServer());
138   }
139 
140   public void updateFailureInfo(FastFailInterceptorContext context) {
141     updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
142         context.didTry(), context.getCouldNotCommunicateWithServer()
143             .booleanValue(), context.isRetryDespiteFastFailMode());
144   }
145 
146   /**
147    * Handles failures encountered when communicating with a server.
148    *
149    * Updates the FailureInfo in repeatedFailuresMap to reflect the failure.
150    * Throws RepeatedConnectException if the client is in Fast fail mode.
151    *
152    * @param serverName
153    * @param t
154    *          - the throwable to be handled.
155    * @throws PreemptiveFastFailException
156    */
157   private void handleFailureToServer(ServerName serverName, Throwable t) {
158     if (serverName == null || t == null) {
159       return;
160     }
161     long currentTime = EnvironmentEdgeManager.currentTime();
162     FailureInfo fInfo = repeatedFailuresMap.get(serverName);
163     if (fInfo == null) {
164       fInfo = new FailureInfo(currentTime);
165       FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo);
166 
167       if (oldfInfo != null) {
168         fInfo = oldfInfo;
169       }
170     }
171     fInfo.timeOfLatestAttemptMilliSec = currentTime;
172     fInfo.numConsecutiveFailures.incrementAndGet();
173   }
174 
175   public void handleThrowable(Throwable t1, ServerName serverName,
176       MutableBoolean couldNotCommunicateWithServer) throws IOException {
177     Throwable t2 = translateException(t1);
178     boolean isLocalException = !(t2 instanceof RemoteException);
179     if ((isLocalException && isConnectionException(t2))) {
180       couldNotCommunicateWithServer.setValue(true);
181       handleFailureToServer(serverName, t2);
182     }
183   }
184 
185   private Throwable translateException(Throwable t) throws IOException {
186     if (t instanceof NoSuchMethodError) {
187       // We probably can't recover from this exception by retrying.
188       LOG.error(t);
189       throw (NoSuchMethodError) t;
190     }
191 
192     if (t instanceof NullPointerException) {
193       // The same here. This is probably a bug.
194       LOG.error(t.getMessage(), t);
195       throw (NullPointerException) t;
196     }
197 
198     if (t instanceof UndeclaredThrowableException) {
199       t = t.getCause();
200     }
201     if (t instanceof RemoteException) {
202       t = ((RemoteException) t).unwrapRemoteException();
203     }
204     if (t instanceof DoNotRetryIOException) {
205       throw (DoNotRetryIOException) t;
206     }
207     if (t instanceof Error) {
208       throw (Error) t;
209     }
210     return t;
211   }
212 
213   /**
214    * Check if the exception is something that indicates that we cannot
215    * contact/communicate with the server.
216    *
217    * @param e
218    * @return true when exception indicates that the client wasn't able to make contact with server
219    */
220   private boolean isConnectionException(Throwable e) {
221     if (e == null)
222       return false;
223     // This list covers most connectivity exceptions but not all.
224     // For example, in SocketOutputStream a plain IOException is thrown
225     // at times when the channel is closed.
226     return (e instanceof SocketTimeoutException
227         || e instanceof ConnectException || e instanceof ClosedChannelException
228         || e instanceof SyncFailedException || e instanceof EOFException
229         || e instanceof TimeoutException
230         || e instanceof ConnectionClosingException || e instanceof FailedServerException);
231   }
232 
233   /**
234    * Occasionally cleans up unused information in repeatedFailuresMap.
235    *
236    * repeatedFailuresMap stores the failure information for all remote hosts
237    * that had failures. In order to avoid these from growing indefinitely,
238    * occassionallyCleanupFailureInformation() will clear these up once every
239    * cleanupInterval ms.
240    */
241   protected void occasionallyCleanupFailureInformation() {
242     long now = System.currentTimeMillis();
243     if (!(now > lastFailureMapCleanupTimeMilliSec
244         + failureMapCleanupIntervalMilliSec))
245       return;
246 
247     // remove entries that haven't been attempted in a while
248     // No synchronization needed. It is okay if multiple threads try to
249     // remove the entry again and again from a concurrent hash map.
250     StringBuilder sb = new StringBuilder();
251     for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
252       if (now > entry.getValue().timeOfLatestAttemptMilliSec
253           + failureMapCleanupIntervalMilliSec) { // no recent failures
254         repeatedFailuresMap.remove(entry.getKey());
255       } else if (now > entry.getValue().timeOfFirstFailureMilliSec
256           + this.fastFailClearingTimeMilliSec) { // been failing for a long
257                                                  // time
258         LOG.error(entry.getKey()
259             + " been failing for a long time. clearing out."
260             + entry.getValue().toString());
261         repeatedFailuresMap.remove(entry.getKey());
262       } else {
263         sb.append(entry.getKey().toString()).append(" failing ")
264             .append(entry.getValue().toString()).append("\n");
265       }
266     }
267     if (sb.length() > 0) {
268       LOG.warn("Preemptive failure enabled for : " + sb.toString());
269     }
270     lastFailureMapCleanupTimeMilliSec = now;
271   }
272 
273   /**
274    * Checks to see if we are in the Fast fail mode for requests to the server.
275    *
276    * If a client is unable to contact a server for more than
277    * fastFailThresholdMilliSec the client will get into fast fail mode.
278    *
279    * @param server
280    * @return true if the client is in fast fail mode for the server.
281    */
282   private boolean inFastFailMode(ServerName server) {
283     FailureInfo fInfo = repeatedFailuresMap.get(server);
284     // if fInfo is null --> The server is considered good.
285     // If the server is bad, wait long enough to believe that the server is
286     // down.
287     return (fInfo != null &&
288         EnvironmentEdgeManager.currentTime() >
289           (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
290   }
291 
292   /**
293    * Checks to see if the current thread is already in FastFail mode for *some*
294    * server.
295    *
296    * @return true, if the thread is already in FF mode.
297    */
298   private boolean currentThreadInFastFailMode() {
299     return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode
300         .get().booleanValue() == true));
301   }
302 
303   /**
304    * Check to see if the client should try to connnect to the server, inspite of
305    * knowing that it is in the fast fail mode.
306    *
307    * The idea here is that we want just one client thread to be actively trying
308    * to reconnect, while all the other threads trying to reach the server will
309    * short circuit.
310    *
311    * @param fInfo
312    * @return true if the client should try to connect to the server.
313    */
314   protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
315     // We believe that the server is down, But, we want to have just one
316     // client
317     // actively trying to connect. If we are the chosen one, we will retry
318     // and not throw an exception.
319     if (fInfo != null
320         && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
321       MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
322           .get();
323       if (threadAlreadyInFF == null) {
324         threadAlreadyInFF = new MutableBoolean();
325         this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
326       }
327       threadAlreadyInFF.setValue(true);
328       return true;
329     } else {
330       return false;
331     }
332   }
333 
334   /**
335    *
336    * This function updates the Failure info for a particular server after the
337    * attempt to 
338    *
339    * @param server
340    * @param fInfo
341    * @param couldNotCommunicate
342    * @param retryDespiteFastFailMode
343    */
344   private void updateFailureInfoForServer(ServerName server,
345       FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
346       boolean retryDespiteFastFailMode) {
347     if (server == null || fInfo == null || didTry == false)
348       return;
349 
350     // If we were able to connect to the server, reset the failure
351     // information.
352     if (couldNotCommunicate == false) {
353       LOG.info("Clearing out PFFE for server " + server.getServerName());
354       repeatedFailuresMap.remove(server);
355     } else {
356       // update time of last attempt
357       long currentTime = System.currentTimeMillis();
358       fInfo.timeOfLatestAttemptMilliSec = currentTime;
359 
360       // Release the lock if we were retrying inspite of FastFail
361       if (retryDespiteFastFailMode) {
362         fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
363         threadRetryingInFastFailMode.get().setValue(false);
364       }
365     }
366 
367     occasionallyCleanupFailureInformation();
368   }
369 
370   @Override
371   public void intercept(RetryingCallerInterceptorContext context)
372       throws PreemptiveFastFailException {
373     if (context instanceof FastFailInterceptorContext) {
374       intercept((FastFailInterceptorContext) context);
375     }
376   }
377 
378   @Override
379   public void handleFailure(RetryingCallerInterceptorContext context,
380       Throwable t) throws IOException {
381     if (context instanceof FastFailInterceptorContext) {
382       handleFailure((FastFailInterceptorContext) context, t);
383     }
384   }
385 
386   @Override
387   public void updateFailureInfo(RetryingCallerInterceptorContext context) {
388     if (context instanceof FastFailInterceptorContext) {
389       updateFailureInfo((FastFailInterceptorContext) context);
390     }
391   }
392 
393   @Override
394   public RetryingCallerInterceptorContext createEmptyContext() {
395     return new FastFailInterceptorContext();
396   }
397 
398   protected boolean isServerInFailureMap(ServerName serverName) {
399     return this.repeatedFailuresMap.containsKey(serverName);
400   }
401 
402   @Override
403   public String toString() {
404     return "PreemptiveFastFailInterceptor";
405   }
406 }