1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import static org.junit.Assert.assertTrue;
21 import static org.junit.Assert.fail;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInitializer;
24 import io.netty.channel.ChannelOutboundHandlerAdapter;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.epoll.EpollEventLoopGroup;
27 import io.netty.channel.nio.NioEventLoopGroup;
28 import io.netty.channel.socket.SocketChannel;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.List;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.CellScannable;
41 import org.apache.hadoop.hbase.CellUtil;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HBaseTestingUtility;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.KeyValue;
47 import org.apache.hadoop.hbase.ServerName;
48 import org.apache.hadoop.hbase.Waiter;
49 import org.apache.hadoop.hbase.client.Put;
50 import org.apache.hadoop.hbase.client.RowMutations;
51 import org.apache.hadoop.hbase.codec.Codec;
52 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
53 import org.apache.hadoop.hbase.protobuf.RequestConverter;
54 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
55 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
56 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
57 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
58 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
59 import org.apache.hadoop.hbase.security.User;
60 import org.apache.hadoop.hbase.testclassification.SmallTests;
61 import org.apache.hadoop.util.StringUtils;
62 import org.junit.Assume;
63 import org.junit.Test;
64 import org.junit.experimental.categories.Category;
65 import org.junit.runner.RunWith;
66 import org.junit.runners.Parameterized;
67 import org.junit.runners.Parameterized.Parameters;
68
69 import com.google.protobuf.ByteString;
70 import com.google.protobuf.Descriptors.MethodDescriptor;
71 import com.google.protobuf.Message;
72 import com.google.protobuf.RpcCallback;
73 import com.google.protobuf.RpcChannel;
74
75 @RunWith(Parameterized.class)
76 @Category({ SmallTests.class })
77 public class TestAsyncIPC extends AbstractTestIPC {
78
79 private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
80
81
82 private static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
83
84 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
85
86 @Parameters
87 public static Collection<Object[]> parameters() {
88 List<Object[]> paramList = new ArrayList<Object[]>();
89 paramList.add(new Object[] { false, false });
90 paramList.add(new Object[] { false, true });
91 paramList.add(new Object[] { true, false });
92 paramList.add(new Object[] { true, true });
93 return paramList;
94 }
95
96 private final boolean useNativeTransport;
97
98 private final boolean useGlobalEventLoopGroup;
99
100 public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) {
101 this.useNativeTransport = useNativeTransport;
102 this.useGlobalEventLoopGroup = useGlobalEventLoopGroup;
103 }
104
105 private void setConf(Configuration conf) {
106 conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport);
107 conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, useGlobalEventLoopGroup);
108 if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
109 if (useNativeTransport
110 && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
111 || (!useNativeTransport
112 && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
113 AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
114 AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
115 }
116 }
117 }
118
119 @Override
120 protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
121 setConf(conf);
122 return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) {
123
124 @Override
125 Codec getCodec() {
126 return null;
127 }
128
129 };
130 }
131
132 @Override
133 protected AsyncRpcClient createRpcClient(Configuration conf) {
134 setConf(conf);
135 return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
136 }
137
138 @Override
139 protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
140 setConf(conf);
141 return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
142 new ChannelInitializer<SocketChannel>() {
143
144 @Override
145 protected void initChannel(SocketChannel ch) throws Exception {
146 ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
147 @Override
148 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
149 throws Exception {
150 promise.setFailure(new RuntimeException("Injected fault"));
151 }
152 });
153 }
154 });
155 }
156
157 @Test
158 public void testAsyncConnectionSetup() throws Exception {
159 TestRpcServer rpcServer = new TestRpcServer();
160 AsyncRpcClient client = createRpcClient(CONF);
161 try {
162 rpcServer.start();
163 InetSocketAddress address = rpcServer.getListenerAddress();
164 if (address == null) {
165 throw new IOException("Listener channel is closed");
166 }
167 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
168 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
169
170 RpcChannel channel =
171 client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
172 System.currentTimeMillis()), User.getCurrent(), 0);
173
174 final AtomicBoolean done = new AtomicBoolean(false);
175
176 channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType()
177 .toProto(), new RpcCallback<Message>() {
178 @Override
179 public void run(Message parameter) {
180 done.set(true);
181 }
182 });
183
184 TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
185 @Override
186 public boolean evaluate() throws Exception {
187 return done.get();
188 }
189 });
190 } finally {
191 client.close();
192 rpcServer.stop();
193 }
194 }
195
196 @Test
197 public void testRTEDuringAsyncConnectionSetup() throws Exception {
198
199
200
201 Assume.assumeTrue(!WINDOWS);
202
203 TestRpcServer rpcServer = new TestRpcServer();
204 AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF);
205 try {
206 rpcServer.start();
207 InetSocketAddress address = rpcServer.getListenerAddress();
208 if (address == null) {
209 throw new IOException("Listener channel is closed");
210 }
211 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
212 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
213
214 RpcChannel channel =
215 client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
216 System.currentTimeMillis()), User.getCurrent(), 0);
217
218 final AtomicBoolean done = new AtomicBoolean(false);
219
220 PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
221 controller.notifyOnFail(new RpcCallback<IOException>() {
222 @Override
223 public void run(IOException e) {
224 done.set(true);
225 LOG.info("Caught expected exception: " + e.toString());
226 assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
227 }
228 });
229
230 channel.callMethod(md, controller, param, md.getOutputType().toProto(),
231 new RpcCallback<Message>() {
232 @Override
233 public void run(Message parameter) {
234 done.set(true);
235 fail("Expected an exception to have been thrown!");
236 }
237 });
238
239 TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
240 @Override
241 public boolean evaluate() throws Exception {
242 return done.get();
243 }
244 });
245 } finally {
246 client.close();
247 rpcServer.stop();
248 }
249 }
250
251 public static void main(String[] args) throws IOException, SecurityException,
252 NoSuchMethodException, InterruptedException {
253 if (args.length != 2) {
254 System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
255 return;
256 }
257
258
259 int cycles = Integer.parseInt(args[0]);
260 int cellcount = Integer.parseInt(args[1]);
261 Configuration conf = HBaseConfiguration.create();
262 TestRpcServer rpcServer = new TestRpcServer();
263 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
264 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
265 AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
266 KeyValue kv = BIG_CELL;
267 Put p = new Put(CellUtil.cloneRow(kv));
268 for (int i = 0; i < cellcount; i++) {
269 p.add(kv);
270 }
271 RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
272 rm.add(p);
273 try {
274 rpcServer.start();
275 InetSocketAddress address = rpcServer.getListenerAddress();
276 if (address == null) {
277 throw new IOException("Listener channel is closed");
278 }
279 long startTime = System.currentTimeMillis();
280 User user = User.getCurrent();
281 for (int i = 0; i < cycles; i++) {
282 List<CellScannable> cells = new ArrayList<CellScannable>();
283
284 ClientProtos.RegionAction.Builder builder =
285 RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
286 RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
287 MutationProto.newBuilder());
288 builder.setRegion(RegionSpecifier
289 .newBuilder()
290 .setType(RegionSpecifierType.REGION_NAME)
291 .setValue(
292 ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
293 if (i % 100000 == 0) {
294 LOG.info("" + i);
295
296
297
298 }
299 PayloadCarryingRpcController pcrc =
300 new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
301
302 client.call(pcrc, md, builder.build(), param, user, address);
303
304
305
306
307 }
308 LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
309 + (System.currentTimeMillis() - startTime) + "ms");
310 } finally {
311 client.close();
312 rpcServer.stop();
313 }
314 }
315 }