View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.ThreadLocalRandom;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Abortable;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
35  import org.apache.hadoop.util.StringUtils;
36  
37  import com.google.common.base.Preconditions;
38  import com.google.common.base.Strings;
39  
40  /**
41   * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
42   * scheduling behavior.
43   */
44  @InterfaceAudience.Private
45  public abstract class RpcExecutor {
46    private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
47  
48    protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
49    protected volatile int currentQueueLimit;
50  
51    private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
52    private final List<Handler> handlers;
53    private final int handlerCount;
54    private final String name;
55    private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
56  
57    private boolean running;
58  
59    private Configuration conf = null;
60    private Abortable abortable = null;
61  
62    public RpcExecutor(final String name, final int handlerCount) {
63      this.handlers = new ArrayList<Handler>(handlerCount);
64      this.handlerCount = handlerCount;
65      this.name = Strings.nullToEmpty(name);
66    }
67  
68    public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
69        final Abortable abortable) {
70      this(name, handlerCount);
71      this.conf = conf;
72      this.abortable = abortable;
73    }
74  
75    public void start(final int port) {
76      running = true;
77      startHandlers(port);
78    }
79  
80    public void stop() {
81      running = false;
82      for (Thread handler : handlers) {
83        handler.interrupt();
84      }
85    }
86  
87    public int getActiveHandlerCount() {
88      return activeHandlerCount.get();
89    }
90  
91    /** Returns the length of the pending queue */
92    public abstract int getQueueLength();
93  
94    /** Add the request to the executor queue */
95    public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
96  
97    /** Returns the list of request queues */
98    protected abstract List<BlockingQueue<CallRunner>> getQueues();
99  
100   protected void startHandlers(final int port) {
101     List<BlockingQueue<CallRunner>> callQueues = getQueues();
102     startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
103   }
104 
105   /**
106    * Override if providing alternate Handler implementation.
107    */
108   protected Handler getHandler(final String name, final double handlerFailureThreshhold,
109       final BlockingQueue<CallRunner> q) {
110     return new Handler(name, handlerFailureThreshhold, q);
111   }
112 
113   /**
114    * Start up our handlers.
115    */
116   protected void startHandlers(final String nameSuffix, final int numHandlers,
117       final List<BlockingQueue<CallRunner>> callQueues,
118       final int qindex, final int qsize, final int port) {
119     final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
120     double handlerFailureThreshhold =
121         conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
122           HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
123     for (int i = 0; i < numHandlers; i++) {
124       final int index = qindex + (i % qsize);
125       String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" +
126           index + ",port=" + port;
127       Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
128       handler.start();
129       LOG.debug("Started " + name);
130       handlers.add(handler);
131     }
132   }
133 
134   /**
135    * Handler thread run the {@link CallRunner#run()} in.
136    */
137   protected class Handler extends Thread {
138     /**
139      * Q to find CallRunners to run in.
140      */
141     final BlockingQueue<CallRunner> q;
142 
143     final double handlerFailureThreshhold;
144 
145     Handler(final String name, final double handlerFailureThreshhold,
146         final BlockingQueue<CallRunner> q) {
147       super(name);
148       setDaemon(true);
149       this.q = q;
150       this.handlerFailureThreshhold = handlerFailureThreshhold;
151     }
152 
153     /**
154      * @return A {@link CallRunner}
155      * @throws InterruptedException
156      */
157     protected CallRunner getCallRunner() throws InterruptedException {
158       return this.q.take();
159     }
160 
161     @Override
162     public void run() {
163       boolean interrupted = false;
164       try {
165         while (running) {
166           try {
167             run(getCallRunner());
168           } catch (InterruptedException e) {
169             interrupted = true;
170           }
171         }
172       } catch (Exception e) {
173         LOG.warn(e);
174         throw e;
175       } finally {
176         if (interrupted) {
177           Thread.currentThread().interrupt();
178         }
179       }
180     }
181 
182     private void run(CallRunner cr) {
183       MonitoredRPCHandler status = RpcServer.getStatus();
184       cr.setStatus(status);
185       try {
186         activeHandlerCount.incrementAndGet();
187         cr.run();
188       } catch (Throwable e) {
189         if (e instanceof Error) {
190           int failedCount = failedHandlerCount.incrementAndGet();
191           if (this.handlerFailureThreshhold >= 0 &&
192               failedCount > handlerCount * this.handlerFailureThreshhold) {
193             String message = "Number of failed RpcServer handler runs exceeded threshhold " +
194               this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
195             if (abortable != null) {
196               abortable.abort(message, e);
197             } else {
198               LOG.error("Error but can't abort because abortable is null: " +
199                   StringUtils.stringifyException(e));
200               throw e;
201             }
202           } else {
203             LOG.warn("Handler errors " + StringUtils.stringifyException(e));
204           }
205         } else {
206           LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
207         }
208       } finally {
209         activeHandlerCount.decrementAndGet();
210       }
211     }
212   }
213 
214   public static abstract class QueueBalancer {
215     /**
216      * @return the index of the next queue to which a request should be inserted
217      */
218     public abstract int getNextQueue();
219   }
220 
221   public static QueueBalancer getBalancer(int queueSize) {
222     Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
223     if (queueSize == 1) {
224       return ONE_QUEUE;
225     } else {
226       return new RandomQueueBalancer(queueSize);
227     }
228   }
229 
230   /**
231    * All requests go to the first queue, at index 0
232    */
233   private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
234     @Override
235     public int getNextQueue() {
236       return 0;
237     }
238   };
239 
240   /**
241    * Queue balancer that just randomly selects a queue in the range [0, num queues).
242    */
243   private static class RandomQueueBalancer extends QueueBalancer {
244     private final int queueSize;
245 
246     public RandomQueueBalancer(int queueSize) {
247       this.queueSize = queueSize;
248     }
249 
250     public int getNextQueue() {
251       return ThreadLocalRandom.current().nextInt(queueSize);
252     }
253   }
254 
255   /**
256    * Update current soft limit for executor's call queues
257    * @param conf updated configuration
258    */
259   public void resizeQueues(Configuration conf) {
260     currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit);
261   }
262 }