/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.backup.master;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyService;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.IncrementalTableBackupState;
import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.ServerTimestamp;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;

@InterfaceAudience.Private
public class IncrementalTableBackupProcedure
    extends StateMachineProcedure<MasterProcedureEnv, IncrementalTableBackupState>
    implements TableProcedureInterface {
  private static final Log LOG = LogFactory.getLog(IncrementalTableBackupProcedure.class);

  private final AtomicBoolean aborted = new AtomicBoolean(false);
  private Configuration conf;
  private String backupId;
  private List<TableName> tableList;
  private String targetRootDir;
  HashMap<String, Long> newTimestamps = null;

  private BackupManager backupManager;
  private BackupInfo backupContext;

  public IncrementalTableBackupProcedure() {
    // Required by the Procedure framework to create the procedure on replay
  }

  public IncrementalTableBackupProcedure(final MasterProcedureEnv env,
      final String backupId,
      List<TableName> tableList, String targetRootDir, final int workers,
      final long bandwidth) throws IOException {
    backupManager = new BackupManager(env.getMasterConfiguration());
    this.backupId = backupId;
    this.tableList = tableList;
    this.targetRootDir = targetRootDir;
    backupContext = backupManager.createBackupInfo(backupId, BackupType.INCREMENTAL, tableList,
        targetRootDir, workers, bandwidth);
  }

  @Override
  public byte[] getResult() {
    return backupId.getBytes();
  }

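  /**
   * Drops entries from the given WAL file list that no longer exist on the source file system;
   * such files have typically already been copied (and archived) by a previous backup.
   */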
  private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    List<String> list = new ArrayList<String>();
    for (String file : incrBackupFileList) {
      if (fs.exists(new Path(file))) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

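  /**
   * Copies the bulk loaded HFiles recorded in the backup system table into the target backup
   * directory, preserving the namespace/table/region/family layout. Files already moved to the
   * archive directory are copied from there instead.
   * @param sTableList list of tables being backed up
   * @return an array, indexed by each table's position in sTableList, of maps from column family
   *         to the list of copied HFile paths
   */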
  Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
        backupManager.readOrigBulkloadRows(sTableList);
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
    FileSystem fs = FileSystem.get(conf);
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupContext.getTargetRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = FSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupContext.getTargetRootDir()), backupId);
    LOG.debug("in handleBulkLoad, tgtRoot = " + tgtRoot);
    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
        map.entrySet()) {
      TableName srcTable = tblEntry.getKey();
      int srcIdx = BackupSystemTable.getIndex(srcTable, sTableList);
      if (srcIdx < 0) {
        LOG.warn("Couldn't find " + srcTable + " in source table List");
        continue;
      }
      if (mapForSrc[srcIdx] == null) {
        mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
      }
      Path tblDir = FSUtils.getTableDir(rootdir, srcTable);
      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
          srcTable.getQualifierAsString());
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry :
          tblEntry.getValue().entrySet()) {
        String regionName = regionEntry.getKey();
        Path regionDir = new Path(tblDir, regionName);
        // map from family to List of hfiles
        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry :
            regionEntry.getValue().entrySet()) {
          String fam = famEntry.getKey();
          Path famDir = new Path(regionDir, fam);
          List<Path> files;
          if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) {
            files = new ArrayList<Path>();
            mapForSrc[srcIdx].put(fam.getBytes(), files);
          } else {
            files = mapForSrc[srcIdx].get(fam.getBytes());
          }
          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
          String tblName = srcTable.getQualifierAsString();
          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
          if (!tgtFs.mkdirs(tgtFam)) {
            throw new IOException("couldn't create " + tgtFam);
          }
          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
            String file = fileWithState.getFirst();
            boolean raw = fileWithState.getSecond();
            int idx = file.lastIndexOf("/");
            String filename = file;
            if (idx > 0) {
              filename = file.substring(idx + 1);
            }
            Path p = new Path(famDir, filename);
            Path tgt = new Path(tgtFam, filename);
            Path archive = new Path(archiveDir, filename);
            LOG.debug("bulk testing " + p + " " + fs.exists(p));
            if (fs.exists(p)) {
              LOG.debug("found bulk hfile " + file + " in " + famDir + " for " + tblName);
              try {
                LOG.debug("copying " + p + " to " + tgt);
                FileUtil.copy(fs, p, tgtFs, tgt, false, conf);
              } catch (FileNotFoundException e) {
                LOG.debug("copying archive " + archive + " to " + tgt);
                try {
                  FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
                } catch (FileNotFoundException fnfe) {
                  if (!raw) throw fnfe;
                }
              }
            } else {
              LOG.debug("copying archive " + archive + " to " + tgt);
              try {
                FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
              } catch (FileNotFoundException fnfe) {
                if (!raw) throw fnfe;
              }
            }
            files.add(tgt);
          }
        }
      }
    }
    backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
    backupManager.removeOrigBulkLoadedRows(sTableList, pair.getSecond());
    return mapForSrc;
  }

  /**
   * Performs the incremental copy: ships the accumulated WAL files to the backup target
   * directory using the backup copy service (Hadoop Distributed Copy).
   * @param backupContext backup context
   */
  private void incrementalCopy(BackupInfo backupContext) throws Exception {
    LOG.info("Incremental copy is starting.");

    // set overall backup phase: incremental_copy
    backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);

    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    // the last array element is the copy destination
    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
    strArr[strArr.length - 1] = backupContext.getHLogTargetDir();

    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
    int res = copyService.copy(backupContext, backupManager, conf,
      BackupCopyService.Type.INCREMENTAL, strArr);

    if (res != 0) {
      LOG.error("Copying incremental log files failed with return code: " + res + ".");
      throw new IOException("Hadoop Distributed Copy failed from " + incrBackupFileList + " to "
          + backupContext.getHLogTargetDir());
    }
    LOG.info("Incremental copy from " + incrBackupFileList + " to "
        + backupContext.getHLogTargetDir() + " finished.");
  }

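  /**
   * State machine driver. The procedure moves through PREPARE_INCREMENTAL (determine the WAL
   * files to include), INCREMENTAL_COPY (convert WALs to HFiles and copy them plus the
   * table/region metadata to the target), and INCR_BACKUP_COMPLETE (persist timestamps, the new
   * start code and bulk loaded files, then mark the backup complete).
   */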
  @Override
  protected Flow executeFromState(final MasterProcedureEnv env,
      final IncrementalTableBackupState state) {
    if (conf == null) {
      conf = env.getMasterConfiguration();
    }
    if (backupManager == null) {
      try {
        backupManager = new BackupManager(env.getMasterConfiguration());
      } catch (IOException ioe) {
        setFailure("incremental backup", ioe);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + " execute state=" + state);
    }
    try {
      switch (state) {
        case PREPARE_INCREMENTAL:
          FullTableBackupProcedure.beginBackup(backupManager, backupContext);
          LOG.debug("For incremental backup, current table set is "
              + backupManager.getIncrementalBackupTableSet());
          try {
            IncrementalBackupManager incrBackupManager =
                new IncrementalBackupManager(backupManager);
            newTimestamps = incrBackupManager.getIncrBackupLogFileList(backupContext);
          } catch (Exception e) {
            setFailure("Failure in incremental-backup: preparation phase " + backupId, e);
            // fail the overall backup and return
            FullTableBackupProcedure.failBackup(env, backupContext, backupManager, e,
              "Unexpected Exception : ", BackupType.INCREMENTAL, conf);
          }

          setNextState(IncrementalTableBackupState.INCREMENTAL_COPY);
          break;
        case INCREMENTAL_COPY:
          try {
            // copy out the table and region info files for each table
            BackupServerUtil.copyTableRegionInfo(backupContext, conf);
            // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
            convertWALsAndCopy(backupContext, env.getMasterServices().getConnection());
            incrementalCopyHFiles(backupContext);
            // Save list of WAL files copied
            backupManager.recordWALFiles(backupContext.getIncrBackupFileList());
          } catch (Exception e) {
            String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
            setFailure(msg, e);
            // fail the overall backup and return
            FullTableBackupProcedure.failBackup(env, backupContext, backupManager, e,
              msg, BackupType.INCREMENTAL, conf);
          }
          setNextState(IncrementalTableBackupState.INCR_BACKUP_COMPLETE);
          break;
        case INCR_BACKUP_COMPLETE:
          // set overall backup status: complete. This is the completion checkpoint: after it,
          // even if a cancel request comes in, the backup is allowed to finish.
          backupContext.setState(BackupState.COMPLETE);
          // Set the previousTimestampMap which is before this current log roll to the manifest.
          HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
              backupManager.readLogTimestampMap();
          backupContext.setIncrTimestampMap(previousTimestampMap);

          // The table list in backupContext is good for both full backup and incremental backup.
          // For incremental backup, it contains the incremental backup table set.
          backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps);

          HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
              backupManager.readLogTimestampMap();

          Long newStartCode = BackupClientUtil
              .getMinValue(BackupServerUtil.getRSLogTimestampMins(newTableSetTimestampMap));
          backupManager.writeBackupStartCode(newStartCode);

          handleBulkLoad(backupContext.getTableNames());
          // backup complete
          FullTableBackupProcedure.completeBackup(env, backupContext, backupManager,
            BackupType.INCREMENTAL, conf);
          return Flow.NO_MORE_STATE;

        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (IOException e) {
      setFailure("incremental backup", e);
    }
    return Flow.HAS_MORE_STATE;
  }

  @Override
  protected void rollbackState(final MasterProcedureEnv env,
      final IncrementalTableBackupState state) throws IOException {
    // clean up the uncompleted data at the target directory if the ongoing backup has already
    // entered the copy phase.
    // For incremental backup, DistCp logs will be cleaned with the targetDir.
    FullTableBackupProcedure.cleanupTargetDir(backupContext, conf);
  }

  @Override
  protected IncrementalTableBackupState getState(final int stateId) {
    return IncrementalTableBackupState.valueOf(stateId);
  }

  @Override
  protected int getStateId(final IncrementalTableBackupState state) {
    return state.getNumber();
  }

  @Override
  protected IncrementalTableBackupState getInitialState() {
    return IncrementalTableBackupState.PREPARE_INCREMENTAL;
  }

  @Override
  protected void setNextState(final IncrementalTableBackupState state) {
    if (aborted.get()) {
      setAbortFailure("incremental backup", "abort requested");
    } else {
      super.setNextState(state);
    }
  }

  @Override
  public boolean abort(final MasterProcedureEnv env) {
    aborted.set(true);
    return true;
  }

  @Override
  public void toStringClassDetails(StringBuilder sb) {
    sb.append(getClass().getSimpleName());
    sb.append(" (targetRootDir=");
    sb.append(targetRootDir);
    sb.append("; backupId=").append(backupId);
    sb.append("; tables=");
    int len = tableList.size();
    for (int i = 0; i < len - 1; i++) {
      sb.append(tableList.get(i)).append(",");
    }
    if (len >= 1) sb.append(tableList.get(len - 1));
    sb.append(")");
  }

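  /**
   * Builds the protobuf representation of this procedure's state: the backup context plus the
   * per-region-server log timestamps collected during the preparation phase.
   */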
  BackupProtos.BackupProcContext toBackupInfo() {
    BackupProtos.BackupProcContext.Builder ctxBuilder = BackupProtos.BackupProcContext.newBuilder();
    ctxBuilder.setCtx(backupContext.toProtosBackupInfo());
    if (newTimestamps != null && !newTimestamps.isEmpty()) {
      BackupProtos.ServerTimestamp.Builder tsBuilder = ServerTimestamp.newBuilder();
      for (Entry<String, Long> entry : newTimestamps.entrySet()) {
        tsBuilder.clear().setServer(entry.getKey()).setTimestamp(entry.getValue());
        ctxBuilder.addServerTimestamp(tsBuilder.build());
      }
    }
    return ctxBuilder.build();
  }

  @Override
  public void serializeStateData(final OutputStream stream) throws IOException {
    super.serializeStateData(stream);

    BackupProtos.BackupProcContext backupProcCtx = toBackupInfo();
    backupProcCtx.writeDelimitedTo(stream);
  }

  @Override
  public void deserializeStateData(final InputStream stream) throws IOException {
    super.deserializeStateData(stream);

    BackupProtos.BackupProcContext proto =
        BackupProtos.BackupProcContext.parseDelimitedFrom(stream);
    backupContext = BackupInfo.fromProto(proto.getCtx());
    backupId = backupContext.getBackupId();
    targetRootDir = backupContext.getTargetRootDir();
    tableList = backupContext.getTableNames();
    List<ServerTimestamp> svrTimestamps = proto.getServerTimestampList();
    if (svrTimestamps != null && !svrTimestamps.isEmpty()) {
      newTimestamps = new HashMap<>();
      for (ServerTimestamp ts : svrTimestamps) {
        newTimestamps.put(ts.getServer(), ts.getTimestamp());
      }
    }
  }

  @Override
  public TableName getTableName() {
    return TableName.BACKUP_TABLE_NAME;
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.BACKUP;
  }

  @Override
  protected boolean acquireLock(final MasterProcedureEnv env) {
    if (!env.isInitialized() && !getTableName().isSystemTable()) {
      return false;
    }
    return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "incremental backup");
    /*
    if (env.waitInitialized(this)) {
      return false;
    }
    return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
    */
  }

  @Override
  protected void releaseLock(final MasterProcedureEnv env) {
    env.getProcedureQueue().releaseTableWrite(getTableName());
  }

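  /**
   * Copies the HFiles produced by the WAL conversion (under the temporary bulk output directory)
   * to the backup target root directory via the backup copy service, then removes the temporary
   * bulk output directory.
   */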
  private void incrementalCopyHFiles(BackupInfo backupContext) throws Exception {
    LOG.info("Incremental copy HFiles is starting.");
    // set overall backup phase: incremental_copy
    backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);
    // get incremental backup file list and prepare parameters for DistCp
    List<String> incrBackupFileList = new ArrayList<String>();
    // add the bulk output directory as the copy source
    incrBackupFileList.add(getBulkOutputDir().toString());
    // the last array element is the copy destination
    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
    strArr[strArr.length - 1] = backupContext.getTargetRootDir();

    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);

    int res = copyService.copy(backupContext, backupManager, conf,
      BackupCopyService.Type.INCREMENTAL, strArr);

    if (res != 0) {
      LOG.error("Copying incremental HFiles failed with return code: " + res + ".");
      throw new IOException("Hadoop Distributed Copy failed from "
          + StringUtils.join(",", incrBackupFileList) + " to "
          + backupContext.getTargetRootDir());
    }
    deleteBulkLoadDirectory();
    LOG.info("Incremental copy HFiles from " + StringUtils.join(",", incrBackupFileList) + " to "
        + backupContext.getTargetRootDir() + " finished.");
  }

  private void deleteBulkLoadDirectory() throws IOException {
    // delete the temporary bulk load output directory
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

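  /**
   * For every table in the incremental backup set that still exists, replays the accumulated
   * WAL files with WALPlayer to produce HFiles under the temporary bulk output directory.
   */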
  private void convertWALsAndCopy(BackupInfo backupContext, Connection conn) throws IOException {
    // get incremental backup file list
    List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
    // filter missing files out (they have been copied by previous backups)
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    // Get list of tables in incremental backup set
    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    for (TableName table : tableSet) {
      // Check if table exists
      if (tableExists(table, conn)) {
        convertWALToHFiles(incrBackupFileList, table);
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping in WAL converter");
      }
    }
  }

  private boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

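  /**
   * Runs WALPlayer over the given WAL directories/files to generate HFiles for one table,
   * writing them to that table's bulk output directory.
   */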
  private void convertWALToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
    Tool player = new WALPlayer();
    String bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;

    // WALPlayer reads all files in an arbitrary directory structure and creates
    // a Map task for each file. We use ';' as the separator
    // because WAL file names contain ','.
    String dirs = StringUtils.join(";", dirPaths);

    Path bulkOutputPath = getBulkOutputDirForTable(tableName);
    conf.set(bulkOutputConfKey, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    String[] playerArgs = { dirs, tableName.getNameAsString() };

    try {
      // TODO Player must tolerate missing files or exceptions during conversion
      player.setConf(conf);
      player.run(playerArgs);
      // TODO Check missing files and repeat
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
    } catch (Exception e) {
      throw new IOException("Cannot convert from directory " + dirs
          + " (check Hadoop and HBase logs) ", e);
    }
  }

  private Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

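  /**
   * Returns the temporary staging directory for converted HFiles:
   * {targetRootDir}/.tmp/{backupId}.
   */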
  private Path getBulkOutputDir() {
    String backupId = backupContext.getBackupId();
    Path path = new Path(backupContext.getTargetRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }
}