View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *    http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.hadoop.hbase.spark.example.hbasecontext;
18  
19  import org.apache.hadoop.conf.Configuration;
20  import org.apache.hadoop.hbase.HBaseConfiguration;
21  import org.apache.hadoop.hbase.TableName;
22  import org.apache.hadoop.hbase.client.Put;
23  import org.apache.hadoop.hbase.spark.JavaHBaseContext;
24  import org.apache.hadoop.hbase.util.Bytes;
25  import org.apache.spark.SparkConf;
26  import org.apache.spark.api.java.JavaSparkContext;
27  import org.apache.spark.api.java.function.Function;
28  import org.apache.spark.streaming.Duration;
29  import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
30  import org.apache.spark.streaming.api.java.JavaStreamingContext;
31  
32  /**
33   * This is a simple example of BulkPut with Spark Streaming
34   */
35  final public class JavaHBaseStreamingBulkPutExample {
36  
37    private JavaHBaseStreamingBulkPutExample() {}
38  
39    public static void main(String[] args) {
40      if (args.length < 4) {
41        System.out.println("JavaHBaseBulkPutExample  " +
42                "{host} {port} {tableName}");
43        return;
44      }
45  
46      String host = args[0];
47      String port = args[1];
48      String tableName = args[2];
49  
50      SparkConf sparkConf =
51              new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
52                      tableName + ":" + port + ":" + tableName);
53  
54      JavaSparkContext jsc = new JavaSparkContext(sparkConf);
55  
56      try {
57        JavaStreamingContext jssc =
58                new JavaStreamingContext(jsc, new Duration(1000));
59  
60        JavaReceiverInputDStream<String> javaDstream =
61                jssc.socketTextStream(host, Integer.parseInt(port));
62  
63        Configuration conf = HBaseConfiguration.create();
64  
65        JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
66  
67        hbaseContext.streamBulkPut(javaDstream,
68                TableName.valueOf(tableName),
69                new PutFunction());
70      } finally {
71        jsc.stop();
72      }
73    }
74  
75    public static class PutFunction implements Function<String, Put> {
76  
77      private static final long serialVersionUID = 1L;
78  
79      public Put call(String v) throws Exception {
80        String[] part = v.split(",");
81        Put put = new Put(Bytes.toBytes(part[0]));
82  
83        put.addColumn(Bytes.toBytes(part[1]),
84                Bytes.toBytes(part[2]),
85                Bytes.toBytes(part[3]));
86        return put;
87      }
88  
89    }
90  }