1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.spark.example.hbasecontext;
18
19 import java.util.ArrayList;
20 import java.util.Iterator;
21 import java.util.List;
22
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.Cell;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.TableName;
27 import org.apache.hadoop.hbase.client.Get;
28 import org.apache.hadoop.hbase.client.Result;
29 import org.apache.hadoop.hbase.spark.JavaHBaseContext;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.spark.SparkConf;
32 import org.apache.spark.api.java.JavaRDD;
33 import org.apache.spark.api.java.JavaSparkContext;
34 import org.apache.spark.api.java.function.Function;
35
36
37
38
39
40 final public class JavaHBaseBulkGetExample {
41
42 private JavaHBaseBulkGetExample() {}
43
44 public static void main(String[] args) {
45 if (args.length < 1) {
46 System.out.println("JavaHBaseBulkGetExample {tableName}");
47 return;
48 }
49
50 String tableName = args[0];
51
52 SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName);
53 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
54
55 try {
56 List<byte[]> list = new ArrayList<>();
57 list.add(Bytes.toBytes("1"));
58 list.add(Bytes.toBytes("2"));
59 list.add(Bytes.toBytes("3"));
60 list.add(Bytes.toBytes("4"));
61 list.add(Bytes.toBytes("5"));
62
63 JavaRDD<byte[]> rdd = jsc.parallelize(list);
64
65 Configuration conf = HBaseConfiguration.create();
66
67 JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
68
69 hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(),
70 new ResultFunction());
71 } finally {
72 jsc.stop();
73 }
74 }
75
76 public static class GetFunction implements Function<byte[], Get> {
77
78 private static final long serialVersionUID = 1L;
79
80 public Get call(byte[] v) throws Exception {
81 return new Get(v);
82 }
83 }
84
85 public static class ResultFunction implements Function<Result, String> {
86
87 private static final long serialVersionUID = 1L;
88
89 public String call(Result result) throws Exception {
90 Iterator<Cell> it = result.listCells().iterator();
91 StringBuilder b = new StringBuilder();
92
93 b.append(Bytes.toString(result.getRow())).append(":");
94
95 while (it.hasNext()) {
96 Cell cell = it.next();
97 String q = Bytes.toString(cell.getQualifierArray());
98 if (q.equals("counter")) {
99 b.append("(")
100 .append(Bytes.toString(cell.getQualifierArray()))
101 .append(",")
102 .append(Bytes.toLong(cell.getValueArray()))
103 .append(")");
104 } else {
105 b.append("(")
106 .append(Bytes.toString(cell.getQualifierArray()))
107 .append(",")
108 .append(Bytes.toString(cell.getValueArray()))
109 .append(")");
110 }
111 }
112 return b.toString();
113 }
114 }
115 }