1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.backup;
19
20 import java.io.IOException;
21 import java.util.Collections;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileStatus;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.Abortable;
33 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.client.Connection;
38 import org.apache.hadoop.hbase.client.ConnectionFactory;
39 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
40 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41
42 import com.google.common.annotations.VisibleForTesting;
43 import com.google.common.base.Predicate;
44 import com.google.common.collect.Iterables;
45
46
47
48
49
50 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
51 public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
52 private static final Log LOG = LogFactory.getLog(BackupHFileCleaner.class);
53 private boolean stopped = false;
54 private boolean aborted;
55 private Configuration conf;
56 private Connection connection;
57 private long prevReadFromBackupTbl = 0,
58 secondPrevReadFromBackupTbl = 0;
59
60 private boolean checkForFullyBackedUpTables = true;
61 private List<TableName> fullyBackedUpTables = null;
62
63 private Set<String> getFilenameFromBulkLoad(Map<byte[], List<Path>>[] maps) {
64 Set<String> filenames = new HashSet<String>();
65 for (Map<byte[], List<Path>> map : maps) {
66 if (map == null) continue;
67 for (List<Path> paths : map.values()) {
68 for (Path p : paths) {
69 filenames.add(p.getName());
70 }
71 }
72 }
73 return filenames;
74 }
75
76 private Set<String> loadHFileRefs(List<TableName> tableList) throws IOException {
77 if (connection == null) {
78 connection = ConnectionFactory.createConnection(conf);
79 }
80 try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
81 Map<byte[], List<Path>>[] res =
82 tbl.readBulkLoadedFiles(null, tableList);
83 secondPrevReadFromBackupTbl = prevReadFromBackupTbl;
84 prevReadFromBackupTbl = EnvironmentEdgeManager.currentTime();
85 return getFilenameFromBulkLoad(res);
86 }
87 }
88
89 @VisibleForTesting
90 void setCheckForFullyBackedUpTables(boolean b) {
91 checkForFullyBackedUpTables = b;
92 }
93 @Override
94 public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
95 if (conf == null) {
96 return files;
97 }
98
99
100 if (checkForFullyBackedUpTables) {
101 if (connection == null) return files;
102 try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
103 fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
104 } catch (IOException ioe) {
105 LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
106 return Collections.emptyList();
107 }
108 Collections.sort(fullyBackedUpTables);
109 }
110 final Set<String> hfileRefs;
111 try {
112 hfileRefs = loadHFileRefs(fullyBackedUpTables);
113 } catch (IOException ioe) {
114 LOG.error("Failed to read hfile references, skipping checking deletable files", ioe);
115 return Collections.emptyList();
116 }
117 Iterable<FileStatus> deletables = Iterables.filter(files, new Predicate<FileStatus>() {
118 @Override
119 public boolean apply(FileStatus file) {
120
121 if (file.getModificationTime() > secondPrevReadFromBackupTbl) {
122 return false;
123 }
124 String hfile = file.getPath().getName();
125 boolean foundHFileRef = hfileRefs.contains(hfile);
126 return !foundHFileRef;
127 }
128 });
129 return deletables;
130 }
131
132 @Override
133 public boolean isFileDeletable(FileStatus fStat) {
134
135 return true;
136 }
137
138 @Override
139 public void setConf(Configuration config) {
140 this.conf = config;
141 this.connection = null;
142 try {
143 this.connection = ConnectionFactory.createConnection(conf);
144 } catch (IOException ioe) {
145 LOG.error("Couldn't establish connection", ioe);
146 }
147 }
148
149 @Override
150 public void stop(String why) {
151 if (this.stopped) {
152 return;
153 }
154 this.stopped = true;
155 }
156
157 @Override
158 public boolean isStopped() {
159 return this.stopped;
160 }
161
162 @Override
163 public void abort(String why, Throwable e) {
164 LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
165 this.aborted = true;
166 stop(why);
167 }
168
169 @Override
170 public boolean isAborted() {
171 return this.aborted;
172 }
173 }