1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.backup.util;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.TreeMap;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileStatus;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.FileUtil;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.HTableDescriptor;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
38 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
39 import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.classification.InterfaceStability;
42 import org.apache.hadoop.hbase.client.Admin;
43 import org.apache.hadoop.hbase.client.Connection;
44 import org.apache.hadoop.hbase.client.ConnectionFactory;
45 import org.apache.hadoop.hbase.client.HBaseAdmin;
46 import org.apache.hadoop.hbase.io.HFileLink;
47 import org.apache.hadoop.hbase.io.hfile.HFile;
48 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
49 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
50 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
51 import org.apache.hadoop.hbase.regionserver.HStore;
52 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
53 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
54 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.apache.hadoop.hbase.util.FSHDFSUtils;
57
58
59
60
61 @InterfaceAudience.Private
62 @InterfaceStability.Evolving
63 public class RestoreServerUtil {
64
65 public static final Log LOG = LogFactory.getLog(RestoreServerUtil.class);
66
67 private final String[] ignoreDirs = { "recovered.edits" };
68
69 protected Configuration conf = null;
70
71 protected Path backupRootPath;
72
73 protected String backupId;
74
75 protected FileSystem fs;
76 private final String RESTORE_TMP_PATH = "/tmp";
77 private final Path restoreTmpPath;
78
79
80 private final HashMap<TableName, Path> snapshotMap = new HashMap<>();
81
82 public RestoreServerUtil(Configuration conf, final Path backupRootPath, final String backupId)
83 throws IOException {
84 this.conf = conf;
85 this.backupRootPath = backupRootPath;
86 this.backupId = backupId;
87 this.fs = backupRootPath.getFileSystem(conf);
88 this.restoreTmpPath = new Path(conf.get("hbase.fs.tmp.dir") != null?
89 conf.get("hbase.fs.tmp.dir"): RESTORE_TMP_PATH,
90 "restore");
91 }
92
93
94
95
96
97
98
99
100 Path getTableArchivePath(TableName tableName)
101 throws IOException {
102 Path baseDir = new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath,
103 backupId), HConstants.HFILE_ARCHIVE_DIRECTORY);
104 Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR);
105 Path archivePath = new Path(dataDir, tableName.getNamespaceAsString());
106 Path tableArchivePath =
107 new Path(archivePath, tableName.getQualifierAsString());
108 if (!fs.exists(tableArchivePath) || !fs.getFileStatus(tableArchivePath).isDirectory()) {
109 LOG.debug("Folder tableArchivePath: " + tableArchivePath.toString() + " does not exists");
110 tableArchivePath = null;
111 }
112 return tableArchivePath;
113 }
114
115
116
117
118
119
120
121
122 ArrayList<Path> getRegionList(TableName tableName)
123 throws FileNotFoundException, IOException {
124 Path tableArchivePath = this.getTableArchivePath(tableName);
125 ArrayList<Path> regionDirList = new ArrayList<Path>();
126 FileStatus[] children = fs.listStatus(tableArchivePath);
127 for (FileStatus childStatus : children) {
128
129 Path child = childStatus.getPath();
130 regionDirList.add(child);
131 }
132 return regionDirList;
133 }
134
135
136
137
138
139
140
141
142
143
144 public void incrementalRestoreTable(Path[] logDirs,
145 TableName[] tableNames, TableName[] newTableNames) throws IOException {
146
147 if (tableNames.length != newTableNames.length) {
148 throw new IOException("Number of source tables and target tables does not match!");
149 }
150
151
152
153 try (Connection conn = ConnectionFactory.createConnection(conf);
154 Admin admin = conn.getAdmin()) {
155 for (TableName tableName : newTableNames) {
156 if (!admin.tableExists(tableName)) {
157 admin.close();
158 throw new IOException("HBase table " + tableName
159 + " does not exist. Create the table first, e.g. by restoring a full backup.");
160 }
161 }
162
163 Path[] tempDirs = new Path[logDirs.length];
164 int index = 0;
165 for(Path p : logDirs) {
166 tempDirs[index++] = checkLocalAndCopy(p);
167 }
168 IncrementalRestoreService restoreService =
169 BackupRestoreServerFactory.getIncrementalRestoreService(conf);
170
171 restoreService.run(tempDirs, tableNames, newTableNames);
172 cleanTmp();
173 }
174 }
175
176 private void cleanTmp() throws IOException {
177 FileSystem fs = FileSystem.get(conf);
178 boolean result = fs.delete(restoreTmpPath, true);
179 if (!result) {
180 LOG.warn("Could not delete " + restoreTmpPath);
181 }
182 }
183
184 public void fullRestoreTable(Path tableBackupPath, TableName tableName, TableName newTableName,
185 boolean converted, boolean truncateIfExists) throws IOException {
186 restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted, truncateIfExists);
187 }
188
189
190
191
192
193
194
195
196
197 static Path getTableSnapshotPath(Path backupRootPath, TableName tableName,
198 String backupId) {
199 return new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId),
200 HConstants.SNAPSHOT_DIR_NAME);
201 }
202
203
204
205
206
207
208
209
210
211
212
213 Path getTableInfoPath(TableName tableName)
214 throws FileNotFoundException, IOException {
215 Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
216 Path tableInfoPath = null;
217
218
219 FileStatus[] snapshots = fs.listStatus(tableSnapShotPath);
220 for (FileStatus snapshot : snapshots) {
221 tableInfoPath = snapshot.getPath();
222
223 if (tableInfoPath.getName().endsWith("data.manifest")) {
224 break;
225 }
226 }
227 return tableInfoPath;
228 }
229
230
231
232
233
234 HTableDescriptor getTableDesc(TableName tableName)
235 throws FileNotFoundException, IOException {
236 Path tableInfoPath = this.getTableInfoPath(tableName);
237 SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
238 SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
239 HTableDescriptor tableDescriptor = manifest.getTableDescriptor();
240 if (!tableDescriptor.getTableName().equals(tableName)) {
241 LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: "
242 + tableInfoPath.toString());
243 LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString());
244 throw new FileNotFoundException("couldn't find Table Desc for table: " + tableName +
245 " under tableInfoPath: " + tableInfoPath.toString());
246 }
247 return tableDescriptor;
248 }
249
250 Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
251 return checkLocalAndBackup(tableArchivePath, true);
252 }
253
254
255
256
257
258
259
260
261 Path checkLocalAndCopy(Path tableArchivePath) throws IOException {
262
263 boolean isCopyNeeded = false;
264 Path p = tableArchivePath.getParent();
265 String tableName = p.getName();
266 String ns = p.getParent().getName();
267 Path tmpPath = new Path(restoreTmpPath, ns);
268 tmpPath = new Path(tmpPath, tableName);
269 FileSystem srcFs = tableArchivePath.getFileSystem(conf);
270 FileSystem desFs = FileSystem.get(conf);
271 if (tableArchivePath.getName().startsWith("/")) {
272 isCopyNeeded = true;
273 } else {
274
275
276 if (FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
277 LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
278 + desFs.getUri());
279 isCopyNeeded = true;
280 }
281 }
282 if (isCopyNeeded) {
283 LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
284
285 boolean result = FileUtil.copy(srcFs, tableArchivePath, desFs, tmpPath, false, conf);
286 if (!result) {
287 throw new IOException("Copy failed: src="+tableArchivePath+" dst="+tmpPath);
288 }
289 LOG.debug("Copied to temporary path on local cluster: " + tmpPath);
290 tableArchivePath = tmpPath;
291 }
292 return tableArchivePath;
293 }
294
295 Path checkLocalAndBackup(Path tableArchivePath, boolean clean) throws IOException {
296
297 boolean isCopyNeeded = false;
298
299 FileSystem srcFs = tableArchivePath.getFileSystem(conf);
300 FileSystem desFs = FileSystem.get(conf);
301 if (tableArchivePath.getName().startsWith("/")) {
302 isCopyNeeded = true;
303 } else {
304
305
306 if (srcFs.getUri().equals(desFs.getUri())) {
307 LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
308 + desFs.getUri());
309 isCopyNeeded = true;
310 }
311 }
312 if (isCopyNeeded) {
313 LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
314 if (desFs.exists(restoreTmpPath) && clean) {
315 try {
316 desFs.delete(restoreTmpPath, true);
317 } catch (IOException e) {
318 LOG.debug("Failed to delete path: " + restoreTmpPath
319 + ", need to check whether restore target DFS cluster is healthy");
320 }
321 }
322 FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf);
323 LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath);
324 tableArchivePath = restoreTmpPath;
325 }
326 return tableArchivePath;
327 }
328
329 private void restoreTableAndCreate(TableName tableName, TableName newTableName,
330 Path tableBackupPath, boolean converted, boolean truncateIfExists) throws IOException {
331 if (newTableName == null || newTableName.equals("")) {
332 newTableName = tableName;
333 }
334
335 FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
336
337
338 HTableDescriptor tableDescriptor = null;
339
340 Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
341
342 if (fileSys.exists(tableSnapshotPath)) {
343
344
345 if (snapshotMap.get(tableName) != null) {
346 SnapshotDescription desc =
347 SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
348 SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
349 tableDescriptor = manifest.getTableDescriptor();
350 } else {
351 tableDescriptor = getTableDesc(tableName);
352 snapshotMap.put(tableName, getTableInfoPath(tableName));
353 }
354 if (tableDescriptor == null) {
355 LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
356 }
357 } else if (converted) {
358
359 LOG.error("convert will be supported in a future jira");
360 }
361
362 Path tableArchivePath = getTableArchivePath(tableName);
363 if (tableArchivePath == null) {
364 if (tableDescriptor != null) {
365
366 if(LOG.isDebugEnabled()) {
367 LOG.debug("find table descriptor but no archive dir for table " + tableName
368 + ", will only create table");
369 }
370 tableDescriptor.setName(newTableName);
371 checkAndCreateTable(tableBackupPath, tableName, newTableName, null,
372 tableDescriptor, truncateIfExists);
373 return;
374 } else {
375 throw new IllegalStateException("Cannot restore hbase table because directory '"
376 + " tableArchivePath is null.");
377 }
378 }
379
380 if (tableDescriptor == null) {
381 tableDescriptor = new HTableDescriptor(newTableName);
382 } else {
383 tableDescriptor.setName(newTableName);
384 }
385
386 if (!converted) {
387
388
389 try {
390 ArrayList<Path> regionPathList = getRegionList(tableName);
391
392
393
394 checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList,
395 tableDescriptor, truncateIfExists);
396 if (tableArchivePath != null) {
397
398
399 Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
400 if (tempTableArchivePath.equals(tableArchivePath)) {
401 if(LOG.isDebugEnabled()) {
402 LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
403 }
404 } else {
405 regionPathList = getRegionList(tempTableArchivePath);
406 if(LOG.isDebugEnabled()) {
407 LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
408 }
409 }
410 LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
411 for (Path regionPath : regionPathList) {
412 String regionName = regionPath.toString();
413 if(LOG.isDebugEnabled()) {
414 LOG.debug("Restoring HFiles from directory " + regionName);
415 }
416 String[] args = { regionName, newTableName.getNameAsString()};
417 loader.run(args);
418 }
419 }
420
421 } catch (Exception e) {
422 throw new IllegalStateException("Cannot restore hbase table", e);
423 }
424 } else {
425 LOG.debug("convert will be supported in a future jira");
426 }
427 }
428
429
430
431
432
433
434
435
436 ArrayList<Path> getRegionList(Path tableArchivePath) throws FileNotFoundException,
437 IOException {
438 ArrayList<Path> regionDirList = new ArrayList<Path>();
439 FileStatus[] children = fs.listStatus(tableArchivePath);
440 for (FileStatus childStatus : children) {
441
442 Path child = childStatus.getPath();
443 regionDirList.add(child);
444 }
445 return regionDirList;
446 }
447
448
449
450
451
452
453
454 int getNumberOfFilesInDir(Path regionPath) throws IOException {
455 int result = 0;
456
457 if (!fs.exists(regionPath) || !fs.getFileStatus(regionPath).isDirectory()) {
458 throw new IllegalStateException("Cannot restore hbase table because directory '"
459 + regionPath.toString() + "' is not a directory.");
460 }
461
462 FileStatus[] tableDirContent = fs.listStatus(regionPath);
463 for (FileStatus subDirStatus : tableDirContent) {
464 FileStatus[] colFamilies = fs.listStatus(subDirStatus.getPath());
465 for (FileStatus colFamilyStatus : colFamilies) {
466 FileStatus[] colFamilyContent = fs.listStatus(colFamilyStatus.getPath());
467 result += colFamilyContent.length;
468 }
469 }
470 return result;
471 }
472
473
474
475
476
477
478
479
480 int getMaxNumberOfFilesInSubDir(Path tableArchivePath) throws IOException {
481 int result = 1;
482 ArrayList<Path> regionPathList = getRegionList(tableArchivePath);
483
484
485 if (regionPathList == null || regionPathList.size() == 0) {
486 throw new IllegalStateException("Cannot restore hbase table because directory '"
487 + tableArchivePath + "' is not a directory.");
488 }
489
490 for (Path regionPath : regionPathList) {
491 result = Math.max(result, getNumberOfFilesInDir(regionPath));
492 }
493 return result;
494 }
495
496
497
498
499
500
501
502 private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
503 throws IOException {
504
505
506
507
508 Integer milliSecInMin = 60000;
509 Integer previousMillis = this.conf.getInt("hbase.rpc.timeout", 0);
510 Integer numberOfFilesInDir =
511 multipleTables ? getMaxNumberOfFilesInSubDir(tableArchivePath) :
512 getNumberOfFilesInDir(tableArchivePath);
513 Integer calculatedMillis = numberOfFilesInDir * milliSecInMin;
514 Integer resultMillis = Math.max(calculatedMillis, previousMillis);
515 if (resultMillis > previousMillis) {
516 LOG.info("Setting configuration for restore with LoadIncrementalHFile: "
517 + "hbase.rpc.timeout to " + calculatedMillis / milliSecInMin
518 + " minutes, to handle the number of files in backup " + tableArchivePath);
519 this.conf.setInt("hbase.rpc.timeout", resultMillis);
520 }
521
522
523
524 this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
525 LoadIncrementalHFiles loader = null;
526 try {
527 loader = new LoadIncrementalHFiles(this.conf);
528 } catch (Exception e1) {
529 throw new IOException(e1);
530 }
531 return loader;
532 }
533
534
535
536
537
538
539 byte[][] generateBoundaryKeys(ArrayList<Path> regionDirList)
540 throws FileNotFoundException, IOException {
541 TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
542
543 byte[][] keys = null;
544
545 for (Path regionDir : regionDirList) {
546 LOG.debug("Parsing region dir: " + regionDir);
547 Path hfofDir = regionDir;
548
549 if (!fs.exists(hfofDir)) {
550 LOG.warn("HFileOutputFormat dir " + hfofDir + " not found");
551 }
552
553 FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
554 if (familyDirStatuses == null) {
555 throw new IOException("No families found in " + hfofDir);
556 }
557
558 for (FileStatus stat : familyDirStatuses) {
559 if (!stat.isDirectory()) {
560 LOG.warn("Skipping non-directory " + stat.getPath());
561 continue;
562 }
563 boolean isIgnore = false;
564 String pathName = stat.getPath().getName();
565 for (String ignore : ignoreDirs) {
566 if (pathName.contains(ignore)) {
567 LOG.warn("Skipping non-family directory" + pathName);
568 isIgnore = true;
569 break;
570 }
571 }
572 if (isIgnore) {
573 continue;
574 }
575 Path familyDir = stat.getPath();
576 LOG.debug("Parsing family dir [" + familyDir.toString() + " in region [" + regionDir + "]");
577
578 if (familyDir.getName().startsWith("_") || familyDir.getName().startsWith(".")) {
579 continue;
580 }
581
582
583 Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
584 for (Path hfile : hfiles) {
585 if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".")
586 || StoreFileInfo.isReference(hfile.getName())
587 || HFileLink.isHFileLink(hfile.getName())) {
588 continue;
589 }
590 HFile.Reader reader = HFile.createReader(fs, hfile, conf);
591 final byte[] first, last;
592 try {
593 reader.loadFileInfo();
594 first = reader.getFirstRowKey();
595 last = reader.getLastRowKey();
596 LOG.debug("Trying to figure out region boundaries hfile=" + hfile + " first="
597 + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
598
599
600 Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
601 map.put(first, value + 1);
602 value = map.containsKey(last) ? (Integer) map.get(last) : 0;
603 map.put(last, value - 1);
604 } finally {
605 reader.close();
606 }
607 }
608 }
609 }
610 keys = LoadIncrementalHFiles.inferBoundaries(map);
611 return keys;
612 }
613
614
615
616
617
618
619
620
621
622
623
624 private void checkAndCreateTable(Path tableBackupPath, TableName tableName,
625 TableName targetTableName, ArrayList<Path> regionDirList,
626 HTableDescriptor htd, boolean truncateIfExists)
627 throws IOException {
628 HBaseAdmin hbadmin = null;
629 Connection conn = null;
630 try {
631 conn = ConnectionFactory.createConnection(conf);
632 hbadmin = (HBaseAdmin) conn.getAdmin();
633 boolean createNew = false;
634 if (hbadmin.tableExists(targetTableName)) {
635 if(truncateIfExists) {
636 LOG.info("Truncating exising target table '" + targetTableName +
637 "', preserving region splits");
638 hbadmin.disableTable(targetTableName);
639 hbadmin.truncateTable(targetTableName, true);
640 } else{
641 LOG.info("Using exising target table '" + targetTableName + "'");
642 }
643 } else {
644 createNew = true;
645 }
646 if(createNew){
647 LOG.info("Creating target table '" + targetTableName + "'");
648
649 if (regionDirList == null || regionDirList.size() == 0) {
650 hbadmin.createTable(htd);
651 return;
652 }
653 byte[][] keys = generateBoundaryKeys(regionDirList);
654
655 hbadmin.createTable(htd, keys);
656 }
657 } catch (Exception e) {
658 throw new IOException(e);
659 } finally {
660 if (hbadmin != null) {
661 hbadmin.close();
662 }
663 if(conn != null){
664 conn.close();
665 }
666 }
667 }
668
669 }