/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests various scan start and stop row scenarios. The boundaries are set on
 * the scans and then verified in a MapReduce job to make sure they are handed
 * over to and honored by the job as well.
 */
@Category(LargeTests.class)
public class TestMultiTableInputFormat {

  static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  static final String TABLE_NAME = "scantest";
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
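  // configuration keys used to pass the expected first and last row
  // of each scan to the reducer for verification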
  static final String KEY_STARTROW = "startRow";
  static final String KEY_LASTROW = "stopRow";

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // switch TIF to log at DEBUG level
    TEST_UTIL.enableDebug(MultiTableInputFormat.class);
    TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
    // start mini hbase cluster
    TEST_UTIL.startMiniCluster(3);
    // create and fill three tables with identical data
    for (int i = 0; i < 3; i++) {
      try (HTable table =
          TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME + i),
            INPUT_FAMILY, 4)) {
        TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
      }
    }
    // start MR cluster
    TEST_UTIL.startMiniMapReduceCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniMapReduceCluster();
    TEST_UTIL.shutdownMiniCluster();
  }

  @After
  public void tearDown() throws Exception {
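    // remove the MapReduce output left behind under hadoop.tmp.dir by each test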
    Configuration c = TEST_UTIL.getConfiguration();
    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
  }

  /**
   * Passes the key and value to the reducer.
   */
  public static class ScanMapper extends
      TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    /**
     * Passes the key and value to the reducer.
     *
     * @param key The key, here "aaa", "aab" etc.
     * @param value The value is the same as the key.
     * @param context The task context.
     * @throws IOException When reading the rows fails.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
          value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
            Bytes.toString(INPUT_FAMILY) + "'.");
      }
      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
          ", value -> " + val);
      context.write(key, key);
    }
  }

  /**
   * Checks the last and first keys seen against the scanner boundaries.
   */
  public static class ScanReducer extends
      Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
          NullWritable, NullWritable> {
    private String first = null;
    private String last = null;

    @Override
    protected void reduce(ImmutableBytesWritable key,
        Iterable<ImmutableBytesWritable> values, Context context)
        throws IOException, InterruptedException {
      int count = 0;
      for (ImmutableBytesWritable value : values) {
        String val = Bytes.toStringBinary(value.get());
        LOG.debug("reduce: key[" + count + "] -> " +
            Bytes.toStringBinary(key.get()) + ", value -> " + val);
        if (first == null) first = val;
        last = val;
        count++;
      }
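      // each row key arrives exactly once from each of the three tables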
      assertEquals(3, count);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
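      // compare the first and last keys seen against the scan boundaries
      // stored in the job configuration by testScan()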
      Configuration c = context.getConfiguration();
      String startRow = c.get(KEY_STARTROW);
      String lastRow = c.get(KEY_LASTROW);
      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
          startRow + "\"");
      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
          "\"");
      if (startRow != null && startRow.length() > 0) {
        assertEquals(startRow, first);
      }
      if (lastRow != null && lastRow.length() > 0) {
        assertEquals(lastRow, last);
      }
    }
  }

  @Ignore
  @Test
  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan(null, null, null);
  }

  @Ignore
  @Test
  public void testScanEmptyToAPP() throws IOException, InterruptedException,
      ClassNotFoundException {
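    // stop row "app" is exclusive, so the last row actually returned is "apo"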
    testScan(null, "app", "apo");
  }

  @Test
  public void testScanOBBToOPP() throws IOException, InterruptedException,
      ClassNotFoundException {
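    // stop row "opp" is exclusive, so the last row actually returned is "opo"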
195     testScan("obb", "opp", "opo");
196   }
197 
198   @Ignore
199   @Test
200   public void testScanYZYToEmpty() throws IOException, InterruptedException,
201       ClassNotFoundException {
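    // with no stop row the scan runs to the end of the table, whose last
    // loaded row key is "zzz"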
202     testScan("yzy", null, "zzz");
203   }
204 
205   /**
206    * Tests a MR scan using specific start and stop rows.
207    *
208    * @throws IOException
209    * @throws ClassNotFoundException
210    * @throws InterruptedException
211    */
  private void testScan(String start, String stop, String last)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName =
        "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
            (stop != null ? stop.toUpperCase() : "Empty");
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());

    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");

    List<Scan> scans = new ArrayList<Scan>();

    for (int i = 0; i < 3; i++) {
      Scan scan = new Scan();

      scan.addFamily(INPUT_FAMILY);
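      // the table name attribute tells MultiTableInputFormat which table
      // this scan instance targets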
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));

      if (start != null) {
        scan.setStartRow(Bytes.toBytes(start));
      }
      if (stop != null) {
        scan.setStopRow(Bytes.toBytes(stop));
      }

      scans.add(scan);

      LOG.info("scan before: " + scan);
    }

    Job job = new Job(c, jobName);

    TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started " + job.getJobName());
    job.waitForCompletion(true);
    assertTrue(job.isSuccessful());
    LOG.info("After map/reduce completion - job " + jobName);
  }
}