View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.executor;
20  
21  import java.io.IOException;
22  import java.io.Writer;
23  import java.lang.management.ThreadInfo;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.concurrent.LinkedBlockingQueue;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
39  import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
40  
41  import com.google.common.annotations.VisibleForTesting;
42  import com.google.common.collect.Lists;
43  import com.google.common.collect.Maps;
44  import com.google.common.util.concurrent.ThreadFactoryBuilder;
45  
46  /**
47   * This is a generic executor service. This component abstracts a
48   * threadpool, a queue to which {@link EventType}s can be submitted,
49   * and a <code>Runnable</code> that handles the object that is added to the queue.
50   *
51   * <p>In order to create a new service, create an instance of this class and
52   * then do: <code>instance.startExecutorService("myService");</code>.  When done
53   * call {@link #shutdown()}.
54   *
55   * <p>In order to use the service created above, call
56   * {@link #submit(EventHandler)}. Register pre- and post- processing listeners
57   * by registering your implementation of {@link EventHandler.EventHandlerListener}
58   * with {@link #registerListener(EventType, EventHandler.EventHandlerListener)}.  Be sure
59   * to deregister your listener when done via {@link #unregisterListener(EventType)}.
60   */
61  @InterfaceAudience.Private
62  public class ExecutorService {
63    private static final Log LOG = LogFactory.getLog(ExecutorService.class);
64  
65    // hold the all the executors created in a map addressable by their names
66    private final ConcurrentHashMap<String, Executor> executorMap =
67      new ConcurrentHashMap<String, Executor>();
68  
69    // listeners that are called before and after an event is processed
70    private ConcurrentHashMap<EventType, EventHandlerListener> eventHandlerListeners =
71      new ConcurrentHashMap<EventType, EventHandlerListener>();
72  
73    // Name of the server hosting this executor service.
74    private final String servername;
75  
76    /**
77     * Default constructor.
78     * @param servername Name of the hosting server.
79     */
80    public ExecutorService(final String servername) {
81      super();
82      this.servername = servername;
83    }
84  
85    /**
86     * Start an executor service with a given name. If there was a service already
87     * started with the same name, this throws a RuntimeException.
88     * @param name Name of the service to start.
89     */
90    void startExecutorService(String name, int maxThreads) {
91      if (this.executorMap.get(name) != null) {
92        throw new RuntimeException("An executor service with the name " + name +
93          " is already running!");
94      }
95      Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
96      if (this.executorMap.putIfAbsent(name, hbes) != null) {
97        throw new RuntimeException("An executor service with the name " + name +
98        " is already running (2)!");
99      }
100     LOG.debug("Starting executor service name=" + name +
101       ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
102       ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
103   }
104 
105   boolean isExecutorServiceRunning(String name) {
106     return this.executorMap.containsKey(name);
107   }
108 
109   public void shutdown() {
110     for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
111       List<Runnable> wasRunning =
112         entry.getValue().threadPoolExecutor.shutdownNow();
113       if (!wasRunning.isEmpty()) {
114         LOG.info(entry.getValue() + " had " + wasRunning + " on shutdown");
115       }
116     }
117     this.executorMap.clear();
118   }
119 
120   Executor getExecutor(final ExecutorType type) {
121     return getExecutor(type.getExecutorName(this.servername));
122   }
123 
124   Executor getExecutor(String name) {
125     Executor executor = this.executorMap.get(name);
126     return executor;
127   }
128 
129   @VisibleForTesting
130   public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
131     return getExecutor(type).getThreadPoolExecutor();
132   }
133 
134   public void startExecutorService(final ExecutorType type, final int maxThreads) {
135     String name = type.getExecutorName(this.servername);
136     if (isExecutorServiceRunning(name)) {
137       LOG.debug("Executor service " + toString() + " already running on " +
138         this.servername);
139       return;
140     }
141     startExecutorService(name, maxThreads);
142   }
143 
144   public void submit(final EventHandler eh) {
145     Executor executor = getExecutor(eh.getEventType().getExecutorServiceType());
146     if (executor == null) {
147       // This happens only when events are submitted after shutdown() was
148       // called, so dropping them should be "ok" since it means we're
149       // shutting down.
150       LOG.error("Cannot submit [" + eh + "] because the executor is missing." +
151         " Is this process shutting down?");
152     } else {
153       executor.submit(eh);
154     }
155   }
156 
157   /**
158    * Subscribe to updates before and after processing instances of
159    * {@link EventType}.  Currently only one listener per
160    * event type.
161    * @param type Type of event we're registering listener for
162    * @param listener The listener to run.
163    */
164   public void registerListener(final EventType type,
165       final EventHandlerListener listener) {
166     this.eventHandlerListeners.put(type, listener);
167   }
168 
169   /**
170    * Stop receiving updates before and after processing instances of
171    * {@link EventType}
172    * @param type Type of event we're registering listener for
173    * @return The listener we removed or null if we did not remove it.
174    */
175   public EventHandlerListener unregisterListener(final EventType type) {
176     return this.eventHandlerListeners.remove(type);
177   }
178 
179   public Map<String, ExecutorStatus> getAllExecutorStatuses() {
180     Map<String, ExecutorStatus> ret = Maps.newHashMap();
181     for (Map.Entry<String, Executor> e : executorMap.entrySet()) {
182       ret.put(e.getKey(), e.getValue().getStatus());
183     }
184     return ret;
185   }
186 
187   /**
188    * Executor instance.
189    */
190   static class Executor {
191     // how long to retain excess threads
192     static final long keepAliveTimeInMillis = 1000;
193     // the thread pool executor that services the requests
194     final TrackingThreadPoolExecutor threadPoolExecutor;
195     // work queue to use - unbounded queue
196     final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
197     private final String name;
198     private final Map<EventType, EventHandlerListener> eventHandlerListeners;
199     private static final AtomicLong seqids = new AtomicLong(0);
200     private final long id;
201 
202     protected Executor(String name, int maxThreads,
203         final Map<EventType, EventHandlerListener> eventHandlerListeners) {
204       this.id = seqids.incrementAndGet();
205       this.name = name;
206       this.eventHandlerListeners = eventHandlerListeners;
207       // create the thread pool executor
208       this.threadPoolExecutor = new TrackingThreadPoolExecutor(
209           maxThreads, maxThreads,
210           keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
211       // name the threads for this threadpool
212       ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
213       tfb.setNameFormat(this.name + "-%d");
214       this.threadPoolExecutor.setThreadFactory(tfb.build());
215     }
216 
217     /**
218      * Submit the event to the queue for handling.
219      * @param event
220      */
221     void submit(final EventHandler event) {
222       // If there is a listener for this type, make sure we call the before
223       // and after process methods.
224       EventHandlerListener listener =
225         this.eventHandlerListeners.get(event.getEventType());
226       if (listener != null) {
227         event.setListener(listener);
228       }
229       this.threadPoolExecutor.execute(event);
230     }
231 
232     TrackingThreadPoolExecutor getThreadPoolExecutor() {
233       return threadPoolExecutor;
234     }
235 
236     @Override
237     public String toString() {
238       return getClass().getSimpleName() + "-" + id + "-" + name;
239     }
240 
241     public ExecutorStatus getStatus() {
242       List<EventHandler> queuedEvents = Lists.newArrayList();
243       for (Runnable r : q) {
244         if (!(r instanceof EventHandler)) {
245           LOG.warn("Non-EventHandler " + r + " queued in " + name);
246           continue;
247         }
248         queuedEvents.add((EventHandler)r);
249       }
250 
251       List<RunningEventStatus> running = Lists.newArrayList();
252       for (Map.Entry<Thread, Runnable> e :
253           threadPoolExecutor.getRunningTasks().entrySet()) {
254         Runnable r = e.getValue();
255         if (!(r instanceof EventHandler)) {
256           LOG.warn("Non-EventHandler " + r + " running in " + name);
257           continue;
258         }
259         running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
260       }
261 
262       return new ExecutorStatus(this, queuedEvents, running);
263     }
264   }
265 
266   /**
267    * A subclass of ThreadPoolExecutor that keeps track of the Runnables that
268    * are executing at any given point in time.
269    */
270   static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
271     private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
272 
273     public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
274         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
275       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
276     }
277 
278     @Override
279     protected void afterExecute(Runnable r, Throwable t) {
280       super.afterExecute(r, t);
281       running.remove(Thread.currentThread());
282     }
283 
284     @Override
285     protected void beforeExecute(Thread t, Runnable r) {
286       Runnable oldPut = running.put(t, r);
287       assert oldPut == null : "inconsistency for thread " + t;
288       super.beforeExecute(t, r);
289     }
290 
291     /**
292      * @return a map of the threads currently running tasks
293      * inside this executor. Each key is an active thread,
294      * and the value is the task that is currently running.
295      * Note that this is not a stable snapshot of the map.
296      */
297     public ConcurrentMap<Thread, Runnable> getRunningTasks() {
298       return running;
299     }
300   }
301 
302   /**
303    * A snapshot of the status of a particular executor. This includes
304    * the contents of the executor's pending queue, as well as the
305    * threads and events currently being processed.
306    *
307    * This is a consistent snapshot that is immutable once constructed.
308    */
309   public static class ExecutorStatus {
310     final Executor executor;
311     final List<EventHandler> queuedEvents;
312     final List<RunningEventStatus> running;
313 
314     ExecutorStatus(Executor executor,
315         List<EventHandler> queuedEvents,
316         List<RunningEventStatus> running) {
317       this.executor = executor;
318       this.queuedEvents = queuedEvents;
319       this.running = running;
320     }
321 
322     /**
323      * Dump a textual representation of the executor's status
324      * to the given writer.
325      *
326      * @param out the stream to write to
327      * @param indent a string prefix for each line, used for indentation
328      */
329     public void dumpTo(Writer out, String indent) throws IOException {
330       out.write(indent + "Status for executor: " + executor + "\n");
331       out.write(indent + "=======================================\n");
332       out.write(indent + queuedEvents.size() + " events queued, " +
333           running.size() + " running\n");
334       if (!queuedEvents.isEmpty()) {
335         out.write(indent + "Queued:\n");
336         for (EventHandler e : queuedEvents) {
337           out.write(indent + "  " + e + "\n");
338         }
339         out.write("\n");
340       }
341       if (!running.isEmpty()) {
342         out.write(indent + "Running:\n");
343         for (RunningEventStatus stat : running) {
344           out.write(indent + "  Running on thread '" +
345               stat.threadInfo.getThreadName() +
346               "': " + stat.event + "\n");
347           out.write(ThreadMonitoring.formatThreadInfo(
348               stat.threadInfo, indent + "  "));
349           out.write("\n");
350         }
351       }
352       out.flush();
353     }
354   }
355 
356   /**
357    * The status of a particular event that is in the middle of being
358    * handled by an executor.
359    */
360   public static class RunningEventStatus {
361     final ThreadInfo threadInfo;
362     final EventHandler event;
363 
364     public RunningEventStatus(Thread t, EventHandler event) {
365       this.threadInfo = ThreadMonitoring.getThreadInfo(t);
366       this.event = event;
367     }
368   }
369 }