/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * FIFO compaction policy selects only files in which all cells have expired.
 * The column family MUST have a non-default TTL. A typical use case is storing
 * raw data that will be post-processed later and discarded completely after a
 * fairly short period of time: for example, raw time-series data versus
 * time-based roll-up aggregates. We collect raw time-series into a column
 * family with the FIFO compaction policy, periodically run a task that builds
 * the roll-up aggregates and compacts the time-series, and discard the
 * original raw data afterwards.
 * <p>
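 * A minimal configuration sketch, assuming the HBase 1.x client API; the
 * family name and TTL below are illustrative only:
 * <pre>
 * HColumnDescriptor desc = new HColumnDescriptor("raw");
 * // Route compaction selection for this family to FIFOCompactionPolicy.
 * desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
 *     FIFOCompactionPolicy.class.getName());
 * // FIFO compaction requires a non-default TTL, in seconds.
 * desc.setTimeToLive(24 * 60 * 60); // one day
 * </pre>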
 */
@InterfaceAudience.Private
public class FIFOCompactionPolicy extends ExploringCompactionPolicy {

  private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);

  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

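  /**
   * Selects all files whose cells have all expired; the {@code forceMajor}
   * flag is ignored because major compaction is not supported. When reference
   * files from a split are present, selection is delegated to the parent
   * {@link ExploringCompactionPolicy}.
   */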
  @Override
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
      boolean forceMajor) throws IOException {
    if (forceMajor) {
      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignoring the flag.");
    }
    boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating selection to the parent policy.");
      return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
        mayUseOffPeak, forceMajor);
    }
    // Select only fully expired files; the resulting request may be empty.
    Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
    return new CompactionRequest(toCompact);
  }

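  /**
   * FIFO compaction never runs a major compaction of its own; this returns
   * {@code true} only when a split has left reference files and the decision
   * is delegated to the parent policy.
   */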
  @Override
  public boolean shouldPerformMajorCompaction(Collection<StoreFile> filesToCompact)
    throws IOException {
    boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating to the parent policy.");
      return super.shouldPerformMajorCompaction(filesToCompact);
    }
    return false;
  }

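  /**
   * A compaction is needed as soon as at least one store file has fully
   * expired, unless a split has left reference files, in which case the
   * parent policy decides.
   */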
  @Override
  public boolean needsCompaction(Collection<StoreFile> storeFiles,
      List<StoreFile> filesCompacting) {
    boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating to the parent policy.");
      return super.needsCompaction(storeFiles, filesCompacting);
    }
    return hasExpiredStores(storeFiles);
  }

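  /**
   * Returns {@code true} if any file is fully expired, i.e. its newest cell
   * timestamp is older than {@code currentTime - TTL}. Files with no
   * timestamp or with an unbounded TTL never expire.
   */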
  private boolean hasExpiredStores(Collection<StoreFile> files) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    for (StoreFile sf : files) {
      // The MIN_VERSIONS check is done in HStore#removeUnneededFiles.
      Long maxTs = sf.getReader().getMaxTimestamp();
      long maxTtl = storeConfigInfo.getStoreFileTtl();
      if (maxTs == null
          || maxTtl == Long.MAX_VALUE
          || (currentTime - maxTtl < maxTs)) {
        continue;
      } else {
        return true;
      }
    }
    return false;
  }

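  /**
   * Collects the fully expired files, using the same expiry test as
   * {@link #hasExpiredStores(Collection)}, skipping any file that is already
   * being compacted.
   */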
  private Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
    Collection<StoreFile> filesCompacting) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
    for (StoreFile sf : files) {
      // The MIN_VERSIONS check is done in HStore#removeUnneededFiles.
      Long maxTs = sf.getReader().getMaxTimestamp();
      long maxTtl = storeConfigInfo.getStoreFileTtl();
      if (maxTs == null
          || maxTtl == Long.MAX_VALUE
          || (currentTime - maxTtl < maxTs)) {
        continue;
      } else if (filesCompacting == null || !filesCompacting.contains(sf)) {
        expiredStores.add(sf);
      }
    }
    return expiredStores;
  }
}