1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.spark.example.hbasecontext;
18
19 import java.util.ArrayList;
20 import java.util.Iterator;
21 import java.util.List;
22
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.HBaseConfiguration;
25 import org.apache.hadoop.hbase.TableName;
26 import org.apache.hadoop.hbase.client.BufferedMutator;
27 import org.apache.hadoop.hbase.client.Connection;
28 import org.apache.hadoop.hbase.client.Get;
29 import org.apache.hadoop.hbase.client.Put;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.Table;
32 import org.apache.hadoop.hbase.spark.JavaHBaseContext;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.spark.SparkConf;
35 import org.apache.spark.api.java.JavaRDD;
36 import org.apache.spark.api.java.JavaSparkContext;
37 import org.apache.spark.api.java.function.Function;
38 import org.apache.spark.api.java.function.VoidFunction;
39
40 import scala.Tuple2;
41
42
43
44
45
46 final public class JavaHBaseMapGetPutExample {
47
48 private JavaHBaseMapGetPutExample() {}
49
50 public static void main(String[] args) {
51 if (args.length < 1) {
52 System.out.println("JavaHBaseBulkGetExample {tableName}");
53 return;
54 }
55
56 final String tableName = args[0];
57
58 SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName);
59 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
60
61 try {
62 List<byte[]> list = new ArrayList<>();
63 list.add(Bytes.toBytes("1"));
64 list.add(Bytes.toBytes("2"));
65 list.add(Bytes.toBytes("3"));
66 list.add(Bytes.toBytes("4"));
67 list.add(Bytes.toBytes("5"));
68
69 JavaRDD<byte[]> rdd = jsc.parallelize(list);
70 Configuration conf = HBaseConfiguration.create();
71
72 JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
73
74 hbaseContext.foreachPartition(rdd,
75 new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
76 public void call(Tuple2<Iterator<byte[]>, Connection> t)
77 throws Exception {
78 Table table = t._2().getTable(TableName.valueOf(tableName));
79 BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
80
81 while (t._1().hasNext()) {
82 byte[] b = t._1().next();
83 Result r = table.get(new Get(b));
84 if (r.getExists()) {
85 mutator.mutate(new Put(b));
86 }
87 }
88
89 mutator.flush();
90 mutator.close();
91 table.close();
92 }
93 });
94 } finally {
95 jsc.stop();
96 }
97 }
98
99 public static class GetFunction implements Function<byte[], Get> {
100 private static final long serialVersionUID = 1L;
101 public Get call(byte[] v) throws Exception {
102 return new Get(v);
103 }
104 }
105 }