1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.coprocessor;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.CellUtil;
28 import org.apache.hadoop.hbase.Coprocessor;
29 import org.apache.hadoop.hbase.CoprocessorEnvironment;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
33 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.SumRequest;
34 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.SumResponse;
35 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
36 import org.apache.hadoop.hbase.regionserver.InternalScanner;
37 import org.apache.hadoop.hbase.regionserver.Region;
38 import org.apache.hadoop.hbase.util.Bytes;
39
40 import com.google.protobuf.RpcCallback;
41 import com.google.protobuf.RpcController;
42 import com.google.protobuf.Service;
43
44
45
46
47
48
49 public class ColumnAggregationEndpointNullResponse
50 extends
51 ColumnAggregationServiceNullResponse
52 implements Coprocessor, CoprocessorService {
53 static final Log LOG = LogFactory.getLog(ColumnAggregationEndpointNullResponse.class);
54 private RegionCoprocessorEnvironment env = null;
55 @Override
56 public Service getService() {
57 return this;
58 }
59
60 @Override
61 public void start(CoprocessorEnvironment env) throws IOException {
62 if (env instanceof RegionCoprocessorEnvironment) {
63 this.env = (RegionCoprocessorEnvironment)env;
64 return;
65 }
66 throw new CoprocessorException("Must be loaded on a table region!");
67 }
68
69 @Override
70 public void stop(CoprocessorEnvironment env) throws IOException {
71
72 }
73
74 @Override
75 public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
76
77 Scan scan = new Scan();
78
79 byte[] family = request.getFamily().toByteArray();
80 byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
81 if (request.hasQualifier()) {
82 scan.addColumn(family, qualifier);
83 } else {
84 scan.addFamily(family);
85 }
86 int sumResult = 0;
87 InternalScanner scanner = null;
88 try {
89 Region region = this.env.getRegion();
90
91 if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
92 done.run(null);
93 return;
94 }
95 scanner = region.getScanner(scan);
96 List<Cell> curVals = new ArrayList<Cell>();
97 boolean hasMore = false;
98 do {
99 curVals.clear();
100 hasMore = scanner.next(curVals);
101 for (Cell kv : curVals) {
102 if (CellUtil.matchingQualifier(kv, qualifier)) {
103 sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
104 }
105 }
106 } while (hasMore);
107 } catch (IOException e) {
108 ResponseConverter.setControllerException(controller, e);
109
110 sumResult = -1;
111 LOG.info("Setting sum result to -1 to indicate error", e);
112 } finally {
113 if (scanner != null) {
114 try {
115 scanner.close();
116 } catch (IOException e) {
117 ResponseConverter.setControllerException(controller, e);
118 sumResult = -1;
119 LOG.info("Setting sum result to -1 to indicate error", e);
120 }
121 }
122 }
123 done.run(SumResponse.newBuilder().setSum(sumResult).build());
124 LOG.info("Returning sum " + sumResult + " for region " +
125 Bytes.toStringBinary(env.getRegion().getRegionInfo().getRegionName()));
126 }
127 }