1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mob.compactions;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.Date;
26 import java.util.List;
27 import java.util.Random;
28 import java.util.UUID;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.RejectedExecutionHandler;
32 import java.util.concurrent.SynchronousQueue;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileStatus;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.*;
41 import org.apache.hadoop.hbase.KeyValue.Type;
42 import org.apache.hadoop.hbase.regionserver.*;
43 import org.apache.hadoop.hbase.testclassification.LargeTests;
44 import org.apache.hadoop.hbase.client.Scan;
45 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
46 import org.apache.hadoop.hbase.io.hfile.HFileContext;
47 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
48 import org.apache.hadoop.hbase.mob.MobConstants;
49 import org.apache.hadoop.hbase.mob.MobFileName;
50 import org.apache.hadoop.hbase.mob.MobUtils;
51 import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
52 import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
53 import org.apache.hadoop.hbase.util.Bytes;
54 import org.apache.hadoop.hbase.util.FSUtils;
55 import org.apache.hadoop.hbase.util.Threads;
56 import org.apache.hadoop.hdfs.DistributedFileSystem;
57 import org.junit.AfterClass;
58 import org.junit.Assert;
59 import static org.junit.Assert.assertTrue;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63
64 @Category(LargeTests.class)
65 public class TestPartitionedMobCompactor {
66 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67 private final static String family = "family";
68 private final static String qf = "qf";
69 private HColumnDescriptor hcd = new HColumnDescriptor(family);
70 private Configuration conf = TEST_UTIL.getConfiguration();
71 private CacheConfig cacheConf = new CacheConfig(conf);
72 private FileSystem fs;
73 private List<FileStatus> mobFiles = new ArrayList<>();
74 private List<FileStatus> delFiles = new ArrayList<>();
75 private List<FileStatus> allFiles = new ArrayList<>();
76 private Path basePath;
77 private String mobSuffix;
78 private String delSuffix;
79 private static ExecutorService pool;
80
81 @BeforeClass
82 public static void setUpBeforeClass() throws Exception {
83 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
84
85 TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
86 DistributedFileSystem.class);
87 TEST_UTIL.startMiniCluster(1);
88 pool = createThreadPool();
89 }
90
91 @AfterClass
92 public static void tearDownAfterClass() throws Exception {
93 pool.shutdown();
94 TEST_UTIL.shutdownMiniCluster();
95 }
96
97 private void init(String tableName) throws Exception {
98 fs = FileSystem.get(conf);
99 Path testDir = FSUtils.getRootDir(conf);
100 Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
101 basePath = new Path(new Path(mobTestDir, tableName), family);
102 mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
103 delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
104 }
105
106 @Test
107 public void testCompactionSelectWithAllFiles() throws Exception {
108 String tableName = "testCompactionSelectWithAllFiles";
109
110
111 testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
112 CompactionType.ALL_FILES, false, false);
113 }
114
115 @Test
116 public void testCompactionSelectToAvoidCompactOneFileWithDelete() throws Exception {
117 String tableName = "testCompactionSelectToAvoidCompactOneFileWithDelete";
118
119
120 testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD,
121 CompactionType.PART_FILES, false);
122 }
123
124
125 @Test
126 public void testCompactionSelectWithPartFiles() throws Exception {
127 String tableName = "testCompactionSelectWithPartFiles";
128 testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false);
129 }
130
131 @Test
132 public void testCompactionSelectWithForceAllFiles() throws Exception {
133 String tableName = "testCompactionSelectWithForceAllFiles";
134 testCompactionAtMergeSize(tableName, Long.MAX_VALUE, CompactionType.ALL_FILES, true);
135 }
136
137 private void testCompactionAtMergeSize(final String tableName,
138 final long mergeSize, final CompactionType type, final boolean isForceAllFiles)
139 throws Exception {
140 testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, true);
141 }
142
143 private void testCompactionAtMergeSize(final String tableName,
144 final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
145 final boolean createDelFiles)
146 throws Exception {
147 resetConf();
148 init(tableName);
149 int count = 10;
150
151 createStoreFiles(basePath, family, qf, count, Type.Put);
152
153 if (createDelFiles) {
154
155 createStoreFiles(basePath, family, qf, count, Type.Delete);
156 }
157
158 listFiles();
159 List<String> expectedStartKeys = new ArrayList<>();
160 for(FileStatus file : mobFiles) {
161 if(file.getLen() < mergeSize) {
162 String fileName = file.getPath().getName();
163 String startKey = fileName.substring(0, 32);
164
165
166
167 if (isForceAllFiles || !createDelFiles) {
168 expectedStartKeys.add(startKey);
169 }
170 }
171 }
172
173 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
174 testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys);
175 }
176
177 @Test
178 public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
179 String tableName = "testCompactDelFilesWithDefaultBatchSize";
180 testCompactDelFilesAtBatchSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE,
181 MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
182 }
183
184 @Test
185 public void testCompactDelFilesWithSmallBatchSize() throws Exception {
186 String tableName = "testCompactDelFilesWithSmallBatchSize";
187 testCompactDelFilesAtBatchSize(tableName, 4, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
188 }
189
190 @Test
191 public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
192 String tableName = "testCompactDelFilesWithSmallBatchSize";
193 testCompactDelFilesAtBatchSize(tableName, 4, 2);
194 }
195
196 @Test
197 public void testCompactFilesWithDstDirFull() throws Exception {
198 String tableName = "testCompactFilesWithDstDirFull";
199 fs = FileSystem.get(conf);
200 FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
201 Path testDir = FSUtils.getRootDir(conf);
202 Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
203 basePath = new Path(new Path(mobTestDir, tableName), family);
204
205 try {
206 int count = 2;
207
208 createStoreFiles(basePath, family, qf, count, Type.Put, true);
209 listFiles();
210
211 TableName tName = TableName.valueOf(tableName);
212 MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
213 faultyFs.setThrowException(true);
214 try {
215 compactor.compact(allFiles, true);
216 } catch (IOException e) {
217 System.out.println("Expected exception, ignore");
218 }
219
220
221 Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
222 FileStatus[] ls = faultyFs.listStatus(tempPath);
223
224
225 assertTrue(ls.length == 1);
226 assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
227
228 Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
229 tName.getNamespaceAsString(), tName.getQualifierAsString())));
230
231
232 FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
233 assertTrue(lsBulkload.length == 0);
234
235 } finally {
236 faultyFs.setThrowException(false);
237 }
238 }
239
240
241 private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
242 int delfileMaxCount) throws Exception {
243 resetConf();
244 init(tableName);
245
246 createStoreFiles(basePath, family, qf, 20, Type.Put);
247
248 createStoreFiles(basePath, family, qf, 13, Type.Delete);
249 listFiles();
250
251
252 conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, delfileMaxCount);
253
254 conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, batchSize);
255 testCompactDelFiles(tableName, 1, 13, false);
256 }
257
258
259
260
261
262
263
264
265 private void testSelectFiles(String tableName, final CompactionType type,
266 final boolean isForceAllFiles, final List<String> expected) throws IOException {
267 PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
268 TableName.valueOf(tableName), hcd, pool) {
269 @Override
270 public List<Path> compact(List<FileStatus> files, boolean isForceAllFiles)
271 throws IOException {
272 if (files == null || files.isEmpty()) {
273 return null;
274 }
275 PartitionedMobCompactionRequest request = select(files, isForceAllFiles);
276
277 Assert.assertEquals(type, request.type);
278
279 compareCompactedPartitions(expected, request.compactionPartitions);
280
281 compareDelFiles(request.delFiles);
282 return null;
283 }
284 };
285 compactor.compact(allFiles, isForceAllFiles);
286 }
287
288
289
290
291
292
293
294
295 private void testCompactDelFiles(String tableName, final int expectedFileCount,
296 final int expectedCellCount, boolean isForceAllFiles) throws IOException {
297 PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs,
298 TableName.valueOf(tableName), hcd, pool) {
299 @Override
300 protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
301 throws IOException {
302 List<Path> delFilePaths = new ArrayList<Path>();
303 for (FileStatus delFile : request.delFiles) {
304 delFilePaths.add(delFile.getPath());
305 }
306 List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
307
308 Assert.assertEquals(expectedFileCount, newDelPaths.size());
309 Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
310 return null;
311 }
312 };
313 compactor.compact(allFiles, isForceAllFiles);
314 }
315
316
317
318
319 private void listFiles() throws IOException {
320 for (FileStatus file : fs.listStatus(basePath)) {
321 allFiles.add(file);
322 if (file.getPath().getName().endsWith("_del")) {
323 delFiles.add(file);
324 } else {
325 mobFiles.add(file);
326 }
327 }
328 }
329
330
331
332
333
334 private void compareCompactedPartitions(List<String> expected,
335 Collection<CompactionPartition> partitions) {
336 List<String> actualKeys = new ArrayList<>();
337 for (CompactionPartition partition : partitions) {
338 actualKeys.add(partition.getPartitionId().getStartKey());
339 }
340 Collections.sort(expected);
341 Collections.sort(actualKeys);
342 Assert.assertEquals(expected.size(), actualKeys.size());
343 for (int i = 0; i < expected.size(); i++) {
344 Assert.assertEquals(expected.get(i), actualKeys.get(i));
345 }
346 }
347
348
349
350
351
352 private void compareDelFiles(Collection<FileStatus> allDelFiles) {
353 int i = 0;
354 for (FileStatus file : allDelFiles) {
355 Assert.assertEquals(delFiles.get(i), file);
356 i++;
357 }
358 }
359
360
361
362
363
364
365
366
367
368 private void createStoreFiles(Path basePath, String family, String qualifier, int count,
369 Type type) throws IOException {
370 createStoreFiles(basePath, family, qualifier, count, type, false);
371 }
372
373 private void createStoreFiles(Path basePath, String family, String qualifier, int count,
374 Type type, boolean sameStartKey) throws IOException {
375 HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
376 String startKey = "row_";
377 MobFileName mobFileName = null;
378 for (int i = 0; i < count; i++) {
379 byte[] startRow;
380 if (sameStartKey) {
381
382 startRow = Bytes.toBytes(startKey);
383 mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
384 delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
385 } else {
386 startRow = Bytes.toBytes(startKey + i);
387 }
388 if(type.equals(Type.Delete)) {
389 mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
390 new Date()), delSuffix);
391 }
392 if(type.equals(Type.Put)){
393 mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
394 new Date()), mobSuffix);
395 }
396 StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
397 .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();
398 writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
399 type, (i+1)*1000);
400 }
401 }
402
403
404
405
406
407
408
409
410
411
412 private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, byte[] family,
413 byte[] qualifier, Type type, int size) throws IOException {
414 long now = System.currentTimeMillis();
415 try {
416 byte[] dummyData = new byte[size];
417 new Random().nextBytes(dummyData);
418 writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
419 } finally {
420 writer.close();
421 }
422 }
423
424
425
426
427
428
429 private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
430 List<StoreFile> sfs = new ArrayList<StoreFile>();
431 int size = 0;
432 for(Path path : paths) {
433 StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
434 sfs.add(sf);
435 }
436 List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
437 false, null, HConstants.LATEST_TIMESTAMP);
438 Scan scan = new Scan();
439 scan.setMaxVersions(hcd.getMaxVersions());
440 long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
441 long ttl = HStore.determineTTLFromFamily(hcd);
442 ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
443 StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
444 scanners, 0L, HConstants.LATEST_TIMESTAMP);
445 List<Cell> results = new ArrayList<>();
446 boolean hasMore = true;
447
448 while (hasMore) {
449 hasMore = scanner.next(results);
450 size += results.size();
451 results.clear();
452 }
453 scanner.close();
454 return size;
455 }
456
457 private static ExecutorService createThreadPool() {
458 int maxThreads = 10;
459 long keepAliveTime = 60;
460 final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
461 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
462 TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
463 new RejectedExecutionHandler() {
464 @Override
465 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
466 try {
467
468 queue.put(r);
469 } catch (InterruptedException e) {
470 throw new RejectedExecutionException(e);
471 }
472 }
473 });
474 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
475 return pool;
476 }
477
478
479
480
481 private void resetConf() {
482 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
483 MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
484 conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
485 conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
486 MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
487 }
488
489
490
491
492 static class FaultyDistributedFileSystem extends DistributedFileSystem {
493 private volatile boolean throwException = false;
494
495 public FaultyDistributedFileSystem() {
496 super();
497 }
498
499 public void setThrowException(boolean throwException) {
500 this.throwException = throwException;
501 }
502
503 @Override
504 public boolean rename(Path src, Path dst) throws IOException {
505 if (throwException) {
506 throw new IOException("No more files allowed");
507 }
508 return super.rename(src, dst);
509 }
510 }
511 }