View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  
24  import java.io.IOException;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Result;
39  import org.apache.hadoop.hbase.client.ResultScanner;
40  import org.apache.hadoop.hbase.client.Scan;
41  import org.apache.hadoop.hbase.client.Table;
42  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.junit.AfterClass;
45  import org.junit.BeforeClass;
46  import org.junit.Test;
47  
48  /**
49   * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
50   * on our tables is simple - take every row in the table, reverse the value of a particular cell,
51   * and write it back to the table. Implements common components between mapred and mapreduce
52   * implementations.
53   */
54  public abstract class TestTableMapReduceBase {
55  
56    protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
57    protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
58    protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
59    protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
60  
61    protected static final byte[][] columns = new byte[][] {
62      INPUT_FAMILY,
63      OUTPUT_FAMILY
64    };
65  
66    /**
67     * Retrieve my logger instance.
68     */
69    protected abstract Log getLog();
70  
71    /**
72     * Handles API-specifics for setting up and executing the job.
73     */
74    protected abstract void runTestOnTable(HTable table) throws IOException;
75  
76    @BeforeClass
77    public static void beforeClass() throws Exception {
78      UTIL.startMiniCluster();
79      HTable table =
80          UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
81              OUTPUT_FAMILY });
82      UTIL.loadTable(table, INPUT_FAMILY, false);
83      UTIL.startMiniMapReduceCluster();
84    }
85  
86    @AfterClass
87    public static void afterClass() throws Exception {
88      UTIL.shutdownMiniMapReduceCluster();
89      UTIL.shutdownMiniCluster();
90    }
91  
92    /**
93     * Test a map/reduce against a multi-region table
94     * @throws IOException
95     */
96    @Test
97    public void testMultiRegionTable() throws IOException {
98      runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
99    }
100 
101   @Test
102   public void testCombiner() throws IOException {
103     Configuration conf = new Configuration(UTIL.getConfiguration());
104     // force use of combiner for testing purposes
105     conf.setInt("mapreduce.map.combine.minspills", 1);
106     runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
107   }
108 
109   /**
110    * Implements mapper logic for use across APIs.
111    */
112   protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
113     if (value.size() != 1) {
114       throw new IOException("There should only be one input column");
115     }
116     Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
117       cf = value.getMap();
118     if(!cf.containsKey(INPUT_FAMILY)) {
119       throw new IOException("Wrong input columns. Missing: '" +
120         Bytes.toString(INPUT_FAMILY) + "'.");
121     }
122 
123     // Get the original value and reverse it
124 
125     String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
126     StringBuilder newValue = new StringBuilder(originalValue);
127     newValue.reverse();
128 
129     // Now set the value to be collected
130 
131     Put outval = new Put(key.get());
132     outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
133     return outval;
134   }
135 
136   protected void verify(TableName tableName) throws IOException {
137     Table table = new HTable(UTIL.getConfiguration(), tableName);
138     boolean verified = false;
139     long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
140     int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
141     for (int i = 0; i < numRetries; i++) {
142       try {
143         getLog().info("Verification attempt #" + i);
144         verifyAttempt(table);
145         verified = true;
146         break;
147       } catch (NullPointerException e) {
148         // If here, a cell was empty. Presume its because updates came in
149         // after the scanner had been opened. Wait a while and retry.
150         getLog().debug("Verification attempt failed: " + e.getMessage());
151       }
152       try {
153         Thread.sleep(pause);
154       } catch (InterruptedException e) {
155         // continue
156       }
157     }
158     assertTrue(verified);
159   }
160 
161   /**
162    * Looks at every value of the mapreduce output and verifies that indeed
163    * the values have been reversed.
164    * @param table Table to scan.
165    * @throws IOException
166    * @throws NullPointerException if we failed to find a cell value
167    */
168   private void verifyAttempt(final Table table) throws IOException, NullPointerException {
169     Scan scan = new Scan();
170     TableInputFormat.addColumns(scan, columns);
171     ResultScanner scanner = table.getScanner(scan);
172     try {
173       Iterator<Result> itr = scanner.iterator();
174       assertTrue(itr.hasNext());
175       while(itr.hasNext()) {
176         Result r = itr.next();
177         if (getLog().isDebugEnabled()) {
178           if (r.size() > 2 ) {
179             throw new IOException("Too many results, expected 2 got " +
180               r.size());
181           }
182         }
183         byte[] firstValue = null;
184         byte[] secondValue = null;
185         int count = 0;
186          for(Cell kv : r.listCells()) {
187           if (count == 0) {
188             firstValue = CellUtil.cloneValue(kv);
189           }
190           if (count == 1) {
191             secondValue = CellUtil.cloneValue(kv);
192           }
193           count++;
194           if (count == 2) {
195             break;
196           }
197         }
198 
199 
200         if (firstValue == null) {
201           throw new NullPointerException(Bytes.toString(r.getRow()) +
202             ": first value is null");
203         }
204         String first = Bytes.toString(firstValue);
205 
206         if (secondValue == null) {
207           throw new NullPointerException(Bytes.toString(r.getRow()) +
208             ": second value is null");
209         }
210         byte[] secondReversed = new byte[secondValue.length];
211         for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
212           secondReversed[i] = secondValue[j];
213         }
214         String second = Bytes.toString(secondReversed);
215 
216         if (first.compareTo(second) != 0) {
217           if (getLog().isDebugEnabled()) {
218             getLog().debug("second key is not the reverse of first. row=" +
219                 Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
220                 ", second value=" + second);
221           }
222           fail();
223         }
224       }
225     } finally {
226       scanner.close();
227     }
228   }
229 }