/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

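/**
 * An {@link IncrementalRestoreService} implementation that restores incremental
 * backups by bulk loading the backed-up HFiles into the target tables with
 * {@link LoadIncrementalHFiles}.
 *
 * A minimal usage sketch (the backup directory and table names below are
 * hypothetical, for illustration only):
 *
 * <pre>
 * MapReduceRestoreService service = new MapReduceRestoreService();
 * service.setConf(HBaseConfiguration.create());
 * service.run(new Path[] { new Path("/backup/incr/output") },
 *     new TableName[] { TableName.valueOf("source_table") },
 *     new TableName[] { TableName.valueOf("restored_table") });
 * </pre>
 */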
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MapReduceRestoreService implements IncrementalRestoreService {
  public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class);

  private Configuration conf;

  public MapReduceRestoreService() {
  }

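  /**
   * Restores each table from the HFiles found under the given backup
   * directories, running the bulk loader once per directory.
   *
   * @param fileDirPaths directories containing the HFiles produced by an incremental backup
   * @param tableNames original table names (expected to be of length 1)
   * @param newTableNames target table names to restore into, parallel to {@code tableNames}
   * @throws IOException if any bulk load fails
   */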
  @Override
  public void run(Path[] fileDirPaths, TableName[] tableNames, TableName[] newTableNames)
      throws IOException {

    // tableNames and newTableNames are parallel arrays, each expected to be of length 1

    String logDirs = StringUtils.join(fileDirPaths, ",");
    LOG.info("Restore incremental backup from directory " + logDirs + " from hbase tables "
        + BackupServerUtil.join(tableNames) + " to tables " + BackupServerUtil.join(newTableNames));

    for (int i = 0; i < tableNames.length; i++) {

      LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);

      int loaderResult = 0;
      try {
        // Bulk load the HFiles from every backup directory into the target table
        for (Path bulkOutputPath : fileDirPaths) {
          LoadIncrementalHFiles loader = createLoader(conf);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
          }

          String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() };
          loaderResult = loader.run(args);
          if (failed(loaderResult)) {
            throw new IOException("Cannot restore from backup directory " + logDirs
                + " (check Hadoop and HBase logs). Bulk loader return code = " + loaderResult);
          }
        }
        LOG.debug("Restore job finished, last bulk loader return code: " + loaderResult);
      } catch (Exception e) {
        throw new IOException("Cannot restore from backup directory " + logDirs
            + " (check Hadoop and HBase logs)", e);
      }

    }
  }

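  /**
   * A non-zero exit code from the bulk loader indicates failure.
   *
   * @param result exit code returned by {@link LoadIncrementalHFiles#run(String[])}
   * @return true if the bulk load failed
   */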
  private boolean failed(int result) {
    return result != 0;
  }

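  /**
   * Creates a {@link LoadIncrementalHFiles} instance configured for restore:
   * the RPC timeout is raised to one hour and the per-region-per-family file
   * limit is removed.
   *
   * @param conf configuration to adjust and pass to the loader
   * @return a configured bulk loader
   * @throws IOException if the loader cannot be created
   */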
  public static LoadIncrementalHFiles createLoader(Configuration conf) throws IOException {
    // Configure for restore: LoadIncrementalHFiles needs more time than the
    // default hbase.rpc.timeout allows, so raise the timeout to one hour
    final int milliSecInHour = 3600000;
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);

    // By default the limit is 32, and the loader fails if the number of hfiles
    // in any region exceeds it; that is too restrictive for snapshot restore,
    // so lift the limit entirely
    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
    try {
      return new LoadIncrementalHFiles(conf);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

}