/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.protobuf.BlockingRpcChannel;
23  import com.google.protobuf.Descriptors;
24  import com.google.protobuf.Message;
25  import com.google.protobuf.RpcController;
26  import com.google.protobuf.ServiceException;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.CellScanner;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.ServerName;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.codec.Codec;
35  import org.apache.hadoop.hbase.codec.KeyValueCodec;
36  import org.apache.hadoop.hbase.security.User;
37  import org.apache.hadoop.hbase.security.UserProvider;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  import org.apache.hadoop.hbase.util.Pair;
40  import org.apache.hadoop.hbase.util.PoolMap;
41  import org.apache.hadoop.io.compress.CompressionCodec;
42  
43  import java.io.IOException;
44  import java.net.InetSocketAddress;
45  import java.net.SocketAddress;
46  
47  /**
48   * Provides the basics for a RpcClient implementation like configuration and Logging.
49   */
50  @InterfaceAudience.Private
51  public abstract class AbstractRpcClient implements RpcClient {
52    public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
53  
54    protected final Configuration conf;
55    protected String clusterId;
56    protected final SocketAddress localAddr;
57  
58    protected UserProvider userProvider;
59    protected final IPCUtil ipcUtil;
60  
61    protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
62    // time (in ms), it will be closed at any moment.
63    protected final int maxRetries; //the max. no. of retries for socket connections
64    protected final long failureSleep; // Time to sleep before retry on failure.
65    protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
66    protected final boolean tcpKeepAlive; // if T then use keepalives
67    protected final Codec codec;
68    protected final CompressionCodec compressor;
69    protected final boolean fallbackAllowed;
70  
71    protected final int connectTO;
72    protected final int readTO;
73    protected final int writeTO;
74  
75    /**
76     * Construct an IPC client for the cluster <code>clusterId</code>
77     *
78     * @param conf configuration
79     * @param clusterId the cluster id
80     * @param localAddr client socket bind address.
81     */
82    public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
83      this.userProvider = UserProvider.instantiate(conf);
84      this.localAddr = localAddr;
85      this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
86      this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
87      this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
88          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
89      this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
90      this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
91      this.ipcUtil = new IPCUtil(conf);
92  
93      this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
94      this.conf = conf;
95      this.codec = getCodec();
96      this.compressor = getCompressor(conf);
97      this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
98          IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
99      this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
100     this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
101     this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
102 
103     // login the server principal (if using secure Hadoop)
104     if (LOG.isDebugEnabled()) {
105       LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
106           ", tcpKeepAlive=" + this.tcpKeepAlive +
107           ", tcpNoDelay=" + this.tcpNoDelay +
108           ", connectTO=" + this.connectTO +
109           ", readTO=" + this.readTO +
110           ", writeTO=" + this.writeTO +
111           ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
112           ", maxRetries=" + this.maxRetries +
113           ", fallbackAllowed=" + this.fallbackAllowed +
114           ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
115     }
116   }
117 
118   @VisibleForTesting
119   public static String getDefaultCodec(final Configuration c) {
120     // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
121     // Configuration will complain -- then no default codec (and we'll pb everything).  Else
122     // default is KeyValueCodec
123     return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
124   }
125 
126   /**
127    * Encapsulate the ugly casting and RuntimeException conversion in private method.
128    * @return Codec to use on this client.
129    */
130   Codec getCodec() {
131     // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
132     // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
133     String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
134     if (className == null || className.length() == 0) return null;
135     try {
136       return (Codec)Class.forName(className).newInstance();
137     } catch (Exception e) {
138       throw new RuntimeException("Failed getting codec " + className, e);
139     }
140   }
141 
142   @Override
143   public boolean hasCellBlockSupport() {
144     return this.codec != null;
145   }
146 
147   /**
148    * Encapsulate the ugly casting and RuntimeException conversion in private method.
149    * @param conf configuration
150    * @return The compressor to use on this client.
151    */
152   private static CompressionCodec getCompressor(final Configuration conf) {
153     String className = conf.get("hbase.client.rpc.compressor", null);
154     if (className == null || className.isEmpty()) return null;
155     try {
156         return (CompressionCodec)Class.forName(className).newInstance();
157     } catch (Exception e) {
158       throw new RuntimeException("Failed getting compressor " + className, e);
159     }
160   }
161 
162   /**
163    * Return the pool type specified in the configuration, which must be set to
164    * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
165    * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
166    * otherwise default to the former.
167    *
168    * For applications with many user threads, use a small round-robin pool. For
169    * applications with few user threads, you may want to try using a
170    * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
171    * instances should not exceed the operating system's hard limit on the number of
172    * connections.
173    *
174    * @param config configuration
175    * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
176    *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
177    */
178   protected static PoolMap.PoolType getPoolType(Configuration config) {
179     return PoolMap.PoolType
180         .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
181             PoolMap.PoolType.ThreadLocal);
182   }
183 
184   /**
185    * Return the pool size specified in the configuration, which is applicable only if
186    * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
187    *
188    * @param config configuration
189    * @return the maximum pool size
190    */
191   protected static int getPoolSize(Configuration config) {
192     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
193   }
194 
195   /**
196    * Make a blocking call. Throws exceptions if there are network problems or if the remote code
197    * threw an exception.
198    *
199    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
200    *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
201    *               will be a
202    *               new Connection each time.
203    * @return A pair with the Message response and the Cell data (if any).
204    */
205   Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
206       Message param, Message returnType, final User ticket, final InetSocketAddress isa)
207       throws ServiceException {
208     if (pcrc == null) {
209       pcrc = new PayloadCarryingRpcController();
210     }
211 
212     long startTime = 0;
213     if (LOG.isTraceEnabled()) {
214       startTime = EnvironmentEdgeManager.currentTime();
215     }
216     Pair<Message, CellScanner> val;
217     try {
218       val = call(pcrc, md, param, returnType, ticket, isa);
219       // Shove the results into controller so can be carried across the proxy/pb service void.
220       pcrc.setCellScanner(val.getSecond());
221 
222       if (LOG.isTraceEnabled()) {
223         long callTime = EnvironmentEdgeManager.currentTime() - startTime;
224         LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
225       }
226       return val.getFirst();
227     } catch (Throwable e) {
228       throw new ServiceException(e);
229     }
230   }
231 
232   /**
233    * Make a call, passing <code>param</code>, to the IPC server running at
234    * <code>address</code> which is servicing the <code>protocol</code> protocol,
235    * with the <code>ticket</code> credentials, returning the value.
236    * Throws exceptions if there are network problems or if the remote code
237    * threw an exception.
238    *
239    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
240    *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
241    *               will be a
242    *               new Connection each time.
243    * @return A pair with the Message response and the Cell data (if any).
244    * @throws InterruptedException
245    * @throws java.io.IOException
246    */
247   protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
248       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
249       InetSocketAddress isa) throws IOException, InterruptedException;
250 
251   @Override
252   public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
253       int defaultOperationTimeout) {
254     return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
255   }
256 
257   /**
258    * Blocking rpc channel that goes via hbase rpc.
259    */
260   @VisibleForTesting
261   public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
262     private final InetSocketAddress isa;
263     private final AbstractRpcClient rpcClient;
264     private final User ticket;
265     private final int channelOperationTimeout;
266 
267     /**
268      * @param channelOperationTimeout - the default timeout when no timeout is given
269      */
270     protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
271         final ServerName sn, final User ticket, int channelOperationTimeout) {
272       this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
273       this.rpcClient = rpcClient;
274       this.ticket = ticket;
275       this.channelOperationTimeout = channelOperationTimeout;
276     }
277 
278     @Override
279     public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
280         Message param, Message returnType) throws ServiceException {
281       PayloadCarryingRpcController pcrc;
282       if (controller != null && controller instanceof PayloadCarryingRpcController) {
283         pcrc = (PayloadCarryingRpcController) controller;
284         if (!pcrc.hasCallTimeout()) {
285           pcrc.setCallTimeout(channelOperationTimeout);
286         }
287       } else {
288         pcrc = new PayloadCarryingRpcController();
289         pcrc.setCallTimeout(channelOperationTimeout);
290       }
291 
292       return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
293     }
294   }
295 }