View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicLong;
31  import java.util.concurrent.locks.ReentrantLock;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.util.IdLock;
40  
41  import com.google.common.util.concurrent.ThreadFactoryBuilder;
42  
43  /**
44   * The cache for mob files.
45   * This cache doesn't cache the mob file blocks. It only caches the references of mob files.
46   * We are doing this to avoid opening and closing mob files all the time. We just keep
47   * references open.
48   */
49  @InterfaceAudience.Private
50  public class MobFileCache {
51  
52    private static final Log LOG = LogFactory.getLog(MobFileCache.class);
53  
54    /*
55     * Eviction and statistics thread. Periodically run to print the statistics and
56     * evict the lru cached mob files when the count of the cached files is larger
57     * than the threshold.
58     */
59    static class EvictionThread extends Thread {
60      MobFileCache lru;
61  
62      public EvictionThread(MobFileCache lru) {
63        super("MobFileCache.EvictionThread");
64        setDaemon(true);
65        this.lru = lru;
66      }
67  
68      @Override
69      public void run() {
70        lru.evict();
71      }
72    }
73  
74    // a ConcurrentHashMap, accesses to this map are synchronized.
75    private Map<String, CachedMobFile> map = null;
76    // caches access count
77    private final AtomicLong count = new AtomicLong(0);
78    private long lastAccess = 0;
79    private final AtomicLong miss = new AtomicLong(0);
80    private long lastMiss = 0;
81    private final AtomicLong evictedFileCount = new AtomicLong(0);
82    private long lastEvictedFileCount = 0;
83  
84    // a lock to sync the evict to guarantee the eviction occurs in sequence.
85    // the method evictFile is not sync by this lock, the ConcurrentHashMap does the sync there.
86    private final ReentrantLock evictionLock = new ReentrantLock(true);
87  
88    //stripes lock on each mob file based on its hash. Sync the openFile/closeFile operations.
89    private final IdLock keyLock = new IdLock();
90  
91    private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
92        new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build());
93    private final Configuration conf;
94  
95    // the count of the cached references to mob files
96    private final int mobFileMaxCacheSize;
97    private final boolean isCacheEnabled;
98    private float evictRemainRatio;
99  
100   public MobFileCache(Configuration conf) {
101     this.conf = conf;
102     this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY,
103         MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE);
104     isCacheEnabled = (mobFileMaxCacheSize > 0);
105     map = new ConcurrentHashMap<String, CachedMobFile>(mobFileMaxCacheSize);
106     if (isCacheEnabled) {
107       long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD,
108           MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds
109       evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO,
110           MobConstants.DEFAULT_EVICT_REMAIN_RATIO);
111       if (evictRemainRatio < 0.0) {
112         evictRemainRatio = 0.0f;
113         LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used.");
114       } else if (evictRemainRatio > 1.0) {
115         evictRemainRatio = 1.0f;
116         LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used.");
117       }
118       this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
119           TimeUnit.SECONDS);
120 
121       LOG.info("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize +
122           ", evictPeriods=" +  period + "sec, evictRemainRatio=" + evictRemainRatio);
123     } else {
124       LOG.info("MobFileCache disabled");
125     }
126   }
127 
128   /**
129    * Evicts the lru cached mob files when the count of the cached files is larger
130    * than the threshold.
131    */
132   public void evict() {
133     if (isCacheEnabled) {
134       // Ensure only one eviction at a time
135       if (!evictionLock.tryLock()) {
136         return;
137       }
138       printStatistics();
139       List<CachedMobFile> evictedFiles = new ArrayList<CachedMobFile>();
140       try {
141         if (map.size() <= mobFileMaxCacheSize) {
142           return;
143         }
144         List<CachedMobFile> files = new ArrayList<CachedMobFile>(map.values());
145         Collections.sort(files);
146         int start = (int) (mobFileMaxCacheSize * evictRemainRatio);
147         if (start >= 0) {
148           for (int i = start; i < files.size(); i++) {
149             String name = files.get(i).getFileName();
150             CachedMobFile evictedFile = map.remove(name);
151             if (evictedFile != null) {
152               evictedFiles.add(evictedFile);
153             }
154           }
155         }
156       } finally {
157         evictionLock.unlock();
158       }
159       // EvictionLock is released. Close the evicted files one by one.
160       // The closes are sync in the closeFile method.
161       for (CachedMobFile evictedFile : evictedFiles) {
162         closeFile(evictedFile);
163       }
164       evictedFileCount.addAndGet(evictedFiles.size());
165     }
166   }
167 
168   /**
169    * Evicts the cached file by the name.
170    * @param fileName The name of a cached file.
171    */
172   public void evictFile(String fileName) {
173     if (isCacheEnabled) {
174       IdLock.Entry lockEntry = null;
175       try {
176         // obtains the lock to close the cached file.
177         lockEntry = keyLock.getLockEntry(fileName.hashCode());
178         CachedMobFile evictedFile = map.remove(fileName);
179         if (evictedFile != null) {
180           evictedFile.close();
181           evictedFileCount.incrementAndGet();
182         }
183       } catch (IOException e) {
184         LOG.error("Failed to evict the file " + fileName, e);
185       } finally {
186         if (lockEntry != null) {
187           keyLock.releaseLockEntry(lockEntry);
188         }
189       }
190     }
191   }
192 
193   /**
194    * Opens a mob file.
195    * @param fs The current file system.
196    * @param path The file path.
197    * @param cacheConf The current MobCacheConfig
198    * @return A opened mob file.
199    * @throws IOException
200    */
201   public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException {
202     if (!isCacheEnabled) {
203       MobFile mobFile = MobFile.create(fs, path, conf, cacheConf);
204       mobFile.open();
205       return mobFile;
206     } else {
207       String fileName = path.getName();
208       CachedMobFile cached = map.get(fileName);
209       IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
210       try {
211         if (cached == null) {
212           cached = map.get(fileName);
213           if (cached == null) {
214             if (map.size() > mobFileMaxCacheSize) {
215               evict();
216             }
217             cached = CachedMobFile.create(fs, path, conf, cacheConf);
218             cached.open();
219             map.put(fileName, cached);
220             miss.incrementAndGet();
221           }
222         }
223         cached.open();
224         cached.access(count.incrementAndGet());
225       } finally {
226         keyLock.releaseLockEntry(lockEntry);
227       }
228       return cached;
229     }
230   }
231 
232   /**
233    * Closes a mob file.
234    * @param file The mob file that needs to be closed.
235    */
236   public void closeFile(MobFile file) {
237     IdLock.Entry lockEntry = null;
238     try {
239       if (!isCacheEnabled) {
240         file.close();
241       } else {
242         lockEntry = keyLock.getLockEntry(file.getFileName().hashCode());
243         file.close();
244       }
245     } catch (IOException e) {
246       LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e);
247     } finally {
248       if (lockEntry != null) {
249         keyLock.releaseLockEntry(lockEntry);
250       }
251     }
252   }
253 
254   public void shutdown() {
255     this.scheduleThreadPool.shutdown();
256     for (int i = 0; i < 100; i++) {
257       if (!this.scheduleThreadPool.isShutdown()) {
258         try {
259           Thread.sleep(10);
260         } catch (InterruptedException e) {
261           LOG.warn("Interrupted while sleeping");
262           Thread.currentThread().interrupt();
263           break;
264         }
265       }
266     }
267 
268     if (!this.scheduleThreadPool.isShutdown()) {
269       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
270       LOG.debug("Still running " + runnables);
271     }
272   }
273 
274   /**
275    * Gets the count of cached mob files.
276    * @return The count of the cached mob files.
277    */
278   public int getCacheSize() {
279     return map == null ? 0 : map.size();
280   }
281 
282   /**
283    * Gets the count of accesses to the mob file cache.
284    * @return The count of accesses to the mob file cache.
285    */
286   public long getAccessCount() {
287     return count.get();
288   }
289 
290   /**
291    * Gets the count of misses to the mob file cache.
292    * @return The count of misses to the mob file cache.
293    */
294   public long getMissCount() {
295     return miss.get();
296   }
297 
298   /**
299    * Gets the number of items evicted from the mob file cache.
300    * @return The number of items evicted from the mob file cache.
301    */
302   public long getEvictedFileCount() {
303     return evictedFileCount.get();
304   }
305 
306   /**
307    * Gets the hit ratio to the mob file cache.
308    * @return The hit ratio to the mob file cache.
309    */
310   public double getHitRatio() {
311     return count.get() == 0 ? 0 : ((float) (count.get() - miss.get())) / (float) count.get();
312   }
313 
314   /**
315    * Prints the statistics.
316    */
317   public void printStatistics() {
318     long access = count.get() - lastAccess;
319     long missed = miss.get() - lastMiss;
320     long evicted = evictedFileCount.get() - lastEvictedFileCount;
321     int hitRatio = access == 0 ? 0 : (int) (((float) (access - missed)) / (float) access * 100);
322     LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
323         + (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted);
324     lastAccess += access;
325     lastMiss += missed;
326     lastEvictedFileCount += evicted;
327   }
328 
329 }