/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FSUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

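/**
 * Abstract cleaner chore that uses a chain of {@link FileCleanerDelegate} instances to decide
 * which files under a given directory can be deleted.
 * @param <T> cleaner delegate class that is dynamically loaded from configuration
 */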
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {

  private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());

  protected final FileSystem fs;
  private final Path oldFileDir;
  private final Configuration conf;
  protected List<T> cleanersChain;

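  /**
   * @param name name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldFileDir the path to the archived files
   * @param confKey configuration key for the classes to instantiate
   */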
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
      FileSystem fs, Path oldFileDir, String confKey) {
    super(name, s, sleepPeriod);
    this.fs = fs;
    this.oldFileDir = oldFileDir;
    this.conf = conf;

    initCleanerChain(confKey);
  }

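  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
   * @param file full {@link Path} of the file to be checked
   * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
   */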
  protected abstract boolean validate(Path file);

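  /**
   * Instantiate and initialize all the file cleaners set in the configuration.
   * @param confKey key to get the file cleaner classes from the configuration
   */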
  private void initCleanerChain(String confKey) {
    this.cleanersChain = new LinkedList<T>();
    String[] logCleaners = conf.getStrings(confKey);
    if (logCleaners != null) {
      for (String className : logCleaners) {
        T logCleaner = newFileCleaner(className, conf);
        if (logCleaner != null) {
          LOG.debug("initialize cleaner=" + className);
          this.cleanersChain.add(logCleaner);
        }
      }
    }
  }

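  /**
   * A utility method to create a new instance of a {@link FileCleanerDelegate} based on its
   * class name.
   * @param className fully qualified class name of the cleaner delegate
   * @param conf configuration to pass to the new instance
   * @return the new instance, or null if it could not be instantiated
   */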
  private T newFileCleaner(String className, Configuration conf) {
    try {
      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
        FileCleanerDelegate.class);
      @SuppressWarnings("unchecked")
      T cleaner = (T) c.newInstance();
      cleaner.setConf(conf);
      return cleaner;
    } catch (Exception e) {
      LOG.warn("Can NOT create CleanerDelegate: " + className, e);
      // skipping if we can't instantiate the cleaner
      return null;
    }
  }

  @Override
  protected void chore() {
    try {
      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
      checkAndDeleteEntries(files);
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.warn("Error while cleaning the logs", e);
    }
  }

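  /**
   * Sort the given list in (descending) order of the space each element consumes.
   * @param dirs the list to sort; elements are expected to be directories
   */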
  private void sortByConsumedSpace(List<FileStatus> dirs) {
    if (dirs == null || dirs.size() < 2) {
      // no need to sort for an empty or single-entry list
      return;
    }
    Collections.sort(dirs, new Comparator<FileStatus>() {
      HashMap<FileStatus, Long> directorySpaces = new HashMap<FileStatus, Long>();

      @Override
      public int compare(FileStatus f1, FileStatus f2) {
        long f1ConsumedSpace = getSpace(f1);
        long f2ConsumedSpace = getSpace(f2);
        return (f1ConsumedSpace > f2ConsumedSpace) ? -1
            : (f1ConsumedSpace < f2ConsumedSpace ? 1 : 0);
      }

      private long getSpace(FileStatus f) {
        Long cached = directorySpaces.get(f);
        if (cached != null) {
          return cached;
        }
        try {
          long space =
              f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
          directorySpaces.put(f, space);
          return space;
        } catch (IOException e) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("failed to get space consumed by path " + f.getPath(), e);
          }
          return -1;
        }
      }
    });
  }

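  /**
   * Loop over the given directory entries and check whether they can be deleted.
   * An entry that is itself a directory is recursively checked and is only deleted once all of
   * its children have been deleted (and no new children have appeared in the meantime).
   * @param entries directory entries to check
   * @return true if all entries were successfully deleted
   */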
  private boolean checkAndDeleteEntries(FileStatus[] entries) {
    if (entries == null) {
      return true;
    }
    boolean allEntriesDeleted = true;
    List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
    List<FileStatus> dirs = new ArrayList<>();
    for (FileStatus child : entries) {
      if (child.isDirectory()) {
        dirs.add(child);
      } else {
        // collect all files to attempt to delete in one batch
        files.add(child);
      }
    }
    if (dirs.size() > 0) {
      sortByConsumedSpace(dirs);
      LOG.debug("Prepared to delete files in directories: " + dirs);
      for (FileStatus child : dirs) {
        Path path = child.getPath();
        // for each directory, clean it up and note whether everything underneath was removed
        if (!checkAndDeleteDirectory(path)) {
          allEntriesDeleted = false;
        }
      }
    }
    if (!checkAndDeleteFiles(files)) {
      allEntriesDeleted = false;
    }
    return allEntriesDeleted;
  }

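  /**
   * Attempt to delete a directory and everything under it. Child files are passed through the
   * configured cleaner delegates; the directory itself is only removed once all of its children
   * have been deleted.
   * @param dir directory to check and delete
   * @return true if the directory was deleted, false otherwise
   */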
  @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Checking directory: " + dir);
    }

    try {
      FileStatus[] children = FSUtils.listStatus(fs, dir);
      boolean allChildrenDeleted = checkAndDeleteEntries(children);

      // if the directory still has children, we can't delete it, so we are done
      if (!allChildrenDeleted) return false;
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.warn("Error while listing directory: " + dir, e);
      // couldn't list the directory, so don't try to delete it and don't return success
      return false;
    }

    // otherwise, all the children we know about have been deleted, so try to delete the
    // directory itself. Don't delete recursively, so we don't remove files that were added
    // after the listing above.
    try {
      return fs.delete(dir, false);
    } catch (IOException e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Couldn't delete directory: " + dir, e);
      }
      // couldn't delete without an exception, so we can't claim success
      return false;
    }
  }

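  /**
   * Run the given files through each of the cleaners to see if they should be deleted, deleting
   * them if necessary.
   * @param files List of FileStatus for the files to check (and possibly delete)
   * @return true iff all of the given files were deleted
   */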
  private boolean checkAndDeleteFiles(List<FileStatus> files) {
    // first check to see if the path is valid
    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
    List<FileStatus> invalidFiles = Lists.newArrayList();
    for (FileStatus file : files) {
      if (validate(file.getPath())) {
        validFiles.add(file);
      } else {
        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
        invalidFiles.add(file);
      }
    }

    Iterable<FileStatus> deletableValidFiles = validFiles;
    // check each of the cleaners for the valid files
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || getStopper().isStopped()) {
        LOG.warn("A file cleaner " + this.getName()
            + " is stopped, won't delete any more files in: " + this.oldFileDir);
        return false;
      }

      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

      // trace which cleaner is holding on to each file
      if (LOG.isTraceEnabled()) {
        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
        for (FileStatus file : deletableValidFiles) {
          if (!filteredFileSet.contains(file)) {
            LOG.trace(file.getPath() + " is not deletable according to: " + cleaner);
          }
        }
      }

      deletableValidFiles = filteredFiles;
    }

    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
    return deleteFiles(filesToDelete) == files.size();
  }

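  /**
   * Delete the given files.
   * @param filesToDelete files to delete
   * @return number of files that were successfully deleted
   */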
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFileCount = 0;
    for (FileStatus file : filesToDelete) {
      Path filePath = file.getPath();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Removing: " + filePath + " from archive");
      }
      try {
        boolean success = this.fs.delete(filePath, false);
        if (success) {
          deletedFileCount++;
        } else {
          LOG.warn("Attempted to delete: " + filePath
              + ", but couldn't; will retry on the next pass of the cleaner chore.");
        }
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.warn("Error while deleting: " + filePath, e);
      }
    }
    return deletedFileCount;
  }

  @Override
  public synchronized void cleanup() {
    for (T lc : this.cleanersChain) {
      try {
        lc.stop("Exiting");
      } catch (Throwable t) {
        LOG.warn("Stopping", t);
      }
    }
  }
}