/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.backup.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.master.BackupController;
import org.apache.hadoop.hbase.backup.master.BackupLogCleaner;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Pair;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Handles backup requests on the server side and creates backup info records in hbase:backup
 * to keep track of backup sessions. The timestamps kept in the hbase:backup table are used by
 * subsequent incremental backups. Creates the BackupInfo (backup context) for each request.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BackupManager implements Closeable {
  private static final Log LOG = LogFactory.getLog(BackupManager.class);

  private Configuration conf = null;
  private BackupInfo backupInfo = null;

  private ExecutorService pool = null;

  private boolean backupComplete = false;

  private BackupSystemTable systemTable;

  private final Connection conn;

  /**
   * Backup manager constructor.
   * @param conf configuration
   * @throws IOException exception
   */
  public BackupManager(Configuration conf) throws IOException {
    if (!conf.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT)) {
      throw new BackupException("HBase backup is not enabled. Check your " +
          HConstants.BACKUP_ENABLE_KEY + " setting.");
    }
    this.conf = conf;
    this.conn = ConnectionFactory.createConnection(conf);
    this.systemTable = new BackupSystemTable(conn);
  }

  /**
   * Return backup context
   */
  protected BackupInfo getBackupContext() {
    return backupInfo;
  }

  /**
   * This method modifies the master's configuration in order to inject backup-related features
   * @param conf configuration
   */
  public static void decorateMasterConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }
    // Add WAL archive cleaner plug-in
    String plugins = conf.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
    String cleanerClass = BackupLogCleaner.class.getCanonicalName();
    if (!plugins.contains(cleanerClass)) {
      conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
    }

    // Add master procedure manager for log-roll procedures
    String classes = conf.get("hbase.procedure.master.classes");
    String masterProcedureClass = LogRollMasterProcedureManager.class.getName();
    if (classes == null) {
      conf.set("hbase.procedure.master.classes", masterProcedureClass);
    } else if (!classes.contains(masterProcedureClass)) {
      conf.set("hbase.procedure.master.classes", classes + "," + masterProcedureClass);
    }

    // Set Master Observer - Backup Controller
    classes = conf.get("hbase.coprocessor.master.classes");
    String observerClass = BackupController.class.getName();
    if (classes == null) {
      conf.set("hbase.coprocessor.master.classes", observerClass);
    } else if (!classes.contains(observerClass)) {
      conf.set("hbase.coprocessor.master.classes", classes + "," + observerClass);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Added log cleaner: " + cleanerClass);
      LOG.debug("Added master procedure manager: " + masterProcedureClass);
      LOG.debug("Added master observer: " + observerClass);
    }
  }

  /**
   * This method modifies the RS configuration in order to inject backup-related features
   * @param conf configuration
   */
  public static void decorateRSConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }

    String classes = conf.get("hbase.procedure.regionserver.classes");
    String regionProcedureClass = LogRollRegionServerProcedureManager.class.getName();
    if (classes == null) {
      conf.set("hbase.procedure.regionserver.classes", regionProcedureClass);
    } else if (!classes.contains(regionProcedureClass)) {
      conf.set("hbase.procedure.regionserver.classes", classes + "," + regionProcedureClass);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added region procedure manager: " + regionProcedureClass);
    }
  }

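  /**
   * Checks whether the backup feature is enabled in the given configuration.
   * @param conf configuration to check
   * @return true if {@link HConstants#BACKUP_ENABLE_KEY} is set to true, false otherwise
   */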
  public static boolean isBackupEnabled(Configuration conf) {
    return conf.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT);
  }

  /**
   * Get configuration
   * @return configuration
   */
  Configuration getConf() {
    return conf;
  }

  /**
   * Stop all the work of backup.
   */
  @Override
  public void close() {
    // Currently, we shut down all ongoing backup handlers immediately; we may need to do
    // something like recording the failed list somewhere later.
    if (this.pool != null) {
      this.pool.shutdownNow();
    }
    if (systemTable != null) {
      try {
        systemTable.close();
      } catch (Exception e) {
        LOG.error(e);
      }
    }
    if (conn != null) {
      try {
        conn.close();
      } catch (IOException e) {
        LOG.error(e);
      }
    }
  }

  /**
   * Create a BackupInfo based on the input backup request.
   * @param backupId backup id
   * @param type backup type (full or incremental)
   * @param tableList list of tables to back up
   * @param targetRootDir root directory of the backup destination
   * @param workers number of parallel workers
   * @param bandwidth bandwidth limit for the backup operation
   * @return BackupInfo for the new backup session
   * @throws BackupException exception
   */
  public BackupInfo createBackupInfo(String backupId, BackupType type,
      List<TableName> tableList, String targetRootDir, int workers, long bandwidth)
          throws BackupException {
    if (targetRootDir == null) {
      throw new BackupException("Wrong backup request parameter: target backup root directory");
    }

    if (type == BackupType.FULL && (tableList == null || tableList.isEmpty())) {
      // A null or empty table list for a full backup means "back up all tables". Fill the table
      // list with all user tables from meta. If no table is available, reject the request.

      HTableDescriptor[] htds = null;
      try (Admin hbadmin = conn.getAdmin()) {
        htds = hbadmin.listTables();
      } catch (Exception e) {
        throw new BackupException(e);
      }

      if (htds == null) {
        throw new BackupException("No table exists for full backup of all tables.");
      } else {
        tableList = new ArrayList<>();
        for (HTableDescriptor hTableDescriptor : htds) {
          tableList.add(hTableDescriptor.getTableName());
        }

        LOG.info("Full backup of all the tables available in the cluster: " + tableList);
      }
    }

    // there are one or more tables in the table list
    backupInfo = new BackupInfo(backupId, type,
      tableList.toArray(new TableName[tableList.size()]),
      targetRootDir);
    backupInfo.setBandwidth(bandwidth);
    backupInfo.setWorkers(workers);
    return backupInfo;
  }

  /**
   * Check if there is an ongoing backup. Currently, we rely only on the status recorded in
   * hbase:backup. We need to consider handling orphan records in the future; otherwise, all
   * subsequent requests will fail.
   * @return the ongoing backup id if an ongoing backup exists, otherwise null
   * @throws IOException exception
   */
  private String getOngoingBackupId() throws IOException {
    ArrayList<BackupInfo> sessions = systemTable.getBackupContexts(BackupState.RUNNING);
    if (sessions.size() == 0) {
      return null;
    }
    return sessions.get(0).getBackupId();
  }

  /**
   * Start the backup manager service.
   * @throws IOException exception
   */
  public void initialize() throws IOException {
    String ongoingBackupId = this.getOngoingBackupId();
    if (ongoingBackupId != null) {
      LOG.info("There is an ongoing backup " + ongoingBackupId
          + ". Cannot launch a new backup until no ongoing backup remains.");
      throw new BackupException("There is an ongoing backup.");
    }

    // Initialize thread pools
    int nrThreads = this.conf.getInt("hbase.backup.threads.max", 1);
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("BackupHandler-%1$d");
    this.pool =
        new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), builder.build());
    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
  }

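  /**
   * Set the backup info (context) for the current backup session.
   * @param backupInfo backup info to use
   */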
  public void setBackupInfo(BackupInfo backupInfo) {
    this.backupInfo = backupInfo;
  }

  /**
   * Get direct ancestors of the current backup.
   * @param backupCtx The backup context for the current backup
   * @return The ancestors for the current backup
   * @throws IOException exception
   * @throws BackupException exception
   */
  public ArrayList<BackupImage> getAncestors(BackupInfo backupCtx) throws IOException,
      BackupException {
    LOG.debug("Getting the direct ancestors of the current backup " + backupCtx.getBackupId());

    ArrayList<BackupImage> ancestors = new ArrayList<BackupImage>();

    // a full backup does not have ancestors
    if (backupCtx.getType() == BackupType.FULL) {
      LOG.debug("Current backup is a full backup, no direct ancestor for it.");
      return ancestors;
    }

    // get the backup history in descending order
    ArrayList<BackupInfo> allHistoryList = getBackupHistory(true);
    for (BackupInfo backup : allHistoryList) {
      BackupImage image =
          new BackupImage(backup.getBackupId(), backup.getType(), backup.getTargetRootDir(),
              backup.getTableNames(), backup.getStartTs(), backup.getEndTs());
      // add the full backup image as an ancestor until the last incremental backup
      if (backup.getType().equals(BackupType.FULL)) {
        // check the backup image coverage: if the previous image is already covered by the newer
        // ones, there is no need to add it
        if (!BackupManifest.canCoverImage(ancestors, image)) {
          ancestors.add(image);
        }
      } else {
        // Found the last incremental backup. If the previously added full backup ancestor images
        // can cover it, then this incremental ancestor is not a dependency of the current
        // incremental backup; that is, we have reached the backup scope boundary of the current
        // table set. Otherwise, this incremental backup ancestor is a dependent ancestor of the
        // ongoing incremental backup.
        if (BackupManifest.canCoverImage(ancestors, image)) {
          LOG.debug("Met the backup boundary of the current table set. "
              + "The root full backup images for the current backup scope:");
          for (BackupImage image1 : ancestors) {
            LOG.debug("  BackupId: " + image1.getBackupId() + ", Backup directory: "
                + image1.getRootDir());
          }
        } else {
          Path logBackupPath =
              HBackupFileSystem.getLogBackupPath(backup.getTargetRootDir(),
                backup.getBackupId());
          LOG.debug("Current backup has an incremental backup ancestor, "
              + "touching its image manifest in " + logBackupPath.toString()
              + " to construct the dependency.");
          BackupManifest lastIncrImgManifest = new BackupManifest(conf, logBackupPath);
          BackupImage lastIncrImage = lastIncrImgManifest.getBackupImage();
          ancestors.add(lastIncrImage);

          LOG.debug("Last dependent incremental backup image information:");
          LOG.debug("  Token: " + lastIncrImage.getBackupId());
          LOG.debug("  Backup directory: " + lastIncrImage.getRootDir());
        }
      }
    }
    LOG.debug("Got " + ancestors.size() + " ancestors for the current backup.");
    return ancestors;
  }

  /**
   * Get the direct ancestors of this backup for one table involved.
   * @param backupContext backup context
   * @param table table
   * @return backupImages on the dependency list
   * @throws BackupException exception
   * @throws IOException exception
   */
  public ArrayList<BackupImage> getAncestors(BackupInfo backupContext, TableName table)
      throws BackupException, IOException {
    ArrayList<BackupImage> ancestors = getAncestors(backupContext);
    ArrayList<BackupImage> tableAncestors = new ArrayList<BackupImage>();
    for (BackupImage image : ancestors) {
      if (image.hasTable(table)) {
        tableAncestors.add(image);
        if (image.getType() == BackupType.FULL) {
          break;
        }
      }
    }
    return tableAncestors;
  }

  /*
   * hbase:backup operations
   */

  /**
   * Updates status (state) of a backup session in a persistent store
   * @param context context
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo context) throws IOException {
    systemTable.updateBackupInfo(context);
  }

  /**
   * Read the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in hbase:backup or the stored value has length 0. Both cases indicate that
   * no successful backup has completed so far.
   * @return the timestamp of the last successful backup
   * @throws IOException exception
   */
  public String readBackupStartCode() throws IOException {
    return systemTable.readBackupStartCode(backupInfo.getTargetRootDir());
  }

  /**
   * Write the start code (timestamp) to hbase:backup. If null is passed in, a zero-length
   * (0 byte) value is written.
   * @param startCode start code
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode) throws IOException {
    systemTable.writeBackupStartCode(startCode, backupInfo.getTargetRootDir());
  }

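  /**
   * Reads the rows describing bulk loaded files for the given tables from hbase:backup.
   * @param tableList list of tables to look up
   * @return a pair of the bulk load records, keyed by table, and the corresponding row keys
   * @throws IOException exception
   */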
  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
      readOrigBulkloadRows(List<TableName> tableList) throws IOException {
    return systemTable.readOrigBulkloadRows(tableList);
  }

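  /**
   * Removes the given bulk load rows for the specified tables from hbase:backup.
   * @param lst list of tables
   * @param rows row keys of the bulk load records to remove
   * @throws IOException exception
   */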
  public void removeOrigBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
    systemTable.removeOrigBulkLoadedRows(lst, rows);
  }

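  /**
   * Records the bulk loaded files for the given tables under the current backup id.
   * @param sTableList list of tables
   * @param maps bulk loaded file paths for each table in {@code sTableList}
   * @throws IOException exception
   */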
  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
      throws IOException {
    systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
  }

  /**
   * Get the RS log information after the last log roll from hbase:backup.
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOException {
    return systemTable.readRegionServerLastLogRollResult(backupInfo.getTargetRootDir());
  }

  /**
   * Get all completed backup information (in desc order by time)
   * @return history info of BackupCompleteData
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory() throws IOException {
    return systemTable.getBackupHistory();
  }

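  /**
   * Get backup history, optionally restricted to completed backup sessions.
   * @param completed if true, only completed backup sessions are returned
   * @return list of backup info records
   * @throws IOException exception
   */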
  public ArrayList<BackupInfo> getBackupHistory(boolean completed) throws IOException {
    return systemTable.getBackupHistory(completed);
  }

  /**
   * Write the current timestamps for each regionserver to hbase:backup after a successful full or
   * incremental backup. Each table may have a different set of log timestamps. The saved timestamp
   * is of the last log file that was already backed up.
   * @param tables tables
   * @param newTimestamps timestamp of the last backed-up WAL file, per region server
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables,
      HashMap<String, Long> newTimestamps) throws IOException {
    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps,
      backupInfo.getTargetRootDir());
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table
   * has its own set of timestamps.
   * @return map of timestamps, keyed by table name; each value maps a region server to the
   *         previous timestamp
   * @throws IOException exception
   */
  public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap() throws IOException {
    return systemTable.readLogTimestampMap(backupInfo.getTargetRootDir());
  }

  /**
   * Return the current tables covered by incremental backup.
   * @return set of tableNames
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet() throws IOException {
    return systemTable.getIncrementalBackupTableSet(backupInfo.getTargetRootDir());
  }

  /**
   * Adds set of tables to overall incremental backup table set
   * @param tables tables
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables) throws IOException {
    systemTable.addIncrementalBackupTableSet(tables, backupInfo.getTargetRootDir());
  }

  /**
   * Saves the list of WAL files after an incremental backup operation. These files will be stored
   * until TTL expiration and are used by the BackupLogCleaner plugin to determine which WAL files
   * can be safely purged.
   * @param files WAL file names to record
   * @throws IOException exception
   */
  public void recordWALFiles(List<String> files) throws IOException {
    systemTable.addWALFiles(files, backupInfo.getBackupId(), backupInfo.getTargetRootDir());
  }

  /**
   * Get WAL files iterator
   * @return WAL files iterator from hbase:backup
   * @throws IOException
   */
  public Iterator<BackupSystemTable.WALItem> getWALFilesFromBackupSystem() throws IOException {
    return systemTable.getWALFilesIterator(backupInfo.getTargetRootDir());
  }

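  /**
   * Return the cluster connection used by this backup manager.
   * @return the connection
   */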
  public Connection getConnection() {
    return conn;
  }
}