1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.cleaner;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Comparator;
23 import java.util.List;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileStatus;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.Stoppable;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.io.HFileLink;
36 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
37 import org.apache.hadoop.hbase.util.StealJobQueue;
38
39 import com.google.common.annotations.VisibleForTesting;
40
41
42
43
44 @InterfaceAudience.Private
45 public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
46
47 public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
48
49
50 public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
51 "hbase.regionserver.thread.hfilecleaner.throttle";
52 public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;
53
54
55 public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
56 "hbase.regionserver.hfilecleaner.large.queue.size";
57 public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
58
59
60 public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
61 "hbase.regionserver.hfilecleaner.small.queue.size";
62 public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
63
64
65 public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
66 "hbase.regionserver.hfilecleaner.large.thread.count";
67 public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;
68
69
70 public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
71 "hbase.regionserver.hfilecleaner.small.thread.count";
72 public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
73
74 private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
75
76 StealJobQueue<HFileDeleteTask> largeFileQueue;
77 BlockingQueue<HFileDeleteTask> smallFileQueue;
78 private int throttlePoint;
79 private int largeQueueInitSize;
80 private int smallQueueInitSize;
81 private int largeFileDeleteThreadNumber;
82 private int smallFileDeleteThreadNumber;
83 private List<Thread> threads = new ArrayList<Thread>();
84 private boolean running;
85
86 private AtomicLong deletedLargeFiles = new AtomicLong();
87 private AtomicLong deletedSmallFiles = new AtomicLong();
88
89
90
91
92
93
94
95
96 public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
97 Path directory) {
98 super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS);
99 throttlePoint =
100 conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
101 largeQueueInitSize =
102 conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
103 smallQueueInitSize =
104 conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
105 largeFileQueue = new StealJobQueue<HFileDeleteTask>(largeQueueInitSize, smallQueueInitSize,
106 COMPARATOR);
107 smallFileQueue = largeFileQueue.getStealFromQueue();
108 largeFileDeleteThreadNumber =
109 conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
110 smallFileDeleteThreadNumber =
111 conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
112 startHFileDeleteThreads();
113 }
114
115 @Override
116 protected boolean validate(Path file) {
117 if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
118 return true;
119 }
120 return StoreFileInfo.validateStoreFileName(file.getName());
121 }
122
123
124
125
126 public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
127 return this.cleanersChain;
128 }
129
130 @Override
131 public int deleteFiles(Iterable<FileStatus> filesToDelete) {
132 int deletedFiles = 0;
133 List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
134
135 for (FileStatus file : filesToDelete) {
136 HFileDeleteTask task = deleteFile(file);
137 if (task != null) {
138 tasks.add(task);
139 }
140 }
141
142 for (HFileDeleteTask task : tasks) {
143 if (task.getResult()) {
144 deletedFiles++;
145 }
146 }
147 return deletedFiles;
148 }
149
150
151
152
153
154
155 private HFileDeleteTask deleteFile(FileStatus file) {
156 HFileDeleteTask task = new HFileDeleteTask(file);
157 boolean enqueued = dispatch(task);
158 return enqueued ? task : null;
159 }
160
161 private boolean dispatch(HFileDeleteTask task) {
162 if (task.fileLength >= this.throttlePoint) {
163 if (!this.largeFileQueue.offer(task)) {
164
165 if (LOG.isTraceEnabled()) {
166 LOG.trace("Large file deletion queue is full");
167 }
168 return false;
169 }
170 } else {
171 if (!this.smallFileQueue.offer(task)) {
172
173 if (LOG.isTraceEnabled()) {
174 LOG.trace("Small file deletion queue is full");
175 }
176 return false;
177 }
178 }
179 return true;
180 }
181
182 @Override
183 public void cleanup() {
184 super.cleanup();
185 stopHFileDeleteThreads();
186 }
187
188
189
190
191 private void startHFileDeleteThreads() {
192 final String n = Thread.currentThread().getName();
193 running = true;
194
195 for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
196 Thread large = new Thread() {
197 @Override
198 public void run() {
199 consumerLoop(largeFileQueue);
200 }
201 };
202 large.setDaemon(true);
203 large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
204 large.start();
205 LOG.debug("Starting hfile cleaner for large files: " + large.getName());
206 threads.add(large);
207 }
208
209
210 for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
211 Thread small = new Thread() {
212 @Override
213 public void run() {
214 consumerLoop(smallFileQueue);
215 }
216 };
217 small.setDaemon(true);
218 small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
219 small.start();
220 LOG.debug("Starting hfile cleaner for small files: " + small.getName());
221 threads.add(small);
222 }
223 }
224
225 protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
226 try {
227 while (running) {
228 HFileDeleteTask task = null;
229 try {
230 task = queue.take();
231 } catch (InterruptedException e) {
232 if (LOG.isDebugEnabled()) {
233 LOG.debug("Interrupted while trying to take a task from queue", e);
234 }
235 break;
236 }
237 if (task != null) {
238 if (LOG.isDebugEnabled()) {
239 LOG.debug("Removing: " + task.filePath + " from archive");
240 }
241 boolean succeed;
242 try {
243 succeed = this.fs.delete(task.filePath, false);
244 } catch (IOException e) {
245 LOG.warn("Failed to delete file " + task.filePath, e);
246 succeed = false;
247 }
248 task.setResult(succeed);
249 if (succeed) {
250 countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
251 }
252 }
253 }
254 } finally {
255 if (LOG.isDebugEnabled()) {
256 LOG.debug("Exit thread: " + Thread.currentThread());
257 }
258 }
259 }
260
261
262 private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
263 if (isLargeFile) {
264 if (deletedLargeFiles.get() == Long.MAX_VALUE) {
265 LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
266 deletedLargeFiles.set(0L);
267 }
268 deletedLargeFiles.incrementAndGet();
269 } else {
270 if (deletedSmallFiles.get() == Long.MAX_VALUE) {
271 LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
272 deletedSmallFiles.set(0L);;
273 }
274 if (fromLargeQueue && LOG.isTraceEnabled()) {
275 LOG.trace("Stolen a small file deletion task in large file thread");
276 }
277 deletedSmallFiles.incrementAndGet();
278 }
279 }
280
281
282
283
284 private void stopHFileDeleteThreads() {
285 running = false;
286 if (LOG.isDebugEnabled()) {
287 LOG.debug("Stopping file delete threads");
288 }
289 for(Thread thread: threads){
290 thread.interrupt();
291 }
292 }
293
294 private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {
295
296 @Override
297 public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
298
299 int cmp = Long.compare(o2.fileLength, o1.fileLength);
300 if (cmp != 0) {
301 return cmp;
302 }
303
304 return System.identityHashCode(o1) - System.identityHashCode(o2);
305 }
306 };
307
308 private static class HFileDeleteTask {
309 private static final long MAX_WAIT = 60 * 1000L;
310 private static final long WAIT_UNIT = 1000L;
311
312 boolean done = false;
313 boolean result;
314 final Path filePath;
315 final long fileLength;
316
317 public HFileDeleteTask(FileStatus file) {
318 this.filePath = file.getPath();
319 this.fileLength = file.getLen();
320 }
321
322 public synchronized void setResult(boolean result) {
323 this.done = true;
324 this.result = result;
325 notify();
326 }
327
328 public synchronized boolean getResult() {
329 long waitTime = 0;
330 try {
331 while (!done) {
332 wait(WAIT_UNIT);
333 waitTime += WAIT_UNIT;
334 if (done) {
335 return this.result;
336 }
337 if (waitTime > MAX_WAIT) {
338 LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath
339 + ", exit...");
340 return false;
341 }
342 }
343 } catch (InterruptedException e) {
344 LOG.warn("Interrupted while waiting for result of deleting " + filePath
345 + ", will return false", e);
346 return false;
347 }
348 return this.result;
349 }
350
351 @Override
352 public boolean equals(Object o) {
353 if (this == o) {
354 return true;
355 }
356 if (o == null || !(o instanceof HFileDeleteTask)) {
357 return false;
358 }
359 HFileDeleteTask otherTask = (HFileDeleteTask) o;
360 return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength);
361 }
362
363 @Override
364 public int hashCode() {
365 return filePath.hashCode();
366 }
367 }
368
369 @VisibleForTesting
370 public List<Thread> getCleanerThreads() {
371 return threads;
372 }
373
374 @VisibleForTesting
375 public long getNumOfDeletedLargeFiles() {
376 return deletedLargeFiles.get();
377 }
378
379 @VisibleForTesting
380 public long getNumOfDeletedSmallFiles() {
381 return deletedSmallFiles.get();
382 }
383
384 @VisibleForTesting
385 public long getLargeQueueInitSize() {
386 return largeQueueInitSize;
387 }
388
389 @VisibleForTesting
390 public long getSmallQueueInitSize() {
391 return smallQueueInitSize;
392 }
393
394 @VisibleForTesting
395 public long getThrottlePoint() {
396 return throttlePoint;
397 }
398
399
400 public void onConfigurationChange(Configuration conf) {
401 if (!checkAndUpdateConfigurations(conf)) {
402 LOG.debug("Update configuration triggered but nothing changed for this cleaner");
403 return;
404 }
405 stopHFileDeleteThreads();
406
407 List<HFileDeleteTask> leftOverTasks =
408 new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
409 for (HFileDeleteTask task : largeFileQueue) {
410 leftOverTasks.add(task);
411 }
412 for (HFileDeleteTask task : smallFileQueue) {
413 leftOverTasks.add(task);
414 }
415 largeFileQueue = new StealJobQueue<HFileDeleteTask>(largeQueueInitSize, smallQueueInitSize,
416 COMPARATOR);
417 smallFileQueue = largeFileQueue.getStealFromQueue();
418 threads.clear();
419 startHFileDeleteThreads();
420
421 for (HFileDeleteTask task : leftOverTasks) {
422 dispatch(task);
423 }
424 }
425
426
427
428
429
430
431 private boolean checkAndUpdateConfigurations(Configuration conf) {
432 boolean updated = false;
433 int throttlePoint =
434 conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
435 if (throttlePoint != this.throttlePoint) {
436 LOG.debug("Updating throttle point, from " + this.throttlePoint + " to " + throttlePoint);
437 this.throttlePoint = throttlePoint;
438 updated = true;
439 }
440 int largeQueueInitSize =
441 conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
442 if (largeQueueInitSize != this.largeQueueInitSize) {
443 LOG.debug("Updating largeQueueInitSize, from " + this.largeQueueInitSize + " to "
444 + largeQueueInitSize);
445 this.largeQueueInitSize = largeQueueInitSize;
446 updated = true;
447 }
448 int smallQueueInitSize =
449 conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
450 if (smallQueueInitSize != this.smallQueueInitSize) {
451 LOG.debug("Updating smallQueueInitSize, from " + this.smallQueueInitSize + " to "
452 + smallQueueInitSize);
453 this.smallQueueInitSize = smallQueueInitSize;
454 updated = true;
455 }
456 int largeFileDeleteThreadNumber =
457 conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
458 if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) {
459 LOG.debug("Updating largeFileDeleteThreadNumber, from " + this.largeFileDeleteThreadNumber
460 + " to " + largeFileDeleteThreadNumber);
461 this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber;
462 updated = true;
463 }
464 int smallFileDeleteThreadNumber =
465 conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
466 if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) {
467 LOG.debug("Updating smallFileDeleteThreadNumber, from " + this.smallFileDeleteThreadNumber
468 + " to " + smallFileDeleteThreadNumber);
469 this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
470 updated = true;
471 }
472 return updated;
473 }
474 }