1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.lang.reflect.UndeclaredThrowableException;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.util.ArrayList;
28  import java.util.Date;
29  import java.util.HashSet;
30  import java.util.LinkedHashMap;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Map.Entry;
34  import java.util.NavigableMap;
35  import java.util.Set;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.ConcurrentMap;
39  import java.util.concurrent.ExecutorService;
40  import java.util.concurrent.LinkedBlockingQueue;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.atomic.AtomicInteger;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.hbase.DoNotRetryIOException;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.HRegionLocation;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.MasterNotRunningException;
56  import org.apache.hadoop.hbase.MetaTableAccessor;
57  import org.apache.hadoop.hbase.RegionLocations;
58  import org.apache.hadoop.hbase.RegionTooBusyException;
59  import org.apache.hadoop.hbase.ServerName;
60  import org.apache.hadoop.hbase.TableName;
61  import org.apache.hadoop.hbase.TableNotEnabledException;
62  import org.apache.hadoop.hbase.TableNotFoundException;
63  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
66  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
67  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
68  import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
69  import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
70  import org.apache.hadoop.hbase.client.coprocessor.Batch;
71  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
72  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
73  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
74  import org.apache.hadoop.hbase.ipc.RpcClient;
75  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
76  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
77  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
78  import org.apache.hadoop.hbase.protobuf.RequestConverter;
79  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
80  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
81  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
82  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
83  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
84  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
85  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
86  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
87  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
88  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
89  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
90  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
91  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
92  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
93  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
94  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
95  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
96  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
97  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
98  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeResponse;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
172 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
173 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
174 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
175 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
176 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
177 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
178 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
179 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
180 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
181 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
182 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
183 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
184 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
185 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
186 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
187 import org.apache.hadoop.hbase.security.User;
188 import org.apache.hadoop.hbase.security.UserProvider;
189 import org.apache.hadoop.hbase.util.Bytes;
190 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
191 import org.apache.hadoop.hbase.util.ExceptionUtil;
192 import org.apache.hadoop.hbase.util.Threads;
193 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
194 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
195 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
196 import org.apache.hadoop.ipc.RemoteException;
197 import org.apache.zookeeper.KeeperException;
198 
199 import com.google.common.annotations.VisibleForTesting;
200 import com.google.protobuf.BlockingRpcChannel;
201 import com.google.protobuf.RpcController;
202 import com.google.protobuf.ServiceException;
203 
204 /**
205  * An internal, non-instantiable class that manages creation of {@link HConnection}s.
206  */
207 @SuppressWarnings("serial")
208 @InterfaceAudience.Private
209 // NOTE: DO NOT make this class public. It was made package-private on purpose.
210 class ConnectionManager {
211   static final Log LOG = LogFactory.getLog(ConnectionManager.class);
212 
213   public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
214   private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
215 
216   // An LRU Map of HConnectionKey -> HConnection (TableServer).  All
217   // access must be synchronized.  This map is not private because tests
218   // need to be able to tinker with it.
219   static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
220 
221   public static final int MAX_CACHED_CONNECTION_INSTANCES;
222 
223   /**
224    * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
225    * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
226    */
227   private static volatile NonceGenerator nonceGenerator = null;
228   /** The nonce generator lock. Only taken when creating HConnection, which gets a private copy. */
229   private static Object nonceGeneratorCreateLock = new Object();
230 
231   static {
232     // We set instances to one more than the value specified for {@link
233     // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, zk allows at most 30
234     // connections to the ensemble from one client, so in that case we should
235     // run into zk issues before the LRU hits this value of 31.
236     MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
237       HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
238     CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
239         (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
240       @Override
241       protected boolean removeEldestEntry(
242           Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
243         return size() > MAX_CACHED_CONNECTION_INSTANCES;
244       }
245     };
246   }
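      // Illustrative sketch (not part of the original source): the static block above
      // relies on LinkedHashMap's access-order mode, where overriding removeEldestEntry()
      // turns the map into an LRU cache. For example, with a capacity of 2:
      //
      //   Map<String, String> lru = new LinkedHashMap<String, String>(4, 0.75F, true) {
      //     @Override
      //     protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
      //       return size() > 2;
      //     }
      //   };
      //   lru.put("a", "1"); lru.put("b", "2");
      //   lru.get("a");      // touching "a" leaves "b" as the eldest entry
      //   lru.put("c", "3"); // evicts "b", not "a"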
247 
248   /** Dummy nonce generator for disabled nonces. */
249   static class NoNonceGenerator implements NonceGenerator {
250     @Override
251     public long getNonceGroup() {
252       return HConstants.NO_NONCE;
253     }
254     @Override
255     public long newNonce() {
256       return HConstants.NO_NONCE;
257     }
258   }
259 
260   /*
261    * Non-instantiable.
262    */
263   private ConnectionManager() {
264     super();
265   }
266 
267   /**
268    * @param conn The connection for which to replace the generator.
269    * @param cnm Replaces the nonce generator used, for testing.
270    * @return old nonce generator.
271    */
272   @VisibleForTesting
273   static NonceGenerator injectNonceGeneratorForTesting(
274       ClusterConnection conn, NonceGenerator cnm) {
275     HConnectionImplementation connImpl = (HConnectionImplementation)conn;
276     NonceGenerator ng = connImpl.getNonceGenerator();
277     LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
278     connImpl.nonceGenerator = cnm;
279     return ng;
280   }
281 
282   /**
283    * Get the connection that goes with the passed <code>conf</code> configuration instance.
284    * If no current connection exists, method creates a new connection and keys it using
285    * connection-specific properties from the passed {@link Configuration}; see
286    * {@link HConnectionKey}.
287    * @param conf configuration
288    * @return HConnection object for <code>conf</code>
289    * @throws ZooKeeperConnectionException
290    */
291   @Deprecated
292   public static HConnection getConnection(final Configuration conf) throws IOException {
293     return getConnectionInternal(conf);
294   }
295 
296 
297   static ClusterConnection getConnectionInternal(final Configuration conf)
298     throws IOException {
299     HConnectionKey connectionKey = new HConnectionKey(conf);
300     synchronized (CONNECTION_INSTANCES) {
301       HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
302       if (connection == null) {
303         connection = (HConnectionImplementation)createConnection(conf, true);
304         CONNECTION_INSTANCES.put(connectionKey, connection);
305       } else if (connection.isClosed()) {
306         ConnectionManager.deleteConnection(connectionKey, true);
307         connection = (HConnectionImplementation)createConnection(conf, true);
308         CONNECTION_INSTANCES.put(connectionKey, connection);
309       }
310       connection.incCount();
311       return connection;
312     }
313   }
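      // Usage sketch (illustrative, not part of the original source): connections
      // returned by getConnection() are cached by HConnectionKey and reference
      // counted, so each getConnection() should be paired with a close():
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   HConnection shared = ConnectionManager.getConnection(conf); // count incremented
      //   try {
      //     // ... use the shared connection ...
      //   } finally {
      //     shared.close(); // count decremented; resources released when it reaches zero
      //   }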
314 
315   /**
316    * Create a new HConnection instance using the passed <code>conf</code> instance.
317    * <p>Note: This bypasses the usual HConnection life cycle management done by
318    * {@link #getConnection(Configuration)}. The caller is responsible for
319    * calling {@link HConnection#close()} on the returned connection instance.
320    *
321    * This is the recommended way to create HConnections.
322    * {@code
323    * HConnection connection = ConnectionManager.createConnection(conf);
324    * HTableInterface table = connection.getTable("mytable");
325    * table.get(...);
326    * ...
327    * table.close();
328    * connection.close();
329    * }
330    *
331    * @param conf configuration
332    * @return HConnection object for <code>conf</code>
333    * @throws ZooKeeperConnectionException
334    */
335   public static HConnection createConnection(Configuration conf) throws IOException {
336     return createConnectionInternal(conf);
337   }
338 
339   static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
340     UserProvider provider = UserProvider.instantiate(conf);
341     return createConnection(conf, false, null, provider.getCurrent());
342   }
343 
344   /**
345    * Create a new HConnection instance using the passed <code>conf</code> instance.
346    * <p>Note: This bypasses the usual HConnection life cycle management done by
347    * {@link #getConnection(Configuration)}. The caller is responsible for
348    * calling {@link HConnection#close()} on the returned connection instance.
349    * This is the recommended way to create HConnections.
350    * {@code
351    * ExecutorService pool = ...;
352    * HConnection connection = HConnectionManager.createConnection(conf, pool);
353    * HTableInterface table = connection.getTable("mytable");
354    * table.get(...);
355    * ...
356    * table.close();
357    * connection.close();
358    * }
359    * @param conf configuration
360    * @param pool the thread pool to use for batch operation in HTables used via this HConnection
361    * @return HConnection object for <code>conf</code>
362    * @throws ZooKeeperConnectionException
363    */
364   public static HConnection createConnection(Configuration conf, ExecutorService pool)
365   throws IOException {
366     UserProvider provider = UserProvider.instantiate(conf);
367     return createConnection(conf, false, pool, provider.getCurrent());
368   }
369 
370   /**
371    * Create a new HConnection instance using the passed <code>conf</code> instance.
372    * <p>Note: This bypasses the usual HConnection life cycle management done by
373    * {@link #getConnection(Configuration)}. The caller is responsible for
374    * calling {@link HConnection#close()} on the returned connection instance.
375    * This is the recommended way to create HConnections.
376    * {@code
377    * User user = ...;
378    * HConnection connection = HConnectionManager.createConnection(conf, user);
379    * HTableInterface table = connection.getTable("mytable");
380    * table.get(...);
381    * ...
382    * table.close();
383    * connection.close();
384    * }
385    * @param conf configuration
386    * @param user the user the connection is for
387    * @return HConnection object for <code>conf</code>
388    * @throws ZooKeeperConnectionException
389    */
390   public static HConnection createConnection(Configuration conf, User user)
391   throws IOException {
392     return createConnection(conf, false, null, user);
393   }
394 
395   /**
396    * Create a new HConnection instance using the passed <code>conf</code> instance.
397    * <p>Note: This bypasses the usual HConnection life cycle management done by
398    * {@link #getConnection(Configuration)}. The caller is responsible for
399    * calling {@link HConnection#close()} on the returned connection instance.
400    * This is the recommended way to create HConnections.
401    * {@code
402    * ExecutorService pool = ...;
403    * HConnection connection = HConnectionManager.createConnection(conf, pool, user);
404    * HTableInterface table = connection.getTable("mytable");
405    * table.get(...);
406    * ...
407    * table.close();
408    * connection.close();
409    * }
410    * @param conf configuration
411    * @param pool the thread pool to use for batch operation in HTables used via this HConnection
412    * @param user the user the connection is for
413    * @return HConnection object for <code>conf</code>
414    * @throws ZooKeeperConnectionException
415    */
416   public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
417   throws IOException {
418     return createConnection(conf, false, pool, user);
419   }
420 
421   @Deprecated
422   static HConnection createConnection(final Configuration conf, final boolean managed)
423       throws IOException {
424     UserProvider provider = UserProvider.instantiate(conf);
425     return createConnection(conf, managed, null, provider.getCurrent());
426   }
427 
428   @Deprecated
429   static ClusterConnection createConnection(final Configuration conf, final boolean managed,
430       final ExecutorService pool, final User user)
431   throws IOException {
432     return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
433   }
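      // Illustrative note (not part of the original source): on HBase 1.x, external
      // callers should go through the public factory rather than this internal path:
      //
      //   Connection connection = ConnectionFactory.createConnection(conf);
      //   Table table = connection.getTable(TableName.valueOf("mytable"));
      //   try {
      //     // ... use the table ...
      //   } finally {
      //     table.close();
      //     connection.close();
      //   }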
434 
435   /**
436    * Delete connection information for the instance specified by passed configuration.
437    * If there are no more references to the designated connection, this method will
438    * then close the connection to the zookeeper ensemble and let go of all associated resources.
439    *
440    * @param conf configuration whose identity is used to find {@link HConnection} instance.
441    * @deprecated
442    */
443   @Deprecated
444   public static void deleteConnection(Configuration conf) {
445     deleteConnection(new HConnectionKey(conf), false);
446   }
447 
448   /**
449    * Cleanup a known stale connection.
450    * This will then close connection to the zookeeper ensemble and let go of all resources.
451    *
452    * @param connection the known-stale connection to delete
453    * @deprecated
454    */
455   @Deprecated
456   public static void deleteStaleConnection(HConnection connection) {
457     deleteConnection(connection, true);
458   }
459 
460   /**
461    * Delete information for all connections. Whether each connection is actually closed
462    * depends on the staleConnection boolean and the reference count. By default, you
463    * should call this with staleConnection set to true.
464    * @deprecated
465    */
466   @Deprecated
467   public static void deleteAllConnections(boolean staleConnection) {
468     synchronized (CONNECTION_INSTANCES) {
469       Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
470       connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
471       for (HConnectionKey connectionKey : connectionKeys) {
472         deleteConnection(connectionKey, staleConnection);
473       }
474       CONNECTION_INSTANCES.clear();
475     }
476   }
477 
478   /**
479    * Delete information for all connections.
480    * @deprecated kept for backward compatibility, but the behavior is broken. HBASE-8983
481    */
482   @Deprecated
483   public static void deleteAllConnections() {
484     deleteAllConnections(false);
485   }
486 
487 
488   @Deprecated
489   private static void deleteConnection(HConnection connection, boolean staleConnection) {
490     synchronized (CONNECTION_INSTANCES) {
491       for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
492         if (e.getValue() == connection) {
493           deleteConnection(e.getKey(), staleConnection);
494           break;
495         }
496       }
497     }
498   }
499 
500   @Deprecated
501   private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
502     synchronized (CONNECTION_INSTANCES) {
503       HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
504       if (connection != null) {
505         connection.decCount();
506         if (connection.isZeroReference() || staleConnection) {
507           CONNECTION_INSTANCES.remove(connectionKey);
508           connection.internalClose();
509         }
510       } else {
511         LOG.error("Connection not found in the list, can't delete it " +
512           "(connection key=" + connectionKey + "). Maybe the key was modified?", new Exception());
513       }
514     }
515   }
516 
517 
518   /**
519    * This convenience method invokes the given {@link HConnectable#connect}
520    * implementation using a {@link HConnection} instance that lasts just for the
521    * duration of the invocation.
522    *
523    * @param <T> the return type of the connect method
524    * @param connectable the {@link HConnectable} instance
525    * @return the value returned by the connect method
526    * @throws IOException
527    */
528   @InterfaceAudience.Private
529   public static <T> T execute(HConnectable<T> connectable) throws IOException {
530     if (connectable == null || connectable.conf == null) {
531       return null;
532     }
533     Configuration conf = connectable.conf;
534     HConnection connection = getConnection(conf);
535     boolean connectSucceeded = false;
536     try {
537       T returnValue = connectable.connect(connection);
538       connectSucceeded = true;
539       return returnValue;
540     } finally {
541       try {
542         connection.close();
543       } catch (Exception e) {
544         ExceptionUtil.rethrowIfInterrupt(e);
545         if (connectSucceeded) {
546           throw new IOException("The connection to " + connection
547               + " could not be deleted.", e);
548         }
549       }
550     }
551   }
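      // Usage sketch (illustrative, not part of the original source): execute()
      // borrows a shared connection just for the duration of the callback:
      //
      //   long regionCount = ConnectionManager.execute(new HConnectable<Long>(conf) {
      //     @Override
      //     public Long connect(HConnection connection) throws IOException {
      //       return (long) connection.locateRegions(TableName.valueOf("mytable")).size();
      //     }
      //   });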
552 
553   /** Encapsulates connection to zookeeper and regionservers.*/
554   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
555       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
556       justification="Access to the concurrent hash map is under a lock so should be fine.")
557   static class HConnectionImplementation implements ClusterConnection, Closeable {
558     static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
559     private final long pause;
560     private boolean useMetaReplicas;
561     private final int metaReplicaCallTimeoutScanInMicroSecond;
562     private final int numTries;
563     final int rpcTimeout;
564     private NonceGenerator nonceGenerator = null;
565     private final AsyncProcess asyncProcess;
566     // single tracker per connection
567     private final ServerStatisticTracker stats;
568 
569     private volatile boolean closed;
570     private volatile boolean aborted;
571 
572     // package protected for the tests
573     ClusterStatusListener clusterStatusListener;
574 
575 
576     private final Object metaRegionLock = new Object();
577 
578     // We have a single lock for master & zk to prevent deadlocks. Having
579     //  one lock for ZK and one lock for master is not possible:
580     //  When creating a connection to master, we need a connection to ZK to get
581     //  its address. But another thread could have taken the ZK lock, and could
582     //  be waiting for the master lock => deadlock.
583     private final Object masterAndZKLock = new Object();
584 
585     private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
586 
587     // thread executor shared by all HTableInterface instances created
588     // by this connection
589     private volatile ExecutorService batchPool = null;
590     // meta thread executor shared by all HTableInterface instances created
591     // by this connection
592     private volatile ExecutorService metaLookupPool = null;
593     private volatile boolean cleanupPool = false;
594 
595     private final Configuration conf;
596 
597     // cache the configuration value for tables so that we can avoid calling
598     // the expensive Configuration to fetch the value multiple times.
599     private final ConnectionConfiguration connectionConfig;
600 
601     // Client rpc instance.
602     private RpcClient rpcClient;
603 
604     private MetaCache metaCache = new MetaCache();
605 
606     private int refCount;
607 
608     // indicates whether this connection's life cycle is managed (by us)
609     private boolean managed;
610 
611     private User user;
612 
613     private RpcRetryingCallerFactory rpcCallerFactory;
614 
615     private RpcControllerFactory rpcControllerFactory;
616 
617     private final RetryingCallerInterceptor interceptor;
618 
619     /**
620      * Cluster registry of basic info such as clusterid and meta region location.
621      */
622     Registry registry;
623 
624     private final ClientBackoffPolicy backoffPolicy;
625 
626     HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
627       this(conf, managed, null, null);
628     }
629 
630     /**
631      * constructor
632      * @param conf Configuration object
633      * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
634      * to zk and shutdown of all services; we just close down the resources this connection was
635      * responsible for and decrement usage counters.  It is up to the caller to do the full
636      * cleanup.  It is set when we want to have connection sharing going on -- reuse of zk
637      * connection, cached region locations, established regionserver connections, etc.  When
638      * connections are shared, we have reference counting going on and will only do full
639      * cleanup when there are no more users of an HConnectionImplementation instance.
640      */
641     HConnectionImplementation(Configuration conf, boolean managed,
642         ExecutorService pool, User user) throws IOException {
643       this(conf);
644       this.user = user;
645       this.batchPool = pool;
646       this.managed = managed;
647       this.registry = setupRegistry();
648       retrieveClusterId();
649 
650       this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
651       this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
652 
653       // Do we publish the status?
654       boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
655           HConstants.STATUS_PUBLISHED_DEFAULT);
656       Class<? extends ClusterStatusListener.Listener> listenerClass =
657           conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
658               ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
659               ClusterStatusListener.Listener.class);
660       if (shouldListen) {
661         if (listenerClass == null) {
662           LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
663               ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening for status");
664         } else {
665           clusterStatusListener = new ClusterStatusListener(
666               new ClusterStatusListener.DeadServerHandler() {
667                 @Override
668                 public void newDead(ServerName sn) {
669                   clearCaches(sn);
670                   rpcClient.cancelConnections(sn);
671                 }
672               }, conf, listenerClass);
673         }
674       }
675     }
676 
677     /**
678      * For tests.
679      */
680     protected HConnectionImplementation(Configuration conf) {
681       this.conf = conf;
682       this.connectionConfig = new ConnectionConfiguration(conf);
683       this.closed = false;
684       this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
685           HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
686       this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
687           HConstants.DEFAULT_USE_META_REPLICAS);
688       this.metaReplicaCallTimeoutScanInMicroSecond =
689           connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
690 
691       this.numTries = connectionConfig.getRetriesNumber();
692       this.rpcTimeout = conf.getInt(
693           HConstants.HBASE_RPC_TIMEOUT_KEY,
694           HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
695       if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
696         synchronized (nonceGeneratorCreateLock) {
697           if (ConnectionManager.nonceGenerator == null) {
698             ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
699           }
700           this.nonceGenerator = ConnectionManager.nonceGenerator;
701         }
702       } else {
703         this.nonceGenerator = new NoNonceGenerator();
704       }
705       stats = ServerStatisticTracker.create(conf);
706       this.asyncProcess = createAsyncProcess(this.conf);
707       this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
708       this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
709       this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
710     }
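        // Configuration sketch (illustrative, not part of the original source): nonces
        // are enabled by default; a client can opt out before building the connection,
        // in which case the NoNonceGenerator above is installed instead:
        //
        //   conf.setBoolean("hbase.client.nonces.enabled", false);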
711 
712     /**
713      * @param useMetaReplicas whether meta replica scans may be used; exposed for tests.
714      */
715     @VisibleForTesting
716     void setUseMetaReplicas(final boolean useMetaReplicas) {
717       this.useMetaReplicas = useMetaReplicas;
718     }
719 
720     @Override
721     public HTableInterface getTable(String tableName) throws IOException {
722       return getTable(TableName.valueOf(tableName));
723     }
724 
725     @Override
726     public HTableInterface getTable(byte[] tableName) throws IOException {
727       return getTable(TableName.valueOf(tableName));
728     }
729 
730     @Override
731     public HTableInterface getTable(TableName tableName) throws IOException {
732       return getTable(tableName, getBatchPool());
733     }
734 
735     @Override
736     public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
737       return getTable(TableName.valueOf(tableName), pool);
738     }
739 
740     @Override
741     public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
742       return getTable(TableName.valueOf(tableName), pool);
743     }
744 
745     @Override
746     public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
747       if (managed) {
748         throw new NeedUnmanagedConnectionException();
749       }
750       return new HTable(tableName, this, connectionConfig, rpcCallerFactory, rpcControllerFactory, pool);
751     }
752 
753     @Override
754     public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
755       if (params.getTableName() == null) {
756         throw new IllegalArgumentException("TableName cannot be null.");
757       }
758       if (params.getPool() == null) {
759         params.pool(HTable.getDefaultExecutor(getConfiguration()));
760       }
761       if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
762         params.writeBufferSize(connectionConfig.getWriteBufferSize());
763       }
764       if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
765         params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
766       }
767       return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
768     }
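        // Usage sketch (illustrative, not part of the original source): unset params are
        // defaulted above from the connection configuration, so callers override only
        // what they need:
        //
        //   BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("mytable"))
        //       .writeBufferSize(4 * 1024 * 1024); // pool and maxKeyValueSize keep defaults
        //   BufferedMutator mutator = connection.getBufferedMutator(params);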
769 
770     @Override
771     public BufferedMutator getBufferedMutator(TableName tableName) {
772       return getBufferedMutator(new BufferedMutatorParams(tableName));
773     }
774 
775     @Override
776     public RegionLocator getRegionLocator(TableName tableName) throws IOException {
777       return new HRegionLocator(tableName, this);
778     }
779 
780     @Override
781     public Admin getAdmin() throws IOException {
782       if (managed) {
783         throw new NeedUnmanagedConnectionException();
784       }
785       return new HBaseAdmin(this);
786     }
787 
788     private ExecutorService getBatchPool() {
789       if (batchPool == null) {
790         synchronized (this) {
791           if (batchPool == null) {
792             this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
793                 conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
794             this.cleanupPool = true;
795           }
796         }
797       }
798       return this.batchPool;
799     }
800 
801     private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
802         BlockingQueue<Runnable> passedWorkQueue) {
803       // shared HTable thread executor not yet initialized
804       if (maxThreads == 0) {
805         maxThreads = Runtime.getRuntime().availableProcessors() * 8;
806       }
807       if (coreThreads == 0) {
808         coreThreads = Runtime.getRuntime().availableProcessors() * 8;
809       }
810       long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
811       BlockingQueue<Runnable> workQueue = passedWorkQueue;
812       if (workQueue == null) {
813         workQueue =
814           new LinkedBlockingQueue<Runnable>(maxThreads *
815               conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
816                   HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
817       }
818       ThreadPoolExecutor tpe = new ThreadPoolExecutor(
819           coreThreads,
820           maxThreads,
821           keepAliveTime,
822           TimeUnit.SECONDS,
823           workQueue,
824           Threads.newDaemonThreadFactory(toString() + nameHint));
825       tpe.allowCoreThreadTimeOut(true);
826       return tpe;
827     }
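        // Sizing sketch (illustrative, not part of the original source): on an 8-core
        // host with maxThreads == 0, the pool above sizes to 8 * 8 = 64 threads, its
        // queue holds 64 * hbase.client.max.total.tasks runnables, and idle threads
        // exit after keepAliveTime (60s) because allowCoreThreadTimeOut(true) is set.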
828 
829     private ExecutorService getMetaLookupPool() {
830       if (this.metaLookupPool == null) {
831         synchronized (this) {
832           if (this.metaLookupPool == null) {
833             // Some of the threads would be used for meta replicas.
834             // To start with, threads.max.core threads can hit the meta (including replicas).
835             // After that, requests will get queued up in the passed queue, and only after
836             // the queue is full will a new thread be started.
837             this.metaLookupPool = getThreadPool(
838                conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
839                conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
840              "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
841           }
842         }
843       }
844       return this.metaLookupPool;
845     }
846 
847     protected ExecutorService getCurrentMetaLookupPool() {
848       return metaLookupPool;
849     }
850 
851     protected ExecutorService getCurrentBatchPool() {
852       return batchPool;
853     }
854 
855     private void shutdownPools() {
856       if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
857         shutdownBatchPool(this.batchPool);
858       }
859       if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
860         shutdownBatchPool(this.metaLookupPool);
861       }
862     }
863 
864     private void shutdownBatchPool(ExecutorService pool) {
865       pool.shutdown();
866       try {
867         if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
868           pool.shutdownNow();
869         }
870       } catch (InterruptedException e) {
871         pool.shutdownNow();
872       }
873     }
874 
875     /**
876      * @return The cluster registry implementation to use.
877      * @throws IOException
878      */
879     private Registry setupRegistry() throws IOException {
880       return RegistryFactory.getRegistry(this);
881     }
882 
883     /**
884      * For tests only.
885      */
886     @VisibleForTesting
887     RpcClient getRpcClient() {
888       return rpcClient;
889     }
890 
891     /**
892      * An identifier that will remain the same for a given connection.
893      */
894     @Override
895     public String toString(){
896       return "hconnection-0x" + Integer.toHexString(hashCode());
897     }
898 
899     protected String clusterId = null;
900 
901     void retrieveClusterId() {
902       if (clusterId != null) return;
903       this.clusterId = this.registry.getClusterId();
904       if (clusterId == null) {
905         clusterId = HConstants.CLUSTER_ID_DEFAULT;
906         LOG.debug("clusterid came back null, using default " + clusterId);
907       }
908     }
909 
910     @Override
911     public Configuration getConfiguration() {
912       return this.conf;
913     }
914 
915     private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
916       throws MasterNotRunningException {
917       String errorMsg;
918       try {
919         if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
920           errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
921             + "It should have been written by the master. "
922             + "Check the value configured in 'zookeeper.znode.parent'. "
923             + "There could be a mismatch with the one configured in the master.";
924           LOG.error(errorMsg);
925           throw new MasterNotRunningException(errorMsg);
926         }
927       } catch (KeeperException e) {
928         errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
929         LOG.error(errorMsg);
930         throw new MasterNotRunningException(errorMsg, e);
931       }
932     }
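        // Illustrative remedy for the error above (not part of the original source):
        // make the client agree with the master's znode parent, e.g.
        //
        //   conf.set("zookeeper.znode.parent", "/hbase"); // must match the master's setting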
933 
934     /**
935      * @return true if the master is running, throws an exception otherwise
936      * @throws MasterNotRunningException - if the master is not running
937      * @throws ZooKeeperConnectionException
938      */
939     @Deprecated
940     @Override
941     public boolean isMasterRunning()
942     throws MasterNotRunningException, ZooKeeperConnectionException {
943       // When getting the master connection, we check it's running,
944       // so if there is no exception, it means we've been able to get a
945       // connection on a running master
946       MasterKeepAliveConnection m = getKeepAliveMasterService();
947       m.close();
948       return true;
949     }
950 
951     @Override
952     public HRegionLocation getRegionLocation(final TableName tableName,
953         final byte [] row, boolean reload)
954     throws IOException {
955       return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
956     }
957 
958     @Override
959     public HRegionLocation getRegionLocation(final byte[] tableName,
960         final byte [] row, boolean reload)
961     throws IOException {
962       return getRegionLocation(TableName.valueOf(tableName), row, reload);
963     }
964 
965     @Override
966     public boolean isTableEnabled(TableName tableName) throws IOException {
967       return this.registry.isTableOnlineState(tableName, true);
968     }
969 
970     @Override
971     public boolean isTableEnabled(byte[] tableName) throws IOException {
972       return isTableEnabled(TableName.valueOf(tableName));
973     }
974 
975     @Override
976     public boolean isTableDisabled(TableName tableName) throws IOException {
977       return this.registry.isTableOnlineState(tableName, false);
978     }
979 
980     @Override
981     public boolean isTableDisabled(byte[] tableName) throws IOException {
982       return isTableDisabled(TableName.valueOf(tableName));
983     }
984 
985     @Override
986     public boolean isTableAvailable(final TableName tableName) throws IOException {
987       final AtomicBoolean available = new AtomicBoolean(true);
988       final AtomicInteger regionCount = new AtomicInteger(0);
989       MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
990         @Override
991         public boolean processRow(Result row) throws IOException {
992           HRegionInfo info = MetaScanner.getHRegionInfo(row);
993           if (info != null && !info.isSplitParent()) {
994             if (tableName.equals(info.getTable())) {
995               ServerName server = HRegionInfo.getServerName(row);
996               if (server == null) {
997                 available.set(false);
998                 return false;
999               }
1000               regionCount.incrementAndGet();
1001             } else if (tableName.compareTo(info.getTable()) < 0) {
1002               // Return if we are done with the current table
1003               return false;
1004             }
1005           }
1006           return true;
1007         }
1008       };
1009       MetaScanner.metaScan(this, visitor, tableName);
1010       return available.get() && (regionCount.get() > 0);
1011     }
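        // Usage sketch (illustrative, not part of the original source): callers
        // typically poll this after creating a table:
        //
        //   while (!connection.isTableAvailable(tableName)) {
        //     Thread.sleep(200); // true only once every region has a server assigned
        //   }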
1012 
1013     @Override
1014     public boolean isTableAvailable(final byte[] tableName) throws IOException {
1015       return isTableAvailable(TableName.valueOf(tableName));
1016     }
1017 
1018     @Override
1019     public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
1020         throws IOException {
1021       final AtomicBoolean available = new AtomicBoolean(true);
1022       final AtomicInteger regionCount = new AtomicInteger(0);
1023       MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
1024         @Override
1025         public boolean processRow(Result row) throws IOException {
1026           HRegionInfo info = MetaScanner.getHRegionInfo(row);
1027           if (info != null && !info.isSplitParent()) {
1028             if (tableName.equals(info.getTable())) {
1029               ServerName server = HRegionInfo.getServerName(row);
1030               if (server == null) {
1031                 available.set(false);
1032                 return false;
1033               }
1034               if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
1035                 for (byte[] splitKey : splitKeys) {
1036                   // Just check if the splitkey is available
1037                   if (Bytes.equals(info.getStartKey(), splitKey)) {
1038                     regionCount.incrementAndGet();
1039                     break;
1040                   }
1041                 }
1042               } else {
1043                 // Always empty start row should be counted
1044                 regionCount.incrementAndGet();
1045               }
1046             } else if (tableName.compareTo(info.getTable()) < 0) {
1047               // Return if we are done with the current table
1048               return false;
1049             }
1050           }
1051           return true;
1052         }
1053       };
1054       MetaScanner.metaScan(this, visitor, tableName);
1055       // +1 needs to be added so that the empty start row is also taken into account
1056       return available.get() && (regionCount.get() == splitKeys.length + 1);
1057     }
1058 
1059     @Override
1060     public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
1061         throws IOException {
1062       return isTableAvailable(TableName.valueOf(tableName), splitKeys);
1063     }
1064 
1065     @Override
1066     public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
1067       RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
1068         HRegionInfo.getStartKey(regionName), false, true);
1069       return locations == null ? null : locations.getRegionLocation();
1070     }
1071 
1072     @Override
1073     public boolean isDeadServer(ServerName sn) {
1074       if (clusterStatusListener == null) {
1075         return false;
1076       } else {
1077         return clusterStatusListener.isDeadServer(sn);
1078       }
1079     }
1080 
1081     @Override
1082     public List<HRegionLocation> locateRegions(final TableName tableName)
1083     throws IOException {
1084       return locateRegions(tableName, false, true);
1085     }
1086 
1087     @Override
1088     public List<HRegionLocation> locateRegions(final byte[] tableName)
1089     throws IOException {
1090       return locateRegions(TableName.valueOf(tableName));
1091     }
1092 
1093     @Override
1094     public List<HRegionLocation> locateRegions(final TableName tableName,
1095         final boolean useCache, final boolean offlined) throws IOException {
1096       NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
1097       final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
1098       for (HRegionInfo regionInfo : regions.keySet()) {
1099         RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
1100         if (list != null) {
1101           for (HRegionLocation loc : list.getRegionLocations()) {
1102             if (loc != null) {
1103               locations.add(loc);
1104             }
1105           }
1106         }
1107       }
1108       return locations;
1109     }
1110 
1111     @Override
1112     public List<HRegionLocation> locateRegions(final byte[] tableName,
1113        final boolean useCache, final boolean offlined) throws IOException {
1114       return locateRegions(TableName.valueOf(tableName), useCache, offlined);
1115     }
1116 
1117     @Override
1118     public HRegionLocation locateRegion(
1119         final TableName tableName, final byte[] row) throws IOException{
1120       RegionLocations locations = locateRegion(tableName, row, true, true);
1121       return locations == null ? null : locations.getRegionLocation();
1122     }
1123 
1124     @Override
1125     public HRegionLocation locateRegion(final byte[] tableName,
1126         final byte [] row)
1127     throws IOException{
1128       return locateRegion(TableName.valueOf(tableName), row);
1129     }
1130 
1131     @Override
1132     public HRegionLocation relocateRegion(final TableName tableName,
1133         final byte [] row) throws IOException{
1134       RegionLocations locations =  relocateRegion(tableName, row,
1135         RegionReplicaUtil.DEFAULT_REPLICA_ID);
1136       return locations == null ? null :
1137         locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
1138     }
1139 
1140     @Override
1141     public RegionLocations relocateRegion(final TableName tableName,
1142         final byte [] row, int replicaId) throws IOException{
1143       // Since this is an explicit request not to use any caching, finding
1144       // disabled tables should not be desirable.  This will ensure that an exception is
1145       // thrown the first time a disabled table is interacted with.
1146       if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
1147         throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
1148       }
1149 
1150       return locateRegion(tableName, row, false, true, replicaId);
1151     }
1152 
1153     @Override
1154     public HRegionLocation relocateRegion(final byte[] tableName,
1155         final byte [] row) throws IOException {
1156       return relocateRegion(TableName.valueOf(tableName), row);
1157     }
1158 
1159     @Override
1160     public RegionLocations locateRegion(final TableName tableName,
1161       final byte [] row, boolean useCache, boolean retry)
1162     throws IOException {
1163       return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
1164     }
1165 
1166     @Override
1167     public RegionLocations locateRegion(final TableName tableName,
1168       final byte [] row, boolean useCache, boolean retry, int replicaId)
1169     throws IOException {
1170       if (this.closed) throw new DoNotRetryIOException(toString() + " closed");
1171       if (tableName == null || tableName.getName().length == 0) {
1172         throw new IllegalArgumentException(
1173             "table name cannot be null or zero length");
1174       }
1175       if (tableName.equals(TableName.META_TABLE_NAME)) {
1176         return locateMeta(tableName, useCache, replicaId);
1177       } else {
1178         // Region not in the cache - have to go to the meta RS
1179         return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
1180       }
1181     }
1182 
1183     private RegionLocations locateMeta(final TableName tableName,
1184         boolean useCache, int replicaId) throws IOException {
1185       // HBASE-10785: We cache the location of the META itself, so that we are not overloading
1186       // zookeeper with one request for every region lookup. We cache the META with empty row
1187       // key in MetaCache.
1188       byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
1189       RegionLocations locations = null;
1190       if (useCache) {
1191         locations = getCachedLocation(tableName, metaCacheKey);
1192         if (locations != null && locations.getRegionLocation(replicaId) != null) {
1193           return locations;
1194         }
1195       }
1196 
1197       // only one thread should do the lookup.
1198       synchronized (metaRegionLock) {
1199         // Check the cache again for a hit in case some other thread made the
1200         // same query while we were waiting on the lock.
1201         if (useCache) {
1202           locations = getCachedLocation(tableName, metaCacheKey);
1203           if (locations != null && locations.getRegionLocation(replicaId) != null) {
1204             return locations;
1205           }
1206         }
1207 
1208         // Look up from zookeeper
1209         locations = this.registry.getMetaRegionLocation();
1210         if (locations != null) {
1211           cacheLocation(tableName, locations);
1212         }
1213       }
1214       return locations;
1215     }
1216 
1217     /*
1218      * Search the hbase:meta table for the HRegionLocation
1219      * info that contains the table and row we're seeking.
1220      */
1221     private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
1222                    boolean useCache, boolean retry, int replicaId) throws IOException {
1223 
1224       // If we are supposed to be using the cache, look in the cache to see if
1225       // we already have the region.
1226       if (useCache) {
1227         RegionLocations locations = getCachedLocation(tableName, row);
1228         if (locations != null && locations.getRegionLocation(replicaId) != null) {
1229           return locations;
1230         }
1231       }
1232 
1233       // build the key of the meta region we should be looking for.
1234       // the extra 9's on the end are necessary to allow "exact" matches
1235       // without knowing the precise region names.
1236       byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
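           // e.g. for table "t1" and row "r1" this yields "t1,r1,99999999999999"; that key sorts
           // just after the meta row of the region containing "r1", so the reversed scan below
           // finds that region's entry first.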
1237 
1238       Scan s = new Scan();
1239       s.setReversed(true);
1240       s.setStartRow(metaKey);
1241       s.setSmall(true);
1242       s.setCaching(1);
1243       if (this.useMetaReplicas) {
1244         s.setConsistency(Consistency.TIMELINE);
1245       }
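           // Small reversed scan with caching(1): fetch exactly one meta row, the entry at or
           // before metaKey.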
1246 
1247       int localNumRetries = (retry ? numTries : 1);
1248 
1249       for (int tries = 0; true; tries++) {
1250         if (tries >= localNumRetries) {
1251           throw new NoServerForRegionException("Unable to find region for "
1252               + Bytes.toStringBinary(row) + " in " + tableName +
1253               " after " + localNumRetries + " tries.");
1254         }
1255         if (useCache) {
1256           RegionLocations locations = getCachedLocation(tableName, row);
1257           if (locations != null && locations.getRegionLocation(replicaId) != null) {
1258             return locations;
1259           }
1260         } else {
1261           // If we are not supposed to be using the cache, delete any existing cached location
1262           // so it won't interfere.
1263           // We are only supposed to clean the cache for the specific replicaId
1264           metaCache.clearCache(tableName, row, replicaId);
1265         }
1266 
1267         // Query the meta region
1268         try {
1269           Result regionInfoRow = null;
1270           ReversedClientScanner rcs = null;
1271           try {
1272             rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
1273               rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
1274               metaReplicaCallTimeoutScanInMicroSecond);
1275             regionInfoRow = rcs.next();
1276           } finally {
1277             if (rcs != null) {
1278               rcs.close();
1279             }
1280           }
1281 
1282           if (regionInfoRow == null) {
1283             throw new TableNotFoundException(tableName);
1284           }
1285 
1286           // convert the row result into the HRegionLocation we need!
1287           RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
1288           if (locations == null || locations.getRegionLocation(replicaId) == null) {
1289             throw new IOException("HRegionInfo was null in " +
1290               tableName + ", row=" + regionInfoRow);
1291           }
1292           HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
1293           if (regionInfo == null) {
1294             throw new IOException("HRegionInfo was null or empty in " +
1295               TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
1296           }
1297 
1298           // possible we got a region of a different table...
1299           if (!regionInfo.getTable().equals(tableName)) {
1300             throw new TableNotFoundException(
1301                   "Table '" + tableName + "' was not found, got: " +
1302                   regionInfo.getTable() + ".");
1303           }
1304           if (regionInfo.isSplit()) {
1305             throw new RegionOfflineException("the only available region for" +
1306               " the required row is a split parent," +
1307               " the daughters should be online soon: " +
1308               regionInfo.getRegionNameAsString());
1309           }
1310           if (regionInfo.isOffline()) {
1311             throw new RegionOfflineException("the region is offline, could" +
1312               " be caused by a disable table call: " +
1313               regionInfo.getRegionNameAsString());
1314           }
1315 
1316           ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1317           if (serverName == null) {
1318             throw new NoServerForRegionException("No server address listed " +
1319               "in " + TableName.META_TABLE_NAME + " for region " +
1320               regionInfo.getRegionNameAsString() + " containing row " +
1321               Bytes.toStringBinary(row));
1322           }
1323 
1324           if (isDeadServer(serverName)) {
1325             throw new RegionServerStoppedException("hbase:meta says the region "+
1326                 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1327                 ", but it is dead.");
1328           }
1329           // Instantiate the location
1330           cacheLocation(tableName, locations);
1331           return locations;
1332         } catch (TableNotFoundException e) {
1333           // if we got this error, probably means the table just plain doesn't
1334           // exist. rethrow the error immediately. this should always be coming
1335           // from the HTable constructor.
1336           throw e;
1337         } catch (IOException e) {
1338           ExceptionUtil.rethrowIfInterrupt(e);
1339 
1340           if (e instanceof RemoteException) {
1341             e = ((RemoteException)e).unwrapRemoteException();
1342           }
1343           if (tries < localNumRetries - 1) {
1344             if (LOG.isDebugEnabled()) {
1345               LOG.debug("locateRegionInMeta parentTable=" +
1346                   TableName.META_TABLE_NAME +
1347                 ", attempt=" + tries + " of " +
1348                 localNumRetries + " failed; retrying after sleep of " +
1349                 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1350             }
1351           } else {
1352             throw e;
1353           }
1354           // Only relocate the parent region if necessary
1355           if (!(e instanceof RegionOfflineException ||
1356               e instanceof NoServerForRegionException)) {
1357             relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1358           }
1359         }
1360         try {
1361           Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1362         } catch (InterruptedException e) {
1363           throw new InterruptedIOException("Giving up trying to locate region in " +
1364             "meta: thread is interrupted.");
1365         }
1366       }
1367     }
1368 
1369     /**
1370      * Put a newly discovered HRegionLocation into the cache.
1371      * @param tableName The table name.
1372      * @param location the new location
1373      */
1374     @Override
1375     public void cacheLocation(final TableName tableName, final RegionLocations location) {
1376       metaCache.cacheLocation(tableName, location);
1377     }
1378 
1379     /**
1380      * Search the cache for a location that fits our table and row key.
1381      * Return null if no suitable region is located.
1382      *
1383      * @param tableName table to find the region location for
1384      * @param row row key to find the containing region for
1385      * @return Null or region location found in cache.
1386      */
1387     RegionLocations getCachedLocation(final TableName tableName,
1388         final byte [] row) {
1389       return metaCache.getCachedLocation(tableName, row);
1390     }
1391 
1392     public void clearRegionCache(final TableName tableName, byte[] row) {
1393       metaCache.clearCache(tableName, row);
1394     }
1395 
1396     /*
1397      * Delete all cached entries that map to the given server.
1398      */
1399     @Override
1400     public void clearCaches(final ServerName serverName) {
1401       metaCache.clearCache(serverName);
1402     }
1403 
1404     @Override
1405     public void clearRegionCache() {
1406       metaCache.clearCache();
1407     }
1408 
1409     @Override
1410     public void clearRegionCache(final TableName tableName) {
1411       metaCache.clearCache(tableName);
1412     }
1413 
1414     @Override
1415     public void clearRegionCache(final byte[] tableName) {
1416       clearRegionCache(TableName.valueOf(tableName));
1417     }
1418 
1419     /**
1420      * Put a newly discovered HRegionLocation into the cache.
1421      * @param tableName The table name.
1422      * @param source the source of the new location, if it's not coming from meta
1423      * @param location the new location
1424      */
1425     private void cacheLocation(final TableName tableName, final ServerName source,
1426         final HRegionLocation location) {
1427       metaCache.cacheLocation(tableName, source, location);
1428     }
1429 
1430     // Map keyed by service name + regionserver to service stub implementation
1431     private final ConcurrentHashMap<String, Object> stubs =
1432       new ConcurrentHashMap<String, Object>();
1433     // Map of locks used when creating service stubs per regionserver.
1434     private final ConcurrentHashMap<String, String> connectionLock =
1435       new ConcurrentHashMap<String, String>();
1436 
1437     /**
1438      * State of the MasterService connection/setup.
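          * <p>Lifecycle sketch: getKeepAliveMasterService() increments userCount and hands out
          * a MasterKeepAliveConnection whose close() calls releaseMaster(), which decrements it.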
1439      */
1440     static class MasterServiceState {
1441       HConnection connection;
1442       MasterService.BlockingInterface stub;
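           // Number of active users of this connection's master stub; updated while holding
           // masterAndZKLock (see getKeepAliveMasterService/releaseMaster).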
1443       int userCount;
1444 
1445       MasterServiceState(final HConnection connection) {
1446         super();
1447         this.connection = connection;
1448       }
1449 
1450       @Override
1451       public String toString() {
1452         return "MasterService";
1453       }
1454 
1455       Object getStub() {
1456         return this.stub;
1457       }
1458 
1459       void clearStub() {
1460         this.stub = null;
1461       }
1462 
1463       boolean isMasterRunning() throws ServiceException {
1464         IsMasterRunningResponse response =
1465           this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1466         return response != null ? response.getIsMasterRunning() : false;
1467       }
1468     }
1469 
1470     /**
1471      * Makes a client-side stub for master services. Sub-class to specialize.
1472      * Depends on the hosting class, so it is not static.  Exists so we avoid duplicating a bunch of code
1473      * when setting up the MasterMonitorService and MasterAdminService.
1474      */
1475     abstract class StubMaker {
1476       /**
1477        * Returns the name of the service stub being created.
1478        */
1479       protected abstract String getServiceName();
1480 
1481       /**
1482        * Make the stub and cache it internally so it can be used later for the isMasterRunning call.
1483        * @param channel the RPC channel to build the stub on
1484        */
1485       protected abstract Object makeStub(final BlockingRpcChannel channel);
1486 
1487       /**
1488        * Once setup, check it works by doing isMasterRunning check.
1489        * @throws ServiceException
1490        */
1491       protected abstract void isMasterRunning() throws ServiceException;
1492 
1493       /**
1494        * Create a stub. Try once only.  It is not typed because there is no common type for
1495        * protobuf services or their interfaces.  Let the caller do the appropriate casting.
1496        * @return A stub for master services.
1497        * @throws IOException
1498        * @throws KeeperException
1499        * @throws ServiceException
1500        */
1501       private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1502         ZooKeeperKeepAliveConnection zkw;
1503         try {
1504           zkw = getKeepAliveZooKeeperWatcher();
1505         } catch (IOException e) {
1506           ExceptionUtil.rethrowIfInterrupt(e);
1507           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1508         }
1509         try {
1510           checkIfBaseNodeAvailable(zkw);
1511           ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1512           if (sn == null) {
1513             String msg = "ZooKeeper available but no active master location found";
1514             LOG.info(msg);
1515             throw new MasterNotRunningException(msg);
1516           }
1517           if (isDeadServer(sn)) {
1518             throw new MasterNotRunningException(sn + " is dead.");
1519           }
1520           // Use the security info interface name as our stub key
1521           String key = getStubKey(getServiceName(), sn.getHostname(), sn.getPort());
1522           connectionLock.putIfAbsent(key, key);
1523           Object stub = null;
1524           synchronized (connectionLock.get(key)) {
1525             stub = stubs.get(key);
1526             if (stub == null) {
1527               BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1528               stub = makeStub(channel);
1529               isMasterRunning();
1530               stubs.put(key, stub);
1531             }
1532           }
1533           return stub;
1534         } finally {
1535           zkw.close();
1536         }
1537       }
1538 
1539       /**
1540        * Create a stub against the master.  Retry if necessary.
1541        * @return A stub for making <code>intf</code> calls against the master
1542        * @throws MasterNotRunningException
1543        */
1544       Object makeStub() throws IOException {
1545         // The lock must be at the beginning to prevent multiple master creations
1546         //  (and leaks) in a multithreaded context.
1547         synchronized (masterAndZKLock) {
1548           Exception exceptionCaught = null;
1549           if (!closed) {
1550             try {
1551               return makeStubNoRetries();
1552             } catch (IOException e) {
1553               exceptionCaught = e;
1554             } catch (KeeperException e) {
1555               exceptionCaught = e;
1556             } catch (ServiceException e) {
1557               exceptionCaught = e;
1558             }
1559 
1560             throw new MasterNotRunningException(exceptionCaught);
1561           } else {
1562             throw new DoNotRetryIOException("Connection was closed while trying to get master");
1563           }
1564         }
1565       }
1566     }
1567 
1568     /**
1569      * Class to make a MasterServiceStubMaker stub.
1570      */
1571     class MasterServiceStubMaker extends StubMaker {
1572       private MasterService.BlockingInterface stub;
1573       @Override
1574       protected String getServiceName() {
1575         return MasterService.getDescriptor().getName();
1576       }
1577 
1578       @Override
1579       MasterService.BlockingInterface makeStub() throws IOException {
1580         return (MasterService.BlockingInterface)super.makeStub();
1581       }
1582 
1583       @Override
1584       protected Object makeStub(BlockingRpcChannel channel) {
1585         this.stub = MasterService.newBlockingStub(channel);
1586         return this.stub;
1587       }
1588 
1589       @Override
1590       protected void isMasterRunning() throws ServiceException {
1591         this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1592       }
1593     }
1594 
1595     @Override
1596     public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1597         throws IOException {
1598       return getAdmin(serverName, false);
1599     }
1600 
1601     @Override
1602     // Nothing is done w/ the 'master' parameter.  It is ignored.
1603     public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1604       final boolean master)
1605     throws IOException {
1606       if (isDeadServer(serverName)) {
1607         throw new RegionServerStoppedException(serverName + " is dead.");
1608       }
1609       String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1610           serverName.getHostname(), serverName.getPort());
1611       this.connectionLock.putIfAbsent(key, key);
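           // The value stored doubles as the lock object: putIfAbsent ensures every thread
           // synchronizes on the same canonical key instance for this server/service pair.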
1612       AdminService.BlockingInterface stub = null;
1613       synchronized (this.connectionLock.get(key)) {
1614         stub = (AdminService.BlockingInterface)this.stubs.get(key);
1615         if (stub == null) {
1616           BlockingRpcChannel channel =
1617               this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1618           stub = AdminService.newBlockingStub(channel);
1619           this.stubs.put(key, stub);
1620         }
1621       }
1622       return stub;
1623     }
1624 
1625     @Override
1626     public ClientService.BlockingInterface getClient(final ServerName sn)
1627     throws IOException {
1628       if (isDeadServer(sn)) {
1629         throw new RegionServerStoppedException(sn + " is dead.");
1630       }
1631       String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
1632           sn.getPort());
1633       this.connectionLock.putIfAbsent(key, key);
1634       ClientService.BlockingInterface stub = null;
1635       synchronized (this.connectionLock.get(key)) {
1636         stub = (ClientService.BlockingInterface)this.stubs.get(key);
1637         if (stub == null) {
1638           BlockingRpcChannel channel =
1639               this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1640           stub = ClientService.newBlockingStub(channel);
1641           // In the old days, after getting the stub/proxy, we'd make a call.  We are not doing that here.
1642           // Just fail on first actual call rather than in here on setup.
1643           this.stubs.put(key, stub);
1644         }
1645       }
1646       return stub;
1647     }
1648 
1649     static String getStubKey(final String serviceName, final String rsHostname, int port) {
1650       // Sometimes, servers go down and they come back up with the same hostname but a different
1651       // IP address. Force a resolution of the rsHostname by trying to instantiate an
1652       // InetSocketAddress, and this way we will rightfully get a new stubKey.
1653       // Also, include the hostname in the key so as to take care of those cases where the
1654       // DNS name is different but IP address remains the same.
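           // Resulting key shape (hypothetical host): "<serviceName>@192.0.2.10-rs1.example.com:16020".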
1655       InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
1656       String address = rsHostname;
1657       if (i != null) {
1658         address = i.getHostAddress() + "-" + rsHostname;
1659       }
1660       return serviceName + "@" + address + ":" + port;
1661     }
1662 
1663     private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1664     private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1665     private boolean canCloseZKW = true;
1666 
1667     // keepAlive time, in ms. No reason to make it configurable.
1668     private static final long keepAlive = 5 * 60 * 1000;
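         // releaseZooKeeperWatcher() does not close the shared watcher immediately; it only arms
         // keepZooKeeperWatcherAliveUntil so an idle watcher can be reaped after this delay.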
1669 
1670     /**
1671      * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
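          * <p>Typical usage, as a minimal sketch:
          * <pre>
          * ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
          * try {
          *   // ... use zkw ...
          * } finally {
          *   zkw.close();
          * }
          * </pre>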
1672      * @return The shared instance. Never returns null.
1673      */
1674     ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1675       throws IOException {
1676       synchronized (masterAndZKLock) {
1677         if (keepAliveZookeeper == null) {
1678           if (this.closed) {
1679             throw new IOException(toString() + " closed");
1680           }
1681           // We don't check that our link to ZooKeeper is still valid
1682           // But there is a retry mechanism in the ZooKeeperWatcher itself
1683           keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1684         }
1685         keepAliveZookeeperUserCount.addAndGet(1);
1686         keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1687         return keepAliveZookeeper;
1688       }
1689     }
1690 
1691     void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1692       if (zkw == null) {
1693         return;
1694       }
1695       if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0) {
1696         keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1697       }
1698     }
1699 
1700     private void closeZooKeeperWatcher() {
1701       synchronized (masterAndZKLock) {
1702         if (keepAliveZookeeper != null) {
1703           LOG.info("Closing zookeeper sessionid=0x" +
1704             Long.toHexString(
1705               keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1706           keepAliveZookeeper.internalClose();
1707           keepAliveZookeeper = null;
1708         }
1709         keepAliveZookeeperUserCount.set(0);
1710       }
1711     }
1712 
1713     final MasterServiceState masterServiceState = new MasterServiceState(this);
1714 
1715     @Override
1716     public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
1717       return getKeepAliveMasterService();
1718     }
1719 
1720     private void resetMasterServiceState(final MasterServiceState mss) {
1721       mss.userCount++;
1722     }
1723 
1724     @Override
1725     public MasterKeepAliveConnection getKeepAliveMasterService()
1726     throws MasterNotRunningException {
1727       synchronized (masterAndZKLock) {
1728         if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1729           MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1730           try {
1731             this.masterServiceState.stub = stubMaker.makeStub();
1732           } catch (MasterNotRunningException ex) {
1733             throw ex;
1734           } catch (IOException e) {
1735             // rethrow as MasterNotRunningException so that we can keep the method sig
1736             throw new MasterNotRunningException(e);
1737           }
1738         }
1739         resetMasterServiceState(this.masterServiceState);
1740       }
1741       // Ugly delegation just so we can add in a Close method.
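           // The anonymous class below forwards every MasterService call to the shared stub;
           // its close() just decrements the user count via release(mss) instead of closing
           // the underlying connection.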
1742       final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1743       return new MasterKeepAliveConnection() {
1744         MasterServiceState mss = masterServiceState;
1745         @Override
1746         public MasterProtos.AbortProcedureResponse abortProcedure(
1747           RpcController controller,
1748           MasterProtos.AbortProcedureRequest request) throws ServiceException {
1749           return stub.abortProcedure(controller, request);
1750         }
1751         @Override
1752         public MasterProtos.ListProceduresResponse listProcedures(
1753             RpcController controller,
1754             MasterProtos.ListProceduresRequest request) throws ServiceException {
1755           return stub.listProcedures(controller, request);
1756         }
1757         @Override
1758         public MasterProtos.BackupTablesResponse backupTables(
1759             RpcController controller,
1760             MasterProtos.BackupTablesRequest request)  throws ServiceException {
1761           return stub.backupTables(controller, request);
1762         }
1763 
1764         @Override
1765         public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1766         throws ServiceException {
1767           return stub.addColumn(controller, request);
1768         }
1769 
1770         @Override
1771         public DeleteColumnResponse deleteColumn(RpcController controller,
1772             DeleteColumnRequest request)
1773         throws ServiceException {
1774           return stub.deleteColumn(controller, request);
1775         }
1776 
1777         @Override
1778         public ModifyColumnResponse modifyColumn(RpcController controller,
1779             ModifyColumnRequest request)
1780         throws ServiceException {
1781           return stub.modifyColumn(controller, request);
1782         }
1783 
1784         @Override
1785         public MoveRegionResponse moveRegion(RpcController controller,
1786             MoveRegionRequest request) throws ServiceException {
1787           return stub.moveRegion(controller, request);
1788         }
1789 
1790         @Override
1791         public DispatchMergingRegionsResponse dispatchMergingRegions(
1792             RpcController controller, DispatchMergingRegionsRequest request)
1793             throws ServiceException {
1794           return stub.dispatchMergingRegions(controller, request);
1795         }
1796 
1797         @Override
1798         public AssignRegionResponse assignRegion(RpcController controller,
1799             AssignRegionRequest request) throws ServiceException {
1800           return stub.assignRegion(controller, request);
1801         }
1802 
1803         @Override
1804         public UnassignRegionResponse unassignRegion(RpcController controller,
1805             UnassignRegionRequest request) throws ServiceException {
1806           return stub.unassignRegion(controller, request);
1807         }
1808 
1809         @Override
1810         public OfflineRegionResponse offlineRegion(RpcController controller,
1811             OfflineRegionRequest request) throws ServiceException {
1812           return stub.offlineRegion(controller, request);
1813         }
1814 
1815         @Override
1816         public DeleteTableResponse deleteTable(RpcController controller,
1817             DeleteTableRequest request) throws ServiceException {
1818           return stub.deleteTable(controller, request);
1819         }
1820 
1821         @Override
1822         public TruncateTableResponse truncateTable(RpcController controller,
1823             TruncateTableRequest request) throws ServiceException {
1824           return stub.truncateTable(controller, request);
1825         }
1826 
1827         @Override
1828         public EnableTableResponse enableTable(RpcController controller,
1829             EnableTableRequest request) throws ServiceException {
1830           return stub.enableTable(controller, request);
1831         }
1832 
1833         @Override
1834         public DisableTableResponse disableTable(RpcController controller,
1835             DisableTableRequest request) throws ServiceException {
1836           return stub.disableTable(controller, request);
1837         }
1838 
1839         @Override
1840         public ModifyTableResponse modifyTable(RpcController controller,
1841             ModifyTableRequest request) throws ServiceException {
1842           return stub.modifyTable(controller, request);
1843         }
1844 
1845         @Override
1846         public CreateTableResponse createTable(RpcController controller,
1847             CreateTableRequest request) throws ServiceException {
1848           return stub.createTable(controller, request);
1849         }
1850 
1851         @Override
1852         public ShutdownResponse shutdown(RpcController controller,
1853             ShutdownRequest request) throws ServiceException {
1854           return stub.shutdown(controller, request);
1855         }
1856 
1857         @Override
1858         public StopMasterResponse stopMaster(RpcController controller,
1859             StopMasterRequest request) throws ServiceException {
1860           return stub.stopMaster(controller, request);
1861         }
1862 
1863         @Override
1864         public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1865             final RpcController controller,
1866             final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
1867           return stub.isMasterInMaintenanceMode(controller, request);
1868         }
1869 
1870         @Override
1871         public BalanceResponse balance(RpcController controller,
1872             BalanceRequest request) throws ServiceException {
1873           return stub.balance(controller, request);
1874         }
1875 
1876         @Override
1877         public SetBalancerRunningResponse setBalancerRunning(
1878             RpcController controller, SetBalancerRunningRequest request)
1879             throws ServiceException {
1880           return stub.setBalancerRunning(controller, request);
1881         }
1882 
1883         @Override
1884         public NormalizeResponse normalize(RpcController controller,
1885                                        NormalizeRequest request) throws ServiceException {
1886           return stub.normalize(controller, request);
1887         }
1888 
1889         @Override
1890         public SetNormalizerRunningResponse setNormalizerRunning(
1891           RpcController controller, SetNormalizerRunningRequest request)
1892           throws ServiceException {
1893           return stub.setNormalizerRunning(controller, request);
1894         }
1895 
1896         @Override
1897         public RunCatalogScanResponse runCatalogScan(RpcController controller,
1898             RunCatalogScanRequest request) throws ServiceException {
1899           return stub.runCatalogScan(controller, request);
1900         }
1901 
1902         @Override
1903         public EnableCatalogJanitorResponse enableCatalogJanitor(
1904             RpcController controller, EnableCatalogJanitorRequest request)
1905             throws ServiceException {
1906           return stub.enableCatalogJanitor(controller, request);
1907         }
1908 
1909         @Override
1910         public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1911             RpcController controller, IsCatalogJanitorEnabledRequest request)
1912             throws ServiceException {
1913           return stub.isCatalogJanitorEnabled(controller, request);
1914         }
1915 
1916         @Override
1917         public CoprocessorServiceResponse execMasterService(
1918             RpcController controller, CoprocessorServiceRequest request)
1919             throws ServiceException {
1920           return stub.execMasterService(controller, request);
1921         }
1922 
1923         @Override
1924         public SnapshotResponse snapshot(RpcController controller,
1925             SnapshotRequest request) throws ServiceException {
1926           return stub.snapshot(controller, request);
1927         }
1928 
1929         @Override
1930         public GetCompletedSnapshotsResponse getCompletedSnapshots(
1931             RpcController controller, GetCompletedSnapshotsRequest request)
1932             throws ServiceException {
1933           return stub.getCompletedSnapshots(controller, request);
1934         }
1935 
1936         @Override
1937         public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1938             DeleteSnapshotRequest request) throws ServiceException {
1939           return stub.deleteSnapshot(controller, request);
1940         }
1941 
1942         @Override
1943         public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1944             IsSnapshotDoneRequest request) throws ServiceException {
1945           return stub.isSnapshotDone(controller, request);
1946         }
1947 
1948         @Override
1949         public RestoreSnapshotResponse restoreSnapshot(
1950             RpcController controller, RestoreSnapshotRequest request)
1951             throws ServiceException {
1952           return stub.restoreSnapshot(controller, request);
1953         }
1954 
1955         @Override
1956         public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1957             RpcController controller, IsRestoreSnapshotDoneRequest request)
1958             throws ServiceException {
1959           return stub.isRestoreSnapshotDone(controller, request);
1960         }
1961 
1962         @Override
1963         public ExecProcedureResponse execProcedure(
1964             RpcController controller, ExecProcedureRequest request)
1965             throws ServiceException {
1966           return stub.execProcedure(controller, request);
1967         }
1968 
1969         @Override
1970         public ExecProcedureResponse execProcedureWithRet(
1971             RpcController controller, ExecProcedureRequest request)
1972             throws ServiceException {
1973           return stub.execProcedureWithRet(controller, request);
1974         }
1975 
1976         @Override
1977         public IsProcedureDoneResponse isProcedureDone(RpcController controller,
1978             IsProcedureDoneRequest request) throws ServiceException {
1979           return stub.isProcedureDone(controller, request);
1980         }
1981 
1982         @Override
1983         public GetProcedureResultResponse getProcedureResult(RpcController controller,
1984             GetProcedureResultRequest request) throws ServiceException {
1985           return stub.getProcedureResult(controller, request);
1986         }
1987 
1988         @Override
1989         public IsMasterRunningResponse isMasterRunning(
1990             RpcController controller, IsMasterRunningRequest request)
1991             throws ServiceException {
1992           return stub.isMasterRunning(controller, request);
1993         }
1994 
1995         @Override
1996         public ModifyNamespaceResponse modifyNamespace(RpcController controller,
1997             ModifyNamespaceRequest request)
1998         throws ServiceException {
1999           return stub.modifyNamespace(controller, request);
2000         }
2001 
2002         @Override
2003         public CreateNamespaceResponse createNamespace(
2004             RpcController controller, CreateNamespaceRequest request) throws ServiceException {
2005           return stub.createNamespace(controller, request);
2006         }
2007 
2008         @Override
2009         public DeleteNamespaceResponse deleteNamespace(
2010             RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
2011           return stub.deleteNamespace(controller, request);
2012         }
2013 
2014         @Override
2015         public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
2016             GetNamespaceDescriptorRequest request) throws ServiceException {
2017           return stub.getNamespaceDescriptor(controller, request);
2018         }
2019 
2020         @Override
2021         public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
2022             ListNamespaceDescriptorsRequest request) throws ServiceException {
2023           return stub.listNamespaceDescriptors(controller, request);
2024         }
2025 
2026         @Override
2027         public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
2028             RpcController controller, ListTableDescriptorsByNamespaceRequest request)
2029                 throws ServiceException {
2030           return stub.listTableDescriptorsByNamespace(controller, request);
2031         }
2032 
2033         @Override
2034         public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
2035             RpcController controller, ListTableNamesByNamespaceRequest request)
2036                 throws ServiceException {
2037           return stub.listTableNamesByNamespace(controller, request);
2038         }
2039 
2040         @Override
2041         public void close() {
2042           release(this.mss);
2043         }
2044 
2045         @Override
2046         public GetSchemaAlterStatusResponse getSchemaAlterStatus(
2047             RpcController controller, GetSchemaAlterStatusRequest request)
2048             throws ServiceException {
2049           return stub.getSchemaAlterStatus(controller, request);
2050         }
2051 
2052         @Override
2053         public GetTableDescriptorsResponse getTableDescriptors(
2054             RpcController controller, GetTableDescriptorsRequest request)
2055             throws ServiceException {
2056           return stub.getTableDescriptors(controller, request);
2057         }
2058 
2059         @Override
2060         public GetTableNamesResponse getTableNames(
2061             RpcController controller, GetTableNamesRequest request)
2062             throws ServiceException {
2063           return stub.getTableNames(controller, request);
2064         }
2065 
2066         @Override
2067         public GetClusterStatusResponse getClusterStatus(
2068             RpcController controller, GetClusterStatusRequest request)
2069             throws ServiceException {
2070           return stub.getClusterStatus(controller, request);
2071         }
2072 
2073         @Override
2074         public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
2075             throws ServiceException {
2076           return stub.setQuota(controller, request);
2077         }
2078 
2079         @Override
2080         public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
2081             RpcController controller, MajorCompactionTimestampRequest request)
2082             throws ServiceException {
2083           return stub.getLastMajorCompactionTimestamp(controller, request);
2084         }
2085 
2086         @Override
2087         public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
2088             RpcController controller, MajorCompactionTimestampForRegionRequest request)
2089             throws ServiceException {
2090           return stub.getLastMajorCompactionTimestampForRegion(controller, request);
2091         }
2092 
2093         @Override
2094         public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
2095             IsBalancerEnabledRequest request) throws ServiceException {
2096           return stub.isBalancerEnabled(controller, request);
2097         }
2098 
2099         @Override
2100         public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
2101           RpcController controller,
2102           MasterProtos.SetSplitOrMergeEnabledRequest request) throws ServiceException {
2103           return stub.setSplitOrMergeEnabled(controller, request);
2104         }
2105 
2106         @Override
2107         public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
2108           RpcController controller,
2109           MasterProtos.IsSplitOrMergeEnabledRequest request) throws ServiceException {
2110           return stub.isSplitOrMergeEnabled(controller, request);
2111         }
2112 
2113         @Override
2114         public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
2115             IsNormalizerEnabledRequest request) throws ServiceException {
2116           return stub.isNormalizerEnabled(controller, request);
2117         }
2118 
2119         @Override
2120         public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
2121             RpcController controller, GetSpaceQuotaRegionSizesRequest request)
2122             throws ServiceException {
2123           return stub.getSpaceQuotaRegionSizes(controller, request);
2124         }
2125 
2126         @Override
2127         public GetQuotaStatesResponse getQuotaStates(RpcController controller,
2128             GetQuotaStatesRequest request) throws ServiceException {
2129           return stub.getQuotaStates(controller, request);
2130         }
2131       };
2132     }
2133 
2134 
2135     private static void release(MasterServiceState mss) {
2136       if (mss != null && mss.connection != null) {
2137         ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2138       }
2139     }
2140 
2141     private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2142       if (mss.getStub() == null){
2143         return false;
2144       }
2145       try {
2146         return mss.isMasterRunning();
2147       } catch (UndeclaredThrowableException e) {
2148         // It's somewhat messy, but we can receive exceptions such as
2149         //  java.net.ConnectException that are not declared, so we catch them here.
2150         LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2151         return false;
2152       } catch (ServiceException se) {
2153         LOG.warn("Checking master connection", se);
2154         return false;
2155       }
2156     }
2157 
2158     void releaseMaster(MasterServiceState mss) {
2159       if (mss.getStub() == null) return;
2160       synchronized (masterAndZKLock) {
2161         --mss.userCount;
2162       }
2163     }
2164 
2165     private void closeMasterService(MasterServiceState mss) {
2166       if (mss.getStub() != null) {
2167         LOG.info("Closing master protocol: " + mss);
2168         mss.clearStub();
2169       }
2170       mss.userCount = 0;
2171     }
2172 
2173     /**
2174      * Immediate close of the shared master. Can be triggered by the delayed close or when closing the
2175      * connection itself.
2176      */
2177     private void closeMaster() {
2178       synchronized (masterAndZKLock) {
2179         closeMasterService(masterServiceState);
2180       }
2181     }
2182 
2183     void updateCachedLocation(HRegionInfo hri, ServerName source,
2184                               ServerName serverName, long seqNum) {
2185       HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2186       cacheLocation(hri.getTable(), source, newHrl);
2187     }
2188 
2189     @Override
2190     public void deleteCachedRegionLocation(final HRegionLocation location) {
2191       metaCache.clearCache(location);
2192     }
2193 
2194     @Override
2195     public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2196         final Object exception, final HRegionLocation source) {
2197       assert source != null;
2198       updateCachedLocations(tableName, source.getRegionInfo().getRegionName(),
2199           rowkey, exception, source.getServerName());
2200     }
2201 
2202     /**
2203      * Update the location with the new value (if the exception is a RegionMovedException)
2204      * or delete it from the cache. Does nothing if we can be sure from the exception that
2205      * the location is still accurate, or if the cache has already been updated.
2206      * @param exception an object (to simplify user code) in which we will look for a nested
2207      *                  or wrapped (or both) RegionMovedException
2208      * @param source server that is the source of the location update.
2209      */
2210     @Override
2211     public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2212       final Object exception, final ServerName source) {
2213       if (rowkey == null || tableName == null) {
2214         LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
2215             ", tableName=" + (tableName == null ? "null" : tableName));
2216         return;
2217       }
2218 
2219       if (source == null) {
2220         // This should not happen, but let's be defensive.
2221         return;
2222       }
2223 
2224       if (regionName == null) {
2225         // we do not know which region, so just remove the cache entry for the row and server
2226         metaCache.clearCache(tableName, rowkey, source);
2227         return;
2228       }
2229 
2230       // Is it something we have already updated?
2231       final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2232       HRegionLocation oldLocation = null;
2233       if (oldLocations != null) {
2234         oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2235       }
2236       if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2237         // There is no such location in the cache (it's been removed already) or
2238         // the cache has already been refreshed with a different location.  => nothing to do
2239         return;
2240       }
2241 
2242       HRegionInfo regionInfo = oldLocation.getRegionInfo();
2243       Throwable cause = ClientExceptionsUtil.findException(exception);
2244       if (cause != null) {
2245         if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2246           // We know that the region is still on this region server
2247           return;
2248         }
2249 
2250         if (cause instanceof RegionMovedException) {
2251           RegionMovedException rme = (RegionMovedException) cause;
2252           if (LOG.isTraceEnabled()) {
2253             LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2254                 rme.getHostname() + ":" + rme.getPort() +
2255                 " according to " + source.getHostAndPort());
2256           }
2257           // We know that the region is not anymore on this region server, but we know
2258           //  the new location.
2259           updateCachedLocation(
2260               regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2261           return;
2262         }
2263       }
2264 
2265       // If we're here, it means that we cannot be sure about the location, so we remove it from
2266       // the cache. Do not send the source because source can be a new server in the same host:port
2267       metaCache.clearCache(regionInfo);
2268     }
2269 
2270     @Override
2271     public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2272       final Object exception, final HRegionLocation source) {
2273       updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2274     }
2275 
2276     @Override
2277     @Deprecated
2278     public void processBatch(List<? extends Row> list,
2279         final TableName tableName,
2280         ExecutorService pool,
2281         Object[] results) throws IOException, InterruptedException {
2282       // This belongs in HTable!!! Not in here.  St.Ack
2283 
2284       // results must be the same size as list
2285       if (results.length != list.size()) {
2286         throw new IllegalArgumentException(
2287           "argument results must be the same size as argument list");
2288       }
2289       processBatchCallback(list, tableName, pool, results, null);
2290     }
2291 
2292     @Override
2293     @Deprecated
2294     public void processBatch(List<? extends Row> list,
2295         final byte[] tableName,
2296         ExecutorService pool,
2297         Object[] results) throws IOException, InterruptedException {
2298       processBatch(list, TableName.valueOf(tableName), pool, results);
2299     }
2300 
2301     /**
2302      * Send the queries in parallel on the different region servers. Retries on failures.
2303      * If the method returns normally, there was no error and the 'results' array will
2304      * contain no exceptions. On error, an exception is thrown and the 'results' array will
2305      * contain a mix of results and exceptions.
2306      * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
2307      */
2308     @Override
2309     @Deprecated
2310     public <R> void processBatchCallback(
2311       List<? extends Row> list,
2312       TableName tableName,
2313       ExecutorService pool,
2314       Object[] results,
2315       Batch.Callback<R> callback)
2316       throws IOException, InterruptedException {
2317 
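           // Flow: submitAll() fans the operations out per region server via AsyncProcess,
           // waitUntilDone() blocks until every RPC completes, and accumulated per-operation
           // failures are rethrown below via getErrors().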
2318       AsyncRequestFuture ars = this.asyncProcess.submitAll(
2319           pool, tableName, list, callback, results);
2320       ars.waitUntilDone();
2321       if (ars.hasError()) {
2322         throw ars.getErrors();
2323       }
2324     }
2325 
2326     @Override
2327     @Deprecated
2328     public <R> void processBatchCallback(
2329       List<? extends Row> list,
2330       byte[] tableName,
2331       ExecutorService pool,
2332       Object[] results,
2333       Batch.Callback<R> callback)
2334       throws IOException, InterruptedException {
2335       processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2336     }
2337 
2338     // For tests to override.
2339     protected AsyncProcess createAsyncProcess(Configuration conf) {
2340       // No default pool available.
2341       return new AsyncProcess(this, conf, this.batchPool,
2342           RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
2343           RpcControllerFactory.instantiate(conf));
2344     }
2345 
2346     @Override
2347     public AsyncProcess getAsyncProcess() {
2348       return asyncProcess;
2349     }
2350 
2351     @Override
2352     public ServerStatisticTracker getStatisticsTracker() {
2353       return this.stats;
2354     }
2355 
2356     @Override
2357     public ClientBackoffPolicy getBackoffPolicy() {
2358       return this.backoffPolicy;
2359     }
2360 
2361     /*
2362      * Return the number of cached regions for a table. It will only be called
2363      * from a unit test.
2364      */
2365     @VisibleForTesting
2366     int getNumberOfCachedRegionLocations(final TableName tableName) {
2367       return metaCache.getNumberOfCachedRegionLocations(tableName);
2368     }
2369 
2370     @Override
2371     @Deprecated
2372     public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
2373     }
2374 
2375     @Override
2376     @Deprecated
2377     public void setRegionCachePrefetch(final byte[] tableName,
2378         final boolean enable) {
2379     }
2380 
2381     @Override
2382     @Deprecated
2383     public boolean getRegionCachePrefetch(TableName tableName) {
2384       return false;
2385     }
2386 
2387     @Override
2388     @Deprecated
2389     public boolean getRegionCachePrefetch(byte[] tableName) {
2390       return false;
2391     }
2392 
2393     @Override
2394     public void abort(final String msg, Throwable t) {
2395       if (t instanceof KeeperException.SessionExpiredException
2396         && keepAliveZookeeper != null) {
2397         synchronized (masterAndZKLock) {
2398           if (keepAliveZookeeper != null) {
2399             LOG.warn("This client just lost it's session with ZooKeeper," +
2400               " closing it." +
2401               " It will be recreated next time someone needs it", t);
2402             closeZooKeeperWatcher();
2403           }
2404         }
2405       } else {
2406         if (t != null) {
2407           LOG.fatal(msg, t);
2408         } else {
2409           LOG.fatal(msg);
2410         }
2411         this.aborted = true;
2412         close();
2413         this.closed = true;
2414       }
2415     }
2416 
2417     @Override
2418     public boolean isClosed() {
2419       return this.closed;
2420     }
2421 
2422     @Override
2423     public boolean isAborted(){
2424       return this.aborted;
2425     }
2426 
2427     @Override
2428     public int getCurrentNrHRS() throws IOException {
2429       return this.registry.getCurrentNrHRS();
2430     }
2431 
2432     /**
2433      * Increment this client's reference count.
2434      */
2435     void incCount() {
2436       ++refCount;
2437     }
2438 
2439     /**
2440      * Decrement this client's reference count.
2441      */
2442     void decCount() {
2443       if (refCount > 0) {
2444         --refCount;
2445       }
2446     }
2447 
2448     /**
2449      * Return whether this client has no references.
2450      *
2451      * @return true if this client has no reference; false otherwise
2452      */
2453     boolean isZeroReference() {
2454       return refCount == 0;
2455     }
2456 
2457     void internalClose() {
2458       if (this.closed) {
2459         return;
2460       }
2461       closeMaster();
2462       shutdownPools();
2463       this.closed = true;
2464       closeZooKeeperWatcher();
2465       this.stubs.clear();
2466       if (clusterStatusListener != null) {
2467         clusterStatusListener.close();
2468       }
2469       if (rpcClient != null) {
2470         rpcClient.close();
2471       }
2472     }
2473 
2474     @Override
2475     public void close() {
2476       if (managed) {
2477         if (aborted) {
2478           ConnectionManager.deleteStaleConnection(this);
2479         } else {
2480           ConnectionManager.deleteConnection(this, false);
2481         }
2482       } else {
2483         internalClose();
2484       }
2485     }
2486 
2487     /**
2488      * Close the connection for good, regardless of what the current value of
2489      * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
2490      * point, which would be the case if all of its consumers close the
2491      * connection. However, on the off chance that someone is unable to close
2492      * the connection, perhaps because it bailed out prematurely, the method
2493      * below will ensure that this {@link HConnection} instance is cleaned up.
2494      * Caveat: The JVM may take an unknown amount of time to call finalize on an
2495      * unreachable object, so our hope is that every consumer cleans up after
2496      * itself, like any good citizen.
2497      */
2498     @Override
2499     protected void finalize() throws Throwable {
2500       super.finalize();
2501       // Pretend as if we are about to release the last remaining reference
2502       refCount = 1;
2503       close();
2504     }
2505 
2506     /**
2507      * @deprecated Use {@link Admin#listTables()} instead
2508      */
2509     @Deprecated
2510     @Override
2511     public HTableDescriptor[] listTables() throws IOException {
2512       MasterKeepAliveConnection master = getKeepAliveMasterService();
2513       try {
2514         GetTableDescriptorsRequest req =
2515           RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2516         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2517       } catch (ServiceException se) {
2518         throw ProtobufUtil.getRemoteException(se);
2519       } finally {
2520         master.close();
2521       }
2522     }
2523 
2524     /**
2525      * @deprecated Use {@link Admin#listTableNames()} instead
2526      */
2527     @Deprecated
2528     @Override
2529     public String[] getTableNames() throws IOException {
2530       TableName[] tableNames = listTableNames();
2531       String[] result = new String[tableNames.length];
2532       for (int i = 0; i < tableNames.length; i++) {
2533         result[i] = tableNames[i].getNameAsString();
2534       }
2535       return result;
2536     }
2537 
2538     /**
2539      * @deprecated Use {@link Admin#listTableNames()} instead
2540      */
2541     @Deprecated
2542     @Override
2543     public TableName[] listTableNames() throws IOException {
2544       MasterKeepAliveConnection master = getKeepAliveMasterService();
2545       try {
2546         return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2547             GetTableNamesRequest.newBuilder().build())
2548           .getTableNamesList());
2549       } catch (ServiceException se) {
2550         throw ProtobufUtil.getRemoteException(se);
2551       } finally {
2552         master.close();
2553       }
2554     }
2555 
2556     /**
2557      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
     */
    @Deprecated
    @Override
    public HTableDescriptor[] getHTableDescriptorsByTableName(
        List<TableName> tableNames) throws IOException {
      if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
      MasterKeepAliveConnection master = getKeepAliveMasterService();
      try {
        GetTableDescriptorsRequest req =
          RequestConverter.buildGetTableDescriptorsRequest(tableNames);
        return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
      } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
      } finally {
        master.close();
      }
    }

    /**
     * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
     */
    @Deprecated
    @Override
    public HTableDescriptor[] getHTableDescriptors(
        List<String> names) throws IOException {
      List<TableName> tableNames = new ArrayList<TableName>(names.size());
      for (String name : names) {
        tableNames.add(TableName.valueOf(name));
      }

      return getHTableDescriptorsByTableName(tableNames);
    }

    @Override
    public NonceGenerator getNonceGenerator() {
      return this.nonceGenerator;
    }

    /**
     * Connects to the master to get the table descriptor.
     * @param tableName table name
     * @throws IOException if the connection to the master fails or the table
     *  is not found
     * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
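     *   A minimal sketch of the replacement, assuming an {@code Admin} obtained from
     *   this connection (the table name is a placeholder):
     * <pre>
     * HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf("exampleTable"));
     * </pre>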
     */
    @Deprecated
    @Override
    public HTableDescriptor getHTableDescriptor(final TableName tableName)
    throws IOException {
      if (tableName == null) return null;
      MasterKeepAliveConnection master = getKeepAliveMasterService();
      GetTableDescriptorsResponse htds;
      try {
        GetTableDescriptorsRequest req =
          RequestConverter.buildGetTableDescriptorsRequest(tableName);
        htds = master.getTableDescriptors(null, req);
      } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
      } finally {
        master.close();
      }
      if (!htds.getTableSchemaList().isEmpty()) {
        return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
      }
      throw new TableNotFoundException(tableName.getNameAsString());
    }

    /**
     * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
     */
    @Deprecated
    @Override
    public HTableDescriptor getHTableDescriptor(final byte[] tableName)
    throws IOException {
      return getHTableDescriptor(TableName.valueOf(tableName));
    }

    @Override
    public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
      return RpcRetryingCallerFactory
          .instantiate(conf, this.interceptor, this.getStatisticsTracker());
    }

    @Override
    public boolean isManaged() {
      return managed;
    }

    @Override
    public ConnectionConfiguration getConnectionConfiguration() {
      return this.connectionConfig;
    }

    @Override
    public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
      return this.rpcCallerFactory;
    }

    @Override
    public RpcControllerFactory getRpcControllerFactory() {
      return this.rpcControllerFactory;
    }

    @Override
    public boolean hasCellBlockSupport() {
      return this.rpcClient.hasCellBlockSupport();
    }
  }

  /**
   * The record of errors for servers.
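   * A rough usage sketch (the variable names are illustrative; callers typically drive
   * this from their retry loop):
   * <pre>
   * ServerErrorTracker tracker = new ServerErrorTracker(timeoutMs, maxRetries);
   * // after a failed call against a region server:
   * tracker.reportServerError(serverName);
   * if (tracker.canRetryMore(numRetries)) {
   *   Thread.sleep(tracker.calculateBackoffTime(serverName, pauseMs));
   *   // ... retry the call
   * }
   * </pre>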
   */
  static class ServerErrorTracker {
    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
        new ConcurrentHashMap<ServerName, ServerErrors>();
    private final long canRetryUntil;
    private final int maxRetries;
    private final long startTrackingTime;

    public ServerErrorTracker(long timeout, int maxRetries) {
      this.maxRetries = maxRetries;
      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
      this.startTrackingTime = new Date().getTime();
    }

    /**
     * We stop retrying when we have exhausted BOTH the number of retries and the time allocated.
     */
    boolean canRetryMore(int numRetry) {
      // If there is a single try, we must not take the time into account.
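      // Otherwise a retry is allowed while we are still under the retry count, or past it
      // but within the overall time budget: both limits must be exhausted before we give up.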
      return numRetry < maxRetries || (maxRetries > 1 &&
          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
    }

    /**
     * Calculates the back-off time for a retrying request to a particular server.
     *
     * @param server    The server in question.
     * @param basePause The default client pause, as configured for this connection.
     * @return The time to wait before sending the next request.
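     * The pause grows with the number of errors already recorded for the server
     * (exponential back-off via {@link ConnectionUtils#getPauseTime}); a server with no
     * recorded errors is retried without waiting.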
     */
    long calculateBackoffTime(ServerName server, long basePause) {
      long result;
      ServerErrors errorStats = errorsByServer.get(server);
      if (errorStats != null) {
        // Back off based on how many errors we have already seen for this server;
        // the first error keeps the pause at its base value.
        result = ConnectionUtils.getPauseTime(basePause,
            Math.max(0, errorStats.retries.get() - 1));
      } else {
        result = 0; // yes, if the server is not in our list we don't wait before retrying.
      }
      return result;
    }

    /**
     * Reports that there was an error on the server, so that the necessary bean-counting
     * can be done.
     *
     * @param server The server in question.
     */
    void reportServerError(ServerName server) {
      ServerErrors errors = errorsByServer.get(server);
      if (errors == null) {
        // No counter for this server yet: insert a fresh one, falling back to the
        // counter a concurrent thread may have installed first.
        ServerErrors newErrors = new ServerErrors();
        errors = errorsByServer.putIfAbsent(server, newErrors);
        if (errors == null) {
          errors = newErrors;
        }
      }
      // Count every reported error, including the first one for a server;
      // calculateBackoffTime compensates by using (count - 1) as the back-off step.
      errors.addError();
    }

    long getStartTrackingTime() {
      return startTrackingTime;
    }

    /**
     * The record of errors for a server.
     */
    private static class ServerErrors {
      public final AtomicInteger retries = new AtomicInteger(0);

      public void addError() {
        retries.incrementAndGet();
      }
    }
  }
}