View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *    http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.hadoop.hbase.spark.example.hbasecontext;
18  
19  import java.util.ArrayList;
20  import java.util.Iterator;
21  import java.util.List;
22  
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.hbase.HBaseConfiguration;
25  import org.apache.hadoop.hbase.TableName;
26  import org.apache.hadoop.hbase.client.BufferedMutator;
27  import org.apache.hadoop.hbase.client.Connection;
28  import org.apache.hadoop.hbase.client.Get;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.client.Table;
32  import org.apache.hadoop.hbase.spark.JavaHBaseContext;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.spark.SparkConf;
35  import org.apache.spark.api.java.JavaRDD;
36  import org.apache.spark.api.java.JavaSparkContext;
37  import org.apache.spark.api.java.function.Function;
38  import org.apache.spark.api.java.function.VoidFunction;
39  
40  import scala.Tuple2;
41  
42  /**
43   * This is a simple example of using the foreachPartition
44   * method with a HBase connection
45   */
46  final public class JavaHBaseMapGetPutExample {
47  
48    private JavaHBaseMapGetPutExample() {}
49  
50    public static void main(String[] args) {
51      if (args.length < 1) {
52        System.out.println("JavaHBaseBulkGetExample {tableName}");
53        return;
54      }
55  
56      final String tableName = args[0];
57  
58      SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName);
59      JavaSparkContext jsc = new JavaSparkContext(sparkConf);
60  
61      try {
62        List<byte[]> list = new ArrayList<>();
63        list.add(Bytes.toBytes("1"));
64        list.add(Bytes.toBytes("2"));
65        list.add(Bytes.toBytes("3"));
66        list.add(Bytes.toBytes("4"));
67        list.add(Bytes.toBytes("5"));
68  
69        JavaRDD<byte[]> rdd = jsc.parallelize(list);
70        Configuration conf = HBaseConfiguration.create();
71  
72        JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
73  
74        hbaseContext.foreachPartition(rdd,
75                new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
76          public void call(Tuple2<Iterator<byte[]>, Connection> t)
77                  throws Exception {
78            Table table = t._2().getTable(TableName.valueOf(tableName));
79            BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
80  
81            while (t._1().hasNext()) {
82              byte[] b = t._1().next();
83              Result r = table.get(new Get(b));
84              if (r.getExists()) {
85                mutator.mutate(new Put(b));
86              }
87            }
88  
89            mutator.flush();
90            mutator.close();
91            table.close();
92          }
93        });
94      } finally {
95        jsc.stop();
96      }
97    }
98  
99    public static class GetFunction implements Function<byte[], Get> {
100     private static final long serialVersionUID = 1L;
101     public Get call(byte[] v) throws Exception {
102       return new Get(v);
103     }
104   }
105 }