View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.lang.management.ManagementFactory;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.Executors;
24  import java.util.concurrent.LinkedBlockingQueue;
25  import java.util.concurrent.ScheduledExecutorService;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
34  import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
35  import org.apache.hadoop.util.StringUtils;
36  
37  import com.google.common.util.concurrent.ThreadFactoryBuilder;
38  
39  /**
40   * A pool of {@link HeapMemStoreLAB$Chunk} instances.
41   * 
42   * MemStoreChunkPool caches a number of retired chunks for reusing, it could
43   * decrease allocating bytes when writing, thereby optimizing the garbage
44   * collection on JVM.
45   * 
46   * The pool instance is globally unique and could be obtained through
47   * {@link MemStoreChunkPool#getPool(Configuration)}
48   * 
49   * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating
50   * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
51   * when MemStore clearing snapshot for flush
52   */
53  @SuppressWarnings("javadoc")
54  @InterfaceAudience.Private
55  public class MemStoreChunkPool {
56    private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
57    final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
58    final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
59    final static float POOL_MAX_SIZE_DEFAULT = 0.0f;
60    final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
61  
62    // Static reference to the MemStoreChunkPool
63    private static MemStoreChunkPool globalInstance;
64    /** Boolean whether we have disabled the memstore chunk pool entirely. */
65    static boolean chunkPoolDisabled = false;
66  
67    private final int maxCount;
68  
69    // A queue of reclaimed chunks
70    private final BlockingQueue<Chunk> reclaimedChunks;
71    private final int chunkSize;
72  
73    /** Statistics thread schedule pool */
74    private final ScheduledExecutorService scheduleThreadPool;
75    /** Statistics thread */
76    private static final int statThreadPeriod = 60 * 5;
77    private AtomicLong createdChunkCount = new AtomicLong();
78    private AtomicLong reusedChunkCount = new AtomicLong();
79  
80    MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
81        int initialCount) {
82      this.maxCount = maxCount;
83      this.chunkSize = chunkSize;
84      this.reclaimedChunks = new LinkedBlockingQueue<Chunk>();
85      for (int i = 0; i < initialCount; i++) {
86        Chunk chunk = new Chunk(chunkSize);
87        chunk.init();
88        reclaimedChunks.add(chunk);
89      }
90      final String n = Thread.currentThread().getName();
91      scheduleThreadPool = Executors.newScheduledThreadPool(1,
92          new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics")
93              .setDaemon(true).build());
94      this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
95          statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
96    }
97  
98    /**
99     * Poll a chunk from the pool, reset it if not null, else create a new chunk
100    * to return
101    * @return a chunk
102    */
103   Chunk getChunk() {
104     Chunk chunk = reclaimedChunks.poll();
105     if (chunk == null) {
106       chunk = new Chunk(chunkSize);
107       createdChunkCount.incrementAndGet();
108     } else {
109       chunk.reset();
110       reusedChunkCount.incrementAndGet();
111     }
112     return chunk;
113   }
114 
115   /**
116    * Add the chunks to the pool, when the pool achieves the max size, it will
117    * skip the remaining chunks
118    * @param chunks
119    */
120   void putbackChunks(BlockingQueue<Chunk> chunks) {
121     int maxNumToPutback = this.maxCount - reclaimedChunks.size();
122     if (maxNumToPutback <= 0) {
123       return;
124     }
125     chunks.drainTo(reclaimedChunks, maxNumToPutback);
126   }
127 
128   /**
129    * Add the chunk to the pool, if the pool has achieved the max size, it will
130    * skip it
131    * @param chunk
132    */
133   void putbackChunk(Chunk chunk) {
134     if (reclaimedChunks.size() >= this.maxCount) {
135       return;
136     }
137     reclaimedChunks.add(chunk);
138   }
139 
140   int getPoolSize() {
141     return this.reclaimedChunks.size();
142   }
143 
144   /*
145    * Only used in testing
146    */
147   void clearChunks() {
148     this.reclaimedChunks.clear();
149   }
150 
151   private static class StatisticsThread extends Thread {
152     MemStoreChunkPool mcp;
153 
154     public StatisticsThread(MemStoreChunkPool mcp) {
155       super("MemStoreChunkPool.StatisticsThread");
156       setDaemon(true);
157       this.mcp = mcp;
158     }
159 
160     @Override
161     public void run() {
162       mcp.logStats();
163     }
164   }
165 
166   private void logStats() {
167     if (!LOG.isDebugEnabled()) return;
168     long created = createdChunkCount.get();
169     long reused = reusedChunkCount.get();
170     long total = created + reused;
171     LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
172         + ",created chunk count=" + created
173         + ",reused chunk count=" + reused
174         + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
175             (float) reused / (float) total, 2)));
176   }
177 
178   /**
179    * @param conf
180    * @return the global MemStoreChunkPool instance
181    */
182   static MemStoreChunkPool getPool(Configuration conf) {
183     if (globalInstance != null) return globalInstance;
184 
185     synchronized (MemStoreChunkPool.class) {
186       if (chunkPoolDisabled) return null;
187       if (globalInstance != null) return globalInstance;
188       float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
189       if (poolSizePercentage <= 0) {
190         chunkPoolDisabled = true;
191         return null;
192       }
193       if (poolSizePercentage > 1.0) {
194         throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
195       }
196       long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
197       long globalMemStoreLimit = (long) (heapMax * HeapMemorySizeUtil.getGlobalMemStorePercent(conf,
198           false));
199       int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY,
200           HeapMemStoreLAB.CHUNK_SIZE_DEFAULT);
201       int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
202 
203       float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
204           POOL_INITIAL_SIZE_DEFAULT);
205       if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
206         throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
207             + " must be between 0.0 and 1.0");
208       }
209 
210       int initialCount = (int) (initialCountPercentage * maxCount);
211       LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
212           + ", max count " + maxCount + ", initial count " + initialCount);
213       globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
214       return globalInstance;
215     }
216   }
217 
218 }