View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.Coprocessor;
30  import org.apache.hadoop.hbase.CoprocessorEnvironment;
31  import org.apache.hadoop.hbase.client.Scan;
32  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
33  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
34  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
35  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
36  import org.apache.hadoop.hbase.regionserver.InternalScanner;
37  import org.apache.hadoop.hbase.util.Bytes;
38  
39  import com.google.protobuf.RpcCallback;
40  import com.google.protobuf.RpcController;
41  import com.google.protobuf.Service;
42  
43  
44  /**
45   * The aggregation implementation at a region.
46   */
47  public class ColumnAggregationEndpoint extends ColumnAggregationService
48  implements Coprocessor, CoprocessorService {
49    static final Log LOG = LogFactory.getLog(ColumnAggregationEndpoint.class);
50    private RegionCoprocessorEnvironment env = null;
51  
52    @Override
53    public Service getService() {
54      return this;
55    }
56  
57    @Override
58    public void start(CoprocessorEnvironment env) throws IOException {
59      if (env instanceof RegionCoprocessorEnvironment) {
60        this.env = (RegionCoprocessorEnvironment)env;
61        return;
62      }
63      throw new CoprocessorException("Must be loaded on a table region!");
64    }
65  
66    @Override
67    public void stop(CoprocessorEnvironment env) throws IOException {
68      // Nothing to do.
69    }
70  
71    @Override
72    public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
73      // aggregate at each region
74      Scan scan = new Scan();
75      // Family is required in pb. Qualifier is not.
76      byte [] family = request.getFamily().toByteArray();
77      byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null;
78      if (request.hasQualifier()) {
79        scan.addColumn(family, qualifier);
80      } else {
81        scan.addFamily(family);
82      }
83      int sumResult = 0;
84      InternalScanner scanner = null;
85      try {
86        scanner = this.env.getRegion().getScanner(scan);
87        List<Cell> curVals = new ArrayList<Cell>();
88        boolean hasMore = false;
89        do {
90          curVals.clear();
91          hasMore = scanner.next(curVals);
92          for (Cell kv : curVals) {
93            if (CellUtil.matchingQualifier(kv, qualifier)) {
94              sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
95            }
96          }
97        } while (hasMore);
98      } catch (IOException e) {
99        ResponseConverter.setControllerException(controller, e);
100       // Set result to -1 to indicate error.
101       sumResult = -1;
102       LOG.info("Setting sum result to -1 to indicate error", e);
103     } finally {
104       if (scanner != null) {
105         try {
106           scanner.close();
107         } catch (IOException e) {
108           ResponseConverter.setControllerException(controller, e);
109           sumResult = -1;
110           LOG.info("Setting sum result to -1 to indicate error", e);
111         }
112       }
113     }
114     LOG.info("Returning result " + sumResult);
115     done.run(SumResponse.newBuilder().setSum(sumResult).build());
116   }
117 }