1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
53
54
55
56
57
58
59 @Category(LargeTests.class)
60 public class TestMultiTableInputFormat {
61
62 static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
63 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
64
65 static final String TABLE_NAME = "scantest";
66 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
67 static final String KEY_STARTROW = "startRow";
68 static final String KEY_LASTROW = "stpRow";
69
70 @BeforeClass
71 public static void setUpBeforeClass() throws Exception {
72
73 TEST_UTIL.enableDebug(MultiTableInputFormat.class);
74 TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
75
76 TEST_UTIL.startMiniCluster(3);
77
78 for (int i = 0; i < 3; i++) {
79 try (HTable table =
80 TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)),
81 INPUT_FAMILY, 4)) {
82 TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
83 }
84 }
85
86 TEST_UTIL.startMiniMapReduceCluster();
87 }
88
89 @AfterClass
90 public static void tearDownAfterClass() throws Exception {
91 TEST_UTIL.shutdownMiniMapReduceCluster();
92 TEST_UTIL.shutdownMiniCluster();
93 }
94
95 @After
96 public void tearDown() throws Exception {
97 Configuration c = TEST_UTIL.getConfiguration();
98 FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
99 }
100
101
102
103
104 public static class ScanMapper extends
105 TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
106
107
108
109
110
111
112
113
114 @Override
115 public void map(ImmutableBytesWritable key, Result value, Context context)
116 throws IOException, InterruptedException {
117 if (value.size() != 1) {
118 throw new IOException("There should only be one input column");
119 }
120 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
121 value.getMap();
122 if (!cf.containsKey(INPUT_FAMILY)) {
123 throw new IOException("Wrong input columns. Missing: '" +
124 Bytes.toString(INPUT_FAMILY) + "'.");
125 }
126 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
127 LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
128 ", value -> " + val);
129 context.write(key, key);
130 }
131 }
132
133
134
135
136 public static class ScanReducer
137 extends
138 Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
139 NullWritable, NullWritable> {
140 private String first = null;
141 private String last = null;
142
143 @Override
144 protected void reduce(ImmutableBytesWritable key,
145 Iterable<ImmutableBytesWritable> values, Context context)
146 throws IOException, InterruptedException {
147 int count = 0;
148 for (ImmutableBytesWritable value : values) {
149 String val = Bytes.toStringBinary(value.get());
150 LOG.debug("reduce: key[" + count + "] -> " +
151 Bytes.toStringBinary(key.get()) + ", value -> " + val);
152 if (first == null) first = val;
153 last = val;
154 count++;
155 }
156 assertEquals(3, count);
157 }
158
159 @Override
160 protected void cleanup(Context context) throws IOException,
161 InterruptedException {
162 Configuration c = context.getConfiguration();
163 String startRow = c.get(KEY_STARTROW);
164 String lastRow = c.get(KEY_LASTROW);
165 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
166 startRow + "\"");
167 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
168 "\"");
169 if (startRow != null && startRow.length() > 0) {
170 assertEquals(startRow, first);
171 }
172 if (lastRow != null && lastRow.length() > 0) {
173 assertEquals(lastRow, last);
174 }
175 }
176 }
177
178 @Ignore
179 @Test
180 public void testScanEmptyToEmpty() throws IOException, InterruptedException,
181 ClassNotFoundException {
182 testScan(null, null, null);
183 }
184
185 @Ignore
186 @Test
187 public void testScanEmptyToAPP() throws IOException, InterruptedException,
188 ClassNotFoundException {
189 testScan(null, "app", "apo");
190 }
191
192 @Test
193 public void testScanOBBToOPP() throws IOException, InterruptedException,
194 ClassNotFoundException {
195 testScan("obb", "opp", "opo");
196 }
197
198 @Ignore
199 @Test
200 public void testScanYZYToEmpty() throws IOException, InterruptedException,
201 ClassNotFoundException {
202 testScan("yzy", null, "zzz");
203 }
204
205
206
207
208
209
210
211
212 private void testScan(String start, String stop, String last)
213 throws IOException, InterruptedException, ClassNotFoundException {
214 String jobName =
215 "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
216 (stop != null ? stop.toUpperCase() : "Empty");
217 LOG.info("Before map/reduce startup - job " + jobName);
218 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
219
220 c.set(KEY_STARTROW, start != null ? start : "");
221 c.set(KEY_LASTROW, last != null ? last : "");
222
223 List<Scan> scans = new ArrayList<Scan>();
224
225 for(int i=0; i<3; i++){
226 Scan scan = new Scan();
227
228 scan.addFamily(INPUT_FAMILY);
229 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));
230
231 if (start != null) {
232 scan.setStartRow(Bytes.toBytes(start));
233 }
234 if (stop != null) {
235 scan.setStopRow(Bytes.toBytes(stop));
236 }
237
238 scans.add(scan);
239
240 LOG.info("scan before: " + scan);
241 }
242
243 Job job = new Job(c, jobName);
244
245 TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
246 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
247 job.setReducerClass(ScanReducer.class);
248 job.setNumReduceTasks(1);
249 FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
250 LOG.info("Started " + job.getJobName());
251 job.waitForCompletion(true);
252 assertTrue(job.isSuccessful());
253 LOG.info("After map/reduce completion - job " + jobName);
254 }
255 }