1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.PriorityBlockingQueue;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.RejectedExecutionHandler;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.RemoteExceptionHandler;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.conf.ConfigurationManager;
42 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
43 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
44 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
45 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
46 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
47 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
48 import org.apache.hadoop.hbase.security.User;
49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50 import org.apache.hadoop.hbase.util.Pair;
51 import org.apache.hadoop.util.StringUtils;
52
53 import com.google.common.annotations.VisibleForTesting;
54 import com.google.common.base.Preconditions;
55
56
57
58
59 @InterfaceAudience.Private
60 public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
61 static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
62
63
64 public final static String LARGE_COMPACTION_THREADS =
65 "hbase.regionserver.thread.compaction.large";
66 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
67
68
69 public final static String SMALL_COMPACTION_THREADS =
70 "hbase.regionserver.thread.compaction.small";
71 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
72
73
74 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
75 public final static int SPLIT_THREADS_DEFAULT = 1;
76
77
78 public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
79 public final static int MERGE_THREADS_DEFAULT = 1;
80
81 public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
82 "hbase.regionserver.regionSplitLimit";
83 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
84
85 private final HRegionServer server;
86 private final Configuration conf;
87
88 private final ThreadPoolExecutor longCompactions;
89 private final ThreadPoolExecutor shortCompactions;
90 private final ThreadPoolExecutor splits;
91 private final ThreadPoolExecutor mergePool;
92
93 private volatile CompactionThroughputController compactionThroughputController;
94
95
96
97
98
99
100 private int regionSplitLimit;
101
102
103 CompactSplitThread(HRegionServer server) {
104 super();
105 this.server = server;
106 this.conf = server.getConfiguration();
107 this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
108 DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
109
110 int largeThreads = Math.max(1, conf.getInt(
111 LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
112 int smallThreads = conf.getInt(
113 SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
114
115 int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
116
117
118 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
119
120 final String n = Thread.currentThread().getName();
121
122 this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
123 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
124 new ThreadFactory() {
125 @Override
126 public Thread newThread(Runnable r) {
127 Thread t = new Thread(r);
128 t.setName(n + "-longCompactions-" + System.currentTimeMillis());
129 return t;
130 }
131 });
132 this.longCompactions.setRejectedExecutionHandler(new Rejection());
133 this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
134 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
135 new ThreadFactory() {
136 @Override
137 public Thread newThread(Runnable r) {
138 Thread t = new Thread(r);
139 t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
140 return t;
141 }
142 });
143 this.shortCompactions
144 .setRejectedExecutionHandler(new Rejection());
145 this.splits = (ThreadPoolExecutor)
146 Executors.newFixedThreadPool(splitThreads,
147 new ThreadFactory() {
148 @Override
149 public Thread newThread(Runnable r) {
150 Thread t = new Thread(r);
151 t.setName(n + "-splits-" + System.currentTimeMillis());
152 return t;
153 }
154 });
155 int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
156 this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
157 mergeThreads, new ThreadFactory() {
158 @Override
159 public Thread newThread(Runnable r) {
160 Thread t = new Thread(r);
161 t.setName(n + "-merges-" + System.currentTimeMillis());
162 return t;
163 }
164 });
165
166
167 this.compactionThroughputController =
168 CompactionThroughputControllerFactory.create(server, conf);
169 }
170
171 @Override
172 public String toString() {
173 return "compaction_queue=("
174 + longCompactions.getQueue().size() + ":"
175 + shortCompactions.getQueue().size() + ")"
176 + ", split_queue=" + splits.getQueue().size()
177 + ", merge_queue=" + mergePool.getQueue().size();
178 }
179
180 public String dumpQueue() {
181 StringBuffer queueLists = new StringBuffer();
182 queueLists.append("Compaction/Split Queue dump:\n");
183 queueLists.append(" LargeCompation Queue:\n");
184 BlockingQueue<Runnable> lq = longCompactions.getQueue();
185 Iterator<Runnable> it = lq.iterator();
186 while (it.hasNext()) {
187 queueLists.append(" " + it.next().toString());
188 queueLists.append("\n");
189 }
190
191 if (shortCompactions != null) {
192 queueLists.append("\n");
193 queueLists.append(" SmallCompation Queue:\n");
194 lq = shortCompactions.getQueue();
195 it = lq.iterator();
196 while (it.hasNext()) {
197 queueLists.append(" " + it.next().toString());
198 queueLists.append("\n");
199 }
200 }
201
202 queueLists.append("\n");
203 queueLists.append(" Split Queue:\n");
204 lq = splits.getQueue();
205 it = lq.iterator();
206 while (it.hasNext()) {
207 queueLists.append(" " + it.next().toString());
208 queueLists.append("\n");
209 }
210
211 queueLists.append("\n");
212 queueLists.append(" Region Merge Queue:\n");
213 lq = mergePool.getQueue();
214 it = lq.iterator();
215 while (it.hasNext()) {
216 queueLists.append(" " + it.next().toString());
217 queueLists.append("\n");
218 }
219
220 return queueLists.toString();
221 }
222
223 public synchronized void requestRegionsMerge(final Region a,
224 final Region b, final boolean forcible, long masterSystemTime, User user) {
225 try {
226 mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
227 if (LOG.isDebugEnabled()) {
228 LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
229 + forcible + ". " + this);
230 }
231 } catch (RejectedExecutionException ree) {
232 LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
233 + forcible, ree);
234 }
235 }
236
237 public synchronized boolean requestSplit(final Region r) {
238
239 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
240 byte[] midKey = ((HRegion)r).checkSplit();
241 if (midKey != null) {
242 requestSplit(r, midKey);
243 return true;
244 }
245 }
246 return false;
247 }
248
249 public synchronized void requestSplit(final Region r, byte[] midKey) {
250 requestSplit(r, midKey, null);
251 }
252
253
254
255
256 public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
257 if (midKey == null) {
258 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
259 " not splittable because midkey=null");
260 if (((HRegion)r).shouldForceSplit()) {
261 ((HRegion)r).clearSplit();
262 }
263 return;
264 }
265 try {
266 this.splits.execute(new SplitRequest(r, midKey, this.server, user));
267 if (LOG.isDebugEnabled()) {
268 LOG.debug("Split requested for " + r + ". " + this);
269 }
270 } catch (RejectedExecutionException ree) {
271 LOG.info("Could not execute split for " + r, ree);
272 }
273 }
274
275 @Override
276 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
277 throws IOException {
278 return requestCompaction(r, why, null);
279 }
280
281 @Override
282 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
283 List<Pair<CompactionRequest, Store>> requests) throws IOException {
284 return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
285 }
286
287 @Override
288 public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
289 final String why, CompactionRequest request) throws IOException {
290 return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
291 }
292
293 @Override
294 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
295 int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
296 return requestCompactionInternal(r, why, p, requests, true, user);
297 }
298
299 private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
300 int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
301 throws IOException {
302
303 List<CompactionRequest> ret = null;
304 if (requests == null) {
305 ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
306 for (Store s : r.getStores()) {
307 CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
308 if (selectNow) ret.add(cr);
309 }
310 } else {
311 Preconditions.checkArgument(selectNow);
312 ret = new ArrayList<CompactionRequest>(requests.size());
313 for (Pair<CompactionRequest, Store> pair : requests) {
314 ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
315 }
316 }
317 return ret;
318 }
319
320 public CompactionRequest requestCompaction(final Region r, final Store s,
321 final String why, int priority, CompactionRequest request, User user) throws IOException {
322 return requestCompactionInternal(r, s, why, priority, request, true, user);
323 }
324
325 public synchronized void requestSystemCompaction(
326 final Region r, final String why) throws IOException {
327 requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
328 }
329
330 public void requestSystemCompaction(
331 final Region r, final Store s, final String why) throws IOException {
332 requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
333 }
334
335
336
337
338
339
340
341
342
343 private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
344 final String why, int priority, CompactionRequest request, boolean selectNow, User user)
345 throws IOException {
346 if (this.server.isStopped()
347 || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
348 return null;
349 }
350
351 CompactionContext compaction = null;
352 if (selectNow) {
353 compaction = selectCompaction(r, s, priority, request, user);
354 if (compaction == null) return null;
355 }
356
357 final RegionServerSpaceQuotaManager spaceQuotaManager =
358 this.server.getRegionServerSpaceQuotaManager();
359 if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
360 r.getTableDesc().getTableName())) {
361 if (LOG.isDebugEnabled()) {
362 LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
363 + " policy disallows compactions.");
364 }
365 return null;
366 }
367
368
369
370 ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
371 ? longCompactions : shortCompactions;
372 pool.execute(new CompactionRunner(s, r, compaction, pool, user));
373 if (LOG.isDebugEnabled()) {
374 String type = (pool == shortCompactions) ? "Small " : "Large ";
375 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
376 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
377 }
378 return selectNow ? compaction.getRequest() : null;
379 }
380
381 private CompactionContext selectCompaction(final Region r, final Store s,
382 int priority, CompactionRequest request, User user) throws IOException {
383 CompactionContext compaction = s.requestCompaction(priority, request, user);
384 if (compaction == null) {
385 if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
386 LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
387 " because compaction request was cancelled");
388 }
389 return null;
390 }
391 assert compaction.hasSelection();
392 if (priority != Store.NO_PRIORITY) {
393 compaction.getRequest().setPriority(priority);
394 }
395 return compaction;
396 }
397
398
399
400
401 void interruptIfNecessary() {
402 splits.shutdown();
403 mergePool.shutdown();
404 longCompactions.shutdown();
405 shortCompactions.shutdown();
406 }
407
408 private void waitFor(ThreadPoolExecutor t, String name) {
409 boolean done = false;
410 while (!done) {
411 try {
412 done = t.awaitTermination(60, TimeUnit.SECONDS);
413 LOG.info("Waiting for " + name + " to finish...");
414 if (!done) {
415 t.shutdownNow();
416 }
417 } catch (InterruptedException ie) {
418 LOG.warn("Interrupted waiting for " + name + " to finish...");
419 }
420 }
421 }
422
423 void join() {
424 waitFor(splits, "Split Thread");
425 waitFor(mergePool, "Merge Thread");
426 waitFor(longCompactions, "Large Compaction Thread");
427 waitFor(shortCompactions, "Small Compaction Thread");
428 }
429
430
431
432
433
434
435
436 public int getCompactionQueueSize() {
437 return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
438 }
439
440 public int getLargeCompactionQueueSize() {
441 return longCompactions.getQueue().size();
442 }
443
444
445 public int getSmallCompactionQueueSize() {
446 return shortCompactions.getQueue().size();
447 }
448
449 public int getSplitQueueSize() {
450 return splits.getQueue().size();
451 }
452
453 private boolean shouldSplitRegion() {
454 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
455 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
456 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
457 }
458 return (regionSplitLimit > server.getNumberOfOnlineRegions());
459 }
460
461
462
463
464 public int getRegionSplitLimit() {
465 return this.regionSplitLimit;
466 }
467
468 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
469 justification="Contrived use of compareTo")
470 private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
471 private final Store store;
472 private final HRegion region;
473 private CompactionContext compaction;
474 private int queuedPriority;
475 private ThreadPoolExecutor parent;
476 private User user;
477
478 public CompactionRunner(Store store, Region region,
479 CompactionContext compaction, ThreadPoolExecutor parent, User user) {
480 super();
481 this.store = store;
482 this.region = (HRegion)region;
483 this.compaction = compaction;
484 this.queuedPriority = (this.compaction == null)
485 ? store.getCompactPriority() : compaction.getRequest().getPriority();
486 this.parent = parent;
487 this.user = user;
488 }
489
490 @Override
491 public String toString() {
492 return (this.compaction != null) ? ("Request = " + compaction.getRequest())
493 : ("Store = " + store.toString() + ", pri = " + queuedPriority);
494 }
495
496 private void doCompaction(User user) {
497
498 if (this.compaction == null) {
499 int oldPriority = this.queuedPriority;
500 this.queuedPriority = this.store.getCompactPriority();
501 if (this.queuedPriority > oldPriority) {
502
503
504 this.parent.execute(this);
505 return;
506 }
507 try {
508 this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
509 } catch (IOException ex) {
510 LOG.error("Compaction selection failed " + this, ex);
511 server.checkFileSystem();
512 return;
513 }
514 if (this.compaction == null) return;
515
516
517 assert this.compaction.hasSelection();
518 ThreadPoolExecutor pool = store.throttleCompaction(
519 compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
520 if (this.parent != pool) {
521 this.store.cancelRequestedCompaction(this.compaction);
522 this.compaction = null;
523 this.parent = pool;
524 this.parent.execute(this);
525 return;
526 }
527 }
528
529 assert this.compaction != null;
530
531 this.compaction.getRequest().beforeExecute();
532 try {
533
534
535 long start = EnvironmentEdgeManager.currentTime();
536 boolean completed =
537 region.compact(compaction, store, compactionThroughputController, user);
538 long now = EnvironmentEdgeManager.currentTime();
539 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
540 this + "; duration=" + StringUtils.formatTimeDiff(now, start));
541 if (completed) {
542
543 if (store.getCompactPriority() <= 0) {
544 requestSystemCompaction(region, store, "Recursive enqueue");
545 } else {
546
547 requestSplit(region);
548 }
549 }
550 } catch (IOException ex) {
551 IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
552 LOG.error("Compaction failed " + this, remoteEx);
553 if (remoteEx != ex) {
554 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
555 }
556 server.checkFileSystem();
557 } catch (Exception ex) {
558 LOG.error("Compaction failed " + this, ex);
559 server.checkFileSystem();
560 } finally {
561 LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
562 }
563 this.compaction.getRequest().afterExecute();
564 }
565
566 @Override
567 public void run() {
568 Preconditions.checkNotNull(server);
569 if (server.isStopped()
570 || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
571 return;
572 }
573 doCompaction(user);
574 }
575
576 private String formatStackTrace(Exception ex) {
577 StringWriter sw = new StringWriter();
578 PrintWriter pw = new PrintWriter(sw);
579 ex.printStackTrace(pw);
580 pw.flush();
581 return sw.toString();
582 }
583
584 @Override
585 public int compareTo(CompactionRunner o) {
586
587 int compareVal = queuedPriority - o.queuedPriority;
588 if (compareVal != 0) return compareVal;
589 CompactionContext tc = this.compaction, oc = o.compaction;
590
591 return (tc == null) ? ((oc == null) ? 0 : 1)
592 : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
593 }
594 }
595
596
597
598
599 private static class Rejection implements RejectedExecutionHandler {
600 @Override
601 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
602 if (runnable instanceof CompactionRunner) {
603 CompactionRunner runner = (CompactionRunner)runnable;
604 LOG.debug("Compaction Rejected: " + runner);
605 runner.store.cancelRequestedCompaction(runner.compaction);
606 }
607 }
608 }
609
610
611
612
613 @Override
614 public void onConfigurationChange(Configuration newConf) {
615
616
617
618
619
620 int largeThreads = Math.max(1, newConf.getInt(
621 LARGE_COMPACTION_THREADS,
622 LARGE_COMPACTION_THREADS_DEFAULT));
623 if (this.longCompactions.getCorePoolSize() != largeThreads) {
624 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
625 " from " + this.longCompactions.getCorePoolSize() + " to " +
626 largeThreads);
627 this.longCompactions.setMaximumPoolSize(largeThreads);
628 this.longCompactions.setCorePoolSize(largeThreads);
629 }
630
631 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
632 SMALL_COMPACTION_THREADS_DEFAULT);
633 if (this.shortCompactions.getCorePoolSize() != smallThreads) {
634 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
635 " from " + this.shortCompactions.getCorePoolSize() + " to " +
636 smallThreads);
637 this.shortCompactions.setMaximumPoolSize(smallThreads);
638 this.shortCompactions.setCorePoolSize(smallThreads);
639 }
640
641 int splitThreads = newConf.getInt(SPLIT_THREADS,
642 SPLIT_THREADS_DEFAULT);
643 if (this.splits.getCorePoolSize() != splitThreads) {
644 LOG.info("Changing the value of " + SPLIT_THREADS +
645 " from " + this.splits.getCorePoolSize() + " to " +
646 splitThreads);
647 this.splits.setMaximumPoolSize(smallThreads);
648 this.splits.setCorePoolSize(smallThreads);
649 }
650
651 int mergeThreads = newConf.getInt(MERGE_THREADS,
652 MERGE_THREADS_DEFAULT);
653 if (this.mergePool.getCorePoolSize() != mergeThreads) {
654 LOG.info("Changing the value of " + MERGE_THREADS +
655 " from " + this.mergePool.getCorePoolSize() + " to " +
656 mergeThreads);
657 this.mergePool.setMaximumPoolSize(smallThreads);
658 this.mergePool.setCorePoolSize(smallThreads);
659 }
660
661 CompactionThroughputController old = this.compactionThroughputController;
662 if (old != null) {
663 old.stop("configuration change");
664 }
665 this.compactionThroughputController =
666 CompactionThroughputControllerFactory.create(server, newConf);
667
668
669
670 this.conf.reloadConfiguration();
671 }
672
673 protected int getSmallCompactionThreadNum() {
674 return this.shortCompactions.getCorePoolSize();
675 }
676
677 public int getLargeCompactionThreadNum() {
678 return this.longCompactions.getCorePoolSize();
679 }
680
681
682
683
684 @Override
685 public void registerChildren(ConfigurationManager manager) {
686
687 }
688
689
690
691
692 @Override
693 public void deregisterChildren(ConfigurationManager manager) {
694
695 }
696
697 @VisibleForTesting
698 public CompactionThroughputController getCompactionThroughputController() {
699 return compactionThroughputController;
700 }
701
702 }