View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.util.Collections;
25  import java.util.Map;
26  import java.util.TreeMap;
27  
28  import org.apache.hadoop.hbase.client.Admin;
29  import org.apache.hadoop.hbase.client.Table;
30  import org.apache.hadoop.hbase.util.ByteStringer;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HColumnDescriptor;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.testclassification.MediumTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.HBaseAdmin;
40  import org.apache.hadoop.hbase.client.HTable;
41  import org.apache.hadoop.hbase.client.Put;
42  import org.apache.hadoop.hbase.client.coprocessor.Batch;
43  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
44  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
45  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
46  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos;
47  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.junit.AfterClass;
50  import org.junit.BeforeClass;
51  import org.junit.Test;
52  import org.junit.experimental.categories.Category;
53  
54  import com.google.protobuf.ServiceException;
55  
/**
 * TestBatchCoprocessorEndpoint: test cases to verify the batch execution of coprocessor
 * Endpoints.
 */
59  @Category(MediumTests.class)
60  public class TestBatchCoprocessorEndpoint {
61    private static final Log LOG = LogFactory.getLog(TestBatchCoprocessorEndpoint.class);
62  
63    private static final TableName TEST_TABLE =
64        TableName.valueOf("TestTable");
65    private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
66    private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
67    private static byte[] ROW = Bytes.toBytes("testRow");
68  
69    private static final int ROWSIZE = 20;
70    private static final int rowSeperator1 = 5;
71    private static final int rowSeperator2 = 12;
72    private static byte[][] ROWS = makeN(ROW, ROWSIZE);
73  
74    private static HBaseTestingUtility util = new HBaseTestingUtility();
75  
76    @BeforeClass
77    public static void setupBeforeClass() throws Exception {
78      // set configure to indicate which cp should be loaded
79      Configuration conf = util.getConfiguration();
80      conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
81          org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
82          ProtobufCoprocessorService.class.getName(),
83          ColumnAggregationEndpointWithErrors.class.getName(),
84          ColumnAggregationEndpointNullResponse.class.getName());
85      conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
86          ProtobufCoprocessorService.class.getName());
87      util.startMiniCluster(2);
88      Admin admin = new HBaseAdmin(conf);
89      HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
90      desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
91      admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
92      util.waitUntilAllRegionsAssigned(TEST_TABLE);
93      admin.close();
94  
95      Table table = new HTable(conf, TEST_TABLE);
96      for (int i = 0; i < ROWSIZE; i++) {
97        Put put = new Put(ROWS[i]);
98        put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
99        table.put(put);
100     }
101     table.close();
102   }
103 
104   @AfterClass
105   public static void tearDownAfterClass() throws Exception {
106     util.shutdownMiniCluster();
107   }
108 
109   @Test
110   public void testAggregationNullResponse() throws Throwable {
111     Table table = new HTable(util.getConfiguration(), TEST_TABLE);
112     ColumnAggregationWithNullResponseProtos.SumRequest.Builder builder =
113         ColumnAggregationWithNullResponseProtos.SumRequest
114         .newBuilder();
115     builder.setFamily(ByteStringer.wrap(TEST_FAMILY));
116     if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
117       builder.setQualifier(ByteStringer.wrap(TEST_QUALIFIER));
118     }
119     Map<byte[], ColumnAggregationWithNullResponseProtos.SumResponse> results =
120         table.batchCoprocessorService(
121             ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"),
122             builder.build(), ROWS[0], ROWS[ROWS.length - 1],
123             ColumnAggregationWithNullResponseProtos.SumResponse.getDefaultInstance());
124 
125     int sumResult = 0;
126     int expectedResult = 0;
127     for (Map.Entry<byte[], ColumnAggregationWithNullResponseProtos.SumResponse> e :
128         results.entrySet()) {
129       LOG.info("Got value " + e.getValue().getSum() + " for region "
130           + Bytes.toStringBinary(e.getKey()));
131       sumResult += e.getValue().getSum();
132     }
133     for (int i = 0; i < rowSeperator2; i++) {
134       expectedResult += i;
135     }
136     assertEquals("Invalid result", expectedResult, sumResult);
137     table.close();
138   }
139 
140   private static byte[][] makeN(byte[] base, int n) {
141     byte[][] ret = new byte[n][];
142     for (int i = 0; i < n; i++) {
143       ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
144     }
145     return ret;
146   }
147 
148   private Map<byte[], SumResponse> sum(final Table table, final byte[] family,
149       final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException,
150       Throwable {
151     ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest
152         .newBuilder();
153     builder.setFamily(ByteStringer.wrap(family));
154     if (qualifier != null && qualifier.length > 0) {
155       builder.setQualifier(ByteStringer.wrap(qualifier));
156     }
157     return table.batchCoprocessorService(
158         ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"),
159         builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance());
160   }
161 
162   @Test
163   public void testAggregationWithReturnValue() throws Throwable {
164     Table table = new HTable(util.getConfiguration(), TEST_TABLE);
165     Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
166         ROWS[ROWS.length - 1]);
167     int sumResult = 0;
168     int expectedResult = 0;
169     for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
170       LOG.info("Got value " + e.getValue().getSum() + " for region "
171           + Bytes.toStringBinary(e.getKey()));
172       sumResult += e.getValue().getSum();
173     }
174     for (int i = 0; i < ROWSIZE; i++) {
175       expectedResult += i;
176     }
177     assertEquals("Invalid result", expectedResult, sumResult);
178 
179     results.clear();
180 
181     // scan: for region 2 and region 3
182     results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1],
183         ROWS[ROWS.length - 1]);
184     sumResult = 0;
185     expectedResult = 0;
186     for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
187       LOG.info("Got value " + e.getValue().getSum() + " for region "
188           + Bytes.toStringBinary(e.getKey()));
189       sumResult += e.getValue().getSum();
190     }
191     for (int i = rowSeperator1; i < ROWSIZE; i++) {
192       expectedResult += i;
193     }
194     assertEquals("Invalid result", expectedResult, sumResult);
195     table.close();
196   }
197 
198   @Test
199   public void testAggregation() throws Throwable {
200     Table table = new HTable(util.getConfiguration(), TEST_TABLE);
201     Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
202         ROWS[0], ROWS[ROWS.length - 1]);
203     int sumResult = 0;
204     int expectedResult = 0;
205     for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
206       LOG.info("Got value " + e.getValue().getSum() + " for region "
207           + Bytes.toStringBinary(e.getKey()));
208       sumResult += e.getValue().getSum();
209     }
210     for (int i = 0; i < ROWSIZE; i++) {
211       expectedResult += i;
212     }
213     assertEquals("Invalid result", expectedResult, sumResult);
214 
215     // scan: for region 2 and region 3
216     results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length - 1]);
217     sumResult = 0;
218     expectedResult = 0;
219     for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
220       LOG.info("Got value " + e.getValue().getSum() + " for region "
221           + Bytes.toStringBinary(e.getKey()));
222       sumResult += e.getValue().getSum();
223     }
224     for (int i = rowSeperator1; i < ROWSIZE; i++) {
225       expectedResult += i;
226     }
227     assertEquals("Invalid result", expectedResult, sumResult);
228     table.close();
229   }
230 
231   @Test
232   public void testAggregationWithErrors() throws Throwable {
233     Table table = new HTable(util.getConfiguration(), TEST_TABLE);
234     final Map<byte[], ColumnAggregationWithErrorsProtos.SumResponse> results =
235         Collections.synchronizedMap(
236             new TreeMap<byte[], ColumnAggregationWithErrorsProtos.SumResponse>(
237                 Bytes.BYTES_COMPARATOR
238             ));
239     ColumnAggregationWithErrorsProtos.SumRequest.Builder builder =
240         ColumnAggregationWithErrorsProtos.SumRequest
241         .newBuilder();
242     builder.setFamily(ByteStringer.wrap(TEST_FAMILY));
243     if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
244       builder.setQualifier(ByteStringer.wrap(TEST_QUALIFIER));
245     }
246 
247     boolean hasError = false;
248     try {
249       table.batchCoprocessorService(
250           ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor()
251               .findMethodByName("sum"),
252           builder.build(), ROWS[0], ROWS[ROWS.length - 1],
253           ColumnAggregationWithErrorsProtos.SumResponse.getDefaultInstance(),
254           new Batch.Callback<ColumnAggregationWithErrorsProtos.SumResponse>() {
255 
256             @Override
257             public void update(byte[] region, byte[] row,
258                 ColumnAggregationWithErrorsProtos.SumResponse result) {
259               results.put(region, result);
260             }
261           });
262     } catch (Throwable t) {
263       LOG.info("Exceptions in coprocessor service", t);
264       hasError = true;
265     }
266 
267     int sumResult = 0;
268     int expectedResult = 0;
269     for (Map.Entry<byte[], ColumnAggregationWithErrorsProtos.SumResponse> e : results.entrySet()) {
270       LOG.info("Got value " + e.getValue().getSum() + " for region "
271           + Bytes.toStringBinary(e.getKey()));
272       sumResult += e.getValue().getSum();
273     }
274     for (int i = 0; i < rowSeperator2; i++) {
275       expectedResult += i;
276     }
277     assertEquals("Invalid result", expectedResult, sumResult);
278     assertTrue(hasError);
279     table.close();
280   }
281 }