View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.hfile.HFile;
55  import org.apache.hadoop.hbase.mapreduce.JobUtil;
56  import org.apache.hadoop.hbase.io.WALLink;
57  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
58  import org.apache.hadoop.hbase.mob.MobUtils;
59  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
60  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
61  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
62  import org.apache.hadoop.hbase.util.FSUtils;
63  import org.apache.hadoop.hbase.util.HFileArchiveUtil;
64  import org.apache.hadoop.hbase.util.Pair;
65  import org.apache.hadoop.io.BytesWritable;
66  import org.apache.hadoop.io.IOUtils;
67  import org.apache.hadoop.io.NullWritable;
68  import org.apache.hadoop.io.Writable;
69  import org.apache.hadoop.mapreduce.Job;
70  import org.apache.hadoop.mapreduce.JobContext;
71  import org.apache.hadoop.mapreduce.Mapper;
72  import org.apache.hadoop.mapreduce.InputFormat;
73  import org.apache.hadoop.mapreduce.InputSplit;
74  import org.apache.hadoop.mapreduce.RecordReader;
75  import org.apache.hadoop.mapreduce.TaskAttemptContext;
76  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
77  import org.apache.hadoop.mapreduce.security.TokenCache;
78  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
79  import org.apache.hadoop.util.StringUtils;
80  import org.apache.hadoop.util.Tool;
81  import org.apache.hadoop.util.ToolRunner;
82  
83  /**
84   * Export the specified snapshot to a given FileSystem.
85   *
86   * The .snapshot/name folder is copied to the destination cluster
87   * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
88   * When everything is done, the second cluster can restore the snapshot.
89   */
90  @InterfaceAudience.Public
91  @InterfaceStability.Evolving
92  public class ExportSnapshot extends Configured implements Tool {
93    public static final String NAME = "exportsnapshot";
94  
95    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
96  
97    private static final String MR_NUM_MAPS = "mapreduce.job.maps";
98    private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
99    private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
100   private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
101   private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
102   private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
103   private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
104   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
105   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
106   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
107   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
108   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
109   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
110   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
111 
112   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
113   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
114 
115   private static final String INPUT_FOLDER_PREFIX = "export-files.";
116 
117   // Export Map-Reduce Counters, to keep track of the progress
118   public enum Counter {
119     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
120     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
121   }
122 
123   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
124                                                    NullWritable, NullWritable> {
125     final static int REPORT_SIZE = 1 * 1024 * 1024;
126     final static int BUFFER_SIZE = 64 * 1024;
127 
128     private boolean testFailures;
129     private Random random;
130 
131     private boolean verifyChecksum;
132     private String filesGroup;
133     private String filesUser;
134     private short filesMode;
135     private int bufferSize;
136 
137     private FileSystem outputFs;
138     private Path outputArchive;
139     private Path outputRoot;
140 
141     private FileSystem inputFs;
142     private Path inputArchive;
143     private Path inputRoot;
144 
145     @Override
146     public void setup(Context context) throws IOException {
147       Configuration conf = context.getConfiguration();
148       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
149 
150       filesGroup = conf.get(CONF_FILES_GROUP);
151       filesUser = conf.get(CONF_FILES_USER);
152       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
153       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
154       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
155 
156       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
157       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
158 
159       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
160 
161       try {
162         conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
163         inputFs = FileSystem.get(inputRoot.toUri(), conf);
164       } catch (IOException e) {
165         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
166       }
167 
168       try {
169         conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
170         outputFs = FileSystem.get(outputRoot.toUri(), conf);
171       } catch (IOException e) {
172         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
173       }
174 
175       // Use the default block size of the outputFs if bigger
176       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
177       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
178       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
179 
180       for (Counter c : Counter.values()) {
181         context.getCounter(c).increment(0);
182       }
183     }
184 
185     @Override
186     protected void cleanup(Context context) {
187       IOUtils.closeStream(inputFs);
188       IOUtils.closeStream(outputFs);
189     }
190 
191     @Override
192     public void map(BytesWritable key, NullWritable value, Context context)
193         throws InterruptedException, IOException {
194       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
195       Path outputPath = getOutputPath(inputInfo);
196 
197       copyFile(context, inputInfo, outputPath);
198     }
199 
200     /**
201      * Returns the location where the inputPath will be copied.
202      */
203     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
204       Path path = null;
205       switch (inputInfo.getType()) {
206         case HFILE:
207           Path inputPath = new Path(inputInfo.getHfile());
208           String family = inputPath.getParent().getName();
209           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
210           String region = HFileLink.getReferencedRegionName(inputPath.getName());
211           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
212           path = new Path(FSUtils.getTableDir(new Path("./"), table),
213               new Path(region, new Path(family, hfile)));
214           break;
215         case WAL:
216           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
217           path = new Path(oldLogsDir, inputInfo.getWalName());
218           break;
219         default:
220           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
221       }
222       return new Path(outputArchive, path);
223     }
224 
225     /*
226      * Used by TestExportSnapshot to simulate a failure
227      */
228     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
229         throws IOException {
230       if (testFailures) {
231         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
232           if (random == null) {
233             random = new Random();
234           }
235 
236           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
237           // retry, but at least we reduce the number of test failures due to
238           // this test exception from the same map task.
239           if (random.nextFloat() < 0.03) {
240             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
241                                   + " time=" + System.currentTimeMillis());
242           }
243         } else {
244           context.getCounter(Counter.COPY_FAILED).increment(1);
245           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
246         }
247       }
248     }
249 
250     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
251         final Path outputPath) throws IOException {
252       injectTestFailure(context, inputInfo);
253 
254       // Get the file information
255       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
256 
257       // Verify if the output file exists and is the same that we want to copy
258       if (outputFs.exists(outputPath)) {
259         FileStatus outputStat = outputFs.getFileStatus(outputPath);
260         if (outputStat != null && sameFile(inputStat, outputStat)) {
261           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
262           context.getCounter(Counter.FILES_SKIPPED).increment(1);
263           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
264           return;
265         }
266       }
267 
268       InputStream in = openSourceFile(context, inputInfo);
269       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
270       if (Integer.MAX_VALUE != bandwidthMB) {
271         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
272       }
273 
274       try {
275         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
276 
277         // Ensure that the output folder is there and copy the file
278         createOutputPath(outputPath.getParent());
279         FSDataOutputStream out = outputFs.create(outputPath, true);
280         try {
281           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
282         } finally {
283           out.close();
284         }
285 
286         // Try to Preserve attributes
287         if (!preserveAttributes(outputPath, inputStat)) {
288           LOG.warn("You may have to run manually chown on: " + outputPath);
289         }
290       } finally {
291         in.close();
292       }
293     }
294 
295     /**
296      * Create the output folder and optionally set ownership.
297      */
298     private void createOutputPath(final Path path) throws IOException {
299       if (filesUser == null && filesGroup == null) {
300         outputFs.mkdirs(path);
301       } else {
302         Path parent = path.getParent();
303         if (!outputFs.exists(parent) && !parent.isRoot()) {
304           createOutputPath(parent);
305         }
306         outputFs.mkdirs(path);
307         if (filesUser != null || filesGroup != null) {
308           // override the owner when non-null user/group is specified
309           outputFs.setOwner(path, filesUser, filesGroup);
310         }
311         if (filesMode > 0) {
312           outputFs.setPermission(path, new FsPermission(filesMode));
313         }
314       }
315     }
316 
317     /**
318      * Try to Preserve the files attribute selected by the user copying them from the source file
319      * This is only required when you are exporting as a different user than "hbase" or on a system
320      * that doesn't have the "hbase" user.
321      *
322      * This is not considered a blocking failure since the user can force a chmod with the user
323      * that knows is available on the system.
324      */
325     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
326       FileStatus stat;
327       try {
328         stat = outputFs.getFileStatus(path);
329       } catch (IOException e) {
330         LOG.warn("Unable to get the status for file=" + path);
331         return false;
332       }
333 
334       try {
335         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
336           outputFs.setPermission(path, new FsPermission(filesMode));
337         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
338           outputFs.setPermission(path, refStat.getPermission());
339         }
340       } catch (IOException e) {
341         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
342         return false;
343       }
344 
345       boolean hasRefStat = (refStat != null);
346       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
347       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
348       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
349         try {
350           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
351             outputFs.setOwner(path, user, group);
352           }
353         } catch (IOException e) {
354           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
355           LOG.warn("The user/group may not exist on the destination cluster: user=" +
356                    user + " group=" + group);
357           return false;
358         }
359       }
360 
361       return true;
362     }
363 
364     private boolean stringIsNotEmpty(final String str) {
365       return str != null && str.length() > 0;
366     }
367 
368     private void copyData(final Context context,
369         final Path inputPath, final InputStream in,
370         final Path outputPath, final FSDataOutputStream out,
371         final long inputFileSize)
372         throws IOException {
373       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
374                                    " (%.1f%%)";
375 
376       try {
377         byte[] buffer = new byte[bufferSize];
378         long totalBytesWritten = 0;
379         int reportBytes = 0;
380         int bytesRead;
381 
382         long stime = System.currentTimeMillis();
383         while ((bytesRead = in.read(buffer)) > 0) {
384           out.write(buffer, 0, bytesRead);
385           totalBytesWritten += bytesRead;
386           reportBytes += bytesRead;
387 
388           if (reportBytes >= REPORT_SIZE) {
389             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
390             context.setStatus(String.format(statusMessage,
391                               StringUtils.humanReadableInt(totalBytesWritten),
392                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
393                               " from " + inputPath + " to " + outputPath);
394             reportBytes = 0;
395           }
396         }
397         long etime = System.currentTimeMillis();
398 
399         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
400         context.setStatus(String.format(statusMessage,
401                           StringUtils.humanReadableInt(totalBytesWritten),
402                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
403                           " from " + inputPath + " to " + outputPath);
404 
405         // Verify that the written size match
406         if (totalBytesWritten != inputFileSize) {
407           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
408                        " expected=" + inputFileSize + " for file=" + inputPath;
409           throw new IOException(msg);
410         }
411 
412         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
413         LOG.info("size=" + totalBytesWritten +
414             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
415             " time=" + StringUtils.formatTimeDiff(etime, stime) +
416             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
417         context.getCounter(Counter.FILES_COPIED).increment(1);
418       } catch (IOException e) {
419         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
420         context.getCounter(Counter.COPY_FAILED).increment(1);
421         throw e;
422       }
423     }
424 
425     /**
426      * Try to open the "source" file.
427      * Throws an IOException if the communication with the inputFs fail or
428      * if the file is not found.
429      */
430     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
431             throws IOException {
432       try {
433         Configuration conf = context.getConfiguration();
434         FileLink link = null;
435         switch (fileInfo.getType()) {
436           case HFILE:
437             Path inputPath = new Path(fileInfo.getHfile());
438             link = getFileLink(inputPath, conf);
439             break;
440           case WAL:
441             String serverName = fileInfo.getWalServer();
442             String logName = fileInfo.getWalName();
443             link = new WALLink(inputRoot, serverName, logName);
444             break;
445           default:
446             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
447         }
448         return link.open(inputFs);
449       } catch (IOException e) {
450         context.getCounter(Counter.MISSING_FILES).increment(1);
451         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
452         throw e;
453       }
454     }
455 
456     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
457         throws IOException {
458       try {
459         Configuration conf = context.getConfiguration();
460         FileLink link = null;
461         switch (fileInfo.getType()) {
462           case HFILE:
463             Path inputPath = new Path(fileInfo.getHfile());
464             link = getFileLink(inputPath, conf);
465             break;
466           case WAL:
467             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
468             break;
469           default:
470             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
471         }
472         return link.getFileStatus(inputFs);
473       } catch (FileNotFoundException e) {
474         context.getCounter(Counter.MISSING_FILES).increment(1);
475         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
476         throw e;
477       } catch (IOException e) {
478         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
479         throw e;
480       }
481     }
482 
483     private FileLink getFileLink(Path path, Configuration conf) throws IOException{
484       String regionName = HFileLink.getReferencedRegionName(path.getName());
485       TableName tableName = HFileLink.getReferencedTableName(path.getName());
486       if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
487         return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
488                 HFileArchiveUtil.getArchivePath(conf), path);
489       }
490       return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
491     }
492 
493     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
494       try {
495         return fs.getFileChecksum(path);
496       } catch (IOException e) {
497         LOG.warn("Unable to get checksum for file=" + path, e);
498         return null;
499       }
500     }
501 
502     /**
503      * Check if the two files are equal by looking at the file length,
504      * and at the checksum (if user has specified the verifyChecksum flag).
505      */
506     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
507       // Not matching length
508       if (inputStat.getLen() != outputStat.getLen()) return false;
509 
510       // Mark files as equals, since user asked for no checksum verification
511       if (!verifyChecksum) return true;
512 
513       // If checksums are not available, files are not the same.
514       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
515       if (inChecksum == null) return false;
516 
517       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
518       if (outChecksum == null) return false;
519 
520       return inChecksum.equals(outChecksum);
521     }
522   }
523 
524   // ==========================================================================
525   //  Input Format
526   // ==========================================================================
527 
528   /**
529    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
530    * @return list of files referenced by the snapshot (pair of path and size)
531    */
532   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
533       final FileSystem fs, final Path snapshotDir) throws IOException {
534     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
535 
536     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
537     final TableName table = TableName.valueOf(snapshotDesc.getTable());
538 
539     // Get snapshot files
540     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
541     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
542       new SnapshotReferenceUtil.SnapshotVisitor() {
543         @Override
544         public void storeFile(final HRegionInfo regionInfo, final String family,
545             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
546           if (storeFile.hasReference()) {
547             // copied as part of the manifest
548           } else {
549             String region = regionInfo.getEncodedName();
550             String hfile = storeFile.getName();
551             Path path = HFileLink.createPath(table, region, family, hfile);
552 
553             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
554               .setType(SnapshotFileInfo.Type.HFILE)
555               .setHfile(path.toString())
556               .build();
557 
558             long size;
559             if (storeFile.hasFileSize()) {
560               size = storeFile.getFileSize();
561             } else {
562               size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
563             }
564             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
565           }
566         }
567 
568         @Override
569         public void logFile (final String server, final String logfile)
570             throws IOException {
571           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
572             .setType(SnapshotFileInfo.Type.WAL)
573             .setWalServer(server)
574             .setWalName(logfile)
575             .build();
576 
577           long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
578           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
579         }
580     });
581 
582     return files;
583   }
584 
585   /**
586    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
587    * The groups created will have similar amounts of bytes.
588    * <p>
589    * The algorithm used is pretty straightforward; the file list is sorted by size,
590    * and then each group fetch the bigger file available, iterating through groups
591    * alternating the direction.
592    */
593   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
594       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
595     // Sort files by size, from small to big
596     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
597       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
598         long r = a.getSecond() - b.getSecond();
599         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
600       }
601     });
602 
603     // create balanced groups
604     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
605       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
606     long[] sizeGroups = new long[ngroups];
607     int hi = files.size() - 1;
608     int lo = 0;
609 
610     List<Pair<SnapshotFileInfo, Long>> group;
611     int dir = 1;
612     int g = 0;
613 
614     while (hi >= lo) {
615       if (g == fileGroups.size()) {
616         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
617         fileGroups.add(group);
618       } else {
619         group = fileGroups.get(g);
620       }
621 
622       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
623 
624       // add the hi one
625       sizeGroups[g] += fileInfo.getSecond();
626       group.add(fileInfo);
627 
628       // change direction when at the end or the beginning
629       g += dir;
630       if (g == ngroups) {
631         dir = -1;
632         g = ngroups - 1;
633       } else if (g < 0) {
634         dir = 1;
635         g = 0;
636       }
637     }
638 
639     if (LOG.isDebugEnabled()) {
640       for (int i = 0; i < sizeGroups.length; ++i) {
641         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
642       }
643     }
644 
645     return fileGroups;
646   }
647 
648   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
649     @Override
650     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
651         TaskAttemptContext tac) throws IOException, InterruptedException {
652       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
653     }
654 
655     @Override
656     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
657       Configuration conf = context.getConfiguration();
658       String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
659       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
660       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
661 
662       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
663       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
664       if (mappers == 0 && snapshotFiles.size() > 0) {
665         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
666         mappers = Math.min(mappers, snapshotFiles.size());
667         conf.setInt(CONF_NUM_SPLITS, mappers);
668         conf.setInt(MR_NUM_MAPS, mappers);
669       }
670 
671       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
672       List<InputSplit> splits = new ArrayList(groups.size());
673       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
674         splits.add(new ExportSnapshotInputSplit(files));
675       }
676       return splits;
677     }
678 
679     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
680       private List<Pair<BytesWritable, Long>> files;
681       private long length;
682 
683       public ExportSnapshotInputSplit() {
684         this.files = null;
685       }
686 
687       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
688         this.files = new ArrayList(snapshotFiles.size());
689         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
690           this.files.add(new Pair<BytesWritable, Long>(
691             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
692           this.length += fileInfo.getSecond();
693         }
694       }
695 
696       private List<Pair<BytesWritable, Long>> getSplitKeys() {
697         return files;
698       }
699 
700       @Override
701       public long getLength() throws IOException, InterruptedException {
702         return length;
703       }
704 
705       @Override
706       public String[] getLocations() throws IOException, InterruptedException {
707         return new String[] {};
708       }
709 
710       @Override
711       public void readFields(DataInput in) throws IOException {
712         int count = in.readInt();
713         files = new ArrayList<Pair<BytesWritable, Long>>(count);
714         length = 0;
715         for (int i = 0; i < count; ++i) {
716           BytesWritable fileInfo = new BytesWritable();
717           fileInfo.readFields(in);
718           long size = in.readLong();
719           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
720           length += size;
721         }
722       }
723 
724       @Override
725       public void write(DataOutput out) throws IOException {
726         out.writeInt(files.size());
727         for (final Pair<BytesWritable, Long> fileInfo: files) {
728           fileInfo.getFirst().write(out);
729           out.writeLong(fileInfo.getSecond());
730         }
731       }
732     }
733 
734     private static class ExportSnapshotRecordReader
735         extends RecordReader<BytesWritable, NullWritable> {
736       private final List<Pair<BytesWritable, Long>> files;
737       private long totalSize = 0;
738       private long procSize = 0;
739       private int index = -1;
740 
741       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
742         this.files = files;
743         for (Pair<BytesWritable, Long> fileInfo: files) {
744           totalSize += fileInfo.getSecond();
745         }
746       }
747 
748       @Override
749       public void close() { }
750 
751       @Override
752       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
753 
754       @Override
755       public NullWritable getCurrentValue() { return NullWritable.get(); }
756 
757       @Override
758       public float getProgress() { return (float)procSize / totalSize; }
759 
760       @Override
761       public void initialize(InputSplit split, TaskAttemptContext tac) { }
762 
763       @Override
764       public boolean nextKeyValue() {
765         if (index >= 0) {
766           procSize += files.get(index).getSecond();
767         }
768         return(++index < files.size());
769       }
770     }
771   }
772 
773   // ==========================================================================
774   //  Tool
775   // ==========================================================================
776 
777   /**
778    * Run Map-Reduce Job to perform the files copy.
779    */
780   private void runCopyJob(final Path inputRoot, final Path outputRoot,
781       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
782       final String filesUser, final String filesGroup, final int filesMode,
783       final int mappers, final int bandwidthMB)
784           throws IOException, InterruptedException, ClassNotFoundException {
785     Configuration conf = getConf();
786     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
787     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
788     if (mappers > 0) {
789       conf.setInt(CONF_NUM_SPLITS, mappers);
790       conf.setInt(MR_NUM_MAPS, mappers);
791     }
792     conf.setInt(CONF_FILES_MODE, filesMode);
793     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
794     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
795     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
796     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
797     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
798     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
799 
800     Job job = new Job(conf);
801     job.setJobName("ExportSnapshot-" + snapshotName);
802     job.setJarByClass(ExportSnapshot.class);
803     TableMapReduceUtil.addDependencyJars(job);
804     job.setMapperClass(ExportMapper.class);
805     job.setInputFormatClass(ExportSnapshotInputFormat.class);
806     job.setOutputFormatClass(NullOutputFormat.class);
807     job.setMapSpeculativeExecution(false);
808     job.setNumReduceTasks(0);
809 
810     // Acquire the delegation Tokens
811     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
812       new Path[] { inputRoot, outputRoot }, conf);
813 
814     // Run the MR Job
815     if (!job.waitForCompletion(true)) {
816       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
817       // when it will be available on all the supported versions.
818       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
819     }
820   }
821 
822   private void verifySnapshot(final Configuration baseConf,
823       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
824     // Update the conf with the current root dir, since may be a different cluster
825     Configuration conf = new Configuration(baseConf);
826     FSUtils.setRootDir(conf, rootDir);
827     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
828     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
829     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
830   }
831 
832   /**
833    * Set path ownership.
834    */
835   private void setOwner(final FileSystem fs, final Path path, final String user,
836       final String group, final boolean recursive) throws IOException {
837     if (user != null || group != null) {
838       if (recursive && fs.isDirectory(path)) {
839         for (FileStatus child : fs.listStatus(path)) {
840           setOwner(fs, child.getPath(), user, group, recursive);
841         }
842       }
843       fs.setOwner(path, user, group);
844     }
845   }
846 
847   /**
848    * Set path permission.
849    */
850   private void setPermission(final FileSystem fs, final Path path, final short filesMode,
851       final boolean recursive) throws IOException {
852     if (filesMode > 0) {
853       FsPermission perm = new FsPermission(filesMode);
854       if (recursive && fs.isDirectory(path)) {
855         for (FileStatus child : fs.listStatus(path)) {
856           setPermission(fs, child.getPath(), filesMode, recursive);
857         }
858       }
859       fs.setPermission(path, perm);
860     }
861   }
862 
863   /**
864    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
865    * @return 0 on success, and != 0 upon failure.
866    */
867   @Override
868   public int run(String[] args) throws IOException {
869     boolean verifyTarget = true;
870     boolean verifyChecksum = true;
871     String snapshotName = null;
872     String targetName = null;
873     boolean overwrite = false;
874     String filesGroup = null;
875     String filesUser = null;
876     Path outputRoot = null;
877     int bandwidthMB = Integer.MAX_VALUE;
878     int filesMode = 0;
879     int mappers = 0;
880 
881     Configuration conf = getConf();
882     Path inputRoot = FSUtils.getRootDir(conf);
883 
884     // Process command line args
885     for (int i = 0; i < args.length; i++) {
886       String cmd = args[i];
887       if (cmd.equals("-snapshot")) {
888         snapshotName = args[++i];
889       } else if (cmd.equals("-target")) {
890         targetName = args[++i];
891       } else if (cmd.equals("-copy-to")) {
892         outputRoot = new Path(args[++i]);
893       } else if (cmd.equals("-copy-from")) {
894         inputRoot = new Path(args[++i]);
895         FSUtils.setRootDir(conf, inputRoot);
896       } else if (cmd.equals("-no-checksum-verify")) {
897         verifyChecksum = false;
898       } else if (cmd.equals("-no-target-verify")) {
899         verifyTarget = false;
900       } else if (cmd.equals("-mappers")) {
901         mappers = Integer.parseInt(args[++i]);
902       } else if (cmd.equals("-chuser")) {
903         filesUser = args[++i];
904       } else if (cmd.equals("-chgroup")) {
905         filesGroup = args[++i];
906       } else if (cmd.equals("-bandwidth")) {
907         bandwidthMB = Integer.parseInt(args[++i]);
908       } else if (cmd.equals("-chmod")) {
909         filesMode = Integer.parseInt(args[++i], 8);
910       } else if (cmd.equals("-overwrite")) {
911         overwrite = true;
912       } else if (cmd.equals("-h") || cmd.equals("--help")) {
913         printUsageAndExit();
914       } else {
915         System.err.println("UNEXPECTED: " + cmd);
916         printUsageAndExit();
917       }
918     }
919 
920     // Check user options
921     if (snapshotName == null) {
922       System.err.println("Snapshot name not provided.");
923       printUsageAndExit();
924     }
925 
926     if (outputRoot == null) {
927       System.err.println("Destination file-system not provided.");
928       printUsageAndExit();
929     }
930 
931     if (targetName == null) {
932       targetName = snapshotName;
933     }
934 
935     conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
936     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
937     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
938     conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
939     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
940     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
941 
942     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
943 
944     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
945     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
946     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
947     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
948 
949     // Check if the snapshot already exists
950     if (outputFs.exists(outputSnapshotDir)) {
951       if (overwrite) {
952         if (!outputFs.delete(outputSnapshotDir, true)) {
953           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
954           return 1;
955         }
956       } else {
957         System.err.println("The snapshot '" + targetName +
958           "' already exists in the destination: " + outputSnapshotDir);
959         return 1;
960       }
961     }
962 
963     if (!skipTmp) {
964       // Check if the snapshot already in-progress
965       if (outputFs.exists(snapshotTmpDir)) {
966         if (overwrite) {
967           if (!outputFs.delete(snapshotTmpDir, true)) {
968             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
969             return 1;
970           }
971         } else {
972           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
973           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
974           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
975           return 1;
976         }
977       }
978     }
979 
980     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
981     // The snapshot references must be copied before the hfiles otherwise the cleaner
982     // will remove them because they are unreferenced.
983     try {
984       LOG.info("Copy Snapshot Manifest");
985       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
986       if (filesUser != null || filesGroup != null) {
987         setOwner(outputFs, snapshotTmpDir, filesUser, filesGroup, true);
988       }
989       if (filesMode > 0) {
990         setPermission(outputFs, snapshotTmpDir, (short)filesMode, true);
991       }
992     } catch (IOException e) {
993       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
994         snapshotDir + " to=" + initialOutputSnapshotDir, e);
995     }
996 
997     // Write a new .snapshotinfo if the target name is different from the source name
998     if (!targetName.equals(snapshotName)) {
999       SnapshotDescription snapshotDesc =
1000         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1001           .toBuilder()
1002           .setName(targetName)
1003           .build();
1004       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
1005     }
1006 
1007     // Step 2 - Start MR Job to copy files
1008     // The snapshot references must be copied before the files otherwise the files gets removed
1009     // by the HFileArchiver, since they have no references.
1010     try {
1011       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1012                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1013 
1014       LOG.info("Finalize the Snapshot Export");
1015       if (!skipTmp) {
1016         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1017         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1018           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1019             snapshotTmpDir + " to=" + outputSnapshotDir);
1020         }
1021       }
1022 
1023       // Step 4 - Verify snapshot integrity
1024       if (verifyTarget) {
1025         LOG.info("Verify snapshot integrity");
1026         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
1027       }
1028 
1029       LOG.info("Export Completed: " + targetName);
1030       return 0;
1031     } catch (Exception e) {
1032       LOG.error("Snapshot export failed", e);
1033       if (!skipTmp) {
1034         outputFs.delete(snapshotTmpDir, true);
1035       }
1036       outputFs.delete(outputSnapshotDir, true);
1037       return 1;
1038     } finally {
1039       IOUtils.closeStream(inputFs);
1040       IOUtils.closeStream(outputFs);
1041     }
1042   }
1043 
1044   // ExportSnapshot
1045   private void printUsageAndExit() {
1046     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1047     System.err.println(" where [options] are:");
1048     System.err.println("  -h|-help                Show this help and exit.");
1049     System.err.println("  -snapshot NAME          Snapshot to restore.");
1050     System.err.println("  -copy-to NAME           Remote destination hdfs://");
1051     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
1052     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
1053     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
1054         "exported snapshot.");
1055     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
1056     System.err.println("  -chuser USERNAME        Change the owner of the files " +
1057         "to the specified one.");
1058     System.err.println("  -chgroup GROUP          Change the group of the files to " +
1059         "the specified one.");
1060     System.err.println("  -chmod MODE             Change the permission of the files " +
1061         "to the specified one.");
1062     System.err.println("  -mappers                Number of mappers to use during the " +
1063         "copy (mapreduce.job.maps).");
1064     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
1065     System.err.println();
1066     System.err.println("Examples:");
1067     System.err.println("  hbase " + getClass().getName() + " \\");
1068     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1069     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1070     System.err.println();
1071     System.err.println("  hbase " + getClass().getName() + " \\");
1072     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1073     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
1074     System.exit(1);
1075   }
1076 
1077   /**
1078    * The guts of the {@link #main} method.
1079    * Call this method to avoid the {@link #main(String[])} System.exit.
1080    * @param args
1081    * @return errCode
1082    * @throws Exception
1083    */
1084   static int innerMain(final Configuration conf, final String [] args) throws Exception {
1085     return ToolRunner.run(conf, new ExportSnapshot(), args);
1086   }
1087 
1088   public static void main(String[] args) throws Exception {
1089     System.exit(innerMain(HBaseConfiguration.create(), args));
1090   }
1091 }