1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.text.ParseException;
22 import java.text.SimpleDateFormat;
23 import java.util.Map;
24 import java.util.TreeMap;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.conf.Configured;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HBaseConfiguration;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.KeyValueUtil;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.hbase.client.Connection;
40 import org.apache.hadoop.hbase.client.ConnectionFactory;
41 import org.apache.hadoop.hbase.client.Delete;
42 import org.apache.hadoop.hbase.client.Mutation;
43 import org.apache.hadoop.hbase.client.Put;
44 import org.apache.hadoop.hbase.client.RegionLocator;
45 import org.apache.hadoop.hbase.client.Table;
46 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
47 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
48 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.wal.WALKey;
51 import org.apache.hadoop.mapreduce.Job;
52 import org.apache.hadoop.mapreduce.Mapper;
53 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
54 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
55 import org.apache.hadoop.util.GenericOptionsParser;
56 import org.apache.hadoop.util.Tool;
57 import org.apache.hadoop.util.ToolRunner;
58
59
60
61
62
63
64
65
66
67
68
69 @InterfaceAudience.Public
70 @InterfaceStability.Stable
71 public class WALPlayer extends Configured implements Tool {
72 final static Log LOG = LogFactory.getLog(WALPlayer.class);
73 final static String NAME = "WALPlayer";
74 public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
75 public final static String TABLES_KEY = "wal.input.tables";
76 public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
77 public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
78
79
80 static {
81 Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
82 Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
83 Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
84 Configuration.addDeprecation(HLogInputFormat.START_TIME_KEY, WALInputFormat.START_TIME_KEY);
85 Configuration.addDeprecation(HLogInputFormat.END_TIME_KEY, WALInputFormat.END_TIME_KEY);
86 }
87
/**
 * Creates a player without supplying a configuration to the superclass.
 * NOTE(review): callers presumably have to provide one (e.g. via {@code setConf}) before
 * running the tool -- confirm against {@code Configured}.
 */
public WALPlayer() {
  super();
}
90
91
92
93
94
95 static class WALKeyValueMapper
96 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
97 private byte[] table;
98
99 @Override
100 public void map(WALKey key, WALEdit value,
101 Context context)
102 throws IOException {
103 try {
104
105 if (Bytes.equals(table, key.getTablename().getName())) {
106 for (Cell cell : value.getCells()) {
107 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
108 if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
109 context.write(new ImmutableBytesWritable(kv.getRow()), kv);
110 }
111 }
112 } catch (InterruptedException e) {
113 e.printStackTrace();
114 }
115 }
116
117 @Override
118 public void setup(Context context) throws IOException {
119
120 String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
121 if (tables == null || tables.length != 1) {
122
123 throw new IOException("Exactly one table must be specified for bulk HFile case.");
124 }
125 table = Bytes.toBytes(tables[0]);
126 }
127 }
128
129
130
131
132
133 protected static class WALMapper
134 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
135 private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
136
137
138 @Override
139 public void map(WALKey key, WALEdit value, Context context)
140 throws IOException {
141 try {
142 if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
143 TableName targetTable = tables.isEmpty() ?
144 key.getTablename() :
145 tables.get(key.getTablename());
146 ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
147 Put put = null;
148 Delete del = null;
149 Cell lastCell = null;
150
151 for (Cell cell : value.getCells()) {
152
153 if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
154
155
156 if (filter(context, cell)) {
157
158
159
160
161 if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
162 || !CellUtil.matchingRow(lastCell, cell)) {
163
164 if (put != null) {
165 context.write(tableOut, put);
166 }
167 if (del != null) {
168 context.write(tableOut, del);
169 }
170 if (CellUtil.isDelete(cell)) {
171 del = new Delete(cell.getRow());
172 } else {
173 put = new Put(cell.getRow());
174 }
175 }
176 if (CellUtil.isDelete(cell)) {
177 del.addDeleteMarker(cell);
178 } else {
179 put.add(cell);
180 }
181 }
182 lastCell = cell;
183 }
184
185 if (put != null) {
186 context.write(tableOut, put);
187 }
188 if (del != null) {
189 context.write(tableOut, del);
190 }
191 }
192 } catch (InterruptedException e) {
193 e.printStackTrace();
194 }
195 }
196
197
198
199
200
201
202 protected boolean filter(Context context, final Cell cell) {
203 return true;
204 }
205
206 @Override
207 protected void
208 cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
209 throws IOException, InterruptedException {
210 super.cleanup(context);
211 }
212
213 @Override
214 public void setup(Context context) throws IOException {
215 String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
216 String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
217
218 if (tableMap == null) {
219 tableMap = tablesToUse;
220 }
221 if (tablesToUse == null) {
222
223 } else if (tablesToUse.length != tableMap.length) {
224
225 throw new IOException("Incorrect table mapping specified.");
226 }
227 int i = 0;
228 if (tablesToUse != null) {
229 for (String table : tablesToUse) {
230 tables.put(TableName.valueOf(table),
231 TableName.valueOf(tableMap[i++]));
232 }
233 }
234 }
235 }
236
237
238
239
/**
 * Creates a player bound to the given configuration.
 *
 * @param conf configuration handed to the superclass constructor
 */
public WALPlayer(final Configuration conf) {
  super(conf);
}
243
244 void setupTime(Configuration conf, String option) throws IOException {
245 String val = conf.get(option);
246 if (null == val) {
247 return;
248 }
249 long ms;
250 try {
251
252 ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
253 } catch (ParseException pe) {
254 try {
255
256 ms = Long.parseLong(val);
257 } catch (NumberFormatException nfe) {
258 throw new IOException(option
259 + " must be specified either in the form 2001-02-20T16:35:06.99 "
260 + "or as number of milliseconds");
261 }
262 }
263 conf.setLong(option, ms);
264 }
265
266
267
268
269
270
271
272
273 public Job createSubmittableJob(String[] args)
274 throws IOException {
275 Configuration conf = getConf();
276 setupTime(conf, HLogInputFormat.START_TIME_KEY);
277 setupTime(conf, HLogInputFormat.END_TIME_KEY);
278 String inputDirs = args[0];
279 String[] tables = args[1].split(",");
280 String[] tableMap;
281 if (args.length > 2) {
282 tableMap = args[2].split(",");
283 if (tableMap.length != tables.length) {
284 throw new IOException("The same number of tables and mapping must be provided.");
285 }
286 } else {
287
288 tableMap = tables;
289 }
290 conf.setStrings(TABLES_KEY, tables);
291 conf.setStrings(TABLE_MAP_KEY, tableMap);
292 conf.set(FileInputFormat.INPUT_DIR, inputDirs);
293 Job job = new Job(conf, NAME + "_" + System.currentTimeMillis());
294 job.setJarByClass(WALPlayer.class);
295 job.setInputFormatClass(WALInputFormat.class);
296 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
297 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
298 if (hfileOutPath != null) {
299
300 if (tables.length != 1) {
301 throw new IOException("Exactly one table must be specified for the bulk export option");
302 }
303 TableName tableName = TableName.valueOf(tables[0]);
304 job.setMapperClass(WALKeyValueMapper.class);
305 job.setReducerClass(KeyValueSortReducer.class);
306 Path outputDir = new Path(hfileOutPath);
307 FileOutputFormat.setOutputPath(job, outputDir);
308 job.setMapOutputValueClass(KeyValue.class);
309 try (Connection conn = ConnectionFactory.createConnection(conf);
310 Table table = conn.getTable(tableName);
311 RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
312 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
313 }
314 LOG.debug("success configuring load incremental job");
315
316 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
317 com.google.common.base.Preconditions.class);
318 } else {
319
320 job.setMapperClass(WALMapper.class);
321 job.setOutputFormatClass(MultiTableOutputFormat.class);
322 TableMapReduceUtil.addDependencyJars(job);
323 TableMapReduceUtil.initCredentials(job);
324
325 job.setNumReduceTasks(0);
326 }
327 String codecCls = WALCellCodec.getWALCellCodecClass(conf);
328 try {
329 TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Class.forName(codecCls));
330 LOG.debug("tmpjars: " + job.getConfiguration().get("tmpjars"));
331 } catch (Exception e) {
332 throw new IOException("Cannot determine wal codec class " + codecCls, e);
333 }
334 return job;
335 }
336
337
338
339
340
341
342 private void usage(final String errorMsg) {
343 if (errorMsg != null && errorMsg.length() > 0) {
344 System.err.println("ERROR: " + errorMsg);
345 }
346 System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
347 System.err.println("Read all WAL entries for <tables>.");
348 System.err.println("If no tables (\"\") are specific, all tables are imported.");
349 System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported"+
350 " in that case.)");
351 System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
352 System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
353 System.err.println("<tableMapping> is a command separated list of targettables.");
354 System.err.println("If specified, each table in <tables> must have a mapping.\n");
355 System.err.println("By default " + NAME + " will load data directly into HBase.");
356 System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
357 System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
358 System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
359 System.err.println("Other options: (specify time range to WAL edit to consider)");
360 System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
361 System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
362 System.err.println("For performance also consider the following options:\n"
363 + " -Dmapreduce.map.speculative=false\n"
364 + " -Dmapreduce.reduce.speculative=false");
365 }
366
367
368
369
370
371
372
373 public static void main(String[] args) throws Exception {
374 int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
375 System.exit(ret);
376 }
377
378 @Override
379 public int run(String[] args) throws Exception {
380 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
381 if (otherArgs.length < 2) {
382 usage("Wrong number of arguments: " + otherArgs.length);
383 System.exit(-1);
384 }
385 Job job = createSubmittableJob(otherArgs);
386 return job.waitForCompletion(true) ? 0 : 1;
387 }
388 }