/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.backup.master;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyService;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.IncrementalTableBackupState;
import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.ServerTimestamp;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;

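/**
 * Master procedure that performs an incremental table backup as a state machine: it collects
 * the WAL files written since the previous backup, converts them to HFiles with WALPlayer,
 * copies the result (plus any bulk-loaded HFiles recorded in the backup system table) to the
 * backup destination, and updates the backup system table bookkeeping.
 */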
@InterfaceAudience.Private
public class IncrementalTableBackupProcedure
    extends StateMachineProcedure<MasterProcedureEnv, IncrementalTableBackupState>
    implements TableProcedureInterface {
  private static final Log LOG = LogFactory.getLog(IncrementalTableBackupProcedure.class);

  private final AtomicBoolean aborted = new AtomicBoolean(false);
  private Configuration conf;
  private String backupId;
  private List<TableName> tableList;
  private String targetRootDir;
  HashMap<String, Long> newTimestamps = null;

  private BackupManager backupManager;
  private BackupInfo backupContext;

  public IncrementalTableBackupProcedure() {
    // Required by the procedure framework to instantiate this procedure on deserialization
  }

  public IncrementalTableBackupProcedure(final MasterProcedureEnv env,
      final String backupId, List<TableName> tableList, String targetRootDir, final int workers,
      final long bandwidth) throws IOException {
    backupManager = new BackupManager(env.getMasterConfiguration());
    this.backupId = backupId;
    this.tableList = tableList;
    this.targetRootDir = targetRootDir;
    backupContext = backupManager.createBackupInfo(backupId, BackupType.INCREMENTAL, tableList,
      targetRootDir, workers, bandwidth);
  }

  @Override
  public byte[] getResult() {
    return backupId.getBytes();
  }

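  /**
   * Returns only those files from the given list that still exist on the source file system,
   * logging a warning for each file that can no longer be found.
   */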
  private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    List<String> list = new ArrayList<String>();
    for (String file : incrBackupFileList) {
      if (fs.exists(new Path(file))) {
        list.add(file);
      } else {
        LOG.warn("Can't find file: " + file);
      }
    }
    return list;
  }

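  /**
   * Copies HFiles that were bulk loaded into the source tables (as recorded in the backup
   * system table) to the backup destination, looking first in the live store directory and
   * falling back to the archive directory. Bulk-loaded data is not captured by WAL conversion,
   * so it has to be copied explicitly.
   * @param sTableList tables covered by this backup
   * @return per-table map from column family to the list of copied target paths
   */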
  Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
        backupManager.readOrigBulkloadRows(sTableList);
    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
    FileSystem fs = FileSystem.get(conf);
    FileSystem tgtFs;
    try {
      tgtFs = FileSystem.get(new URI(backupContext.getTargetRootDir()), conf);
    } catch (URISyntaxException use) {
      throw new IOException("Unable to get FileSystem", use);
    }
    Path rootdir = FSUtils.getRootDir(conf);
    Path tgtRoot = new Path(new Path(backupContext.getTargetRootDir()), backupId);
    LOG.debug("in handleBulkLoad, tgtRoot = " + tgtRoot);
    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
        map.entrySet()) {
      TableName srcTable = tblEntry.getKey();
      int srcIdx = BackupSystemTable.getIndex(srcTable, sTableList);
      if (srcIdx < 0) {
        LOG.warn("Couldn't find " + srcTable + " in source table List");
        continue;
      }
      if (mapForSrc[srcIdx] == null) {
        mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
      }
      Path tblDir = FSUtils.getTableDir(rootdir, srcTable);
      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
          srcTable.getQualifierAsString());
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry :
          tblEntry.getValue().entrySet()) {
        String regionName = regionEntry.getKey();
        Path regionDir = new Path(tblDir, regionName);

        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry :
            regionEntry.getValue().entrySet()) {
          String fam = famEntry.getKey();
          Path famDir = new Path(regionDir, fam);
          List<Path> files;
          if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) {
            files = new ArrayList<Path>();
            mapForSrc[srcIdx].put(fam.getBytes(), files);
          } else {
            files = mapForSrc[srcIdx].get(fam.getBytes());
          }
          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
          String tblName = srcTable.getQualifierAsString();
          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
          if (!tgtFs.mkdirs(tgtFam)) {
            throw new IOException("couldn't create " + tgtFam);
          }
          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
            String file = fileWithState.getFirst();
            boolean raw = fileWithState.getSecond();
            int idx = file.lastIndexOf("/");
            String filename = file;
            if (idx > 0) {
              filename = file.substring(idx + 1);
            }
            Path p = new Path(famDir, filename);
            Path tgt = new Path(tgtFam, filename);
            Path archive = new Path(archiveDir, filename);
            LOG.debug("bulk testing " + p + " " + fs.exists(p));
            if (fs.exists(p)) {
              LOG.debug("found bulk hfile " + file + " in " + famDir + " for " + tblName);
              try {
                LOG.debug("copying " + p + " to " + tgt);
                FileUtil.copy(fs, p, tgtFs, tgt, false, conf);
              } catch (FileNotFoundException e) {
                LOG.debug("copying archive " + archive + " to " + tgt);
                try {
                  FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
                } catch (FileNotFoundException fnfe) {
                  if (!raw) throw fnfe;
                }
              }
            } else {
              LOG.debug("copying archive " + archive + " to " + tgt);
              try {
                FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
              } catch (FileNotFoundException fnfe) {
                if (!raw) throw fnfe;
              }
            }
            files.add(tgt);
          }
        }
      }
    }
    backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
    backupManager.removeOrigBulkLoadedRows(sTableList, pair.getSecond());
    return mapForSrc;
  }

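  /**
   * Copies the WAL files collected for this incremental backup to the backup log target
   * directory using the configured backup copy service.
   */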
  private void incrementalCopy(BackupInfo backupContext) throws Exception {
    LOG.info("Incremental copy is starting.");

    // set overall backup phase: incremental_copy
    backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);

    // get the incremental backup file list and prepare parameters for the copy service:
    // source files first, destination directory last
    List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
    // drop WAL files that no longer exist on the source file system
    incrBackupFileList = filterMissingFiles(incrBackupFileList);
    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
    strArr[strArr.length - 1] = backupContext.getHLogTargetDir();

    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
    int res = copyService.copy(backupContext, backupManager, conf,
      BackupCopyService.Type.INCREMENTAL, strArr);

    if (res != 0) {
      LOG.error("Copy incremental log files failed with return code: " + res + ".");
      throw new IOException("Failed Hadoop Distributed Copy from " + incrBackupFileList + " to "
          + backupContext.getHLogTargetDir());
    }
    LOG.info("Incremental copy from " + incrBackupFileList + " to "
        + backupContext.getHLogTargetDir() + " finished.");
  }

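  /**
   * State machine: PREPARE_INCREMENTAL collects the WAL files written since the last backup,
   * INCREMENTAL_COPY converts those WALs to HFiles and copies them (together with table region
   * info) to the backup destination, and INCR_BACKUP_COMPLETE persists the new log timestamps
   * and start code, copies bulk-loaded HFiles, and marks the backup complete.
   */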
  @Override
  protected Flow executeFromState(final MasterProcedureEnv env,
      final IncrementalTableBackupState state) {
    if (conf == null) {
      conf = env.getMasterConfiguration();
    }
    if (backupManager == null) {
      try {
        backupManager = new BackupManager(env.getMasterConfiguration());
      } catch (IOException ioe) {
        setFailure("incremental backup", ioe);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + " execute state=" + state);
    }
    try {
      switch (state) {
        case PREPARE_INCREMENTAL:
          FullTableBackupProcedure.beginBackup(backupManager, backupContext);
          LOG.debug("For incremental backup, current table set is "
              + backupManager.getIncrementalBackupTableSet());
          try {
            IncrementalBackupManager incrBackupManager =
                new IncrementalBackupManager(backupManager);
            newTimestamps = incrBackupManager.getIncrBackupLogFileList(backupContext);
          } catch (Exception e) {
            setFailure("Failure in incremental-backup: preparation phase " + backupId, e);
            FullTableBackupProcedure.failBackup(env, backupContext, backupManager, e,
              "Unexpected Exception : ", BackupType.INCREMENTAL, conf);
          }
          setNextState(IncrementalTableBackupState.INCREMENTAL_COPY);
          break;
        case INCREMENTAL_COPY:
          try {
            // copy out the table and region info files for each table
            BackupServerUtil.copyTableRegionInfo(backupContext, conf);
            // convert WALs to HFiles and copy them to the backup destination
            convertWALsAndCopy(backupContext, env.getMasterServices().getConnection());
            incrementalCopyHFiles(backupContext);
            // record the WAL files covered by this backup
            backupManager.recordWALFiles(backupContext.getIncrBackupFileList());
          } catch (Exception e) {
            String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
            setFailure(msg, e);
            FullTableBackupProcedure.failBackup(env, backupContext, backupManager, e,
              msg, BackupType.INCREMENTAL, conf);
          }
          setNextState(IncrementalTableBackupState.INCR_BACKUP_COMPLETE);
          break;
        case INCR_BACKUP_COMPLETE:
          // set overall backup status: complete
          backupContext.setState(BackupState.COMPLETE);

          // remember the log timestamps that were valid before this backup in the backup info
          HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
              backupManager.readLogTimestampMap();
          backupContext.setIncrTimestampMap(previousTimestampMap);

          // record the new region server log timestamps for the tables covered by this backup
          backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps);

          HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
              backupManager.readLogTimestampMap();

          Long newStartCode = BackupClientUtil
              .getMinValue(BackupServerUtil.getRSLogTimestampMins(newTableSetTimestampMap));
          backupManager.writeBackupStartCode(newStartCode);

          // copy bulk-loaded HFiles recorded in the backup system table
          handleBulkLoad(backupContext.getTableNames());

          // backup complete
          FullTableBackupProcedure.completeBackup(env, backupContext, backupManager,
            BackupType.INCREMENTAL, conf);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (IOException e) {
      setFailure("incremental backup", e);
    }
    return Flow.HAS_MORE_STATE;
  }

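  /**
   * Rollback removes whatever data has already been written under the backup target directory.
   */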
  @Override
  protected void rollbackState(final MasterProcedureEnv env,
      final IncrementalTableBackupState state) throws IOException {
    FullTableBackupProcedure.cleanupTargetDir(backupContext, conf);
  }

  @Override
  protected IncrementalTableBackupState getState(final int stateId) {
    return IncrementalTableBackupState.valueOf(stateId);
  }

  @Override
  protected int getStateId(final IncrementalTableBackupState state) {
    return state.getNumber();
  }

  @Override
  protected IncrementalTableBackupState getInitialState() {
    return IncrementalTableBackupState.PREPARE_INCREMENTAL;
  }

  @Override
  protected void setNextState(final IncrementalTableBackupState state) {
    if (aborted.get()) {
      setAbortFailure("incremental backup", "abort requested");
    } else {
      super.setNextState(state);
    }
  }

  @Override
  public boolean abort(final MasterProcedureEnv env) {
    aborted.set(true);
    return true;
  }

  @Override
  public void toStringClassDetails(StringBuilder sb) {
    sb.append(getClass().getSimpleName());
    sb.append(" (targetRootDir=");
    sb.append(targetRootDir);
    sb.append("; backupId=").append(backupId);
    sb.append("; tables=");
    int len = tableList.size();
    for (int i = 0; i < len - 1; i++) {
      sb.append(tableList.get(i)).append(",");
    }
    if (len >= 1) sb.append(tableList.get(len - 1));
    sb.append(")");
  }

  BackupProtos.BackupProcContext toBackupInfo() {
    BackupProtos.BackupProcContext.Builder ctxBuilder =
        BackupProtos.BackupProcContext.newBuilder();
    ctxBuilder.setCtx(backupContext.toProtosBackupInfo());
    if (newTimestamps != null && !newTimestamps.isEmpty()) {
      BackupProtos.ServerTimestamp.Builder tsBuilder = ServerTimestamp.newBuilder();
      for (Entry<String, Long> entry : newTimestamps.entrySet()) {
        tsBuilder.clear().setServer(entry.getKey()).setTimestamp(entry.getValue());
        ctxBuilder.addServerTimestamp(tsBuilder.build());
      }
    }
    return ctxBuilder.build();
  }

  @Override
  public void serializeStateData(final OutputStream stream) throws IOException {
    super.serializeStateData(stream);

    BackupProtos.BackupProcContext backupProcCtx = toBackupInfo();
    backupProcCtx.writeDelimitedTo(stream);
  }

  @Override
  public void deserializeStateData(final InputStream stream) throws IOException {
    super.deserializeStateData(stream);

    BackupProtos.BackupProcContext proto =
        BackupProtos.BackupProcContext.parseDelimitedFrom(stream);
    backupContext = BackupInfo.fromProto(proto.getCtx());
    backupId = backupContext.getBackupId();
    targetRootDir = backupContext.getTargetRootDir();
    tableList = backupContext.getTableNames();
    List<ServerTimestamp> svrTimestamps = proto.getServerTimestampList();
    if (svrTimestamps != null && !svrTimestamps.isEmpty()) {
      newTimestamps = new HashMap<>();
      for (ServerTimestamp ts : svrTimestamps) {
        newTimestamps.put(ts.getServer(), ts.getTimestamp());
      }
    }
  }

  @Override
  public TableName getTableName() {
    return TableName.BACKUP_TABLE_NAME;
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.BACKUP;
  }

  @Override
  protected boolean acquireLock(final MasterProcedureEnv env) {
    if (!env.isInitialized() && !getTableName().isSystemTable()) {
      return false;
    }
    return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "incremental backup");
  }

  @Override
  protected void releaseLock(final MasterProcedureEnv env) {
    env.getProcedureQueue().releaseTableWrite(getTableName());
  }

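  /**
   * Copies the HFiles produced by WAL conversion (the temporary bulk output directory) to the
   * backup target root directory via the backup copy service, then deletes the temporary
   * directory.
   */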
  private void incrementalCopyHFiles(BackupInfo backupContext) throws Exception {
    LOG.info("Incremental copy HFiles is starting.");

    // set overall backup phase: incremental_copy
    backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);

    // prepare the copy arguments: source directory first, destination last
    List<String> incrBackupFileList = new ArrayList<String>();
    incrBackupFileList.add(getBulkOutputDir().toString());
    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
    strArr[strArr.length - 1] = backupContext.getTargetRootDir();

    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
    int res = copyService.copy(backupContext, backupManager, conf,
      BackupCopyService.Type.INCREMENTAL, strArr);

    if (res != 0) {
      LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
      throw new IOException("Failed Hadoop Distributed Copy from "
          + StringUtils.join(",", incrBackupFileList) + " to " + backupContext.getTargetRootDir());
    }
    deleteBulkLoadDirectory();
    LOG.info("Incremental copy HFiles from " + StringUtils.join(",", incrBackupFileList) + " to "
        + backupContext.getTargetRootDir() + " finished.");
  }

  private void deleteBulkLoadDirectory() throws IOException {
    // remove the temporary bulk output directory
    Path path = getBulkOutputDir();
    FileSystem fs = FileSystem.get(conf);
    boolean result = fs.delete(path, true);
    if (!result) {
      LOG.warn("Could not delete " + path);
    }
  }

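  /**
   * For each table in the incremental backup table set that still exists, converts the
   * collected WAL files into HFiles with WALPlayer; dropped tables are skipped with a warning.
   */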
  private void convertWALsAndCopy(BackupInfo backupContext, Connection conn) throws IOException {
    // get the incremental backup WAL file list
    List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
    // drop WAL files that no longer exist on the source file system
    incrBackupFileList = filterMissingFiles(incrBackupFileList);

    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
    for (TableName table : tableSet) {
      if (tableExists(table, conn)) {
        convertWALToHFiles(incrBackupFileList, table);
      } else {
        LOG.warn("Table " + table + " does not exist. Skipping in WAL converter");
      }
    }
  }

  private boolean tableExists(TableName table, Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(table);
    }
  }

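  /**
   * Runs WALPlayer over the given WAL files for a single table, writing HFiles into the
   * per-table bulk output directory ({@link WALPlayer#BULK_OUTPUT_CONF_KEY}).
   */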
  private void convertWALToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
    Tool player = new WALPlayer();
    String bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;

    // WALPlayer reads its input files from a ';'-separated list and writes HFiles
    // into the bulk output directory configured below
    String dirs = StringUtils.join(";", dirPaths);

    Path bulkOutputPath = getBulkOutputDirForTable(tableName);
    conf.set(bulkOutputConfKey, bulkOutputPath.toString());
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    String[] playerArgs = { dirs, tableName.getNameAsString() };

    try {
      player.setConf(conf);
      player.run(playerArgs);
      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
    } catch (Exception e) {
      throw new IOException("Cannot convert WAL files from directory " + dirs
          + " (check Hadoop and HBase logs) ", e);
    }
  }

  private Path getBulkOutputDirForTable(TableName table) {
    Path tablePath = getBulkOutputDir();
    tablePath = new Path(tablePath, table.getNamespaceAsString());
    tablePath = new Path(tablePath, table.getQualifierAsString());
    return new Path(tablePath, "data");
  }

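  /**
   * Temporary bulk output location for this backup: {@code <targetRootDir>/.tmp/<backupId>}.
   */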
  private Path getBulkOutputDir() {
    String backupId = backupContext.getBackupId();
    Path path = new Path(backupContext.getTargetRootDir());
    path = new Path(path, ".tmp");
    path = new Path(path, backupId);
    return path;
  }
}