1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.snapshot;
20
21 import java.io.BufferedInputStream;
22 import java.io.FileNotFoundException;
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Random;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.classification.InterfaceStability;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.conf.Configured;
40 import org.apache.hadoop.fs.FSDataInputStream;
41 import org.apache.hadoop.fs.FSDataOutputStream;
42 import org.apache.hadoop.fs.FileChecksum;
43 import org.apache.hadoop.fs.FileStatus;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.FileUtil;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.fs.permission.FsPermission;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.HBaseConfiguration;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.io.FileLink;
53 import org.apache.hadoop.hbase.io.HFileLink;
54 import org.apache.hadoop.hbase.io.hfile.HFile;
55 import org.apache.hadoop.hbase.mapreduce.JobUtil;
56 import org.apache.hadoop.hbase.io.WALLink;
57 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
58 import org.apache.hadoop.hbase.mob.MobUtils;
59 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
60 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
61 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
62 import org.apache.hadoop.hbase.util.FSUtils;
63 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
64 import org.apache.hadoop.hbase.util.Pair;
65 import org.apache.hadoop.io.BytesWritable;
66 import org.apache.hadoop.io.IOUtils;
67 import org.apache.hadoop.io.NullWritable;
68 import org.apache.hadoop.io.Writable;
69 import org.apache.hadoop.mapreduce.Job;
70 import org.apache.hadoop.mapreduce.JobContext;
71 import org.apache.hadoop.mapreduce.Mapper;
72 import org.apache.hadoop.mapreduce.InputFormat;
73 import org.apache.hadoop.mapreduce.InputSplit;
74 import org.apache.hadoop.mapreduce.RecordReader;
75 import org.apache.hadoop.mapreduce.TaskAttemptContext;
76 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
77 import org.apache.hadoop.mapreduce.security.TokenCache;
78 import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
79 import org.apache.hadoop.util.StringUtils;
80 import org.apache.hadoop.util.Tool;
81 import org.apache.hadoop.util.ToolRunner;
82
83
84
85
86
87
88
89
90 @InterfaceAudience.Public
91 @InterfaceStability.Evolving
92 public class ExportSnapshot extends Configured implements Tool {
// NOTE(review): presumably the name this tool is registered/invoked under — confirm at call site.
public static final String NAME = "exportsnapshot";

private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);

// Hadoop key for the number of map tasks; set alongside CONF_NUM_SPLITS.
private static final String MR_NUM_MAPS = "mapreduce.job.maps";
// Job configuration keys written by runCopyJob() and read back by the input format and mapper.
// Number of input splits (mappers); 0 means "derive from the file count" in getSplits().
private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
// Source snapshot directory that getSplits() lists the referenced files from.
private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
// Optional owner/group/octal-mode overrides applied to the exported files and directories.
private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
// When true (mapper default), an existing same-length destination file is only skipped if the
// file checksums also match.
private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
// Copy buffer size in bytes; the mapper defaults it to the output block size (at least 64KB).
private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
// Files per mapper used when CONF_NUM_SPLITS is not set (getSplits() default: 10).
private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
// Per-mapper read throttle in MB/s (mapper default: 100; Integer.MAX_VALUE disables throttling).
private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
// When true, run() copies the manifest straight into the completed snapshot directory instead
// of the working (tmp) directory.
protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";

// Test hooks read by ExportMapper.injectTestFailure(): always fail, or (with RETRY) fail randomly.
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";

// NOTE(review): not referenced in the visible portion of this file.
private static final String INPUT_FOLDER_PREFIX = "export-files.";
116
117
/**
 * MapReduce counters maintained by the export mapper. The declaration order is kept stable.
 */
public enum Counter {
  /** Source files that could not be opened or found. */
  MISSING_FILES,
  /** Files copied successfully. */
  FILES_COPIED,
  /** Files skipped because an identical copy already existed at the destination. */
  FILES_SKIPPED,
  /** Copy attempts that failed. */
  COPY_FAILED,
  /** Total bytes of the source files scheduled for copy. */
  BYTES_EXPECTED,
  /** Bytes belonging to skipped files. */
  BYTES_SKIPPED,
  /** Bytes actually written to the destination. */
  BYTES_COPIED
}
122
123 private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
124 NullWritable, NullWritable> {
125 final static int REPORT_SIZE = 1 * 1024 * 1024;
126 final static int BUFFER_SIZE = 64 * 1024;
127
128 private boolean testFailures;
129 private Random random;
130
131 private boolean verifyChecksum;
132 private String filesGroup;
133 private String filesUser;
134 private short filesMode;
135 private int bufferSize;
136
137 private FileSystem outputFs;
138 private Path outputArchive;
139 private Path outputRoot;
140
141 private FileSystem inputFs;
142 private Path inputArchive;
143 private Path inputRoot;
144
145 @Override
146 public void setup(Context context) throws IOException {
147 Configuration conf = context.getConfiguration();
148 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
149
150 filesGroup = conf.get(CONF_FILES_GROUP);
151 filesUser = conf.get(CONF_FILES_USER);
152 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
153 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
154 inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
155
156 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
157 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
158
159 testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
160
161 try {
162 conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
163 inputFs = FileSystem.get(inputRoot.toUri(), conf);
164 } catch (IOException e) {
165 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
166 }
167
168 try {
169 conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
170 outputFs = FileSystem.get(outputRoot.toUri(), conf);
171 } catch (IOException e) {
172 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
173 }
174
175
176 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
177 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
178 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
179
180 for (Counter c : Counter.values()) {
181 context.getCounter(c).increment(0);
182 }
183 }
184
185 @Override
186 protected void cleanup(Context context) {
187 IOUtils.closeStream(inputFs);
188 IOUtils.closeStream(outputFs);
189 }
190
191 @Override
192 public void map(BytesWritable key, NullWritable value, Context context)
193 throws InterruptedException, IOException {
194 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
195 Path outputPath = getOutputPath(inputInfo);
196
197 copyFile(context, inputInfo, outputPath);
198 }
199
200
201
202
203 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
204 Path path = null;
205 switch (inputInfo.getType()) {
206 case HFILE:
207 Path inputPath = new Path(inputInfo.getHfile());
208 String family = inputPath.getParent().getName();
209 TableName table =HFileLink.getReferencedTableName(inputPath.getName());
210 String region = HFileLink.getReferencedRegionName(inputPath.getName());
211 String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
212 path = new Path(FSUtils.getTableDir(new Path("./"), table),
213 new Path(region, new Path(family, hfile)));
214 break;
215 case WAL:
216 Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
217 path = new Path(oldLogsDir, inputInfo.getWalName());
218 break;
219 default:
220 throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
221 }
222 return new Path(outputArchive, path);
223 }
224
225
226
227
228 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
229 throws IOException {
230 if (testFailures) {
231 if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
232 if (random == null) {
233 random = new Random();
234 }
235
236
237
238
239 if (random.nextFloat() < 0.03) {
240 throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
241 + " time=" + System.currentTimeMillis());
242 }
243 } else {
244 context.getCounter(Counter.COPY_FAILED).increment(1);
245 throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
246 }
247 }
248 }
249
250 private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
251 final Path outputPath) throws IOException {
252 injectTestFailure(context, inputInfo);
253
254
255 FileStatus inputStat = getSourceFileStatus(context, inputInfo);
256
257
258 if (outputFs.exists(outputPath)) {
259 FileStatus outputStat = outputFs.getFileStatus(outputPath);
260 if (outputStat != null && sameFile(inputStat, outputStat)) {
261 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
262 context.getCounter(Counter.FILES_SKIPPED).increment(1);
263 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
264 return;
265 }
266 }
267
268 InputStream in = openSourceFile(context, inputInfo);
269 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
270 if (Integer.MAX_VALUE != bandwidthMB) {
271 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
272 }
273
274 try {
275 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
276
277
278 createOutputPath(outputPath.getParent());
279 FSDataOutputStream out = outputFs.create(outputPath, true);
280 try {
281 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
282 } finally {
283 out.close();
284 }
285
286
287 if (!preserveAttributes(outputPath, inputStat)) {
288 LOG.warn("You may have to run manually chown on: " + outputPath);
289 }
290 } finally {
291 in.close();
292 }
293 }
294
295
296
297
298 private void createOutputPath(final Path path) throws IOException {
299 if (filesUser == null && filesGroup == null) {
300 outputFs.mkdirs(path);
301 } else {
302 Path parent = path.getParent();
303 if (!outputFs.exists(parent) && !parent.isRoot()) {
304 createOutputPath(parent);
305 }
306 outputFs.mkdirs(path);
307 if (filesUser != null || filesGroup != null) {
308
309 outputFs.setOwner(path, filesUser, filesGroup);
310 }
311 if (filesMode > 0) {
312 outputFs.setPermission(path, new FsPermission(filesMode));
313 }
314 }
315 }
316
317
318
319
320
321
322
323
324
325 private boolean preserveAttributes(final Path path, final FileStatus refStat) {
326 FileStatus stat;
327 try {
328 stat = outputFs.getFileStatus(path);
329 } catch (IOException e) {
330 LOG.warn("Unable to get the status for file=" + path);
331 return false;
332 }
333
334 try {
335 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
336 outputFs.setPermission(path, new FsPermission(filesMode));
337 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
338 outputFs.setPermission(path, refStat.getPermission());
339 }
340 } catch (IOException e) {
341 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
342 return false;
343 }
344
345 boolean hasRefStat = (refStat != null);
346 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
347 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
348 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
349 try {
350 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
351 outputFs.setOwner(path, user, group);
352 }
353 } catch (IOException e) {
354 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
355 LOG.warn("The user/group may not exist on the destination cluster: user=" +
356 user + " group=" + group);
357 return false;
358 }
359 }
360
361 return true;
362 }
363
364 private boolean stringIsNotEmpty(final String str) {
365 return str != null && str.length() > 0;
366 }
367
368 private void copyData(final Context context,
369 final Path inputPath, final InputStream in,
370 final Path outputPath, final FSDataOutputStream out,
371 final long inputFileSize)
372 throws IOException {
373 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
374 " (%.1f%%)";
375
376 try {
377 byte[] buffer = new byte[bufferSize];
378 long totalBytesWritten = 0;
379 int reportBytes = 0;
380 int bytesRead;
381
382 long stime = System.currentTimeMillis();
383 while ((bytesRead = in.read(buffer)) > 0) {
384 out.write(buffer, 0, bytesRead);
385 totalBytesWritten += bytesRead;
386 reportBytes += bytesRead;
387
388 if (reportBytes >= REPORT_SIZE) {
389 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
390 context.setStatus(String.format(statusMessage,
391 StringUtils.humanReadableInt(totalBytesWritten),
392 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
393 " from " + inputPath + " to " + outputPath);
394 reportBytes = 0;
395 }
396 }
397 long etime = System.currentTimeMillis();
398
399 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
400 context.setStatus(String.format(statusMessage,
401 StringUtils.humanReadableInt(totalBytesWritten),
402 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
403 " from " + inputPath + " to " + outputPath);
404
405
406 if (totalBytesWritten != inputFileSize) {
407 String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
408 " expected=" + inputFileSize + " for file=" + inputPath;
409 throw new IOException(msg);
410 }
411
412 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
413 LOG.info("size=" + totalBytesWritten +
414 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
415 " time=" + StringUtils.formatTimeDiff(etime, stime) +
416 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
417 context.getCounter(Counter.FILES_COPIED).increment(1);
418 } catch (IOException e) {
419 LOG.error("Error copying " + inputPath + " to " + outputPath, e);
420 context.getCounter(Counter.COPY_FAILED).increment(1);
421 throw e;
422 }
423 }
424
425
426
427
428
429
430 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
431 throws IOException {
432 try {
433 Configuration conf = context.getConfiguration();
434 FileLink link = null;
435 switch (fileInfo.getType()) {
436 case HFILE:
437 Path inputPath = new Path(fileInfo.getHfile());
438 link = getFileLink(inputPath, conf);
439 break;
440 case WAL:
441 String serverName = fileInfo.getWalServer();
442 String logName = fileInfo.getWalName();
443 link = new WALLink(inputRoot, serverName, logName);
444 break;
445 default:
446 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
447 }
448 return link.open(inputFs);
449 } catch (IOException e) {
450 context.getCounter(Counter.MISSING_FILES).increment(1);
451 LOG.error("Unable to open source file=" + fileInfo.toString(), e);
452 throw e;
453 }
454 }
455
456 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
457 throws IOException {
458 try {
459 Configuration conf = context.getConfiguration();
460 FileLink link = null;
461 switch (fileInfo.getType()) {
462 case HFILE:
463 Path inputPath = new Path(fileInfo.getHfile());
464 link = getFileLink(inputPath, conf);
465 break;
466 case WAL:
467 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
468 break;
469 default:
470 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
471 }
472 return link.getFileStatus(inputFs);
473 } catch (FileNotFoundException e) {
474 context.getCounter(Counter.MISSING_FILES).increment(1);
475 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
476 throw e;
477 } catch (IOException e) {
478 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
479 throw e;
480 }
481 }
482
483 private FileLink getFileLink(Path path, Configuration conf) throws IOException{
484 String regionName = HFileLink.getReferencedRegionName(path.getName());
485 TableName tableName = HFileLink.getReferencedTableName(path.getName());
486 if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
487 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
488 HFileArchiveUtil.getArchivePath(conf), path);
489 }
490 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
491 }
492
493 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
494 try {
495 return fs.getFileChecksum(path);
496 } catch (IOException e) {
497 LOG.warn("Unable to get checksum for file=" + path, e);
498 return null;
499 }
500 }
501
502
503
504
505
506 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
507
508 if (inputStat.getLen() != outputStat.getLen()) return false;
509
510
511 if (!verifyChecksum) return true;
512
513
514 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
515 if (inChecksum == null) return false;
516
517 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
518 if (outChecksum == null) return false;
519
520 return inChecksum.equals(outChecksum);
521 }
522 }
523
524
525
526
527
528
529
530
531
532 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
533 final FileSystem fs, final Path snapshotDir) throws IOException {
534 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
535
536 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
537 final TableName table = TableName.valueOf(snapshotDesc.getTable());
538
539
540 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
541 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
542 new SnapshotReferenceUtil.SnapshotVisitor() {
543 @Override
544 public void storeFile(final HRegionInfo regionInfo, final String family,
545 final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
546 if (storeFile.hasReference()) {
547
548 } else {
549 String region = regionInfo.getEncodedName();
550 String hfile = storeFile.getName();
551 Path path = HFileLink.createPath(table, region, family, hfile);
552
553 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
554 .setType(SnapshotFileInfo.Type.HFILE)
555 .setHfile(path.toString())
556 .build();
557
558 long size;
559 if (storeFile.hasFileSize()) {
560 size = storeFile.getFileSize();
561 } else {
562 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
563 }
564 files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
565 }
566 }
567
568 @Override
569 public void logFile (final String server, final String logfile)
570 throws IOException {
571 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
572 .setType(SnapshotFileInfo.Type.WAL)
573 .setWalServer(server)
574 .setWalName(logfile)
575 .build();
576
577 long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
578 files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
579 }
580 });
581
582 return files;
583 }
584
585
586
587
588
589
590
591
592
593 static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
594 final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
595
596 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
597 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
598 long r = a.getSecond() - b.getSecond();
599 return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
600 }
601 });
602
603
604 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
605 new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
606 long[] sizeGroups = new long[ngroups];
607 int hi = files.size() - 1;
608 int lo = 0;
609
610 List<Pair<SnapshotFileInfo, Long>> group;
611 int dir = 1;
612 int g = 0;
613
614 while (hi >= lo) {
615 if (g == fileGroups.size()) {
616 group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
617 fileGroups.add(group);
618 } else {
619 group = fileGroups.get(g);
620 }
621
622 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
623
624
625 sizeGroups[g] += fileInfo.getSecond();
626 group.add(fileInfo);
627
628
629 g += dir;
630 if (g == ngroups) {
631 dir = -1;
632 g = ngroups - 1;
633 } else if (g < 0) {
634 dir = 1;
635 g = 0;
636 }
637 }
638
639 if (LOG.isDebugEnabled()) {
640 for (int i = 0; i < sizeGroups.length; ++i) {
641 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
642 }
643 }
644
645 return fileGroups;
646 }
647
648 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
649 @Override
650 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
651 TaskAttemptContext tac) throws IOException, InterruptedException {
652 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
653 }
654
655 @Override
656 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
657 Configuration conf = context.getConfiguration();
658 String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
659 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
660 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
661
662 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
663 int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
664 if (mappers == 0 && snapshotFiles.size() > 0) {
665 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
666 mappers = Math.min(mappers, snapshotFiles.size());
667 conf.setInt(CONF_NUM_SPLITS, mappers);
668 conf.setInt(MR_NUM_MAPS, mappers);
669 }
670
671 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
672 List<InputSplit> splits = new ArrayList(groups.size());
673 for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
674 splits.add(new ExportSnapshotInputSplit(files));
675 }
676 return splits;
677 }
678
679 private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
680 private List<Pair<BytesWritable, Long>> files;
681 private long length;
682
683 public ExportSnapshotInputSplit() {
684 this.files = null;
685 }
686
687 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
688 this.files = new ArrayList(snapshotFiles.size());
689 for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
690 this.files.add(new Pair<BytesWritable, Long>(
691 new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
692 this.length += fileInfo.getSecond();
693 }
694 }
695
696 private List<Pair<BytesWritable, Long>> getSplitKeys() {
697 return files;
698 }
699
700 @Override
701 public long getLength() throws IOException, InterruptedException {
702 return length;
703 }
704
705 @Override
706 public String[] getLocations() throws IOException, InterruptedException {
707 return new String[] {};
708 }
709
710 @Override
711 public void readFields(DataInput in) throws IOException {
712 int count = in.readInt();
713 files = new ArrayList<Pair<BytesWritable, Long>>(count);
714 length = 0;
715 for (int i = 0; i < count; ++i) {
716 BytesWritable fileInfo = new BytesWritable();
717 fileInfo.readFields(in);
718 long size = in.readLong();
719 files.add(new Pair<BytesWritable, Long>(fileInfo, size));
720 length += size;
721 }
722 }
723
724 @Override
725 public void write(DataOutput out) throws IOException {
726 out.writeInt(files.size());
727 for (final Pair<BytesWritable, Long> fileInfo: files) {
728 fileInfo.getFirst().write(out);
729 out.writeLong(fileInfo.getSecond());
730 }
731 }
732 }
733
734 private static class ExportSnapshotRecordReader
735 extends RecordReader<BytesWritable, NullWritable> {
736 private final List<Pair<BytesWritable, Long>> files;
737 private long totalSize = 0;
738 private long procSize = 0;
739 private int index = -1;
740
741 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
742 this.files = files;
743 for (Pair<BytesWritable, Long> fileInfo: files) {
744 totalSize += fileInfo.getSecond();
745 }
746 }
747
748 @Override
749 public void close() { }
750
751 @Override
752 public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
753
754 @Override
755 public NullWritable getCurrentValue() { return NullWritable.get(); }
756
757 @Override
758 public float getProgress() { return (float)procSize / totalSize; }
759
760 @Override
761 public void initialize(InputSplit split, TaskAttemptContext tac) { }
762
763 @Override
764 public boolean nextKeyValue() {
765 if (index >= 0) {
766 procSize += files.get(index).getSecond();
767 }
768 return(++index < files.size());
769 }
770 }
771 }
772
773
774
775
776
777
778
779
780 private void runCopyJob(final Path inputRoot, final Path outputRoot,
781 final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
782 final String filesUser, final String filesGroup, final int filesMode,
783 final int mappers, final int bandwidthMB)
784 throws IOException, InterruptedException, ClassNotFoundException {
785 Configuration conf = getConf();
786 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
787 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
788 if (mappers > 0) {
789 conf.setInt(CONF_NUM_SPLITS, mappers);
790 conf.setInt(MR_NUM_MAPS, mappers);
791 }
792 conf.setInt(CONF_FILES_MODE, filesMode);
793 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
794 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
795 conf.set(CONF_INPUT_ROOT, inputRoot.toString());
796 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
797 conf.set(CONF_SNAPSHOT_NAME, snapshotName);
798 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
799
800 Job job = new Job(conf);
801 job.setJobName("ExportSnapshot-" + snapshotName);
802 job.setJarByClass(ExportSnapshot.class);
803 TableMapReduceUtil.addDependencyJars(job);
804 job.setMapperClass(ExportMapper.class);
805 job.setInputFormatClass(ExportSnapshotInputFormat.class);
806 job.setOutputFormatClass(NullOutputFormat.class);
807 job.setMapSpeculativeExecution(false);
808 job.setNumReduceTasks(0);
809
810
811 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
812 new Path[] { inputRoot, outputRoot }, conf);
813
814
815 if (!job.waitForCompletion(true)) {
816
817
818 throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
819 }
820 }
821
822 private void verifySnapshot(final Configuration baseConf,
823 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
824
825 Configuration conf = new Configuration(baseConf);
826 FSUtils.setRootDir(conf, rootDir);
827 FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
828 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
829 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
830 }
831
832
833
834
835 private void setOwner(final FileSystem fs, final Path path, final String user,
836 final String group, final boolean recursive) throws IOException {
837 if (user != null || group != null) {
838 if (recursive && fs.isDirectory(path)) {
839 for (FileStatus child : fs.listStatus(path)) {
840 setOwner(fs, child.getPath(), user, group, recursive);
841 }
842 }
843 fs.setOwner(path, user, group);
844 }
845 }
846
847
848
849
850 private void setPermission(final FileSystem fs, final Path path, final short filesMode,
851 final boolean recursive) throws IOException {
852 if (filesMode > 0) {
853 FsPermission perm = new FsPermission(filesMode);
854 if (recursive && fs.isDirectory(path)) {
855 for (FileStatus child : fs.listStatus(path)) {
856 setPermission(fs, child.getPath(), filesMode, recursive);
857 }
858 }
859 fs.setPermission(path, perm);
860 }
861 }
862
863
864
865
866
867 @Override
868 public int run(String[] args) throws IOException {
869 boolean verifyTarget = true;
870 boolean verifyChecksum = true;
871 String snapshotName = null;
872 String targetName = null;
873 boolean overwrite = false;
874 String filesGroup = null;
875 String filesUser = null;
876 Path outputRoot = null;
877 int bandwidthMB = Integer.MAX_VALUE;
878 int filesMode = 0;
879 int mappers = 0;
880
881 Configuration conf = getConf();
882 Path inputRoot = FSUtils.getRootDir(conf);
883
884
885 for (int i = 0; i < args.length; i++) {
886 String cmd = args[i];
887 if (cmd.equals("-snapshot")) {
888 snapshotName = args[++i];
889 } else if (cmd.equals("-target")) {
890 targetName = args[++i];
891 } else if (cmd.equals("-copy-to")) {
892 outputRoot = new Path(args[++i]);
893 } else if (cmd.equals("-copy-from")) {
894 inputRoot = new Path(args[++i]);
895 FSUtils.setRootDir(conf, inputRoot);
896 } else if (cmd.equals("-no-checksum-verify")) {
897 verifyChecksum = false;
898 } else if (cmd.equals("-no-target-verify")) {
899 verifyTarget = false;
900 } else if (cmd.equals("-mappers")) {
901 mappers = Integer.parseInt(args[++i]);
902 } else if (cmd.equals("-chuser")) {
903 filesUser = args[++i];
904 } else if (cmd.equals("-chgroup")) {
905 filesGroup = args[++i];
906 } else if (cmd.equals("-bandwidth")) {
907 bandwidthMB = Integer.parseInt(args[++i]);
908 } else if (cmd.equals("-chmod")) {
909 filesMode = Integer.parseInt(args[++i], 8);
910 } else if (cmd.equals("-overwrite")) {
911 overwrite = true;
912 } else if (cmd.equals("-h") || cmd.equals("--help")) {
913 printUsageAndExit();
914 } else {
915 System.err.println("UNEXPECTED: " + cmd);
916 printUsageAndExit();
917 }
918 }
919
920
921 if (snapshotName == null) {
922 System.err.println("Snapshot name not provided.");
923 printUsageAndExit();
924 }
925
926 if (outputRoot == null) {
927 System.err.println("Destination file-system not provided.");
928 printUsageAndExit();
929 }
930
931 if (targetName == null) {
932 targetName = snapshotName;
933 }
934
935 conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
936 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
937 LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
938 conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
939 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
940 LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
941
942 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
943
944 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
945 Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
946 Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
947 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
948
949
950 if (outputFs.exists(outputSnapshotDir)) {
951 if (overwrite) {
952 if (!outputFs.delete(outputSnapshotDir, true)) {
953 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
954 return 1;
955 }
956 } else {
957 System.err.println("The snapshot '" + targetName +
958 "' already exists in the destination: " + outputSnapshotDir);
959 return 1;
960 }
961 }
962
963 if (!skipTmp) {
964
965 if (outputFs.exists(snapshotTmpDir)) {
966 if (overwrite) {
967 if (!outputFs.delete(snapshotTmpDir, true)) {
968 System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
969 return 1;
970 }
971 } else {
972 System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
973 System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
974 System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
975 return 1;
976 }
977 }
978 }
979
980
981
982
983 try {
984 LOG.info("Copy Snapshot Manifest");
985 FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
986 if (filesUser != null || filesGroup != null) {
987 setOwner(outputFs, snapshotTmpDir, filesUser, filesGroup, true);
988 }
989 if (filesMode > 0) {
990 setPermission(outputFs, snapshotTmpDir, (short)filesMode, true);
991 }
992 } catch (IOException e) {
993 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
994 snapshotDir + " to=" + initialOutputSnapshotDir, e);
995 }
996
997
998 if (!targetName.equals(snapshotName)) {
999 SnapshotDescription snapshotDesc =
1000 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1001 .toBuilder()
1002 .setName(targetName)
1003 .build();
1004 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
1005 }
1006
1007
1008
1009
1010 try {
1011 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1012 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1013
1014 LOG.info("Finalize the Snapshot Export");
1015 if (!skipTmp) {
1016
1017 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1018 throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1019 snapshotTmpDir + " to=" + outputSnapshotDir);
1020 }
1021 }
1022
1023
1024 if (verifyTarget) {
1025 LOG.info("Verify snapshot integrity");
1026 verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
1027 }
1028
1029 LOG.info("Export Completed: " + targetName);
1030 return 0;
1031 } catch (Exception e) {
1032 LOG.error("Snapshot export failed", e);
1033 if (!skipTmp) {
1034 outputFs.delete(snapshotTmpDir, true);
1035 }
1036 outputFs.delete(outputSnapshotDir, true);
1037 return 1;
1038 } finally {
1039 IOUtils.closeStream(inputFs);
1040 IOUtils.closeStream(outputFs);
1041 }
1042 }
1043
1044
1045 private void printUsageAndExit() {
1046 System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1047 System.err.println(" where [options] are:");
1048 System.err.println(" -h|-help Show this help and exit.");
1049 System.err.println(" -snapshot NAME Snapshot to restore.");
1050 System.err.println(" -copy-to NAME Remote destination hdfs://");
1051 System.err.println(" -copy-from NAME Input folder hdfs:// (default hbase.rootdir)");
1052 System.err.println(" -no-checksum-verify Do not verify checksum, use name+length only.");
1053 System.err.println(" -no-target-verify Do not verify the integrity of the \\" +
1054 "exported snapshot.");
1055 System.err.println(" -overwrite Rewrite the snapshot manifest if already exists");
1056 System.err.println(" -chuser USERNAME Change the owner of the files " +
1057 "to the specified one.");
1058 System.err.println(" -chgroup GROUP Change the group of the files to " +
1059 "the specified one.");
1060 System.err.println(" -chmod MODE Change the permission of the files " +
1061 "to the specified one.");
1062 System.err.println(" -mappers Number of mappers to use during the " +
1063 "copy (mapreduce.job.maps).");
1064 System.err.println(" -bandwidth Limit bandwidth to this value in MB/second.");
1065 System.err.println();
1066 System.err.println("Examples:");
1067 System.err.println(" hbase " + getClass().getName() + " \\");
1068 System.err.println(" -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1069 System.err.println(" -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1070 System.err.println();
1071 System.err.println(" hbase " + getClass().getName() + " \\");
1072 System.err.println(" -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1073 System.err.println(" -copy-to hdfs://srv1:50070/hbase \\");
1074 System.exit(1);
1075 }
1076
1077
1078
1079
1080
1081
1082
1083
1084 static int innerMain(final Configuration conf, final String [] args) throws Exception {
1085 return ToolRunner.run(conf, new ExportSnapshot(), args);
1086 }
1087
1088 public static void main(String[] args) throws Exception {
1089 System.exit(innerMain(HBaseConfiguration.create(), args));
1090 }
1091 }