1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mob;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicLong;
31 import java.util.concurrent.locks.ReentrantLock;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.util.IdLock;
40
41 import com.google.common.util.concurrent.ThreadFactoryBuilder;
42
43
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class MobFileCache {
51
52 private static final Log LOG = LogFactory.getLog(MobFileCache.class);
53
54
55
56
57
58
59 static class EvictionThread extends Thread {
60 MobFileCache lru;
61
62 public EvictionThread(MobFileCache lru) {
63 super("MobFileCache.EvictionThread");
64 setDaemon(true);
65 this.lru = lru;
66 }
67
68 @Override
69 public void run() {
70 lru.evict();
71 }
72 }
73
74
75 private Map<String, CachedMobFile> map = null;
76
77 private final AtomicLong count = new AtomicLong(0);
78 private long lastAccess = 0;
79 private final AtomicLong miss = new AtomicLong(0);
80 private long lastMiss = 0;
81 private final AtomicLong evictedFileCount = new AtomicLong(0);
82 private long lastEvictedFileCount = 0;
83
84
85
86 private final ReentrantLock evictionLock = new ReentrantLock(true);
87
88
89 private final IdLock keyLock = new IdLock();
90
91 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
92 new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build());
93 private final Configuration conf;
94
95
96 private final int mobFileMaxCacheSize;
97 private final boolean isCacheEnabled;
98 private float evictRemainRatio;
99
100 public MobFileCache(Configuration conf) {
101 this.conf = conf;
102 this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY,
103 MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE);
104 isCacheEnabled = (mobFileMaxCacheSize > 0);
105 map = new ConcurrentHashMap<String, CachedMobFile>(mobFileMaxCacheSize);
106 if (isCacheEnabled) {
107 long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD,
108 MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD);
109 evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO,
110 MobConstants.DEFAULT_EVICT_REMAIN_RATIO);
111 if (evictRemainRatio < 0.0) {
112 evictRemainRatio = 0.0f;
113 LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used.");
114 } else if (evictRemainRatio > 1.0) {
115 evictRemainRatio = 1.0f;
116 LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used.");
117 }
118 this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
119 TimeUnit.SECONDS);
120
121 LOG.info("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize +
122 ", evictPeriods=" + period + "sec, evictRemainRatio=" + evictRemainRatio);
123 } else {
124 LOG.info("MobFileCache disabled");
125 }
126 }
127
128
129
130
131
132 public void evict() {
133 if (isCacheEnabled) {
134
135 if (!evictionLock.tryLock()) {
136 return;
137 }
138 printStatistics();
139 List<CachedMobFile> evictedFiles = new ArrayList<CachedMobFile>();
140 try {
141 if (map.size() <= mobFileMaxCacheSize) {
142 return;
143 }
144 List<CachedMobFile> files = new ArrayList<CachedMobFile>(map.values());
145 Collections.sort(files);
146 int start = (int) (mobFileMaxCacheSize * evictRemainRatio);
147 if (start >= 0) {
148 for (int i = start; i < files.size(); i++) {
149 String name = files.get(i).getFileName();
150 CachedMobFile evictedFile = map.remove(name);
151 if (evictedFile != null) {
152 evictedFiles.add(evictedFile);
153 }
154 }
155 }
156 } finally {
157 evictionLock.unlock();
158 }
159
160
161 for (CachedMobFile evictedFile : evictedFiles) {
162 closeFile(evictedFile);
163 }
164 evictedFileCount.addAndGet(evictedFiles.size());
165 }
166 }
167
168
169
170
171
172 public void evictFile(String fileName) {
173 if (isCacheEnabled) {
174 IdLock.Entry lockEntry = null;
175 try {
176
177 lockEntry = keyLock.getLockEntry(fileName.hashCode());
178 CachedMobFile evictedFile = map.remove(fileName);
179 if (evictedFile != null) {
180 evictedFile.close();
181 evictedFileCount.incrementAndGet();
182 }
183 } catch (IOException e) {
184 LOG.error("Failed to evict the file " + fileName, e);
185 } finally {
186 if (lockEntry != null) {
187 keyLock.releaseLockEntry(lockEntry);
188 }
189 }
190 }
191 }
192
193
194
195
196
197
198
199
200
201 public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException {
202 if (!isCacheEnabled) {
203 MobFile mobFile = MobFile.create(fs, path, conf, cacheConf);
204 mobFile.open();
205 return mobFile;
206 } else {
207 String fileName = path.getName();
208 CachedMobFile cached = map.get(fileName);
209 IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
210 try {
211 if (cached == null) {
212 cached = map.get(fileName);
213 if (cached == null) {
214 if (map.size() > mobFileMaxCacheSize) {
215 evict();
216 }
217 cached = CachedMobFile.create(fs, path, conf, cacheConf);
218 cached.open();
219 map.put(fileName, cached);
220 miss.incrementAndGet();
221 }
222 }
223 cached.open();
224 cached.access(count.incrementAndGet());
225 } finally {
226 keyLock.releaseLockEntry(lockEntry);
227 }
228 return cached;
229 }
230 }
231
232
233
234
235
236 public void closeFile(MobFile file) {
237 IdLock.Entry lockEntry = null;
238 try {
239 if (!isCacheEnabled) {
240 file.close();
241 } else {
242 lockEntry = keyLock.getLockEntry(file.getFileName().hashCode());
243 file.close();
244 }
245 } catch (IOException e) {
246 LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e);
247 } finally {
248 if (lockEntry != null) {
249 keyLock.releaseLockEntry(lockEntry);
250 }
251 }
252 }
253
254 public void shutdown() {
255 this.scheduleThreadPool.shutdown();
256 for (int i = 0; i < 100; i++) {
257 if (!this.scheduleThreadPool.isShutdown()) {
258 try {
259 Thread.sleep(10);
260 } catch (InterruptedException e) {
261 LOG.warn("Interrupted while sleeping");
262 Thread.currentThread().interrupt();
263 break;
264 }
265 }
266 }
267
268 if (!this.scheduleThreadPool.isShutdown()) {
269 List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
270 LOG.debug("Still running " + runnables);
271 }
272 }
273
274
275
276
277
278 public int getCacheSize() {
279 return map == null ? 0 : map.size();
280 }
281
282
283
284
285
286 public long getAccessCount() {
287 return count.get();
288 }
289
290
291
292
293
294 public long getMissCount() {
295 return miss.get();
296 }
297
298
299
300
301
302 public long getEvictedFileCount() {
303 return evictedFileCount.get();
304 }
305
306
307
308
309
310 public double getHitRatio() {
311 return count.get() == 0 ? 0 : ((float) (count.get() - miss.get())) / (float) count.get();
312 }
313
314
315
316
317 public void printStatistics() {
318 long access = count.get() - lastAccess;
319 long missed = miss.get() - lastMiss;
320 long evicted = evictedFileCount.get() - lastEvictedFileCount;
321 int hitRatio = access == 0 ? 0 : (int) (((float) (access - missed)) / (float) access * 100);
322 LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
323 + (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted);
324 lastAccess += access;
325 lastMiss += missed;
326 lastEvictedFileCount += evicted;
327 }
328
329 }