View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.backup.mapreduce;
19  
20  import java.io.IOException;
21  import java.lang.reflect.Field;
22  import java.lang.reflect.Method;
23  import java.math.BigDecimal;
24  import java.util.Arrays;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FileSystem;
31  import org.apache.hadoop.fs.LocatedFileStatus;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.fs.RemoteIterator;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.backup.BackupCopyService;
36  import org.apache.hadoop.hbase.backup.BackupInfo;
37  import org.apache.hadoop.hbase.backup.impl.BackupManager;
38  import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.classification.InterfaceStability;
41  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
42  import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
43  import org.apache.hadoop.mapreduce.Cluster;
44  import org.apache.hadoop.mapreduce.Job;
45  import org.apache.hadoop.mapreduce.JobID;
46  import org.apache.hadoop.mapreduce.Mapper;
47  import org.apache.hadoop.tools.DistCp;
48  import org.apache.hadoop.tools.DistCpConstants;
49  import org.apache.hadoop.tools.DistCpOptions;
50  import org.apache.hadoop.util.ClassUtil;
51  import org.apache.zookeeper.KeeperException.NoNodeException;
52  /**
53   * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot,
54   * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper
55   * implementation. The other is copying for incremental log files, which bases on extending
56   * DistCp's function with copy progress reporting to ZooKeeper implementation.
57   *
58   * For now this is only a wrapper. The other features such as progress and increment backup will be
59   * implemented in future jira
60   */
61  
62  @InterfaceAudience.Private
63  @InterfaceStability.Evolving
64  public class MapReduceBackupCopyService implements BackupCopyService {
65    private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class);
66  
67    private Configuration conf;
68    // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024;
69  
70    // Accumulated progress within the whole backup process for the copy operation
71    private float progressDone = 0.1f;
72    private long bytesCopied = 0;
73    private static float INIT_PROGRESS = 0.1f;
74  
75    // The percentage of the current copy task within the whole task if multiple time copies are
76    // needed. The default value is 100%, which means only 1 copy task for the whole.
77    private float subTaskPercntgInWholeTask = 1f;
78  
79    public MapReduceBackupCopyService() {
80    }
81  
82    @Override
83    public Configuration getConf() {
84      return conf;
85    }
86  
87    @Override
88    public void setConf(Configuration conf) {
89      this.conf = conf;
90    }
91  
92    /**
93     * Get the current copy task percentage within the whole task if multiple copies are needed.
94     * @return the current copy task percentage
95     */
96    public float getSubTaskPercntgInWholeTask() {
97      return subTaskPercntgInWholeTask;
98    }
99  
100   /**
101    * Set the current copy task percentage within the whole task if multiple copies are needed. Must
102    * be called before calling
103    * {@link #copy(BackupHandler, Configuration, Type, String[])}
104    * @param subTaskPercntgInWholeTask The percentage of the copy subtask
105    */
106   public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
107     this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
108   }
109 
110   class SnapshotCopy extends ExportSnapshot {
111     private BackupInfo backupContext;
112     private TableName table;
113 
114     public SnapshotCopy(BackupInfo backupContext, TableName table) {
115       super();
116       this.backupContext = backupContext;
117       this.table = table;
118     }
119 
120     public TableName getTable() {
121       return this.table;
122     }
123   }
124 
125   /**
126    * Update the ongoing back token znode with new progress.
127    * @param backupContext backup context
128    * 
129    * @param newProgress progress
130    * @param bytesCopied bytes copied
131    * @throws NoNodeException exception
132    */
133   static void updateProgress(BackupInfo backupContext, BackupManager backupManager,
134       int newProgress, long bytesCopied) throws IOException {
135     // compose the new backup progress data, using fake number for now
136     String backupProgressData = newProgress + "%";
137 
138     backupContext.setProgress(newProgress);
139     backupManager.updateBackupInfo(backupContext);
140     LOG.debug("Backup progress data \"" + backupProgressData
141       + "\" has been updated to hbase:backup for " + backupContext.getBackupId());
142   }
143 
144   // Extends DistCp for progress updating to hbase:backup
145   // during backup. Using DistCpV2 (MAPREDUCE-2765).
146   // Simply extend it and override execute() method to get the
147   // Job reference for progress updating.
148   // Only the argument "src1, [src2, [...]] dst" is supported,
149   // no more DistCp options.
150   class BackupDistCp extends DistCp {
151 
152     private BackupInfo backupContext;
153     private BackupManager backupManager;
154 
155     public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupContext,
156         BackupManager backupManager)
157         throws Exception {
158       super(conf, options);
159       this.backupContext = backupContext;
160       this.backupManager = backupManager;
161     }
162 
163     @Override
164     public Job execute() throws Exception {
165 
166       // reflection preparation for private methods and fields
167       Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
168       Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
169       Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
170       Method methodCreateInputFileListing =
171           classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
172       Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
173 
174       Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
175       Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
176       Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
177       Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
178 
179       methodCreateMetaFolderPath.setAccessible(true);
180       methodCreateJob.setAccessible(true);
181       methodCreateInputFileListing.setAccessible(true);
182       methodCleanup.setAccessible(true);
183 
184       fieldInputOptions.setAccessible(true);
185       fieldMetaFolder.setAccessible(true);
186       fieldJobFS.setAccessible(true);
187       fieldSubmitted.setAccessible(true);
188 
189       // execute() logic starts here
190       assert fieldInputOptions.get(this) != null;
191       assert getConf() != null;
192 
193       Job job = null;
194       try {
195         synchronized (this) {
196           // Don't cleanup while we are setting up.
197           fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
198           fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf()));
199           job = (Job) methodCreateJob.invoke(this);
200         }
201         methodCreateInputFileListing.invoke(this, job);
202 
203         // Get the total length of the source files
204         List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
205         
206         long totalSrcLgth = 0;
207         for (Path aSrc : srcs) {
208           totalSrcLgth += BackupServerUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc);
209         }
210 
211         // submit the copy job
212         job.submit();
213         fieldSubmitted.set(this, true);
214 
215         // after submit the MR job, set its handler in backup handler for cancel process
216         // this.backupHandler.copyJob = job;
217 
218         // Update the copy progress to ZK every 0.5s if progress value changed
219         int progressReportFreq =
220             this.getConf().getInt("hbase.backup.progressreport.frequency", 500);
221         float lastProgress = progressDone;
222         while (!job.isComplete()) {
223           float newProgress =
224               progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
225 
226           if (newProgress > lastProgress) {
227 
228             BigDecimal progressData =
229                 new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
230             String newProgressStr = progressData + "%";
231             LOG.info("Progress: " + newProgressStr);
232             updateProgress(backupContext, backupManager, progressData.intValue(),
233               bytesCopied);
234             LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
235               + ".\"");
236             lastProgress = newProgress;
237           }
238           Thread.sleep(progressReportFreq);
239         }
240         // update the progress data after copy job complete
241         float newProgress =
242             progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
243         BigDecimal progressData =
244             new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
245 
246         String newProgressStr = progressData + "%";
247         LOG.info("Progress: " + newProgressStr);
248 
249         // accumulate the overall backup progress
250         progressDone = newProgress;
251         bytesCopied += totalSrcLgth;
252 
253         updateProgress(backupContext, backupManager, progressData.intValue(),
254           bytesCopied);
255         LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
256           + " - " + bytesCopied + " bytes copied.\"");
257 
258       } finally {
259         if (!fieldSubmitted.getBoolean(this)) {
260           methodCleanup.invoke(this);
261         }
262       }
263 
264       String jobID = job.getJobID().toString();
265       job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
266 
267       LOG.debug("DistCp job-id: " + jobID);
268       return job;
269     }
270 
271   }
272 
273   
274   /**
275    * Do backup copy based on different types.
276    * @param context The backup context
277    * @param conf The hadoop configuration
278    * @param copyType The backup copy type
279    * @param options Options for customized ExportSnapshot or DistCp
280    * @throws Exception exception
281    */
282   @Override
283   public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
284       BackupCopyService.Type copyType, String[] options) throws IOException {
285     int res = 0;
286 
287     try {
288       if (copyType == Type.FULL) {
289         SnapshotCopy snapshotCp =
290             new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
291         LOG.debug("Doing SNAPSHOT_COPY");
292         // Make a new instance of conf to be used by the snapshot copy class.
293         snapshotCp.setConf(new Configuration(conf));
294         res = snapshotCp.run(options);
295 
296       } else if (copyType == Type.INCREMENTAL) {
297         LOG.debug("Doing COPY_TYPE_DISTCP");
298         setSubTaskPercntgInWholeTask(1f);
299 
300         BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, context,
301           backupManager);
302         // Handle a special case where the source file is a single file.
303         // In this case, distcp will not create the target dir. It just take the
304         // target as a file name and copy source file to the target (as a file name).
305         // We need to create the target dir before run distcp.
306         LOG.debug("DistCp options: " + Arrays.toString(options));
307         if (options.length == 2) {
308           Path dest = new Path(options[1]);
309           FileSystem destfs = dest.getFileSystem(conf);
310           if (!destfs.exists(dest)) {
311             destfs.mkdirs(dest);
312           }
313         }
314         res = distcp.run(options);
315       }
316       return res;
317 
318     } catch (Exception e) {
319       throw new IOException(e);
320     }
321   }
322 
323    @Override
324    public void cancelCopyJob(String jobId) throws IOException {
325      JobID id = JobID.forName(jobId);     
326      Cluster cluster = new Cluster(getConf());
327      try {
328        Job job = cluster.getJob(id);
329        if (job == null) {
330          LOG.error("No job found for " + id);
331          // should we throw exception
332        }
333        if (job.isComplete() || job.isRetired()) {
334          return;
335        }
336  
337        job.killJob();
338        LOG.debug("Killed job " + id);
339      } catch (InterruptedException e) {
340        throw new IOException(e);
341      }
342    }
343 
344 }