View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.cleaner;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Comparator;
23  import java.util.List;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FileStatus;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Stoppable;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.HFileLink;
36  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
37  import org.apache.hadoop.hbase.util.StealJobQueue;
38  
39  import com.google.common.annotations.VisibleForTesting;
40  /**
41   * This Chore, every time it runs, will clear the HFiles in the hfile archive
42   * folder that are deletable for each HFile cleaner in the chain.
43   */
44  @InterfaceAudience.Private
45  public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
46  
47    public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
48  
49    // Configuration key for large/small throttle point
50    public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
51        "hbase.regionserver.thread.hfilecleaner.throttle";
52    public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
53  
54    // Configuration key for large queue initial size
55    public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
56        "hbase.regionserver.hfilecleaner.large.queue.size";
57    public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
58  
59    // Configuration key for small queue initial size
60    public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
61        "hbase.regionserver.hfilecleaner.small.queue.size";
62    public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
63  
64    // Configuration key for large file delete thread number
65    public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
66        "hbase.regionserver.hfilecleaner.large.thread.count";
67    public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
68  
69    // Configuration key for small file delete thread number
70    public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
71        "hbase.regionserver.hfilecleaner.small.thread.count";
72    public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
73  
74    private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
75  
76    StealJobQueue<HFileDeleteTask> largeFileQueue;
77    BlockingQueue<HFileDeleteTask> smallFileQueue;
78    private int throttlePoint;
79    private int largeQueueInitSize;
80    private int smallQueueInitSize;
81    private int largeFileDeleteThreadNumber;
82    private int smallFileDeleteThreadNumber;
83    private List<Thread> threads = new ArrayList<Thread>();
84    private boolean running;
85  
86    private AtomicLong deletedLargeFiles = new AtomicLong();
87    private AtomicLong deletedSmallFiles = new AtomicLong();
88  
89    /**
90     * @param period the period of time to sleep between each run
91     * @param stopper the stopper
92     * @param conf configuration to use
93     * @param fs handle to the FS
94     * @param directory directory to be cleaned
95     */
96    public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
97        Path directory) {
98      super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS);
99      throttlePoint =
100         conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
101     largeQueueInitSize =
102         conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
103     smallQueueInitSize =
104         conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
105     largeFileQueue = new StealJobQueue<HFileDeleteTask>(largeQueueInitSize, smallQueueInitSize,
106         COMPARATOR);
107     smallFileQueue = largeFileQueue.getStealFromQueue();
108     largeFileDeleteThreadNumber =
109         conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
110     smallFileDeleteThreadNumber =
111         conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
112     startHFileDeleteThreads();
113   }
114 
115   @Override
116   protected boolean validate(Path file) {
117     if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
118       return true;
119     }
120     return StoreFileInfo.validateStoreFileName(file.getName());
121   }
122 
123   /**
124    * Exposed for TESTING!
125    */
126   public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
127     return this.cleanersChain;
128   }
129 
130   @Override
131   public int deleteFiles(Iterable<FileStatus> filesToDelete) {
132     int deletedFiles = 0;
133     List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
134     // construct delete tasks and add into relative queue
135     for (FileStatus file : filesToDelete) {
136       HFileDeleteTask task = deleteFile(file);
137       if (task != null) {
138         tasks.add(task);
139       }
140     }
141     // wait for each submitted task to finish
142     for (HFileDeleteTask task : tasks) {
143       if (task.getResult()) {
144         deletedFiles++;
145       }
146     }
147     return deletedFiles;
148   }
149 
150   /**
151    * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
152    * @param file the file to delete
153    * @return HFileDeleteTask to track progress
154    */
155   private HFileDeleteTask deleteFile(FileStatus file) {
156     HFileDeleteTask task = new HFileDeleteTask(file);
157     boolean enqueued = dispatch(task);
158     return enqueued ? task : null;
159   }
160 
161   private boolean dispatch(HFileDeleteTask task) {
162     if (task.fileLength >= this.throttlePoint) {
163       if (!this.largeFileQueue.offer(task)) {
164         // should never arrive here as long as we use PriorityQueue
165         if (LOG.isTraceEnabled()) {
166           LOG.trace("Large file deletion queue is full");
167         }
168         return false;
169       }
170     } else {
171       if (!this.smallFileQueue.offer(task)) {
172         // should never arrive here as long as we use PriorityQueue
173         if (LOG.isTraceEnabled()) {
174           LOG.trace("Small file deletion queue is full");
175         }
176         return false;
177       }
178     }
179     return true;
180   }
181 
182   @Override
183   public void cleanup() {
184     super.cleanup();
185     stopHFileDeleteThreads();
186   }
187 
188   /**
189    * Start threads for hfile deletion
190    */
191   private void startHFileDeleteThreads() {
192     final String n = Thread.currentThread().getName();
193     running = true;
194     // start thread for large file deletion
195     for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
196       Thread large = new Thread() {
197         @Override
198         public void run() {
199           consumerLoop(largeFileQueue);
200         }
201       };
202       large.setDaemon(true);
203       large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
204       large.start();
205       LOG.debug("Starting hfile cleaner for large files: " + large.getName());
206       threads.add(large);
207     }
208 
209     // start thread for small file deletion
210     for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
211       Thread small = new Thread() {
212         @Override
213         public void run() {
214           consumerLoop(smallFileQueue);
215         }
216       };
217       small.setDaemon(true);
218       small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
219       small.start();
220       LOG.debug("Starting hfile cleaner for small files: " + small.getName());
221       threads.add(small);
222     }
223   }
224 
225   protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
226     try {
227       while (running) {
228         HFileDeleteTask task = null;
229         try {
230           task = queue.take();
231         } catch (InterruptedException e) {
232           if (LOG.isDebugEnabled()) {
233             LOG.debug("Interrupted while trying to take a task from queue", e);
234           }
235           break;
236         }
237         if (task != null) {
238           if (LOG.isDebugEnabled()) {
239             LOG.debug("Removing: " + task.filePath + " from archive");
240           }
241           boolean succeed;
242           try {
243             succeed = this.fs.delete(task.filePath, false);
244           } catch (IOException e) {
245             LOG.warn("Failed to delete file " + task.filePath, e);
246             succeed = false;
247           }
248           task.setResult(succeed);
249           if (succeed) {
250             countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
251           }
252         }
253       }
254     } finally {
255       if (LOG.isDebugEnabled()) {
256         LOG.debug("Exit thread: " + Thread.currentThread());
257       }
258     }
259   }
260 
261   // Currently only for testing purpose
262   private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
263     if (isLargeFile) {
264       if (deletedLargeFiles.get() == Long.MAX_VALUE) {
265         LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
266         deletedLargeFiles.set(0L);
267       }
268       deletedLargeFiles.incrementAndGet();
269     } else {
270       if (deletedSmallFiles.get() == Long.MAX_VALUE) {
271         LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
272         deletedSmallFiles.set(0L);;
273       }
274       if (fromLargeQueue && LOG.isTraceEnabled()) {
275         LOG.trace("Stolen a small file deletion task in large file thread");
276       }
277       deletedSmallFiles.incrementAndGet();
278     }
279   }
280 
281   /**
282    * Stop threads for hfile deletion
283    */
284   private void stopHFileDeleteThreads() {
285     running = false;
286     if (LOG.isDebugEnabled()) {
287       LOG.debug("Stopping file delete threads");
288     }
289     for(Thread thread: threads){
290       thread.interrupt();
291     }
292   }
293 
294   private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {
295 
296     @Override
297     public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
298       // larger file first so reverse compare
299       int cmp = Long.compare(o2.fileLength, o1.fileLength);
300       if (cmp != 0) {
301         return cmp;
302       }
303       // just use hashCode to generate a stable result.
304       return System.identityHashCode(o1) - System.identityHashCode(o2);
305     }
306   };
307 
308   private static class HFileDeleteTask {
309     private static final long MAX_WAIT = 60 * 1000L;
310     private static final long WAIT_UNIT = 1000L;
311 
312     boolean done = false;
313     boolean result;
314     final Path filePath;
315     final long fileLength;
316 
317     public HFileDeleteTask(FileStatus file) {
318       this.filePath = file.getPath();
319       this.fileLength = file.getLen();
320     }
321 
322     public synchronized void setResult(boolean result) {
323       this.done = true;
324       this.result = result;
325       notify();
326     }
327 
328     public synchronized boolean getResult() {
329       long waitTime = 0;
330       try {
331         while (!done) {
332           wait(WAIT_UNIT);
333           waitTime += WAIT_UNIT;
334           if (done) {
335             return this.result;
336           }
337           if (waitTime > MAX_WAIT) {
338             LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath
339                 + ", exit...");
340             return false;
341           }
342         }
343       } catch (InterruptedException e) {
344         LOG.warn("Interrupted while waiting for result of deleting " + filePath
345             + ", will return false", e);
346         return false;
347       }
348       return this.result;
349     }
350 
351     @Override
352     public boolean equals(Object o) {
353       if (this == o) {
354         return true;
355       }
356       if (o == null || !(o instanceof HFileDeleteTask)) {
357         return false;
358       }
359       HFileDeleteTask otherTask = (HFileDeleteTask) o;
360       return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength);
361     }
362 
363     @Override
364     public int hashCode() {
365       return filePath.hashCode();
366     }
367   }
368 
369   @VisibleForTesting
370   public List<Thread> getCleanerThreads() {
371     return threads;
372   }
373 
374   @VisibleForTesting
375   public long getNumOfDeletedLargeFiles() {
376     return deletedLargeFiles.get();
377   }
378 
379   @VisibleForTesting
380   public long getNumOfDeletedSmallFiles() {
381     return deletedSmallFiles.get();
382   }
383 
384   @VisibleForTesting
385   public long getLargeQueueInitSize() {
386     return largeQueueInitSize;
387   }
388 
389   @VisibleForTesting
390   public long getSmallQueueInitSize() {
391     return smallQueueInitSize;
392   }
393 
394   @VisibleForTesting
395   public long getThrottlePoint() {
396     return throttlePoint;
397   }
398 
399   //@Override
400   public void onConfigurationChange(Configuration conf) {
401     if (!checkAndUpdateConfigurations(conf)) {
402       LOG.debug("Update configuration triggered but nothing changed for this cleaner");
403       return;
404     }
405     stopHFileDeleteThreads();
406     // record the left over tasks
407     List<HFileDeleteTask> leftOverTasks =
408         new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
409     for (HFileDeleteTask task : largeFileQueue) {
410       leftOverTasks.add(task);
411     }
412     for (HFileDeleteTask task : smallFileQueue) {
413       leftOverTasks.add(task);
414     }
415     largeFileQueue = new StealJobQueue<HFileDeleteTask>(largeQueueInitSize, smallQueueInitSize,
416         COMPARATOR);
417     smallFileQueue = largeFileQueue.getStealFromQueue();
418     threads.clear();
419     startHFileDeleteThreads();
420     // re-dispatch the left over tasks
421     for (HFileDeleteTask task : leftOverTasks) {
422       dispatch(task);
423     }
424   }
425 
426   /**
427    * Check new configuration and update settings if value changed
428    * @param conf The new configuration
429    * @return true if any configuration for HFileCleaner changes, false if no change
430    */
431   private boolean checkAndUpdateConfigurations(Configuration conf) {
432     boolean updated = false;
433     int throttlePoint =
434         conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
435     if (throttlePoint != this.throttlePoint) {
436       LOG.debug("Updating throttle point, from " + this.throttlePoint + " to " + throttlePoint);
437       this.throttlePoint = throttlePoint;
438       updated = true;
439     }
440     int largeQueueInitSize =
441         conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
442     if (largeQueueInitSize != this.largeQueueInitSize) {
443       LOG.debug("Updating largeQueueInitSize, from " + this.largeQueueInitSize + " to "
444           + largeQueueInitSize);
445       this.largeQueueInitSize = largeQueueInitSize;
446       updated = true;
447     }
448     int smallQueueInitSize =
449         conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
450     if (smallQueueInitSize != this.smallQueueInitSize) {
451       LOG.debug("Updating smallQueueInitSize, from " + this.smallQueueInitSize + " to "
452           + smallQueueInitSize);
453       this.smallQueueInitSize = smallQueueInitSize;
454       updated = true;
455     }
456     int largeFileDeleteThreadNumber =
457         conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
458     if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
459       LOG.debug("Updating largeFileDeleteThreadNumber, from " + this.largeFileDeleteThreadNumber
460           + " to " + largeFileDeleteThreadNumber);
461       this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
462       updated = true;
463     }
464     int smallFileDeleteThreadNumber =
465         conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
466     if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
467       LOG.debug("Updating smallFileDeleteThreadNumber, from " + this.smallFileDeleteThreadNumber
468           + " to " + smallFileDeleteThreadNumber);
469       this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
470       updated = true;
471     }
472     return updated;
473   }
474 }