1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.mockito.Matchers.anyInt;
22 import static org.mockito.Mockito.doThrow;
23 import static org.mockito.Mockito.spy;
24
25 import java.io.IOException;
26 import java.net.InetSocketAddress;
27 import java.net.Socket;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 import javax.net.SocketFactory;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.CellScannable;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.KeyValueUtil;
43 import org.apache.hadoop.hbase.ServerName;
44 import org.apache.hadoop.hbase.Waiter;
45 import org.apache.hadoop.hbase.testclassification.SmallTests;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.RowMutations;
48 import org.apache.hadoop.hbase.codec.Codec;
49 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
50 import org.apache.hadoop.hbase.protobuf.RequestConverter;
51 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
52 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
53 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
54 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
55 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
56 import org.apache.hadoop.hbase.security.User;
57 import org.apache.hadoop.hbase.testclassification.SmallTests;
58 import org.apache.hadoop.net.NetUtils;
59 import org.junit.experimental.categories.Category;
60 import org.mockito.Mockito;
61 import org.mockito.invocation.InvocationOnMock;
62 import org.mockito.stubbing.Answer;
63
64 import com.google.protobuf.ByteString;
65 import com.google.protobuf.Descriptors.MethodDescriptor;
66
67 @Category({ SmallTests.class })
68 public class TestIPC extends AbstractTestIPC {
69
70 private static final Log LOG = LogFactory.getLog(TestIPC.class);
71
72 @Override
73 protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
74 return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
75 @Override
76 Codec getCodec() {
77 return null;
78 }
79 };
80 }
81
82 @Override
83 protected RpcClientImpl createRpcClient(Configuration conf) {
84 return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
85 }
86
87 @Override
88 protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf)
89 throws IOException {
90 SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
91 Mockito.doAnswer(new Answer<Socket>() {
92 @Override
93 public Socket answer(InvocationOnMock invocation) throws Throwable {
94 Socket s = spy((Socket) invocation.callRealMethod());
95 doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt());
96 return s;
97 }
98 }).when(spyFactory).createSocket();
99
100 return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
101 }
102
103 public static void main(String[] args) throws IOException, SecurityException,
104 NoSuchMethodException, InterruptedException {
105 if (args.length != 2) {
106 System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
107 return;
108 }
109
110
111 int cycles = Integer.parseInt(args[0]);
112 int cellcount = Integer.parseInt(args[1]);
113 Configuration conf = HBaseConfiguration.create();
114 TestRpcServer rpcServer = new TestRpcServer();
115 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
116 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
117 RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
118 KeyValue kv = BIG_CELL;
119 Put p = new Put(CellUtil.cloneRow(kv));
120 for (int i = 0; i < cellcount; i++) {
121 p.add(kv);
122 }
123 RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
124 rm.add(p);
125 try {
126 rpcServer.start();
127 long startTime = System.currentTimeMillis();
128 User user = User.getCurrent();
129 InetSocketAddress address = rpcServer.getListenerAddress();
130 if (address == null) {
131 throw new IOException("Listener channel is closed");
132 }
133 for (int i = 0; i < cycles; i++) {
134 List<CellScannable> cells = new ArrayList<CellScannable>();
135
136 ClientProtos.RegionAction.Builder builder =
137 RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
138 RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
139 MutationProto.newBuilder());
140 builder.setRegion(RegionSpecifier
141 .newBuilder()
142 .setType(RegionSpecifierType.REGION_NAME)
143 .setValue(
144 ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
145 if (i % 100000 == 0) {
146 LOG.info("" + i);
147
148
149
150 }
151 PayloadCarryingRpcController pcrc =
152 new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
153
154 client.call(pcrc, md, builder.build(), param, user, address);
155
156
157
158
159 }
160 LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
161 + (System.currentTimeMillis() - startTime) + "ms");
162 } finally {
163 client.close();
164 rpcServer.stop();
165 }
166 }
167
168 }