/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  import static org.mockito.Matchers.anyObject;
24  import static org.mockito.Mockito.spy;
25  import static org.mockito.Mockito.verify;
26  import static org.mockito.internal.verification.VerificationModeFactory.times;
27  
28  import java.io.IOException;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.List;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellScanner;
38  import org.apache.hadoop.hbase.CellUtil;
39  import org.apache.hadoop.hbase.HBaseConfiguration;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
42  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
43  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
44  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
45  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
46  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
47  import org.apache.hadoop.hbase.security.User;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.Pair;
50  import org.apache.hadoop.io.compress.GzipCodec;
51  import org.apache.hadoop.util.StringUtils;
52  import org.junit.Test;
53  
54  import com.google.common.collect.ImmutableList;
55  import com.google.common.collect.Lists;
56  import com.google.protobuf.BlockingService;
57  import com.google.protobuf.Descriptors.MethodDescriptor;
58  import com.google.protobuf.Message;
59  import com.google.protobuf.RpcController;
60  import com.google.protobuf.ServiceException;
61  
62  /**
63   * Some basic ipc tests.
64   */
65  public abstract class AbstractTestIPC {
66  
67    private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
68  
69    private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
70    private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
71    static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
72    static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
73    static final Configuration CONF = HBaseConfiguration.create();
74    // We are using the test TestRpcServiceProtos generated classes and Service because they are
75    // available and basic with methods like 'echo', and ping. Below we make a blocking service
76    // by passing in implementation of blocking interface. We use this service in all tests that
77    // follow.
78    static final BlockingService SERVICE =
79        TestRpcServiceProtos.TestProtobufRpcProto
80            .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
81  
82              @Override
83              public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
84                  throws ServiceException {
85                return null;
86              }
87  
88              @Override
89              public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
90                  throws ServiceException {
91                return null;
92              }
93  
94              @Override
95              public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
96                  throws ServiceException {
97                if (controller instanceof PayloadCarryingRpcController) {
98                  PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
99                  // If cells, scan them to check we are able to iterate what we were given and since
100                 // this is
101                 // an echo, just put them back on the controller creating a new block. Tests our
102                 // block
103                 // building.
104                 CellScanner cellScanner = pcrc.cellScanner();
105                 List<Cell> list = null;
106                 if (cellScanner != null) {
107                   list = new ArrayList<Cell>();
108                   try {
109                     while (cellScanner.advance()) {
110                       list.add(cellScanner.current());
111                     }
112                   } catch (IOException e) {
113                     throw new ServiceException(e);
114                   }
115                 }
116                 cellScanner = CellUtil.createCellScanner(list);
117                 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
118               }
119               return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
120             }
121           });
122 
123   /**
124    * Instance of server. We actually don't do anything speical in here so could just use
125    * HBaseRpcServer directly.
126    */
127   static class TestRpcServer extends RpcServer {
128 
129     TestRpcServer() throws IOException {
130       this(new FifoRpcScheduler(CONF, 1));
131     }
132 
133     TestRpcServer(RpcScheduler scheduler) throws IOException {
134       super(null, "testRpcServer", Lists
135           .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
136           "localhost", 0), CONF, scheduler);
137     }
138 
139     @Override
140     public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
141         Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
142         throws IOException {
143       return super.call(service, md, param, cellScanner, receiveTime, status);
144     }
145   }
146 
147   protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
148 
149   /**
150    * Ensure we do not HAVE TO HAVE a codec.
151    * @throws InterruptedException
152    * @throws IOException
153    */
154   @Test
155   public void testNoCodec() throws InterruptedException, IOException {
156     Configuration conf = HBaseConfiguration.create();
157     AbstractRpcClient client = createRpcClientNoCodec(conf);
158     TestRpcServer rpcServer = new TestRpcServer();
159     try {
160       rpcServer.start();
161       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
162       final String message = "hello";
163       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
164       InetSocketAddress address = rpcServer.getListenerAddress();
165       if (address == null) {
166         throw new IOException("Listener channel is closed");
167       }
168       Pair<Message, CellScanner> r =
169           client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
170       assertTrue(r.getSecond() == null);
171       // Silly assertion that the message is in the returned pb.
172       assertTrue(r.getFirst().toString().contains(message));
173     } finally {
174       client.close();
175       rpcServer.stop();
176     }
177   }
178 
179   protected abstract AbstractRpcClient createRpcClient(Configuration conf);
180 
181   /**
182    * It is hard to verify the compression is actually happening under the wraps. Hope that if
183    * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
184    * confirm that compression is happening down in the client and server).
185    * @throws IOException
186    * @throws InterruptedException
187    * @throws SecurityException
188    * @throws NoSuchMethodException
189    */
190   @Test
191   public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException,
192       NoSuchMethodException, ServiceException {
193     Configuration conf = new Configuration(HBaseConfiguration.create());
194     conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
195     List<Cell> cells = new ArrayList<Cell>();
196     int count = 3;
197     for (int i = 0; i < count; i++) {
198       cells.add(CELL);
199     }
200     AbstractRpcClient client = createRpcClient(conf);
201     TestRpcServer rpcServer = new TestRpcServer();
202     try {
203       rpcServer.start();
204       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
205       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
206       PayloadCarryingRpcController pcrc =
207           new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
208       InetSocketAddress address = rpcServer.getListenerAddress();
209       if (address == null) {
210         throw new IOException("Listener channel is closed");
211       }
212       Pair<Message, CellScanner> r =
213           client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
214       int index = 0;
215       while (r.getSecond().advance()) {
216         assertTrue(CELL.equals(r.getSecond().current()));
217         index++;
218       }
219       assertEquals(count, index);
220     } finally {
221       client.close();
222       rpcServer.stop();
223     }
224   }
225 
226   protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
227       throws IOException;
228 
229   // @Test
230   public void testRTEDuringConnectionSetup() throws Exception {
231     Configuration conf = HBaseConfiguration.create();
232     TestRpcServer rpcServer = new TestRpcServer();
233     AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
234     try {
235       rpcServer.start();
236       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
237       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
238       InetSocketAddress address = rpcServer.getListenerAddress();
239       if (address == null) {
240         throw new IOException("Listener channel is closed");
241       }
242       client.call(null, md, param, null, User.getCurrent(), address);
243       fail("Expected an exception to have been thrown!");
244     } catch (Exception e) {
245       LOG.info("Caught expected exception: " + e.toString());
246       assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
247     } finally {
248       client.close();
249       rpcServer.stop();
250     }
251   }
252 
253   /** Tests that the rpc scheduler is called when requests arrive. */
254   @Test
255   public void testRpcScheduler() throws IOException, InterruptedException {
256     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
257     RpcServer rpcServer = new TestRpcServer(scheduler);
258     verify(scheduler).init((RpcScheduler.Context) anyObject());
259     AbstractRpcClient client = createRpcClient(CONF);
260     try {
261       rpcServer.start();
262       verify(scheduler).start();
263       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
264       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
265       InetSocketAddress address = rpcServer.getListenerAddress();
266       if (address == null) {
267         throw new IOException("Listener channel is closed");
268       }
269       for (int i = 0; i < 10; i++) {
270         client.call(
271           new PayloadCarryingRpcController(
272               CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, md
273               .getOutputType().toProto(), User.getCurrent(), address);
274       }
275       verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
276     } finally {
277       rpcServer.stop();
278       verify(scheduler).stop();
279     }
280   }
281 }