1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.backup.regionserver;
20
21 import java.io.Closeable;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorCompletionService;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Abortable;
36 import org.apache.hadoop.hbase.DaemonThreadFactory;
37 import org.apache.hadoop.hbase.errorhandling.ForeignException;
38
39
40
41
42
43 public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
44 private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class);
45
46
47 private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
48 private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
49
50 private final ExecutorCompletionService<Void> taskPool;
51 private final ThreadPoolExecutor executor;
52 private volatile boolean aborted;
53 private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
54 private final String name;
55
56 public LogRollBackupSubprocedurePool(String name, Configuration conf) {
57
58 long keepAlive =
59 conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
60 LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
61 int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
62 this.name = name;
63 executor =
64 new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
65 new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name
66 + ")-backup-pool"));
67 taskPool = new ExecutorCompletionService<Void>(executor);
68 }
69
70
71
72
73 public void submitTask(final Callable<Void> task) {
74 Future<Void> f = this.taskPool.submit(task);
75 futures.add(f);
76 }
77
78
79
80
81
82
83 public boolean waitForOutstandingTasks() throws ForeignException {
84 LOG.debug("Waiting for backup procedure to finish.");
85
86 try {
87 for (Future<Void> f : futures) {
88 f.get();
89 }
90 return true;
91 } catch (InterruptedException e) {
92 if (aborted) {
93 throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
94 e);
95 }
96 Thread.currentThread().interrupt();
97 } catch (ExecutionException e) {
98 if (e.getCause() instanceof ForeignException) {
99 throw (ForeignException) e.getCause();
100 }
101 throw new ForeignException(name, e.getCause());
102 } finally {
103
104 for (Future<Void> f : futures) {
105 if (!f.isDone()) {
106 f.cancel(true);
107 }
108 }
109 }
110 return false;
111 }
112
113
114
115
116
117 @Override
118 public void close() {
119 executor.shutdown();
120 }
121
122 @Override
123 public void abort(String why, Throwable e) {
124 if (this.aborted) {
125 return;
126 }
127
128 this.aborted = true;
129 LOG.warn("Aborting because: " + why, e);
130 this.executor.shutdownNow();
131 }
132
133 @Override
134 public boolean isAborted() {
135 return this.aborted;
136 }
137 }