View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.cleaner;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.Comparator;
24  import java.util.HashMap;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.fs.FileStatus;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.RemoteExceptionHandler;
37  import org.apache.hadoop.hbase.ScheduledChore;
38  import org.apache.hadoop.hbase.Stoppable;
39  import org.apache.hadoop.hbase.util.FSUtils;
40  
41  import com.google.common.annotations.VisibleForTesting;
42  import com.google.common.collect.ImmutableSet;
43  import com.google.common.collect.Iterables;
44  import com.google.common.collect.Lists;
45  
46  /**
47   * Abstract Cleaner that uses a chain of delegates to clean a directory of files
48   * @param <T> Cleaner delegate class that is dynamically loaded from configuration
49   */
50  public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
51  
52    private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
53  
54    protected final FileSystem fs;
55    private final Path oldFileDir;
56    private final Configuration conf;
57    protected List<T> cleanersChain;
58  
59    /**
60     * @param name name of the chore being run
61     * @param sleepPeriod the period of time to sleep between each run
62     * @param s the stopper
63     * @param conf configuration to use
64     * @param fs handle to the FS
65     * @param oldFileDir the path to the archived files
66     * @param confKey configuration key for the classes to instantiate
67     */
68    public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
69        FileSystem fs, Path oldFileDir, String confKey) {
70      super(name, s, sleepPeriod);
71      this.fs = fs;
72      this.oldFileDir = oldFileDir;
73      this.conf = conf;
74  
75      initCleanerChain(confKey);
76    }
77  
78    /**
79     * Validate the file to see if it even belongs in the directory. If it is valid, then the file
80     * will go through the cleaner delegates, but otherwise the file is just deleted.
81     * @param file full {@link Path} of the file to be checked
82     * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
83     */
84    protected abstract boolean validate(Path file);
85  
86    /**
87     * Instantiate and initialize all the file cleaners set in the configuration
88     * @param confKey key to get the file cleaner classes from the configuration
89     */
90    private void initCleanerChain(String confKey) {
91      this.cleanersChain = new LinkedList<T>();
92      String[] logCleaners = conf.getStrings(confKey);
93      if (logCleaners != null) {
94        for (String className : logCleaners) {
95          T logCleaner = newFileCleaner(className, conf);
96          if (logCleaner != null) {
97            LOG.debug("initialize cleaner=" + className);
98            this.cleanersChain.add(logCleaner);
99          }
100       }
101     }
102   }
103 
104   /**
105    * A utility method to create new instances of LogCleanerDelegate based on the class name of the
106    * LogCleanerDelegate.
107    * @param className fully qualified class name of the LogCleanerDelegate
108    * @param conf
109    * @return the new instance
110    */
111   private T newFileCleaner(String className, Configuration conf) {
112     try {
113       Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
114         FileCleanerDelegate.class);
115       @SuppressWarnings("unchecked")
116       T cleaner = (T) c.newInstance();
117       cleaner.setConf(conf);
118       return cleaner;
119     } catch (Exception e) {
120       LOG.warn("Can NOT create CleanerDelegate: " + className, e);
121       // skipping if can't instantiate
122       return null;
123     }
124   }
125 
126   @Override
127   protected void chore() {
128     try {
129       FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
130       checkAndDeleteEntries(files);
131     } catch (IOException e) {
132       e = RemoteExceptionHandler.checkIOException(e);
133       LOG.warn("Error while cleaning the logs", e);
134     }
135   }
136 
137   /**
138    * Sort the given list in (descending) order of the space each element takes
139    * @param dirs the list to sort, element in it should be directory (not file)
140    */
141   private void sortByConsumedSpace(List<FileStatus> dirs) {
142     if (dirs == null || dirs.size() < 2) {
143       // no need to sort for empty or single directory
144       return;
145     }
146     Collections.sort(dirs, new Comparator<FileStatus>() {
147       HashMap<FileStatus, Long> directorySpaces = new HashMap<FileStatus, Long>();
148 
149       @Override
150       public int compare(FileStatus f1, FileStatus f2) {
151         long f1ConsumedSpace = getSpace(f1);
152         long f2ConsumedSpace = getSpace(f2);
153         return (f1ConsumedSpace > f2ConsumedSpace) ? -1
154             : (f1ConsumedSpace < f2ConsumedSpace ? 1 : 0);
155       }
156 
157       private long getSpace(FileStatus f) {
158         Long cached = directorySpaces.get(f);
159         if (cached != null) {
160           return cached;
161         }
162         try {
163           long space =
164               f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
165           directorySpaces.put(f, space);
166           return space;
167         } catch (IOException e) {
168           if (LOG.isTraceEnabled()) {
169             LOG.trace("failed to get space consumed by path " + f.getPath(), e);
170           }
171           return -1;
172         }
173       }
174     });
175   }
176 
177   /**
178    * Loop over the given directory entries, and check whether they can be deleted.
179    * If an entry is itself a directory it will be recursively checked and deleted itself iff
180    * all subentries are deleted (and no new subentries are added in the mean time)
181    *
182    * @param entries directory entries to check
183    * @return true if all entries were successfully deleted
184    */
185   private boolean checkAndDeleteEntries(FileStatus[] entries) {
186     if (entries == null) {
187       return true;
188     }
189     boolean allEntriesDeleted = true;
190     List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
191     List<FileStatus> dirs = new ArrayList<>();
192     for (FileStatus child : entries) {
193       if (child.isDirectory()) {
194         dirs.add(child);
195       } else {
196         // collect all files to attempt to delete in one batch
197         files.add(child);
198       }
199     }
200     if (dirs.size() > 0) {
201       sortByConsumedSpace(dirs);
202       LOG.debug("Prepared to delete files in directories: " + dirs);
203       for (FileStatus child : dirs) {
204         Path path = child.getPath();
205         // for each subdirectory delete it and all entries if possible
206         if (!checkAndDeleteDirectory(path)) {
207           allEntriesDeleted = false;
208         }
209       }
210     }
211     if (!checkAndDeleteFiles(files)) {
212       allEntriesDeleted = false;
213     }
214     return allEntriesDeleted;
215   }
216   
217   /**
218    * Attempt to delete a directory and all files under that directory. Each child file is passed
219    * through the delegates to see if it can be deleted. If the directory has no children when the
220    * cleaners have finished it is deleted.
221    * <p>
222    * If new children files are added between checks of the directory, the directory will <b>not</b>
223    * be deleted.
224    * @param dir directory to check
225    * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
226    */
227   @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
228     if (LOG.isTraceEnabled()) {
229       LOG.trace("Checking directory: " + dir);
230     }
231 
232     try {
233       FileStatus[] children = FSUtils.listStatus(fs, dir);
234       boolean allChildrenDeleted = checkAndDeleteEntries(children);
235   
236       // if the directory still has children, we can't delete it, so we are done
237       if (!allChildrenDeleted) return false;
238     } catch (IOException e) {
239       e = RemoteExceptionHandler.checkIOException(e);
240       LOG.warn("Error while listing directory: " + dir, e);
241       // couldn't list directory, so don't try to delete, and don't return success
242       return false;
243     }
244 
245     // otherwise, all the children (that we know about) have been deleted, so we should try to
246     // delete this directory. However, don't do so recursively so we don't delete files that have
247     // been added since we last checked.
248     try {
249       return fs.delete(dir, false);
250     } catch (IOException e) {
251       if (LOG.isTraceEnabled()) {
252         LOG.trace("Couldn't delete directory: " + dir, e);
253       }
254       // couldn't delete w/o exception, so we can't return success.
255       return false;
256     }
257   }
258 
259   /**
260    * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
261    * necessary.
262    * @param files List of FileStatus for the files to check (and possibly delete)
263    * @return true iff successfully deleted all files
264    */
265   private boolean checkAndDeleteFiles(List<FileStatus> files) {
266     // first check to see if the path is valid
267     List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
268     List<FileStatus> invalidFiles = Lists.newArrayList();
269     for (FileStatus file : files) {
270       if (validate(file.getPath())) {
271         validFiles.add(file);
272       } else {
273         LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
274         invalidFiles.add(file);
275       }
276     }
277 
278     Iterable<FileStatus> deletableValidFiles = validFiles;
279     // check each of the cleaners for the valid files
280     for (T cleaner : cleanersChain) {
281       if (cleaner.isStopped() || getStopper().isStopped()) {
282         LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
283             + this.oldFileDir);
284         return false;
285       }
286 
287       Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
288       
289       // trace which cleaner is holding on to each file
290       if (LOG.isTraceEnabled()) {
291         ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
292         for (FileStatus file : deletableValidFiles) {
293           if (!filteredFileSet.contains(file)) {
294             LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
295           }
296         }
297       }
298       
299       deletableValidFiles = filteredFiles;
300     }
301     
302     Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
303     return deleteFiles(filesToDelete) == files.size();
304   }
305 
306   /**
307    * Delete the given files
308    * @param filesToDelete files to delete
309    * @return number of deleted files
310    */
311   protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
312     int deletedFileCount = 0;
313     for (FileStatus file : filesToDelete) {
314       Path filePath = file.getPath();
315       if (LOG.isDebugEnabled()) {
316         LOG.debug("Removing: " + filePath + " from archive");
317       }
318       try {
319         boolean success = this.fs.delete(filePath, false);
320         if (success) {
321           deletedFileCount++;
322         } else {
323           LOG.warn("Attempted to delete:" + filePath
324               + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
325         }
326       } catch (IOException e) {
327         e = RemoteExceptionHandler.checkIOException(e);
328         LOG.warn("Error while deleting: " + filePath, e);
329       }
330     }
331     return deletedFileCount;
332   }
333 
334   @Override
335   public synchronized void cleanup() {
336     for (T lc : this.cleanersChain) {
337       try {
338         lc.stop("Exiting");
339       } catch (Throwable t) {
340         LOG.warn("Stopping", t);
341       }
342     }
343   }
344 }