View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.File;
27  import java.io.IOException;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.fs.FileUtil;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.testclassification.LargeTests;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Result;
39  import org.apache.hadoop.hbase.client.Scan;
40  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.mapreduce.Counter;
43  import org.apache.hadoop.mapreduce.Counters;
44  import org.apache.hadoop.mapreduce.Job;
45  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
46  import org.junit.experimental.categories.Category;
47  
48  /**
49   * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
50   * on our tables is simple - take every row in the table, reverse the value of
51   * a particular cell, and write it back to the table.
52   */
53  @Category(LargeTests.class)
54  public class TestTableMapReduce extends TestTableMapReduceBase {
55    private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
56  
57    @Override
58    protected Log getLog() { return LOG; }
59  
60    /**
61     * Pass the given key and processed record reduce
62     */
63    static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
64  
65      /**
66       * Pass the key, and reversed value to reduce
67       *
68       * @param key
69       * @param value
70       * @param context
71       * @throws IOException
72       */
73      @Override
74      public void map(ImmutableBytesWritable key, Result value,
75        Context context)
76      throws IOException, InterruptedException {
77        if (value.size() != 1) {
78          throw new IOException("There should only be one input column");
79        }
80        Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
81          cf = value.getMap();
82        if(!cf.containsKey(INPUT_FAMILY)) {
83          throw new IOException("Wrong input columns. Missing: '" +
84            Bytes.toString(INPUT_FAMILY) + "'.");
85        }
86  
87        // Get the original value and reverse it
88        String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
89        StringBuilder newValue = new StringBuilder(originalValue);
90        newValue.reverse();
91        // Now set the value to be collected
92        Put outval = new Put(key.get());
93        outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
94        context.write(key, outval);
95      }
96    }
97  
98    protected void runTestOnTable(HTable table) throws IOException {
99      Job job = null;
100     try {
101       LOG.info("Before map/reduce startup");
102       job = new Job(table.getConfiguration(), "process column contents");
103       job.setNumReduceTasks(1);
104       Scan scan = new Scan();
105       scan.addFamily(INPUT_FAMILY);
106       TableMapReduceUtil.initTableMapperJob(
107         Bytes.toString(table.getTableName()), scan,
108         ProcessContentsMapper.class, ImmutableBytesWritable.class,
109         Put.class, job);
110       TableMapReduceUtil.initTableReducerJob(
111         Bytes.toString(table.getTableName()),
112         IdentityTableReducer.class, job);
113       FileOutputFormat.setOutputPath(job, new Path("test"));
114       LOG.info("Started " + Bytes.toString(table.getTableName()));
115       assertTrue(job.waitForCompletion(true));
116       LOG.info("After map/reduce completion");
117 
118       // verify map-reduce results
119       verify(table.getName());
120 
121       verifyJobCountersAreEmitted(job);
122     } catch (InterruptedException e) {
123       throw new IOException(e);
124     } catch (ClassNotFoundException e) {
125       throw new IOException(e);
126     } finally {
127       table.close();
128       if (job != null) {
129         FileUtil.fullyDelete(
130           new File(job.getConfiguration().get("hadoop.tmp.dir")));
131       }
132     }
133   }
134 
135   /**
136    * Verify scan counters are emitted from the job
137    * @param job
138    * @throws IOException
139    */
140   private void verifyJobCountersAreEmitted(Job job) throws IOException {
141     Counters counters = job.getCounters();
142     Counter counter
143       = counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
144     assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
145     assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
146   }
147 
148 }