1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import io.netty.bootstrap.Bootstrap;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.epoll.EpollEventLoopGroup;
26 import io.netty.channel.epoll.EpollSocketChannel;
27 import io.netty.channel.nio.NioEventLoopGroup;
28 import io.netty.channel.socket.SocketChannel;
29 import io.netty.channel.socket.nio.NioSocketChannel;
30 import io.netty.util.HashedWheelTimer;
31 import io.netty.util.Timeout;
32 import io.netty.util.TimerTask;
33 import io.netty.util.concurrent.Future;
34 import io.netty.util.concurrent.GenericFutureListener;
35 import io.netty.util.concurrent.Promise;
36
37 import java.io.IOException;
38 import java.net.InetSocketAddress;
39 import java.net.SocketAddress;
40 import java.nio.ByteBuffer;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.TimeoutException;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.hbase.CellScanner;
48 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.ServerName;
51 import org.apache.hadoop.hbase.classification.InterfaceAudience;
52 import org.apache.hadoop.hbase.security.User;
53 import org.apache.hadoop.hbase.util.JVM;
54 import org.apache.hadoop.hbase.util.Pair;
55 import org.apache.hadoop.hbase.util.PoolMap;
56 import org.apache.hadoop.hbase.util.Threads;
57
58 import com.google.common.annotations.VisibleForTesting;
59 import com.google.protobuf.Descriptors;
60 import com.google.protobuf.Message;
61 import com.google.protobuf.RpcCallback;
62 import com.google.protobuf.RpcChannel;
63 import com.google.protobuf.RpcController;
64
65
66
67
68 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
69 public class AsyncRpcClient extends AbstractRpcClient {
70
71 public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
72 public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
73 public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
74
75 private static final HashedWheelTimer WHEEL_TIMER =
76 new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
77 100, TimeUnit.MILLISECONDS);
78
79 private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
80 new ChannelInitializer<SocketChannel>() {
81 @Override
82 protected void initChannel(SocketChannel ch) throws Exception {
83
84 }
85 };
86
87 protected final AtomicInteger callIdCnt = new AtomicInteger();
88
89 private final PoolMap<Integer, AsyncRpcChannel> connections;
90
91 final FailedServers failedServers;
92
93 @VisibleForTesting
94 final Bootstrap bootstrap;
95
96 private final boolean useGlobalEventLoopGroup;
97
98 @VisibleForTesting
99 static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
100
101 private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
102 getGlobalEventLoopGroup(Configuration conf) {
103 if (GLOBAL_EVENT_LOOP_GROUP == null) {
104 GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
105 if (LOG.isDebugEnabled()) {
106 LOG.debug("Create global event loop group "
107 + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
108 }
109 }
110 return GLOBAL_EVENT_LOOP_GROUP;
111 }
112
113 private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
114 Configuration conf) {
115
116 int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
117
118
119
120 boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
121
122
123 if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
126 }
127 return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
128 Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
129 } else {
130 if (LOG.isDebugEnabled()) {
131 LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
132 }
133 return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
134 Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
135 }
136 }
137
138
139
140
141
142
143
144
145
146 @VisibleForTesting
147 AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
148 ChannelInitializer<SocketChannel> channelInitializer) {
149 super(configuration, clusterId, localAddress);
150
151 if (LOG.isDebugEnabled()) {
152 LOG.debug("Starting async Hbase RPC client");
153 }
154
155 Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
156 this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
157 if (useGlobalEventLoopGroup) {
158 eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
159 } else {
160 eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
161 }
162 if (LOG.isDebugEnabled()) {
163 LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
164 + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
165 }
166
167 this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
168 this.failedServers = new FailedServers(configuration);
169
170 int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
171 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
172
173
174 this.bootstrap = new Bootstrap();
175 bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
176 .channel(eventLoopGroupAndChannelClass.getSecond())
177 .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
178 .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
179 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
180 if (channelInitializer == null) {
181 channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
182 }
183 bootstrap.handler(channelInitializer);
184 if (localAddress != null) {
185 bootstrap.localAddress(localAddress);
186 }
187 }
188
189
190
191
192
193
194
195
196 public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress) {
197 this(configuration, clusterId, localAddress, null);
198 }
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214 @Override
215 protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
216 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
217 InetSocketAddress addr) throws IOException, InterruptedException {
218 if (pcrc == null) {
219 pcrc = new PayloadCarryingRpcController();
220 }
221 final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
222
223 Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
224 long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
225 try {
226 Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
227 return new Pair<>(response, pcrc.cellScanner());
228 } catch (ExecutionException e) {
229 if (e.getCause() instanceof IOException) {
230 throw (IOException) e.getCause();
231 } else {
232 throw new IOException(e.getCause());
233 }
234 } catch (TimeoutException e) {
235 throw new CallTimeoutException(promise.toString());
236 }
237 }
238
239
240
241
242 private void callMethod(Descriptors.MethodDescriptor md, final PayloadCarryingRpcController pcrc,
243 Message param, Message returnType, User ticket, InetSocketAddress addr,
244 final RpcCallback<Message> done) {
245 final AsyncRpcChannel connection;
246 try {
247 connection = createRpcChannel(md.getService().getName(), addr, ticket);
248
249 connection.callMethod(md, pcrc, param, returnType).addListener(
250 new GenericFutureListener<Future<Message>>() {
251 @Override
252 public void operationComplete(Future<Message> future) throws Exception {
253 if(!future.isSuccess()){
254 Throwable cause = future.cause();
255 if (cause instanceof IOException) {
256 pcrc.setFailed((IOException) cause);
257 }else{
258 pcrc.setFailed(new IOException(cause));
259 }
260 }else{
261 try {
262 done.run(future.get());
263 }catch (ExecutionException e){
264 Throwable cause = e.getCause();
265 if (cause instanceof IOException) {
266 pcrc.setFailed((IOException) cause);
267 }else{
268 pcrc.setFailed(new IOException(cause));
269 }
270 }catch (InterruptedException e){
271 pcrc.setFailed(new IOException(e));
272 }
273 }
274 }
275 });
276 } catch (StoppedRpcClientException|FailedServerException e) {
277 pcrc.setFailed(e);
278 }
279 }
280
281 private boolean closed = false;
282
283
284
285
286 public void close() {
287 if (LOG.isDebugEnabled()) {
288 LOG.debug("Stopping async HBase RPC client");
289 }
290
291 synchronized (connections) {
292 if (closed) {
293 return;
294 }
295 closed = true;
296 for (AsyncRpcChannel conn : connections.values()) {
297 conn.close(null);
298 }
299 }
300
301 if (!useGlobalEventLoopGroup) {
302 bootstrap.group().shutdownGracefully();
303 }
304 }
305
306
307
308
309
310
311
312
313 public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
314 return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
315 }
316
317
318
319
320
321
322
323
324 public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
325 return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
326 }
327
328
329
330
331
332
333
334
335
336
337
338 private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
339 User ticket) throws StoppedRpcClientException, FailedServerException {
340
341 if (this.failedServers.isFailedServer(location)) {
342 if (LOG.isDebugEnabled()) {
343 LOG.debug("Not trying to connect to " + location +
344 " this server is in the failed servers list");
345 }
346 throw new FailedServerException(
347 "This server is in the failed servers list: " + location);
348 }
349
350 int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
351
352 AsyncRpcChannel rpcChannel;
353 synchronized (connections) {
354 if (closed) {
355 throw new StoppedRpcClientException();
356 }
357 rpcChannel = connections.get(hashCode);
358 if (rpcChannel == null || !rpcChannel.isAlive()) {
359 rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
360 connections.put(hashCode, rpcChannel);
361 }
362 }
363
364 return rpcChannel;
365 }
366
367
368
369
370
371
372
373
374
375
376
377 @Override
378 public void cancelConnections(ServerName sn) {
379 synchronized (connections) {
380 for (AsyncRpcChannel rpcChannel : connections.values()) {
381 if (rpcChannel.isAlive() &&
382 rpcChannel.address.getPort() == sn.getPort() &&
383 rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
384 LOG.info("The server on " + sn.toString() +
385 " is dead - stopping the connection " + rpcChannel.toString());
386 rpcChannel.close(null);
387 }
388 }
389 }
390 }
391
392
393
394
395 public void removeConnection(AsyncRpcChannel connection) {
396 int connectionHashCode = connection.getConnectionHashCode();
397 synchronized (connections) {
398
399
400 AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
401 if (connectionInPool == connection) {
402 this.connections.remove(connectionHashCode);
403 } else if (LOG.isDebugEnabled()) {
404 LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
405 connection.toString(), System.identityHashCode(connection),
406 System.identityHashCode(connectionInPool)));
407 }
408 }
409 }
410
411
412
413
414
415
416
417
418
419
420
421
422 public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
423 return new RpcChannelImplementation(this, sn, user, rpcTimeout);
424 }
425
426
427
428
429 @VisibleForTesting
430 public static class RpcChannelImplementation implements RpcChannel {
431 private final InetSocketAddress isa;
432 private final AsyncRpcClient rpcClient;
433 private final User ticket;
434 private final int channelOperationTimeout;
435
436
437
438
439 protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
440 final ServerName sn, final User ticket, int channelOperationTimeout) {
441 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
442 this.rpcClient = rpcClient;
443 this.ticket = ticket;
444 this.channelOperationTimeout = channelOperationTimeout;
445 }
446
447 @Override
448 public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
449 Message param, Message returnType, RpcCallback<Message> done) {
450 PayloadCarryingRpcController pcrc;
451 if (controller != null) {
452 pcrc = (PayloadCarryingRpcController) controller;
453 if (!pcrc.hasCallTimeout()) {
454 pcrc.setCallTimeout(channelOperationTimeout);
455 }
456 } else {
457 pcrc = new PayloadCarryingRpcController();
458 pcrc.setCallTimeout(channelOperationTimeout);
459 }
460
461 this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
462 }
463 }
464
465 Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
466 return WHEEL_TIMER.newTimeout(task, delay, unit);
467 }
468 }