1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
69
70
71
72
73
74 @Category(LargeTests.class)
75 public class TestTableInputFormat {
76
77 private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
78
79 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
80 private static MiniMRCluster mrCluster;
81 static final byte[] FAMILY = Bytes.toBytes("family");
82
83 private static final byte[][] columns = new byte[][] { FAMILY };
84
85 @BeforeClass
86 public static void beforeClass() throws Exception {
87 UTIL.startMiniCluster();
88 mrCluster = UTIL.startMiniMapReduceCluster();
89 }
90
91 @AfterClass
92 public static void afterClass() throws Exception {
93 UTIL.shutdownMiniMapReduceCluster();
94 UTIL.shutdownMiniCluster();
95 }
96
97 @Before
98 public void before() throws IOException {
99 LOG.info("before");
100 UTIL.ensureSomeRegionServersAvailable(1);
101 LOG.info("before done");
102 }
103
104
105
106
107
108
109
110
111 public static Table createTable(byte[] tableName) throws IOException {
112 return createTable(tableName, new byte[][] { FAMILY });
113 }
114
115
116
117
118
119
120
121
122 public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
123 Table table = UTIL.createTable(TableName.valueOf(tableName), families);
124 Put p = new Put("aaa".getBytes());
125 for (byte[] family : families) {
126 p.add(family, null, "value aaa".getBytes());
127 }
128 table.put(p);
129 p = new Put("bbb".getBytes());
130 for (byte[] family : families) {
131 p.add(family, null, "value bbb".getBytes());
132 }
133 table.put(p);
134 return table;
135 }
136
137
138
139
140
141
142
143
144
145
146 static boolean checkResult(Result r, ImmutableBytesWritable key,
147 byte[] expectedKey, byte[] expectedValue) {
148 assertEquals(0, key.compareTo(expectedKey));
149 Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
150 byte[] value = vals.values().iterator().next();
151 assertTrue(Arrays.equals(value, expectedValue));
152 return true;
153 }
154
155
156
157
158
159
160
161
162
163 static void runTestMapreduce(Table table) throws IOException,
164 InterruptedException {
165 org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
166 new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
167 Scan s = new Scan();
168 s.setStartRow("aaa".getBytes());
169 s.setStopRow("zzz".getBytes());
170 s.addFamily(FAMILY);
171 trr.setScan(s);
172 trr.setHTable(table);
173
174 trr.initialize(null, null);
175 Result r = new Result();
176 ImmutableBytesWritable key = new ImmutableBytesWritable();
177
178 boolean more = trr.nextKeyValue();
179 assertTrue(more);
180 key = trr.getCurrentKey();
181 r = trr.getCurrentValue();
182 checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
183
184 more = trr.nextKeyValue();
185 assertTrue(more);
186 key = trr.getCurrentKey();
187 r = trr.getCurrentValue();
188 checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
189
190
191 more = trr.nextKeyValue();
192 assertFalse(more);
193 }
194
195
196
197
198
199
200 static Table createIOEScannerTable(byte[] name, final int failCnt)
201 throws IOException {
202
203 Answer<ResultScanner> a = new Answer<ResultScanner>() {
204 int cnt = 0;
205
206 @Override
207 public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
208
209 if (cnt++ < failCnt) {
210
211 Scan scan = mock(Scan.class);
212 doReturn("bogus".getBytes()).when(scan).getStartRow();
213 ResultScanner scanner = mock(ResultScanner.class);
214
215 doThrow(new IOException("Injected exception")).when(scanner).next();
216 return scanner;
217 }
218
219
220 return (ResultScanner) invocation.callRealMethod();
221 }
222 };
223
224 Table htable = spy(createTable(name));
225 doAnswer(a).when(htable).getScanner((Scan) anyObject());
226 return htable;
227 }
228
229
230
231
232
233
234
235 static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
236 throws IOException {
237
238 Answer<ResultScanner> a = new Answer<ResultScanner>() {
239 int cnt = 0;
240
241 @Override
242 public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
243
244 if (cnt++ < failCnt) {
245
246 Scan scan = mock(Scan.class);
247 doReturn("bogus".getBytes()).when(scan).getStartRow();
248 ResultScanner scanner = mock(ResultScanner.class);
249
250 invocation.callRealMethod();
251 doThrow(
252 new NotServingRegionException("Injected simulated TimeoutException"))
253 .when(scanner).next();
254 return scanner;
255 }
256
257
258 return (ResultScanner) invocation.callRealMethod();
259 }
260 };
261
262 Table htable = spy(createTable(name));
263 doAnswer(a).when(htable).getScanner((Scan) anyObject());
264 return htable;
265 }
266
267
268
269
270
271
272
273 @Test
274 public void testTableRecordReaderMapreduce() throws IOException,
275 InterruptedException {
276 Table table = createTable("table1-mr".getBytes());
277 runTestMapreduce(table);
278 }
279
280
281
282
283
284
285
286 @Test
287 public void testTableRecordReaderScannerFailMapreduce() throws IOException,
288 InterruptedException {
289 Table htable = createIOEScannerTable("table2-mr".getBytes(), 1);
290 runTestMapreduce(htable);
291 }
292
293
294
295
296
297
298
299 @Test(expected = IOException.class)
300 public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
301 InterruptedException {
302 Table htable = createIOEScannerTable("table3-mr".getBytes(), 2);
303 runTestMapreduce(htable);
304 }
305
306
307
308
309
310
311
312 @Test
313 public void testTableRecordReaderScannerTimeoutMapreduce()
314 throws IOException, InterruptedException {
315 Table htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
316 runTestMapreduce(htable);
317 }
318
319
320
321
322
323
324
325 @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
326 public void testTableRecordReaderScannerTimeoutMapreduceTwice()
327 throws IOException, InterruptedException {
328 Table htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
329 runTestMapreduce(htable);
330 }
331
332
333
334
335 @Test
336 public void testExtensionOfTableInputFormatBase()
337 throws IOException, InterruptedException, ClassNotFoundException {
338 LOG.info("testing use of an InputFormat taht extends InputFormatBase");
339 final Table htable = createTable(Bytes.toBytes("exampleTable"),
340 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
341 testInputFormat(ExampleTIF.class);
342 }
343
344 @Test
345 public void testJobConfigurableExtensionOfTableInputFormatBase()
346 throws IOException, InterruptedException, ClassNotFoundException {
347 LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
348 "using JobConfigurable.");
349 final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
350 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
351 testInputFormat(ExampleJobConfigurableTIF.class);
352 }
353
354 @Test
355 public void testDeprecatedExtensionOfTableInputFormatBase()
356 throws IOException, InterruptedException, ClassNotFoundException {
357 LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
358 "using the approach documented in 0.98.");
359 final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
360 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
361 testInputFormat(ExampleDeprecatedTIF.class);
362 }
363
364 void testInputFormat(Class<? extends InputFormat> clazz)
365 throws IOException, InterruptedException, ClassNotFoundException {
366 final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
367 job.setInputFormatClass(clazz);
368 job.setOutputFormatClass(NullOutputFormat.class);
369 job.setMapperClass(ExampleVerifier.class);
370 job.setNumReduceTasks(0);
371
372 LOG.debug("submitting job.");
373 assertTrue("job failed!", job.waitForCompletion(true));
374 assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
375 .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
376 assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
377 .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
378 assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
379 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
380 assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
381 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
382 assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
383 .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
384 assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
385 .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
386 }
387
388 public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {
389
390 @Override
391 public void map(ImmutableBytesWritable key, Result value, Context context)
392 throws IOException {
393 for (Cell cell : value.listCells()) {
394 context.getCounter(TestTableInputFormat.class.getName() + ":row",
395 Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
396 .increment(1l);
397 context.getCounter(TestTableInputFormat.class.getName() + ":family",
398 Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
399 .increment(1l);
400 context.getCounter(TestTableInputFormat.class.getName() + ":value",
401 Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
402 .increment(1l);
403 }
404 }
405
406 }
407
408 public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
409
410 @Override
411 public void configure(JobConf job) {
412 try {
413 HTable exampleTable = new HTable(HBaseConfiguration.create(job),
414 Bytes.toBytes("exampleDeprecatedTable"));
415
416 setHTable(exampleTable);
417 byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
418 Bytes.toBytes("columnB") };
419
420 Scan scan = new Scan();
421 for (byte[] family : inputColumns) {
422 scan.addFamily(family);
423 }
424 Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
425 scan.setFilter(exampleFilter);
426 setScan(scan);
427 } catch (IOException exception) {
428 throw new RuntimeException("Failed to configure for job.", exception);
429 }
430 }
431
432 }
433
434
435 public static class ExampleJobConfigurableTIF extends TableInputFormatBase
436 implements JobConfigurable {
437
438 @Override
439 public void configure(JobConf job) {
440 try {
441 Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
442 TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
443
444 initializeTable(connection, tableName);
445 byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
446 Bytes.toBytes("columnB") };
447
448 Scan scan = new Scan();
449 for (byte[] family : inputColumns) {
450 scan.addFamily(family);
451 }
452 Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
453 scan.setFilter(exampleFilter);
454 setScan(scan);
455 } catch (IOException exception) {
456 throw new RuntimeException("Failed to initialize.", exception);
457 }
458 }
459 }
460
461
462 public static class ExampleTIF extends TableInputFormatBase {
463
464 @Override
465 protected void initialize(JobContext job) throws IOException {
466 Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
467 job.getConfiguration()));
468 TableName tableName = TableName.valueOf("exampleTable");
469
470 initializeTable(connection, tableName);
471 byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
472 Bytes.toBytes("columnB") };
473
474 Scan scan = new Scan();
475 for (byte[] family : inputColumns) {
476 scan.addFamily(family);
477 }
478 Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
479 scan.setFilter(exampleFilter);
480 setScan(scan);
481 }
482
483 }
484 }
485