View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertNotEquals;
23  import static org.junit.Assert.assertTrue;
24  import static org.mockito.Matchers.any;
25  import static org.mockito.Matchers.anyObject;
26  import static org.mockito.Matchers.eq;
27  import static org.mockito.Mockito.doAnswer;
28  import static org.mockito.Mockito.mock;
29  import static org.mockito.Mockito.timeout;
30  import static org.mockito.Mockito.verify;
31  import static org.mockito.Mockito.when;
32  
33  import java.io.IOException;
34  import java.net.InetSocketAddress;
35  import java.util.ArrayList;
36  import java.util.HashSet;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Set;
40  import java.util.concurrent.BlockingQueue;
41  import java.util.concurrent.CountDownLatch;
42  import java.util.concurrent.LinkedBlockingQueue;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.ipc.RpcServer.Call;
51  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
52  import org.apache.hadoop.hbase.protobuf.RequestConverter;
53  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
54  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
55  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
56  import org.apache.hadoop.hbase.security.User;
57  import org.apache.hadoop.hbase.testclassification.SmallTests;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.EnvironmentEdge;
60  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
61  import org.apache.hadoop.hbase.util.Threads;
62  import org.junit.Before;
63  import org.junit.Test;
64  import org.junit.experimental.categories.Category;
65  import org.mockito.invocation.InvocationOnMock;
66  import org.mockito.stubbing.Answer;
67  
68  import com.google.common.collect.ImmutableList;
69  import com.google.common.collect.ImmutableMap;
70  import com.google.common.collect.ImmutableSet;
71  import com.google.common.collect.Maps;
72  import com.google.protobuf.Message;
73  
74  @Category(SmallTests.class)
75  public class TestSimpleRpcScheduler {
76    public static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
77  
78    private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
79      @Override
80      public InetSocketAddress getListenerAddress() {
81        return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
82      }
83    };
84    private Configuration conf;
85  
86    @Before
87    public void setUp() {
88      conf = HBaseConfiguration.create();
89    }
90  
91    @Test
92    public void testBasic() throws IOException, InterruptedException {
93      PriorityFunction qosFunction = mock(PriorityFunction.class);
94      RpcScheduler scheduler = new SimpleRpcScheduler(
95          conf, 10, 0, 0, qosFunction, 0);
96      scheduler.init(CONTEXT);
97      scheduler.start();
98      CallRunner task = createMockTask();
99      task.setStatus(new MonitoredRPCHandlerImpl());
100     scheduler.dispatch(task);
101     verify(task, timeout(1000)).run();
102     scheduler.stop();
103   }
104 
105   @Test
106   public void testHandlerIsolation() throws IOException, InterruptedException {
107     CallRunner generalTask = createMockTask();
108     CallRunner priorityTask = createMockTask();
109     CallRunner replicationTask = createMockTask();
110     List<CallRunner> tasks = ImmutableList.of(
111         generalTask,
112         priorityTask,
113         replicationTask);
114     Map<CallRunner, Integer> qos = ImmutableMap.of(
115         generalTask, 0,
116         priorityTask, HConstants.HIGH_QOS + 1,
117         replicationTask, HConstants.REPLICATION_QOS);
118     PriorityFunction qosFunction = mock(PriorityFunction.class);
119     final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
120     final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
121     Answer<Void> answerToRun = new Answer<Void>() {
122       @Override
123       public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
124         synchronized (handlerThreads) {
125           handlerThreads.put(
126               (CallRunner) invocationOnMock.getMock(),
127               Thread.currentThread());
128         }
129         countDownLatch.countDown();
130         return null;
131       }
132     };
133     for (CallRunner task : tasks) {
134       task.setStatus(new MonitoredRPCHandlerImpl());
135       doAnswer(answerToRun).when(task).run();
136     }
137 
138     RpcScheduler scheduler = new SimpleRpcScheduler(
139         conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
140     scheduler.init(CONTEXT);
141     scheduler.start();
142     for (CallRunner task : tasks) {
143       when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject()))
144           .thenReturn(qos.get(task));
145       scheduler.dispatch(task);
146     }
147     for (CallRunner task : tasks) {
148       verify(task, timeout(1000)).run();
149     }
150     scheduler.stop();
151 
152     // Tests that these requests are handled by three distinct threads.
153     countDownLatch.await();
154     assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
155   }
156 
157   private CallRunner createMockTask() {
158     Call call = mock(Call.class);
159     CallRunner task = mock(CallRunner.class);
160     when(task.getCall()).thenReturn(call);
161     return task;
162   }
163 
164   @Test
165   public void testRpcScheduler() throws Exception {
166     testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
167     testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
168   }
169 
170   private void testRpcScheduler(final String queueType) throws Exception {
171     Configuration schedConf = HBaseConfiguration.create();
172     schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
173 
174     PriorityFunction priority = mock(PriorityFunction.class);
175     when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
176       .thenReturn(HConstants.NORMAL_QOS);
177 
178     RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
179                                                     HConstants.QOS_THRESHOLD);
180     try {
181       scheduler.start();
182 
183       CallRunner smallCallTask = mock(CallRunner.class);
184       RpcServer.Call smallCall = mock(RpcServer.Call.class);
185       RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
186       when(smallCallTask.getCall()).thenReturn(smallCall);
187       when(smallCall.getHeader()).thenReturn(smallHead);
188 
189       CallRunner largeCallTask = mock(CallRunner.class);
190       RpcServer.Call largeCall = mock(RpcServer.Call.class);
191       RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
192       when(largeCallTask.getCall()).thenReturn(largeCall);
193       when(largeCall.getHeader()).thenReturn(largeHead);
194 
195       CallRunner hugeCallTask = mock(CallRunner.class);
196       RpcServer.Call hugeCall = mock(RpcServer.Call.class);
197       RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
198       when(hugeCallTask.getCall()).thenReturn(hugeCall);
199       when(hugeCall.getHeader()).thenReturn(hugeHead);
200 
201       when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
202       when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L);
203       when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
204 
205       final ArrayList<Integer> work = new ArrayList<Integer>();
206       doAnswerTaskExecution(smallCallTask, work, 10, 250);
207       doAnswerTaskExecution(largeCallTask, work, 50, 250);
208       doAnswerTaskExecution(hugeCallTask, work, 100, 250);
209 
210       scheduler.dispatch(smallCallTask);
211       scheduler.dispatch(smallCallTask);
212       scheduler.dispatch(smallCallTask);
213       scheduler.dispatch(hugeCallTask);
214       scheduler.dispatch(smallCallTask);
215       scheduler.dispatch(largeCallTask);
216       scheduler.dispatch(smallCallTask);
217       scheduler.dispatch(smallCallTask);
218 
219       while (work.size() < 8) {
220         Threads.sleepWithoutInterrupt(100);
221       }
222 
223       int seqSum = 0;
224       int totalTime = 0;
225       for (int i = 0; i < work.size(); ++i) {
226         LOG.debug("Request i=" + i + " value=" + work.get(i));
227         seqSum += work.get(i);
228         totalTime += seqSum;
229       }
230       LOG.debug("Total Time: " + totalTime);
231 
232       // -> [small small small huge small large small small]
233       // -> NO REORDER   [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
234       // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
235       if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
236         assertEquals(530, totalTime);
237       } else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
238         assertEquals(930, totalTime);
239       }
240     } finally {
241       scheduler.stop();
242     }
243   }
244 
245   @Test
246   public void testScanQueueWithZeroScanRatio() throws Exception {
247     Configuration schedConf = HBaseConfiguration.create();
248     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
249     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
250     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
251 
252     PriorityFunction priority = mock(PriorityFunction.class);
253 
254     RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 2, 1, 1, priority,
255                                                     HConstants.QOS_THRESHOLD);
256     assertNotEquals(scheduler, null);
257   }
258 
259   @Test
260   public void testScanQueues() throws Exception {
261     Configuration schedConf = HBaseConfiguration.create();
262     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
263     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
264     schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
265 
266     PriorityFunction priority = mock(PriorityFunction.class);
267     when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
268       .thenReturn(HConstants.NORMAL_QOS);
269 
270     RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
271                                                     HConstants.QOS_THRESHOLD);
272     try {
273       scheduler.start();
274 
275       CallRunner putCallTask = mock(CallRunner.class);
276       RpcServer.Call putCall = mock(RpcServer.Call.class);
277       putCall.param = RequestConverter.buildMutateRequest(
278           Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
279       RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
280       when(putCallTask.getCall()).thenReturn(putCall);
281       when(putCall.getHeader()).thenReturn(putHead);
282 
283       CallRunner getCallTask = mock(CallRunner.class);
284       RpcServer.Call getCall = mock(RpcServer.Call.class);
285       RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
286       when(getCallTask.getCall()).thenReturn(getCall);
287       when(getCall.getHeader()).thenReturn(getHead);
288 
289       CallRunner scanCallTask = mock(CallRunner.class);
290       RpcServer.Call scanCall = mock(RpcServer.Call.class);
291       scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
292       RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
293       when(scanCallTask.getCall()).thenReturn(scanCall);
294       when(scanCall.getHeader()).thenReturn(scanHead);
295 
296       ArrayList<Integer> work = new ArrayList<Integer>();
297       doAnswerTaskExecution(putCallTask, work, 1, 1000);
298       doAnswerTaskExecution(getCallTask, work, 2, 1000);
299       doAnswerTaskExecution(scanCallTask, work, 3, 1000);
300 
301       // There are 3 queues: [puts], [gets], [scans]
302       // so the calls will be interleaved
303       scheduler.dispatch(putCallTask);
304       scheduler.dispatch(putCallTask);
305       scheduler.dispatch(putCallTask);
306       scheduler.dispatch(getCallTask);
307       scheduler.dispatch(getCallTask);
308       scheduler.dispatch(getCallTask);
309       scheduler.dispatch(scanCallTask);
310       scheduler.dispatch(scanCallTask);
311       scheduler.dispatch(scanCallTask);
312 
313       while (work.size() < 6) {
314         Threads.sleepWithoutInterrupt(100);
315       }
316 
317       for (int i = 0; i < work.size() - 2; i += 3) {
318         assertNotEquals(work.get(i + 0), work.get(i + 1));
319         assertNotEquals(work.get(i + 0), work.get(i + 2));
320         assertNotEquals(work.get(i + 1), work.get(i + 2));
321       }
322     } finally {
323       scheduler.stop();
324     }
325   }
326 
327   private void doAnswerTaskExecution(final CallRunner callTask,
328       final ArrayList<Integer> results, final int value, final int sleepInterval) {
329     callTask.setStatus(new MonitoredRPCHandlerImpl());
330     doAnswer(new Answer<Object>() {
331       @Override
332       public Object answer(InvocationOnMock invocation) {
333         synchronized (results) {
334           results.add(value);
335         }
336         Threads.sleepWithoutInterrupt(sleepInterval);
337         return null;
338       }
339     }).when(callTask).run();
340   }
341 
342   @Test
343   public void testSoftAndHardQueueLimits() throws Exception {
344     Configuration schedConf = HBaseConfiguration.create();
345 
346     schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
347     schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
348 
349     PriorityFunction priority = mock(PriorityFunction.class);
350     when(priority.getPriority(any(RequestHeader.class), any(Message.class)
351       )).thenReturn(HConstants.NORMAL_QOS);
352     SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
353       HConstants.QOS_THRESHOLD);
354     try {
355       scheduler.start();
356 
357       CallRunner putCallTask = mock(CallRunner.class);
358       RpcServer.Call putCall = mock(RpcServer.Call.class);
359       putCall.param = RequestConverter.buildMutateRequest(
360         Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
361       RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
362       when(putCallTask.getCall()).thenReturn(putCall);
363       when(putCall.getHeader()).thenReturn(putHead);
364 
365       assertTrue(scheduler.dispatch(putCallTask));
366 
367       schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
368       scheduler.onConfigurationChange(schedConf);
369       assertFalse(scheduler.dispatch(putCallTask));
370 
371       schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
372       scheduler.onConfigurationChange(schedConf);
373       assertTrue(scheduler.dispatch(putCallTask));
374     } finally {
375       scheduler.stop();
376     }
377   }
378   private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
379     private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
380 
381     private long offset;
382 
383     private final Set<String> threadNamePrefixs = new HashSet<>();
384 
385     @Override
386     public long currentTime() {
387       for (String threadNamePrefix : threadNamePrefixs) {
388         String threadName = Thread.currentThread().getName();
389         if (threadName.startsWith(threadNamePrefix)) {
390           return timeQ.poll().longValue() + offset;
391         }
392       }
393       return System.currentTimeMillis();
394     }
395   }
396 
397   private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
398       throws InterruptedException {
399     while (scheduler.getGeneralQueueLength() > 0) {
400       Thread.sleep(100);
401     }
402   }
403 
404   @Test
405   public void testCoDelScheduling() throws Exception {
406     CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
407     envEdge.threadNamePrefixs.add("RpcServer.CodelBQ.default.handler");
408     Configuration schedConf = HBaseConfiguration.create();
409     schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
410 
411     schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
412       SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
413 
414     PriorityFunction priority = mock(PriorityFunction.class);
415     when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class)))
416     .thenReturn(HConstants.NORMAL_QOS);
417     SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
418       HConstants.QOS_THRESHOLD);
419     try {
420       scheduler.start();
421       EnvironmentEdgeManager.injectEdge(envEdge);
422       envEdge.offset = 5;
423       // calls faster than min delay
424       for (int i = 0; i < 100; i++) {
425         long time = System.currentTimeMillis();
426         envEdge.timeQ.put(time);
427         CallRunner cr = getMockedCallRunner(time, 2);
428         scheduler.dispatch(cr);
429       }
430       // make sure fast calls are handled
431       waitUntilQueueEmpty(scheduler);
432       Thread.sleep(100);
433       assertEquals("None of these calls should have been discarded", 0,
434         scheduler.getNumGeneralCallsDropped());
435 
436       envEdge.offset = 151;
437       // calls slower than min delay, but not individually slow enough to be dropped
438       for (int i = 0; i < 20; i++) {
439         long time = System.currentTimeMillis();
440         envEdge.timeQ.put(time);
441         CallRunner cr = getMockedCallRunner(time, 2);
442         scheduler.dispatch(cr);
443       }
444 
445       // make sure somewhat slow calls are handled
446       waitUntilQueueEmpty(scheduler);
447       Thread.sleep(100);
448       assertEquals("None of these calls should have been discarded", 0,
449         scheduler.getNumGeneralCallsDropped());
450 
451       envEdge.offset = 2000;
452       // now slow calls and the ones to be dropped
453       for (int i = 0; i < 60; i++) {
454         long time = System.currentTimeMillis();
455         envEdge.timeQ.put(time);
456         CallRunner cr = getMockedCallRunner(time, 100);
457         scheduler.dispatch(cr);
458       }
459 
460       // make sure somewhat slow calls are handled
461       waitUntilQueueEmpty(scheduler);
462       Thread.sleep(100);
463       assertTrue(
464           "There should have been at least 12 calls dropped however there were "
465               + scheduler.getNumGeneralCallsDropped(),
466           scheduler.getNumGeneralCallsDropped() > 12);
467     } finally {
468       scheduler.stop();
469     }
470   }
471 
472   // Get mocked call that has the CallRunner sleep for a while so that the fast
473   // path isn't hit.
474   private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
475     final RpcServer.Call putCall = mock(RpcServer.Call.class);
476 
477     putCall.timestamp = timestamp;
478     putCall.param = RequestConverter.buildMutateRequest(
479         Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
480 
481     RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder()
482                                                              .setMethodName("mutate")
483                                                              .build();
484     when(putCall.getSize()).thenReturn(9L);
485     when(putCall.getHeader()).thenReturn(putHead);
486 
487     CallRunner cr = new CallRunner(null, putCall) {
488       public void run() {
489         try {
490           LOG.warn("Sleeping for " + sleepTime);
491           Thread.sleep(sleepTime);
492           LOG.warn("Done Sleeping for " + sleepTime);
493         } catch (InterruptedException e) {
494         }
495       }
496       public Call getCall() {
497         return putCall;
498       }
499 
500       public void drop() {}
501     };
502 
503     return cr;
504   }
505 }