1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.util.Map;
27 import java.util.Map.Entry;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.Coprocessor;
32 import org.apache.hadoop.hbase.CoprocessorEnvironment;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.HRegionLocation;
36 import org.apache.hadoop.hbase.ServerName;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.HTable;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.RegionLocator;
41 import org.apache.hadoop.hbase.client.Table;
42 import org.apache.hadoop.hbase.client.coprocessor.Batch;
43 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
44 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
45 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
46 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
47 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos;
48 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountRequest;
49 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountResponse;
50 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloRequest;
51 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloResponse;
52 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountRequest;
53 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountResponse;
54 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopRequest;
55 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopResponse;
56 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingRequest;
57 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingResponse;
58 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
59 import org.apache.hadoop.hbase.testclassification.MediumTests;
60 import org.apache.hadoop.hbase.util.Bytes;
61 import org.junit.After;
62 import org.junit.AfterClass;
63 import org.junit.Before;
64 import org.junit.BeforeClass;
65 import org.junit.Test;
66 import org.junit.experimental.categories.Category;
67
68 import com.google.protobuf.RpcCallback;
69 import com.google.protobuf.RpcController;
70 import com.google.protobuf.Service;
71 import com.google.protobuf.ServiceException;
72
73 @Category(MediumTests.class)
74 public class TestServerCustomProtocol {
75 private static final Log LOG = LogFactory.getLog(TestServerCustomProtocol.class);
76 static final String WHOAREYOU = "Who are you?";
77 static final String NOBODY = "nobody";
78 static final String HELLO = "Hello, ";
79
80
81 public static class PingHandler extends PingProtos.PingService
82 implements Coprocessor, CoprocessorService {
83 private int counter = 0;
84
85 @Override
86 public void start(CoprocessorEnvironment env) throws IOException {
87 if (env instanceof RegionCoprocessorEnvironment) return;
88 throw new CoprocessorException("Must be loaded on a table region!");
89 }
90
91 @Override
92 public void stop(CoprocessorEnvironment env) throws IOException {
93
94 }
95
96 @Override
97 public void ping(RpcController controller, PingRequest request,
98 RpcCallback<PingResponse> done) {
99 this.counter++;
100 done.run(PingResponse.newBuilder().setPong("pong").build());
101 }
102
103 @Override
104 public void count(RpcController controller, CountRequest request,
105 RpcCallback<CountResponse> done) {
106 done.run(CountResponse.newBuilder().setCount(this.counter).build());
107 }
108
109 @Override
110 public void increment(RpcController controller,
111 IncrementCountRequest request, RpcCallback<IncrementCountResponse> done) {
112 this.counter += request.getDiff();
113 done.run(IncrementCountResponse.newBuilder().setCount(this.counter).build());
114 }
115
116 @Override
117 public void hello(RpcController controller, HelloRequest request,
118 RpcCallback<HelloResponse> done) {
119 if (!request.hasName()) done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
120 else if (request.getName().equals(NOBODY)) done.run(HelloResponse.newBuilder().build());
121 else done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
122 }
123
124 @Override
125 public void noop(RpcController controller, NoopRequest request,
126 RpcCallback<NoopResponse> done) {
127 done.run(NoopResponse.newBuilder().build());
128 }
129
130 @Override
131 public Service getService() {
132 return this;
133 }
134 }
135
136 private static final TableName TEST_TABLE = TableName.valueOf("test");
137 private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
138
139 private static final byte[] ROW_A = Bytes.toBytes("aaa");
140 private static final byte[] ROW_B = Bytes.toBytes("bbb");
141 private static final byte[] ROW_C = Bytes.toBytes("ccc");
142
143 private static final byte[] ROW_AB = Bytes.toBytes("abb");
144 private static final byte[] ROW_BC = Bytes.toBytes("bcc");
145
146 private static HBaseTestingUtility util = new HBaseTestingUtility();
147
148 @BeforeClass
149 public static void setupBeforeClass() throws Exception {
150 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
151 PingHandler.class.getName());
152 util.startMiniCluster();
153 }
154
155 @Before
156 public void before() throws Exception {
157 final byte[][] SPLIT_KEYS = new byte[][] { ROW_B, ROW_C };
158 HTable table = util.createTable(TEST_TABLE, TEST_FAMILY, SPLIT_KEYS);
159
160 Put puta = new Put( ROW_A );
161 puta.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
162 table.put(puta);
163
164 Put putb = new Put( ROW_B );
165 putb.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
166 table.put(putb);
167
168 Put putc = new Put( ROW_C );
169 putc.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
170 table.put(putc);
171 }
172
173 @After
174 public void after() throws Exception {
175 util.deleteTable(TEST_TABLE);
176 }
177
178 @AfterClass
179 public static void tearDownAfterClass() throws Exception {
180 util.shutdownMiniCluster();
181 }
182
183 @Test
184 public void testSingleProxy() throws Throwable {
185 Table table = new HTable(util.getConfiguration(), TEST_TABLE);
186 Map<byte [], String> results = ping(table, null, null);
187
188 assertEquals(3, results.size());
189 for (Map.Entry<byte [], String> e: results.entrySet()) {
190 assertEquals("Invalid custom protocol response", "pong", e.getValue());
191 }
192 hello(table, "George", HELLO + "George");
193 LOG.info("Did george");
194 hello(table, null, "Who are you?");
195 LOG.info("Who are you");
196 hello(table, NOBODY, null);
197 LOG.info(NOBODY);
198 Map<byte [], Integer> intResults = table.coprocessorService(PingProtos.PingService.class,
199 null, null,
200 new Batch.Call<PingProtos.PingService, Integer>() {
201 @Override
202 public Integer call(PingProtos.PingService instance) throws IOException {
203 BlockingRpcCallback<PingProtos.CountResponse> rpcCallback =
204 new BlockingRpcCallback<PingProtos.CountResponse>();
205 instance.count(null, PingProtos.CountRequest.newBuilder().build(), rpcCallback);
206 return rpcCallback.get().getCount();
207 }
208 });
209 int count = -1;
210 for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
211 assertTrue(e.getValue() > 0);
212 count = e.getValue();
213 }
214 final int diff = 5;
215 intResults = table.coprocessorService(PingProtos.PingService.class,
216 null, null,
217 new Batch.Call<PingProtos.PingService, Integer>() {
218 @Override
219 public Integer call(PingProtos.PingService instance) throws IOException {
220 BlockingRpcCallback<PingProtos.IncrementCountResponse> rpcCallback =
221 new BlockingRpcCallback<PingProtos.IncrementCountResponse>();
222 instance.increment(null, PingProtos.IncrementCountRequest.newBuilder().setDiff(diff).build(),
223 rpcCallback);
224 return rpcCallback.get().getCount();
225 }
226 });
227
228 assertEquals(3, results.size());
229 for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
230 assertEquals(e.getValue().intValue(), count + diff);
231 }
232 table.close();
233 }
234
235 private Map<byte [], String> hello(final Table table, final String send, final String response)
236 throws ServiceException, Throwable {
237 Map<byte [], String> results = hello(table, send);
238 for (Map.Entry<byte [], String> e: results.entrySet()) {
239 assertEquals("Invalid custom protocol response", response, e.getValue());
240 }
241 return results;
242 }
243
244 private Map<byte [], String> hello(final Table table, final String send)
245 throws ServiceException, Throwable {
246 return hello(table, send, null, null);
247 }
248
249 private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
250 final byte [] end)
251 throws ServiceException, Throwable {
252 return table.coprocessorService(PingProtos.PingService.class,
253 start, end,
254 new Batch.Call<PingProtos.PingService, String>() {
255 @Override
256 public String call(PingProtos.PingService instance) throws IOException {
257 BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
258 new BlockingRpcCallback<PingProtos.HelloResponse>();
259 PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
260 if (send != null) builder.setName(send);
261 instance.hello(null, builder.build(), rpcCallback);
262 PingProtos.HelloResponse r = rpcCallback.get();
263 return r != null && r.hasResponse()? r.getResponse(): null;
264 }
265 });
266 }
267
268 private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
269 final byte [] end)
270 throws ServiceException, Throwable {
271 return table.coprocessorService(PingProtos.PingService.class,
272 start, end,
273 new Batch.Call<PingProtos.PingService, String>() {
274 @Override
275 public String call(PingProtos.PingService instance) throws IOException {
276 BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
277 new BlockingRpcCallback<PingProtos.HelloResponse>();
278 PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
279
280 builder.setName(doPing(instance));
281 instance.hello(null, builder.build(), rpcCallback);
282 PingProtos.HelloResponse r = rpcCallback.get();
283 return r != null && r.hasResponse()? r.getResponse(): null;
284 }
285 });
286 }
287
288 private Map<byte [], String> noop(final Table table, final byte [] start,
289 final byte [] end)
290 throws ServiceException, Throwable {
291 return table.coprocessorService(PingProtos.PingService.class, start, end,
292 new Batch.Call<PingProtos.PingService, String>() {
293 @Override
294 public String call(PingProtos.PingService instance) throws IOException {
295 BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback =
296 new BlockingRpcCallback<PingProtos.NoopResponse>();
297 PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder();
298 instance.noop(null, builder.build(), rpcCallback);
299 rpcCallback.get();
300
301 return null;
302 }
303 });
304 }
305
306 @Test
307 public void testSingleMethod() throws Throwable {
308 try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
309 RegionLocator locator = table.getRegionLocator();
310 Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class,
311 null, ROW_A,
312 new Batch.Call<PingProtos.PingService, String>() {
313 @Override
314 public String call(PingProtos.PingService instance) throws IOException {
315 BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
316 new BlockingRpcCallback<PingProtos.PingResponse>();
317 instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
318 return rpcCallback.get().getPong();
319 }
320 });
321
322
323 assertEquals(1, results.size());
324 verifyRegionResults(locator, results, ROW_A);
325
326 final String name = "NAME";
327 results = hello(table, name, null, ROW_A);
328
329
330 assertEquals(1, results.size());
331 verifyRegionResults(locator, results, "Hello, NAME", ROW_A);
332 }
333 }
334
335 @Test
336 public void testRowRange() throws Throwable {
337 try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
338 RegionLocator locator = table.getRegionLocator();
339 for (Entry<HRegionInfo, ServerName> e: table.getRegionLocations().entrySet()) {
340 LOG.info("Region " + e.getKey().getRegionNameAsString() + ", servername=" + e.getValue());
341 }
342
343
344
345
346
347
348 Map<byte [], String> results = ping(table, null, ROW_A);
349
350 assertEquals(1, results.size());
351 verifyRegionResults(locator, results, ROW_A);
352
353
354 results = ping(table, ROW_BC, null);
355 assertEquals(2, results.size());
356
357 HRegionLocation loc = table.getRegionLocation(ROW_A, true);
358 assertNull("Should be missing region for row aaa (prior to start row)",
359 results.get(loc.getRegionInfo().getRegionName()));
360 verifyRegionResults(locator, results, ROW_B);
361 verifyRegionResults(locator, results, ROW_C);
362
363
364 results = ping(table, null, ROW_BC);
365
366 assertEquals(2, results.size());
367 verifyRegionResults(locator, results, ROW_A);
368 verifyRegionResults(locator, results, ROW_B);
369 loc = table.getRegionLocation(ROW_C, true);
370 assertNull("Should be missing region for row ccc (past stop row)",
371 results.get(loc.getRegionInfo().getRegionName()));
372
373
374 results = ping(table, ROW_AB, ROW_BC);
375
376 assertEquals(2, results.size());
377 verifyRegionResults(locator, results, ROW_A);
378 verifyRegionResults(locator, results, ROW_B);
379 loc = table.getRegionLocation(ROW_C, true);
380 assertNull("Should be missing region for row ccc (past stop row)",
381 results.get(loc.getRegionInfo().getRegionName()));
382
383
384 results = ping(table, ROW_B, ROW_BC);
385
386 assertEquals(1, results.size());
387 verifyRegionResults(locator, results, ROW_B);
388 loc = table.getRegionLocation(ROW_A, true);
389 assertNull("Should be missing region for row aaa (prior to start)",
390 results.get(loc.getRegionInfo().getRegionName()));
391 loc = table.getRegionLocation(ROW_C, true);
392 assertNull("Should be missing region for row ccc (past stop row)",
393 results.get(loc.getRegionInfo().getRegionName()));
394 }
395 }
396
397 private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
398 throws ServiceException, Throwable {
399 return table.coprocessorService(PingProtos.PingService.class, start, end,
400 new Batch.Call<PingProtos.PingService, String>() {
401 @Override
402 public String call(PingProtos.PingService instance) throws IOException {
403 return doPing(instance);
404 }
405 });
406 }
407
408 private static String doPing(PingProtos.PingService instance) throws IOException {
409 BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
410 new BlockingRpcCallback<PingProtos.PingResponse>();
411 instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
412 return rpcCallback.get().getPong();
413 }
414
415 @Test
416 public void testCompoundCall() throws Throwable {
417 try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
418 RegionLocator locator = table.getRegionLocator();
419 Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C);
420 verifyRegionResults(locator, results, "Hello, pong", ROW_A);
421 verifyRegionResults(locator, results, "Hello, pong", ROW_B);
422 verifyRegionResults(locator, results, "Hello, pong", ROW_C);
423 }
424 }
425
426 @Test
427 public void testNullCall() throws Throwable {
428 try(HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
429 RegionLocator locator = table.getRegionLocator();
430 Map<byte[],String> results = hello(table, null, ROW_A, ROW_C);
431 verifyRegionResults(locator, results, "Who are you?", ROW_A);
432 verifyRegionResults(locator, results, "Who are you?", ROW_B);
433 verifyRegionResults(locator, results, "Who are you?", ROW_C);
434 }
435 }
436
437 @Test
438 public void testNullReturn() throws Throwable {
439 try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
440 RegionLocator locator = table.getRegionLocator();
441 Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C);
442 verifyRegionResults(locator, results, null, ROW_A);
443 verifyRegionResults(locator, results, null, ROW_B);
444 verifyRegionResults(locator, results, null, ROW_C);
445 }
446 }
447
448 @Test
449 public void testEmptyReturnType() throws Throwable {
450 try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
451 Map<byte[],String> results = noop(table, ROW_A, ROW_C);
452 assertEquals("Should have results from three regions", 3, results.size());
453
454 for (Object v : results.values()) {
455 assertNull(v);
456 }
457 }
458 }
459
460 private void verifyRegionResults(RegionLocator table,
461 Map<byte[],String> results, byte[] row) throws Exception {
462 verifyRegionResults(table, results, "pong", row);
463 }
464
465 private void verifyRegionResults(RegionLocator regionLocator,
466 Map<byte[], String> results, String expected, byte[] row)
467 throws Exception {
468 for (Map.Entry<byte [], String> e: results.entrySet()) {
469 LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected +
470 ", result key=" + Bytes.toString(e.getKey()) +
471 ", value=" + e.getValue());
472 }
473 HRegionLocation loc = regionLocator.getRegionLocation(row, true);
474 byte[] region = loc.getRegionInfo().getRegionName();
475 assertTrue("Results should contain region " +
476 Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'",
477 results.containsKey(region));
478 assertEquals("Invalid result for row '"+Bytes.toStringBinary(row)+"'",
479 expected, results.get(region));
480 }
481 }