1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.ipc;
21
22 import com.google.protobuf.Descriptors.MethodDescriptor;
23 import com.google.protobuf.Message;
24 import com.google.protobuf.Message.Builder;
25 import com.google.protobuf.RpcCallback;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.CellScanner;
28 import org.apache.hadoop.hbase.DoNotRetryIOException;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.ServerName;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.codec.Codec;
33 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
34 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
36 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
37 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
38 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
39 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
40 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
41 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
42 import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
43 import org.apache.hadoop.hbase.security.AuthMethod;
44 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
45 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
46 import org.apache.hadoop.hbase.security.SecurityInfo;
47 import org.apache.hadoop.hbase.security.User;
48 import org.apache.hadoop.hbase.security.UserProvider;
49 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51 import org.apache.hadoop.hbase.util.ExceptionUtil;
52 import org.apache.hadoop.hbase.util.Pair;
53 import org.apache.hadoop.hbase.util.PoolMap;
54 import org.apache.hadoop.io.IOUtils;
55 import org.apache.hadoop.io.Text;
56 import org.apache.hadoop.io.compress.CompressionCodec;
57 import org.apache.hadoop.ipc.RemoteException;
58 import org.apache.hadoop.net.NetUtils;
59 import org.apache.hadoop.security.SecurityUtil;
60 import org.apache.hadoop.security.UserGroupInformation;
61 import org.apache.hadoop.security.token.Token;
62 import org.apache.hadoop.security.token.TokenIdentifier;
63 import org.apache.hadoop.security.token.TokenSelector;
64 import org.apache.htrace.Span;
65 import org.apache.htrace.Trace;
66 import org.apache.htrace.TraceScope;
67
68 import javax.net.SocketFactory;
69 import javax.security.sasl.SaslException;
70 import java.io.BufferedInputStream;
71 import java.io.BufferedOutputStream;
72 import java.io.Closeable;
73 import java.io.DataInputStream;
74 import java.io.DataOutputStream;
75 import java.io.IOException;
76 import java.io.InputStream;
77 import java.io.InterruptedIOException;
78 import java.io.OutputStream;
79 import java.net.ConnectException;
80 import java.net.InetSocketAddress;
81 import java.net.Socket;
82 import java.net.SocketAddress;
83 import java.net.SocketTimeoutException;
84 import java.net.UnknownHostException;
85 import java.nio.ByteBuffer;
86 import java.security.PrivilegedExceptionAction;
87 import java.util.HashMap;
88 import java.util.HashSet;
89 import java.util.Iterator;
90 import java.util.Map;
91 import java.util.Map.Entry;
92 import java.util.Random;
93 import java.util.Set;
94 import java.util.concurrent.ArrayBlockingQueue;
95 import java.util.concurrent.BlockingQueue;
96 import java.util.concurrent.ConcurrentSkipListMap;
97 import java.util.concurrent.atomic.AtomicBoolean;
98 import java.util.concurrent.atomic.AtomicInteger;
99
100
101
102
103
104 @InterfaceAudience.Private
105 public class RpcClientImpl extends AbstractRpcClient {
106 protected final AtomicInteger callIdCnt = new AtomicInteger();
107
108 protected final PoolMap<ConnectionId, Connection> connections;
109
110 protected final AtomicBoolean running = new AtomicBoolean(true);
111
112 protected final FailedServers failedServers;
113
114 protected final SocketFactory socketFactory;
115
116 protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
117 TokenSelector<? extends TokenIdentifier>> tokenHandlers =
118 new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
119 TokenSelector<? extends TokenIdentifier>>();
120 static {
121 tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
122 new AuthenticationTokenSelector());
123 }
124
125
126
127
128
129 protected Connection createConnection(ConnectionId remoteId, final Codec codec,
130 final CompressionCodec compressor)
131 throws IOException {
132 return new Connection(remoteId, codec, compressor);
133 }
134
135
136
137
138 private static class CallFuture {
139 final Call call;
140 final int priority;
141 final Span span;
142
143
144 final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
145
146 CallFuture(Call call, int priority, Span span) {
147 this.call = call;
148 this.priority = priority;
149 this.span = span;
150 }
151 }
152
153
154
155
156 protected class Connection extends Thread {
157 private ConnectionHeader header;
158 protected ConnectionId remoteId;
159 protected Socket socket = null;
160 protected DataInputStream in;
161 protected DataOutputStream out;
162 private Object outLock = new Object();
163 private InetSocketAddress server;
164 private String serverPrincipal;
165 private AuthMethod authMethod;
166 private boolean useSasl;
167 private Token<? extends TokenIdentifier> token;
168 private HBaseSaslRpcClient saslRpcClient;
169 private int reloginMaxBackoff;
170 private final Codec codec;
171 private final CompressionCodec compressor;
172
173
174 protected final ConcurrentSkipListMap<Integer, Call> calls =
175 new ConcurrentSkipListMap<Integer, Call>();
176
177 protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
178 protected final CallSender callSender;
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199 private class CallSender extends Thread implements Closeable {
200 protected final BlockingQueue<CallFuture> callsToWrite;
201
202
203 public CallFuture sendCall(Call call, int priority, Span span)
204 throws InterruptedException, IOException {
205 CallFuture cts = new CallFuture(call, priority, span);
206 if (!callsToWrite.offer(cts)) {
207 throw new IOException("Can't add the call " + call.id +
208 " to the write queue. callsToWrite.size()=" + callsToWrite.size());
209 }
210 checkIsOpen();
211
212 return cts;
213 }
214
215 @Override
216 public void close(){
217 assert shouldCloseConnection.get();
218 callsToWrite.offer(CallFuture.DEATH_PILL);
219
220
221 }
222
223 CallSender(String name, Configuration conf) {
224 int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
225 callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
226 setDaemon(true);
227 setName(name + " - writer");
228 }
229
230 public void remove(CallFuture cts){
231 callsToWrite.remove(cts);
232
233
234
235 calls.remove(cts.call.id);
236 cts.call.callComplete();
237 }
238
239
240
241
242 @Override
243 public void run() {
244 while (!shouldCloseConnection.get()) {
245 CallFuture cts = null;
246 try {
247 cts = callsToWrite.take();
248 } catch (InterruptedException e) {
249 markClosed(new InterruptedIOException());
250 }
251
252 if (cts == null || cts == CallFuture.DEATH_PILL) {
253 assert shouldCloseConnection.get();
254 break;
255 }
256
257 if (cts.call.done) {
258 continue;
259 }
260
261 if (cts.call.checkAndSetTimeout()) {
262 continue;
263 }
264
265 try {
266 Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
267 } catch (IOException e) {
268 if (LOG.isDebugEnabled()) {
269 LOG.debug("call write error for call #" + cts.call.id
270 + ", message =" + e.getMessage());
271 }
272 cts.call.setException(e);
273 markClosed(e);
274 }
275 }
276
277 cleanup();
278 }
279
280
281
282
283 private void cleanup() {
284 assert shouldCloseConnection.get();
285
286 IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
287 while (true) {
288 CallFuture cts = callsToWrite.poll();
289 if (cts == null) {
290 break;
291 }
292 if (cts.call != null && !cts.call.done) {
293 cts.call.setException(ie);
294 }
295 }
296 }
297 }
298
299 Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
300 throws IOException {
301 if (remoteId.getAddress().isUnresolved()) {
302 throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
303 }
304 this.server = remoteId.getAddress();
305 this.codec = codec;
306 this.compressor = compressor;
307
308 UserGroupInformation ticket = remoteId.getTicket().getUGI();
309 SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
310 this.useSasl = userProvider.isHBaseSecurityEnabled();
311 if (useSasl && securityInfo != null) {
312 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
313 if (tokenKind != null) {
314 TokenSelector<? extends TokenIdentifier> tokenSelector =
315 tokenHandlers.get(tokenKind);
316 if (tokenSelector != null) {
317 token = tokenSelector.selectToken(new Text(clusterId),
318 ticket.getTokens());
319 } else if (LOG.isDebugEnabled()) {
320 LOG.debug("No token selector found for type "+tokenKind);
321 }
322 }
323 String serverKey = securityInfo.getServerPrincipal();
324 if (serverKey == null) {
325 throw new IOException(
326 "Can't obtain server Kerberos config key from SecurityInfo");
327 }
328 serverPrincipal = SecurityUtil.getServerPrincipal(
329 conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
330 if (LOG.isDebugEnabled()) {
331 LOG.debug("RPC Server Kerberos principal name for service="
332 + remoteId.getServiceName() + " is " + serverPrincipal);
333 }
334 }
335
336 if (!useSasl) {
337 authMethod = AuthMethod.SIMPLE;
338 } else if (token != null) {
339 authMethod = AuthMethod.DIGEST;
340 } else {
341 authMethod = AuthMethod.KERBEROS;
342 }
343
344 if (LOG.isDebugEnabled()) {
345 LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
346 ", sasl=" + useSasl);
347 }
348 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
349 this.remoteId = remoteId;
350
351 ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
352 builder.setServiceName(remoteId.getServiceName());
353 UserInformation userInfoPB = getUserInfo(ticket);
354 if (userInfoPB != null) {
355 builder.setUserInfo(userInfoPB);
356 }
357 if (this.codec != null) {
358 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
359 }
360 if (this.compressor != null) {
361 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
362 }
363 builder.setVersionInfo(ProtobufUtil.getVersionInfo());
364 this.header = builder.build();
365
366 this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
367 remoteId.getAddress().toString() +
368 ((ticket==null)?" from an unknown user": (" from "
369 + ticket.getUserName())));
370 this.setDaemon(true);
371
372 if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
373 callSender = new CallSender(getName(), conf);
374 callSender.start();
375 } else {
376 callSender = null;
377 }
378 }
379
380 private UserInformation getUserInfo(UserGroupInformation ugi) {
381 if (ugi == null || authMethod == AuthMethod.DIGEST) {
382
383 return null;
384 }
385 UserInformation.Builder userInfoPB = UserInformation.newBuilder();
386 if (authMethod == AuthMethod.KERBEROS) {
387
388 userInfoPB.setEffectiveUser(ugi.getUserName());
389 } else if (authMethod == AuthMethod.SIMPLE) {
390
391 userInfoPB.setEffectiveUser(ugi.getUserName());
392 if (ugi.getRealUser() != null) {
393 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
394 }
395 }
396 return userInfoPB.build();
397 }
398
399 protected synchronized void setupConnection() throws IOException {
400 short ioFailures = 0;
401 short timeoutFailures = 0;
402 while (true) {
403 try {
404 this.socket = socketFactory.createSocket();
405 this.socket.setTcpNoDelay(tcpNoDelay);
406 this.socket.setKeepAlive(tcpKeepAlive);
407 if (localAddr != null) {
408 this.socket.bind(localAddr);
409 }
410 NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
411 this.socket.setSoTimeout(readTO);
412 return;
413 } catch (SocketTimeoutException toe) {
414
415
416
417 handleConnectionFailure(timeoutFailures++, maxRetries, toe);
418 } catch (IOException ie) {
419 handleConnectionFailure(ioFailures++, maxRetries, ie);
420 }
421 }
422 }
423
424 protected synchronized void closeConnection() {
425 if (socket == null) {
426 return;
427 }
428
429
430 try {
431 if (socket.getOutputStream() != null) {
432 socket.getOutputStream().close();
433 }
434 } catch (IOException ignored) {
435 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
436 }
437 try {
438 if (socket.getInputStream() != null) {
439 socket.getInputStream().close();
440 }
441 } catch (IOException ignored) {
442 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
443 }
444 try {
445 if (socket.getChannel() != null) {
446 socket.getChannel().close();
447 }
448 } catch (IOException ignored) {
449 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
450 }
451 try {
452 socket.close();
453 } catch (IOException e) {
454 LOG.warn("Not able to close a socket", e);
455 }
456
457
458
459 socket = null;
460 }
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477 private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
478 throws IOException {
479 closeConnection();
480
481
482 if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
483 throw ioe;
484 }
485
486
487 try {
488 Thread.sleep(failureSleep);
489 } catch (InterruptedException ie) {
490 ExceptionUtil.rethrowIfInterrupt(ie);
491 }
492
493 LOG.info("Retrying connect to server: " + remoteId.getAddress() +
494 " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
495 " time(s).");
496 }
497
498
499
500
501 private void checkIsOpen() throws IOException {
502 if (shouldCloseConnection.get()) {
503 throw new ConnectionClosingException(getName() + " is closing");
504 }
505 }
506
507
508
509
510
511
512
513 protected synchronized boolean waitForWork() throws InterruptedException {
514
515
516 long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
517
518 while (true) {
519 if (shouldCloseConnection.get()) {
520 return false;
521 }
522
523 if (!running.get()) {
524 markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
525 return false;
526 }
527
528 if (!calls.isEmpty()) {
529
530
531 return true;
532 }
533
534 if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
535
536
537
538
539 markClosed(new IOException(
540 "idle connection closed with " + calls.size() + " pending request(s)"));
541 return false;
542 }
543
544 wait(Math.min(minIdleTimeBeforeClose, 1000));
545 }
546 }
547
548 public InetSocketAddress getRemoteAddress() {
549 return remoteId.getAddress();
550 }
551
552 @Override
553 public void run() {
554 if (LOG.isTraceEnabled()) {
555 LOG.trace(getName() + ": starting, connections " + connections.size());
556 }
557
558 try {
559 while (waitForWork()) {
560 readResponse();
561 }
562 } catch (InterruptedException t) {
563 if (LOG.isTraceEnabled()) {
564 LOG.trace(getName() + ": interrupted while waiting for call responses");
565 }
566 markClosed(ExceptionUtil.asInterrupt(t));
567 } catch (Throwable t) {
568 if (LOG.isDebugEnabled()) {
569 LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
570 }
571 markClosed(new IOException("Unexpected throwable while waiting call responses", t));
572 }
573
574 close();
575
576 if (LOG.isTraceEnabled()) {
577 LOG.trace(getName() + ": stopped, connections " + connections.size());
578 }
579 }
580
581 private synchronized void disposeSasl() {
582 if (saslRpcClient != null) {
583 try {
584 saslRpcClient.dispose();
585 saslRpcClient = null;
586 } catch (IOException ioe) {
587 LOG.error("Error disposing of SASL client", ioe);
588 }
589 }
590 }
591
592 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
593 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
594 UserGroupInformation currentUser =
595 UserGroupInformation.getCurrentUser();
596 UserGroupInformation realUser = currentUser.getRealUser();
597 return authMethod == AuthMethod.KERBEROS &&
598 loginUser != null &&
599
600 loginUser.hasKerberosCredentials() &&
601
602
603 (loginUser.equals(currentUser) || loginUser.equals(realUser));
604 }
605
606 private synchronized boolean setupSaslConnection(final InputStream in2,
607 final OutputStream out2) throws IOException {
608 saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
609 conf.get("hbase.rpc.protection",
610 QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
611 return saslRpcClient.saslConnect(in2, out2);
612 }
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632 private synchronized void handleSaslConnectionFailure(
633 final int currRetries,
634 final int maxRetries, final Exception ex, final Random rand,
635 final UserGroupInformation user)
636 throws IOException, InterruptedException{
637 user.doAs(new PrivilegedExceptionAction<Object>() {
638 @Override
639 public Object run() throws IOException, InterruptedException {
640 closeConnection();
641 if (shouldAuthenticateOverKrb()) {
642 if (currRetries < maxRetries) {
643 if (LOG.isDebugEnabled()) {
644 LOG.debug("Exception encountered while connecting to " +
645 "the server : " + ex);
646 }
647
648 if (UserGroupInformation.isLoginKeytabBased()) {
649 UserGroupInformation.getLoginUser().reloginFromKeytab();
650 } else {
651 UserGroupInformation.getLoginUser().reloginFromTicketCache();
652 }
653 disposeSasl();
654
655
656
657
658 Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
659 return null;
660 } else {
661 String msg = "Couldn't setup connection for " +
662 UserGroupInformation.getLoginUser().getUserName() +
663 " to " + serverPrincipal;
664 LOG.warn(msg);
665 throw (IOException) new IOException(msg).initCause(ex);
666 }
667 } else {
668 LOG.warn("Exception encountered while connecting to " +
669 "the server : " + ex);
670 }
671 if (ex instanceof RemoteException) {
672 throw (RemoteException)ex;
673 }
674 if (ex instanceof SaslException) {
675 String msg = "SASL authentication failed." +
676 " The most likely cause is missing or invalid credentials." +
677 " Consider 'kinit'.";
678 LOG.fatal(msg, ex);
679 throw new RuntimeException(msg, ex);
680 }
681 throw new IOException(ex);
682 }
683 });
684 }
685
686 protected synchronized void setupIOstreams() throws IOException {
687 if (socket != null) {
688
689 return;
690 }
691
692 if (shouldCloseConnection.get()){
693 throw new ConnectionClosingException("This connection is closing");
694 }
695
696 if (failedServers.isFailedServer(remoteId.getAddress())) {
697 if (LOG.isDebugEnabled()) {
698 LOG.debug("Not trying to connect to " + server +
699 " this server is in the failed servers list");
700 }
701 IOException e = new FailedServerException(
702 "This server is in the failed servers list: " + server);
703 markClosed(e);
704 close();
705 throw e;
706 }
707
708 try {
709 if (LOG.isDebugEnabled()) {
710 LOG.debug("Connecting to " + server);
711 }
712 short numRetries = 0;
713 final short MAX_RETRIES = 5;
714 Random rand = null;
715 while (true) {
716 setupConnection();
717 InputStream inStream = NetUtils.getInputStream(socket);
718
719 OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
720
721 writeConnectionHeaderPreamble(outStream);
722 if (useSasl) {
723 final InputStream in2 = inStream;
724 final OutputStream out2 = outStream;
725 UserGroupInformation ticket = remoteId.getTicket().getUGI();
726 if (authMethod == AuthMethod.KERBEROS) {
727 if (ticket != null && ticket.getRealUser() != null) {
728 ticket = ticket.getRealUser();
729 }
730 }
731 boolean continueSasl;
732 if (ticket == null) throw new FatalConnectionException("ticket/user is null");
733 try {
734 continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
735 @Override
736 public Boolean run() throws IOException {
737 return setupSaslConnection(in2, out2);
738 }
739 });
740 } catch (Exception ex) {
741 ExceptionUtil.rethrowIfInterrupt(ex);
742 if (rand == null) {
743 rand = new Random();
744 }
745 handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
746 continue;
747 }
748 if (continueSasl) {
749
750 inStream = saslRpcClient.getInputStream(inStream);
751 outStream = saslRpcClient.getOutputStream(outStream);
752 } else {
753
754 authMethod = AuthMethod.SIMPLE;
755 useSasl = false;
756 }
757 }
758 this.in = new DataInputStream(new BufferedInputStream(inStream));
759 synchronized (this.outLock) {
760 this.out = new DataOutputStream(new BufferedOutputStream(outStream));
761 }
762
763 writeConnectionHeader();
764
765
766 start();
767 return;
768 }
769 } catch (Throwable t) {
770 IOException e = ExceptionUtil.asInterrupt(t);
771 if (e == null) {
772 failedServers.addToFailedServers(remoteId.address);
773 if (t instanceof LinkageError) {
774
775 e = new DoNotRetryIOException(t);
776 } else if (t instanceof IOException) {
777 e = (IOException) t;
778 } else {
779 e = new IOException("Could not set up IO Streams to " + server, t);
780 }
781 }
782 markClosed(e);
783 close();
784 throw e;
785 }
786 }
787
788
789
790
791 private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
792
793
794
795
796
797 int rpcHeaderLen = HConstants.RPC_HEADER.length;
798 byte [] preamble = new byte [rpcHeaderLen + 2];
799 System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
800 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
801 preamble[rpcHeaderLen + 1] = authMethod.code;
802 outStream.write(preamble);
803 outStream.flush();
804 }
805
806
807
808
809 private synchronized void writeConnectionHeader() throws IOException {
810 synchronized (this.outLock) {
811 this.out.writeInt(this.header.getSerializedSize());
812 this.header.writeTo(this.out);
813 this.out.flush();
814 }
815 }
816
817
818 protected synchronized void close() {
819 if (!shouldCloseConnection.get()) {
820 LOG.error(getName() + ": the connection is not in the closed state");
821 return;
822 }
823
824
825
826 synchronized (connections) {
827 connections.removeValue(remoteId, this);
828 }
829
830
831 synchronized(this.outLock) {
832 if (this.out != null) {
833 IOUtils.closeStream(out);
834 this.out = null;
835 }
836 }
837 IOUtils.closeStream(in);
838 this.in = null;
839 disposeSasl();
840
841
842 if (LOG.isTraceEnabled()) {
843 LOG.trace(getName() + ": closing ipc connection to " + server);
844 }
845
846 cleanupCalls(true);
847
848 if (LOG.isTraceEnabled()) {
849 LOG.trace(getName() + ": ipc connection to " + server + " closed");
850 }
851 }
852
853 protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
854 TraceScope ts = Trace.startSpan("RpcClientImpl.tracedWriteRequest", span);
855 try {
856 writeRequest(call, priority, span);
857 } finally {
858 ts.close();
859 }
860 }
861
862
863
864
865
866
867
868 private void writeRequest(Call call, final int priority, Span span) throws IOException {
869 RequestHeader.Builder builder = RequestHeader.newBuilder();
870 builder.setCallId(call.id);
871 if (span != null) {
872 builder.setTraceInfo(
873 RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
874 }
875 builder.setMethodName(call.md.getName());
876 builder.setRequestParam(call.param != null);
877 ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
878 if (cellBlock != null) {
879 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
880 cellBlockBuilder.setLength(cellBlock.limit());
881 builder.setCellBlockMeta(cellBlockBuilder.build());
882 }
883
884 if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
885 builder.setPriority(priority);
886 }
887 RequestHeader header = builder.build();
888
889 setupIOstreams();
890
891
892
893
894 checkIsOpen();
895 IOException writeException = null;
896 synchronized (this.outLock) {
897 if (Thread.interrupted()) throw new InterruptedIOException();
898
899 calls.put(call.id, call);
900 checkIsOpen();
901
902 try {
903 IPCUtil.write(this.out, header, call.param, cellBlock);
904 } catch (IOException e) {
905
906
907 shouldCloseConnection.set(true);
908 writeException = e;
909 interrupt();
910 }
911 }
912
913
914 if (writeException != null) {
915 markClosed(writeException);
916 close();
917 }
918
919
920
921 synchronized (this) {
922 notifyAll();
923 }
924
925
926 if (writeException != null) throw writeException;
927 }
928
929
930
931
932 protected void readResponse() {
933 if (shouldCloseConnection.get()) return;
934 Call call = null;
935 boolean expectedCall = false;
936 try {
937
938
939 int totalSize = in.readInt();
940
941
942 ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
943 int id = responseHeader.getCallId();
944 call = calls.remove(id);
945 expectedCall = (call != null && !call.done);
946 if (!expectedCall) {
947
948
949
950
951
952 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
953 int whatIsLeftToRead = totalSize - readSoFar;
954 IOUtils.skipFully(in, whatIsLeftToRead);
955 return;
956 }
957 if (responseHeader.hasException()) {
958 ExceptionResponse exceptionResponse = responseHeader.getException();
959 RemoteException re = createRemoteException(exceptionResponse);
960 call.setException(re);
961 if (isFatalConnectionException(exceptionResponse)) {
962 markClosed(re);
963 }
964 } else {
965 Message value = null;
966 if (call.responseDefaultType != null) {
967 Builder builder = call.responseDefaultType.newBuilderForType();
968 ProtobufUtil.mergeDelimitedFrom(builder, in);
969 value = builder.build();
970 }
971 CellScanner cellBlockScanner = null;
972 if (responseHeader.hasCellBlockMeta()) {
973 int size = responseHeader.getCellBlockMeta().getLength();
974 byte [] cellBlock = new byte[size];
975 IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
976 cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
977 }
978 call.setResponse(value, cellBlockScanner);
979 }
980 } catch (IOException e) {
981 if (expectedCall) call.setException(e);
982 if (e instanceof SocketTimeoutException) {
983
984
985
986 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
987 } else {
988
989 markClosed(e);
990 }
991 } finally {
992 cleanupCalls(false);
993 }
994 }
995
996
997
998
999 private boolean isFatalConnectionException(final ExceptionResponse e) {
1000 return e.getExceptionClassName().
1001 equals(FatalConnectionException.class.getName());
1002 }
1003
1004
1005
1006
1007
1008 private RemoteException createRemoteException(final ExceptionResponse e) {
1009 String innerExceptionClassName = e.getExceptionClassName();
1010 boolean doNotRetry = e.getDoNotRetry();
1011 return e.hasHostname()?
1012
1013 new RemoteWithExtrasException(innerExceptionClassName,
1014 e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1015 new RemoteWithExtrasException(innerExceptionClassName,
1016 e.getStackTrace(), doNotRetry);
1017 }
1018
1019 protected synchronized boolean markClosed(IOException e) {
1020 if (e == null) throw new NullPointerException();
1021
1022 boolean ret = shouldCloseConnection.compareAndSet(false, true);
1023 if (ret) {
1024 if (LOG.isTraceEnabled()) {
1025 LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
1026 }
1027 if (callSender != null) {
1028 callSender.close();
1029 }
1030 notifyAll();
1031 }
1032 return ret;
1033 }
1034
1035
1036
1037
1038
1039
1040 protected synchronized void cleanupCalls(boolean allCalls) {
1041 Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
1042 while (itor.hasNext()) {
1043 Call c = itor.next().getValue();
1044 if (c.done) {
1045
1046 itor.remove();
1047 } else if (allCalls) {
1048 long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
1049 IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
1050 + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
1051 c.setException(ie);
1052 itor.remove();
1053 } else if (c.checkAndSetTimeout()) {
1054 itor.remove();
1055 } else {
1056
1057
1058
1059 break;
1060 }
1061 }
1062 }
1063 }
1064
1065
1066
1067
1068
1069
1070
1071 RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
1072 this(conf, clusterId, factory, null);
1073 }
1074
1075
1076
1077
1078
1079
1080
1081
1082 RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
1083 SocketAddress localAddr) {
1084 super(conf, clusterId, localAddr);
1085
1086 this.socketFactory = factory;
1087 this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
1088 this.failedServers = new FailedServers(conf);
1089 }
1090
1091
1092
1093
1094
1095
1096 public RpcClientImpl(Configuration conf, String clusterId) {
1097 this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
1098 }
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109 public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) {
1110 this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr);
1111 }
1112
1113
1114
1115 @Override
1116 public void close() {
1117 if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1118 if (!running.compareAndSet(true, false)) return;
1119
1120 Set<Connection> connsToClose = null;
1121
1122 synchronized (connections) {
1123 for (Connection conn : connections.values()) {
1124 conn.interrupt();
1125 if (conn.callSender != null) {
1126 conn.callSender.interrupt();
1127 }
1128
1129
1130
1131 if (!conn.isAlive()) {
1132 if (connsToClose == null) {
1133 connsToClose = new HashSet<Connection>();
1134 }
1135 connsToClose.add(conn);
1136 }
1137 }
1138 }
1139 if (connsToClose != null) {
1140 for (Connection conn : connsToClose) {
1141 conn.markClosed(new InterruptedIOException("RpcClient is closing"));
1142 conn.close();
1143 }
1144 }
1145
1146
1147 while (!connections.isEmpty()) {
1148 try {
1149 Thread.sleep(10);
1150 } catch (InterruptedException e) {
1151 LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
1152 " connections.");
1153 Thread.currentThread().interrupt();
1154 return;
1155 }
1156 }
1157 }
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171 @Override
1172 protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
1173 Message param, Message returnType, User ticket, InetSocketAddress addr)
1174 throws IOException, InterruptedException {
1175 if (pcrc == null) {
1176 pcrc = new PayloadCarryingRpcController();
1177 }
1178 CellScanner cells = pcrc.cellScanner();
1179
1180 final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
1181 pcrc.getCallTimeout());
1182
1183 final Connection connection = getConnection(ticket, call, addr);
1184
1185 final CallFuture cts;
1186 if (connection.callSender != null) {
1187 cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
1188 pcrc.notifyOnCancel(new RpcCallback<Object>() {
1189 @Override
1190 public void run(Object parameter) {
1191 connection.callSender.remove(cts);
1192 }
1193 });
1194 if (pcrc.isCanceled()) {
1195
1196 call.callComplete();
1197 return new Pair<Message, CellScanner>(call.response, call.cells);
1198 }
1199 } else {
1200 cts = null;
1201 connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
1202 }
1203
1204 while (!call.done) {
1205 if (call.checkAndSetTimeout()) {
1206 if (cts != null) connection.callSender.remove(cts);
1207 break;
1208 }
1209 if (connection.shouldCloseConnection.get()) {
1210 throw new ConnectionClosingException("Call id=" + call.id +
1211 " on server " + addr + " aborted: connection is closing");
1212 }
1213 try {
1214 synchronized (call) {
1215 if (call.done) break;
1216 call.wait(Math.min(call.remainingTime(), 1000) + 1);
1217 }
1218 } catch (InterruptedException e) {
1219 call.setException(new InterruptedIOException());
1220 if (cts != null) connection.callSender.remove(cts);
1221 throw e;
1222 }
1223 }
1224
1225 if (call.error != null) {
1226 if (call.error instanceof RemoteException) {
1227 call.error.fillInStackTrace();
1228 throw call.error;
1229 }
1230
1231 throw wrapException(addr, call.error);
1232 }
1233
1234 return new Pair<Message, CellScanner>(call.response, call.cells);
1235 }
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250 protected IOException wrapException(InetSocketAddress addr,
1251 IOException exception) {
1252 if (exception instanceof ConnectException) {
1253
1254 return (ConnectException)new ConnectException(
1255 "Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
1256 } else if (exception instanceof SocketTimeoutException) {
1257 return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
1258 " failed because " + exception).initCause(exception);
1259 } else if (exception instanceof ConnectionClosingException){
1260 return (ConnectionClosingException) new ConnectionClosingException(
1261 "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
1262 } else {
1263 return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
1264 exception).initCause(exception);
1265 }
1266 }
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276 @Override
1277 public void cancelConnections(ServerName sn) {
1278 synchronized (connections) {
1279 for (Connection connection : connections.values()) {
1280 if (connection.isAlive() &&
1281 connection.getRemoteAddress().getPort() == sn.getPort() &&
1282 connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {
1283 LOG.info("The server on " + sn.toString() +
1284 " is dead - stopping the connection " + connection.remoteId);
1285 connection.interrupt();
1286
1287 }
1288 }
1289 }
1290 }
1291
1292
1293
1294
1295
1296 protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
1297 throws IOException {
1298 if (!running.get()) throw new StoppedRpcClientException();
1299 Connection connection;
1300 ConnectionId remoteId =
1301 new ConnectionId(ticket, call.md.getService().getName(), addr);
1302 synchronized (connections) {
1303 connection = connections.get(remoteId);
1304 if (connection == null) {
1305 connection = createConnection(remoteId, this.codec, this.compressor);
1306 connections.put(remoteId, connection);
1307 }
1308 }
1309
1310 return connection;
1311 }
1312 }