1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.util.Collections;
29 import java.util.Map;
30 import java.util.NavigableMap;
31 import java.util.TreeMap;
32
33 import org.apache.hadoop.hbase.client.Admin;
34 import org.apache.hadoop.hbase.client.Table;
35 import org.apache.hadoop.hbase.util.ByteStringer;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HRegionInfo;
43 import org.apache.hadoop.hbase.HTableDescriptor;
44 import org.apache.hadoop.hbase.testclassification.MediumTests;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.TableName;
47 import org.apache.hadoop.hbase.client.HTable;
48 import org.apache.hadoop.hbase.client.Put;
49 import org.apache.hadoop.hbase.client.coprocessor.Batch;
50 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
51 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
52 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
53 import org.apache.hadoop.hbase.ipc.ServerRpcController;
54 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
55 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
56 import org.apache.hadoop.hbase.util.Bytes;
57 import org.junit.AfterClass;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.junit.experimental.categories.Category;
61
62 import com.google.protobuf.RpcController;
63 import com.google.protobuf.ServiceException;
64
65
66
67
68 @Category(MediumTests.class)
69 public class TestCoprocessorEndpoint {
70 private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
71
72 private static final TableName TEST_TABLE =
73 TableName.valueOf("TestCoprocessorEndpoint");
74 private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
75 private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
76 private static byte[] ROW = Bytes.toBytes("testRow");
77
78 private static final int ROWSIZE = 20;
79 private static final int rowSeperator1 = 5;
80 private static final int rowSeperator2 = 12;
81 private static byte[][] ROWS = makeN(ROW, ROWSIZE);
82
83 private static HBaseTestingUtility util = new HBaseTestingUtility();
84
85 @BeforeClass
86 public static void setupBeforeClass() throws Exception {
87
88 Configuration conf = util.getConfiguration();
89 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
90 org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
91 ProtobufCoprocessorService.class.getName());
92 conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
93 ProtobufCoprocessorService.class.getName());
94 util.startMiniCluster(2);
95
96 Admin admin = util.getHBaseAdmin();
97 HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
98 desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
99 admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
100 util.waitUntilAllRegionsAssigned(TEST_TABLE);
101
102 Table table = new HTable(conf, TEST_TABLE);
103 for (int i = 0; i < ROWSIZE; i++) {
104 Put put = new Put(ROWS[i]);
105 put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
106 table.put(put);
107 }
108 table.close();
109 }
110
111 @AfterClass
112 public static void tearDownAfterClass() throws Exception {
113 util.shutdownMiniCluster();
114 }
115
116 private Map<byte [], Long> sum(final Table table, final byte [] family,
117 final byte [] qualifier, final byte [] start, final byte [] end)
118 throws ServiceException, Throwable {
119 return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
120 start, end,
121 new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
122 @Override
123 public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
124 throws IOException {
125 BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
126 new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
127 ColumnAggregationProtos.SumRequest.Builder builder =
128 ColumnAggregationProtos.SumRequest.newBuilder();
129 builder.setFamily(ByteStringer.wrap(family));
130 if (qualifier != null && qualifier.length > 0) {
131 builder.setQualifier(ByteStringer.wrap(qualifier));
132 }
133 instance.sum(null, builder.build(), rpcCallback);
134 return rpcCallback.get().getSum();
135 }
136 });
137 }
138
139 @Test
140 public void testAggregation() throws Throwable {
141 Table table = new HTable(util.getConfiguration(), TEST_TABLE);
142 Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
143 ROWS[0], ROWS[ROWS.length-1]);
144 int sumResult = 0;
145 int expectedResult = 0;
146 for (Map.Entry<byte[], Long> e : results.entrySet()) {
147 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
148 sumResult += e.getValue();
149 }
150 for (int i = 0; i < ROWSIZE; i++) {
151 expectedResult += i;
152 }
153 assertEquals("Invalid result", expectedResult, sumResult);
154
155 results.clear();
156
157
158 results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
159 ROWS[rowSeperator1], ROWS[ROWS.length-1]);
160 sumResult = 0;
161 expectedResult = 0;
162 for (Map.Entry<byte[], Long> e : results.entrySet()) {
163 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
164 sumResult += e.getValue();
165 }
166 for (int i = rowSeperator1; i < ROWSIZE; i++) {
167 expectedResult += i;
168 }
169 assertEquals("Invalid result", expectedResult, sumResult);
170 table.close();
171 }
172
173 @Test
174 public void testCoprocessorService() throws Throwable {
175 HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
176 NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
177
178 final TestProtos.EchoRequestProto request =
179 TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
180 final Map<byte[], String> results = Collections.synchronizedMap(
181 new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
182 try {
183
184 final RpcController controller = new ServerRpcController();
185 table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
186 ROWS[0], ROWS[ROWS.length - 1],
187 new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
188 public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
189 throws IOException {
190 LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
191 BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
192 instance.echo(controller, request, callback);
193 TestProtos.EchoResponseProto response = callback.get();
194 LOG.debug("Batch.Call returning result " + response);
195 return response;
196 }
197 },
198 new Batch.Callback<TestProtos.EchoResponseProto>() {
199 public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
200 assertNotNull(result);
201 assertEquals("hello", result.getMessage());
202 results.put(region, result.getMessage());
203 }
204 }
205 );
206 for (Map.Entry<byte[], String> e : results.entrySet()) {
207 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
208 }
209 assertEquals(3, results.size());
210 for (HRegionInfo info : regions.navigableKeySet()) {
211 LOG.info("Region info is "+info.getRegionNameAsString());
212 assertTrue(results.containsKey(info.getRegionName()));
213 }
214 results.clear();
215
216
217 table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
218 ROWS[rowSeperator1], ROWS[ROWS.length - 1],
219 new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
220 public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
221 throws IOException {
222 LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
223 BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
224 instance.echo(controller, request, callback);
225 TestProtos.EchoResponseProto response = callback.get();
226 LOG.debug("Batch.Call returning result " + response);
227 return response;
228 }
229 },
230 new Batch.Callback<TestProtos.EchoResponseProto>() {
231 public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
232 assertNotNull(result);
233 assertEquals("hello", result.getMessage());
234 results.put(region, result.getMessage());
235 }
236 }
237 );
238 for (Map.Entry<byte[], String> e : results.entrySet()) {
239 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
240 }
241 assertEquals(2, results.size());
242 } finally {
243 table.close();
244 }
245 }
246
247 @Test
248 public void testCoprocessorServiceNullResponse() throws Throwable {
249 HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
250 NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
251
252 final TestProtos.EchoRequestProto request =
253 TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
254 try {
255
256 final RpcController controller = new ServerRpcController();
257
258 Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
259 ROWS[0], ROWS[ROWS.length - 1],
260 new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
261 public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
262 throws IOException {
263 BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
264 instance.echo(controller, request, callback);
265 TestProtos.EchoResponseProto response = callback.get();
266 LOG.debug("Batch.Call got result " + response);
267 return null;
268 }
269 }
270 );
271 for (Map.Entry<byte[], String> e : results.entrySet()) {
272 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
273 }
274 assertEquals(3, results.size());
275 for (HRegionInfo info : regions.navigableKeySet()) {
276 LOG.info("Region info is "+info.getRegionNameAsString());
277 assertTrue(results.containsKey(info.getRegionName()));
278 assertNull(results.get(info.getRegionName()));
279 }
280 } finally {
281 table.close();
282 }
283 }
284
285 @Test
286 public void testMasterCoprocessorService() throws Throwable {
287 Admin admin = util.getHBaseAdmin();
288 final TestProtos.EchoRequestProto request =
289 TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
290 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
291 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
292 assertEquals("hello", service.echo(null, request).getMessage());
293 }
294
295 @Test
296 public void testCoprocessorError() throws Exception {
297 Configuration configuration = new Configuration(util.getConfiguration());
298
299 configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
300 Table table = new HTable(configuration, TEST_TABLE);
301
302 try {
303 CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
304
305 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
306 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
307
308 service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
309 fail("Should have thrown an exception");
310 } catch (ServiceException e) {
311 } finally {
312 table.close();
313 }
314 }
315
316 @Test
317 public void testMasterCoprocessorError() throws Throwable {
318 Admin admin = util.getHBaseAdmin();
319 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
320 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
321 try {
322 service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
323 fail("Should have thrown an exception");
324 } catch (ServiceException e) {
325 }
326 }
327
328 private static byte[][] makeN(byte[] base, int n) {
329 byte[][] ret = new byte[n][];
330 for (int i = 0; i < n; i++) {
331 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
332 }
333 return ret;
334 }
335
336 }
337