1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25
26 import org.apache.commons.lang.ArrayUtils;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Abortable;
31 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
36 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
37 import org.apache.hadoop.hbase.protobuf.generated
38 .RegionServerStatusProtos.ReportRegionStateTransitionRequest;
39 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
40 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
41 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
42 import org.apache.hadoop.hbase.util.ReflectionUtils;
43
44 import com.google.protobuf.Message;
45
46
47
48
49
50
51 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
52 @InterfaceStability.Evolving
53 public class RWQueueRpcExecutor extends RpcExecutor {
54 private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
55
56 private final List<BlockingQueue<CallRunner>> queues;
57 private final QueueBalancer writeBalancer;
58 private final QueueBalancer readBalancer;
59 private final QueueBalancer scanBalancer;
60 private final int writeHandlersCount;
61 private final int readHandlersCount;
62 private final int scanHandlersCount;
63 private final int numWriteQueues;
64 private final int numReadQueues;
65 private final int numScanQueues;
66
67 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
68 final float readShare, final int maxQueueLength,
69 final Configuration conf, final Abortable abortable) {
70 this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
71 conf, abortable, LinkedBlockingQueue.class);
72 }
73
74 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
75 final float readShare, final float scanShare, final int maxQueueLength) {
76 this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
77 }
78
79 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
80 final float readShare, final float scanShare, final int maxQueueLength,
81 final Configuration conf, final Abortable abortable) {
82 this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
83 conf, abortable, LinkedBlockingQueue.class);
84 }
85
86 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
87 final float readShare, final int maxQueueLength,
88 final Configuration conf, final Abortable abortable,
89 final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
90 this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
91 readQueueClass, readQueueInitArgs);
92 }
93
94 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
95 final float readShare, final float scanShare, final int maxQueueLength,
96 final Configuration conf, final Abortable abortable,
97 final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
98 this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
99 calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
100 LinkedBlockingQueue.class, new Object[] {maxQueueLength},
101 readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
102 }
103
104 public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
105 final float readShare, final float scanShare,
106 final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
107 final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
108 this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
109 calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
110 writeQueueClass, writeQueueInitArgs,
111 readQueueClass, readQueueInitArgs);
112 }
113
114 public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
115 final int numWriteQueues, final int numReadQueues,
116 final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
117 final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
118 this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
119 writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
120 }
121
122 public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
123 int numWriteQueues, int numReadQueues, float scanShare,
124 final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
125 final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
126 super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
127
128 int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
129 int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
130 if ((numReadQueues - numScanQueues) > 0) {
131 numReadQueues -= numScanQueues;
132 readHandlers -= scanHandlers;
133 } else {
134 numScanQueues = 0;
135 scanHandlers = 0;
136 }
137
138 this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
139 this.readHandlersCount = Math.max(readHandlers, numReadQueues);
140 this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
141 this.numWriteQueues = numWriteQueues;
142 this.numReadQueues = numReadQueues;
143 this.numScanQueues = numScanQueues;
144 this.writeBalancer = getBalancer(numWriteQueues);
145 this.readBalancer = getBalancer(numReadQueues);
146 this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null;
147
148 queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
149 LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
150 " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
151 ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
152 " scanHandlers=" + scanHandlersCount));
153 if (writeQueueInitArgs.length > 0) {
154 currentQueueLimit = (int) writeQueueInitArgs[0];
155 writeQueueInitArgs[0] = Math.max((int) writeQueueInitArgs[0],
156 DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
157 }
158 for (int i = 0; i < numWriteQueues; ++i) {
159
160 queues.add((BlockingQueue<CallRunner>)
161 ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
162 }
163
164 if (readQueueInitArgs.length > 0) {
165 currentQueueLimit = (int) readQueueInitArgs[0];
166 readQueueInitArgs[0] = Math.max((int) readQueueInitArgs[0],
167 DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
168 }
169 for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
170 queues.add((BlockingQueue<CallRunner>)
171 ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
172 }
173 }
174
175 @Override
176 protected void startHandlers(final int port) {
177 startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
178 startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
179 startHandlers(".scan", scanHandlersCount, queues,
180 numWriteQueues + numReadQueues, numScanQueues, port);
181 }
182
183 @Override
184 public boolean dispatch(final CallRunner callTask) throws InterruptedException {
185 RpcServer.Call call = callTask.getCall();
186 int queueIndex;
187 if (isWriteRequest(call.getHeader(), call.param)) {
188 queueIndex = writeBalancer.getNextQueue();
189 } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
190 queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
191 } else {
192 queueIndex = numWriteQueues + readBalancer.getNextQueue();
193 }
194
195 BlockingQueue<CallRunner> queue = queues.get(queueIndex);
196 if (queue.size() >= currentQueueLimit) {
197 return false;
198 }
199 return queue.offer(callTask);
200 }
201
202 private boolean isWriteRequest(final RequestHeader header, final Message param) {
203
204 if (param instanceof MultiRequest) {
205 MultiRequest multi = (MultiRequest)param;
206 for (RegionAction regionAction : multi.getRegionActionList()) {
207 for (Action action: regionAction.getActionList()) {
208 if (action.hasMutation()) {
209 return true;
210 }
211 }
212 }
213 }
214 if (param instanceof MutateRequest) {
215 return true;
216 }
217 if (param instanceof ReportRegionStateTransitionRequest) {
218 return true;
219 }
220 return false;
221 }
222
223 private boolean isScanRequest(final RequestHeader header, final Message param) {
224 if (param instanceof ScanRequest) {
225
226 ScanRequest request = (ScanRequest)param;
227 return request.hasScannerId();
228 }
229 return false;
230 }
231
232 @Override
233 public int getQueueLength() {
234 int length = 0;
235 for (final BlockingQueue<CallRunner> queue: queues) {
236 length += queue.size();
237 }
238 return length;
239 }
240
241 @Override
242 protected List<BlockingQueue<CallRunner>> getQueues() {
243 return queues;
244 }
245
246
247
248
249
250 private static int calcNumWriters(final int count, final float readShare) {
251 return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
252 }
253
254
255
256
257
258 private static int calcNumReaders(final int count, final float readShare) {
259 return count - calcNumWriters(count, readShare);
260 }
261 }