/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A tool to split HFiles at new region boundaries as a MapReduce job. The tool generates HFiles
 * for later bulk importing.
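 * <p>
 * An illustrative command-line invocation (the input directory, output directory and table name
 * below are placeholders, not values shipped with the tool):
 * <pre>
 * hbase org.apache.hadoop.hbase.backup.mapreduce.HFileSplitter \
 *   -Dhfile.bulk.output=/tmp/hfilesplitter-out /backup/hfiles mytable
 * </pre>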
 */
@InterfaceAudience.Private
public class HFileSplitter extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(HFileSplitter.class);
  final static String NAME = "HFileSplitter";
  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
  public final static String TABLES_KEY = "hfile.input.tables";
  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public HFileSplitter() {
  }

  protected HFileSplitter(final Configuration c) {
    super(c);
  }

  /**
   * A mapper that just writes out cells. This one can be used together with
   * {@link KeyValueSortReducer}.
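   * <p>
   * Each input cell is emitted keyed by its row (an {@link ImmutableBytesWritable}), so the
   * shuffle groups and orders cells by row key before HFiles are written.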
   */
  static class HFileCellMapper extends
      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {

    @Override
    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
        InterruptedException {
      // Convert value to KeyValue if subclass
      if (!value.getClass().equals(KeyValue.class)) {
        value =
            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
                value.getValueOffset(), value.getValueLength());
      }
      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
    }

    @Override
    public void setup(Context context) throws IOException {
      // do nothing
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
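   *          {@code args[0]} is a comma-separated list of HFile input directories and
   *          {@code args[1]} is the target table name.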
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Configuration conf = getConf();
    String inputDirs = args[0];
    String tabName = args[1];
    conf.setStrings(TABLES_KEY, tabName);
    Job job =
        Job.getInstance(conf,
          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
    job.setJarByClass(HFileSplitter.class);
    FileInputFormat.addInputPaths(job, inputDirs);
    job.setInputFormatClass(HFileInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      LOG.debug("Adding incremental job: " + hfileOutPath + " from " + inputDirs);
      TableName tableName = TableName.valueOf(tabName);
      job.setMapperClass(HFileCellMapper.class);
      job.setReducerClass(KeyValueSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(KeyValue.class);
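      // configureIncrementalLoad inspects the table's current region boundaries and sets up the
      // total order partitioner (and matching reducer count) so the generated HFiles line up
      // with those regions.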
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Table table = conn.getTable(tableName);
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      }
      LOG.debug("Successfully configured incremental load job");

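      // Ship the jar containing Guava's Preconditions with the job so map and reduce tasks
      // can resolve it on the cluster.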
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        com.google.common.base.Preconditions.class);
    } else {
      throw new IOException("No bulk output directory specified");
    }
    return job;
  }

  /**
   * Print usage
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
    System.err.println("Read all HFiles for <table> and split them at <table> region boundaries.");
    System.err.println("<table>  table to load.\n");
    System.err.println("To generate HFiles for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("Other options:");
    System.err.println("   -D " + JOB_NAME_CONF_KEY
        + "=jobName - use the specified mapreduce job name for the HFile splitter");
    System.err.println("For performance also consider the following options:\n"
        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new HFileSplitter(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(args);
    int result = job.waitForCompletion(true) ? 0 : 1;
    return result;
  }
}