1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.Lists;
22 import com.google.protobuf.BlockingService;
23 import com.google.protobuf.Descriptors.MethodDescriptor;
24 import com.google.protobuf.Message;
25 import com.google.protobuf.RpcController;
26 import com.google.protobuf.ServiceException;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Abortable;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellScanner;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HBaseConfiguration;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
38 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
39 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
40 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
41 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
42 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
43 import org.apache.hadoop.hbase.security.User;
44 import org.apache.hadoop.hbase.testclassification.SmallTests;
45 import org.apache.hadoop.hbase.util.Pair;
46 import org.junit.Ignore;
47 import org.junit.Test;
48 import org.junit.experimental.categories.Category;
49 import org.mockito.Mockito;
50
51 import java.io.IOException;
52 import java.net.InetSocketAddress;
53 import java.util.ArrayList;
54 import java.util.List;
55
56 import static org.mockito.Mockito.mock;
57
58 @Category({SmallTests.class})
59 public class TestRpcHandlerException {
60 public static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
61 static String example = "xyz";
62 static byte[] CELL_BYTES = example.getBytes();
63 static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
64
65 private final static Configuration CONF = HBaseConfiguration.create();
66 RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
67
68
69
70
71
72 private static final BlockingService SERVICE =
73 TestRpcServiceProtos.TestProtobufRpcProto
74 .newReflectiveBlockingService(new TestRpcServiceProtos
75 .TestProtobufRpcProto.BlockingInterface() {
76
77 @Override
78 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
79 throws ServiceException {
80 return null;
81 }
82
83 @Override
84 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
85 throws ServiceException {
86 return null;
87 }
88
89 @Override
90 public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
91 throws Error, RuntimeException {
92 if (controller instanceof PayloadCarryingRpcController) {
93 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
94
95
96
97
98
99 CellScanner cellScanner = pcrc.cellScanner();
100 List<Cell> list = null;
101 if (cellScanner != null) {
102 list = new ArrayList<Cell>();
103 try {
104 while (cellScanner.advance()) {
105 list.add(cellScanner.current());
106 throw new StackOverflowError();
107 }
108 } catch (StackOverflowError e) {
109 throw e;
110 } catch (IOException e) {
111 throw new RuntimeException(e);
112 }
113 }
114 cellScanner = CellUtil.createCellScanner(list);
115 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
116 }
117 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
118 }
119 });
120
121
122
123
124
125 private static class TestRpcServer extends RpcServer {
126
127 TestRpcServer() throws IOException {
128 this(new FifoRpcScheduler(CONF, 1));
129 }
130
131 TestRpcServer(RpcScheduler scheduler) throws IOException {
132 super(null, "testRpcServer",
133 Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
134 new InetSocketAddress("localhost", 0), CONF, scheduler);
135 }
136
137 @Override
138 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
139 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
140 throws IOException {
141 return super.call(service, md, param, cellScanner, receiveTime, status);
142 }
143 }
144
145
146
147
148
149
150 private class AbortServer implements Abortable {
151 private boolean aborted = false;
152
153 @Override
154 public void abort(String why, Throwable e) {
155 aborted = true;
156 }
157
158 @Override
159 public boolean isAborted() {
160 return aborted;
161 }
162 }
163
164
165
166
167 @Ignore
168 @Test
169 public void testRpcScheduler() throws IOException, InterruptedException {
170 PriorityFunction qosFunction = mock(PriorityFunction.class);
171 Abortable abortable = new AbortServer();
172 RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
173 RpcServer rpcServer = new TestRpcServer(scheduler);
174 RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
175 try {
176 rpcServer.start();
177 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
178 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
179 PayloadCarryingRpcController controller =
180 new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
181 InetSocketAddress address = rpcServer.getListenerAddress();
182 if (address == null) {
183 throw new IOException("Listener channel is closed");
184 }
185 client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
186 address);
187 } catch (Throwable e) {
188 assert(abortable.isAborted() == true);
189 } finally {
190 rpcServer.stop();
191 }
192 }
193
194 }