1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import com.google.common.base.Objects;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.HasThread;
48  import org.apache.hadoop.util.StringUtils;
49  import org.codehaus.jackson.annotate.JsonIgnore;
50  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  import com.google.common.util.concurrent.ThreadFactoryBuilder;
54  
55  /**
56   * A block cache implementation that is memory-aware using {@link HeapSize},
57   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
58   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
59   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
60   *
61   * Contains three levels of block priority to allow for scan-resistance and in-memory
62   * column families
63   * ({@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)}; an
64   * in-memory column family is a column family that should be served from memory if possible):
65   * single-access, multi-access, and in-memory priority.
66   * A block is added with an in-memory priority flag if
67   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()} is set;
68   * otherwise a block becomes single-access
69   * priority the first time it is read into this block cache.  If a block is accessed again
70   * while in cache, it is marked as a multi-access priority block.  This delineation of blocks
71   * is used to prevent scans from thrashing the cache by effectively adding a
72   * least-frequently-used element to the eviction algorithm.<p>
73   *
74   * Each priority is given its own chunk of the total cache to ensure
75   * fairness during eviction.  Each priority will retain close to its maximum
76   * size; however, if any priority is not using its entire chunk, the others
77   * are able to grow beyond their chunk size.<p>
78   *
79   * Instantiated at a minimum with the total size and average block size.
80   * All sizes are in bytes.  The block size is not especially important as this
81   * cache is fully dynamic in its sizing of blocks.  It is only used for
82   * pre-allocating data structures and in initial heap estimation of the map.<p>
83   *
84   * The detailed constructor defines the sizes for the three priorities (they
85   * should total to the <code>maximum size</code> defined).  It also sets the levels that
86   * trigger and control the eviction thread.<p>
87   *
88   * The <code>acceptable size</code> is the cache size level at which the eviction
89   * process is triggered.  It evicts enough blocks to get the size back below the
90   * specified minimum size.<p>
91   *
92   * Eviction happens in a separate thread and involves a single full scan
93   * of the map.  It determines how many bytes must be freed to reach the minimum
94   * size, and then while scanning determines the fewest least-recently-used
95   * blocks necessary from each of the three priorities (together these can amount to
96   * three times the bytes to free).  It then uses the priority chunk sizes to evict fairly
97   * according to the relative sizes and usage.
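     *
     * <p>Illustrative use (a minimal sketch; the sizes are arbitrary examples, and
     * <code>cacheKey</code>/<code>block</code> stand for an existing {@link BlockCacheKey}
     * and {@link Cacheable}):
     * <pre>
     *   // 512 MB cache with an expected average block size of 64 KB
     *   LruBlockCache cache = new LruBlockCache(512 * 1024 * 1024L, 64 * 1024L);
     *   cache.cacheBlock(cacheKey, block);                           // single-access priority
     *   Cacheable hit = cache.getBlock(cacheKey, true, false, true); // re-access promotes to multi
     *   cache.shutdown();
     * </pre>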
98   */
99  @InterfaceAudience.Private
100 @JsonIgnoreProperties({"encodingCountsForTest"})
101 public class LruBlockCache implements ResizableBlockCache, HeapSize {
102 
103   static final Log LOG = LogFactory.getLog(LruBlockCache.class);
104 
105   /**
106    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
107    * evicting during an eviction run till the cache size is down to 80% of the total.
108    */
109   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
110 
111   /**
112    * Acceptable size of cache (no evictions if size < acceptable)
113    */
114   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
115 
116   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
117   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
118   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
119 
120   /**
121    * Configuration key to force data blocks of in-memory hfiles to always be cached in memory
122    * (unless the in-memory blocks by themselves exceed the cache size). Unlike inMemory, which
123    * is a column-family configuration, inMemoryForceMode is a cluster-wide configuration.
124    */
125   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
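
      /*
       * The keys above are read by the Configuration-based constructor below.  An illustrative
       * hbase-site.xml override (the values are arbitrary examples; minFactor must stay below
       * acceptableFactor and both must be below 1.0):
       *
       *   <property>
       *     <name>hbase.lru.blockcache.min.factor</name>
       *     <value>0.90</value>
       *   </property>
       *   <property>
       *     <name>hbase.lru.blockcache.acceptable.factor</name>
       *     <value>0.95</value>
       *   </property>
       */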
126 
127   /** Default Configuration Parameters */
128 
129   /** Backing Concurrent Map Configuration */
130   static final float DEFAULT_LOAD_FACTOR = 0.75f;
131   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
132 
133   /** Eviction thresholds */
134   static final float DEFAULT_MIN_FACTOR = 0.95f;
135   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
136 
137   /** Priority buckets */
138   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
139   static final float DEFAULT_MULTI_FACTOR = 0.50f;
140   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
141 
142   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
143 
144   /** Statistics thread */
145   static final int statThreadPeriod = 60 * 5;
146 
147   /** Concurrent map (the cache) */
148   private final Map<BlockCacheKey,LruCachedBlock> map;
149 
150   /** Eviction lock (locked when eviction in process) */
151   private final ReentrantLock evictionLock = new ReentrantLock(true);
152 
153   /** Volatile boolean to track if we are in an eviction process or not */
154   private volatile boolean evictionInProgress = false;
155 
156   /** Eviction thread */
157   private final EvictionThread evictionThread;
158 
159   /** Statistics thread schedule pool (for heavy debugging, could remove) */
160   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
161     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
162 
163   /** Current size of cache */
164   private final AtomicLong size;
165 
166   /** Current number of cached elements */
167   private final AtomicLong elements;
168 
169   /** Cache access count (sequential ID) */
170   private final AtomicLong count;
171 
172   /** Cache statistics */
173   private final CacheStats stats;
174 
175   /** Maximum allowable size of cache (block put if size > max, evict) */
176   private long maxSize;
177 
178   /** Approximate block size */
179   private long blockSize;
180 
181   /** Acceptable size of cache (no evictions if size < acceptable) */
182   private float acceptableFactor;
183 
184   /** Minimum threshold of cache (when evicting, evict until size < min) */
185   private float minFactor;
186 
187   /** Single access bucket size */
188   private float singleFactor;
189 
190   /** Multiple access bucket size */
191   private float multiFactor;
192 
193   /** In-memory bucket size */
194   private float memoryFactor;
195 
196   /** Overhead of the structure itself */
197   private long overhead;
198 
199   /** Whether data blocks of in-memory hfiles have higher priority when evicting */
200   private boolean forceInMemory;
201 
202   /** Where to send victims (blocks evicted/missing from the cache) */
203   private BlockCache victimHandler = null;
204 
205   /**
206    * Default constructor.  Specify maximum size and expected average block
207    * size (approximation is fine).
208    *
209    * <p>All other factors will be calculated based on defaults specified in
210    * this class.
211    * @param maxSize maximum size of cache, in bytes
212    * @param blockSize approximate size of each block, in bytes
213    */
214   public LruBlockCache(long maxSize, long blockSize) {
215     this(maxSize, blockSize, true);
216   }
217 
218   /**
219    * Constructor used for testing.  Allows disabling of the eviction thread.
220    */
221   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
222     this(maxSize, blockSize, evictionThread,
223         (int)Math.ceil(1.2*maxSize/blockSize),
224         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
225         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
226         DEFAULT_SINGLE_FACTOR,
227         DEFAULT_MULTI_FACTOR,
228         DEFAULT_MEMORY_FACTOR,
229         false
230         );
231   }
232 
233   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
234     this(maxSize, blockSize, evictionThread,
235         (int)Math.ceil(1.2*maxSize/blockSize),
236         DEFAULT_LOAD_FACTOR,
237         DEFAULT_CONCURRENCY_LEVEL,
238         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
239         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
240         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
241         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
242         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
243         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE)
244         );
245   }
246 
247   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
248     this(maxSize, blockSize, true, conf);
249   }
250 
251   /**
252    * Configurable constructor.  Use this constructor if not using defaults.
253    * @param maxSize maximum size of this cache, in bytes
254    * @param blockSize expected average size of blocks, in bytes
255    * @param evictionThread whether to run evictions in a background thread or not
256    * @param mapInitialSize initial size of backing ConcurrentHashMap
257    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
258    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
259    * @param minFactor percentage of total size that eviction will evict until
260    * @param acceptableFactor percentage of total size that triggers eviction
261    * @param singleFactor percentage of total size for single-access blocks
262    * @param multiFactor percentage of total size for multiple-access blocks
263    * @param memoryFactor percentage of total size for in-memory blocks
       * @param forceInMemory if true, blocks of in-memory column families are evicted only after
       *          the single-access and multi-access buckets have been exhausted
264    */
265   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
266       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
267       float minFactor, float acceptableFactor, float singleFactor,
268       float multiFactor, float memoryFactor, boolean forceInMemory) {
269     if(singleFactor + multiFactor + memoryFactor != 1 ||
270         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
271       throw new IllegalArgumentException("Single, multi, and memory factors " +
272           "should be non-negative and total 1.0");
273     }
274     if(minFactor >= acceptableFactor) {
275       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
276     }
277     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
278       throw new IllegalArgumentException("all factors must be < 1");
279     }
280     this.maxSize = maxSize;
281     this.blockSize = blockSize;
282     this.forceInMemory = forceInMemory;
283     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
284         mapLoadFactor, mapConcurrencyLevel);
285     this.minFactor = minFactor;
286     this.acceptableFactor = acceptableFactor;
287     this.singleFactor = singleFactor;
288     this.multiFactor = multiFactor;
289     this.memoryFactor = memoryFactor;
290     this.stats = new CacheStats(this.getClass().getSimpleName());
291     this.count = new AtomicLong(0);
292     this.elements = new AtomicLong(0);
293     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
294     this.size = new AtomicLong(this.overhead);
295     if(evictionThread) {
296       this.evictionThread = new EvictionThread(this);
297       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
298     } else {
299       this.evictionThread = null;
300     }
301     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
302     // every five minutes.
303     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
304         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
305   }
306 
307   @Override
308   public void setMaxSize(long maxSize) {
309     this.maxSize = maxSize;
310     if(this.size.get() > acceptableSize() && !evictionInProgress) {
311       runEviction();
312     }
313   }
314 
315   // BlockCache implementation
316 
317   /**
318    * Cache the block with the specified name and buffer.
319    * <p>
320    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
321    * this can happen, for which we compare the buffer contents.
322    * @param cacheKey block's cache key
323    * @param buf block buffer
324    * @param inMemory if block is in-memory
325    * @param cacheDataInL1 whether the block should be cached in the L1 tier when a multi-tier
326    *          block cache is in use (not used by this class)
326    */
327   @Override
328   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
329       final boolean cacheDataInL1) {
330     LruCachedBlock cb = map.get(cacheKey);
331     if (cb != null) {
332       // compare the contents, if they are not equal, we are in big trouble
333       if (compare(buf, cb.getBuffer()) != 0) {
334         throw new RuntimeException("Cached block contents differ, which should not have happened. "
335           + "cacheKey: " + cacheKey);
336       }
337       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
338       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
339       LOG.warn(msg);
340       return;
341     }
342     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
343     long newSize = updateSizeMetrics(cb, false);
344     map.put(cacheKey, cb);
345     long val = elements.incrementAndGet();
346     if (LOG.isTraceEnabled()) {
347       long size = map.size();
348       assertCounterSanity(size, val);
349     }
350     if (newSize > acceptableSize() && !evictionInProgress) {
351       runEviction();
352     }
353   }
354 
355   /**
356    * Sanity-checking for parity between actual block cache content and metrics.
357    * Intended only for use with TRACE level logging and -ea JVM.
358    */
359   private static void assertCounterSanity(long mapSize, long counterVal) {
360     if (counterVal < 0) {
361       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
362         ", mapSize=" + mapSize);
363       return;
364     }
365     if (mapSize < Integer.MAX_VALUE) {
366       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
367       if (pct_diff > 0.05) {
368         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
369           ", mapSize=" + mapSize);
370       }
371     }
372   }
373 
374   private int compare(Cacheable left, Cacheable right) {
375     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
376     left.serialize(l);
377     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
378     right.serialize(r);
379     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
380       r.array(), r.arrayOffset(), r.limit());
381   }
382 
383   /**
384    * Cache the block with the specified name and buffer.
385    * <p>
386    * @param cacheKey block's cache key
387    * @param buf block buffer
388    */
389   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
390     cacheBlock(cacheKey, buf, false, false);
391   }
392 
393   /**
394    * Helper function that updates the local size counter and also updates any
395    * per-cf or per-blocktype metrics it can discern from the given
396    * {@link LruCachedBlock}.
397    *
398    * @param cb the block being added to or evicted from the cache
399    * @param evict true if the block is being evicted, in which case its heap size is
       *          subtracted rather than added
400    */
401   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
402     long heapsize = cb.heapSize();
403     if (evict) {
404       heapsize *= -1;
405     }
406     return size.addAndGet(heapsize);
407   }
408 
409   /**
410    * Get the buffer of the block with the specified name.
411    * @param cacheKey block's cache key
412    * @param caching true if the caller caches blocks on cache misses
413    * @param repeat Whether this is a repeat lookup for the same block
414    *        (used to avoid double counting cache misses when doing double-check locking)
415    * @param updateCacheMetrics Whether to update cache metrics or not
416    * @return buffer of specified cache key, or null if not in cache
417    */
418   @Override
419   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
420       boolean updateCacheMetrics) {
421     LruCachedBlock cb = map.get(cacheKey);
422     if (cb == null) {
423       if (!repeat && updateCacheMetrics) {
424         stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
425       }
426       // If there is another block cache then try and read there.
427       // However if this is a retry ( second time in double checked locking )
428       // And it's already a miss then the l2 will also be a miss.
429       if (victimHandler != null && !repeat) {
430         Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
431 
432         // Promote this to L1.
433         if (result != null && caching) {
434           cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
435         }
436         return result;
437       }
438       return null;
439     }
440     if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
441     cb.access(count.incrementAndGet());
442     return cb.getBuffer();
443   }
444 
445   /**
446    * Whether the cache contains the block with the specified cacheKey
447    * @param cacheKey the block's cache key
448    * @return true if the cache contains the block
449    */
450   public boolean containsBlock(BlockCacheKey cacheKey) {
451     return map.containsKey(cacheKey);
452   }
453 
454   @Override
455   public boolean evictBlock(BlockCacheKey cacheKey) {
456     LruCachedBlock cb = map.get(cacheKey);
457     if (cb == null) return false;
458     evictBlock(cb, false);
459     return true;
460   }
461 
462   /**
463    * Evicts all blocks for a specific HFile. This is an
464    * expensive operation implemented as a linear-time search through all blocks
465    * in the cache. Ideally this should be a search in a log-access-time map.
466    *
467    * <p>
468    * This is used for evict-on-close to remove all blocks of a specific HFile.
469    *
470    * @return the number of blocks evicted
471    */
472   @Override
473   public int evictBlocksByHfileName(String hfileName) {
474     int numEvicted = 0;
475     for (BlockCacheKey key : map.keySet()) {
476       if (key.getHfileName().equals(hfileName)) {
477         if (evictBlock(key))
478           ++numEvicted;
479       }
480     }
481     if (victimHandler != null) {
482       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
483     }
484     return numEvicted;
485   }
486 
487   /**
488    * Evict the block; if a victim handler exists and the block may be read again later,
489    * the evicted block is handed off to the victim cache
490    * @param block the block to evict
491    * @param evictedByEvictionProcess true if the given block is evicted by the
492    *          EvictionThread
493    * @return the heap size of the evicted block
494    */
495   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
496     map.remove(block.getCacheKey());
497     updateSizeMetrics(block, true);
498     long val = elements.decrementAndGet();
499     if (LOG.isTraceEnabled()) {
500       long size = map.size();
501       assertCounterSanity(size, val);
502     }
503     stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
504     if (evictedByEvictionProcess && victimHandler != null) {
505       if (victimHandler instanceof BucketCache) {
506         boolean wait = getCurrentSize() < acceptableSize();
507         boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
508         ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
509             inMemory, wait);
510       } else {
511         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
512       }
513     }
514     return block.heapSize();
515   }
516 
517   /**
518    * Multi-threaded call to run the eviction process.
519    */
520   private void runEviction() {
521     if(evictionThread == null) {
522       evict();
523     } else {
524       evictionThread.evict();
525     }
526   }
527 
528   /**
529    * Eviction method.
530    */
531   void evict() {
532 
533     // Ensure only one eviction at a time
534     if(!evictionLock.tryLock()) return;
535 
536     try {
537       evictionInProgress = true;
538       long currentSize = this.size.get();
539       long bytesToFree = currentSize - minSize();
540 
541       if (LOG.isTraceEnabled()) {
542         LOG.trace("Block cache LRU eviction started; Attempting to free " +
543           StringUtils.byteDesc(bytesToFree) + " of total=" +
544           StringUtils.byteDesc(currentSize));
545       }
546 
547       if(bytesToFree <= 0) return;
548 
549       // Instantiate priority buckets
550       BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
551           singleSize());
552       BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
553           multiSize());
554       BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
555           memorySize());
556 
557       // Scan entire map putting into appropriate buckets
558       for(LruCachedBlock cachedBlock : map.values()) {
559         switch(cachedBlock.getPriority()) {
560           case SINGLE: {
561             bucketSingle.add(cachedBlock);
562             break;
563           }
564           case MULTI: {
565             bucketMulti.add(cachedBlock);
566             break;
567           }
568           case MEMORY: {
569             bucketMemory.add(cachedBlock);
570             break;
571           }
572         }
573       }
574 
575       long bytesFreed = 0;
576       if (forceInMemory || memoryFactor > 0.999f) {
577         long s = bucketSingle.totalSize();
578         long m = bucketMulti.totalSize();
579         if (bytesToFree > (s + m)) {
580           // this means we need to evict blocks in memory bucket to make room,
581           // so the single and multi buckets will be emptied
582           bytesFreed = bucketSingle.free(s);
583           bytesFreed += bucketMulti.free(m);
584           if (LOG.isTraceEnabled()) {
585             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
586               " from single and multi buckets");
587           }
588           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
589           if (LOG.isTraceEnabled()) {
590             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
591               " total from all three buckets ");
592           }
593         } else {
594           // this means there is no need to evict blocks in the memory bucket,
595           // and we try our best to make the post-eviction ratio between the
596           // single bucket and the multi bucket 1:2
597           long bytesRemain = s + m - bytesToFree;
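              // Worked example with hypothetical sizes: s = 300 MB, m = 500 MB and
              // bytesToFree = 200 MB give bytesRemain = 600 MB.  Neither shortcut below
              // applies (3*s = 900 > 600 and 3*m = 1500 > 2*600), so the single bucket frees
              // s - bytesRemain/3 = 100 MB (down to 200 MB) and the multi bucket frees the
              // remaining 100 MB (down to 400 MB), which is the desired 1:2 ratio.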
598           if (3 * s <= bytesRemain) {
599             // single-bucket is small enough that no eviction happens for it
600             // hence all eviction goes from multi-bucket
601             bytesFreed = bucketMulti.free(bytesToFree);
602           } else if (3 * m <= 2 * bytesRemain) {
603             // multi-bucket is small enough that no eviction happens for it
604             // hence all eviction goes from single-bucket
605             bytesFreed = bucketSingle.free(bytesToFree);
606           } else {
607             // both buckets need to evict some blocks
608             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
609             if (bytesFreed < bytesToFree) {
610               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
611             }
612           }
613         }
614       } else {
615         PriorityQueue<BlockBucket> bucketQueue =
616           new PriorityQueue<BlockBucket>(3);
617 
618         bucketQueue.add(bucketSingle);
619         bucketQueue.add(bucketMulti);
620         bucketQueue.add(bucketMemory);
621 
622         int remainingBuckets = 3;
623 
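            // Buckets are polled from least to most overflowing, and each overflowing bucket
            // frees at most its share of what still needs to be freed.  Hypothetical example:
            // bytesToFree = 300 MB with overflows memory <= 0, single = 50 MB, multi = 400 MB;
            // memory frees nothing, single frees min(50, 300/2) = 50 MB, and multi frees
            // min(400, 250/1) = 250 MB, covering roughly the 300 MB required.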
624         BlockBucket bucket;
625         while((bucket = bucketQueue.poll()) != null) {
626           long overflow = bucket.overflow();
627           if(overflow > 0) {
628             long bucketBytesToFree = Math.min(overflow,
629                 (bytesToFree - bytesFreed) / remainingBuckets);
630             bytesFreed += bucket.free(bucketBytesToFree);
631           }
632           remainingBuckets--;
633         }
634       }
635 
636       if (LOG.isTraceEnabled()) {
637         long single = bucketSingle.totalSize();
638         long multi = bucketMulti.totalSize();
639         long memory = bucketMemory.totalSize();
640         LOG.trace("Block cache LRU eviction completed; " +
641           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
642           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
643           "single=" + StringUtils.byteDesc(single) + ", " +
644           "multi=" + StringUtils.byteDesc(multi) + ", " +
645           "memory=" + StringUtils.byteDesc(memory));
646       }
647     } finally {
648       stats.evict();
649       evictionInProgress = false;
650       evictionLock.unlock();
651     }
652   }
653 
654   @Override
655   public String toString() {
656     return Objects.toStringHelper(this)
657       .add("blockCount", getBlockCount())
658       .add("currentSize", getCurrentSize())
659       .add("freeSize", getFreeSize())
660       .add("maxSize", getMaxSize())
661       .add("heapSize", heapSize())
662       .add("minSize", minSize())
663       .add("minFactor", minFactor)
664       .add("multiSize", multiSize())
665       .add("multiFactor", multiFactor)
666       .add("singleSize", singleSize())
667       .add("singleFactor", singleFactor)
668       .toString();
669   }
670 
671   /**
672    * Used to group blocks into priority buckets.  There will be a BlockBucket
673    * for each priority (single, multi, memory).  Once bucketed, the eviction
674    * algorithm takes the appropriate number of elements out of each according
675    * to configuration parameters and their relative sizes.
676    */
677   private class BlockBucket implements Comparable<BlockBucket> {
678 
679     private final String name;
680     private LruCachedBlockQueue queue;
681     private long totalSize = 0;
682     private long bucketSize;
683 
684     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
685       this.name = name;
686       this.bucketSize = bucketSize;
687       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
688       totalSize = 0;
689     }
690 
691     public void add(LruCachedBlock block) {
692       totalSize += block.heapSize();
693       queue.add(block);
694     }
695 
696     public long free(long toFree) {
697       if (LOG.isTraceEnabled()) {
698         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
699       }
700       LruCachedBlock cb;
701       long freedBytes = 0;
702       while ((cb = queue.pollLast()) != null) {
703         freedBytes += evictBlock(cb, true);
704         if (freedBytes >= toFree) {
705           return freedBytes;
706         }
707       }
708       if (LOG.isTraceEnabled()) {
709         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
710       }
711       return freedBytes;
712     }
713 
714     public long overflow() {
715       return totalSize - bucketSize;
716     }
717 
718     public long totalSize() {
719       return totalSize;
720     }
721 
722     public int compareTo(BlockBucket that) {
723       if(this.overflow() == that.overflow()) return 0;
724       return this.overflow() > that.overflow() ? 1 : -1;
725     }
726 
727     @Override
728     public boolean equals(Object that) {
729       if (that == null || !(that instanceof BlockBucket)){
730         return false;
731       }
732       return compareTo((BlockBucket)that) == 0;
733     }
734 
735     @Override
736     public int hashCode() {
737       return Objects.hashCode(name, bucketSize, queue, totalSize);
738     }
739 
740     @Override
741     public String toString() {
742       return Objects.toStringHelper(this)
743         .add("name", name)
744         .add("totalSize", StringUtils.byteDesc(totalSize))
745         .add("bucketSize", StringUtils.byteDesc(bucketSize))
746         .toString();
747     }
748   }
749 
750   /**
751    * Get the maximum size of this cache.
752    * @return max size in bytes
753    */
754   public long getMaxSize() {
755     return this.maxSize;
756   }
757 
758   @Override
759   public long getCurrentSize() {
760     return this.size.get();
761   }
762 
763   @Override
764   public long getFreeSize() {
765     return getMaxSize() - getCurrentSize();
766   }
767 
768   @Override
769   public long size() {
770     return getMaxSize();
771   }
772 
773   @Override
774   public long getBlockCount() {
775     return this.elements.get();
776   }
777 
778   EvictionThread getEvictionThread() {
779     return this.evictionThread;
780   }
781 
782   /*
783    * Eviction thread.  Sits in waiting state until an eviction is triggered
784    * when the cache size grows above the acceptable level.<p>
785    *
786    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
787    */
788   static class EvictionThread extends HasThread {
789     private WeakReference<LruBlockCache> cache;
790     private volatile boolean go = true;
791     // flag set after entering the run method; used by tests
792     private boolean enteringRun = false;
793 
794     public EvictionThread(LruBlockCache cache) {
795       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
796       setDaemon(true);
797       this.cache = new WeakReference<LruBlockCache>(cache);
798     }
799 
800     @Override
801     public void run() {
802       enteringRun = true;
803       while (this.go) {
804         synchronized(this) {
805           try {
806             this.wait(1000 * 10/*Don't wait for ever*/);
807           } catch(InterruptedException e) {
808             LOG.warn("Interrupted eviction thread ", e);
809             Thread.currentThread().interrupt();
810           }
811         }
812         LruBlockCache cache = this.cache.get();
813         if (cache == null) break;
814         cache.evict();
815       }
816     }
817 
818     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
819         justification="This is what we want")
820     public void evict() {
821       synchronized(this) {
822         this.notifyAll();
823       }
824     }
825 
826     synchronized void shutdown() {
827       this.go = false;
828       this.notifyAll();
829     }
830 
831     /**
832      * Used by tests.
833      */
834     boolean isEnteringRun() {
835       return this.enteringRun;
836     }
837   }
838 
839   /*
840    * Statistics thread.  Periodically prints the cache statistics to the log.
841    */
842   static class StatisticsThread extends Thread {
843     private final LruBlockCache lru;
844 
845     public StatisticsThread(LruBlockCache lru) {
846       super("LruBlockCacheStats");
847       setDaemon(true);
848       this.lru = lru;
849     }
850 
851     @Override
852     public void run() {
853       lru.logStats();
854     }
855   }
856 
857   public void logStats() {
858     // Log size
859     long totalSize = heapSize();
860     long freeSize = maxSize - totalSize;
861     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
862         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
863         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
864         "blockCount=" + getBlockCount() + ", " +
865         "accesses=" + stats.getRequestCount() + ", " +
866         "hits=" + stats.getHitCount() + ", " +
867         "hitRatio=" + (stats.getHitCount() == 0 ?
868           "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
869         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
870         "cachingHits=" + stats.getHitCachingCount() + ", " +
871         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
872           "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
873         "evictions=" + stats.getEvictionCount() + ", " +
874         "evicted=" + stats.getEvictedCount() + ", " +
875         "evictedPerRun=" + stats.evictedPerEviction());
876   }
877 
878   /**
879    * Get counter statistics for this cache.
880    *
881    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
882    * of the eviction processes.
883    */
884   public CacheStats getStats() {
885     return this.stats;
886   }
887 
888   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
889       (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
890       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
891       + ClassSize.OBJECT);
892 
893   @Override
894   public long heapSize() {
895     return getCurrentSize();
896   }
897 
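      // A rough sketch of what the estimate accounts for: the fixed fields of this class, the
      // ConcurrentHashMap object itself, one map entry per expected block (the map is sized for
      // about 1.2 * maxSize / blockSize entries, matching the initial size used by the
      // constructors above), and one map segment per unit of the concurrency level.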
898   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
899     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
900     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
901         ((long)Math.ceil(maxSize*1.2/blockSize)
902             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
903         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
904   }
905 
906   @Override
907   public Iterator<CachedBlock> iterator() {
908     final Iterator<LruCachedBlock> iterator = map.values().iterator();
909 
910     return new Iterator<CachedBlock>() {
911       private final long now = System.nanoTime();
912 
913       @Override
914       public boolean hasNext() {
915         return iterator.hasNext();
916       }
917 
918       @Override
919       public CachedBlock next() {
920         final LruCachedBlock b = iterator.next();
921         return new CachedBlock() {
922           @Override
923           public String toString() {
924             return BlockCacheUtil.toString(this, now);
925           }
926 
927           @Override
928           public BlockPriority getBlockPriority() {
929             return b.getPriority();
930           }
931 
932           @Override
933           public BlockType getBlockType() {
934             return b.getBuffer().getBlockType();
935           }
936 
937           @Override
938           public long getOffset() {
939             return b.getCacheKey().getOffset();
940           }
941 
942           @Override
943           public long getSize() {
944             return b.getBuffer().heapSize();
945           }
946 
947           @Override
948           public long getCachedTime() {
949             return b.getCachedTime();
950           }
951 
952           @Override
953           public String getFilename() {
954             return b.getCacheKey().getHfileName();
955           }
956 
957           @Override
958           public int compareTo(CachedBlock other) {
959             int diff = this.getFilename().compareTo(other.getFilename());
960             if (diff != 0) return diff;
961             diff = (int)(this.getOffset() - other.getOffset());
962             if (diff != 0) return diff;
963             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
964               throw new IllegalStateException("" + this.getCachedTime() + ", " +
965                 other.getCachedTime());
966             }
967             return (int)(other.getCachedTime() - this.getCachedTime());
968           }
969 
970           @Override
971           public int hashCode() {
972             return b.hashCode();
973           }
974 
975           @Override
976           public boolean equals(Object obj) {
977             if (obj instanceof CachedBlock) {
978               CachedBlock cb = (CachedBlock)obj;
979               return compareTo(cb) == 0;
980             } else {
981               return false;
982             }
983           }
984         };
985       }
986 
987       @Override
988       public void remove() {
989         throw new UnsupportedOperationException();
990       }
991     };
992   }
993 
994   // Simple calculators of sizes given factors and maxSize
995 
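      // For example, with the defaults and maxSize = 1 GB: acceptableSize() ~= 0.99 GB is the
      // eviction trigger, minSize() ~= 0.95 GB is the eviction target, and the single, multi
      // and memory chunks are 0.25, 0.50 and 0.25 of minSize() respectively.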
996   long acceptableSize() {
997     return (long)Math.floor(this.maxSize * this.acceptableFactor);
998   }
999   private long minSize() {
1000     return (long)Math.floor(this.maxSize * this.minFactor);
1001   }
1002   private long singleSize() {
1003     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
1004   }
1005   private long multiSize() {
1006     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1007   }
1008   private long memorySize() {
1009     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1010   }
1011 
1012   public void shutdown() {
1013     if (victimHandler != null) {
1014       victimHandler.shutdown();
         }
1015     this.scheduleThreadPool.shutdown();
1016     for (int i = 0; i < 10; i++) {
1017       if (!this.scheduleThreadPool.isTerminated()) {
1018         try {
1019           Thread.sleep(10);
1020         } catch (InterruptedException e) {
1021           LOG.warn("Interrupted while sleeping");
1022           Thread.currentThread().interrupt();
1023           break;
1024         }
1025       }
1026     }
1027 
1028     if (!this.scheduleThreadPool.isTerminated()) {
1029       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1030       LOG.debug("Still running " + runnables);
1031     }
1032     this.evictionThread.shutdown();
1033   }
1034 
1035   /** Clears the cache. Used in tests. */
1036   @VisibleForTesting
1037   public void clearCache() {
1038     this.map.clear();
1039     this.elements.set(0);
1040   }
1041 
1042   /**
1043    * Used in testing. May be very inefficient.
1044    * @return the set of cached file names
1045    */
1046   @VisibleForTesting
1047   SortedSet<String> getCachedFileNamesForTest() {
1048     SortedSet<String> fileNames = new TreeSet<String>();
1049     for (BlockCacheKey cacheKey : map.keySet()) {
1050       fileNames.add(cacheKey.getHfileName());
1051     }
1052     return fileNames;
1053   }
1054 
1055   @VisibleForTesting
1056   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1057     Map<BlockType, Integer> counts =
1058         new EnumMap<BlockType, Integer>(BlockType.class);
1059     for (LruCachedBlock cb : map.values()) {
1060       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1061       Integer count = counts.get(blockType);
1062       counts.put(blockType, (count == null ? 0 : count) + 1);
1063     }
1064     return counts;
1065   }
1066 
1067   @VisibleForTesting
1068   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1069     Map<DataBlockEncoding, Integer> counts =
1070         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1071     for (LruCachedBlock block : map.values()) {
1072       DataBlockEncoding encoding =
1073               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1074       Integer count = counts.get(encoding);
1075       counts.put(encoding, (count == null ? 0 : count) + 1);
1076     }
1077     return counts;
1078   }
1079 
1080   public void setVictimCache(BlockCache handler) {
1081     assert victimHandler == null;
1082     victimHandler = handler;
1083   }
1084 
1085   @VisibleForTesting
1086   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1087     return map;
1088   }
1089 
1090   BlockCache getVictimHandler() {
1091     return this.victimHandler;
1092   }
1093 
1094   @Override
1095   @JsonIgnore
1096   public BlockCache[] getBlockCaches() {
1097     if (victimHandler != null)
1098       return new BlockCache[] {this, this.victimHandler};
1099     return null;
1100   }
1101 }