/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.io.Files;

import scala.Tuple2;

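/**
 * Tests for the Java API of {@link JavaHBaseContext}: bulk put, bulk delete,
 * bulk get, and distributed scan, run against a local HBase mini-cluster.
 */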
@Category({MediumTests.class})
public class TestJavaHBaseContext implements Serializable {
  private transient JavaSparkContext jsc;
  HBaseTestingUtility htu;
  protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);

  byte[] tableName = Bytes.toBytes("t1");
  byte[] columnFamily = Bytes.toBytes("c");
  String columnFamilyStr = Bytes.toString(columnFamily);

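  /**
   * Starts a local Spark context and an HBase mini-cluster (ZooKeeper plus
   * one master and one region server), then (re)creates the test table.
   */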
  @Before
  public void setUp() {
    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");

    File tempDir = Files.createTempDir();
    tempDir.deleteOnExit();

    htu = HBaseTestingUtility.createLocalHTU();
    try {
      LOG.info("cleaning up test dir");

      htu.cleanupTestDir();

      LOG.info("starting minicluster");

      htu.startMiniZKCluster();
      htu.startMiniHBaseCluster(1, 1);

      LOG.info(" - minicluster started");

      try {
        htu.deleteTable(TableName.valueOf(tableName));
      } catch (Exception e) {
        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
      }

      LOG.info(" - creating table " + Bytes.toString(tableName));
      htu.createTable(TableName.valueOf(tableName), columnFamily);
      LOG.info(" - created table");
    } catch (Exception e1) {
      throw new RuntimeException(e1);
    }
  }

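  /**
   * Drops the test table, shuts down the mini-cluster, and stops the Spark
   * context.
   */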
  @After
  public void tearDown() {
    try {
      htu.deleteTable(TableName.valueOf(tableName));
      LOG.info("shutting down minicluster");
      htu.shutdownMiniHBaseCluster();
      htu.shutdownMiniZKCluster();
      LOG.info(" - minicluster shut down");
      htu.cleanupTestDir();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    jsc.stop();
    jsc = null;
  }

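  /**
   * bulkPut: deletes rows 1-5 if present, writes them back from an RDD of
   * CSV strings via {@link PutFunction}, and asserts all five rows exist.
   */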
  @Test
  public void testBulkPut() throws IOException {

    List<String> list = new ArrayList<>();
    list.add("1," + columnFamilyStr + ",a,1");
    list.add("2," + columnFamilyStr + ",a,2");
    list.add("3," + columnFamilyStr + ",a,3");
    list.add("4," + columnFamilyStr + ",a,4");
    list.add("5," + columnFamilyStr + ",a,5");

    JavaRDD<String> rdd = jsc.parallelize(list);

    Configuration conf = htu.getConfiguration();

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    Connection conn = ConnectionFactory.createConnection(conf);
    Table table = conn.getTable(TableName.valueOf(tableName));

    try {
      List<Delete> deletes = new ArrayList<>();
      for (int i = 1; i < 6; i++) {
        deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
      }
      table.delete(deletes);
    } finally {
      table.close();
    }

    hbaseContext.bulkPut(rdd,
            TableName.valueOf(tableName),
            new PutFunction());

    table = conn.getTable(TableName.valueOf(tableName));

    try {
      Result result1 = table.get(new Get(Bytes.toBytes("1")));
      Assert.assertNotNull("Row 1 should have been put", result1.getRow());

      Result result2 = table.get(new Get(Bytes.toBytes("2")));
      Assert.assertNotNull("Row 2 should have been put", result2.getRow());

      Result result3 = table.get(new Get(Bytes.toBytes("3")));
      Assert.assertNotNull("Row 3 should have been put", result3.getRow());

      Result result4 = table.get(new Get(Bytes.toBytes("4")));
      Assert.assertNotNull("Row 4 should have been put", result4.getRow());

      Result result5 = table.get(new Get(Bytes.toBytes("5")));
      Assert.assertNotNull("Row 5 should have been put", result5.getRow());
    } finally {
      table.close();
      conn.close();
    }
  }

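  /**
   * Parses a CSV line of the form "rowKey,family,qualifier,value" into a
   * {@link Put}.
   */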
  public static class PutFunction implements Function<String, Put> {

    private static final long serialVersionUID = 1L;

    public Put call(String v) throws Exception {
      String[] cells = v.split(",");
      Put put = new Put(Bytes.toBytes(cells[0]));

      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
              Bytes.toBytes(cells[3]));
      return put;
    }
  }

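  /**
   * bulkDelete: populates rows 1-5, deletes rows 1-3 from an RDD of row keys
   * in batches of 2, and asserts rows 1-3 are gone while rows 4-5 remain.
   */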
  @Test
  public void testBulkDelete() throws IOException {
    List<byte[]> list = new ArrayList<>();
    list.add(Bytes.toBytes("1"));
    list.add(Bytes.toBytes("2"));
    list.add(Bytes.toBytes("3"));

    JavaRDD<byte[]> rdd = jsc.parallelize(list);

    Configuration conf = htu.getConfiguration();

    populateTableWithMockData(conf, TableName.valueOf(tableName));

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
            new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);

    try (
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf(tableName))
    ) {
      Result result1 = table.get(new Get(Bytes.toBytes("1")));
      Assert.assertNull("Row 1 should have been deleted", result1.getRow());

      Result result2 = table.get(new Get(Bytes.toBytes("2")));
      Assert.assertNull("Row 2 should have been deleted", result2.getRow());

      Result result3 = table.get(new Get(Bytes.toBytes("3")));
      Assert.assertNull("Row 3 should have been deleted", result3.getRow());

      Result result4 = table.get(new Get(Bytes.toBytes("4")));
      Assert.assertNotNull("Row 4 should not have been deleted", result4.getRow());

      Result result5 = table.get(new Get(Bytes.toBytes("5")));
      Assert.assertNotNull("Row 5 should not have been deleted", result5.getRow());
    }
  }

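  /**
   * hbaseRDD: scans the populated table as an RDD of (rowkey, Result)
   * tuples, maps each to its row key string, and expects five results.
   */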
  @Test
  public void testDistributedScan() throws IOException {
    Configuration conf = htu.getConfiguration();

    populateTableWithMockData(conf, TableName.valueOf(tableName));

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    Scan scan = new Scan();
    scan.setCaching(100);

    JavaRDD<String> javaRdd =
            hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
                    .map(new ScanConvertFunction());

    List<String> results = javaRdd.collect();

    Assert.assertEquals(5, results.size());
  }

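  /** Extracts the row key of a scan result tuple as a String. */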
  private static class ScanConvertFunction implements
          Function<Tuple2<ImmutableBytesWritable, Result>, String> {
    @Override
    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
      return Bytes.toString(v1._1().copyBytes());
    }
  }

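  /**
   * bulkGet: fetches rows 1-5 from an RDD of row keys in batches of 2,
   * converting each {@link Result} to a String, and expects five results.
   */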
  @Test
  public void testBulkGet() throws IOException {
    List<byte[]> list = new ArrayList<>();
    list.add(Bytes.toBytes("1"));
    list.add(Bytes.toBytes("2"));
    list.add(Bytes.toBytes("3"));
    list.add(Bytes.toBytes("4"));
    list.add(Bytes.toBytes("5"));

    JavaRDD<byte[]> rdd = jsc.parallelize(list);

    Configuration conf = htu.getConfiguration();

    populateTableWithMockData(conf, TableName.valueOf(tableName));

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    final JavaRDD<String> stringJavaRDD =
            hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
                    new GetFunction(),
                    new ResultFunction());

    Assert.assertEquals(5, stringJavaRDD.count());
  }

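  /** Wraps a raw row key in a {@link Get} for bulkGet. */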
  public static class GetFunction implements Function<byte[], Get> {

    private static final long serialVersionUID = 1L;

    public Get call(byte[] v) throws Exception {
      return new Get(v);
    }
  }

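  /**
   * Renders a {@link Result} as "rowKey:(qualifier,value)...", decoding the
   * value as a long for the "counter" qualifier and as a String otherwise.
   */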
  public static class ResultFunction implements Function<Result, String> {

    private static final long serialVersionUID = 1L;

    public String call(Result result) throws Exception {
      Iterator<Cell> it = result.listCells().iterator();
      StringBuilder b = new StringBuilder();

      b.append(Bytes.toString(result.getRow())).append(":");

      while (it.hasNext()) {
        Cell cell = it.next();
        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
        if ("counter".equals(q)) {
          b.append("(")
                  .append(q)
                  .append(",")
                  .append(Bytes.toLong(CellUtil.cloneValue(cell)))
                  .append(")");
        } else {
          b.append("(")
                  .append(q)
                  .append(",")
                  .append(Bytes.toString(CellUtil.cloneValue(cell)))
                  .append(")");
        }
      }
      return b.toString();
    }
  }

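  /**
   * Writes five rows ("1" through "5") to the given table; each row has a
   * single cell whose family, qualifier, and value are all the test column
   * family bytes.
   */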
  private void populateTableWithMockData(Configuration conf, TableName tableName)
          throws IOException {
    try (
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName)) {

      List<Put> puts = new ArrayList<>();

      for (int i = 1; i < 6; i++) {
        Put put = new Put(Bytes.toBytes(Integer.toString(i)));
        put.addColumn(columnFamily, columnFamily, columnFamily);
        puts.add(put);
      }
      table.put(puts);
    }
  }

}