1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.hbase.util.ReflectionUtils;
31
32
33
34
35
36
37 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
38 @InterfaceStability.Evolving
39 public class BalancedQueueRpcExecutor extends RpcExecutor {
40
41 protected final List<BlockingQueue<CallRunner>> queues;
42 private final QueueBalancer balancer;
43
44 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
45 final int maxQueueLength) {
46 this(name, handlerCount, numQueues, maxQueueLength, null, null);
47 }
48
49 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
50 final int maxQueueLength, final Configuration conf, final Abortable abortable) {
51 this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
52 }
53
54 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
55 final Class<? extends BlockingQueue> queueClass, Object... initargs) {
56 this(name, handlerCount, numQueues, null, null, queueClass, initargs);
57 }
58
59 public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
60 final Configuration conf, final Abortable abortable,
61 final Class<? extends BlockingQueue> queueClass, Object... initargs) {
62 super(name, Math.max(handlerCount, numQueues), conf, abortable);
63 queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
64 this.balancer = getBalancer(numQueues);
65 initializeQueues(numQueues, queueClass, initargs);
66 }
67
68 protected void initializeQueues(final int numQueues,
69 final Class<? extends BlockingQueue> queueClass, Object... initargs) {
70 if (initargs.length > 0) {
71 currentQueueLimit = (int) initargs[0];
72 initargs[0] = Math.max((int) initargs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
73 }
74 for (int i = 0; i < numQueues; ++i) {
75 queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
76 }
77 }
78
79 @Override
80 public boolean dispatch(final CallRunner callTask) throws InterruptedException {
81 int queueIndex = balancer.getNextQueue();
82 BlockingQueue<CallRunner> queue = queues.get(queueIndex);
83
84 if (queue.size() >= currentQueueLimit) {
85 return false;
86 }
87 return queue.offer(callTask);
88 }
89
90 @Override
91 public int getQueueLength() {
92 int length = 0;
93 for (final BlockingQueue<CallRunner> queue : queues) {
94 length += queue.size();
95 }
96 return length;
97 }
98
99 @Override
100 public List<BlockingQueue<CallRunner>> getQueues() {
101 return queues;
102 }
103 }