1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertNotEquals;
23 import static org.junit.Assert.assertTrue;
24 import static org.mockito.Matchers.any;
25 import static org.mockito.Matchers.anyObject;
26 import static org.mockito.Matchers.eq;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.timeout;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.net.InetSocketAddress;
35 import java.util.ArrayList;
36 import java.util.HashSet;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.LinkedBlockingQueue;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.hbase.HBaseConfiguration;
48 import org.apache.hadoop.hbase.HConstants;
49 import org.apache.hadoop.hbase.client.Put;
50 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
51 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
52 import org.apache.hadoop.hbase.protobuf.RequestConverter;
53 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
54 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
55 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
56 import org.apache.hadoop.hbase.security.User;
57 import org.apache.hadoop.hbase.testclassification.SmallTests;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.EnvironmentEdge;
60 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
61 import org.apache.hadoop.hbase.util.Threads;
62 import org.junit.Before;
63 import org.junit.Test;
64 import org.junit.experimental.categories.Category;
65 import org.mockito.invocation.InvocationOnMock;
66 import org.mockito.stubbing.Answer;
67
68 import com.google.common.collect.ImmutableList;
69 import com.google.common.collect.ImmutableMap;
70 import com.google.common.collect.ImmutableSet;
71 import com.google.common.collect.Maps;
72 import com.google.protobuf.Message;
73
74 @Category(SmallTests.class)
75 public class TestSimpleRpcScheduler {
76 public static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
77
78 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
79 @Override
80 public InetSocketAddress getListenerAddress() {
81 return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
82 }
83 };
84 private Configuration conf;
85
86 @Before
87 public void setUp() {
88 conf = HBaseConfiguration.create();
89 }
90
91 @Test
92 public void testBasic() throws IOException, InterruptedException {
93 PriorityFunction qosFunction = mock(PriorityFunction.class);
94 RpcScheduler scheduler = new SimpleRpcScheduler(
95 conf, 10, 0, 0, qosFunction, 0);
96 scheduler.init(CONTEXT);
97 scheduler.start();
98 CallRunner task = createMockTask();
99 task.setStatus(new MonitoredRPCHandlerImpl());
100 scheduler.dispatch(task);
101 verify(task, timeout(1000)).run();
102 scheduler.stop();
103 }
104
105 @Test
106 public void testHandlerIsolation() throws IOException, InterruptedException {
107 CallRunner generalTask = createMockTask();
108 CallRunner priorityTask = createMockTask();
109 CallRunner replicationTask = createMockTask();
110 List<CallRunner> tasks = ImmutableList.of(
111 generalTask,
112 priorityTask,
113 replicationTask);
114 Map<CallRunner, Integer> qos = ImmutableMap.of(
115 generalTask, 0,
116 priorityTask, HConstants.HIGH_QOS + 1,
117 replicationTask, HConstants.REPLICATION_QOS);
118 PriorityFunction qosFunction = mock(PriorityFunction.class);
119 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
120 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
121 Answer<Void> answerToRun = new Answer<Void>() {
122 @Override
123 public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
124 synchronized (handlerThreads) {
125 handlerThreads.put(
126 (CallRunner) invocationOnMock.getMock(),
127 Thread.currentThread());
128 }
129 countDownLatch.countDown();
130 return null;
131 }
132 };
133 for (CallRunner task : tasks) {
134 task.setStatus(new MonitoredRPCHandlerImpl());
135 doAnswer(answerToRun).when(task).run();
136 }
137
138 RpcScheduler scheduler = new SimpleRpcScheduler(
139 conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
140 scheduler.init(CONTEXT);
141 scheduler.start();
142 for (CallRunner task : tasks) {
143 when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject()))
144 .thenReturn(qos.get(task));
145 scheduler.dispatch(task);
146 }
147 for (CallRunner task : tasks) {
148 verify(task, timeout(1000)).run();
149 }
150 scheduler.stop();
151
152
153 countDownLatch.await();
154 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
155 }
156
157 private CallRunner createMockTask() {
158 Call call = mock(Call.class);
159 CallRunner task = mock(CallRunner.class);
160 when(task.getCall()).thenReturn(call);
161 return task;
162 }
163
164 @Test
165 public void testRpcScheduler() throws Exception {
166 testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
167 testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
168 }
169
170 private void testRpcScheduler(final String queueType) throws Exception {
171 Configuration schedConf = HBaseConfiguration.create();
172 schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
173
174 PriorityFunction priority = mock(PriorityFunction.class);
175 when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
176 .thenReturn(HConstants.NORMAL_QOS);
177
178 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
179 HConstants.QOS_THRESHOLD);
180 try {
181 scheduler.start();
182
183 CallRunner smallCallTask = mock(CallRunner.class);
184 RpcServer.Call smallCall = mock(RpcServer.Call.class);
185 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
186 when(smallCallTask.getCall()).thenReturn(smallCall);
187 when(smallCall.getHeader()).thenReturn(smallHead);
188
189 CallRunner largeCallTask = mock(CallRunner.class);
190 RpcServer.Call largeCall = mock(RpcServer.Call.class);
191 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
192 when(largeCallTask.getCall()).thenReturn(largeCall);
193 when(largeCall.getHeader()).thenReturn(largeHead);
194
195 CallRunner hugeCallTask = mock(CallRunner.class);
196 RpcServer.Call hugeCall = mock(RpcServer.Call.class);
197 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
198 when(hugeCallTask.getCall()).thenReturn(hugeCall);
199 when(hugeCall.getHeader()).thenReturn(hugeHead);
200
201 when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
202 when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L);
203 when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
204
205 final ArrayList<Integer> work = new ArrayList<Integer>();
206 doAnswerTaskExecution(smallCallTask, work, 10, 250);
207 doAnswerTaskExecution(largeCallTask, work, 50, 250);
208 doAnswerTaskExecution(hugeCallTask, work, 100, 250);
209
210 scheduler.dispatch(smallCallTask);
211 scheduler.dispatch(smallCallTask);
212 scheduler.dispatch(smallCallTask);
213 scheduler.dispatch(hugeCallTask);
214 scheduler.dispatch(smallCallTask);
215 scheduler.dispatch(largeCallTask);
216 scheduler.dispatch(smallCallTask);
217 scheduler.dispatch(smallCallTask);
218
219 while (work.size() < 8) {
220 Threads.sleepWithoutInterrupt(100);
221 }
222
223 int seqSum = 0;
224 int totalTime = 0;
225 for (int i = 0; i < work.size(); ++i) {
226 LOG.debug("Request i=" + i + " value=" + work.get(i));
227 seqSum += work.get(i);
228 totalTime += seqSum;
229 }
230 LOG.debug("Total Time: " + totalTime);
231
232
233
234
235 if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
236 assertEquals(530, totalTime);
237 } else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
238 assertEquals(930, totalTime);
239 }
240 } finally {
241 scheduler.stop();
242 }
243 }
244
245 @Test
246 public void testScanQueueWithZeroScanRatio() throws Exception {
247 Configuration schedConf = HBaseConfiguration.create();
248 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
249 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
250 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
251
252 PriorityFunction priority = mock(PriorityFunction.class);
253
254 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority,
255 HConstants.QOS_THRESHOLD);
256 assertNotEquals(scheduler, null);
257 }
258
259 @Test
260 public void testScanQueues() throws Exception {
261 Configuration schedConf = HBaseConfiguration.create();
262 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
263 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
264 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
265
266 PriorityFunction priority = mock(PriorityFunction.class);
267 when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
268 .thenReturn(HConstants.NORMAL_QOS);
269
270 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
271 HConstants.QOS_THRESHOLD);
272 try {
273 scheduler.start();
274
275 CallRunner putCallTask = mock(CallRunner.class);
276 RpcServer.Call putCall = mock(RpcServer.Call.class);
277 putCall.param = RequestConverter.buildMutateRequest(
278 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
279 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
280 when(putCallTask.getCall()).thenReturn(putCall);
281 when(putCall.getHeader()).thenReturn(putHead);
282
283 CallRunner getCallTask = mock(CallRunner.class);
284 RpcServer.Call getCall = mock(RpcServer.Call.class);
285 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
286 when(getCallTask.getCall()).thenReturn(getCall);
287 when(getCall.getHeader()).thenReturn(getHead);
288
289 CallRunner scanCallTask = mock(CallRunner.class);
290 RpcServer.Call scanCall = mock(RpcServer.Call.class);
291 scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
292 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
293 when(scanCallTask.getCall()).thenReturn(scanCall);
294 when(scanCall.getHeader()).thenReturn(scanHead);
295
296 ArrayList<Integer> work = new ArrayList<Integer>();
297 doAnswerTaskExecution(putCallTask, work, 1, 1000);
298 doAnswerTaskExecution(getCallTask, work, 2, 1000);
299 doAnswerTaskExecution(scanCallTask, work, 3, 1000);
300
301
302
303 scheduler.dispatch(putCallTask);
304 scheduler.dispatch(putCallTask);
305 scheduler.dispatch(putCallTask);
306 scheduler.dispatch(getCallTask);
307 scheduler.dispatch(getCallTask);
308 scheduler.dispatch(getCallTask);
309 scheduler.dispatch(scanCallTask);
310 scheduler.dispatch(scanCallTask);
311 scheduler.dispatch(scanCallTask);
312
313 while (work.size() < 6) {
314 Threads.sleepWithoutInterrupt(100);
315 }
316
317 for (int i = 0; i < work.size() - 2; i += 3) {
318 assertNotEquals(work.get(i + 0), work.get(i + 1));
319 assertNotEquals(work.get(i + 0), work.get(i + 2));
320 assertNotEquals(work.get(i + 1), work.get(i + 2));
321 }
322 } finally {
323 scheduler.stop();
324 }
325 }
326
327 private void doAnswerTaskExecution(final CallRunner callTask,
328 final ArrayList<Integer> results, final int value, final int sleepInterval) {
329 callTask.setStatus(new MonitoredRPCHandlerImpl());
330 doAnswer(new Answer<Object>() {
331 @Override
332 public Object answer(InvocationOnMock invocation) {
333 synchronized (results) {
334 results.add(value);
335 }
336 Threads.sleepWithoutInterrupt(sleepInterval);
337 return null;
338 }
339 }).when(callTask).run();
340 }
341
342 @Test
343 public void testSoftAndHardQueueLimits() throws Exception {
344 Configuration schedConf = HBaseConfiguration.create();
345
346 schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
347 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
348
349 PriorityFunction priority = mock(PriorityFunction.class);
350 when(priority.getPriority(any(RequestHeader.class), any(Message.class)
351 )).thenReturn(HConstants.NORMAL_QOS);
352 SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
353 HConstants.QOS_THRESHOLD);
354 try {
355 scheduler.start();
356
357 CallRunner putCallTask = mock(CallRunner.class);
358 RpcServer.Call putCall = mock(RpcServer.Call.class);
359 putCall.param = RequestConverter.buildMutateRequest(
360 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
361 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
362 when(putCallTask.getCall()).thenReturn(putCall);
363 when(putCall.getHeader()).thenReturn(putHead);
364
365 assertTrue(scheduler.dispatch(putCallTask));
366
367 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
368 scheduler.onConfigurationChange(schedConf);
369 assertFalse(scheduler.dispatch(putCallTask));
370
371 schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
372 scheduler.onConfigurationChange(schedConf);
373 assertTrue(scheduler.dispatch(putCallTask));
374 } finally {
375 scheduler.stop();
376 }
377 }
378 private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
379 private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
380
381 private long offset;
382
383 private final Set<String> threadNamePrefixs = new HashSet<>();
384
385 @Override
386 public long currentTime() {
387 for (String threadNamePrefix : threadNamePrefixs) {
388 String threadName = Thread.currentThread().getName();
389 if (threadName.startsWith(threadNamePrefix)) {
390 return timeQ.poll().longValue() + offset;
391 }
392 }
393 return System.currentTimeMillis();
394 }
395 }
396
397 private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
398 throws InterruptedException {
399 while (scheduler.getGeneralQueueLength() > 0) {
400 Thread.sleep(100);
401 }
402 }
403
404 @Test
405 public void testCoDelScheduling() throws Exception {
406 CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
407 envEdge.threadNamePrefixs.add("RpcServer.CodelBQ.default.handler");
408 Configuration schedConf = HBaseConfiguration.create();
409 schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
410
411 schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
412 SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
413
414 PriorityFunction priority = mock(PriorityFunction.class);
415 when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class)))
416 .thenReturn(HConstants.NORMAL_QOS);
417 SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
418 HConstants.QOS_THRESHOLD);
419 try {
420 scheduler.start();
421 EnvironmentEdgeManager.injectEdge(envEdge);
422 envEdge.offset = 5;
423
424 for (int i = 0; i < 100; i++) {
425 long time = System.currentTimeMillis();
426 envEdge.timeQ.put(time);
427 CallRunner cr = getMockedCallRunner(time, 2);
428 scheduler.dispatch(cr);
429 }
430
431 waitUntilQueueEmpty(scheduler);
432 Thread.sleep(100);
433 assertEquals("None of these calls should have been discarded", 0,
434 scheduler.getNumGeneralCallsDropped());
435
436 envEdge.offset = 151;
437
438 for (int i = 0; i < 20; i++) {
439 long time = System.currentTimeMillis();
440 envEdge.timeQ.put(time);
441 CallRunner cr = getMockedCallRunner(time, 2);
442 scheduler.dispatch(cr);
443 }
444
445
446 waitUntilQueueEmpty(scheduler);
447 Thread.sleep(100);
448 assertEquals("None of these calls should have been discarded", 0,
449 scheduler.getNumGeneralCallsDropped());
450
451 envEdge.offset = 2000;
452
453 for (int i = 0; i < 60; i++) {
454 long time = System.currentTimeMillis();
455 envEdge.timeQ.put(time);
456 CallRunner cr = getMockedCallRunner(time, 100);
457 scheduler.dispatch(cr);
458 }
459
460
461 waitUntilQueueEmpty(scheduler);
462 Thread.sleep(100);
463 assertTrue(
464 "There should have been at least 12 calls dropped however there were "
465 + scheduler.getNumGeneralCallsDropped(),
466 scheduler.getNumGeneralCallsDropped() > 12);
467 } finally {
468 scheduler.stop();
469 }
470 }
471
472
473
474 private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
475 final RpcServer.Call putCall = mock(RpcServer.Call.class);
476
477 putCall.timestamp = timestamp;
478 putCall.param = RequestConverter.buildMutateRequest(
479 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
480
481 RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder()
482 .setMethodName("mutate")
483 .build();
484 when(putCall.getSize()).thenReturn(9L);
485 when(putCall.getHeader()).thenReturn(putHead);
486
487 CallRunner cr = new CallRunner(null, putCall) {
488 public void run() {
489 try {
490 LOG.warn("Sleeping for " + sleepTime);
491 Thread.sleep(sleepTime);
492 LOG.warn("Done Sleeping for " + sleepTime);
493 } catch (InterruptedException e) {
494 }
495 }
496 public Call getCall() {
497 return putCall;
498 }
499
500 public void drop() {}
501 };
502
503 return cr;
504 }
505 }