View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.PriorityBlockingQueue;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.RemoteExceptionHandler;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.conf.ConfigurationManager;
42  import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
43  import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
44  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
45  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
46  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
47  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
48  import org.apache.hadoop.hbase.security.User;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  import org.apache.hadoop.hbase.util.Pair;
51  import org.apache.hadoop.util.StringUtils;
52  
53  import com.google.common.annotations.VisibleForTesting;
54  import com.google.common.base.Preconditions;
55  
56  /**
57   * Compact region on request and then run split if appropriate
58   */
59  @InterfaceAudience.Private
60  public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
61    static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
62  
63    // Configuration key for the large compaction threads.
64    public final static String LARGE_COMPACTION_THREADS =
65        "hbase.regionserver.thread.compaction.large";
66    public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
67    
68    // Configuration key for the small compaction threads.
69    public final static String SMALL_COMPACTION_THREADS =
70        "hbase.regionserver.thread.compaction.small";
71    public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
72    
73    // Configuration key for split threads
74    public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
75    public final static int SPLIT_THREADS_DEFAULT = 1;
76    
77    // Configuration keys for merge threads
78    public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
79    public final static int MERGE_THREADS_DEFAULT = 1;
80  
81    public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
82        "hbase.regionserver.regionSplitLimit";
83    public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
84    
85    private final HRegionServer server;
86    private final Configuration conf;
87  
88    private final ThreadPoolExecutor longCompactions;
89    private final ThreadPoolExecutor shortCompactions;
90    private final ThreadPoolExecutor splits;
91    private final ThreadPoolExecutor mergePool;
92  
93    private volatile CompactionThroughputController compactionThroughputController;
94  
95    /**
96     * Splitting should not take place if the total number of regions exceed this.
97     * This is not a hard limit to the number of regions but it is a guideline to
98     * stop splitting after number of online regions is greater than this.
99     */
100   private int regionSplitLimit;
101 
102   /** @param server */
103   CompactSplitThread(HRegionServer server) {
104     super();
105     this.server = server;
106     this.conf = server.getConfiguration();
107     this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
108         DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
109 
110     int largeThreads = Math.max(1, conf.getInt(
111         LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
112     int smallThreads = conf.getInt(
113         SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
114 
115     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
116 
117     // if we have throttle threads, make sure the user also specified size
118     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
119 
120     final String n = Thread.currentThread().getName();
121 
122     this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
123         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
124         new ThreadFactory() {
125           @Override
126           public Thread newThread(Runnable r) {
127             Thread t = new Thread(r);
128             t.setName(n + "-longCompactions-" + System.currentTimeMillis());
129             return t;
130           }
131       });
132     this.longCompactions.setRejectedExecutionHandler(new Rejection());
133     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
134         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
135         new ThreadFactory() {
136           @Override
137           public Thread newThread(Runnable r) {
138             Thread t = new Thread(r);
139             t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
140             return t;
141           }
142       });
143     this.shortCompactions
144         .setRejectedExecutionHandler(new Rejection());
145     this.splits = (ThreadPoolExecutor)
146         Executors.newFixedThreadPool(splitThreads,
147             new ThreadFactory() {
148           @Override
149           public Thread newThread(Runnable r) {
150             Thread t = new Thread(r);
151             t.setName(n + "-splits-" + System.currentTimeMillis());
152             return t;
153           }
154       });
155     int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
156     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
157         mergeThreads, new ThreadFactory() {
158           @Override
159           public Thread newThread(Runnable r) {
160             Thread t = new Thread(r);
161             t.setName(n + "-merges-" + System.currentTimeMillis());
162             return t;
163           }
164         });
165 
166     // compaction throughput controller
167     this.compactionThroughputController =
168         CompactionThroughputControllerFactory.create(server, conf);
169   }
170 
171   @Override
172   public String toString() {
173     return "compaction_queue=("
174         + longCompactions.getQueue().size() + ":"
175         + shortCompactions.getQueue().size() + ")"
176         + ", split_queue=" + splits.getQueue().size()
177         + ", merge_queue=" + mergePool.getQueue().size();
178   }
179   
180   public String dumpQueue() {
181     StringBuffer queueLists = new StringBuffer();
182     queueLists.append("Compaction/Split Queue dump:\n");
183     queueLists.append("  LargeCompation Queue:\n");
184     BlockingQueue<Runnable> lq = longCompactions.getQueue();
185     Iterator<Runnable> it = lq.iterator();
186     while (it.hasNext()) {
187       queueLists.append("    " + it.next().toString());
188       queueLists.append("\n");
189     }
190 
191     if (shortCompactions != null) {
192       queueLists.append("\n");
193       queueLists.append("  SmallCompation Queue:\n");
194       lq = shortCompactions.getQueue();
195       it = lq.iterator();
196       while (it.hasNext()) {
197         queueLists.append("    " + it.next().toString());
198         queueLists.append("\n");
199       }
200     }
201 
202     queueLists.append("\n");
203     queueLists.append("  Split Queue:\n");
204     lq = splits.getQueue();
205     it = lq.iterator();
206     while (it.hasNext()) {
207       queueLists.append("    " + it.next().toString());
208       queueLists.append("\n");
209     }
210 
211     queueLists.append("\n");
212     queueLists.append("  Region Merge Queue:\n");
213     lq = mergePool.getQueue();
214     it = lq.iterator();
215     while (it.hasNext()) {
216       queueLists.append("    " + it.next().toString());
217       queueLists.append("\n");
218     }
219 
220     return queueLists.toString();
221   }
222 
223   public synchronized void requestRegionsMerge(final Region a,
224       final Region b, final boolean forcible, long masterSystemTime, User user) {
225     try {
226       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
227       if (LOG.isDebugEnabled()) {
228         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
229             + forcible + ".  " + this);
230       }
231     } catch (RejectedExecutionException ree) {
232       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
233           + forcible, ree);
234     }
235   }
236 
237   public synchronized boolean requestSplit(final Region r) {
238     // don't split regions that are blocking
239     if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
240       byte[] midKey = ((HRegion)r).checkSplit();
241       if (midKey != null) {
242         requestSplit(r, midKey);
243         return true;
244       }
245     }
246     return false;
247   }
248 
249   public synchronized void requestSplit(final Region r, byte[] midKey) {
250     requestSplit(r, midKey, null);
251   }
252 
253   /*
254    * The User parameter allows the split thread to assume the correct user identity
255    */
256   public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
257     if (midKey == null) {
258       LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
259         " not splittable because midkey=null");
260       if (((HRegion)r).shouldForceSplit()) {
261         ((HRegion)r).clearSplit();
262       }
263       return;
264     }
265     try {
266       this.splits.execute(new SplitRequest(r, midKey, this.server, user));
267       if (LOG.isDebugEnabled()) {
268         LOG.debug("Split requested for " + r + ".  " + this);
269       }
270     } catch (RejectedExecutionException ree) {
271       LOG.info("Could not execute split for " + r, ree);
272     }
273   }
274 
275   @Override
276   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
277       throws IOException {
278     return requestCompaction(r, why, null);
279   }
280 
281   @Override
282   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
283       List<Pair<CompactionRequest, Store>> requests) throws IOException {
284     return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
285   }
286 
287   @Override
288   public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
289       final String why, CompactionRequest request) throws IOException {
290     return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
291   }
292 
293   @Override
294   public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
295       int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
296     return requestCompactionInternal(r, why, p, requests, true, user);
297   }
298 
299   private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
300       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
301           throws IOException {
302     // not a special compaction request, so make our own list
303     List<CompactionRequest> ret = null;
304     if (requests == null) {
305       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
306       for (Store s : r.getStores()) {
307         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
308         if (selectNow) ret.add(cr);
309       }
310     } else {
311       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
312       ret = new ArrayList<CompactionRequest>(requests.size());
313       for (Pair<CompactionRequest, Store> pair : requests) {
314         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
315       }
316     }
317     return ret;
318   }
319 
320   public CompactionRequest requestCompaction(final Region r, final Store s,
321       final String why, int priority, CompactionRequest request, User user) throws IOException {
322     return requestCompactionInternal(r, s, why, priority, request, true, user);
323   }
324 
325   public synchronized void requestSystemCompaction(
326       final Region r, final String why) throws IOException {
327     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
328   }
329 
330   public void requestSystemCompaction(
331       final Region r, final Store s, final String why) throws IOException {
332     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
333   }
334 
335   /**
336    * @param r region store belongs to
337    * @param s Store to request compaction on
338    * @param why Why compaction requested -- used in debug messages
339    * @param priority override the default priority (NO_PRIORITY == decide)
340    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
341    *          compaction will be used.
342    */
343   private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
344       final String why, int priority, CompactionRequest request, boolean selectNow, User user)
345           throws IOException {
346     if (this.server.isStopped()
347         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
348       return null;
349     }
350 
351     CompactionContext compaction = null;
352     if (selectNow) {
353       compaction = selectCompaction(r, s, priority, request, user);
354       if (compaction == null) return null; // message logged inside
355     }
356 
357     final RegionServerSpaceQuotaManager spaceQuotaManager =
358       this.server.getRegionServerSpaceQuotaManager();
359     if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
360         r.getTableDesc().getTableName())) {
361       if (LOG.isDebugEnabled()) {
362         LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
363             + " policy disallows compactions.");
364       }
365       return null;
366     }
367 
368     // We assume that most compactions are small. So, put system compactions into small
369     // pool; we will do selection there, and move to large pool if necessary.
370     ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
371       ? longCompactions : shortCompactions;
372     pool.execute(new CompactionRunner(s, r, compaction, pool, user));
373     if (LOG.isDebugEnabled()) {
374       String type = (pool == shortCompactions) ? "Small " : "Large ";
375       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
376           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
377     }
378     return selectNow ? compaction.getRequest() : null;
379   }
380 
381   private CompactionContext selectCompaction(final Region r, final Store s,
382       int priority, CompactionRequest request, User user) throws IOException {
383     CompactionContext compaction = s.requestCompaction(priority, request, user);
384     if (compaction == null) {
385       if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
386         LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
387             " because compaction request was cancelled");
388       }
389       return null;
390     }
391     assert compaction.hasSelection();
392     if (priority != Store.NO_PRIORITY) {
393       compaction.getRequest().setPriority(priority);
394     }
395     return compaction;
396   }
397 
398   /**
399    * Only interrupt once it's done with a run through the work loop.
400    */
401   void interruptIfNecessary() {
402     splits.shutdown();
403     mergePool.shutdown();
404     longCompactions.shutdown();
405     shortCompactions.shutdown();
406   }
407 
408   private void waitFor(ThreadPoolExecutor t, String name) {
409     boolean done = false;
410     while (!done) {
411       try {
412         done = t.awaitTermination(60, TimeUnit.SECONDS);
413         LOG.info("Waiting for " + name + " to finish...");
414         if (!done) {
415           t.shutdownNow();
416         }
417       } catch (InterruptedException ie) {
418         LOG.warn("Interrupted waiting for " + name + " to finish...");
419       }
420     }
421   }
422 
423   void join() {
424     waitFor(splits, "Split Thread");
425     waitFor(mergePool, "Merge Thread");
426     waitFor(longCompactions, "Large Compaction Thread");
427     waitFor(shortCompactions, "Small Compaction Thread");
428   }
429 
430   /**
431    * Returns the current size of the queue containing regions that are
432    * processed.
433    *
434    * @return The current size of the regions queue.
435    */
436   public int getCompactionQueueSize() {
437     return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
438   }
439 
440   public int getLargeCompactionQueueSize() {
441     return longCompactions.getQueue().size();
442   }
443 
444 
445   public int getSmallCompactionQueueSize() {
446     return shortCompactions.getQueue().size();
447   }
448 
449   public int getSplitQueueSize() {
450     return splits.getQueue().size();
451   }
452 
453   private boolean shouldSplitRegion() {
454     if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
455       LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
456           + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
457     }
458     return (regionSplitLimit > server.getNumberOfOnlineRegions());
459   }
460 
461   /**
462    * @return the regionSplitLimit
463    */
464   public int getRegionSplitLimit() {
465     return this.regionSplitLimit;
466   }
467 
468   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
469       justification="Contrived use of compareTo")
470   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
471     private final Store store;
472     private final HRegion region;
473     private CompactionContext compaction;
474     private int queuedPriority;
475     private ThreadPoolExecutor parent;
476     private User user;
477 
478     public CompactionRunner(Store store, Region region,
479         CompactionContext compaction, ThreadPoolExecutor parent, User user) {
480       super();
481       this.store = store;
482       this.region = (HRegion)region;
483       this.compaction = compaction;
484       this.queuedPriority = (this.compaction == null)
485           ? store.getCompactPriority() : compaction.getRequest().getPriority();
486       this.parent = parent;
487       this.user = user;
488     }
489 
490     @Override
491     public String toString() {
492       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
493           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
494     }
495 
496     private void doCompaction(User user) {
497       // Common case - system compaction without a file selection. Select now.
498       if (this.compaction == null) {
499         int oldPriority = this.queuedPriority;
500         this.queuedPriority = this.store.getCompactPriority();
501         if (this.queuedPriority > oldPriority) {
502           // Store priority decreased while we were in queue (due to some other compaction?),
503           // requeue with new priority to avoid blocking potential higher priorities.
504           this.parent.execute(this);
505           return;
506         }
507         try {
508           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
509         } catch (IOException ex) {
510           LOG.error("Compaction selection failed " + this, ex);
511           server.checkFileSystem();
512           return;
513         }
514         if (this.compaction == null) return; // nothing to do
515         // Now see if we are in correct pool for the size; if not, go to the correct one.
516         // We might end up waiting for a while, so cancel the selection.
517         assert this.compaction.hasSelection();
518         ThreadPoolExecutor pool = store.throttleCompaction(
519             compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
520         if (this.parent != pool) {
521           this.store.cancelRequestedCompaction(this.compaction);
522           this.compaction = null;
523           this.parent = pool;
524           this.parent.execute(this);
525           return;
526         }
527       }
528       // Finally we can compact something.
529       assert this.compaction != null;
530 
531       this.compaction.getRequest().beforeExecute();
532       try {
533         // Note: please don't put single-compaction logic here;
534         //       put it into region/store/etc. This is CST logic.
535         long start = EnvironmentEdgeManager.currentTime();
536         boolean completed =
537             region.compact(compaction, store, compactionThroughputController, user);
538         long now = EnvironmentEdgeManager.currentTime();
539         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
540               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
541         if (completed) {
542           // degenerate case: blocked regions require recursive enqueues
543           if (store.getCompactPriority() <= 0) {
544             requestSystemCompaction(region, store, "Recursive enqueue");
545           } else {
546             // see if the compaction has caused us to exceed max region size
547             requestSplit(region);
548           }
549         }
550       } catch (IOException ex) {
551         IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
552         LOG.error("Compaction failed " + this, remoteEx);
553         if (remoteEx != ex) {
554           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
555         }
556         server.checkFileSystem();
557       } catch (Exception ex) {
558         LOG.error("Compaction failed " + this, ex);
559         server.checkFileSystem();
560       } finally {
561         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
562       }
563       this.compaction.getRequest().afterExecute();
564     }
565 
566     @Override
567     public void run() {
568       Preconditions.checkNotNull(server);
569       if (server.isStopped()
570           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
571         return;
572       }
573       doCompaction(user);
574     }
575 
576     private String formatStackTrace(Exception ex) {
577       StringWriter sw = new StringWriter();
578       PrintWriter pw = new PrintWriter(sw);
579       ex.printStackTrace(pw);
580       pw.flush();
581       return sw.toString();
582     }
583 
584     @Override
585     public int compareTo(CompactionRunner o) {
586       // Only compare the underlying request (if any), for queue sorting purposes.
587       int compareVal = queuedPriority - o.queuedPriority; // compare priority
588       if (compareVal != 0) return compareVal;
589       CompactionContext tc = this.compaction, oc = o.compaction;
590       // Sort pre-selected (user?) compactions before system ones with equal priority.
591       return (tc == null) ? ((oc == null) ? 0 : 1)
592           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
593     }
594   }
595 
596   /**
597    * Cleanup class to use when rejecting a compaction request from the queue.
598    */
599   private static class Rejection implements RejectedExecutionHandler {
600     @Override
601     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
602       if (runnable instanceof CompactionRunner) {
603         CompactionRunner runner = (CompactionRunner)runnable;
604         LOG.debug("Compaction Rejected: " + runner);
605         runner.store.cancelRequestedCompaction(runner.compaction);
606       }
607     }
608   }
609 
610   /**
611    * {@inheritDoc}
612    */
613   @Override
614   public void onConfigurationChange(Configuration newConf) {
615     // Check if number of large / small compaction threads has changed, and then
616     // adjust the core pool size of the thread pools, by using the
617     // setCorePoolSize() method. According to the javadocs, it is safe to
618     // change the core pool size on-the-fly. We need to reset the maximum
619     // pool size, as well.
620     int largeThreads = Math.max(1, newConf.getInt(
621             LARGE_COMPACTION_THREADS,
622             LARGE_COMPACTION_THREADS_DEFAULT));
623     if (this.longCompactions.getCorePoolSize() != largeThreads) {
624       LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
625               " from " + this.longCompactions.getCorePoolSize() + " to " +
626               largeThreads);
627       this.longCompactions.setMaximumPoolSize(largeThreads);
628       this.longCompactions.setCorePoolSize(largeThreads);
629     }
630 
631     int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
632             SMALL_COMPACTION_THREADS_DEFAULT);
633     if (this.shortCompactions.getCorePoolSize() != smallThreads) {
634       LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
635                 " from " + this.shortCompactions.getCorePoolSize() + " to " +
636                 smallThreads);
637       this.shortCompactions.setMaximumPoolSize(smallThreads);
638       this.shortCompactions.setCorePoolSize(smallThreads);
639     }
640 
641     int splitThreads = newConf.getInt(SPLIT_THREADS,
642             SPLIT_THREADS_DEFAULT);
643     if (this.splits.getCorePoolSize() != splitThreads) {
644       LOG.info("Changing the value of " + SPLIT_THREADS +
645                 " from " + this.splits.getCorePoolSize() + " to " +
646                 splitThreads);
647       this.splits.setMaximumPoolSize(smallThreads);
648       this.splits.setCorePoolSize(smallThreads);
649     }
650 
651     int mergeThreads = newConf.getInt(MERGE_THREADS,
652             MERGE_THREADS_DEFAULT);
653     if (this.mergePool.getCorePoolSize() != mergeThreads) {
654       LOG.info("Changing the value of " + MERGE_THREADS +
655                 " from " + this.mergePool.getCorePoolSize() + " to " +
656                 mergeThreads);
657       this.mergePool.setMaximumPoolSize(smallThreads);
658       this.mergePool.setCorePoolSize(smallThreads);
659     }
660 
661     CompactionThroughputController old = this.compactionThroughputController;
662     if (old != null) {
663       old.stop("configuration change");
664     }
665     this.compactionThroughputController =
666         CompactionThroughputControllerFactory.create(server, newConf);
667 
668     // We change this atomically here instead of reloading the config in order that upstream
669     // would be the only one with the flexibility to reload the config.
670     this.conf.reloadConfiguration();
671   }
672 
673   protected int getSmallCompactionThreadNum() {
674     return this.shortCompactions.getCorePoolSize();
675   }
676 
677   public int getLargeCompactionThreadNum() {
678     return this.longCompactions.getCorePoolSize();
679   }
680 
681   /**
682    * {@inheritDoc}
683    */
684   @Override
685   public void registerChildren(ConfigurationManager manager) {
686     // No children to register.
687   }
688 
689   /**
690    * {@inheritDoc}
691    */
692   @Override
693   public void deregisterChildren(ConfigurationManager manager) {
694     // No children to register
695   }
696 
697   @VisibleForTesting
698   public CompactionThroughputController getCompactionThroughputController() {
699     return compactionThroughputController;
700   }
701 
702 }