/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

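/**
 * Tests the mapred (org.apache.hadoop.mapred) flavor of TableSnapshotInputFormat: running
 * MapReduce over an HBase snapshot rather than a live table, both against mocked job objects
 * and as real MR jobs.
 *
 * <p>As a rough sketch (based on the calls in this test; the mapper, key, value, and snapshot
 * names below are illustrative), a client job is wired to a snapshot like so:
 * <pre>
 * JobConf job = new JobConf(conf);
 * Path restoreDir = new Path("/tmp/snapshot-restore"); // scratch dir for the restored snapshot
 * TableMapReduceUtil.initTableSnapshotMapJob("mySnapshot", "f1 f2",
 *   MyMapper.class, MyKey.class, MyValue.class, job, true, restoreDir);
 * </pre>
 */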
@Category(LargeTests.class)
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  private static final byte[] aaa = Bytes.toBytes("aaa");
  private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
  private static final String COLUMNS =
    Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]);

  @Override
  protected byte[] getStartRow() {
    return aaa;
  }

  @Override
  protected byte[] getEndRow() {
    return after_zzz;
  }

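  /**
   * Mapper that checks each snapshot row against the expected contents for its key and emits
   * the row key, so the reducer can verify overall coverage.
   */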
  static class TestTableSnapshotMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      verifyRowFromMap(key, value);
      collector.collect(key, NullWritable.get());
    }
  }

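  /**
   * Reducer that records every row key it receives; close() validates that all rows in the
   * range [aaa, after_zzz) were seen.
   */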
  public static class TestTableSnapshotReducer extends MapReduceBase
      implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
        throws IOException {
      rowTracker.addRow(key.get());
    }

    @Override
    public void close() {
      rowTracker.validate();
    }
  }

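  /**
   * Checks that initTableSnapshotMapJob configures the job for the default LruBlockCache and
   * leaves the BucketCache disabled, so a snapshot-reading job does not allocate a bucket cache.
   */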
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      JobConf job = new JobConf(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  // TODO: mapred does not support limiting the input range by startrow, endrow.
  // Thus the following tests must override the base class's parameter verification.

  @Test
  @Override
  public void testWithMockedMapReduceMultiRegion() throws Exception {
    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10);
  }

  @Test
  @Override
  public void testWithMapReduceMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false);
  }

  // Run the MR job while HBase is offline.
  @Test
  @Override
  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true);
  }

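  /**
   * Hook for the base class's testRestoreSnapshotDoesNotCreateBackRefLinks: only the job
   * initialization is done here; the back-reference verification is driven by the superclass.
   */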
  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    JobConf job = new JobConf(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
      COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

  @Override
  protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      JobConf job = new JobConf(util.getConfiguration());
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
        COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // mapred does not support restricting the input by start and end keys, so verify over
      // the full [startRow, stopRow) range.
      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getHBaseAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }

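  /**
   * Computes the splits for the given job, then drives each split's RecordReader directly
   * (with a mocked Reporter) and asserts that every row in [startRow, stopRow) is read back.
   */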
  private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    InputSplit[] splits = tsif.getSplits(job, 0);

    Assert.assertEquals(expectedNumSplits, splits.length);

    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    for (int i = 0; i < splits.length; i++) {
      // validate input split
      InputSplit split = splits[i];
      Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);

      // validate record reader
      OutputCollector collector = mock(OutputCollector.class);
      Reporter reporter = mock(Reporter.class);
      RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter);

      // validate we can read all the data back
      ImmutableBytesWritable key = rr.createKey();
      Result value = rr.createValue();
      while (rr.next(key, value)) {
        verifyRowFromMap(key, value);
        rowTracker.addRow(key.copyBytes());
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

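  /**
   * Runs the real-MR variant of the snapshot test, constrained to this class's start/end rows.
   */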
  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, expectedNumSplits, shutdownCluster);
  }

  // This is also called by IntegrationTestTableSnapshotInputFormat.
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {

    // create the table and snapshot
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      JobConf jobConf = new JobConf(util.getConfiguration());

      jobConf.setJarByClass(util.getClass());
      org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(jobConf,
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, jobConf, true, tableDir);

      jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      jobConf.setNumReduceTasks(1);
      jobConf.setOutputFormat(NullOutputFormat.class);

      RunningJob job = JobClient.runJob(jobConf);
      Assert.assertTrue(job.isSuccessful());
    } finally {
      if (!shutdownCluster) {
        util.getHBaseAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }
}