1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.IOException;
27 import java.net.InetSocketAddress;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Random;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicReference;
35 import java.util.concurrent.locks.ReadWriteLock;
36 import java.util.concurrent.locks.ReentrantReadWriteLock;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.hbase.CellScanner;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.codec.Codec;
45 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
46 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
47 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
48 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
49 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
50 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
51 import org.apache.hadoop.hbase.security.User;
52 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
53 import org.apache.hadoop.hbase.util.Pair;
54 import org.apache.hadoop.hbase.util.Threads;
55 import org.junit.Ignore;
56 import org.junit.Test;
57 import org.junit.experimental.categories.Category;
58 import com.google.common.collect.Lists;
59 import com.google.protobuf.BlockingService;
60 import com.google.protobuf.Message;
61 import com.google.protobuf.RpcController;
62 import com.google.protobuf.ServiceException;
63 import com.google.protobuf.Descriptors.MethodDescriptor;
64
65 @Category(IntegrationTests.class)
66 public class IntegrationTestRpcClient {
67
/** Logger shared by this test and its nested helper classes. */
private static final Log LOG = LogFactory.getLog(IntegrationTestRpcClient.class);

/** Configuration passed to every server and RPC client this test creates. */
private final Configuration conf;

/** How many times each chaos-monkey test repeats its scenario. */
private int numIterations = 10;

/** Builds the test instance around a configuration from {@code HBaseConfiguration.create()}. */
public IntegrationTestRpcClient() {
  this.conf = HBaseConfiguration.create();
}
77
78 static class TestRpcServer extends RpcServer {
79
80 TestRpcServer(Configuration conf) throws IOException {
81 this(new FifoRpcScheduler(conf, 1), conf);
82 }
83
84 TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
85 super(null, "testRpcServer", Lists
86 .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
87 "localhost", 0), conf, scheduler);
88 }
89
90 @Override
91 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
92 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
93 throws IOException {
94 return super.call(service, md, param, cellScanner, receiveTime, status);
95 }
96 }
97
98 static final BlockingService SERVICE =
99 TestRpcServiceProtos.TestProtobufRpcProto
100 .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
101
102 @Override
103 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
104 throws ServiceException {
105 return null;
106 }
107
108 @Override
109 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
110 throws ServiceException {
111 return null;
112 }
113
114 @Override
115 public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
116 throws ServiceException {
117 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
118 }
119 });
120
121 protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
122 return isSyncClient ?
123 new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) :
124 new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) {
125 @Override
126 Codec getCodec() {
127 return null;
128 }
129 };
130 }
131
132 static String BIG_PAYLOAD;
133
134 static {
135 StringBuilder builder = new StringBuilder();
136
137 while (builder.length() < 1024 * 1024) {
138 builder.append("big.payload.");
139 }
140
141 BIG_PAYLOAD = builder.toString();
142 }
143
144 class Cluster {
145 Random random = new Random();
146 ReadWriteLock lock = new ReentrantReadWriteLock();
147 HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
148 List<TestRpcServer> serverList = new ArrayList<>();
149 int maxServers;
150 int minServers;
151
152 Cluster(int minServers, int maxServers) {
153 this.minServers = minServers;
154 this.maxServers = maxServers;
155 }
156
157 TestRpcServer startServer() throws IOException {
158 lock.writeLock().lock();
159 try {
160 if (rpcServers.size() >= maxServers) {
161 return null;
162 }
163
164 TestRpcServer rpcServer = new TestRpcServer(conf);
165 rpcServer.start();
166 InetSocketAddress address = rpcServer.getListenerAddress();
167 if (address == null) {
168 throw new IOException("Listener channel is closed");
169 }
170 rpcServers.put(address, rpcServer);
171 serverList.add(rpcServer);
172 LOG.info("Started server: " + address);
173 return rpcServer;
174 } finally {
175 lock.writeLock().unlock();
176 }
177 }
178
179 void stopRandomServer() throws Exception {
180 lock.writeLock().lock();
181 TestRpcServer rpcServer = null;
182 try {
183 if (rpcServers.size() <= minServers) {
184 return;
185 }
186 int size = rpcServers.size();
187 int rand = random.nextInt(size);
188 rpcServer = serverList.remove(rand);
189 InetSocketAddress address = rpcServer.getListenerAddress();
190 if (address == null) {
191
192
193 throw new IOException("Listener channel is closed");
194 }
195 rpcServers.remove(address);
196
197 if (rpcServer != null) {
198 stopServer(rpcServer);
199 }
200 } finally {
201 lock.writeLock().unlock();
202 }
203 }
204
205 void stopServer(TestRpcServer rpcServer) throws InterruptedException {
206 InetSocketAddress address = rpcServer.getListenerAddress();
207 LOG.info("Stopping server: " + address);
208 rpcServer.stop();
209 rpcServer.join();
210 LOG.info("Stopped server: " + address);
211 }
212
213 void stopRunning() throws InterruptedException {
214 lock.writeLock().lock();
215 try {
216 for (TestRpcServer rpcServer : serverList) {
217 stopServer(rpcServer);
218 }
219
220 } finally {
221 lock.writeLock().unlock();
222 }
223 }
224
225 TestRpcServer getRandomServer() {
226 lock.readLock().lock();
227 try {
228 int size = rpcServers.size();
229 int rand = random.nextInt(size);
230 return serverList.get(rand);
231 } finally {
232 lock.readLock().unlock();
233 }
234 }
235 }
236
237 static class MiniChaosMonkey extends Thread {
238 AtomicBoolean running = new AtomicBoolean(true);
239 Random random = new Random();
240 AtomicReference<Exception> exception = new AtomicReference<>(null);
241 Cluster cluster;
242
243 public MiniChaosMonkey(Cluster cluster) {
244 this.cluster = cluster;
245 }
246
247 @Override
248 public void run() {
249 while (running.get()) {
250 switch (random.nextInt() % 2) {
251 case 0:
252 try {
253 cluster.startServer();
254 } catch (Exception e) {
255 LOG.warn(e);
256 exception.compareAndSet(null, e);
257 }
258 break;
259
260 case 1:
261 try {
262 cluster.stopRandomServer();
263 } catch (Exception e) {
264 LOG.warn(e);
265 exception.compareAndSet(null, e);
266 }
267 default:
268 }
269
270 Threads.sleep(100);
271 }
272 }
273
274 void stopRunning() {
275 running.set(false);
276 }
277
278 void rethrowException() throws Exception {
279 if (exception.get() != null) {
280 throw exception.get();
281 }
282 }
283 }
284
285 static class SimpleClient extends Thread {
286 AbstractRpcClient rpcClient;
287 AtomicBoolean running = new AtomicBoolean(true);
288 AtomicBoolean sending = new AtomicBoolean(false);
289 AtomicReference<Throwable> exception = new AtomicReference<>(null);
290 Cluster cluster;
291 String id;
292 long numCalls = 0;
293 Random random = new Random();
294
295 public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
296 this.cluster = cluster;
297 this.rpcClient = rpcClient;
298 this.id = id;
299 this.setName(id);
300 }
301
302 @Override
303 public void run() {
304 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
305
306 while (running.get()) {
307 boolean isBigPayload = random.nextBoolean();
308 String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
309 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
310 EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build();
311
312 TestRpcServer server = cluster.getRandomServer();
313 try {
314 User user = User.getCurrent();
315 InetSocketAddress address = server.getListenerAddress();
316 if (address == null) {
317 throw new IOException("Listener channel is closed");
318 }
319 sending.set(true);
320 ret = (EchoResponseProto)
321 rpcClient.callBlockingMethod(md, null, param, ret, user, address);
322 } catch (Exception e) {
323 LOG.warn(e);
324 continue;
325 }
326
327 try {
328 assertNotNull(ret);
329 assertEquals(message, ret.getMessage());
330 } catch (Throwable t) {
331 exception.compareAndSet(null, t);
332 }
333
334 numCalls++;
335 }
336 }
337
338 void stopRunning() {
339 running.set(false);
340 }
341 boolean isSending() {
342 return sending.get();
343 }
344
345 void rethrowException() throws Throwable {
346 if (exception.get() != null) {
347 throw exception.get();
348 }
349 }
350 }
351
352
353
354
355
356 @Test (timeout = 30000)
357 public void testRpcWithWriteThread() throws IOException, InterruptedException {
358 LOG.info("Starting test");
359 Cluster cluster = new Cluster(1, 1);
360 cluster.startServer();
361 conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
362 for(int i = 0; i <1000; i++) {
363 AbstractRpcClient rpcClient = createRpcClient(conf, true);
364 SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
365 client.start();
366 while(!client.isSending()) {
367 Thread.sleep(1);
368 }
369 client.stopRunning();
370 rpcClient.close();
371 }
372 }
373
374
375 @Test (timeout = 1800000)
376 public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
377 for (int i = 0; i < numIterations; i++) {
378 TimeoutThread.runWithTimeout(new Callable<Void>() {
379 @Override
380 public Void call() throws Exception {
381 try {
382 testRpcWithChaosMonkey(true);
383 } catch (Throwable e) {
384 if (e instanceof Exception) {
385 throw (Exception)e;
386 } else {
387 throw new Exception(e);
388 }
389 }
390 return null;
391 }
392 }, 180000);
393 }
394 }
395
396 @Test (timeout = 900000)
397 @Ignore
398 public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
399 for (int i = 0; i < numIterations; i++) {
400 TimeoutThread.runWithTimeout(new Callable<Void>() {
401 @Override
402 public Void call() throws Exception {
403 try {
404 testRpcWithChaosMonkey(false);
405 } catch (Throwable e) {
406 if (e instanceof Exception) {
407 throw (Exception)e;
408 } else {
409 throw new Exception(e);
410 }
411 }
412 return null;
413 }
414 }, 90000);
415 }
416 }
417
418 static class TimeoutThread extends Thread {
419 long timeout;
420 public TimeoutThread(long timeout) {
421 this.timeout = timeout;
422 }
423
424 @Override
425 public void run() {
426 try {
427 Thread.sleep(timeout);
428 Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
429 System.exit(1);
430 } catch (InterruptedException e) {
431
432 }
433 }
434
435
436
437 static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
438 TimeoutThread thread = new TimeoutThread(timeout);
439 thread.start();
440 callable.call();
441 thread.interrupt();
442 }
443 }
444
445 public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
446 LOG.info("Starting test");
447 Cluster cluster = new Cluster(10, 100);
448 for (int i = 0; i < 10; i++) {
449 cluster.startServer();
450 }
451
452 ArrayList<SimpleClient> clients = new ArrayList<>();
453
454
455 AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);
456
457 for (int i = 0; i < 30; i++) {
458 String clientId = "client_" + i + "_";
459 LOG.info("Starting client: " + clientId);
460 SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
461 client.start();
462 clients.add(client);
463 }
464
465 LOG.info("Starting MiniChaosMonkey");
466 MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
467 cm.start();
468
469 Threads.sleep(30000);
470
471 LOG.info("Stopping MiniChaosMonkey");
472 cm.stopRunning();
473 cm.join();
474 cm.rethrowException();
475
476 LOG.info("Stopping clients");
477 for (SimpleClient client : clients) {
478 LOG.info("Stopping client: " + client.id);
479 LOG.info(client.id + " numCalls:" + client.numCalls);
480 client.stopRunning();
481 client.join();
482 client.rethrowException();
483 assertTrue(client.numCalls > 10);
484 }
485
486 LOG.info("Stopping RpcClient");
487 rpcClient.close();
488
489 LOG.info("Stopping Cluster");
490 cluster.stopRunning();
491 }
492 }