/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates
 * HFiles for later bulk importing.
 */
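/*
 * A minimal example invocation (the input directory, output path and table name below are
 * illustrative only, not taken from this codebase):
 *
 *   hbase org.apache.hadoop.hbase.backup.mapreduce.HFileSplitter \
 *     -Dhfile.bulk.output=/tmp/hfile-splits /backup/hfiles/usertable usertable
 */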
@InterfaceAudience.Private
public class HFileSplitter extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(HFileSplitter.class);
  final static String NAME = "HFileSplitter";
  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
  public final static String TABLES_KEY = "hfile.input.tables";
  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public HFileSplitter() {
  }

  protected HFileSplitter(final Configuration c) {
    super(c);
  }

  /**
   * A mapper that just writes out cells. This one can be used together with
   * {@link KeyValueSortReducer}.
   */
  static class HFileCellMapper extends
      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {

    @Override
    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
        InterruptedException {

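      // Rewrite any KeyValue subclass into a plain KeyValue, presumably so the emitted value is
      // exactly the declared map output value class (KeyValue).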
      if (!value.getClass().equals(KeyValue.class)) {
        value =
            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
                value.getValueOffset(), value.getValueLength());
      }
      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
    }

    @Override
    public void setup(Context context) throws IOException {

    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Configuration conf = getConf();
    String inputDirs = args[0];
    String tabName = args[1];
    conf.setStrings(TABLES_KEY, tabName);
    Job job =
        Job.getInstance(conf,
          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
    job.setJarByClass(HFileSplitter.class);
    FileInputFormat.addInputPaths(job, inputDirs);
    job.setInputFormatClass(HFileInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      LOG.debug("Adding incremental load job: " + hfileOutPath + " from " + inputDirs);
      TableName tableName = TableName.valueOf(tabName);
      job.setMapperClass(HFileCellMapper.class);
      job.setReducerClass(KeyValueSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(KeyValue.class);
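      // configureIncrementalLoad (below) matches the reducer count to the table's current region
      // count and installs a TotalOrderPartitioner over the region start keys, so each reducer
      // writes HFiles that fit one region's boundaries.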
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Table table = conn.getTable(tableName);
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
          regionLocator);
      }
      LOG.debug("Done configuring incremental load job");

      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        com.google.common.base.Preconditions.class);
    } else {
      throw new IOException("No bulk output directory specified");
    }
    return job;
  }

  /**
   * Prints usage information.
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
    System.err.println("Read all HFiles for <table> and split them to <table> region boundaries.");
    System.err.println("<table> table to load.\n");
    System.err.println("To generate HFiles for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("Other options:");
    System.err.println("  -D " + JOB_NAME_CONF_KEY
        + "=jobName - use the specified mapreduce job name for the HFile splitter");
    System.err.println("For performance also consider the following options:\n"
        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new HFileSplitter(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

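  // ToolRunner has already stripped generic options (e.g. -D settings) via GenericOptionsParser,
  // so args here holds only the positional arguments: <HFile inputdir(s)> <table>.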
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(args);
    int result = job.waitForCompletion(true) ? 0 : 1;
    return result;
  }
}