View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  
21  import java.util.Comparator;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.atomic.AtomicLong;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Abortable;
29  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.classification.InterfaceStability;
33  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
34  import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
35  
36  /**
37   * A scheduler that maintains isolated handler pools for general,
38   * high-priority, and replication requests.
39   */
40  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
41  @InterfaceStability.Evolving
42  public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
43    public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
44  
45    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
46        "hbase.ipc.server.callqueue.read.ratio";
47    public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
48        "hbase.ipc.server.callqueue.scan.ratio";
49    public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
50        "hbase.ipc.server.callqueue.handler.factor";
51  
52    /**
53     * The default, 'fifo', has the least friction but is dumb.
54     * If set to 'deadline', uses a priority queue and deprioritizes long-running scans. Sorting by
55     * priority comes at a cost, reduced throughput.
56     */
57    public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
58    public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
59    public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
60    public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
61    public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
62  
63    /** max delay in msec used to bound the deprioritized requests */
64    public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
65        = "hbase.ipc.server.queue.max.call.delay";
66  
67    // These 3 are only used by Codel executor
68    public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
69      "hbase.ipc.server.callqueue.codel.target.delay";
70    public static final String CALL_QUEUE_CODEL_INTERVAL =
71      "hbase.ipc.server.callqueue.codel.interval";
72    public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
73      "hbase.ipc.server.callqueue.codel.lifo.threshold";
74  
75    public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
76    public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
77    public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
78  
79    private AtomicLong numGeneralCallsDropped = new AtomicLong();
80    private AtomicLong numLifoModeSwitches = new AtomicLong();
81  
82    /**
83     * Resize call queues;
84     * @param conf new configuration
85     */
86    @Override
87    public void onConfigurationChange(Configuration conf) {
88      callExecutor.resizeQueues(conf);
89      if (priorityExecutor != null) {
90        priorityExecutor.resizeQueues(conf);
91      }
92      if (replicationExecutor != null) {
93        replicationExecutor.resizeQueues(conf);
94      }
95  
96      String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
97        CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
98  
99      if (isCodelQueueType(callQueueType)) {
100       // update CoDel Scheduler tunables
101       int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
102         CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
103       int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
104         CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
105       double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
106         CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
107 
108       for (BlockingQueue<CallRunner> queue : callExecutor.getQueues()) {
109         if (queue instanceof AdaptiveLifoCoDelCallQueue) {
110           ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay,
111             codelInterval, codelLifoThreshold);
112         }
113       }
114     }
115   }
116 
117   /**
118    * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
119    * It uses the calculated "deadline" e.g. to deprioritize long-running job
120    *
121    * If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in
122    * FIFO (first-in-first-out) manner.
123    */
124   private static class CallPriorityComparator implements Comparator<CallRunner> {
125     private final static int DEFAULT_MAX_CALL_DELAY = 5000;
126 
127     private final PriorityFunction priority;
128     private final int maxDelay;
129 
130     public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
131       this.priority = priority;
132       this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
133     }
134 
135     @Override
136     public int compare(CallRunner a, CallRunner b) {
137       RpcServer.Call callA = a.getCall();
138       RpcServer.Call callB = b.getCall();
139       long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
140       long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
141       deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
142       deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
143       return (int)(deadlineA - deadlineB);
144     }
145   }
146 
147   private int port;
148   private final PriorityFunction priority;
149   private final RpcExecutor callExecutor;
150   private final RpcExecutor priorityExecutor;
151   private final RpcExecutor replicationExecutor;
152 
153   /** What level a high priority call is at. */
154   private final int highPriorityLevel;
155 
156   private Abortable abortable = null;
157 
158   /**
159    * @param conf
160    * @param handlerCount the number of handler threads that will be used to process calls
161    * @param priorityHandlerCount How many threads for priority handling.
162    * @param replicationHandlerCount How many threads for replication handling.
163    * @param highPriorityLevel
164    * @param priority Function to extract request priority.
165    */
166   public SimpleRpcScheduler(
167       Configuration conf,
168       int handlerCount,
169       int priorityHandlerCount,
170       int replicationHandlerCount,
171       PriorityFunction priority,
172       Abortable server,
173       int highPriorityLevel) {
174     int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
175         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
176     int maxPriorityQueueLength =
177         conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
178     this.priority = priority;
179     this.highPriorityLevel = highPriorityLevel;
180     this.abortable = server;
181 
182     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
183     float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
184     float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
185 
186     float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
187     int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
188 
189     int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
190         CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
191       int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
192         CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
193       double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
194         CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
195 
196     LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
197 
198     if (numCallQueues > 1 && callqReadShare > 0) {
199       // multiple read/write queues
200       if (isDeadlineQueueType(callQueueType)) {
201         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
202         callExecutor = new RWQueueRpcExecutor("DeadlineRWQ.default", handlerCount, numCallQueues,
203             callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
204             BoundedPriorityBlockingQueue.class, callPriority);
205       } else if (isCodelQueueType(callQueueType)) {
206         Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
207           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
208         callExecutor = new RWQueueRpcExecutor("CodelRWQ.default", handlerCount,
209           numCallQueues, callqReadShare, callqScanShare,
210           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
211           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
212       } else {
213         callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues,
214           callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
215       }
216     } else {
217       // multiple queues
218       if (isDeadlineQueueType(callQueueType)) {
219         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
220         callExecutor = new BalancedQueueRpcExecutor("BQDeadline.default", handlerCount, numCallQueues,
221           conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
222       } else if (isCodelQueueType(callQueueType)) {
223         callExecutor = new BalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues,
224           conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
225           codelTargetDelay, codelInterval, codelLifoThreshold,
226           numGeneralCallsDropped, numLifoModeSwitches);
227       } else {
228 				// FifoWFPBQ = FastPathBalancedQueueRpcExecutor
229         callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
230           handlerCount, numCallQueues, maxQueueLength, conf, abortable);
231       }
232     }
233 
234     // Create 2 queues to help priorityExecutor be more scalable.
235     this.priorityExecutor = priorityHandlerCount > 0?
236         new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
237             2, maxPriorityQueueLength, conf, abortable): null;
238         this.replicationExecutor = replicationHandlerCount > 0?
239             new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
240                 replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
241   }
242 
243   private static boolean isDeadlineQueueType(final String callQueueType) {
244     return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
245   }
246 
247   private static boolean isCodelQueueType(final String callQueueType) {
248     return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
249   }
250 
251   public SimpleRpcScheduler(
252 	      Configuration conf,
253 	      int handlerCount,
254 	      int priorityHandlerCount,
255 	      int replicationHandlerCount,
256 	      PriorityFunction priority,
257 	      int highPriorityLevel) {
258 	  this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
259 	    null, highPriorityLevel);
260   }
261 
262   @Override
263   public void init(Context context) {
264     this.port = context.getListenerAddress().getPort();
265   }
266 
267   @Override
268   public void start() {
269     callExecutor.start(port);
270     if (priorityExecutor != null) priorityExecutor.start(port);
271     if (replicationExecutor != null) replicationExecutor.start(port);
272   }
273 
274   @Override
275   public void stop() {
276     callExecutor.stop();
277     if (priorityExecutor != null) priorityExecutor.stop();
278     if (replicationExecutor != null) replicationExecutor.stop();
279   }
280 
281   @Override
282   public boolean dispatch(CallRunner callTask) throws InterruptedException {
283     RpcServer.Call call = callTask.getCall();
284     int level = priority.getPriority(call.getHeader(), call.param);
285     if (priorityExecutor != null && level > highPriorityLevel) {
286       return priorityExecutor.dispatch(callTask);
287     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
288       return replicationExecutor.dispatch(callTask);
289     } else {
290       return callExecutor.dispatch(callTask);
291     }
292   }
293 
294   @Override
295   public int getGeneralQueueLength() {
296     return callExecutor.getQueueLength();
297   }
298 
299   @Override
300   public int getPriorityQueueLength() {
301     return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
302   }
303 
304   @Override
305   public int getReplicationQueueLength() {
306     return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
307   }
308 
309   @Override
310   public int getActiveRpcHandlerCount() {
311     return callExecutor.getActiveHandlerCount() +
312            (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
313            (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
314   }
315 
316   @Override
317   public long getNumGeneralCallsDropped() {
318     return numGeneralCallsDropped.get();
319   }
320 
321   @Override
322   public long getNumLifoModeSwitches() {
323     return numLifoModeSwitches.get();
324   }
325 }
326