/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark.example.hbasecontext;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

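/**
 * This is a simple example of scanning records from HBase
 * with the hbaseRDD function and collecting the row keys on the driver.
 */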
final public class JavaHBaseDistributedScan {

  private JavaHBaseDistributedScan() {}

  public static void main(String[] args) {
    if (args.length < 1) {
      System.out.println("JavaHBaseDistributedScan {tableName}");
      return;
    }

    String tableName = args[0];

    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName);
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    try {
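      // Load the HBase configuration (hbase-site.xml, if present on the classpath)
      // and wrap the Spark context so HBase operations can run on the executors.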
      Configuration conf = HBaseConfiguration.create();

      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

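      // Build the scan; caching 100 rows per RPC reduces round trips to the region servers.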
      Scan scan = new Scan();
      scan.setCaching(100);

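      // Run the scan as a distributed read; each RDD element is a (row key, Result) pair.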
      JavaRDD<Tuple2<ImmutableBytesWritable, Result>> javaRdd =
          hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan);

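      // Convert each element to its row key as a String and collect the results on the driver.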
      List<String> results = javaRdd.map(new ScanConvertFunction()).collect();

      System.out.println("Result Size: " + results.size());
    } finally {
      jsc.stop();
    }
  }

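  /**
   * Extracts the row key of each (row key, Result) pair as a String.
   */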
  private static class ScanConvertFunction implements
      Function<Tuple2<ImmutableBytesWritable, Result>, String> {
    @Override
    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
      return Bytes.toString(v1._1().copyBytes());
    }
  }
}