1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import net.spy.memcached.CachedData;
23 import net.spy.memcached.ConnectionFactoryBuilder;
24 import net.spy.memcached.FailureMode;
25 import net.spy.memcached.MemcachedClient;
26 import net.spy.memcached.transcoders.Transcoder;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.util.Addressing;
33 import org.apache.htrace.Trace;
34 import org.apache.htrace.TraceScope;
35
36 import java.io.IOException;
37 import java.net.InetSocketAddress;
38 import java.nio.ByteBuffer;
39 import java.util.ArrayList;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.NoSuchElementException;
43 import java.util.concurrent.ExecutionException;
44
45
46
47
48
49
50
51 @InterfaceAudience.Private
52 public class MemcachedBlockCache implements BlockCache {
53 private static final Log LOG = LogFactory.getLog(MemcachedBlockCache.class.getName());
54
55
56
57 public static final int MAX_SIZE = 1020 * 1024;
58
59
60
61
62
63
64 public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
65 public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
66 public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
67 public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
68
69 private final MemcachedClient client;
70 private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
71 private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
72
73 public MemcachedBlockCache(Configuration c) throws IOException {
74 LOG.info("Creating MemcachedBlockCache");
75
76 long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
77 long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
78
79 ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
80 .setOpTimeout(opTimeout)
81 .setOpQueueMaxBlockTime(queueTimeout)
82 .setFailureMode(FailureMode.Redistribute)
83 .setShouldOptimize(true)
84
85 .setDaemon(true)
86 .setUseNagleAlgorithm(false)
87 .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024);
88
89
90
91
92
93
94
95
96
97 String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211");
98 String[] servers = serverListString.split(",");
99 List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length);
100 for (String s:servers) {
101 serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
102 }
103
104 client = new MemcachedClient(builder.build(), serverAddresses);
105 }
106
107 @Override
108 public void cacheBlock(BlockCacheKey cacheKey,
109 Cacheable buf,
110 boolean inMemory,
111 boolean cacheDataInL1) {
112 cacheBlock(cacheKey, buf);
113 }
114
115 @Override
116 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
117 if (buf instanceof HFileBlock) {
118 client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
119 } else {
120 if (LOG.isDebugEnabled()) {
121 LOG.debug("MemcachedBlockCache can not cache Cacheable's of type "
122 + buf.getClass().toString());
123 }
124 }
125 }
126
127 @Override
128 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
129 boolean repeat, boolean updateCacheMetrics) {
130
131 HFileBlock result = null;
132
133 try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) {
134 result = client.get(cacheKey.toString(), tc);
135 } catch (Exception e) {
136
137
138 if (LOG.isDebugEnabled()) {
139 LOG.debug("Exception pulling from memcached [ "
140 + cacheKey.toString()
141 + " ]. Treating as a miss.", e);
142 }
143 result = null;
144 } finally {
145
146 if (updateCacheMetrics) {
147 if (result == null) {
148 cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
149 } else {
150 cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
151 }
152 }
153 }
154
155 return result;
156 }
157
158 @Override
159 public boolean evictBlock(BlockCacheKey cacheKey) {
160 try {
161 cacheStats.evict();
162 return client.delete(cacheKey.toString()).get();
163 } catch (InterruptedException e) {
164 LOG.warn("Error deleting " + cacheKey.toString(), e);
165 Thread.currentThread().interrupt();
166 } catch (ExecutionException e) {
167 if (LOG.isDebugEnabled()) {
168 LOG.debug("Error deleting " + cacheKey.toString(), e);
169 }
170 }
171 return false;
172 }
173
174
175
176
177 @Override
178 public int evictBlocksByHfileName(String hfileName) {
179 return 0;
180 }
181
182 @Override
183 public CacheStats getStats() {
184 return cacheStats;
185 }
186
187 @Override
188 public void shutdown() {
189 client.shutdown();
190 }
191
192 @Override
193 public long size() {
194 return 0;
195 }
196
197 @Override
198 public long getFreeSize() {
199 return 0;
200 }
201
202 @Override
203 public long getCurrentSize() {
204 return 0;
205 }
206
207 @Override
208 public long getBlockCount() {
209 return 0;
210 }
211
212 @Override
213 public Iterator<CachedBlock> iterator() {
214 return new Iterator<CachedBlock>() {
215 @Override
216 public boolean hasNext() {
217 return false;
218 }
219
220 @Override
221 public CachedBlock next() {
222 throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
223 }
224
225 @Override
226 public void remove() {
227
228 }
229 };
230 }
231
232 @Override
233 public BlockCache[] getBlockCaches() {
234 return null;
235 }
236
237
238
239
240 private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {
241
242 @Override
243 public boolean asyncDecode(CachedData d) {
244 return false;
245 }
246
247 @Override
248 public CachedData encode(HFileBlock block) {
249 ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
250 block.serialize(bb);
251 return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
252 }
253
254 @Override
255 public HFileBlock decode(CachedData d) {
256 try {
257 ByteBuffer buf = ByteBuffer.wrap(d.getData());
258 return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
259 } catch (IOException e) {
260 LOG.warn("Error deserializing data from memcached",e);
261 }
262 return null;
263 }
264
265 @Override
266 public int getMaxSize() {
267 return MAX_SIZE;
268 }
269 }
270
271 }