1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertTrue;
21 import static org.junit.Assert.fail;
22
23 import java.io.File;
24 import java.io.IOException;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.NavigableMap;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileUtil;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.CellUtil;
36 import org.apache.hadoop.hbase.HBaseTestingUtility;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.client.HTable;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.client.ResultScanner;
43 import org.apache.hadoop.hbase.client.Scan;
44 import org.apache.hadoop.hbase.client.Table;
45 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
46 import org.apache.hadoop.hbase.testclassification.LargeTests;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.mapreduce.Job;
49 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
50 import org.junit.AfterClass;
51 import org.junit.BeforeClass;
52 import org.junit.Test;
53 import org.junit.experimental.categories.Category;
54
55
56
57
58
59
60 @Category(LargeTests.class)
61 public class TestMultithreadedTableMapper {
62 private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class);
63 private static final HBaseTestingUtility UTIL =
64 new HBaseTestingUtility();
65 static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
66 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
67 static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
68 static final int NUMBER_OF_THREADS = 10;
69
70 @BeforeClass
71 public static void beforeClass() throws Exception {
72 UTIL.startMiniCluster();
73 HTable table =
74 UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
75 OUTPUT_FAMILY });
76 UTIL.loadTable(table, INPUT_FAMILY, false);
77 UTIL.startMiniMapReduceCluster();
78 UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME);
79 }
80
81 @AfterClass
82 public static void afterClass() throws Exception {
83 UTIL.shutdownMiniMapReduceCluster();
84 UTIL.shutdownMiniCluster();
85 }
86
87
88
89
90 public static class ProcessContentsMapper
91 extends TableMapper<ImmutableBytesWritable, Put> {
92
93
94
95
96
97
98
99
100
101 @Override
102 public void map(ImmutableBytesWritable key, Result value,
103 Context context)
104 throws IOException, InterruptedException {
105 if (value.size() != 1) {
106 throw new IOException("There should only be one input column");
107 }
108 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
109 cf = value.getMap();
110 if(!cf.containsKey(INPUT_FAMILY)) {
111 throw new IOException("Wrong input columns. Missing: '" +
112 Bytes.toString(INPUT_FAMILY) + "'.");
113 }
114
115 String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
116 StringBuilder newValue = new StringBuilder(originalValue);
117 newValue.reverse();
118
119 Put outval = new Put(key.get());
120 outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
121 context.write(key, outval);
122 }
123 }
124
125
126
127
128
129
130
131 @Test
132 public void testMultithreadedTableMapper()
133 throws IOException, InterruptedException, ClassNotFoundException {
134 runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
135 MULTI_REGION_TABLE_NAME));
136 }
137
138 private void runTestOnTable(HTable table)
139 throws IOException, InterruptedException, ClassNotFoundException {
140 Job job = null;
141 try {
142 LOG.info("Before map/reduce startup");
143 job = new Job(table.getConfiguration(), "process column contents");
144 job.setNumReduceTasks(1);
145 Scan scan = new Scan();
146 scan.addFamily(INPUT_FAMILY);
147 TableMapReduceUtil.initTableMapperJob(
148 table.getTableName(), scan,
149 MultithreadedTableMapper.class, ImmutableBytesWritable.class,
150 Put.class, job);
151 MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
152 MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
153 TableMapReduceUtil.initTableReducerJob(
154 Bytes.toString(table.getTableName()),
155 IdentityTableReducer.class, job);
156 FileOutputFormat.setOutputPath(job, new Path("test"));
157 LOG.info("Started " + table.getTableName());
158 assertTrue(job.waitForCompletion(true));
159 LOG.info("After map/reduce completion");
160
161 verify(table.getName());
162 } finally {
163 table.close();
164 if (job != null) {
165 FileUtil.fullyDelete(
166 new File(job.getConfiguration().get("hadoop.tmp.dir")));
167 }
168 }
169 }
170
171 private void verify(TableName tableName) throws IOException {
172 Table table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
173 boolean verified = false;
174 long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
175 int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
176 for (int i = 0; i < numRetries; i++) {
177 try {
178 LOG.info("Verification attempt #" + i);
179 verifyAttempt(table);
180 verified = true;
181 break;
182 } catch (NullPointerException e) {
183
184
185 LOG.debug("Verification attempt failed: " + e.getMessage());
186 }
187 try {
188 Thread.sleep(pause);
189 } catch (InterruptedException e) {
190
191 }
192 }
193 assertTrue(verified);
194 table.close();
195 }
196
197
198
199
200
201
202
203
204
205 private void verifyAttempt(final Table table)
206 throws IOException, NullPointerException {
207 Scan scan = new Scan();
208 scan.addFamily(INPUT_FAMILY);
209 scan.addFamily(OUTPUT_FAMILY);
210 ResultScanner scanner = table.getScanner(scan);
211 try {
212 Iterator<Result> itr = scanner.iterator();
213 assertTrue(itr.hasNext());
214 while(itr.hasNext()) {
215 Result r = itr.next();
216 if (LOG.isDebugEnabled()) {
217 if (r.size() > 2 ) {
218 throw new IOException("Too many results, expected 2 got " +
219 r.size());
220 }
221 }
222 byte[] firstValue = null;
223 byte[] secondValue = null;
224 int count = 0;
225 for(Cell kv : r.listCells()) {
226 if (count == 0) {
227 firstValue = CellUtil.cloneValue(kv);
228 }else if (count == 1) {
229 secondValue = CellUtil.cloneValue(kv);
230 }else if (count == 2) {
231 break;
232 }
233 count++;
234 }
235 String first = "";
236 if (firstValue == null) {
237 throw new NullPointerException(Bytes.toString(r.getRow()) +
238 ": first value is null");
239 }
240 first = Bytes.toString(firstValue);
241 String second = "";
242 if (secondValue == null) {
243 throw new NullPointerException(Bytes.toString(r.getRow()) +
244 ": second value is null");
245 }
246 byte[] secondReversed = new byte[secondValue.length];
247 for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
248 secondReversed[i] = secondValue[j];
249 }
250 second = Bytes.toString(secondReversed);
251 if (first.compareTo(second) != 0) {
252 if (LOG.isDebugEnabled()) {
253 LOG.debug("second key is not the reverse of first. row=" +
254 Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
255 ", second value=" + second);
256 }
257 fail();
258 }
259 }
260 } finally {
261 scanner.close();
262 }
263 }
264
265 }
266