1 package org.apache.hadoop.hbase.ipc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import java.net.InetSocketAddress;
20 import java.nio.channels.ClosedChannelException;
21
22 import org.apache.hadoop.hbase.CallDroppedException;
23 import org.apache.hadoop.hbase.CellScanner;
24 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
28 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
29 import org.apache.hadoop.hbase.util.Pair;
30 import org.apache.hadoop.security.UserGroupInformation;
31 import org.apache.hadoop.util.StringUtils;
32 import org.apache.htrace.Trace;
33 import org.apache.htrace.TraceScope;
34
35 import com.google.protobuf.Message;
36
37
38
39
40
41
42 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
43 @InterfaceStability.Evolving
44 public class CallRunner {
45 private static final CallDroppedException CALL_DROPPED_EXCEPTION
46 = new CallDroppedException();
47 private Call call;
48 private RpcServerInterface rpcServer;
49 private MonitoredRPCHandler status;
50
51
52
53
54
55
56
57 CallRunner(final RpcServerInterface rpcServer, final Call call) {
58 this.call = call;
59 this.rpcServer = rpcServer;
60
61 if (call != null && rpcServer != null) {
62 this.rpcServer.addCallSize(call.getSize());
63 }
64 }
65
66 public Call getCall() {
67 return call;
68 }
69
70 public void setStatus(MonitoredRPCHandler status) {
71 this.status = status;
72 }
73
74
75
76
77 private void cleanup() {
78 this.call = null;
79 this.rpcServer = null;
80 }
81
82 public void run() {
83 try {
84 if (!call.connection.channel.isOpen()) {
85 if (RpcServer.LOG.isDebugEnabled()) {
86 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
87 }
88 return;
89 }
90 this.status.setStatus("Setting up call");
91 this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
92 if (RpcServer.LOG.isTraceEnabled()) {
93 UserGroupInformation remoteUser = call.connection.user;
94 RpcServer.LOG.trace(call.toShortString() + " executing as " +
95 ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
96 }
97 Throwable errorThrowable = null;
98 String error = null;
99 Pair<Message, CellScanner> resultPair = null;
100 RpcServer.CurCall.set(call);
101 TraceScope traceScope = null;
102 try {
103 if (!this.rpcServer.isStarted()) {
104 InetSocketAddress address = rpcServer.getListenerAddress();
105 throw new ServerNotRunningYetException("Server " +
106 (address != null ? address : "(channel closed)") + " is not running yet");
107 }
108 if (call.tinfo != null) {
109 traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
110 }
111
112 resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
113 call.timestamp, this.status);
114 } catch (Throwable e) {
115 RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
116 errorThrowable = e;
117 error = StringUtils.stringifyException(e);
118 if (e instanceof Error) {
119 throw (Error)e;
120 }
121 } finally {
122 if (traceScope != null) {
123 traceScope.close();
124 }
125 RpcServer.CurCall.set(null);
126 }
127
128
129 if (!call.isDelayed() || !call.isReturnValueDelayed()) {
130 Message param = resultPair != null ? resultPair.getFirst() : null;
131 CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
132 call.setResponse(param, cells, errorThrowable, error);
133 }
134 call.sendResponseIfReady();
135 this.status.markComplete("Sent response");
136 this.status.pause("Waiting for a call");
137 } catch (OutOfMemoryError e) {
138 if (this.rpcServer.getErrorHandler() != null) {
139 if (this.rpcServer.getErrorHandler().checkOOME(e)) {
140 RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
141 return;
142 }
143 } else {
144
145 throw e;
146 }
147 } catch (ClosedChannelException cce) {
148 InetSocketAddress address = rpcServer.getListenerAddress();
149 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
150 "this means that the server " + (address != null ? address : "(channel closed)") +
151 " was processing a request but the client went away. The error message was: " +
152 cce.getMessage());
153 } catch (Exception e) {
154 RpcServer.LOG.warn(Thread.currentThread().getName()
155 + ": caught: " + StringUtils.stringifyException(e));
156 } finally {
157
158 this.rpcServer.addCallSize(call.getSize() * -1);
159 cleanup();
160 }
161 }
162
163
164
165
166 public void drop() {
167 try {
168 if (!call.connection.channel.isOpen()) {
169 if (RpcServer.LOG.isDebugEnabled()) {
170 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
171 }
172 return;
173 }
174
175
176 InetSocketAddress address = rpcServer.getListenerAddress();
177 call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
178 + (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
179 call.sendResponseIfReady();
180 } catch (ClosedChannelException cce) {
181 InetSocketAddress address = rpcServer.getListenerAddress();
182 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
183 "this means that the server " + (address != null ? address : "(channel closed)") +
184 " was processing a request but the client went away. The error message was: " +
185 cce.getMessage());
186 } catch (Exception e) {
187 RpcServer.LOG.warn(Thread.currentThread().getName()
188 + ": caught: " + StringUtils.stringifyException(e));
189 } finally {
190
191 this.rpcServer.addCallSize(call.getSize() * -1);
192 cleanup();
193 }
194 }
195 }