View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.ipc;
21  
22  import com.google.protobuf.Descriptors.MethodDescriptor;
23  import com.google.protobuf.Message;
24  import com.google.protobuf.Message.Builder;
25  import com.google.protobuf.RpcCallback;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.CellScanner;
28  import org.apache.hadoop.hbase.DoNotRetryIOException;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.ServerName;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.codec.Codec;
33  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
34  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
36  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
37  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
38  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
39  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
40  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
41  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
42  import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
43  import org.apache.hadoop.hbase.security.AuthMethod;
44  import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
45  import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
46  import org.apache.hadoop.hbase.security.SecurityInfo;
47  import org.apache.hadoop.hbase.security.User;
48  import org.apache.hadoop.hbase.security.UserProvider;
49  import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  import org.apache.hadoop.hbase.util.ExceptionUtil;
52  import org.apache.hadoop.hbase.util.Pair;
53  import org.apache.hadoop.hbase.util.PoolMap;
54  import org.apache.hadoop.io.IOUtils;
55  import org.apache.hadoop.io.Text;
56  import org.apache.hadoop.io.compress.CompressionCodec;
57  import org.apache.hadoop.ipc.RemoteException;
58  import org.apache.hadoop.net.NetUtils;
59  import org.apache.hadoop.security.SecurityUtil;
60  import org.apache.hadoop.security.UserGroupInformation;
61  import org.apache.hadoop.security.token.Token;
62  import org.apache.hadoop.security.token.TokenIdentifier;
63  import org.apache.hadoop.security.token.TokenSelector;
64  import org.apache.htrace.Span;
65  import org.apache.htrace.Trace;
66  import org.apache.htrace.TraceScope;
67  
68  import javax.net.SocketFactory;
69  import javax.security.sasl.SaslException;
70  import java.io.BufferedInputStream;
71  import java.io.BufferedOutputStream;
72  import java.io.Closeable;
73  import java.io.DataInputStream;
74  import java.io.DataOutputStream;
75  import java.io.IOException;
76  import java.io.InputStream;
77  import java.io.InterruptedIOException;
78  import java.io.OutputStream;
79  import java.net.ConnectException;
80  import java.net.InetSocketAddress;
81  import java.net.Socket;
82  import java.net.SocketAddress;
83  import java.net.SocketTimeoutException;
84  import java.net.UnknownHostException;
85  import java.nio.ByteBuffer;
86  import java.security.PrivilegedExceptionAction;
87  import java.util.HashMap;
88  import java.util.HashSet;
89  import java.util.Iterator;
90  import java.util.Map;
91  import java.util.Map.Entry;
92  import java.util.Random;
93  import java.util.Set;
94  import java.util.concurrent.ArrayBlockingQueue;
95  import java.util.concurrent.BlockingQueue;
96  import java.util.concurrent.ConcurrentSkipListMap;
97  import java.util.concurrent.atomic.AtomicBoolean;
98  import java.util.concurrent.atomic.AtomicInteger;
99  
100 /**
101  * Does RPC against a cluster.  Manages connections per regionserver in the cluster.
102  * <p>See HBaseServer
103  */
104 @InterfaceAudience.Private
105 public class RpcClientImpl extends AbstractRpcClient {
106   protected final AtomicInteger callIdCnt = new AtomicInteger();
107 
108   protected final PoolMap<ConnectionId, Connection> connections;
109 
110   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
111 
112   protected final FailedServers failedServers;
113 
114   protected final SocketFactory socketFactory;           // how to create sockets
115 
116   protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
117       TokenSelector<? extends TokenIdentifier>> tokenHandlers =
118       new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
119         TokenSelector<? extends TokenIdentifier>>();
120   static {
121     tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
122         new AuthenticationTokenSelector());
123   }
124 
125   /**
126    * Creates a connection. Can be overridden by a subclass for testing.
127    * @param remoteId - the ConnectionId to use for the connection creation.
128    */
129   protected Connection createConnection(ConnectionId remoteId, final Codec codec,
130       final CompressionCodec compressor)
131   throws IOException {
132     return new Connection(remoteId, codec, compressor);
133   }
134 
135   /**
136    * see {@link RpcClientImpl.Connection.CallSender}
137    */
138   private static class CallFuture {
139     final Call call;
140     final int priority;
141     final Span span;
142 
143     // We will use this to stop the writer
144     final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
145 
146     CallFuture(Call call, int priority, Span span) {
147       this.call = call;
148       this.priority = priority;
149       this.span = span;
150     }
151   }
152 
153   /** Thread that reads responses and notifies callers.  Each connection owns a
154    * socket connected to a remote address.  Calls are multiplexed through this
155    * socket: responses may be delivered out of order. */
156   protected class Connection extends Thread {
157     private ConnectionHeader header;              // connection header
158     protected ConnectionId remoteId;
159     protected Socket socket = null;                 // connected socket
160     protected DataInputStream in;
161     protected DataOutputStream out;
162     private Object outLock = new Object();
163     private InetSocketAddress server;             // server ip:port
164     private String serverPrincipal;  // server's krb5 principal name
165     private AuthMethod authMethod; // authentication method
166     private boolean useSasl;
167     private Token<? extends TokenIdentifier> token;
168     private HBaseSaslRpcClient saslRpcClient;
169     private int reloginMaxBackoff; // max pause before relogin on sasl failure
170     private final Codec codec;
171     private final CompressionCodec compressor;
172 
173     // currently active calls
174     protected final ConcurrentSkipListMap<Integer, Call> calls =
175       new ConcurrentSkipListMap<Integer, Call>();
176 
177     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
178     protected final CallSender callSender;
179 
180 
181     /**
182      * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt),
183      *  it gets into a java issue: an interruption during a write closes the socket/channel.
184      * A way to avoid this is to use a different thread for writing. This way, on interruptions,
185      *  we either cancel the writes or ignore the answer if the write is already done, but we
186      *  don't stop the write in the middle.
187      * This adds a thread per region server in the client, so it's kept as an option.
188      * <p>
189      * The implementation is simple: the client threads adds their call to the queue, and then
190      *  wait for an answer. The CallSender blocks on the queue, and writes the calls one
191      *  after the other. On interruption, the client cancels its call. The CallSender checks that
192      *  the call has not been canceled before writing it.
193      * </p>
194      * When the connection closes, all the calls not yet sent are dismissed. The client thread
195      *  is notified with an appropriate exception, as if the call was already sent but the answer
196      *  not yet received.
197      * </p>
198      */
199     private class CallSender extends Thread implements Closeable {
200       protected final BlockingQueue<CallFuture> callsToWrite;
201 
202 
203       public CallFuture sendCall(Call call, int priority, Span span)
204           throws InterruptedException, IOException {
205         CallFuture cts = new CallFuture(call, priority, span);
206         if (!callsToWrite.offer(cts)) {
207           throw new IOException("Can't add the call " + call.id +
208               " to the write queue. callsToWrite.size()=" + callsToWrite.size());
209         }
210         checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
211                        //  in the list while the cleanup was already done.
212         return cts;
213       }
214 
215       @Override
216       public void close(){
217         assert shouldCloseConnection.get();
218         callsToWrite.offer(CallFuture.DEATH_PILL);
219         // We don't care if we can't add the death pill to the queue: the writer
220         //  won't be blocked in the 'take', as its queue is full.
221       }
222 
223       CallSender(String name, Configuration conf) {
224         int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
225         callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
226         setDaemon(true);
227         setName(name + " - writer");
228       }
229 
230       public void remove(CallFuture cts){
231         callsToWrite.remove(cts);
232 
233         // By removing the call from the expected call list, we make the list smaller, but
234         //  it means as well that we don't know how many calls we cancelled.
235         calls.remove(cts.call.id);
236         cts.call.callComplete();
237       }
238 
239       /**
240        * Reads the call from the queue, write them on the socket.
241        */
242       @Override
243       public void run() {
244         while (!shouldCloseConnection.get()) {
245           CallFuture cts = null;
246           try {
247             cts = callsToWrite.take();
248           } catch (InterruptedException e) {
249             markClosed(new InterruptedIOException());
250           }
251 
252           if (cts == null || cts == CallFuture.DEATH_PILL) {
253             assert shouldCloseConnection.get();
254             break;
255           }
256 
257           if (cts.call.done) {
258             continue;
259           }
260 
261           if (cts.call.checkAndSetTimeout()) {
262             continue;
263           }
264 
265           try {
266             Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
267           } catch (IOException e) {
268             if (LOG.isDebugEnabled()) {
269               LOG.debug("call write error for call #" + cts.call.id
270                 + ", message =" + e.getMessage());
271             }
272             cts.call.setException(e);
273             markClosed(e);
274           }
275         }
276 
277         cleanup();
278       }
279 
280       /**
281        * Cleans the call not yet sent when we finish.
282        */
283       private void cleanup() {
284         assert shouldCloseConnection.get();
285 
286         IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
287         while (true) {
288           CallFuture cts = callsToWrite.poll();
289           if (cts == null) {
290             break;
291           }
292           if (cts.call != null && !cts.call.done) {
293             cts.call.setException(ie);
294           }
295         }
296       }
297     }
298 
299     Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
300     throws IOException {
301       if (remoteId.getAddress().isUnresolved()) {
302         throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
303       }
304       this.server = remoteId.getAddress();
305       this.codec = codec;
306       this.compressor = compressor;
307 
308       UserGroupInformation ticket = remoteId.getTicket().getUGI();
309       SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
310       this.useSasl = userProvider.isHBaseSecurityEnabled();
311       if (useSasl && securityInfo != null) {
312         AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
313         if (tokenKind != null) {
314           TokenSelector<? extends TokenIdentifier> tokenSelector =
315               tokenHandlers.get(tokenKind);
316           if (tokenSelector != null) {
317             token = tokenSelector.selectToken(new Text(clusterId),
318                 ticket.getTokens());
319           } else if (LOG.isDebugEnabled()) {
320             LOG.debug("No token selector found for type "+tokenKind);
321           }
322         }
323         String serverKey = securityInfo.getServerPrincipal();
324         if (serverKey == null) {
325           throw new IOException(
326               "Can't obtain server Kerberos config key from SecurityInfo");
327         }
328         serverPrincipal = SecurityUtil.getServerPrincipal(
329             conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
330         if (LOG.isDebugEnabled()) {
331           LOG.debug("RPC Server Kerberos principal name for service="
332               + remoteId.getServiceName() + " is " + serverPrincipal);
333         }
334       }
335 
336       if (!useSasl) {
337         authMethod = AuthMethod.SIMPLE;
338       } else if (token != null) {
339         authMethod = AuthMethod.DIGEST;
340       } else {
341         authMethod = AuthMethod.KERBEROS;
342       }
343 
344       if (LOG.isDebugEnabled()) {
345         LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
346           ", sasl=" + useSasl);
347       }
348       reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
349       this.remoteId = remoteId;
350 
351       ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
352       builder.setServiceName(remoteId.getServiceName());
353       UserInformation userInfoPB = getUserInfo(ticket);
354       if (userInfoPB != null) {
355         builder.setUserInfo(userInfoPB);
356       }
357       if (this.codec != null) {
358         builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
359       }
360       if (this.compressor != null) {
361         builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
362       }
363       builder.setVersionInfo(ProtobufUtil.getVersionInfo());
364       this.header = builder.build();
365 
366       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
367         remoteId.getAddress().toString() +
368         ((ticket==null)?" from an unknown user": (" from "
369         + ticket.getUserName())));
370       this.setDaemon(true);
371 
372       if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
373         callSender = new CallSender(getName(), conf);
374         callSender.start();
375       } else {
376         callSender = null;
377       }
378     }
379 
380     private UserInformation getUserInfo(UserGroupInformation ugi) {
381       if (ugi == null || authMethod == AuthMethod.DIGEST) {
382         // Don't send user for token auth
383         return null;
384       }
385       UserInformation.Builder userInfoPB = UserInformation.newBuilder();
386       if (authMethod == AuthMethod.KERBEROS) {
387         // Send effective user for Kerberos auth
388         userInfoPB.setEffectiveUser(ugi.getUserName());
389       } else if (authMethod == AuthMethod.SIMPLE) {
390         //Send both effective user and real user for simple auth
391         userInfoPB.setEffectiveUser(ugi.getUserName());
392         if (ugi.getRealUser() != null) {
393           userInfoPB.setRealUser(ugi.getRealUser().getUserName());
394         }
395       }
396       return userInfoPB.build();
397     }
398 
399     protected synchronized void setupConnection() throws IOException {
400       short ioFailures = 0;
401       short timeoutFailures = 0;
402       while (true) {
403         try {
404           this.socket = socketFactory.createSocket();
405           this.socket.setTcpNoDelay(tcpNoDelay);
406           this.socket.setKeepAlive(tcpKeepAlive);
407           if (localAddr != null) {
408             this.socket.bind(localAddr);
409           }
410           NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
411           this.socket.setSoTimeout(readTO);
412           return;
413         } catch (SocketTimeoutException toe) {
414           /* The max number of retries is 45,
415            * which amounts to 20s*45 = 15 minutes retries.
416            */
417           handleConnectionFailure(timeoutFailures++, maxRetries, toe);
418         } catch (IOException ie) {
419           handleConnectionFailure(ioFailures++, maxRetries, ie);
420         }
421       }
422     }
423 
424     protected synchronized void closeConnection() {
425       if (socket == null) {
426         return;
427       }
428 
429       // close the current connection
430       try {
431         if (socket.getOutputStream() != null) {
432           socket.getOutputStream().close();
433         }
434       } catch (IOException ignored) {  // Can happen if the socket is already closed
435         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
436       }
437       try {
438         if (socket.getInputStream() != null) {
439           socket.getInputStream().close();
440         }
441       } catch (IOException ignored) {  // Can happen if the socket is already closed
442         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
443       }
444       try {
445         if (socket.getChannel() != null) {
446           socket.getChannel().close();
447         }
448       } catch (IOException ignored) {  // Can happen if the socket is already closed
449         if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
450       }
451       try {
452         socket.close();
453       } catch (IOException e) {
454         LOG.warn("Not able to close a socket", e);
455       }
456 
457       // set socket to null so that the next call to setupIOstreams
458       // can start the process of connect all over again.
459       socket = null;
460     }
461 
462     /**
463      *  Handle connection failures
464      *
465      * If the current number of retries is equal to the max number of retries,
466      * stop retrying and throw the exception; Otherwise backoff N seconds and
467      * try connecting again.
468      *
469      * This Method is only called from inside setupIOstreams(), which is
470      * synchronized. Hence the sleep is synchronized; the locks will be retained.
471      *
472      * @param curRetries current number of retries
473      * @param maxRetries max number of retries allowed
474      * @param ioe failure reason
475      * @throws IOException if max number of retries is reached
476      */
477     private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
478     throws IOException {
479       closeConnection();
480 
481       // throw the exception if the maximum number of retries is reached
482       if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
483         throw ioe;
484       }
485 
486       // otherwise back off and retry
487       try {
488         Thread.sleep(failureSleep);
489       } catch (InterruptedException ie) {
490         ExceptionUtil.rethrowIfInterrupt(ie);
491       }
492 
493       LOG.info("Retrying connect to server: " + remoteId.getAddress() +
494         " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
495         " time(s).");
496     }
497 
498     /**
499      * @throws IOException if the connection is not open.
500      */
501     private void checkIsOpen() throws IOException {
502       if (shouldCloseConnection.get()) {
503         throw new ConnectionClosingException(getName() + " is closing");
504       }
505     }
506 
507     /* wait till someone signals us to start reading RPC response or
508      * it is idle too long, it is marked as to be closed,
509      * or the client is marked as not running.
510      *
511      * @return true if it is time to read a response; false otherwise.
512      */
513     protected synchronized boolean waitForWork() throws InterruptedException {
514       // beware of the concurrent access to the calls list: we can add calls, but as well
515       //  remove them.
516       long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
517 
518       while (true) {
519         if (shouldCloseConnection.get()) {
520           return false;
521         }
522 
523         if (!running.get()) {
524           markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
525           return false;
526         }
527 
528         if (!calls.isEmpty()) {
529           // shouldCloseConnection can be set to true by a parallel thread here. The caller
530           //  will need to check anyway.
531           return true;
532         }
533 
534         if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
535           // Connection is idle.
536           // We expect the number of calls to be zero here, but actually someone can
537           //  adds a call at the any moment, as there is no synchronization between this task
538           //  and adding new calls. It's not a big issue, but it will get an exception.
539           markClosed(new IOException(
540               "idle connection closed with " + calls.size() + " pending request(s)"));
541           return false;
542         }
543 
544         wait(Math.min(minIdleTimeBeforeClose, 1000));
545       }
546     }
547 
548     public InetSocketAddress getRemoteAddress() {
549       return remoteId.getAddress();
550     }
551 
552     @Override
553     public void run() {
554       if (LOG.isTraceEnabled()) {
555         LOG.trace(getName() + ": starting, connections " + connections.size());
556       }
557 
558       try {
559         while (waitForWork()) { // Wait here for work - read or close connection
560           readResponse();
561         }
562       } catch (InterruptedException t) {
563         if (LOG.isTraceEnabled()) {
564           LOG.trace(getName() + ": interrupted while waiting for call responses");
565         }
566         markClosed(ExceptionUtil.asInterrupt(t));
567       } catch (Throwable t) {
568         if (LOG.isDebugEnabled()) {
569           LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
570         }
571         markClosed(new IOException("Unexpected throwable while waiting call responses", t));
572       }
573 
574       close();
575 
576       if (LOG.isTraceEnabled()) {
577         LOG.trace(getName() + ": stopped, connections " + connections.size());
578       }
579     }
580 
581     private synchronized void disposeSasl() {
582       if (saslRpcClient != null) {
583         try {
584           saslRpcClient.dispose();
585           saslRpcClient = null;
586         } catch (IOException ioe) {
587           LOG.error("Error disposing of SASL client", ioe);
588         }
589       }
590     }
591 
592     private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
593       UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
594       UserGroupInformation currentUser =
595         UserGroupInformation.getCurrentUser();
596       UserGroupInformation realUser = currentUser.getRealUser();
597       return authMethod == AuthMethod.KERBEROS &&
598           loginUser != null &&
599           //Make sure user logged in using Kerberos either keytab or TGT
600           loginUser.hasKerberosCredentials() &&
601           // relogin only in case it is the login user (e.g. JT)
602           // or superuser (like oozie).
603           (loginUser.equals(currentUser) || loginUser.equals(realUser));
604     }
605 
606     private synchronized boolean setupSaslConnection(final InputStream in2,
607         final OutputStream out2) throws IOException {
608       saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
609           conf.get("hbase.rpc.protection",
610               QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
611       return saslRpcClient.saslConnect(in2, out2);
612     }
613 
614     /**
615      * If multiple clients with the same principal try to connect
616      * to the same server at the same time, the server assumes a
617      * replay attack is in progress. This is a feature of kerberos.
618      * In order to work around this, what is done is that the client
619      * backs off randomly and tries to initiate the connection
620      * again.
621      * The other problem is to do with ticket expiry. To handle that,
622      * a relogin is attempted.
623      * <p>
624      * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
625      * method. In case when the user doesn't have valid credentials, we don't
626      * need to retry (from cache or ticket). In such cases, it is prudent to
627      * throw a runtime exception when we receive a SaslException from the
628      * underlying authentication implementation, so there is no retry from
629      * other high level (for eg, HCM or HBaseAdmin).
630      * </p>
631      */
632     private synchronized void handleSaslConnectionFailure(
633         final int currRetries,
634         final int maxRetries, final Exception ex, final Random rand,
635         final UserGroupInformation user)
636     throws IOException, InterruptedException{
637       user.doAs(new PrivilegedExceptionAction<Object>() {
638         @Override
639         public Object run() throws IOException, InterruptedException {
640           closeConnection();
641           if (shouldAuthenticateOverKrb()) {
642             if (currRetries < maxRetries) {
643               if (LOG.isDebugEnabled()) {
644                 LOG.debug("Exception encountered while connecting to " +
645                     "the server : " + ex);
646               }
647               //try re-login
648               if (UserGroupInformation.isLoginKeytabBased()) {
649                 UserGroupInformation.getLoginUser().reloginFromKeytab();
650               } else {
651                 UserGroupInformation.getLoginUser().reloginFromTicketCache();
652               }
653               disposeSasl();
654               //have granularity of milliseconds
655               //we are sleeping with the Connection lock held but since this
656               //connection instance is being used for connecting to the server
657               //in question, it is okay
658               Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
659               return null;
660             } else {
661               String msg = "Couldn't setup connection for " +
662               UserGroupInformation.getLoginUser().getUserName() +
663               " to " + serverPrincipal;
664               LOG.warn(msg);
665               throw (IOException) new IOException(msg).initCause(ex);
666             }
667           } else {
668             LOG.warn("Exception encountered while connecting to " +
669                 "the server : " + ex);
670           }
671           if (ex instanceof RemoteException) {
672             throw (RemoteException)ex;
673           }
674           if (ex instanceof SaslException) {
675             String msg = "SASL authentication failed." +
676               " The most likely cause is missing or invalid credentials." +
677               " Consider 'kinit'.";
678             LOG.fatal(msg, ex);
679             throw new RuntimeException(msg, ex);
680           }
681           throw new IOException(ex);
682         }
683       });
684     }
685 
686     protected synchronized void setupIOstreams() throws IOException {
687       if (socket != null) {
688         // The connection is already available. Perfect.
689         return;
690       }
691 
692       if (shouldCloseConnection.get()){
693         throw new ConnectionClosingException("This connection is closing");
694       }
695 
696       if (failedServers.isFailedServer(remoteId.getAddress())) {
697         if (LOG.isDebugEnabled()) {
698           LOG.debug("Not trying to connect to " + server +
699               " this server is in the failed servers list");
700         }
701         IOException e = new FailedServerException(
702             "This server is in the failed servers list: " + server);
703         markClosed(e);
704         close();
705         throw e;
706       }
707 
708       try {
709         if (LOG.isDebugEnabled()) {
710           LOG.debug("Connecting to " + server);
711         }
712         short numRetries = 0;
713         final short MAX_RETRIES = 5;
714         Random rand = null;
715         while (true) {
716           setupConnection();
717           InputStream inStream = NetUtils.getInputStream(socket);
718           // This creates a socket with a write timeout. This timeout cannot be changed.
719           OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
720           // Write out the preamble -- MAGIC, version, and auth to use.
721           writeConnectionHeaderPreamble(outStream);
722           if (useSasl) {
723             final InputStream in2 = inStream;
724             final OutputStream out2 = outStream;
725             UserGroupInformation ticket = remoteId.getTicket().getUGI();
726             if (authMethod == AuthMethod.KERBEROS) {
727               if (ticket != null && ticket.getRealUser() != null) {
728                 ticket = ticket.getRealUser();
729               }
730             }
731             boolean continueSasl;
732             if (ticket == null) throw new FatalConnectionException("ticket/user is null");
733             try {
734               continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
735                 @Override
736                 public Boolean run() throws IOException {
737                   return setupSaslConnection(in2, out2);
738                 }
739               });
740             } catch (Exception ex) {
741               ExceptionUtil.rethrowIfInterrupt(ex);
742               if (rand == null) {
743                 rand = new Random();
744               }
745               handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
746               continue;
747             }
748             if (continueSasl) {
749               // Sasl connect is successful. Let's set up Sasl i/o streams.
750               inStream = saslRpcClient.getInputStream(inStream);
751               outStream = saslRpcClient.getOutputStream(outStream);
752             } else {
753               // fall back to simple auth because server told us so.
754               authMethod = AuthMethod.SIMPLE;
755               useSasl = false;
756             }
757           }
758           this.in = new DataInputStream(new BufferedInputStream(inStream));
759           synchronized (this.outLock) {
760             this.out = new DataOutputStream(new BufferedOutputStream(outStream));
761           }
762           // Now write out the connection header
763           writeConnectionHeader();
764 
765           // start the receiver thread after the socket connection has been set up
766           start();
767           return;
768         }
769       } catch (Throwable t) {
770         IOException e = ExceptionUtil.asInterrupt(t);
771         if (e == null) {
772           failedServers.addToFailedServers(remoteId.address);
773           if (t instanceof LinkageError) {
774             // probably the hbase hadoop version does not match the running hadoop version
775             e = new DoNotRetryIOException(t);
776           } else if (t instanceof IOException) {
777             e = (IOException) t;
778           } else {
779             e = new IOException("Could not set up IO Streams to " + server, t);
780           }
781         }
782         markClosed(e);
783         close();
784         throw e;
785       }
786     }
787 
788     /**
789      * Write the RPC header: <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>
790      */
791     private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
792       // Assemble the preamble up in a buffer first and then send it.  Writing individual elements,
793       // they are getting sent across piecemeal according to wireshark and then server is messing
794       // up the reading on occasion (the passed in stream is not buffered yet).
795 
796       // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
797       int rpcHeaderLen = HConstants.RPC_HEADER.length;
798       byte [] preamble = new byte [rpcHeaderLen + 2];
799       System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
800       preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
801       preamble[rpcHeaderLen + 1] = authMethod.code;
802       outStream.write(preamble);
803       outStream.flush();
804     }
805 
806     /**
807      * Write the connection header.
808      */
809     private synchronized void writeConnectionHeader() throws IOException {
810       synchronized (this.outLock) {
811         this.out.writeInt(this.header.getSerializedSize());
812         this.header.writeTo(this.out);
813         this.out.flush();
814       }
815     }
816 
817     /** Close the connection. */
818     protected synchronized void close() {
819       if (!shouldCloseConnection.get()) {
820         LOG.error(getName() + ": the connection is not in the closed state");
821         return;
822       }
823 
824       // release the resources
825       // first thing to do;take the connection out of the connection list
826       synchronized (connections) {
827         connections.removeValue(remoteId, this);
828       }
829 
830       // close the streams and therefore the socket
831       synchronized(this.outLock) {
832         if (this.out != null) {
833           IOUtils.closeStream(out);
834           this.out = null;
835         }
836       }
837       IOUtils.closeStream(in);
838       this.in = null;
839       disposeSasl();
840 
841       // log the info
842       if (LOG.isTraceEnabled()) {
843         LOG.trace(getName() + ": closing ipc connection to " + server);
844       }
845 
846       cleanupCalls(true);
847 
848       if (LOG.isTraceEnabled()) {
849         LOG.trace(getName() + ": ipc connection to " + server + " closed");
850       }
851     }
852 
853     protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
854       TraceScope ts = Trace.startSpan("RpcClientImpl.tracedWriteRequest", span);
855       try {
856         writeRequest(call, priority, span);
857       } finally {
858         ts.close();
859       }
860     }
861 
862     /**
863      * Initiates a call by sending the parameter to the remote server.
864      * Note: this is not called from the Connection thread, but by other
865      * threads.
866      * @see #readResponse()
867      */
868     private void writeRequest(Call call, final int priority, Span span) throws IOException {
869       RequestHeader.Builder builder = RequestHeader.newBuilder();
870       builder.setCallId(call.id);
871       if (span != null) {
872         builder.setTraceInfo(
873             RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
874       }
875       builder.setMethodName(call.md.getName());
876       builder.setRequestParam(call.param != null);
877       ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
878       if (cellBlock != null) {
879         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
880         cellBlockBuilder.setLength(cellBlock.limit());
881         builder.setCellBlockMeta(cellBlockBuilder.build());
882       }
883       // Only pass priority if there is one set.
884       if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
885         builder.setPriority(priority);
886       }
887       RequestHeader header = builder.build();
888 
889       setupIOstreams();
890 
891       // Now we're going to write the call. We take the lock, then check that the connection
892       //  is still valid, and, if so we do the write to the socket. If the write fails, we don't
893       //  know where we stand, we have to close the connection.
894       checkIsOpen();
895       IOException writeException = null;
896       synchronized (this.outLock) {
897         if (Thread.interrupted()) throw new InterruptedIOException();
898 
899         calls.put(call.id, call); // We put first as we don't want the connection to become idle.
900         checkIsOpen(); // Now we're checking that it didn't became idle in between.
901 
902         try {
903           IPCUtil.write(this.out, header, call.param, cellBlock);
904         } catch (IOException e) {
905           // We set the value inside the synchronized block, this way the next in line
906           //  won't even try to write. Otherwise we might miss a call in the calls map?
907           shouldCloseConnection.set(true);
908           writeException = e;
909           interrupt();
910         }
911       }
912 
913       // call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474
914       if (writeException != null) {
915         markClosed(writeException);
916         close();
917       }
918 
919       // We added a call, and may be started the connection close. In both cases, we
920       //  need to notify the reader.
921       synchronized (this) {
922         notifyAll();
923       }
924 
925       // Now that we notified, we can rethrow the exception if any. Otherwise we're good.
926       if (writeException != null) throw writeException;
927     }
928 
929     /* Receive a response.
930      * Because only one receiver, so no synchronization on in.
931      */
932     protected void readResponse() {
933       if (shouldCloseConnection.get()) return;
934       Call call = null;
935       boolean expectedCall = false;
936       try {
937         // See HBaseServer.Call.setResponse for where we write out the response.
938         // Total size of the response.  Unused.  But have to read it in anyways.
939         int totalSize = in.readInt();
940 
941         // Read the header
942         ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
943         int id = responseHeader.getCallId();
944         call = calls.remove(id); // call.done have to be set before leaving this method
945         expectedCall = (call != null && !call.done);
946         if (!expectedCall) {
947           // So we got a response for which we have no corresponding 'call' here on the client-side.
948           // We probably timed out waiting, cleaned up all references, and now the server decides
949           // to return a response.  There is nothing we can do w/ the response at this stage. Clean
950           // out the wire of the response so its out of the way and we can get other responses on
951           // this connection.
952           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
953           int whatIsLeftToRead = totalSize - readSoFar;
954           IOUtils.skipFully(in, whatIsLeftToRead);
955           return;
956         }
957         if (responseHeader.hasException()) {
958           ExceptionResponse exceptionResponse = responseHeader.getException();
959           RemoteException re = createRemoteException(exceptionResponse);
960           call.setException(re);
961           if (isFatalConnectionException(exceptionResponse)) {
962             markClosed(re);
963           }
964         } else {
965           Message value = null;
966           if (call.responseDefaultType != null) {
967             Builder builder = call.responseDefaultType.newBuilderForType();
968             ProtobufUtil.mergeDelimitedFrom(builder, in);
969             value = builder.build();
970           }
971           CellScanner cellBlockScanner = null;
972           if (responseHeader.hasCellBlockMeta()) {
973             int size = responseHeader.getCellBlockMeta().getLength();
974             byte [] cellBlock = new byte[size];
975             IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
976             cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
977           }
978           call.setResponse(value, cellBlockScanner);
979         }
980       } catch (IOException e) {
981         if (expectedCall) call.setException(e);
982         if (e instanceof SocketTimeoutException) {
983           // Clean up open calls but don't treat this as a fatal condition,
984           // since we expect certain responses to not make it by the specified
985           // {@link ConnectionId#rpcTimeout}.
986           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
987         } else {
988           // Treat this as a fatal condition and close this connection
989           markClosed(e);
990         }
991       } finally {
992         cleanupCalls(false);
993       }
994     }
995 
996     /**
997      * @return True if the exception is a fatal connection exception.
998      */
999     private boolean isFatalConnectionException(final ExceptionResponse e) {
1000       return e.getExceptionClassName().
1001         equals(FatalConnectionException.class.getName());
1002     }
1003 
1004     /**
1005      * @param e exception to be wrapped
1006      * @return RemoteException made from passed <code>e</code>
1007      */
1008     private RemoteException createRemoteException(final ExceptionResponse e) {
1009       String innerExceptionClassName = e.getExceptionClassName();
1010       boolean doNotRetry = e.getDoNotRetry();
1011       return e.hasHostname()?
1012         // If a hostname then add it to the RemoteWithExtrasException
1013         new RemoteWithExtrasException(innerExceptionClassName,
1014           e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1015         new RemoteWithExtrasException(innerExceptionClassName,
1016           e.getStackTrace(), doNotRetry);
1017     }
1018 
1019     protected synchronized boolean markClosed(IOException e) {
1020       if (e == null) throw new NullPointerException();
1021 
1022       boolean ret = shouldCloseConnection.compareAndSet(false, true);
1023       if (ret) {
1024         if (LOG.isTraceEnabled()) {
1025           LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
1026         }
1027         if (callSender != null) {
1028           callSender.close();
1029         }
1030         notifyAll();
1031       }
1032       return ret;
1033     }
1034 
1035 
1036     /**
1037      * Cleanup the calls older than a given timeout, in milli seconds.
1038      * @param allCalls true for all calls, false for only the calls in timeout
1039      */
1040     protected synchronized void cleanupCalls(boolean allCalls) {
1041       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
1042       while (itor.hasNext()) {
1043         Call c = itor.next().getValue();
1044         if (c.done) {
1045           // To catch the calls without timeout that were cancelled.
1046           itor.remove();
1047         } else if (allCalls) {
1048           long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
1049           IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
1050               + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
1051           c.setException(ie);
1052           itor.remove();
1053         } else if (c.checkAndSetTimeout()) {
1054           itor.remove();
1055         } else {
1056           // We expect the call to be ordered by timeout. It may not be the case, but stopping
1057           //  at the first valid call allows to be sure that we still have something to do without
1058           //  spending too much time by reading the full list.
1059           break;
1060         }
1061       }
1062     }
1063   }
1064 
1065   /**
1066    * Construct an IPC cluster client whose values are of the {@link Message} class.
1067    * @param conf configuration
1068    * @param clusterId the cluster id
1069    * @param factory socket factory
1070    */
1071   RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
1072     this(conf, clusterId, factory, null);
1073   }
1074 
1075   /**
1076    * Construct an IPC cluster client whose values are of the {@link Message} class.
1077    * @param conf configuration
1078    * @param clusterId the cluster id
1079    * @param factory socket factory
1080    * @param localAddr client socket bind address
1081    */
1082   RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
1083       SocketAddress localAddr) {
1084     super(conf, clusterId, localAddr);
1085 
1086     this.socketFactory = factory;
1087     this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
1088     this.failedServers = new FailedServers(conf);
1089   }
1090 
1091   /**
1092    * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
1093    * @param conf configuration
1094    * @param clusterId the cluster id
1095    */
1096   public RpcClientImpl(Configuration conf, String clusterId) {
1097     this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
1098   }
1099 
1100   /**
1101    * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
1102    *
1103    * This method is called with reflection by the RpcClientFactory to create an instance
1104    *
1105    * @param conf configuration
1106    * @param clusterId the cluster id
1107    * @param localAddr client socket bind address.
1108    */
1109   public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) {
1110     this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr);
1111   }
1112 
1113   /** Stop all threads related to this client.  No further calls may be made
1114    * using this client. */
1115   @Override
1116   public void close() {
1117     if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
1118     if (!running.compareAndSet(true, false)) return;
1119 
1120     Set<Connection> connsToClose = null;
1121     // wake up all connections
1122     synchronized (connections) {
1123       for (Connection conn : connections.values()) {
1124         conn.interrupt();
1125         if (conn.callSender != null) {
1126           conn.callSender.interrupt();
1127         }
1128 
1129         // In case the CallSender did not setupIOStreams() yet, the Connection may not be started
1130         // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
1131         if (!conn.isAlive()) {
1132           if (connsToClose == null) {
1133             connsToClose = new HashSet<Connection>();
1134           }
1135           connsToClose.add(conn);
1136         }
1137       }
1138     }
1139     if (connsToClose != null) {
1140       for (Connection conn : connsToClose) {
1141         conn.markClosed(new InterruptedIOException("RpcClient is closing"));
1142         conn.close();
1143       }
1144     }
1145 
1146     // wait until all connections are closed
1147     while (!connections.isEmpty()) {
1148       try {
1149         Thread.sleep(10);
1150       } catch (InterruptedException e) {
1151         LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
1152             " connections.");
1153         Thread.currentThread().interrupt();
1154         return;
1155       }
1156     }
1157   }
1158 
1159   /** Make a call, passing <code>param</code>, to the IPC server running at
1160    * <code>address</code> which is servicing the <code>protocol</code> protocol,
1161    * with the <code>ticket</code> credentials, returning the value.
1162    * Throws exceptions if there are network problems or if the remote code
1163    * threw an exception.
1164    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
1165    *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
1166    *          new Connection each time.
1167    * @return A pair with the Message response and the Cell data (if any).
1168    * @throws InterruptedException
1169    * @throws IOException
1170    */
1171   @Override
1172   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
1173       Message param, Message returnType, User ticket, InetSocketAddress addr)
1174       throws IOException, InterruptedException {
1175     if (pcrc == null) {
1176       pcrc = new PayloadCarryingRpcController();
1177     }
1178     CellScanner cells = pcrc.cellScanner();
1179 
1180     final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
1181         pcrc.getCallTimeout());
1182 
1183     final Connection connection = getConnection(ticket, call, addr);
1184 
1185     final CallFuture cts;
1186     if (connection.callSender != null) {
1187       cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
1188         pcrc.notifyOnCancel(new RpcCallback<Object>() {
1189           @Override
1190           public void run(Object parameter) {
1191             connection.callSender.remove(cts);
1192           }
1193         });
1194         if (pcrc.isCanceled()) {
1195           // To finish if the call was cancelled before we set the notification (race condition)
1196           call.callComplete();
1197           return new Pair<Message, CellScanner>(call.response, call.cells);
1198         }
1199     } else {
1200       cts = null;
1201       connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
1202     }
1203 
1204     while (!call.done) {
1205       if (call.checkAndSetTimeout()) {
1206         if (cts != null) connection.callSender.remove(cts);
1207         break;
1208       }
1209       if (connection.shouldCloseConnection.get()) {
1210         throw new ConnectionClosingException("Call id=" + call.id +
1211             " on server " + addr + " aborted: connection is closing");
1212       }
1213       try {
1214         synchronized (call) {
1215           if (call.done) break;
1216           call.wait(Math.min(call.remainingTime(), 1000) + 1);
1217         }
1218       } catch (InterruptedException e) {
1219         call.setException(new InterruptedIOException());
1220         if (cts != null) connection.callSender.remove(cts);
1221         throw e;
1222       }
1223     }
1224 
1225     if (call.error != null) {
1226       if (call.error instanceof RemoteException) {
1227         call.error.fillInStackTrace();
1228         throw call.error;
1229       }
1230       // local exception
1231       throw wrapException(addr, call.error);
1232     }
1233 
1234     return new Pair<Message, CellScanner>(call.response, call.cells);
1235   }
1236 
1237 
1238   /**
1239    * Take an IOException and the address we were trying to connect to
1240    * and return an IOException with the input exception as the cause.
1241    * The new exception provides the stack trace of the place where
1242    * the exception is thrown and some extra diagnostics information.
1243    * If the exception is ConnectException or SocketTimeoutException,
1244    * return a new one of the same type; Otherwise return an IOException.
1245    *
1246    * @param addr target address
1247    * @param exception the relevant exception
1248    * @return an exception to throw
1249    */
1250   protected IOException wrapException(InetSocketAddress addr,
1251                                          IOException exception) {
1252     if (exception instanceof ConnectException) {
1253       //connection refused; include the host:port in the error
1254       return (ConnectException)new ConnectException(
1255          "Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
1256     } else if (exception instanceof SocketTimeoutException) {
1257       return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
1258         " failed because " + exception).initCause(exception);
1259     } else if (exception instanceof ConnectionClosingException){
1260       return (ConnectionClosingException) new ConnectionClosingException(
1261           "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
1262     } else {
1263       return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
1264         exception).initCause(exception);
1265     }
1266   }
1267 
1268   /**
1269    * Interrupt the connections to the given ip:port server. This should be called if the server
1270    *  is known as actually dead. This will not prevent current operation to be retried, and,
1271    *  depending on their own behavior, they may retry on the same server. This can be a feature,
1272    *  for example at startup. In any case, they're likely to get connection refused (if the
1273    *  process died) or no route to host: i.e. their next retries should be faster and with a
1274    *  safe exception.
1275    */
1276   @Override
1277   public void cancelConnections(ServerName sn) {
1278     synchronized (connections) {
1279       for (Connection connection : connections.values()) {
1280         if (connection.isAlive() &&
1281             connection.getRemoteAddress().getPort() == sn.getPort() &&
1282             connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {
1283           LOG.info("The server on " + sn.toString() +
1284               " is dead - stopping the connection " + connection.remoteId);
1285           connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.
1286                                   // This will close the connection as well.
1287         }
1288       }
1289     }
1290   }
1291 
1292   /**
1293    *  Get a connection from the pool, or create a new one and add it to the
1294    * pool. Connections to a given host/port are reused.
1295    */
1296   protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
1297   throws IOException {
1298     if (!running.get()) throw new StoppedRpcClientException();
1299     Connection connection;
1300     ConnectionId remoteId =
1301       new ConnectionId(ticket, call.md.getService().getName(), addr);
1302     synchronized (connections) {
1303       connection = connections.get(remoteId);
1304       if (connection == null) {
1305         connection = createConnection(remoteId, this.codec, this.compressor);
1306         connections.put(remoteId, connection);
1307       }
1308     }
1309 
1310     return connection;
1311   }
1312 }