1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.io.File;
27 import java.io.IOException;
28 import java.util.Map;
29 import java.util.NavigableMap;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.fs.FileUtil;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.testclassification.LargeTests;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Result;
39 import org.apache.hadoop.hbase.client.Scan;
40 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.mapreduce.Counter;
43 import org.apache.hadoop.mapreduce.Counters;
44 import org.apache.hadoop.mapreduce.Job;
45 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
46 import org.junit.experimental.categories.Category;
47
48
49
50
51
52
53 @Category(LargeTests.class)
54 public class TestTableMapReduce extends TestTableMapReduceBase {
55 private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
56
57 @Override
58 protected Log getLog() { return LOG; }
59
60
61
62
63 static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
64
65
66
67
68
69
70
71
72
73 @Override
74 public void map(ImmutableBytesWritable key, Result value,
75 Context context)
76 throws IOException, InterruptedException {
77 if (value.size() != 1) {
78 throw new IOException("There should only be one input column");
79 }
80 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
81 cf = value.getMap();
82 if(!cf.containsKey(INPUT_FAMILY)) {
83 throw new IOException("Wrong input columns. Missing: '" +
84 Bytes.toString(INPUT_FAMILY) + "'.");
85 }
86
87
88 String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
89 StringBuilder newValue = new StringBuilder(originalValue);
90 newValue.reverse();
91
92 Put outval = new Put(key.get());
93 outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
94 context.write(key, outval);
95 }
96 }
97
98 protected void runTestOnTable(HTable table) throws IOException {
99 Job job = null;
100 try {
101 LOG.info("Before map/reduce startup");
102 job = new Job(table.getConfiguration(), "process column contents");
103 job.setNumReduceTasks(1);
104 Scan scan = new Scan();
105 scan.addFamily(INPUT_FAMILY);
106 TableMapReduceUtil.initTableMapperJob(
107 Bytes.toString(table.getTableName()), scan,
108 ProcessContentsMapper.class, ImmutableBytesWritable.class,
109 Put.class, job);
110 TableMapReduceUtil.initTableReducerJob(
111 Bytes.toString(table.getTableName()),
112 IdentityTableReducer.class, job);
113 FileOutputFormat.setOutputPath(job, new Path("test"));
114 LOG.info("Started " + Bytes.toString(table.getTableName()));
115 assertTrue(job.waitForCompletion(true));
116 LOG.info("After map/reduce completion");
117
118
119 verify(table.getName());
120
121 verifyJobCountersAreEmitted(job);
122 } catch (InterruptedException e) {
123 throw new IOException(e);
124 } catch (ClassNotFoundException e) {
125 throw new IOException(e);
126 } finally {
127 table.close();
128 if (job != null) {
129 FileUtil.fullyDelete(
130 new File(job.getConfiguration().get("hadoop.tmp.dir")));
131 }
132 }
133 }
134
135
136
137
138
139
140 private void verifyJobCountersAreEmitted(Job job) throws IOException {
141 Counters counters = job.getCounters();
142 Counter counter
143 = counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
144 assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
145 assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
146 }
147
148 }