1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.protobuf.BlockingRpcChannel;
23 import com.google.protobuf.Descriptors;
24 import com.google.protobuf.Message;
25 import com.google.protobuf.RpcController;
26 import com.google.protobuf.ServiceException;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.CellScanner;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.ServerName;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.codec.Codec;
35 import org.apache.hadoop.hbase.codec.KeyValueCodec;
36 import org.apache.hadoop.hbase.security.User;
37 import org.apache.hadoop.hbase.security.UserProvider;
38 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39 import org.apache.hadoop.hbase.util.Pair;
40 import org.apache.hadoop.hbase.util.PoolMap;
41 import org.apache.hadoop.io.compress.CompressionCodec;
42
43 import java.io.IOException;
44 import java.net.InetSocketAddress;
45 import java.net.SocketAddress;
46
47
48
49
50 @InterfaceAudience.Private
51 public abstract class AbstractRpcClient implements RpcClient {
52 public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
53
54 protected final Configuration conf;
55 protected String clusterId;
56 protected final SocketAddress localAddr;
57
58 protected UserProvider userProvider;
59 protected final IPCUtil ipcUtil;
60
61 protected final int minIdleTimeBeforeClose;
62
63 protected final int maxRetries;
64 protected final long failureSleep;
65 protected final boolean tcpNoDelay;
66 protected final boolean tcpKeepAlive;
67 protected final Codec codec;
68 protected final CompressionCodec compressor;
69 protected final boolean fallbackAllowed;
70
71 protected final int connectTO;
72 protected final int readTO;
73 protected final int writeTO;
74
75
76
77
78
79
80
81
82 public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
83 this.userProvider = UserProvider.instantiate(conf);
84 this.localAddr = localAddr;
85 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
86 this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
87 this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
88 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
89 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
90 this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
91 this.ipcUtil = new IPCUtil(conf);
92
93 this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000);
94 this.conf = conf;
95 this.codec = getCodec();
96 this.compressor = getCompressor(conf);
97 this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
98 IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
99 this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
100 this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
101 this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
102
103
104 if (LOG.isDebugEnabled()) {
105 LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
106 ", tcpKeepAlive=" + this.tcpKeepAlive +
107 ", tcpNoDelay=" + this.tcpNoDelay +
108 ", connectTO=" + this.connectTO +
109 ", readTO=" + this.readTO +
110 ", writeTO=" + this.writeTO +
111 ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
112 ", maxRetries=" + this.maxRetries +
113 ", fallbackAllowed=" + this.fallbackAllowed +
114 ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
115 }
116 }
117
118 @VisibleForTesting
119 public static String getDefaultCodec(final Configuration c) {
120
121
122
123 return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
124 }
125
126
127
128
129
130 Codec getCodec() {
131
132
133 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
134 if (className == null || className.length() == 0) return null;
135 try {
136 return (Codec)Class.forName(className).newInstance();
137 } catch (Exception e) {
138 throw new RuntimeException("Failed getting codec " + className, e);
139 }
140 }
141
142 @Override
143 public boolean hasCellBlockSupport() {
144 return this.codec != null;
145 }
146
147
148
149
150
151
152 private static CompressionCodec getCompressor(final Configuration conf) {
153 String className = conf.get("hbase.client.rpc.compressor", null);
154 if (className == null || className.isEmpty()) return null;
155 try {
156 return (CompressionCodec)Class.forName(className).newInstance();
157 } catch (Exception e) {
158 throw new RuntimeException("Failed getting compressor " + className, e);
159 }
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178 protected static PoolMap.PoolType getPoolType(Configuration config) {
179 return PoolMap.PoolType
180 .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
181 PoolMap.PoolType.ThreadLocal);
182 }
183
184
185
186
187
188
189
190
191 protected static int getPoolSize(Configuration config) {
192 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
193 }
194
195
196
197
198
199
200
201
202
203
204
205 Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
206 Message param, Message returnType, final User ticket, final InetSocketAddress isa)
207 throws ServiceException {
208 if (pcrc == null) {
209 pcrc = new PayloadCarryingRpcController();
210 }
211
212 long startTime = 0;
213 if (LOG.isTraceEnabled()) {
214 startTime = EnvironmentEdgeManager.currentTime();
215 }
216 Pair<Message, CellScanner> val;
217 try {
218 val = call(pcrc, md, param, returnType, ticket, isa);
219
220 pcrc.setCellScanner(val.getSecond());
221
222 if (LOG.isTraceEnabled()) {
223 long callTime = EnvironmentEdgeManager.currentTime() - startTime;
224 LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
225 }
226 return val.getFirst();
227 } catch (Throwable e) {
228 throw new ServiceException(e);
229 }
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
248 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
249 InetSocketAddress isa) throws IOException, InterruptedException;
250
251 @Override
252 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
253 int defaultOperationTimeout) {
254 return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
255 }
256
257
258
259
260 @VisibleForTesting
261 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
262 private final InetSocketAddress isa;
263 private final AbstractRpcClient rpcClient;
264 private final User ticket;
265 private final int channelOperationTimeout;
266
267
268
269
270 protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
271 final ServerName sn, final User ticket, int channelOperationTimeout) {
272 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
273 this.rpcClient = rpcClient;
274 this.ticket = ticket;
275 this.channelOperationTimeout = channelOperationTimeout;
276 }
277
278 @Override
279 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
280 Message param, Message returnType) throws ServiceException {
281 PayloadCarryingRpcController pcrc;
282 if (controller != null && controller instanceof PayloadCarryingRpcController) {
283 pcrc = (PayloadCarryingRpcController) controller;
284 if (!pcrc.hasCallTimeout()) {
285 pcrc.setCallTimeout(channelOperationTimeout);
286 }
287 } else {
288 pcrc = new PayloadCarryingRpcController();
289 pcrc.setCallTimeout(channelOperationTimeout);
290 }
291
292 return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
293 }
294 }
295 }