View Javadoc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertTrue;
25  
26  import java.io.IOException;
27  import java.net.InetSocketAddress;
28  import java.util.ArrayList;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Random;
32  import java.util.concurrent.Callable;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicReference;
35  import java.util.concurrent.locks.ReadWriteLock;
36  import java.util.concurrent.locks.ReentrantReadWriteLock;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.hbase.CellScanner;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.codec.Codec;
45  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
46  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
47  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
48  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
49  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
50  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
51  import org.apache.hadoop.hbase.security.User;
52  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
53  import org.apache.hadoop.hbase.util.Pair;
54  import org.apache.hadoop.hbase.util.Threads;
55  import org.junit.Ignore;
56  import org.junit.Test;
57  import org.junit.experimental.categories.Category;
58  import com.google.common.collect.Lists;
59  import com.google.protobuf.BlockingService;
60  import com.google.protobuf.Message;
61  import com.google.protobuf.RpcController;
62  import com.google.protobuf.ServiceException;
63  import com.google.protobuf.Descriptors.MethodDescriptor;
64  
65  @Category(IntegrationTests.class)
66  public class IntegrationTestRpcClient {
67  
68    private static final Log LOG = LogFactory.getLog(IntegrationTestRpcClient.class);
69  
70    private final Configuration conf;
71  
72    private int numIterations = 10;
73  
74    public IntegrationTestRpcClient() {
75      conf = HBaseConfiguration.create();
76    }
77  
78    static class TestRpcServer extends RpcServer {
79  
80      TestRpcServer(Configuration conf) throws IOException {
81        this(new FifoRpcScheduler(conf, 1), conf);
82      }
83  
84      TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
85        super(null, "testRpcServer", Lists
86            .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
87            "localhost", 0), conf, scheduler);
88      }
89  
90      @Override
91      public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
92          Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
93          throws IOException {
94        return super.call(service, md, param, cellScanner, receiveTime, status);
95      }
96    }
97  
98    static final BlockingService SERVICE =
99        TestRpcServiceProtos.TestProtobufRpcProto
100       .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
101 
102         @Override
103         public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
104             throws ServiceException {
105           return null;
106         }
107 
108         @Override
109         public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
110             throws ServiceException {
111           return null;
112         }
113 
114         @Override
115         public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
116             throws ServiceException {
117           return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
118         }
119       });
120 
121   protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
122     return isSyncClient ?
123         new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) :
124           new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) {
125           @Override
126           Codec getCodec() {
127             return null;
128           }
129         };
130   }
131 
132   static String BIG_PAYLOAD;
133 
134   static {
135     StringBuilder builder = new StringBuilder();
136 
137     while (builder.length() < 1024 * 1024) { // 2 MB
138       builder.append("big.payload.");
139     }
140 
141     BIG_PAYLOAD = builder.toString();
142   }
143 
144   class Cluster {
145     Random random = new Random();
146     ReadWriteLock lock = new ReentrantReadWriteLock();
147     HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
148     List<TestRpcServer> serverList = new ArrayList<>();
149     int maxServers;
150     int minServers;
151 
152     Cluster(int minServers, int maxServers) {
153       this.minServers = minServers;
154       this.maxServers = maxServers;
155     }
156 
157     TestRpcServer startServer() throws IOException {
158       lock.writeLock().lock();
159       try {
160         if (rpcServers.size() >= maxServers) {
161           return null;
162         }
163 
164         TestRpcServer rpcServer = new TestRpcServer(conf);
165         rpcServer.start();
166         InetSocketAddress address = rpcServer.getListenerAddress();
167         if (address == null) {
168           throw new IOException("Listener channel is closed");
169         }
170         rpcServers.put(address, rpcServer);
171         serverList.add(rpcServer);
172         LOG.info("Started server: " + address);
173         return rpcServer;
174       } finally {
175         lock.writeLock().unlock();
176       }
177     }
178 
179     void stopRandomServer() throws Exception {
180       lock.writeLock().lock();
181       TestRpcServer rpcServer = null;
182       try {
183         if (rpcServers.size() <= minServers) {
184           return;
185         }
186         int size = rpcServers.size();
187         int rand = random.nextInt(size);
188         rpcServer = serverList.remove(rand);
189         InetSocketAddress address = rpcServer.getListenerAddress();
190         if (address == null) {
191           // Throw exception here. We can't remove this instance from the server map because
192           // we no longer have access to its map key
193           throw new IOException("Listener channel is closed");
194         }
195         rpcServers.remove(address);
196 
197         if (rpcServer != null) {
198           stopServer(rpcServer);
199         }
200       } finally {
201         lock.writeLock().unlock();
202       }
203     }
204 
205     void stopServer(TestRpcServer rpcServer) throws InterruptedException {
206       InetSocketAddress address = rpcServer.getListenerAddress();
207       LOG.info("Stopping server: " + address);
208       rpcServer.stop();
209       rpcServer.join();
210       LOG.info("Stopped server: " + address);
211     }
212 
213     void stopRunning() throws InterruptedException {
214       lock.writeLock().lock();
215       try {
216         for (TestRpcServer rpcServer : serverList) {
217           stopServer(rpcServer);
218         }
219 
220       } finally {
221         lock.writeLock().unlock();
222       }
223     }
224 
225     TestRpcServer getRandomServer() {
226       lock.readLock().lock();
227       try {
228         int size = rpcServers.size();
229         int rand = random.nextInt(size);
230         return serverList.get(rand);
231       } finally {
232         lock.readLock().unlock();
233       }
234     }
235   }
236 
237   static class MiniChaosMonkey extends Thread {
238     AtomicBoolean running = new  AtomicBoolean(true);
239     Random random = new Random();
240     AtomicReference<Exception> exception = new AtomicReference<>(null);
241     Cluster cluster;
242 
243     public MiniChaosMonkey(Cluster cluster) {
244       this.cluster = cluster;
245     }
246 
247     @Override
248     public void run() {
249       while (running.get()) {
250         switch (random.nextInt() % 2) {
251         case 0: //start a server
252           try {
253             cluster.startServer();
254           } catch (Exception e) {
255             LOG.warn(e);
256             exception.compareAndSet(null, e);
257           }
258           break;
259 
260         case 1: // stop a server
261           try {
262             cluster.stopRandomServer();
263           } catch (Exception e) {
264             LOG.warn(e);
265             exception.compareAndSet(null, e);
266           }
267         default:
268         }
269 
270         Threads.sleep(100);
271       }
272     }
273 
274     void stopRunning() {
275       running.set(false);
276     }
277 
278     void rethrowException() throws Exception {
279       if (exception.get() != null) {
280         throw exception.get();
281       }
282     }
283   }
284 
285   static class SimpleClient extends Thread {
286     AbstractRpcClient rpcClient;
287     AtomicBoolean running = new  AtomicBoolean(true);
288     AtomicBoolean sending = new AtomicBoolean(false);
289     AtomicReference<Throwable> exception = new AtomicReference<>(null);
290     Cluster cluster;
291     String id;
292     long numCalls = 0;
293     Random random = new Random();
294 
295     public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
296       this.cluster = cluster;
297       this.rpcClient = rpcClient;
298       this.id = id;
299       this.setName(id);
300     }
301 
302     @Override
303     public void run() {
304       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
305 
306       while (running.get()) {
307         boolean isBigPayload = random.nextBoolean();
308         String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
309         EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
310         EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build();
311 
312         TestRpcServer server = cluster.getRandomServer();
313         try {
314           User user = User.getCurrent();
315           InetSocketAddress address = server.getListenerAddress();
316           if (address == null) {
317             throw new IOException("Listener channel is closed");
318           }
319           sending.set(true);
320           ret = (EchoResponseProto)
321               rpcClient.callBlockingMethod(md, null, param, ret, user, address);
322         } catch (Exception e) {
323           LOG.warn(e);
324           continue; // expected in case connection is closing or closed
325         }
326 
327         try {
328           assertNotNull(ret);
329           assertEquals(message, ret.getMessage());
330         } catch (Throwable t) {
331           exception.compareAndSet(null, t);
332         }
333 
334         numCalls++;
335       }
336     }
337 
338     void stopRunning() {
339       running.set(false);
340     }
341     boolean isSending() {
342       return sending.get();
343     }
344 
345     void rethrowException() throws Throwable {
346       if (exception.get() != null) {
347         throw exception.get();
348       }
349     }
350   }
351 
352   /*
353   Test that not started connections are successfully removed from connection pool when
354   rpc client is closing.
355    */
356   @Test (timeout = 30000)
357   public void testRpcWithWriteThread() throws IOException, InterruptedException {
358     LOG.info("Starting test");
359     Cluster cluster = new Cluster(1, 1);
360     cluster.startServer();
361     conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
362     for(int i = 0; i <1000; i++) {
363       AbstractRpcClient rpcClient = createRpcClient(conf, true);
364       SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
365       client.start();
366       while(!client.isSending()) {
367         Thread.sleep(1);
368       }
369       client.stopRunning();
370       rpcClient.close();
371     }
372   }
373 
374 
375   @Test (timeout = 1800000)
376   public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
377     for (int i = 0; i < numIterations; i++) {
378       TimeoutThread.runWithTimeout(new Callable<Void>() {
379         @Override
380         public Void call() throws Exception {
381           try {
382             testRpcWithChaosMonkey(true);
383           } catch (Throwable e) {
384             if (e instanceof Exception) {
385               throw (Exception)e;
386             } else {
387               throw new Exception(e);
388             }
389           }
390           return null;
391         }
392       }, 180000);
393     }
394   }
395 
396   @Test (timeout = 900000)
397   @Ignore // TODO: test fails with async client
398   public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
399     for (int i = 0; i < numIterations; i++) {
400       TimeoutThread.runWithTimeout(new Callable<Void>() {
401         @Override
402         public Void call() throws Exception {
403           try {
404             testRpcWithChaosMonkey(false);
405           } catch (Throwable e) {
406             if (e instanceof Exception) {
407               throw (Exception)e;
408             } else {
409               throw new Exception(e);
410             }
411           }
412           return null;
413         }
414       }, 90000);
415     }
416   }
417 
418   static class TimeoutThread extends Thread {
419     long timeout;
420     public TimeoutThread(long timeout) {
421       this.timeout = timeout;
422     }
423 
424     @Override
425     public void run() {
426       try {
427         Thread.sleep(timeout);
428         Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
429         System.exit(1); // a timeout happened
430       } catch (InterruptedException e) {
431         // this is what we want
432       }
433     }
434 
435     // runs in the same thread context but injects a timeout thread which will exit the JVM on
436     // timeout
437     static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
438       TimeoutThread thread = new TimeoutThread(timeout);
439       thread.start();
440       callable.call();
441       thread.interrupt();
442     }
443   }
444 
445   public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
446     LOG.info("Starting test");
447     Cluster cluster = new Cluster(10, 100);
448     for (int i = 0; i < 10; i++) {
449       cluster.startServer();
450     }
451 
452     ArrayList<SimpleClient> clients = new ArrayList<>();
453 
454     // all threads should share the same rpc client
455     AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);
456 
457     for (int i = 0; i < 30; i++) {
458       String clientId = "client_" + i + "_";
459       LOG.info("Starting client: " + clientId);
460       SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
461       client.start();
462       clients.add(client);
463     }
464 
465     LOG.info("Starting MiniChaosMonkey");
466     MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
467     cm.start();
468 
469     Threads.sleep(30000);
470 
471     LOG.info("Stopping MiniChaosMonkey");
472     cm.stopRunning();
473     cm.join();
474     cm.rethrowException();
475 
476     LOG.info("Stopping clients");
477     for (SimpleClient client : clients) {
478       LOG.info("Stopping client: " + client.id);
479       LOG.info(client.id + " numCalls:" + client.numCalls);
480       client.stopRunning();
481       client.join();
482       client.rethrowException();
483       assertTrue(client.numCalls > 10);
484     }
485 
486     LOG.info("Stopping RpcClient");
487     rpcClient.close();
488 
489     LOG.info("Stopping Cluster");
490     cluster.stopRunning();
491   }
492 }