View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import com.google.common.collect.ImmutableList;
21  import com.google.common.collect.Lists;
22  import com.google.protobuf.BlockingService;
23  import com.google.protobuf.Descriptors.MethodDescriptor;
24  import com.google.protobuf.Message;
25  import com.google.protobuf.RpcController;
26  import com.google.protobuf.ServiceException;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Abortable;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellScanner;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
38  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
39  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
40  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
41  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
42  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
43  import org.apache.hadoop.hbase.security.User;
44  import org.apache.hadoop.hbase.testclassification.SmallTests;
45  import org.apache.hadoop.hbase.util.Pair;
46  import org.junit.Ignore;
47  import org.junit.Test;
48  import org.junit.experimental.categories.Category;
49  import org.mockito.Mockito;
50  
51  import java.io.IOException;
52  import java.net.InetSocketAddress;
53  import java.util.ArrayList;
54  import java.util.List;
55  
56  import static org.mockito.Mockito.mock;
57  
58  @Category({SmallTests.class})
59  public class TestRpcHandlerException {
60    public static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
61    static String example = "xyz";
62    static byte[] CELL_BYTES = example.getBytes();
63    static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
64  
65    private final static Configuration CONF = HBaseConfiguration.create();
66    RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
67  
68    // We are using the test TestRpcServiceProtos generated classes and Service because they are
69    // available and basic with methods like 'echo', and ping. Below we make a blocking service
70    // by passing in implementation of blocking interface. We use this service in all tests that
71    // follow.
72    private static final BlockingService SERVICE =
73        TestRpcServiceProtos.TestProtobufRpcProto
74        .newReflectiveBlockingService(new TestRpcServiceProtos
75  		  .TestProtobufRpcProto.BlockingInterface() {
76  
77          @Override
78          public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
79              throws ServiceException {
80            return null;
81          }
82  
83          @Override
84          public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
85              throws ServiceException {
86            return null;
87          }
88  
89          @Override
90          public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
91              throws Error, RuntimeException {
92            if (controller instanceof PayloadCarryingRpcController) {
93              PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
94              // If cells, scan them to check we are able to iterate what we were given and since
95              // this is
96              // an echo, just put them back on the controller creating a new block. Tests our
97              // block
98              // building.
99              CellScanner cellScanner = pcrc.cellScanner();
100             List<Cell> list = null;
101             if (cellScanner != null) {
102 		list = new ArrayList<Cell>();
103 		try {
104 			while (cellScanner.advance()) {
105 				list.add(cellScanner.current());
106 				throw new StackOverflowError();
107 			}
108 		} catch (StackOverflowError e) {
109 			throw e;
110 		} catch (IOException e) {
111 			throw new RuntimeException(e);
112 		}
113             }
114             cellScanner = CellUtil.createCellScanner(list);
115             ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
116           }
117           return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
118         }
119       });
120 
121   /**
122    * Instance of server. We actually don't do anything speical in here so could just use
123    * HBaseRpcServer directly.
124    */
125   private static class TestRpcServer extends RpcServer {
126 
127     TestRpcServer() throws IOException {
128       this(new FifoRpcScheduler(CONF, 1));
129     }
130 
131     TestRpcServer(RpcScheduler scheduler) throws IOException {
132       super(null, "testRpcServer",
133 		  Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
134 		  new InetSocketAddress("localhost", 0), CONF, scheduler);
135     }
136 
137     @Override
138     public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
139       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
140           throws IOException {
141       return super.call(service, md, param, cellScanner, receiveTime, status);
142     }
143   }
144 
145   /** Tests that the rpc scheduler is called when requests arrive.
146    *  When Rpc handler thread dies, the client will hang and the test will fail.
147    *  The test is meant to be a unit test to test the behavior.
148    *
149    * */
150   private class AbortServer implements Abortable {
151     private boolean aborted = false;
152 
153     @Override
154     public void abort(String why, Throwable e) {
155       aborted = true;
156     }
157 
158     @Override
159     public boolean isAborted() {
160       return aborted;
161     }
162   }
163 
164   /* This is a unit test to make sure to abort region server when the number of Rpc handler thread
165    * caught errors exceeds the threshold. Client will hang when RS aborts.
166    */
167   @Ignore
168   @Test
169   public void testRpcScheduler() throws IOException, InterruptedException {
170     PriorityFunction qosFunction = mock(PriorityFunction.class);
171     Abortable abortable = new AbortServer();
172     RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
173     RpcServer rpcServer = new TestRpcServer(scheduler);
174     RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
175     try {
176       rpcServer.start();
177       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
178       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
179       PayloadCarryingRpcController controller =
180           new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
181       InetSocketAddress address = rpcServer.getListenerAddress();
182       if (address == null) {
183         throw new IOException("Listener channel is closed");
184       }
185       client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
186           address);
187     } catch (Throwable e) {
188       assert(abortable.isAborted() == true);
189     } finally {
190       rpcServer.stop();
191     }
192   }
193 
194 }