View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.List;
26  import java.util.UUID;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configurable;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FSDataOutputStream;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.Durability;
38  import org.apache.hadoop.hbase.client.Put;
39  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
40  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
41  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
42  import org.apache.hadoop.hbase.regionserver.Region;
43  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
44  import org.apache.hadoop.hbase.testclassification.LargeTests;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.util.Tool;
47  import org.apache.hadoop.util.ToolRunner;
48  import org.junit.AfterClass;
49  import org.junit.BeforeClass;
50  import org.junit.Test;
51  import org.junit.experimental.categories.Category;
52  
53  @Category(LargeTests.class)
54  public class TestImportTSVWithTTLs implements Configurable {
55  
56    protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
57    protected static final String NAME = TestImportTsv.class.getSimpleName();
58    protected static HBaseTestingUtility util = new HBaseTestingUtility();
59  
60    /**
61     * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
62     * false.
63     */
64    protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
65  
66    /**
67     * Force use of combiner in doMROnTableTest. Boolean. Default is true.
68     */
69    protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
70  
71    private final String FAMILY = "FAM";
72    private static Configuration conf;
73  
74    @Override
75    public Configuration getConf() {
76      return util.getConfiguration();
77    }
78  
79    @Override
80    public void setConf(Configuration conf) {
81      throw new IllegalArgumentException("setConf not supported");
82    }
83  
84    @BeforeClass
85    public static void provisionCluster() throws Exception {
86      conf = util.getConfiguration();
87      // We don't check persistence in HFiles in this test, but if we ever do we will
88      // need this where the default hfile version is not 3 (i.e. 0.98)
89      conf.setInt("hfile.format.version", 3);
90      conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
91      util.startMiniCluster();
92      util.startMiniMapReduceCluster();
93    }
94  
95    @AfterClass
96    public static void releaseCluster() throws Exception {
97      util.shutdownMiniMapReduceCluster();
98      util.shutdownMiniCluster();
99    }
100 
101   @Test
102   public void testMROnTable() throws Exception {
103     String tableName = "test-" + UUID.randomUUID();
104 
105     // Prepare the arguments required for the test.
106     String[] args = new String[] {
107         "-D" + ImportTsv.MAPPER_CONF_KEY
108             + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
109         "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
110         "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
111     String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
112     util.createTable(TableName.valueOf(tableName), FAMILY);
113     doMROnTableTest(util, FAMILY, data, args, 1);
114     util.deleteTable(tableName);
115   }
116 
117   protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
118       String[] args, int valueMultiplier) throws Exception {
119     TableName table = TableName.valueOf(args[args.length - 1]);
120     Configuration conf = new Configuration(util.getConfiguration());
121 
122     // populate input file
123     FileSystem fs = FileSystem.get(conf);
124     Path inputPath = fs.makeQualified(new Path(util
125         .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
126     FSDataOutputStream op = fs.create(inputPath, true);
127     op.write(Bytes.toBytes(data));
128     op.close();
129     LOG.debug(String.format("Wrote test data to file: %s", inputPath));
130 
131     if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
132       LOG.debug("Forcing combiner.");
133       conf.setInt("mapreduce.map.combine.minspills", 1);
134     }
135 
136     // run the import
137     List<String> argv = new ArrayList<String>(Arrays.asList(args));
138     argv.add(inputPath.toString());
139     Tool tool = new ImportTsv();
140     LOG.debug("Running ImportTsv with arguments: " + argv);
141     try {
142       // Job will fail if observer rejects entries without TTL
143       assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
144     } finally {
145       // Clean up
146       if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
147         LOG.debug("Deleting test subdirectory");
148         util.cleanupDataTestDirOnTestFS(table.getNameAsString());
149       }
150     }
151 
152     return tool;
153   }
154 
155   public static class TTLCheckingObserver extends BaseRegionObserver {
156 
157     @Override
158     public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
159         Durability durability) throws IOException {
160       Region region = e.getEnvironment().getRegion();
161       if (!region.getRegionInfo().isMetaTable()
162           && !region.getRegionInfo().getTable().isSystemTable()) {
163         // The put carries the TTL attribute
164         if (put.getTTL() != Long.MAX_VALUE) {
165           return;
166         }
167         throw new IOException("Operation does not have TTL set");
168       }
169     }
170   }
171 }