1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20
21 import java.util.Comparator;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Abortable;
29 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceStability;
33 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
34 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
35
36
37
38
39
40 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
41 @InterfaceStability.Evolving
42 public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
43 public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
44
45 public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
46 "hbase.ipc.server.callqueue.read.ratio";
47 public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
48 "hbase.ipc.server.callqueue.scan.ratio";
49 public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
50 "hbase.ipc.server.callqueue.handler.factor";
51
52
53
54
55
56
57 public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
58 public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
59 public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
60 public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
61 public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
62
63
64 public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
65 = "hbase.ipc.server.queue.max.call.delay";
66
67
68 public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
69 "hbase.ipc.server.callqueue.codel.target.delay";
70 public static final String CALL_QUEUE_CODEL_INTERVAL =
71 "hbase.ipc.server.callqueue.codel.interval";
72 public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
73 "hbase.ipc.server.callqueue.codel.lifo.threshold";
74
75 public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
76 public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
77 public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
78
79 private AtomicLong numGeneralCallsDropped = new AtomicLong();
80 private AtomicLong numLifoModeSwitches = new AtomicLong();
81
82
83
84
85
86 @Override
87 public void onConfigurationChange(Configuration conf) {
88 callExecutor.resizeQueues(conf);
89 if (priorityExecutor != null) {
90 priorityExecutor.resizeQueues(conf);
91 }
92 if (replicationExecutor != null) {
93 replicationExecutor.resizeQueues(conf);
94 }
95
96 String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
97 CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
98
99 if (isCodelQueueType(callQueueType)) {
100
101 int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
102 CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
103 int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
104 CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
105 double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
106 CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
107
108 for (BlockingQueue<CallRunner> queue : callExecutor.getQueues()) {
109 if (queue instanceof AdaptiveLifoCoDelCallQueue) {
110 ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay,
111 codelInterval, codelLifoThreshold);
112 }
113 }
114 }
115 }
116
117
118
119
120
121
122
123
124 private static class CallPriorityComparator implements Comparator<CallRunner> {
125 private final static int DEFAULT_MAX_CALL_DELAY = 5000;
126
127 private final PriorityFunction priority;
128 private final int maxDelay;
129
130 public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
131 this.priority = priority;
132 this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
133 }
134
135 @Override
136 public int compare(CallRunner a, CallRunner b) {
137 RpcServer.Call callA = a.getCall();
138 RpcServer.Call callB = b.getCall();
139 long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
140 long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
141 deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
142 deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
143 return (int)(deadlineA - deadlineB);
144 }
145 }
146
147 private int port;
148 private final PriorityFunction priority;
149 private final RpcExecutor callExecutor;
150 private final RpcExecutor priorityExecutor;
151 private final RpcExecutor replicationExecutor;
152
153
154 private final int highPriorityLevel;
155
156 private Abortable abortable = null;
157
158
159
160
161
162
163
164
165
166 public SimpleRpcScheduler(
167 Configuration conf,
168 int handlerCount,
169 int priorityHandlerCount,
170 int replicationHandlerCount,
171 PriorityFunction priority,
172 Abortable server,
173 int highPriorityLevel) {
174 int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
175 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
176 int maxPriorityQueueLength =
177 conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
178 this.priority = priority;
179 this.highPriorityLevel = highPriorityLevel;
180 this.abortable = server;
181
182 String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
183 float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
184 float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
185
186 float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
187 int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
188
189 int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
190 CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
191 int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
192 CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
193 double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
194 CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
195
196 LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
197
198 if (numCallQueues > 1 && callqReadShare > 0) {
199
200 if (isDeadlineQueueType(callQueueType)) {
201 CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
202 callExecutor = new RWQueueRpcExecutor("DeadlineRWQ.default", handlerCount, numCallQueues,
203 callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
204 BoundedPriorityBlockingQueue.class, callPriority);
205 } else if (isCodelQueueType(callQueueType)) {
206 Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
207 codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
208 callExecutor = new RWQueueRpcExecutor("CodelRWQ.default", handlerCount,
209 numCallQueues, callqReadShare, callqScanShare,
210 AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
211 AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
212 } else {
213 callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues,
214 callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
215 }
216 } else {
217
218 if (isDeadlineQueueType(callQueueType)) {
219 CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
220 callExecutor = new BalancedQueueRpcExecutor("BQDeadline.default", handlerCount, numCallQueues,
221 conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
222 } else if (isCodelQueueType(callQueueType)) {
223 callExecutor = new BalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues,
224 conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
225 codelTargetDelay, codelInterval, codelLifoThreshold,
226 numGeneralCallsDropped, numLifoModeSwitches);
227 } else {
228
229 callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
230 handlerCount, numCallQueues, maxQueueLength, conf, abortable);
231 }
232 }
233
234
235 this.priorityExecutor = priorityHandlerCount > 0?
236 new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
237 2, maxPriorityQueueLength, conf, abortable): null;
238 this.replicationExecutor = replicationHandlerCount > 0?
239 new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
240 replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
241 }
242
243 private static boolean isDeadlineQueueType(final String callQueueType) {
244 return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
245 }
246
247 private static boolean isCodelQueueType(final String callQueueType) {
248 return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
249 }
250
/**
 * Convenience constructor for callers without an {@link Abortable}: delegates to the
 * full constructor, passing null as the server.
 *
 * @param conf configuration to read queue settings from
 * @param handlerCount number of general handlers
 * @param priorityHandlerCount number of priority handlers
 * @param replicationHandlerCount number of replication handlers
 * @param priority function providing call priorities and deadlines
 * @param highPriorityLevel calls with a priority strictly above this use the priority executor
 */
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
    int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
  this(
      conf,
      handlerCount,
      priorityHandlerCount,
      replicationHandlerCount,
      priority,
      null, // no Abortable available
      highPriorityLevel);
}
261
262 @Override
263 public void init(Context context) {
264 this.port = context.getListenerAddress().getPort();
265 }
266
267 @Override
268 public void start() {
269 callExecutor.start(port);
270 if (priorityExecutor != null) priorityExecutor.start(port);
271 if (replicationExecutor != null) replicationExecutor.start(port);
272 }
273
274 @Override
275 public void stop() {
276 callExecutor.stop();
277 if (priorityExecutor != null) priorityExecutor.stop();
278 if (replicationExecutor != null) replicationExecutor.stop();
279 }
280
281 @Override
282 public boolean dispatch(CallRunner callTask) throws InterruptedException {
283 RpcServer.Call call = callTask.getCall();
284 int level = priority.getPriority(call.getHeader(), call.param);
285 if (priorityExecutor != null && level > highPriorityLevel) {
286 return priorityExecutor.dispatch(callTask);
287 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
288 return replicationExecutor.dispatch(callTask);
289 } else {
290 return callExecutor.dispatch(callTask);
291 }
292 }
293
294 @Override
295 public int getGeneralQueueLength() {
296 return callExecutor.getQueueLength();
297 }
298
299 @Override
300 public int getPriorityQueueLength() {
301 return priorityExecutor == null ? 0 : priorityExecutor.getQueueLength();
302 }
303
304 @Override
305 public int getReplicationQueueLength() {
306 return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
307 }
308
309 @Override
310 public int getActiveRpcHandlerCount() {
311 return callExecutor.getActiveHandlerCount() +
312 (priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
313 (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
314 }
315
316 @Override
317 public long getNumGeneralCallsDropped() {
318 return numGeneralCallsDropped.get();
319 }
320
321 @Override
322 public long getNumLifoModeSwitches() {
323 return numLifoModeSwitches.get();
324 }
325 }
326