/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

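/**
 * MapReduce-based {@link IncrementalRestoreService} implementation. For every table to be
 * restored, it bulk loads the HFiles found under the supplied backup directories into the
 * corresponding target table using {@link LoadIncrementalHFiles}.
 */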
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MapReduceRestoreService implements IncrementalRestoreService {
  public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class);

  private Configuration conf;

  public MapReduceRestoreService() {
  }

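  /**
   * Restores incremental backup data from the given directories into the target tables:
   * tableNames[i] is restored into newTableNames[i]. A failure on any table aborts the restore.
   */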
  @Override
  public void run(Path[] fileDirPaths, TableName[] tableNames, TableName[] newTableNames)
      throws IOException {

    String logDirs = StringUtils.join(fileDirPaths, ",");
    LOG.info("Restore incremental backup from directory " + logDirs + " from hbase tables "
        + BackupServerUtil.join(tableNames) + " to tables " + BackupServerUtil.join(newTableNames));

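    // Bulk load every backup output directory into each target table; a non-zero exit code from
    // the loader is turned into an IOException.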
    for (int i = 0; i < tableNames.length; i++) {

      LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);

      int loaderResult = 0;
      try {
        for (Path bulkOutputPath : fileDirPaths) {
          LoadIncrementalHFiles loader = createLoader(conf);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
          }

          String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() };
          loaderResult = loader.run(args);
          if (failed(loaderResult)) {
            throw new IOException("Cannot restore from backup directory " + logDirs
                + " (check Hadoop and HBase logs). Bulk loader return code = " + loaderResult);
          }
        }
        LOG.debug("Restore job finished, loader return code: " + loaderResult);
      } catch (Exception e) {
        throw new IOException("Cannot restore from backup directory " + logDirs
            + " (check Hadoop and HBase logs)", e);
      }
    }
  }

  private boolean failed(int result) {
    return result != 0;
  }

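  /**
   * Creates a {@link LoadIncrementalHFiles} instance configured for restore: the RPC timeout is
   * raised and the per-region, per-family file limit is lifted, since a restore may bulk load
   * many HFiles into a single region.
   */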
  public static LoadIncrementalHFiles createLoader(Configuration conf) throws IOException {
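    // Bulk loading a large backup can exceed the default RPC timeout, so allow up to one hour.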
    int milliSecInHour = 3600000;
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);

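    // The loader caps the number of HFiles it will bulk load per region and column family; a
    // restore can legitimately exceed that cap, so remove the limit.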
    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
    try {
      return new LoadIncrementalHFiles(conf);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

}