1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.spark.example.hbasecontext;
18
19 import org.apache.hadoop.conf.Configuration;
20 import org.apache.hadoop.hbase.HBaseConfiguration;
21 import org.apache.hadoop.hbase.TableName;
22 import org.apache.hadoop.hbase.client.Put;
23 import org.apache.hadoop.hbase.spark.JavaHBaseContext;
24 import org.apache.hadoop.hbase.util.Bytes;
25 import org.apache.spark.SparkConf;
26 import org.apache.spark.api.java.JavaSparkContext;
27 import org.apache.spark.api.java.function.Function;
28 import org.apache.spark.streaming.Duration;
29 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
30 import org.apache.spark.streaming.api.java.JavaStreamingContext;
31
32
33
34
35 final public class JavaHBaseStreamingBulkPutExample {
36
37 private JavaHBaseStreamingBulkPutExample() {}
38
39 public static void main(String[] args) {
40 if (args.length < 4) {
41 System.out.println("JavaHBaseBulkPutExample " +
42 "{host} {port} {tableName}");
43 return;
44 }
45
46 String host = args[0];
47 String port = args[1];
48 String tableName = args[2];
49
50 SparkConf sparkConf =
51 new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
52 tableName + ":" + port + ":" + tableName);
53
54 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
55
56 try {
57 JavaStreamingContext jssc =
58 new JavaStreamingContext(jsc, new Duration(1000));
59
60 JavaReceiverInputDStream<String> javaDstream =
61 jssc.socketTextStream(host, Integer.parseInt(port));
62
63 Configuration conf = HBaseConfiguration.create();
64
65 JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
66
67 hbaseContext.streamBulkPut(javaDstream,
68 TableName.valueOf(tableName),
69 new PutFunction());
70 } finally {
71 jsc.stop();
72 }
73 }
74
75 public static class PutFunction implements Function<String, Put> {
76
77 private static final long serialVersionUID = 1L;
78
79 public Put call(String v) throws Exception {
80 String[] part = v.split(",");
81 Put put = new Put(Bytes.toBytes(part[0]));
82
83 put.addColumn(Bytes.toBytes(part[1]),
84 Bytes.toBytes(part[2]),
85 Bytes.toBytes(part[3]));
86 return put;
87 }
88
89 }
90 }