/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * TestCoprocessorEndpoint: test cases verifying coprocessor Endpoint calls
 * against both region and master coprocessor hosts.
 */
@Category(MediumTests.class)
public class TestCoprocessorEndpoint {
  private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);

  private static final TableName TEST_TABLE =
      TableName.valueOf("TestCoprocessorEndpoint");
  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
  private static byte[] ROW = Bytes.toBytes("testRow");

  private static final int ROWSIZE = 20;
  private static final int rowSeperator1 = 5;
  private static final int rowSeperator2 = 12;
  private static byte[][] ROWS = makeN(ROW, ROWSIZE);

  private static HBaseTestingUtility util = new HBaseTestingUtility();

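  /**
   * Loads the test endpoints on the region servers and the master, starts a
   * two-node mini cluster, creates a table split into three regions at
   * rowSeperator1 and rowSeperator2, and writes one row per index whose value
   * is the index itself.
   */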
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    // set the configuration to indicate which coprocessors should be loaded
    Configuration conf = util.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
        ProtobufCoprocessorService.class.getName());
    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
        ProtobufCoprocessorService.class.getName());
    util.startMiniCluster(2);

    Admin admin = util.getHBaseAdmin();
    HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
    admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
    util.waitUntilAllRegionsAssigned(TEST_TABLE);

    Table table = new HTable(conf, TEST_TABLE);
    for (int i = 0; i < ROWSIZE; i++) {
      Put put = new Put(ROWS[i]);
      put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
      table.put(put);
    }
    table.close();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

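  /**
   * Invokes the ColumnAggregationService endpoint on every region overlapping
   * the given row range and returns a map from region name to the sum of the
   * requested column's values within that region.
   */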
  private Map<byte[], Long> sum(final Table table, final byte[] family,
      final byte[] qualifier, final byte[] start, final byte[] end)
      throws ServiceException, Throwable {
    return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
        start, end,
        new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
          @Override
          public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
              throws IOException {
            BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
                new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
            ColumnAggregationProtos.SumRequest.Builder builder =
                ColumnAggregationProtos.SumRequest.newBuilder();
            builder.setFamily(ByteStringer.wrap(family));
            if (qualifier != null && qualifier.length > 0) {
              builder.setQualifier(ByteStringer.wrap(qualifier));
            }
            instance.sum(null, builder.build(), rpcCallback);
            return rpcCallback.get().getSum();
          }
        });
  }

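  /**
   * Sums the test column through the ColumnAggregationEndpoint, first across
   * all regions and then across the regions starting at rowSeperator1, and
   * checks both totals against client-side expected values.
   */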
  @Test
  public void testAggregation() throws Throwable {
    Table table = new HTable(util.getConfiguration(), TEST_TABLE);
    Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
        ROWS[0], ROWS[ROWS.length - 1]);
    int sumResult = 0;
    int expectedResult = 0;
    for (Map.Entry<byte[], Long> e : results.entrySet()) {
      LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      sumResult += e.getValue();
    }
    for (int i = 0; i < ROWSIZE; i++) {
      expectedResult += i;
    }
    assertEquals("Invalid result", expectedResult, sumResult);

    results.clear();

    // scan: for region 2 and region 3
    results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
        ROWS[rowSeperator1], ROWS[ROWS.length - 1]);
    sumResult = 0;
    expectedResult = 0;
    for (Map.Entry<byte[], Long> e : results.entrySet()) {
      LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      sumResult += e.getValue();
    }
    for (int i = rowSeperator1; i < ROWSIZE; i++) {
      expectedResult += i;
    }
    assertEquals("Invalid result", expectedResult, sumResult);
    table.close();
  }

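  /**
   * Fans an echo() RPC out to the regions covering a row range through
   * Table.coprocessorService(), first across all three regions and then across
   * the last two, collecting each region's response via a Batch.Callback and
   * asserting one "hello" response per region.
   */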
  @Test
  public void testCoprocessorService() throws Throwable {
    HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
    NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();

    final TestProtos.EchoRequestProto request =
        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
    final Map<byte[], String> results = Collections.synchronizedMap(
        new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
    try {
      // scan: for all regions
      final RpcController controller = new ServerRpcController();
      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
          ROWS[0], ROWS[ROWS.length - 1],
          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
            public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
                throws IOException {
              LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
              BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
                  new BlockingRpcCallback<TestProtos.EchoResponseProto>();
              instance.echo(controller, request, callback);
              TestProtos.EchoResponseProto response = callback.get();
              LOG.debug("Batch.Call returning result " + response);
              return response;
            }
          },
          new Batch.Callback<TestProtos.EchoResponseProto>() {
            public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
              assertNotNull(result);
              assertEquals("hello", result.getMessage());
              results.put(region, result.getMessage());
            }
          }
      );
      for (Map.Entry<byte[], String> e : results.entrySet()) {
        LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      }
      assertEquals(3, results.size());
      for (HRegionInfo info : regions.navigableKeySet()) {
        LOG.info("Region info is " + info.getRegionNameAsString());
        assertTrue(results.containsKey(info.getRegionName()));
      }
      results.clear();

      // scan: for region 2 and region 3
      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
          ROWS[rowSeperator1], ROWS[ROWS.length - 1],
          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
            public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
                throws IOException {
              LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
              BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
                  new BlockingRpcCallback<TestProtos.EchoResponseProto>();
              instance.echo(controller, request, callback);
              TestProtos.EchoResponseProto response = callback.get();
              LOG.debug("Batch.Call returning result " + response);
              return response;
            }
          },
          new Batch.Callback<TestProtos.EchoResponseProto>() {
            public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
              assertNotNull(result);
              assertEquals("hello", result.getMessage());
              results.put(region, result.getMessage());
            }
          }
      );
      for (Map.Entry<byte[], String> e : results.entrySet()) {
        LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      }
      assertEquals(2, results.size());
    } finally {
      table.close();
    }
  }

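  /**
   * Verifies that a Batch.Call returning null is supported: every region in
   * the range still shows up as a key in the result map, with a null value.
   */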
  @Test
  public void testCoprocessorServiceNullResponse() throws Throwable {
    HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
    NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();

    final TestProtos.EchoRequestProto request =
        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
    try {
      // scan: for all regions
      final RpcController controller = new ServerRpcController();
      // test that null results are supported
      Map<byte[], String> results =
          table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
          ROWS[0], ROWS[ROWS.length - 1],
          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
            public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
                throws IOException {
              BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
                  new BlockingRpcCallback<TestProtos.EchoResponseProto>();
              instance.echo(controller, request, callback);
              TestProtos.EchoResponseProto response = callback.get();
              LOG.debug("Batch.Call got result " + response);
              return null;
            }
          }
      );
      for (Map.Entry<byte[], String> e : results.entrySet()) {
        LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      }
      assertEquals(3, results.size());
      for (HRegionInfo info : regions.navigableKeySet()) {
        LOG.info("Region info is " + info.getRegionNameAsString());
        assertTrue(results.containsKey(info.getRegionName()));
        assertNull(results.get(info.getRegionName()));
      }
    } finally {
      table.close();
    }
  }

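  /**
   * Calls the echo() endpoint registered on the master through
   * Admin.coprocessorService() and checks the round-tripped message.
   */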
  @Test
  public void testMasterCoprocessorService() throws Throwable {
    Admin admin = util.getHBaseAdmin();
    final TestProtos.EchoRequestProto request =
        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
    assertEquals("hello", service.echo(null, request).getMessage());
  }

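  /**
   * Invokes the error() endpoint on a single region, with client retries
   * limited to one attempt, and expects the failure to surface as a
   * ServiceException.
   */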
  @Test
  public void testCoprocessorError() throws Exception {
    Configuration configuration = new Configuration(util.getConfiguration());
    // Make it not retry forever
    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    Table table = new HTable(configuration, TEST_TABLE);

    try {
      CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);

      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);

      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
      fail("Should have thrown an exception");
    } catch (ServiceException e) {
      // expected
    } finally {
      table.close();
    }
  }

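  /**
   * Invokes the error() endpoint on the master and expects a ServiceException.
   */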
  @Test
  public void testMasterCoprocessorError() throws Throwable {
    Admin admin = util.getHBaseAdmin();
    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
    try {
      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
      fail("Should have thrown an exception");
    } catch (ServiceException e) {
      // expected
    }
  }

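  /**
   * Builds n row keys by appending a zero-padded two-digit index to the base
   * row, e.g. testRow00 through testRow19.
   */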
  private static byte[][] makeN(byte[] base, int n) {
    byte[][] ret = new byte[n][];
    for (int i = 0; i < n; i++) {
      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
    }
    return ret;
  }

}