1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ThreadLocalRandom;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Abortable;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
35 import org.apache.hadoop.util.StringUtils;
36
37 import com.google.common.base.Preconditions;
38 import com.google.common.base.Strings;
39
40
41
42
43
44 @InterfaceAudience.Private
45 public abstract class RpcExecutor {
46 private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
47
48 protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
49 protected volatile int currentQueueLimit;
50
51 private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
52 private final List<Handler> handlers;
53 private final int handlerCount;
54 private final String name;
55 private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
56
57 private boolean running;
58
59 private Configuration conf = null;
60 private Abortable abortable = null;
61
62 public RpcExecutor(final String name, final int handlerCount) {
63 this.handlers = new ArrayList<Handler>(handlerCount);
64 this.handlerCount = handlerCount;
65 this.name = Strings.nullToEmpty(name);
66 }
67
68 public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
69 final Abortable abortable) {
70 this(name, handlerCount);
71 this.conf = conf;
72 this.abortable = abortable;
73 }
74
75 public void start(final int port) {
76 running = true;
77 startHandlers(port);
78 }
79
80 public void stop() {
81 running = false;
82 for (Thread handler : handlers) {
83 handler.interrupt();
84 }
85 }
86
87 public int getActiveHandlerCount() {
88 return activeHandlerCount.get();
89 }
90
91
92 public abstract int getQueueLength();
93
94
95 public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
96
97
98 protected abstract List<BlockingQueue<CallRunner>> getQueues();
99
100 protected void startHandlers(final int port) {
101 List<BlockingQueue<CallRunner>> callQueues = getQueues();
102 startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
103 }
104
105
106
107
108 protected Handler getHandler(final String name, final double handlerFailureThreshhold,
109 final BlockingQueue<CallRunner> q) {
110 return new Handler(name, handlerFailureThreshhold, q);
111 }
112
113
114
115
116 protected void startHandlers(final String nameSuffix, final int numHandlers,
117 final List<BlockingQueue<CallRunner>> callQueues,
118 final int qindex, final int qsize, final int port) {
119 final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
120 double handlerFailureThreshhold =
121 conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
122 HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
123 for (int i = 0; i < numHandlers; i++) {
124 final int index = qindex + (i % qsize);
125 String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" +
126 index + ",port=" + port;
127 Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
128 handler.start();
129 LOG.debug("Started " + name);
130 handlers.add(handler);
131 }
132 }
133
134
135
136
137 protected class Handler extends Thread {
138
139
140
141 final BlockingQueue<CallRunner> q;
142
143 final double handlerFailureThreshhold;
144
145 Handler(final String name, final double handlerFailureThreshhold,
146 final BlockingQueue<CallRunner> q) {
147 super(name);
148 setDaemon(true);
149 this.q = q;
150 this.handlerFailureThreshhold = handlerFailureThreshhold;
151 }
152
153
154
155
156
157 protected CallRunner getCallRunner() throws InterruptedException {
158 return this.q.take();
159 }
160
161 @Override
162 public void run() {
163 boolean interrupted = false;
164 try {
165 while (running) {
166 try {
167 run(getCallRunner());
168 } catch (InterruptedException e) {
169 interrupted = true;
170 }
171 }
172 } catch (Exception e) {
173 LOG.warn(e);
174 throw e;
175 } finally {
176 if (interrupted) {
177 Thread.currentThread().interrupt();
178 }
179 }
180 }
181
182 private void run(CallRunner cr) {
183 MonitoredRPCHandler status = RpcServer.getStatus();
184 cr.setStatus(status);
185 try {
186 activeHandlerCount.incrementAndGet();
187 cr.run();
188 } catch (Throwable e) {
189 if (e instanceof Error) {
190 int failedCount = failedHandlerCount.incrementAndGet();
191 if (this.handlerFailureThreshhold >= 0 &&
192 failedCount > handlerCount * this.handlerFailureThreshhold) {
193 String message = "Number of failed RpcServer handler runs exceeded threshhold " +
194 this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
195 if (abortable != null) {
196 abortable.abort(message, e);
197 } else {
198 LOG.error("Error but can't abort because abortable is null: " +
199 StringUtils.stringifyException(e));
200 throw e;
201 }
202 } else {
203 LOG.warn("Handler errors " + StringUtils.stringifyException(e));
204 }
205 } else {
206 LOG.warn("Handler exception " + StringUtils.stringifyException(e));
207 }
208 } finally {
209 activeHandlerCount.decrementAndGet();
210 }
211 }
212 }
213
214 public static abstract class QueueBalancer {
215
216
217
218 public abstract int getNextQueue();
219 }
220
221 public static QueueBalancer getBalancer(int queueSize) {
222 Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
223 if (queueSize == 1) {
224 return ONE_QUEUE;
225 } else {
226 return new RandomQueueBalancer(queueSize);
227 }
228 }
229
230
231
232
233 private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
234 @Override
235 public int getNextQueue() {
236 return 0;
237 }
238 };
239
240
241
242
243 private static class RandomQueueBalancer extends QueueBalancer {
244 private final int queueSize;
245
246 public RandomQueueBalancer(int queueSize) {
247 this.queueSize = queueSize;
248 }
249
250 public int getNextQueue() {
251 return ThreadLocalRandom.current().nextInt(queueSize);
252 }
253 }
254
255
256
257
258
259 public void resizeQueues(Configuration conf) {
260 currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit);
261 }
262 }