1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorCompletionService;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Abortable;
36 import org.apache.hadoop.hbase.DaemonThreadFactory;
37 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
38 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
39 import org.apache.hadoop.hbase.errorhandling.ForeignException;
40 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
41 import org.apache.zookeeper.KeeperException;
42
43 public class SimpleRSProcedureManager extends RegionServerProcedureManager {
44
// Logger used by this manager and by its nested classes.
private static final Log LOG = LogFactory.getLog(SimpleRSProcedureManager.class);

// Region server services handed to initialize(); read for the server name,
// the configuration and the stopping/stopped state.
private RegionServerServices rss;
// Member-side RPC channel: created in initialize(), started in start(), closed in stop().
private ProcedureMemberRpcs memberRpcs;
// Procedure member created in initialize(); registered via memberRpcs.start() and closed in stop().
private ProcedureMember member;
50
51 @Override
52 public void initialize(RegionServerServices rss) throws IOException {
53 this.rss = rss;
54 ZooKeeperWatcher zkw = rss.getZooKeeper();
55 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
56
57 ThreadPoolExecutor pool =
58 ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
59 this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
60 LOG.info("Initialized: " + rss.getServerName().toString());
61 }
62
63 @Override
64 public void start() {
65 this.memberRpcs.start(rss.getServerName().toString(), member);
66 LOG.info("Started.");
67 }
68
69 @Override
70 public void stop(boolean force) throws IOException {
71 LOG.info("stop: " + force);
72 try {
73 this.member.close();
74 } finally {
75 this.memberRpcs.close();
76 }
77 }
78
79 @Override
80 public String getProcedureSignature() {
81 return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
82 }
83
84
85
86
87
88 public Subprocedure buildSubprocedure(String name) {
89
90
91 if (rss.isStopping() || rss.isStopped()) {
92 throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
93 + ", because stopping/stopped!");
94 }
95
96 LOG.info("Attempting to run a procedure.");
97 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
98 Configuration conf = rss.getConfiguration();
99
100 SimpleSubprocedurePool taskManager =
101 new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
102 return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
103 }
104
105
106
107
108 public class SimleSubprocedureBuilder implements SubprocedureFactory {
109
110 @Override
111 public Subprocedure buildSubprocedure(String name, byte[] data) {
112 LOG.info("Building procedure: " + name);
113 return SimpleRSProcedureManager.this.buildSubprocedure(name);
114 }
115 }
116
117 public class SimpleSubprocedurePool implements Closeable, Abortable {
118
119 private final ExecutorCompletionService<Void> taskPool;
120 private final ThreadPoolExecutor executor;
121 private volatile boolean aborted;
122 private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
123 private final String name;
124
125 public SimpleSubprocedurePool(String name, Configuration conf) {
126 this.name = name;
127 executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
128 new LinkedBlockingQueue<Runnable>(),
129 new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
130 taskPool = new ExecutorCompletionService<Void>(executor);
131 }
132
133
134
135
136 public void submitTask(final Callable<Void> task) {
137 Future<Void> f = this.taskPool.submit(task);
138 futures.add(f);
139 }
140
141
142
143
144
145
146
147 public boolean waitForOutstandingTasks() throws ForeignException {
148 LOG.debug("Waiting for procedure to finish.");
149
150 try {
151 for (Future<Void> f: futures) {
152 f.get();
153 }
154 return true;
155 } catch (InterruptedException e) {
156 if (aborted) throw new ForeignException(
157 "Interrupted and found to be aborted while waiting for tasks!", e);
158 Thread.currentThread().interrupt();
159 } catch (ExecutionException e) {
160 if (e.getCause() instanceof ForeignException) {
161 throw (ForeignException) e.getCause();
162 }
163 throw new ForeignException(name, e.getCause());
164 } finally {
165
166 for (Future<Void> f: futures) {
167 if (!f.isDone()) {
168 f.cancel(true);
169 }
170 }
171 }
172 return false;
173 }
174
175
176
177
178
179 @Override
180 public void close() {
181 executor.shutdown();
182 }
183
184 @Override
185 public void abort(String why, Throwable e) {
186 if (this.aborted) return;
187
188 this.aborted = true;
189 LOG.warn("Aborting because: " + why, e);
190 this.executor.shutdownNow();
191 }
192
193 @Override
194 public boolean isAborted() {
195 return this.aborted;
196 }
197 }
198
199 public class SimpleSubprocedure extends Subprocedure {
200 private final RegionServerServices rss;
201 private final SimpleSubprocedurePool taskManager;
202
203 public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
204 ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
205 super(member, name, errorListener, 500, 60000);
206 LOG.info("Constructing a SimpleSubprocedure.");
207 this.rss = rss;
208 this.taskManager = taskManager;
209 }
210
211
212
213
214
215
216 class RSSimpleTask implements Callable<Void> {
217 RSSimpleTask() {}
218
219 @Override
220 public Void call() throws Exception {
221 LOG.info("Execute subprocedure on " + rss.getServerName().toString());
222 return null;
223 }
224
225 }
226
227 private void execute() throws ForeignException {
228
229 monitor.rethrowException();
230
231
232 taskManager.submitTask(new RSSimpleTask());
233 monitor.rethrowException();
234
235
236 taskManager.waitForOutstandingTasks();
237 monitor.rethrowException();
238
239 }
240
241 @Override
242 public void acquireBarrier() throws ForeignException {
243
244 }
245
246
247
248
249 @Override
250 public byte[] insideBarrier() throws ForeignException {
251 execute();
252 return SimpleMasterProcedureManager.SIMPLE_DATA.getBytes();
253 }
254
255
256
257
258 @Override
259 public void cleanup(Exception e) {
260 taskManager.abort("Aborting simple subprocedure tasks due to error", e);
261 }
262 }
263
264 }