1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23 import static org.mockito.Matchers.anyObject;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.internal.verification.VerificationModeFactory.times;
27
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.util.ArrayList;
31 import java.util.List;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.CellScanner;
38 import org.apache.hadoop.hbase.CellUtil;
39 import org.apache.hadoop.hbase.HBaseConfiguration;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
42 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
43 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
44 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
45 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
46 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
47 import org.apache.hadoop.hbase.security.User;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.Pair;
50 import org.apache.hadoop.io.compress.GzipCodec;
51 import org.apache.hadoop.util.StringUtils;
52 import org.junit.Test;
53
54 import com.google.common.collect.ImmutableList;
55 import com.google.common.collect.Lists;
56 import com.google.protobuf.BlockingService;
57 import com.google.protobuf.Descriptors.MethodDescriptor;
58 import com.google.protobuf.Message;
59 import com.google.protobuf.RpcController;
60 import com.google.protobuf.ServiceException;
61
62
63
64
65 public abstract class AbstractTestIPC {
66
67 private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
68
69 private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
70 private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
71 static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
72 static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
73 static final Configuration CONF = HBaseConfiguration.create();
74
75
76
77
78 static final BlockingService SERVICE =
79 TestRpcServiceProtos.TestProtobufRpcProto
80 .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
81
82 @Override
83 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
84 throws ServiceException {
85 return null;
86 }
87
88 @Override
89 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
90 throws ServiceException {
91 return null;
92 }
93
94 @Override
95 public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
96 throws ServiceException {
97 if (controller instanceof PayloadCarryingRpcController) {
98 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
99
100
101
102
103
104 CellScanner cellScanner = pcrc.cellScanner();
105 List<Cell> list = null;
106 if (cellScanner != null) {
107 list = new ArrayList<Cell>();
108 try {
109 while (cellScanner.advance()) {
110 list.add(cellScanner.current());
111 }
112 } catch (IOException e) {
113 throw new ServiceException(e);
114 }
115 }
116 cellScanner = CellUtil.createCellScanner(list);
117 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
118 }
119 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
120 }
121 });
122
123
124
125
126
127 static class TestRpcServer extends RpcServer {
128
129 TestRpcServer() throws IOException {
130 this(new FifoRpcScheduler(CONF, 1));
131 }
132
133 TestRpcServer(RpcScheduler scheduler) throws IOException {
134 super(null, "testRpcServer", Lists
135 .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
136 "localhost", 0), CONF, scheduler);
137 }
138
139 @Override
140 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
141 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
142 throws IOException {
143 return super.call(service, md, param, cellScanner, receiveTime, status);
144 }
145 }
146
147 protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
148
149
150
151
152
153
154 @Test
155 public void testNoCodec() throws InterruptedException, IOException {
156 Configuration conf = HBaseConfiguration.create();
157 AbstractRpcClient client = createRpcClientNoCodec(conf);
158 TestRpcServer rpcServer = new TestRpcServer();
159 try {
160 rpcServer.start();
161 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
162 final String message = "hello";
163 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
164 InetSocketAddress address = rpcServer.getListenerAddress();
165 if (address == null) {
166 throw new IOException("Listener channel is closed");
167 }
168 Pair<Message, CellScanner> r =
169 client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
170 assertTrue(r.getSecond() == null);
171
172 assertTrue(r.getFirst().toString().contains(message));
173 } finally {
174 client.close();
175 rpcServer.stop();
176 }
177 }
178
179 protected abstract AbstractRpcClient createRpcClient(Configuration conf);
180
181
182
183
184
185
186
187
188
189
190 @Test
191 public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException,
192 NoSuchMethodException, ServiceException {
193 Configuration conf = new Configuration(HBaseConfiguration.create());
194 conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
195 List<Cell> cells = new ArrayList<Cell>();
196 int count = 3;
197 for (int i = 0; i < count; i++) {
198 cells.add(CELL);
199 }
200 AbstractRpcClient client = createRpcClient(conf);
201 TestRpcServer rpcServer = new TestRpcServer();
202 try {
203 rpcServer.start();
204 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
205 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
206 PayloadCarryingRpcController pcrc =
207 new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
208 InetSocketAddress address = rpcServer.getListenerAddress();
209 if (address == null) {
210 throw new IOException("Listener channel is closed");
211 }
212 Pair<Message, CellScanner> r =
213 client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
214 int index = 0;
215 while (r.getSecond().advance()) {
216 assertTrue(CELL.equals(r.getSecond().current()));
217 index++;
218 }
219 assertEquals(count, index);
220 } finally {
221 client.close();
222 rpcServer.stop();
223 }
224 }
225
226 protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
227 throws IOException;
228
229
230 public void testRTEDuringConnectionSetup() throws Exception {
231 Configuration conf = HBaseConfiguration.create();
232 TestRpcServer rpcServer = new TestRpcServer();
233 AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
234 try {
235 rpcServer.start();
236 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
237 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
238 InetSocketAddress address = rpcServer.getListenerAddress();
239 if (address == null) {
240 throw new IOException("Listener channel is closed");
241 }
242 client.call(null, md, param, null, User.getCurrent(), address);
243 fail("Expected an exception to have been thrown!");
244 } catch (Exception e) {
245 LOG.info("Caught expected exception: " + e.toString());
246 assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
247 } finally {
248 client.close();
249 rpcServer.stop();
250 }
251 }
252
253
254 @Test
255 public void testRpcScheduler() throws IOException, InterruptedException {
256 RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
257 RpcServer rpcServer = new TestRpcServer(scheduler);
258 verify(scheduler).init((RpcScheduler.Context) anyObject());
259 AbstractRpcClient client = createRpcClient(CONF);
260 try {
261 rpcServer.start();
262 verify(scheduler).start();
263 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
264 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
265 InetSocketAddress address = rpcServer.getListenerAddress();
266 if (address == null) {
267 throw new IOException("Listener channel is closed");
268 }
269 for (int i = 0; i < 10; i++) {
270 client.call(
271 new PayloadCarryingRpcController(
272 CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, md
273 .getOutputType().toProto(), User.getCurrent(), address);
274 }
275 verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
276 } finally {
277 rpcServer.stop();
278 verify(scheduler).stop();
279 }
280 }
281 }