/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark.example.hbasecontext;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

/**
 * This is a simple example of scanning records from HBase
 * with the hbaseRDD function.
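 *
 * <p>A typical invocation looks like the following; the jar name here is
 * illustrative, not fixed:
 * {@code spark-submit --class org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseDistributedScan
 * hbase-spark-examples.jar myTable}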
 */
public final class JavaHBaseDistributedScan {
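
  // Private constructor: this example class is not meant to be instantiated.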
  private JavaHBaseDistributedScan() {}

  public static void main(String[] args) {
    if (args.length < 1) {
      System.err.println("Usage: JavaHBaseDistributedScan {tableName}");
      return;
    }

    String tableName = args[0];

    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName);
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    try {
      // Load the HBase connection settings (hbase-site.xml, etc.) from the classpath.
      Configuration conf = HBaseConfiguration.create();

      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      Scan scan = new Scan();
      // Fetch 100 rows per RPC to reduce round trips while scanning.
      scan.setCaching(100);
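      // Optional (not part of the original example): the Scan can be narrowed
      // before it is handed to hbaseRDD; the family and qualifier names below
      // are hypothetical placeholders.
      // scan.addFamily(Bytes.toBytes("cf"));
      // scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"));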

      // hbaseRDD executes the scan as a distributed Spark job, producing one
      // (rowkey, Result) tuple per row.
      JavaRDD<Tuple2<ImmutableBytesWritable, Result>> javaRdd =
              hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan);

      // collect() pulls every mapped row key back to the driver; fine for a
      // small example, but use with caution on large tables.
      List<String> results = javaRdd.map(new ScanConvertFunction()).collect();

      System.out.println("Result Size: " + results.size());
    } finally {
      // Stop the Spark context so resources are released even on failure.
      jsc.stop();
    }
  }

  /**
   * Maps each (rowkey, Result) tuple to its row key rendered as a String.
   */
  private static class ScanConvertFunction implements
          Function<Tuple2<ImmutableBytesWritable, Result>, String> {
    @Override
    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
      // v1._1() is the row key; v1._2() holds the full Result if cell values are needed.
      return Bytes.toString(v1._1().copyBytes());
    }
  }
}