1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.io.IOException;
27 import java.net.InetSocketAddress;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseConfiguration;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.testclassification.MediumTests;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
39 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
40 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
41 import org.apache.hadoop.hbase.security.User;
42 import org.apache.log4j.AppenderSkeleton;
43 import org.apache.log4j.Level;
44 import org.apache.log4j.Logger;
45 import org.apache.log4j.spi.LoggingEvent;
46 import org.junit.Ignore;
47 import org.junit.Test;
48 import org.junit.experimental.categories.Category;
49
50 import com.google.common.collect.Lists;
51 import com.google.protobuf.BlockingRpcChannel;
52 import com.google.protobuf.BlockingService;
53 import com.google.protobuf.RpcController;
54 import com.google.protobuf.ServiceException;
55
56
57
58
59
60
61 @Category(MediumTests.class)
62 @Ignore
63 public class TestDelayedRpc {
64 private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
65 public static RpcServerInterface rpcServer;
66 public static final int UNDELAYED = 0;
67 public static final int DELAYED = 1;
68 private static final int RPC_CLIENT_TIMEOUT = 30000;
69
70 @Test (timeout=60000)
71 public void testDelayedRpcImmediateReturnValue() throws Exception {
72 testDelayedRpc(false);
73 }
74
75 @Test (timeout=60000)
76 public void testDelayedRpcDelayedReturnValue() throws Exception {
77 testDelayedRpc(true);
78 }
79
80 private void testDelayedRpc(boolean delayReturnValue) throws Exception {
81 LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
82 Configuration conf = HBaseConfiguration.create();
83 InetSocketAddress isa = new InetSocketAddress("localhost", 0);
84 TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
85 BlockingService service =
86 TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
87 rpcServer = new RpcServer(null, "testDelayedRpc",
88 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
89 isa,
90 conf,
91 new FifoRpcScheduler(conf, 1));
92 rpcServer.start();
93 RpcClient rpcClient = RpcClientFactory.createClient(
94 conf, HConstants.DEFAULT_CLUSTER_ID.toString());
95 try {
96 InetSocketAddress address = rpcServer.getListenerAddress();
97 if (address == null) {
98 throw new IOException("Listener channel is closed");
99 }
100 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
101 ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
102 User.getCurrent(), RPC_CLIENT_TIMEOUT);
103 TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
104 TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
105 List<Integer> results = new ArrayList<Integer>();
106
107 TestThread th1 = new TestThread(stub, true, results);
108
109 TestThread th2 = new TestThread(stub, false, results);
110 TestThread th3 = new TestThread(stub, false, results);
111 th1.start();
112 Thread.sleep(100);
113 th2.start();
114 Thread.sleep(200);
115 th3.start();
116
117 th1.join();
118 th2.join();
119 th3.join();
120
121
122 assertEquals(UNDELAYED, results.get(0).intValue());
123 assertEquals(UNDELAYED, results.get(1).intValue());
124 assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
125 } finally {
126 rpcClient.close();
127 }
128 }
129
130 private static class ListAppender extends AppenderSkeleton {
131 private final List<String> messages = new ArrayList<String>();
132
133 @Override
134 protected void append(LoggingEvent event) {
135 messages.add(event.getMessage().toString());
136 }
137
138 @Override
139 public void close() {
140 }
141
142 @Override
143 public boolean requiresLayout() {
144 return false;
145 }
146
147 public List<String> getMessages() {
148 return messages;
149 }
150 }
151
152
153
154
155
156 @Test (timeout=60000)
157 public void testTooManyDelayedRpcs() throws Exception {
158 Configuration conf = HBaseConfiguration.create();
159 final int MAX_DELAYED_RPC = 10;
160 conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
161
162 ListAppender listAppender = new ListAppender();
163 Logger log = Logger.getLogger(RpcServer.class);
164 log.addAppender(listAppender);
165 log.setLevel(Level.WARN);
166
167
168 InetSocketAddress isa = new InetSocketAddress("localhost", 0);
169 TestDelayedImplementation instance = new TestDelayedImplementation(true);
170 BlockingService service =
171 TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
172 rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
173 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
174 isa,
175 conf,
176 new FifoRpcScheduler(conf, 1));
177 rpcServer.start();
178 RpcClient rpcClient = RpcClientFactory.createClient(
179 conf, HConstants.DEFAULT_CLUSTER_ID.toString());
180 try {
181 InetSocketAddress address = rpcServer.getListenerAddress();
182 if (address == null) {
183 throw new IOException("Listener channel is closed");
184 }
185 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
186 ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
187 User.getCurrent(), RPC_CLIENT_TIMEOUT);
188 TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
189 TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
190 Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
191 for (int i = 0; i < MAX_DELAYED_RPC; i++) {
192 threads[i] = new TestThread(stub, true, null);
193 threads[i].start();
194 }
195
196
197 assertTrue(listAppender.getMessages().isEmpty());
198
199
200 threads[MAX_DELAYED_RPC] = new TestThread(stub, true, null);
201 threads[MAX_DELAYED_RPC].start();
202
203 for (int i = 0; i < MAX_DELAYED_RPC; i++) {
204 threads[i].join();
205 }
206
207 assertFalse(listAppender.getMessages().isEmpty());
208 assertTrue(listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
209
210 log.removeAppender(listAppender);
211 } finally {
212 rpcClient.close();
213 }
214 }
215
216 public static class TestDelayedImplementation
217 implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
218
219
220
221
222 private final boolean delayReturnValue;
223
224
225
226
227
228 public TestDelayedImplementation(boolean delayReturnValue) {
229 this.delayReturnValue = delayReturnValue;
230 }
231
232 @Override
233 public TestResponse test(final RpcController rpcController, final TestArg testArg)
234 throws ServiceException {
235 boolean delay = testArg.getDelay();
236 TestResponse.Builder responseBuilder = TestResponse.newBuilder();
237 if (!delay) {
238 responseBuilder.setResponse(UNDELAYED);
239 return responseBuilder.build();
240 }
241 final Delayable call = RpcServer.getCurrentCall();
242 call.startDelay(delayReturnValue);
243 new Thread() {
244 @Override
245 public void run() {
246 try {
247 Thread.sleep(500);
248 TestResponse.Builder responseBuilder = TestResponse.newBuilder();
249 call.endDelay(delayReturnValue ?
250 responseBuilder.setResponse(DELAYED).build() : null);
251 } catch (Exception e) {
252 e.printStackTrace();
253 }
254 }
255 }.start();
256
257
258 responseBuilder.setResponse(0xDEADBEEF);
259 return responseBuilder.build();
260 }
261 }
262
263 public static class TestThread extends Thread {
264 private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
265 private final boolean delay;
266 private final List<Integer> results;
267
268 public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub,
269 boolean delay, List<Integer> results) {
270 this.stub = stub;
271 this.delay = delay;
272 this.results = results;
273 }
274
275 @Override
276 public void run() {
277 Integer result;
278 try {
279 result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
280 getResponse());
281 } catch (ServiceException e) {
282 throw new RuntimeException(e);
283 }
284 if (results != null) {
285 synchronized (results) {
286 results.add(result);
287 }
288 }
289 }
290 }
291
292 @Test
293 public void testEndDelayThrowing() throws IOException {
294 Configuration conf = HBaseConfiguration.create();
295 InetSocketAddress isa = new InetSocketAddress("localhost", 0);
296 FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
297 BlockingService service =
298 TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
299 rpcServer = new RpcServer(null, "testEndDelayThrowing",
300 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
301 isa,
302 conf,
303 new FifoRpcScheduler(conf, 1));
304 rpcServer.start();
305 RpcClient rpcClient = RpcClientFactory.createClient(
306 conf, HConstants.DEFAULT_CLUSTER_ID.toString());
307 try {
308 InetSocketAddress address = rpcServer.getListenerAddress();
309 if (address == null) {
310 throw new IOException("Listener channel is closed");
311 }
312 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
313 ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
314 User.getCurrent(), 1000);
315 TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
316 TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
317
318 int result = 0xDEADBEEF;
319
320 try {
321 result = stub.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
322 } catch (Exception e) {
323 fail("No exception should have been thrown.");
324 }
325 assertEquals(result, UNDELAYED);
326
327 boolean caughtException = false;
328 try {
329 result = stub.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
330 } catch(Exception e) {
331
332 if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
333 caughtException = true;
334 }
335 LOG.warn("Caught exception, expected=" + caughtException);
336 }
337 assertTrue(caughtException);
338 } finally {
339 rpcClient.close();
340 }
341 }
342
343
344
345
346 private static class FaultyTestDelayedImplementation extends TestDelayedImplementation {
347 public FaultyTestDelayedImplementation() {
348 super(false);
349 }
350
351 @Override
352 public TestResponse test(RpcController rpcController, TestArg arg)
353 throws ServiceException {
354 LOG.info("In faulty test, delay=" + arg.getDelay());
355 if (!arg.getDelay()) return TestResponse.newBuilder().setResponse(UNDELAYED).build();
356 Delayable call = RpcServer.getCurrentCall();
357 call.startDelay(true);
358 LOG.info("In faulty test, delaying");
359 try {
360 call.endDelayThrowing(new Exception("Something went wrong"));
361 } catch (IOException e) {
362 e.printStackTrace();
363 }
364
365 return TestResponse.newBuilder().setResponse(DELAYED).build();
366 }
367 }
368 }