1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23
24 import java.io.IOException;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.NavigableMap;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Result;
39 import org.apache.hadoop.hbase.client.ResultScanner;
40 import org.apache.hadoop.hbase.client.Scan;
41 import org.apache.hadoop.hbase.client.Table;
42 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.junit.AfterClass;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47
48
49
50
51
52
53
54 public abstract class TestTableMapReduceBase {
55
56 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
57 protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
58 protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
59 protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
60
61 protected static final byte[][] columns = new byte[][] {
62 INPUT_FAMILY,
63 OUTPUT_FAMILY
64 };
65
66
67
68
69 protected abstract Log getLog();
70
71
72
73
74 protected abstract void runTestOnTable(HTable table) throws IOException;
75
76 @BeforeClass
77 public static void beforeClass() throws Exception {
78 UTIL.startMiniCluster();
79 HTable table =
80 UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
81 OUTPUT_FAMILY });
82 UTIL.loadTable(table, INPUT_FAMILY, false);
83 UTIL.startMiniMapReduceCluster();
84 }
85
86 @AfterClass
87 public static void afterClass() throws Exception {
88 UTIL.shutdownMiniMapReduceCluster();
89 UTIL.shutdownMiniCluster();
90 }
91
92
93
94
95
96 @Test
97 public void testMultiRegionTable() throws IOException {
98 runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
99 }
100
101 @Test
102 public void testCombiner() throws IOException {
103 Configuration conf = new Configuration(UTIL.getConfiguration());
104
105 conf.setInt("mapreduce.map.combine.minspills", 1);
106 runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
107 }
108
109
110
111
112 protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
113 if (value.size() != 1) {
114 throw new IOException("There should only be one input column");
115 }
116 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
117 cf = value.getMap();
118 if(!cf.containsKey(INPUT_FAMILY)) {
119 throw new IOException("Wrong input columns. Missing: '" +
120 Bytes.toString(INPUT_FAMILY) + "'.");
121 }
122
123
124
125 String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
126 StringBuilder newValue = new StringBuilder(originalValue);
127 newValue.reverse();
128
129
130
131 Put outval = new Put(key.get());
132 outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
133 return outval;
134 }
135
136 protected void verify(TableName tableName) throws IOException {
137 Table table = new HTable(UTIL.getConfiguration(), tableName);
138 boolean verified = false;
139 long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
140 int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
141 for (int i = 0; i < numRetries; i++) {
142 try {
143 getLog().info("Verification attempt #" + i);
144 verifyAttempt(table);
145 verified = true;
146 break;
147 } catch (NullPointerException e) {
148
149
150 getLog().debug("Verification attempt failed: " + e.getMessage());
151 }
152 try {
153 Thread.sleep(pause);
154 } catch (InterruptedException e) {
155
156 }
157 }
158 assertTrue(verified);
159 }
160
161
162
163
164
165
166
167
168 private void verifyAttempt(final Table table) throws IOException, NullPointerException {
169 Scan scan = new Scan();
170 TableInputFormat.addColumns(scan, columns);
171 ResultScanner scanner = table.getScanner(scan);
172 try {
173 Iterator<Result> itr = scanner.iterator();
174 assertTrue(itr.hasNext());
175 while(itr.hasNext()) {
176 Result r = itr.next();
177 if (getLog().isDebugEnabled()) {
178 if (r.size() > 2 ) {
179 throw new IOException("Too many results, expected 2 got " +
180 r.size());
181 }
182 }
183 byte[] firstValue = null;
184 byte[] secondValue = null;
185 int count = 0;
186 for(Cell kv : r.listCells()) {
187 if (count == 0) {
188 firstValue = CellUtil.cloneValue(kv);
189 }
190 if (count == 1) {
191 secondValue = CellUtil.cloneValue(kv);
192 }
193 count++;
194 if (count == 2) {
195 break;
196 }
197 }
198
199
200 if (firstValue == null) {
201 throw new NullPointerException(Bytes.toString(r.getRow()) +
202 ": first value is null");
203 }
204 String first = Bytes.toString(firstValue);
205
206 if (secondValue == null) {
207 throw new NullPointerException(Bytes.toString(r.getRow()) +
208 ": second value is null");
209 }
210 byte[] secondReversed = new byte[secondValue.length];
211 for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
212 secondReversed[i] = secondValue[j];
213 }
214 String second = Bytes.toString(secondReversed);
215
216 if (first.compareTo(second) != 0) {
217 if (getLog().isDebugEnabled()) {
218 getLog().debug("second key is not the reverse of first. row=" +
219 Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
220 ", second value=" + second);
221 }
222 fail();
223 }
224 }
225 } finally {
226 scanner.close();
227 }
228 }
229 }