View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import static org.junit.Assert.assertTrue;
21  import static org.junit.Assert.fail;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInitializer;
24  import io.netty.channel.ChannelOutboundHandlerAdapter;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.epoll.EpollEventLoopGroup;
27  import io.netty.channel.nio.NioEventLoopGroup;
28  import io.netty.channel.socket.SocketChannel;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.List;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.CellScannable;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HBaseTestingUtility;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.ServerName;
48  import org.apache.hadoop.hbase.Waiter;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.client.RowMutations;
51  import org.apache.hadoop.hbase.codec.Codec;
52  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
53  import org.apache.hadoop.hbase.protobuf.RequestConverter;
54  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
55  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
56  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
57  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
58  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
59  import org.apache.hadoop.hbase.security.User;
60  import org.apache.hadoop.hbase.testclassification.SmallTests;
61  import org.apache.hadoop.util.StringUtils;
62  import org.junit.Assume;
63  import org.junit.Test;
64  import org.junit.experimental.categories.Category;
65  import org.junit.runner.RunWith;
66  import org.junit.runners.Parameterized;
67  import org.junit.runners.Parameterized.Parameters;
68  
69  import com.google.protobuf.ByteString;
70  import com.google.protobuf.Descriptors.MethodDescriptor;
71  import com.google.protobuf.Message;
72  import com.google.protobuf.RpcCallback;
73  import com.google.protobuf.RpcChannel;
74  
75  @RunWith(Parameterized.class)
76  @Category({ SmallTests.class })
77  public class TestAsyncIPC extends AbstractTestIPC {
78  
79    private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
80  
81    /** Set to true on Windows platforms */
82    private static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
83  
84    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
85  
86    @Parameters
87    public static Collection<Object[]> parameters() {
88      List<Object[]> paramList = new ArrayList<Object[]>();
89      paramList.add(new Object[] { false, false });
90      paramList.add(new Object[] { false, true });
91      paramList.add(new Object[] { true, false });
92      paramList.add(new Object[] { true, true });
93      return paramList;
94    }
95  
96    private final boolean useNativeTransport;
97  
98    private final boolean useGlobalEventLoopGroup;
99  
100   public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) {
101     this.useNativeTransport = useNativeTransport;
102     this.useGlobalEventLoopGroup = useGlobalEventLoopGroup;
103   }
104 
105   private void setConf(Configuration conf) {
106     conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport);
107     conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, useGlobalEventLoopGroup);
108     if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
109       if (useNativeTransport
110           && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
111           || (!useNativeTransport
112           && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
113         AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
114         AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
115       }
116     }
117   }
118 
119   @Override
120   protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
121     setConf(conf);
122     return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) {
123 
124       @Override
125       Codec getCodec() {
126         return null;
127       }
128 
129     };
130   }
131 
132   @Override
133   protected AsyncRpcClient createRpcClient(Configuration conf) {
134     setConf(conf);
135     return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
136   }
137 
138   @Override
139   protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
140     setConf(conf);
141     return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
142         new ChannelInitializer<SocketChannel>() {
143 
144           @Override
145           protected void initChannel(SocketChannel ch) throws Exception {
146             ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
147               @Override
148               public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
149                   throws Exception {
150                 promise.setFailure(new RuntimeException("Injected fault"));
151               }
152             });
153           }
154         });
155   }
156 
157   @Test
158   public void testAsyncConnectionSetup() throws Exception {
159     TestRpcServer rpcServer = new TestRpcServer();
160     AsyncRpcClient client = createRpcClient(CONF);
161     try {
162       rpcServer.start();
163       InetSocketAddress address = rpcServer.getListenerAddress();
164       if (address == null) {
165         throw new IOException("Listener channel is closed");
166       }
167       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
168       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
169 
170       RpcChannel channel =
171           client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
172             System.currentTimeMillis()), User.getCurrent(), 0);
173 
174       final AtomicBoolean done = new AtomicBoolean(false);
175 
176       channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType()
177           .toProto(), new RpcCallback<Message>() {
178         @Override
179         public void run(Message parameter) {
180           done.set(true);
181         }
182       });
183 
184       TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
185         @Override
186         public boolean evaluate() throws Exception {
187           return done.get();
188         }
189       });
190     } finally {
191       client.close();
192       rpcServer.stop();
193     }
194   }
195 
196   @Test
197   public void testRTEDuringAsyncConnectionSetup() throws Exception {
198     // the test run multiple times - in Windows Jenkins environment, it sometimes failed in
199     // one out of 4 calls.  It cannot be reproed in a Windows Azure enviroment.  To reduce
200     // noise, disable it in Windows UT.
201     Assume.assumeTrue(!WINDOWS);
202 
203     TestRpcServer rpcServer = new TestRpcServer();
204     AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF);
205     try {
206       rpcServer.start();
207       InetSocketAddress address = rpcServer.getListenerAddress();
208       if (address == null) {
209         throw new IOException("Listener channel is closed");
210       }
211       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
212       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
213 
214       RpcChannel channel =
215           client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
216             System.currentTimeMillis()), User.getCurrent(), 0);
217 
218       final AtomicBoolean done = new AtomicBoolean(false);
219 
220       PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
221       controller.notifyOnFail(new RpcCallback<IOException>() {
222         @Override
223         public void run(IOException e) {
224           done.set(true);
225           LOG.info("Caught expected exception: " + e.toString());
226           assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
227         }
228       });
229 
230       channel.callMethod(md, controller, param, md.getOutputType().toProto(),
231         new RpcCallback<Message>() {
232           @Override
233           public void run(Message parameter) {
234             done.set(true);
235             fail("Expected an exception to have been thrown!");
236           }
237         });
238 
239       TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
240         @Override
241         public boolean evaluate() throws Exception {
242           return done.get();
243         }
244       });
245     } finally {
246       client.close();
247       rpcServer.stop();
248     }
249   }
250 
251   public static void main(String[] args) throws IOException, SecurityException,
252       NoSuchMethodException, InterruptedException {
253     if (args.length != 2) {
254       System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
255       return;
256     }
257     // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
258     // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
259     int cycles = Integer.parseInt(args[0]);
260     int cellcount = Integer.parseInt(args[1]);
261     Configuration conf = HBaseConfiguration.create();
262     TestRpcServer rpcServer = new TestRpcServer();
263     MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
264     EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
265     AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
266     KeyValue kv = BIG_CELL;
267     Put p = new Put(CellUtil.cloneRow(kv));
268     for (int i = 0; i < cellcount; i++) {
269       p.add(kv);
270     }
271     RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
272     rm.add(p);
273     try {
274       rpcServer.start();
275       InetSocketAddress address = rpcServer.getListenerAddress();
276       if (address == null) {
277         throw new IOException("Listener channel is closed");
278       }
279       long startTime = System.currentTimeMillis();
280       User user = User.getCurrent();
281       for (int i = 0; i < cycles; i++) {
282         List<CellScannable> cells = new ArrayList<CellScannable>();
283         // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
284         ClientProtos.RegionAction.Builder builder =
285             RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
286               RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
287               MutationProto.newBuilder());
288         builder.setRegion(RegionSpecifier
289             .newBuilder()
290             .setType(RegionSpecifierType.REGION_NAME)
291             .setValue(
292               ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
293         if (i % 100000 == 0) {
294           LOG.info("" + i);
295           // Uncomment this for a thread dump every so often.
296           // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
297           // "Thread dump " + Thread.currentThread().getName());
298         }
299         PayloadCarryingRpcController pcrc =
300             new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
301         // Pair<Message, CellScanner> response =
302         client.call(pcrc, md, builder.build(), param, user, address);
303         /*
304          * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
305          * count);
306          */
307       }
308       LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
309           + (System.currentTimeMillis() - startTime) + "ms");
310     } finally {
311       client.close();
312       rpcServer.stop();
313     }
314   }
315 }