/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.io.Files;

import scala.Tuple2;

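/**
 * Tests for the Java API of {@link JavaHBaseContext}, exercising bulk put,
 * bulk delete, bulk get, and distributed scan against a local HBase
 * mini-cluster driven from a local Spark context.
 */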
@Category({MediumTests.class})
public class TestJavaHBaseContext implements Serializable {
  private transient JavaSparkContext jsc;
  HBaseTestingUtility htu;
  protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);

  byte[] tableName = Bytes.toBytes("t1");
  byte[] columnFamily = Bytes.toBytes("c");
  String columnFamilyStr = Bytes.toString(columnFamily);

  @Before
  public void setUp() {
    jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");

    File tempDir = Files.createTempDir();
    tempDir.deleteOnExit();

    htu = HBaseTestingUtility.createLocalHTU();
    try {
      LOG.info("cleaning up test dir");

      htu.cleanupTestDir();

      LOG.info("starting minicluster");

      htu.startMiniZKCluster();
      htu.startMiniHBaseCluster(1, 1);

      LOG.info(" - minicluster started");

      try {
        htu.deleteTable(TableName.valueOf(tableName));
      } catch (Exception e) {
        LOG.info(" - no table " + Bytes.toString(tableName) + " found");
      }

      LOG.info(" - creating table " + Bytes.toString(tableName));
      htu.createTable(TableName.valueOf(tableName), columnFamily);
      LOG.info(" - created table");
    } catch (Exception e1) {
      throw new RuntimeException(e1);
    }
  }

  @After
  public void tearDown() {
    try {
      htu.deleteTable(TableName.valueOf(tableName));
      LOG.info("shutting down minicluster");
      htu.shutdownMiniHBaseCluster();
      htu.shutdownMiniZKCluster();
      LOG.info(" - minicluster shut down");
      htu.cleanupTestDir();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    jsc.stop();
    jsc = null;
  }

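  /**
   * Verifies that {@link JavaHBaseContext#bulkPut} writes every element of an
   * RDD to the table: the five target rows are deleted up front, the bulk put
   * is run, and each row is then expected to be present.
   */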
  @Test
  public void testBulkPut() throws IOException {

    List<String> list = new ArrayList<>();
    list.add("1," + columnFamilyStr + ",a,1");
    list.add("2," + columnFamilyStr + ",a,2");
    list.add("3," + columnFamilyStr + ",a,3");
    list.add("4," + columnFamilyStr + ",a,4");
    list.add("5," + columnFamilyStr + ",a,5");

    JavaRDD<String> rdd = jsc.parallelize(list);

    Configuration conf = htu.getConfiguration();

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    Connection conn = ConnectionFactory.createConnection(conf);
    Table table = conn.getTable(TableName.valueOf(tableName));

    try {
      List<Delete> deletes = new ArrayList<>();
      for (int i = 1; i < 6; i++) {
        deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
      }
      table.delete(deletes);
    } finally {
      table.close();
    }

    hbaseContext.bulkPut(rdd,
        TableName.valueOf(tableName),
        new PutFunction());

    table = conn.getTable(TableName.valueOf(tableName));

    try {
      Result result1 = table.get(new Get(Bytes.toBytes("1")));
      Assert.assertNotNull("Row 1 should have been put", result1.getRow());

      Result result2 = table.get(new Get(Bytes.toBytes("2")));
      Assert.assertNotNull("Row 2 should have been put", result2.getRow());

      Result result3 = table.get(new Get(Bytes.toBytes("3")));
      Assert.assertNotNull("Row 3 should have been put", result3.getRow());

      Result result4 = table.get(new Get(Bytes.toBytes("4")));
      Assert.assertNotNull("Row 4 should have been put", result4.getRow());

      Result result5 = table.get(new Get(Bytes.toBytes("5")));
      Assert.assertNotNull("Row 5 should have been put", result5.getRow());
    } finally {
      table.close();
      conn.close();
    }
  }

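  /**
   * Parses a comma-separated "rowKey,family,qualifier,value" string into a
   * {@link Put} for use with bulkPut.
   */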
  public static class PutFunction implements Function<String, Put> {

    private static final long serialVersionUID = 1L;

    @Override
    public Put call(String v) throws Exception {
      String[] cells = v.split(",");
      Put put = new Put(Bytes.toBytes(cells[0]));

      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
          Bytes.toBytes(cells[3]));
      return put;
    }
  }

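  /**
   * Verifies that {@link JavaHBaseContext#bulkDelete} removes exactly the rows
   * named in the RDD: rows 1-3 should be gone afterwards, while rows 4 and 5
   * from the mock data should survive.
   */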
  @Test
  public void testBulkDelete() throws IOException {
    List<byte[]> list = new ArrayList<>();
    list.add(Bytes.toBytes("1"));
    list.add(Bytes.toBytes("2"));
    list.add(Bytes.toBytes("3"));

    JavaRDD<byte[]> rdd = jsc.parallelize(list);

    Configuration conf = htu.getConfiguration();

    populateTableWithMockData(conf, TableName.valueOf(tableName));

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
        new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);

    try (
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName))) {
      Result result1 = table.get(new Get(Bytes.toBytes("1")));
      Assert.assertNull("Row 1 should have been deleted", result1.getRow());

      Result result2 = table.get(new Get(Bytes.toBytes("2")));
      Assert.assertNull("Row 2 should have been deleted", result2.getRow());

      Result result3 = table.get(new Get(Bytes.toBytes("3")));
      Assert.assertNull("Row 3 should have been deleted", result3.getRow());

      Result result4 = table.get(new Get(Bytes.toBytes("4")));
      Assert.assertNotNull("Row 4 should not have been deleted", result4.getRow());

      Result result5 = table.get(new Get(Bytes.toBytes("5")));
      Assert.assertNotNull("Row 5 should not have been deleted", result5.getRow());
    }
  }

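  /**
   * Verifies that {@link JavaHBaseContext#hbaseRDD} produces one RDD element
   * per row of the table when run with a full-table {@link Scan}.
   */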
  @Test
  public void testDistributedScan() throws IOException {
    Configuration conf = htu.getConfiguration();

    populateTableWithMockData(conf, TableName.valueOf(tableName));

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    Scan scan = new Scan();
    scan.setCaching(100);

    JavaRDD<String> javaRdd =
        hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
            .map(new ScanConvertFunction());

    List<String> results = javaRdd.collect();

    Assert.assertEquals(5, results.size());
  }

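  /**
   * Maps each scan result to its row key as a String so the scan output can be
   * collected and counted as plain values.
   */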
  private static class ScanConvertFunction implements
      Function<Tuple2<ImmutableBytesWritable, Result>, String> {
    @Override
    public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
      return Bytes.toString(v1._1().copyBytes());
    }
  }

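  /**
   * Verifies that {@link JavaHBaseContext#bulkGet} fetches one result per row
   * key in the RDD, here all five rows of the mock data.
   */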
  @Test
  public void testBulkGet() throws IOException {
    List<byte[]> list = new ArrayList<>();
    list.add(Bytes.toBytes("1"));
    list.add(Bytes.toBytes("2"));
    list.add(Bytes.toBytes("3"));
    list.add(Bytes.toBytes("4"));
    list.add(Bytes.toBytes("5"));

    JavaRDD<byte[]> rdd = jsc.parallelize(list);

    Configuration conf = htu.getConfiguration();

    populateTableWithMockData(conf, TableName.valueOf(tableName));

    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

    final JavaRDD<String> stringJavaRDD =
        hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
            new GetFunction(),
            new ResultFunction());

    Assert.assertEquals(5, stringJavaRDD.count());
  }

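  /**
   * Wraps a raw row key in a {@link Get} for use with bulkGet.
   */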
  public static class GetFunction implements Function<byte[], Get> {

    private static final long serialVersionUID = 1L;

    @Override
    public Get call(byte[] v) throws Exception {
      return new Get(v);
    }
  }

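  /**
   * Renders a {@link Result} as "rowKey:(qualifier,value)..." text, decoding
   * the value as a long for the "counter" qualifier and as a String otherwise.
   */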
  public static class ResultFunction implements Function<Result, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String call(Result result) throws Exception {
      Iterator<Cell> it = result.listCells().iterator();
      StringBuilder b = new StringBuilder();

      b.append(Bytes.toString(result.getRow())).append(":");

      while (it.hasNext()) {
        Cell cell = it.next();
        String q = Bytes.toString(CellUtil.cloneQualifier(cell));
        if ("counter".equals(q)) {
          b.append("(")
              .append(q)
              .append(",")
              .append(Bytes.toLong(CellUtil.cloneValue(cell)))
              .append(")");
        } else {
          b.append("(")
              .append(q)
              .append(",")
              .append(Bytes.toString(CellUtil.cloneValue(cell)))
              .append(")");
        }
      }
      return b.toString();
    }
  }

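  /**
   * Seeds the test table with five rows keyed "1" through "5", each with a
   * single cell in the test column family.
   */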
  private void populateTableWithMockData(Configuration conf, TableName tableName)
      throws IOException {
    try (
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName)) {

      List<Put> puts = new ArrayList<>();

      for (int i = 1; i < 6; i++) {
        Put put = new Put(Bytes.toBytes(Integer.toString(i)));
        put.addColumn(columnFamily, columnFamily, columnFamily);
        puts.add(put);
      }
      table.put(puts);
    }
  }

}