View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.hadoop.hbase.io.hfile;
21  
22  import net.spy.memcached.CachedData;
23  import net.spy.memcached.ConnectionFactoryBuilder;
24  import net.spy.memcached.FailureMode;
25  import net.spy.memcached.MemcachedClient;
26  import net.spy.memcached.transcoders.Transcoder;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.util.Addressing;
33  import org.apache.htrace.Trace;
34  import org.apache.htrace.TraceScope;
35  
36  import java.io.IOException;
37  import java.net.InetSocketAddress;
38  import java.nio.ByteBuffer;
39  import java.util.ArrayList;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.NoSuchElementException;
43  import java.util.concurrent.ExecutionException;
44  
45  /**
46   * Class to store blocks into memcached.
47   * This should only be used on a cluster of Memcached daemons that are tuned well and have a
48   * good network connection to the HBase regionservers. Any other use will likely slow down HBase
49   * greatly.
50   */
51  @InterfaceAudience.Private
52  public class MemcachedBlockCache implements BlockCache {
53    private static final Log LOG = LogFactory.getLog(MemcachedBlockCache.class.getName());
54  
55    // Some memcache versions won't take more than 1024 * 1024. So set the limit below
56    // that just in case this client is used with those versions.
57    public static final int MAX_SIZE = 1020 * 1024;
58  
59    // Config key for what memcached servers to use.
60    // They should be specified in a comma sperated list with ports.
61    // like:
62    //
63    // host1:11211,host3:8080,host4:11211
64    public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
65    public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
66    public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
67    public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
68  
69    private final MemcachedClient client;
70    private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
71    private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
72  
73    public MemcachedBlockCache(Configuration c) throws IOException {
74      LOG.info("Creating MemcachedBlockCache");
75  
76      long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
77      long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
78  
79      ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
80          .setOpTimeout(opTimeout)
81          .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out
82          .setFailureMode(FailureMode.Redistribute)
83          .setShouldOptimize(true)              // When regions move lots of reads happen together
84                                                // So combining them into single requests is nice.
85          .setDaemon(true)                      // Don't keep threads around past the end of days.
86          .setUseNagleAlgorithm(false)          // Ain't nobody got time for that
87          .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024);  // 4 times larger than the
88                                                                        // default block just in case
89  
90  
91      // Assume only the localhost is serving memecached.
92      // A la mcrouter or co-locating memcached with split regionservers.
93      //
94      // If this config is a pool of memecached servers they will all be used according to the
95      // default hashing scheme defined by the memcache client. Spy Memecache client in this
96      // case.
97      String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211");
98      String[] servers = serverListString.split(",");
99      List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length);
100     for (String s:servers) {
101       serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
102     }
103 
104     client = new MemcachedClient(builder.build(), serverAddresses);
105   }
106 
107   @Override
108   public void cacheBlock(BlockCacheKey cacheKey,
109                          Cacheable buf,
110                          boolean inMemory,
111                          boolean cacheDataInL1) {
112     cacheBlock(cacheKey, buf);
113   }
114 
115   @Override
116   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
117     if (buf instanceof HFileBlock) {
118       client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
119     } else {
120       if (LOG.isDebugEnabled()) {
121         LOG.debug("MemcachedBlockCache can not cache Cacheable's of type "
122             + buf.getClass().toString());
123       }
124     }
125   }
126 
127   @Override
128   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
129                             boolean repeat, boolean updateCacheMetrics) {
130     // Assume that nothing is the block cache
131     HFileBlock result = null;
132 
133     try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) {
134       result = client.get(cacheKey.toString(), tc);
135     } catch (Exception e) {
136       // Catch a pretty broad set of exceptions to limit any changes in the memecache client
137       // and how it handles failures from leaking into the read path.
138       if (LOG.isDebugEnabled()) {
139         LOG.debug("Exception pulling from memcached [ "
140             + cacheKey.toString()
141             + " ]. Treating as a miss.", e);
142       }
143       result = null;
144     } finally {
145       // Update stats if this request doesn't have it turned off 100% of the time
146       if (updateCacheMetrics) {
147         if (result == null) {
148           cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
149         } else {
150           cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
151         }
152       }
153     }
154 
155     return result;
156   }
157 
158   @Override
159   public boolean evictBlock(BlockCacheKey cacheKey) {
160     try {
161       cacheStats.evict();
162       return client.delete(cacheKey.toString()).get();
163     } catch (InterruptedException e) {
164       LOG.warn("Error deleting " + cacheKey.toString(), e);
165       Thread.currentThread().interrupt();
166     } catch (ExecutionException e) {
167       if (LOG.isDebugEnabled()) {
168         LOG.debug("Error deleting " + cacheKey.toString(), e);
169       }
170     }
171     return false;
172   }
173 
174   /**
175    * This method does nothing so that memcached can handle all evictions.
176    */
177   @Override
178   public int evictBlocksByHfileName(String hfileName) {
179     return 0;
180   }
181 
182   @Override
183   public CacheStats getStats() {
184     return cacheStats;
185   }
186 
187   @Override
188   public void shutdown() {
189     client.shutdown();
190   }
191 
192   @Override
193   public long size() {
194     return 0;
195   }
196 
197   @Override
198   public long getFreeSize() {
199     return 0;
200   }
201 
202   @Override
203   public long getCurrentSize() {
204     return 0;
205   }
206 
207   @Override
208   public long getBlockCount() {
209     return 0;
210   }
211 
212   @Override
213   public Iterator<CachedBlock> iterator() {
214     return new Iterator<CachedBlock>() {
215       @Override
216       public boolean hasNext() {
217         return false;
218       }
219 
220       @Override
221       public CachedBlock next() {
222         throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
223       }
224 
225       @Override
226       public void remove() {
227 
228       }
229     };
230   }
231 
232   @Override
233   public BlockCache[] getBlockCaches() {
234     return null;
235   }
236 
237   /**
238    * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays.
239    */
240   private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {
241 
242     @Override
243     public boolean asyncDecode(CachedData d) {
244       return false;
245     }
246 
247     @Override
248     public CachedData encode(HFileBlock block) {
249       ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
250       block.serialize(bb);
251       return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
252     }
253 
254     @Override
255     public HFileBlock decode(CachedData d) {
256       try {
257         ByteBuffer buf = ByteBuffer.wrap(d.getData());
258         return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
259       } catch (IOException e) {
260         LOG.warn("Error deserializing data from memcached",e);
261       }
262       return null;
263     }
264 
265     @Override
266     public int getMaxSize() {
267       return MAX_SIZE;
268     }
269   }
270 
271 }