1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.executor;
20
21 import java.io.IOException;
22 import java.io.Writer;
23 import java.lang.management.ThreadInfo;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
39 import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
40
41 import com.google.common.annotations.VisibleForTesting;
42 import com.google.common.collect.Lists;
43 import com.google.common.collect.Maps;
44 import com.google.common.util.concurrent.ThreadFactoryBuilder;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 @InterfaceAudience.Private
62 public class ExecutorService {
63 private static final Log LOG = LogFactory.getLog(ExecutorService.class);
64
65
66 private final ConcurrentHashMap<String, Executor> executorMap =
67 new ConcurrentHashMap<String, Executor>();
68
69
70 private ConcurrentHashMap<EventType, EventHandlerListener> eventHandlerListeners =
71 new ConcurrentHashMap<EventType, EventHandlerListener>();
72
73
74 private final String servername;
75
76
77
78
79
80 public ExecutorService(final String servername) {
81 super();
82 this.servername = servername;
83 }
84
85
86
87
88
89
90 void startExecutorService(String name, int maxThreads) {
91 if (this.executorMap.get(name) != null) {
92 throw new RuntimeException("An executor service with the name " + name +
93 " is already running!");
94 }
95 Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
96 if (this.executorMap.putIfAbsent(name, hbes) != null) {
97 throw new RuntimeException("An executor service with the name " + name +
98 " is already running (2)!");
99 }
100 LOG.debug("Starting executor service name=" + name +
101 ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
102 ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
103 }
104
105 boolean isExecutorServiceRunning(String name) {
106 return this.executorMap.containsKey(name);
107 }
108
109 public void shutdown() {
110 for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
111 List<Runnable> wasRunning =
112 entry.getValue().threadPoolExecutor.shutdownNow();
113 if (!wasRunning.isEmpty()) {
114 LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
115 }
116 }
117 this.executorMap.clear();
118 }
119
120 Executor getExecutor(final ExecutorType type) {
121 return getExecutor(type.getExecutorName(this.servername));
122 }
123
124 Executor getExecutor(String name) {
125 Executor executor = this.executorMap.get(name);
126 return executor;
127 }
128
129 @VisibleForTesting
130 public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
131 return getExecutor(type).getThreadPoolExecutor();
132 }
133
134 public void startExecutorService(final ExecutorType type, final int maxThreads) {
135 String name = type.getExecutorName(this.servername);
136 if (isExecutorServiceRunning(name)) {
137 LOG.debug("Executor service " + toString() + " already running on " +
138 this.servername);
139 return;
140 }
141 startExecutorService(name, maxThreads);
142 }
143
144 public void submit(final EventHandler eh) {
145 Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
146 if (executor == null) {
147
148
149
150 LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
151 " Is this process shutting down?");
152 } else {
153 executor.submit(eh);
154 }
155 }
156
157
158
159
160
161
162
163
164 public void registerListener(final EventType type,
165 final EventHandlerListener listener) {
166 this.eventHandlerListeners.put(type, listener);
167 }
168
169
170
171
172
173
174
175 public EventHandlerListener unregisterListener(final EventType type) {
176 return this.eventHandlerListeners.remove(type);
177 }
178
179 public Map<String, ExecutorStatus> getAllExecutorStatuses() {
180 Map<String, ExecutorStatus> ret = Maps.newHashMap();
181 for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
182 ret.put(e.getKey(), e.getValue().getStatus());
183 }
184 return ret;
185 }
186
187
188
189
190 static class Executor {
191
192 static final long keepAliveTimeInMillis = 1000;
193
194 final TrackingThreadPoolExecutor threadPoolExecutor;
195
196 final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
197 private final String name;
198 private final Map<EventType, EventHandlerListener> eventHandlerListeners;
199 private static final AtomicLong seqids = new AtomicLong(0);
200 private final long id;
201
202 protected Executor(String name, int maxThreads,
203 final Map<EventType, EventHandlerListener> eventHandlerListeners) {
204 this.id = seqids.incrementAndGet();
205 this.name = name;
206 this.eventHandlerListeners = eventHandlerListeners;
207
208 this.threadPoolExecutor = new TrackingThreadPoolExecutor(
209 maxThreads, maxThreads,
210 keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
211
212 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
213 tfb.setNameFormat(this.name + "-%d");
214 this.threadPoolExecutor.setThreadFactory(tfb.build());
215 }
216
217
218
219
220
221 void submit(final EventHandler event) {
222
223
224 EventHandlerListener listener =
225 this.eventHandlerListeners.get(event.getEventType());
226 if (listener != null) {
227 event.setListener(listener);
228 }
229 this.threadPoolExecutor.execute(event);
230 }
231
232 TrackingThreadPoolExecutor getThreadPoolExecutor() {
233 return threadPoolExecutor;
234 }
235
236 @Override
237 public String toString() {
238 return getClass().getSimpleName() + "-" + id + "-" + name;
239 }
240
241 public ExecutorStatus getStatus() {
242 List<EventHandler> queuedEvents = Lists.newArrayList();
243 for (Runnable r : q) {
244 if (!(r instanceof EventHandler)) {
245 LOG.warn("Non-EventHandler " + r + " queued in " + name);
246 continue;
247 }
248 queuedEvents.add((EventHandler)r);
249 }
250
251 List<RunningEventStatus> running = Lists.newArrayList();
252 for (Map.Entry<Thread, Runnable> e :
253 threadPoolExecutor.getRunningTasks().entrySet()) {
254 Runnable r = e.getValue();
255 if (!(r instanceof EventHandler)) {
256 LOG.warn("Non-EventHandler " + r + " running in " + name);
257 continue;
258 }
259 running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
260 }
261
262 return new ExecutorStatus(this, queuedEvents, running);
263 }
264 }
265
266
267
268
269
270 static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
271 private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
272
273 public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
274 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
275 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
276 }
277
278 @Override
279 protected void afterExecute(Runnable r, Throwable t) {
280 super.afterExecute(r, t);
281 running.remove(Thread.currentThread());
282 }
283
284 @Override
285 protected void beforeExecute(Thread t, Runnable r) {
286 Runnable oldPut = running.put(t, r);
287 assert oldPut == null : "inconsistency for thread " + t;
288 super.beforeExecute(t, r);
289 }
290
291
292
293
294
295
296
297 public ConcurrentMap<Thread, Runnable> getRunningTasks() {
298 return running;
299 }
300 }
301
302
303
304
305
306
307
308
309 public static class ExecutorStatus {
310 final Executor executor;
311 final List<EventHandler> queuedEvents;
312 final List<RunningEventStatus> running;
313
314 ExecutorStatus(Executor executor,
315 List<EventHandler> queuedEvents,
316 List<RunningEventStatus> running) {
317 this.executor = executor;
318 this.queuedEvents = queuedEvents;
319 this.running = running;
320 }
321
322
323
324
325
326
327
328
329 public void dumpTo(Writer out, String indent) throws IOException {
330 out.write(indent + "Status for executor: " + executor + "\n");
331 out.write(indent + "=======================================\n");
332 out.write(indent + queuedEvents.size() + " events queued, " +
333 running.size() + " running\n");
334 if (!queuedEvents.isEmpty()) {
335 out.write(indent + "Queued:\n");
336 for (EventHandler e : queuedEvents) {
337 out.write(indent + " " + e + "\n");
338 }
339 out.write("\n");
340 }
341 if (!running.isEmpty()) {
342 out.write(indent + "Running:\n");
343 for (RunningEventStatus stat : running) {
344 out.write(indent + " Running on thread '" +
345 stat.threadInfo.getThreadName() +
346 "': " + stat.event + "\n");
347 out.write(ThreadMonitoring.formatThreadInfo(
348 stat.threadInfo, indent + " "));
349 out.write("\n");
350 }
351 }
352 out.flush();
353 }
354 }
355
356
357
358
359
360 public static class RunningEventStatus {
361 final ThreadInfo threadInfo;
362 final EventHandler event;
363
364 public RunningEventStatus(Thread t, EventHandler event) {
365 this.threadInfo = ThreadMonitoring.getThreadInfo(t);
366 this.event = event;
367 }
368 }
369 }