1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.compactions;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
31 import org.apache.hadoop.hbase.regionserver.StoreFile;
32 import org.apache.hadoop.hbase.regionserver.StoreUtils;
33 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
49
50 private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
51
52
53 public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
54 super(conf, storeConfigInfo);
55 }
56
57 @Override
58 public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
59 List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
60 boolean forceMajor) throws IOException {
61
62 if(forceMajor){
63 LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignore the flag.");
64 }
65 boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
66 if(isAfterSplit){
67 LOG.info("Split detected, delegate selection to the parent policy.");
68 return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
69 mayUseOffPeak, forceMajor);
70 }
71
72
73 Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
74 CompactionRequest result = new CompactionRequest(toCompact);
75 return result;
76 }
77
78 @Override
79 public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
80 throws IOException {
81 boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
82 if(isAfterSplit){
83 LOG.info("Split detected, delegate to the parent policy.");
84 return super.shouldPerformMajorCompaction(filesToCompact);
85 }
86 return false;
87 }
88
89 @Override
90 public boolean needsCompaction(Collection<StoreFile> storeFiles,
91 List<StoreFile> filesCompacting) {
92 boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
93 if(isAfterSplit){
94 LOG.info("Split detected, delegate to the parent policy.");
95 return super.needsCompaction(storeFiles, filesCompacting);
96 }
97 return hasExpiredStores(storeFiles);
98 }
99
100 private boolean hasExpiredStores(Collection<StoreFile> files) {
101 long currentTime = EnvironmentEdgeManager.currentTime();
102 for(StoreFile sf: files){
103
104 Long maxTs = sf.getReader().getMaxTimestamp();
105 long maxTtl = storeConfigInfo.getStoreFileTtl();
106 if(maxTs == null
107 || maxTtl == Long.MAX_VALUE
108 || (currentTime - maxTtl < maxTs)){
109 continue;
110 } else{
111 return true;
112 }
113 }
114 return false;
115 }
116
117 private Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
118 Collection<StoreFile> filesCompacting) {
119 long currentTime = EnvironmentEdgeManager.currentTime();
120 Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
121 for(StoreFile sf: files){
122
123 Long maxTs = sf.getReader().getMaxTimestamp();
124 long maxTtl = storeConfigInfo.getStoreFileTtl();
125 if(maxTs == null
126 || maxTtl == Long.MAX_VALUE
127 || (currentTime - maxTtl < maxTs)){
128 continue;
129 } else if(filesCompacting == null || filesCompacting.contains(sf) == false){
130 expiredStores.add(sf);
131 }
132 }
133 return expiredStores;
134 }
135 }