/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

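/**
 * Tests for the master's {@link HFileCleaner} chore, which deletes files from the
 * HFile archive directory according to the configured cleaner delegates.
 */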
@Category(MediumTests.class)
public class TestHFileCleaner {
  private static final Log LOG = LogFactory.getLog(TestHFileCleaner.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  @BeforeClass
  public static void setupCluster() throws Exception {
    // have to use a minidfs cluster because the localfs doesn't modify file times correctly
    UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void shutdownCluster() throws IOException {
    UTIL.shutdownMiniDFSCluster();
  }

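  /**
   * Verify that {@link TimeToLiveHFileCleaner} reports a file deletable once its
   * modification time is older than the configured TTL.
   */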
  @Test
  public void testTTLCleaner() throws IOException, InterruptedException {
    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
    Path root = UTIL.getDataTestDirOnTestFS();
    Path file = new Path(root, "file");
    fs.createNewFile(file);
    long createTime = System.currentTimeMillis();
    assertTrue("Test file not created!", fs.exists(file));
    TimeToLiveHFileCleaner cleaner = new TimeToLiveHFileCleaner();
    // push the mod time past the TTL so the cleaner considers the file expired
    fs.setTimes(file, createTime - 100, -1);
    Configuration conf = UTIL.getConfiguration();
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 100);
    cleaner.setConf(conf);
    assertTrue("File not set deletable - check mod time:" + getFileStats(file, fs)
        + " with create time:" + createTime, cleaner.isFileDeletable(fs.getFileStatus(file)));
  }
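
  /**
   * @param file file to check
   * @return loggable information about the file (mod time and access time)
   */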
  private String getFileStats(Path file, FileSystem fs) throws IOException {
    FileStatus status = fs.getFileStatus(file);
    return "File: " + file + ", mtime:" + status.getModificationTime() + ", atime:"
        + status.getAccessTime();
  }

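  /**
   * End-to-end run of the cleaner chore: an invalid (non-hfile) name and files past
   * their TTL are removed, while a file still within its TTL survives.
   */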
  @Test(timeout = 60 * 1000)
  public void testHFileCleaning() throws Exception {
    final EnvironmentEdge originalEdge = EnvironmentEdgeManager.getDelegate();
    String prefix = "someHFileThatWouldBeAUUID";
    Configuration conf = UTIL.getConfiguration();
    // set TTL
    long ttl = 2000;
    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
        "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner");
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
    Server server = new DummyServer();
    Path archivedHfileDir =
        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
    FileSystem fs = FileSystem.get(conf);
    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);

    // populate the archive: 1 invalid file, 31 expired files, and 1 file within the TTL
    final long createTime = System.currentTimeMillis();
    fs.delete(archivedHfileDir, true);
    fs.mkdirs(archivedHfileDir);
    // case 1: an invalid (non-hfile) name, which should be deleted directly
    fs.createNewFile(new Path(archivedHfileDir, "dfd-dfd"));
    // case 2: old files which should be deletable for the TTL cleaner
    LOG.debug("Now is: " + createTime);
    for (int i = 1; i < 32; i++) {
      Path fileName = new Path(archivedHfileDir, (prefix + "." + (createTime + i)));
      fs.createNewFile(fileName);
      // set the creation time past ttl to ensure that it gets removed
      fs.setTimes(fileName, createTime - ttl - 1, -1);
      LOG.debug("Creating " + getFileStats(fileName, fs));
    }

    // case 3: a newer file, not deletable for the TTL cleaner, so it must survive
    // the chore
    Path saved = new Path(archivedHfileDir, prefix + ".00000000000");
    fs.createNewFile(saved);
    // set the creation time within the ttl
    fs.setTimes(saved, createTime - ttl / 2, -1);
    LOG.debug("Creating " + getFileStats(saved, fs));
    for (FileStatus stat : fs.listStatus(archivedHfileDir)) {
      LOG.debug(stat.getPath().toString());
    }

    assertEquals(33, fs.listStatus(archivedHfileDir).length);

    // inject a custom edge so the cleaner sees a fixed "now"
    EnvironmentEdge setTime = new EnvironmentEdge() {
      @Override
      public long currentTime() {
        return createTime;
      }
    };
    EnvironmentEdgeManager.injectEdge(setTime);

    // run the chore
    cleaner.chore();

    // ensure we only end up with the saved file
    assertEquals(1, fs.listStatus(archivedHfileDir).length);

    for (FileStatus file : fs.listStatus(archivedHfileDir)) {
      LOG.debug("Kept hfiles: " + file.getPath().getName());
    }

    // reset the edge back to the original edge
    EnvironmentEdgeManager.injectEdge(originalEdge);
  }

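  /**
   * The chore should prune empty directory trees left under the archive directory,
   * but never remove the archive directory itself.
   */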
  @Test
  public void testRemovesEmptyDirectories() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // no cleaner policies = delete all files
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
    Server server = new DummyServer();
    Path archivedHfileDir =
        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);

    // setup the cleaner
    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);

    // make all the directories for archiving files
    Path table = new Path(archivedHfileDir, "table");
    Path region = new Path(table, "regionsomthing");
    Path family = new Path(region, "fam");
    Path file = new Path(family, "file12345");
    fs.mkdirs(family);
    if (!fs.exists(family)) throw new RuntimeException("Couldn't create test family: " + family);
    fs.create(file).close();
    if (!fs.exists(file)) throw new RuntimeException("Test file didn't get created: " + file);

    // run the chore to clean up the file (and the directories above it)
    cleaner.chore();

    // make sure all the parent directories get removed, but not the archive root
    assertFalse("family directory not removed for empty directory", fs.exists(family));
    assertFalse("region directory not removed for empty directory", fs.exists(region));
    assertFalse("table directory not removed for empty directory", fs.exists(table));
    assertTrue("archive directory should still exist", fs.exists(archivedHfileDir));
  }

  static class DummyServer implements Server {

    @Override
    public Configuration getConfiguration() {
      return UTIL.getConfiguration();
    }

    @Override
    public ZooKeeperWatcher getZooKeeper() {
      try {
        return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return null;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return null;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public MetaTableLocator getMetaTableLocator() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf("regionserver,60020,000000");
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }
  }

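  /**
   * After cleanup() is called, the cleaner's worker threads should terminate.
   */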
  @Test
  public void testThreadCleanup() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
    Server server = new DummyServer();
    Path archivedHfileDir =
        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);

    // setup the cleaner
    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
    // clean up the archive directory
    fs.delete(archivedHfileDir, true);
    fs.mkdirs(archivedHfileDir);
    // create some file to delete
    fs.createNewFile(new Path(archivedHfileDir, "dfd-dfd"));
    // launch the chore
    cleaner.chore();
    // call cleanup
    cleaner.cleanup();
    // wait a while to make sure the cleaner threads die
    Thread.sleep(100);
    for (Thread thread : cleaner.getCleanerThreads()) {
      Assert.assertFalse(thread.isAlive());
    }
  }

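  /**
   * Files above the delete throttle threshold should be handled by the large-file
   * queue and everything else by the small-file queue.
   */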
  @Test
  public void testLargeSmallIsolation() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // no cleaner policies = delete all files
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 512 * 1024);
    Server server = new DummyServer();
    Path archivedHfileDir =
        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);

    // setup the cleaner
    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);

    // clean up the archive directory and create files for testing
    fs.delete(archivedHfileDir, true);
    fs.mkdirs(archivedHfileDir);
    final int LARGE_FILE_NUM = 5;
    final int SMALL_FILE_NUM = 20;
    createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);

    // run the chore and check how many files each queue deleted
    cleaner.chore();

    Assert.assertEquals(LARGE_FILE_NUM, cleaner.getNumOfDeletedLargeFiles());
    Assert.assertEquals(SMALL_FILE_NUM, cleaner.getNumOfDeletedSmallFiles());
  }

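  /**
   * Live reconfiguration: throttle point, queue init sizes and thread counts should
   * take effect through onConfigurationChange without restarting the cleaner, and a
   * repeated call with an unchanged configuration should not replace the threads.
   */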
  @Test(timeout = 60 * 1000)
  public void testOnConfigurationChange() throws Exception {
    // constants
    final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
    final int ORIGINAL_QUEUE_INIT_SIZE = 512;
    final int UPDATE_THROTTLE_POINT = 1024;
    final int UPDATE_QUEUE_INIT_SIZE = 1024;
    final int LARGE_FILE_NUM = 5;
    final int SMALL_FILE_NUM = 20;
    final int LARGE_THREAD_NUM = 2;
    final int SMALL_THREAD_NUM = 4;

    Configuration conf = UTIL.getConfiguration();
    // no cleaner policies = delete all files
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
    conf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
    conf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
    Server server = new DummyServer();
    Path archivedHfileDir =
        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);

    // setup the cleaner and confirm the initial settings took hold
    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
    final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
    Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
    Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
    Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());

    // clean up the archive directory and create files for testing
    fs.delete(archivedHfileDir, true);
    fs.mkdirs(archivedHfileDir);
    createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);

    // run the chore in a daemon thread so we can reconfigure the cleaner mid-run
    Thread t = new Thread() {
      @Override
      public void run() {
        cleaner.chore();
      }
    };
    t.setDaemon(true);
    t.start();
    // wait until file cleaning has started
    while (cleaner.getNumOfDeletedSmallFiles() == 0) {
      Thread.yield();
    }

    // trigger a configuration change
    Configuration newConf = new Configuration(conf);
    newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
    newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
    newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
    newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM);
    newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM);
    LOG.debug("Files deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
        + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
    cleaner.onConfigurationChange(newConf);

    // the updated configuration should be in use
    Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
    Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
    Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
    Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size());

    // a no-op configuration change should not replace the cleaner threads
    List<Thread> oldThreads = cleaner.getCleanerThreads();
    cleaner.onConfigurationChange(newConf);
    List<Thread> newThreads = cleaner.getCleanerThreads();
    Assert.assertArrayEquals(oldThreads.toArray(), newThreads.toArray());

    // wait until the clean is done, then check the counts: with the lowered throttle
    // point, files that previously went to the small queue now count as large
    t.join();
    LOG.debug("Files deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
        + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
    Assert.assertTrue("Should delete more than " + LARGE_FILE_NUM
        + " files from the large queue but actually deleted "
        + cleaner.getNumOfDeletedLargeFiles(),
        cleaner.getNumOfDeletedLargeFiles() > LARGE_FILE_NUM);
    Assert.assertTrue("Should delete fewer than " + SMALL_FILE_NUM
        + " files from the small queue but actually deleted "
        + cleaner.getNumOfDeletedSmallFiles(),
        cleaner.getNumOfDeletedSmallFiles() < SMALL_FILE_NUM);
  }

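  /**
   * Write {@code largeFileNum} 1 MB files and {@code smallFileNum} 1 KB files of
   * random bytes into the given directory, straddling the 512 KB throttle
   * threshold configured by the tests above.
   */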
  private void createFilesForTesting(int largeFileNum, int smallFileNum, FileSystem fs,
      Path archivedHfileDir) throws IOException {
    final Random rand = new Random();
    final byte[] large = new byte[1024 * 1024];
    for (int i = 0; i < large.length; i++) {
      large[i] = (byte) rand.nextInt(128);
    }
    final byte[] small = new byte[1024];
    for (int i = 0; i < small.length; i++) {
      small[i] = (byte) rand.nextInt(128);
    }

    for (int i = 1; i <= largeFileNum; i++) {
      FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "large-file-" + i));
      out.write(large);
      out.close();
    }
    for (int i = 1; i <= smallFileNum; i++) {
      FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "small-file-" + i));
      out.write(small);
      out.close();
    }
  }
}