1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.backup.mapreduce;
19
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyService;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.util.ClassUtil;
import org.apache.zookeeper.KeeperException.NoNodeException;
52
53
54
55
56
57
58
59
60
61
62 @InterfaceAudience.Private
63 @InterfaceStability.Evolving
64 public class MapReduceBackupCopyService implements BackupCopyService {
65 private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class);
66
67 private Configuration conf;
68
69
70
71 private float progressDone = 0.1f;
72 private long bytesCopied = 0;
73 private static float INIT_PROGRESS = 0.1f;
74
75
76
77 private float subTaskPercntgInWholeTask = 1f;
78
79 public MapReduceBackupCopyService() {
80 }
81
82 @Override
83 public Configuration getConf() {
84 return conf;
85 }
86
87 @Override
88 public void setConf(Configuration conf) {
89 this.conf = conf;
90 }
91
92
93
94
95
96 public float getSubTaskPercntgInWholeTask() {
97 return subTaskPercntgInWholeTask;
98 }
99
100
101
102
103
104
105
106 public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
107 this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
108 }
109
110 class SnapshotCopy extends ExportSnapshot {
111 private BackupInfo backupContext;
112 private TableName table;
113
114 public SnapshotCopy(BackupInfo backupContext, TableName table) {
115 super();
116 this.backupContext = backupContext;
117 this.table = table;
118 }
119
120 public TableName getTable() {
121 return this.table;
122 }
123 }
124
125
126
127
128
129
130
131
132
133 static void updateProgress(BackupInfo backupContext, BackupManager backupManager,
134 int newProgress, long bytesCopied) throws IOException {
135
136 String backupProgressData = newProgress + "%";
137
138 backupContext.setProgress(newProgress);
139 backupManager.updateBackupInfo(backupContext);
140 LOG.debug("Backup progress data \"" + backupProgressData
141 + "\" has been updated to hbase:backup for " + backupContext.getBackupId());
142 }
143
144
145
146
147
148
149
150 class BackupDistCp extends DistCp {
151
152 private BackupInfo backupContext;
153 private BackupManager backupManager;
154
155 public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupContext,
156 BackupManager backupManager)
157 throws Exception {
158 super(conf, options);
159 this.backupContext = backupContext;
160 this.backupManager = backupManager;
161 }
162
163 @Override
164 public Job execute() throws Exception {
165
166
167 Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
168 Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
169 Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
170 Method methodCreateInputFileListing =
171 classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
172 Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
173
174 Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
175 Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
176 Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
177 Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
178
179 methodCreateMetaFolderPath.setAccessible(true);
180 methodCreateJob.setAccessible(true);
181 methodCreateInputFileListing.setAccessible(true);
182 methodCleanup.setAccessible(true);
183
184 fieldInputOptions.setAccessible(true);
185 fieldMetaFolder.setAccessible(true);
186 fieldJobFS.setAccessible(true);
187 fieldSubmitted.setAccessible(true);
188
189
190 assert fieldInputOptions.get(this) != null;
191 assert getConf() != null;
192
193 Job job = null;
194 try {
195 synchronized (this) {
196
197 fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
198 fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf()));
199 job = (Job) methodCreateJob.invoke(this);
200 }
201 methodCreateInputFileListing.invoke(this, job);
202
203
204 List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
205
206 long totalSrcLgth = 0;
207 for (Path aSrc : srcs) {
208 totalSrcLgth += BackupServerUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc);
209 }
210
211
212 job.submit();
213 fieldSubmitted.set(this, true);
214
215
216
217
218
219 int progressReportFreq =
220 this.getConf().getInt("hbase.backup.progressreport.frequency", 500);
221 float lastProgress = progressDone;
222 while (!job.isComplete()) {
223 float newProgress =
224 progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
225
226 if (newProgress > lastProgress) {
227
228 BigDecimal progressData =
229 new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
230 String newProgressStr = progressData + "%";
231 LOG.info("Progress: " + newProgressStr);
232 updateProgress(backupContext, backupManager, progressData.intValue(),
233 bytesCopied);
234 LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
235 + ".\"");
236 lastProgress = newProgress;
237 }
238 Thread.sleep(progressReportFreq);
239 }
240
241 float newProgress =
242 progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
243 BigDecimal progressData =
244 new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
245
246 String newProgressStr = progressData + "%";
247 LOG.info("Progress: " + newProgressStr);
248
249
250 progressDone = newProgress;
251 bytesCopied += totalSrcLgth;
252
253 updateProgress(backupContext, backupManager, progressData.intValue(),
254 bytesCopied);
255 LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
256 + " - " + bytesCopied + " bytes copied.\"");
257
258 } finally {
259 if (!fieldSubmitted.getBoolean(this)) {
260 methodCleanup.invoke(this);
261 }
262 }
263
264 String jobID = job.getJobID().toString();
265 job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
266
267 LOG.debug("DistCp job-id: " + jobID);
268 return job;
269 }
270
271 }
272
273
274
275
276
277
278
279
280
281
282 @Override
283 public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
284 BackupCopyService.Type copyType, String[] options) throws IOException {
285 int res = 0;
286
287 try {
288 if (copyType == Type.FULL) {
289 SnapshotCopy snapshotCp =
290 new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
291 LOG.debug("Doing SNAPSHOT_COPY");
292
293 snapshotCp.setConf(new Configuration(conf));
294 res = snapshotCp.run(options);
295
296 } else if (copyType == Type.INCREMENTAL) {
297 LOG.debug("Doing COPY_TYPE_DISTCP");
298 setSubTaskPercntgInWholeTask(1f);
299
300 BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, context,
301 backupManager);
302
303
304
305
306 LOG.debug("DistCp options: " + Arrays.toString(options));
307 if (options.length == 2) {
308 Path dest = new Path(options[1]);
309 FileSystem destfs = dest.getFileSystem(conf);
310 if (!destfs.exists(dest)) {
311 destfs.mkdirs(dest);
312 }
313 }
314 res = distcp.run(options);
315 }
316 return res;
317
318 } catch (Exception e) {
319 throw new IOException(e);
320 }
321 }
322
323 @Override
324 public void cancelCopyJob(String jobId) throws IOException {
325 JobID id = JobID.forName(jobId);
326 Cluster cluster = new Cluster(getConf());
327 try {
328 Job job = cluster.getJob(id);
329 if (job == null) {
330 LOG.error("No job found for " + id);
331
332 }
333 if (job.isComplete() || job.isRetired()) {
334 return;
335 }
336
337 job.killJob();
338 LOG.debug("Killed job " + id);
339 } catch (InterruptedException e) {
340 throw new IOException(e);
341 }
342 }
343
344 }