View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertTrue;
21  import static org.junit.Assert.fail;
22  
23  import java.io.File;
24  import java.io.IOException;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FileUtil;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.HTable;
40  import org.apache.hadoop.hbase.client.Put;
41  import org.apache.hadoop.hbase.client.Result;
42  import org.apache.hadoop.hbase.client.ResultScanner;
43  import org.apache.hadoop.hbase.client.Scan;
44  import org.apache.hadoop.hbase.client.Table;
45  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
46  import org.apache.hadoop.hbase.testclassification.LargeTests;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.mapreduce.Job;
49  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
50  import org.junit.AfterClass;
51  import org.junit.BeforeClass;
52  import org.junit.Test;
53  import org.junit.experimental.categories.Category;
54  
55  /**
56   * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
57   * on our tables is simple - take every row in the table, reverse the value of
58   * a particular cell, and write it back to the table.
59   */
60  @Category(LargeTests.class)
61  public class TestMultithreadedTableMapper {
62    private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class);
63    private static final HBaseTestingUtility UTIL =
64        new HBaseTestingUtility();
65    static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
66    static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
67    static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
68    static final int    NUMBER_OF_THREADS = 10;
69  
70    @BeforeClass
71    public static void beforeClass() throws Exception {
72      UTIL.startMiniCluster();
73      HTable table =
74          UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
75              OUTPUT_FAMILY });
76      UTIL.loadTable(table, INPUT_FAMILY, false);
77      UTIL.startMiniMapReduceCluster();
78      UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME);
79    }
80  
81    @AfterClass
82    public static void afterClass() throws Exception {
83      UTIL.shutdownMiniMapReduceCluster();
84      UTIL.shutdownMiniCluster();
85    }
86  
87    /**
88     * Pass the given key and processed record reduce
89     */
90    public static class ProcessContentsMapper
91    extends TableMapper<ImmutableBytesWritable, Put> {
92  
93      /**
94       * Pass the key, and reversed value to reduce
95       *
96       * @param key
97       * @param value
98       * @param context
99       * @throws IOException
100      */
101     @Override
102     public void map(ImmutableBytesWritable key, Result value,
103         Context context)
104             throws IOException, InterruptedException {
105       if (value.size() != 1) {
106         throw new IOException("There should only be one input column");
107       }
108       Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
109       cf = value.getMap();
110       if(!cf.containsKey(INPUT_FAMILY)) {
111         throw new IOException("Wrong input columns. Missing: '" +
112             Bytes.toString(INPUT_FAMILY) + "'.");
113       }
114       // Get the original value and reverse it
115       String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
116       StringBuilder newValue = new StringBuilder(originalValue);
117       newValue.reverse();
118       // Now set the value to be collected
119       Put outval = new Put(key.get());
120       outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
121       context.write(key, outval);
122     }
123   }
124 
125   /**
126    * Test multithreadedTableMappper map/reduce against a multi-region table
127    * @throws IOException
128    * @throws ClassNotFoundException
129    * @throws InterruptedException
130    */
131   @Test
132   public void testMultithreadedTableMapper()
133       throws IOException, InterruptedException, ClassNotFoundException {
134     runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
135         MULTI_REGION_TABLE_NAME));
136   }
137 
138   private void runTestOnTable(HTable table)
139       throws IOException, InterruptedException, ClassNotFoundException {
140     Job job = null;
141     try {
142       LOG.info("Before map/reduce startup");
143       job = new Job(table.getConfiguration(), "process column contents");
144       job.setNumReduceTasks(1);
145       Scan scan = new Scan();
146       scan.addFamily(INPUT_FAMILY);
147       TableMapReduceUtil.initTableMapperJob(
148           table.getTableName(), scan,
149           MultithreadedTableMapper.class, ImmutableBytesWritable.class,
150           Put.class, job);
151       MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
152       MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
153       TableMapReduceUtil.initTableReducerJob(
154           Bytes.toString(table.getTableName()),
155           IdentityTableReducer.class, job);
156       FileOutputFormat.setOutputPath(job, new Path("test"));
157       LOG.info("Started " + table.getTableName());
158       assertTrue(job.waitForCompletion(true));
159       LOG.info("After map/reduce completion");
160       // verify map-reduce results
161       verify(table.getName());
162     } finally {
163       table.close();
164       if (job != null) {
165         FileUtil.fullyDelete(
166             new File(job.getConfiguration().get("hadoop.tmp.dir")));
167       }
168     }
169   }
170 
171   private void verify(TableName tableName) throws IOException {
172     Table table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
173     boolean verified = false;
174     long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
175     int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
176     for (int i = 0; i < numRetries; i++) {
177       try {
178         LOG.info("Verification attempt #" + i);
179         verifyAttempt(table);
180         verified = true;
181         break;
182       } catch (NullPointerException e) {
183         // If here, a cell was empty.  Presume its because updates came in
184         // after the scanner had been opened.  Wait a while and retry.
185         LOG.debug("Verification attempt failed: " + e.getMessage());
186       }
187       try {
188         Thread.sleep(pause);
189       } catch (InterruptedException e) {
190         // continue
191       }
192     }
193     assertTrue(verified);
194     table.close();
195   }
196 
197   /**
198    * Looks at every value of the mapreduce output and verifies that indeed
199    * the values have been reversed.
200    *
201    * @param table Table to scan.
202    * @throws IOException
203    * @throws NullPointerException if we failed to find a cell value
204    */
205   private void verifyAttempt(final Table table)
206       throws IOException, NullPointerException {
207     Scan scan = new Scan();
208     scan.addFamily(INPUT_FAMILY);
209     scan.addFamily(OUTPUT_FAMILY);
210     ResultScanner scanner = table.getScanner(scan);
211     try {
212       Iterator<Result> itr = scanner.iterator();
213       assertTrue(itr.hasNext());
214       while(itr.hasNext()) {
215         Result r = itr.next();
216         if (LOG.isDebugEnabled()) {
217           if (r.size() > 2 ) {
218             throw new IOException("Too many results, expected 2 got " +
219                 r.size());
220           }
221         }
222         byte[] firstValue = null;
223         byte[] secondValue = null;
224         int count = 0;
225         for(Cell kv : r.listCells()) {
226           if (count == 0) {
227             firstValue = CellUtil.cloneValue(kv);
228           }else if (count == 1) {
229             secondValue = CellUtil.cloneValue(kv);
230           }else if (count == 2) {
231             break;
232           }
233           count++;
234         }
235         String first = "";
236         if (firstValue == null) {
237           throw new NullPointerException(Bytes.toString(r.getRow()) +
238               ": first value is null");
239         }
240         first = Bytes.toString(firstValue);
241         String second = "";
242         if (secondValue == null) {
243           throw new NullPointerException(Bytes.toString(r.getRow()) +
244               ": second value is null");
245         }
246         byte[] secondReversed = new byte[secondValue.length];
247         for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
248           secondReversed[i] = secondValue[j];
249         }
250         second = Bytes.toString(secondReversed);
251         if (first.compareTo(second) != 0) {
252           if (LOG.isDebugEnabled()) {
253             LOG.debug("second key is not the reverse of first. row=" +
254                 Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
255                 ", second value=" + second);
256           }
257           fail();
258         }
259       }
260     } finally {
261       scanner.close();
262     }
263   }
264 
265 }
266