View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.IOException;
27  import java.net.InetSocketAddress;
28  import java.util.ArrayList;
29  import java.util.List;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.testclassification.MediumTests;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
39  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
40  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
41  import org.apache.hadoop.hbase.security.User;
42  import org.apache.log4j.AppenderSkeleton;
43  import org.apache.log4j.Level;
44  import org.apache.log4j.Logger;
45  import org.apache.log4j.spi.LoggingEvent;
46  import org.junit.Ignore;
47  import org.junit.Test;
48  import org.junit.experimental.categories.Category;
49  
50  import com.google.common.collect.Lists;
51  import com.google.protobuf.BlockingRpcChannel;
52  import com.google.protobuf.BlockingService;
53  import com.google.protobuf.RpcController;
54  import com.google.protobuf.ServiceException;
55  
56  /**
57   * Test that delayed RPCs work. Fire up three calls, the first of which should
58   * be delayed. Check that the last two, which are undelayed, return before the
59   * first one.
60   */
61  @Category(MediumTests.class) // Fails sometimes with small tests
62  @Ignore
63  public class TestDelayedRpc {
64    private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
65    public static RpcServerInterface rpcServer;
66    public static final int UNDELAYED = 0;
67    public static final int DELAYED = 1;
68    private static final int RPC_CLIENT_TIMEOUT = 30000;
69  
70    @Test (timeout=60000)
71    public void testDelayedRpcImmediateReturnValue() throws Exception {
72      testDelayedRpc(false);
73    }
74  
75    @Test (timeout=60000)
76    public void testDelayedRpcDelayedReturnValue() throws Exception {
77      testDelayedRpc(true);
78    }
79  
80    private void testDelayedRpc(boolean delayReturnValue) throws Exception {
81      LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
82      Configuration conf = HBaseConfiguration.create();
83      InetSocketAddress isa = new InetSocketAddress("localhost", 0);
84      TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
85      BlockingService service =
86        TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
87      rpcServer = new RpcServer(null, "testDelayedRpc",
88          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
89          isa,
90          conf,
91          new FifoRpcScheduler(conf, 1));
92      rpcServer.start();
93      RpcClient rpcClient = RpcClientFactory.createClient(
94          conf, HConstants.DEFAULT_CLUSTER_ID.toString());
95      try {
96        InetSocketAddress address = rpcServer.getListenerAddress();
97        if (address == null) {
98          throw new IOException("Listener channel is closed");
99        }
100       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
101           ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
102           User.getCurrent(), RPC_CLIENT_TIMEOUT);
103       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
104         TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
105       List<Integer> results = new ArrayList<Integer>();
106       // Setting true sets 'delayed' on the client.
107       TestThread th1 = new TestThread(stub, true, results);
108       // Setting 'false' means we will return UNDELAYED as response immediately.
109       TestThread th2 = new TestThread(stub, false, results);
110       TestThread th3 = new TestThread(stub, false, results);
111       th1.start();
112       Thread.sleep(100);
113       th2.start();
114       Thread.sleep(200);
115       th3.start();
116 
117       th1.join();
118       th2.join();
119       th3.join();
120 
121       // We should get the two undelayed responses first.
122       assertEquals(UNDELAYED, results.get(0).intValue());
123       assertEquals(UNDELAYED, results.get(1).intValue());
124       assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :  0xDEADBEEF);
125     } finally {
126       rpcClient.close();
127     }
128   }
129 
130   private static class ListAppender extends AppenderSkeleton {
131     private final List<String> messages = new ArrayList<String>();
132 
133     @Override
134     protected void append(LoggingEvent event) {
135       messages.add(event.getMessage().toString());
136     }
137 
138     @Override
139     public void close() {
140     }
141 
142     @Override
143     public boolean requiresLayout() {
144       return false;
145     }
146 
147     public List<String> getMessages() {
148       return messages;
149     }
150   }
151 
152   /**
153    * Tests that we see a WARN message in the logs.
154    * @throws Exception
155    */
156   @Test (timeout=60000)
157   public void testTooManyDelayedRpcs() throws Exception {
158     Configuration conf = HBaseConfiguration.create();
159     final int MAX_DELAYED_RPC = 10;
160     conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
161     // Set up an appender to catch the "Too many delayed calls" that we expect.
162     ListAppender listAppender = new ListAppender();
163     Logger log = Logger.getLogger(RpcServer.class);
164     log.addAppender(listAppender);
165     log.setLevel(Level.WARN);
166 
167 
168     InetSocketAddress isa = new InetSocketAddress("localhost", 0);
169     TestDelayedImplementation instance = new TestDelayedImplementation(true);
170     BlockingService service =
171       TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
172     rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
173       Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
174         isa,
175         conf,
176         new FifoRpcScheduler(conf, 1));
177     rpcServer.start();
178     RpcClient rpcClient = RpcClientFactory.createClient(
179         conf, HConstants.DEFAULT_CLUSTER_ID.toString());
180     try {
181       InetSocketAddress address = rpcServer.getListenerAddress();
182       if (address == null) {
183         throw new IOException("Listener channel is closed");
184       }
185       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
186           ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
187           User.getCurrent(), RPC_CLIENT_TIMEOUT);
188       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
189         TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
190       Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
191       for (int i = 0; i < MAX_DELAYED_RPC; i++) {
192         threads[i] = new TestThread(stub, true, null);
193         threads[i].start();
194       }
195 
196       /* No warnings till here. */
197       assertTrue(listAppender.getMessages().isEmpty());
198 
199       /* This should give a warning. */
200       threads[MAX_DELAYED_RPC] = new TestThread(stub, true, null);
201       threads[MAX_DELAYED_RPC].start();
202 
203       for (int i = 0; i < MAX_DELAYED_RPC; i++) {
204         threads[i].join();
205       }
206 
207       assertFalse(listAppender.getMessages().isEmpty());
208       assertTrue(listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
209 
210       log.removeAppender(listAppender);
211     } finally {
212       rpcClient.close();
213     }
214   }
215 
216   public static class TestDelayedImplementation
217   implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
218     /**
219      * Should the return value of delayed call be set at the end of the delay
220      * or at call return.
221      */
222     private final boolean delayReturnValue;
223 
224     /**
225      * @param delayReturnValue Should the response to the delayed call be set
226      * at the start or the end of the delay.
227      */
228     public TestDelayedImplementation(boolean delayReturnValue) {
229       this.delayReturnValue = delayReturnValue;
230     }
231 
232     @Override
233     public TestResponse test(final RpcController rpcController, final TestArg testArg)
234     throws ServiceException {
235       boolean delay = testArg.getDelay();
236       TestResponse.Builder responseBuilder = TestResponse.newBuilder();
237       if (!delay) {
238         responseBuilder.setResponse(UNDELAYED);
239         return responseBuilder.build();
240       }
241       final Delayable call = RpcServer.getCurrentCall();
242       call.startDelay(delayReturnValue);
243       new Thread() {
244         @Override
245         public void run() {
246           try {
247             Thread.sleep(500);
248             TestResponse.Builder responseBuilder = TestResponse.newBuilder();
249             call.endDelay(delayReturnValue ?
250                 responseBuilder.setResponse(DELAYED).build() : null);
251           } catch (Exception e) {
252             e.printStackTrace();
253           }
254         }
255       }.start();
256       // This value should go back to client only if the response is set
257       // immediately at delay time.
258       responseBuilder.setResponse(0xDEADBEEF);
259       return responseBuilder.build();
260     }
261   }
262 
263   public static class TestThread extends Thread {
264     private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
265     private final boolean delay;
266     private final List<Integer> results;
267 
268     public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub,
269         boolean delay, List<Integer> results) {
270       this.stub = stub;
271       this.delay = delay;
272       this.results = results;
273     }
274 
275     @Override
276     public void run() {
277       Integer result;
278       try {
279         result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
280           getResponse());
281       } catch (ServiceException e) {
282         throw new RuntimeException(e);
283       }
284       if (results != null) {
285         synchronized (results) {
286           results.add(result);
287         }
288       }
289     }
290   }
291 
292   @Test
293   public void testEndDelayThrowing() throws IOException {
294     Configuration conf = HBaseConfiguration.create();
295     InetSocketAddress isa = new InetSocketAddress("localhost", 0);
296     FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
297     BlockingService service =
298       TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
299     rpcServer = new RpcServer(null, "testEndDelayThrowing",
300         Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
301         isa,
302         conf,
303         new FifoRpcScheduler(conf, 1));
304     rpcServer.start();
305     RpcClient rpcClient = RpcClientFactory.createClient(
306         conf, HConstants.DEFAULT_CLUSTER_ID.toString());
307     try {
308       InetSocketAddress address = rpcServer.getListenerAddress();
309       if (address == null) {
310         throw new IOException("Listener channel is closed");
311       }
312       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
313           ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
314         User.getCurrent(), 1000);
315       TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
316         TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
317 
318       int result = 0xDEADBEEF;
319 
320       try {
321         result = stub.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
322       } catch (Exception e) {
323         fail("No exception should have been thrown.");
324       }
325       assertEquals(result, UNDELAYED);
326 
327       boolean caughtException = false;
328       try {
329         result = stub.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
330       } catch(Exception e) {
331         // Exception thrown by server is enclosed in a RemoteException.
332         if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
333           caughtException = true;
334         }
335         LOG.warn("Caught exception, expected=" + caughtException);
336       }
337       assertTrue(caughtException);
338     } finally {
339       rpcClient.close();
340     }
341   }
342 
343   /**
344    * Delayed calls to this class throw an exception.
345    */
346   private static class FaultyTestDelayedImplementation extends TestDelayedImplementation {
347     public FaultyTestDelayedImplementation() {
348       super(false);
349     }
350 
351     @Override
352     public TestResponse test(RpcController rpcController, TestArg arg)
353     throws ServiceException {
354       LOG.info("In faulty test, delay=" + arg.getDelay());
355       if (!arg.getDelay()) return TestResponse.newBuilder().setResponse(UNDELAYED).build();
356       Delayable call = RpcServer.getCurrentCall();
357       call.startDelay(true);
358       LOG.info("In faulty test, delaying");
359       try {
360         call.endDelayThrowing(new Exception("Something went wrong"));
361       } catch (IOException e) {
362         e.printStackTrace();
363       }
364       // Client will receive the Exception, not this value.
365       return TestResponse.newBuilder().setResponse(DELAYED).build();
366     }
367   }
368 }