1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.lang.reflect.UndeclaredThrowableException;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.HashSet;
30 import java.util.LinkedHashMap;
31 import java.util.List;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.NavigableMap;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentMap;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.LinkedBlockingQueue;
41 import java.util.concurrent.ThreadPoolExecutor;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.hbase.DoNotRetryIOException;
50 import org.apache.hadoop.hbase.HBaseConfiguration;
51 import org.apache.hadoop.hbase.HConstants;
52 import org.apache.hadoop.hbase.HRegionInfo;
53 import org.apache.hadoop.hbase.HRegionLocation;
54 import org.apache.hadoop.hbase.HTableDescriptor;
55 import org.apache.hadoop.hbase.MasterNotRunningException;
56 import org.apache.hadoop.hbase.MetaTableAccessor;
57 import org.apache.hadoop.hbase.RegionLocations;
58 import org.apache.hadoop.hbase.RegionTooBusyException;
59 import org.apache.hadoop.hbase.ServerName;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.TableNotEnabledException;
62 import org.apache.hadoop.hbase.TableNotFoundException;
63 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
64 import org.apache.hadoop.hbase.classification.InterfaceAudience;
65 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
66 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
67 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
68 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
69 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
70 import org.apache.hadoop.hbase.client.coprocessor.Batch;
71 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
72 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
73 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
74 import org.apache.hadoop.hbase.ipc.RpcClient;
75 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
76 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
77 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
78 import org.apache.hadoop.hbase.protobuf.RequestConverter;
79 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
80 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
81 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
82 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
83 import org.apache.hadoop.hbase.protobuf.generated.*;
84 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
85 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
86 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
87 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
88 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
89 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
90 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
91 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
92 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
93 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
94 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
95 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
96 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
97 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
98 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
99 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeResponse;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
172 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
173 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
174 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
175 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
176 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
177 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
178 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
179 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
180 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
181 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
182 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
183 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
184 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
185 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
186 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
187 import org.apache.hadoop.hbase.security.User;
188 import org.apache.hadoop.hbase.security.UserProvider;
189 import org.apache.hadoop.hbase.util.Bytes;
190 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
191 import org.apache.hadoop.hbase.util.ExceptionUtil;
192 import org.apache.hadoop.hbase.util.Threads;
193 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
194 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
195 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
196 import org.apache.hadoop.ipc.RemoteException;
197 import org.apache.zookeeper.KeeperException;
198
199 import com.google.common.annotations.VisibleForTesting;
200 import com.google.protobuf.BlockingRpcChannel;
201 import com.google.protobuf.RpcController;
202 import com.google.protobuf.ServiceException;
203
204
205
206
207 @SuppressWarnings("serial")
208 @InterfaceAudience.Private
209
210 class ConnectionManager {
211 static final Log LOG = LogFactory.getLog(ConnectionManager.class);
212
213 public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
214 private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
215
216
217
218
219 static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
220
221 public static final int MAX_CACHED_CONNECTION_INSTANCES;
222
223
224
225
226
227 private static volatile NonceGenerator nonceGenerator = null;
228
229 private static Object nonceGeneratorCreateLock = new Object();
230
231 static {
232
233
234
235
236 MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
237 HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
238 CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
239 (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
240 @Override
241 protected boolean removeEldestEntry(
242 Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
243 return size() > MAX_CACHED_CONNECTION_INSTANCES;
244 }
245 };
246 }
247
248
249 static class NoNonceGenerator implements NonceGenerator {
250 @Override
251 public long getNonceGroup() {
252 return HConstants.NO_NONCE;
253 }
254 @Override
255 public long newNonce() {
256 return HConstants.NO_NONCE;
257 }
258 }
259
260
261
262
263 private ConnectionManager() {
264 super();
265 }
266
267
268
269
270
271
272 @VisibleForTesting
273 static NonceGenerator injectNonceGeneratorForTesting(
274 ClusterConnection conn, NonceGenerator cnm) {
275 HConnectionImplementation connImpl = (HConnectionImplementation)conn;
276 NonceGenerator ng = connImpl.getNonceGenerator();
277 LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
278 connImpl.nonceGenerator = cnm;
279 return ng;
280 }
281
282
283
284
285
286
287
288
289
290
291 @Deprecated
292 public static HConnection getConnection(final Configuration conf) throws IOException {
293 return getConnectionInternal(conf);
294 }
295
296
297 static ClusterConnection getConnectionInternal(final Configuration conf)
298 throws IOException {
299 HConnectionKey connectionKey = new HConnectionKey(conf);
300 synchronized (CONNECTION_INSTANCES) {
301 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
302 if (connection == null) {
303 connection = (HConnectionImplementation)createConnection(conf, true);
304 CONNECTION_INSTANCES.put(connectionKey, connection);
305 } else if (connection.isClosed()) {
306 ConnectionManager.deleteConnection(connectionKey, true);
307 connection = (HConnectionImplementation)createConnection(conf, true);
308 CONNECTION_INSTANCES.put(connectionKey, connection);
309 }
310 connection.incCount();
311 return connection;
312 }
313 }
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335 public static HConnection createConnection(Configuration conf) throws IOException {
336 return createConnectionInternal(conf);
337 }
338
339 static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
340 UserProvider provider = UserProvider.instantiate(conf);
341 return createConnection(conf, false, null, provider.getCurrent());
342 }
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364 public static HConnection createConnection(Configuration conf, ExecutorService pool)
365 throws IOException {
366 UserProvider provider = UserProvider.instantiate(conf);
367 return createConnection(conf, false, pool, provider.getCurrent());
368 }
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390 public static HConnection createConnection(Configuration conf, User user)
391 throws IOException {
392 return createConnection(conf, false, null, user);
393 }
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416 public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
417 throws IOException {
418 return createConnection(conf, false, pool, user);
419 }
420
421 @Deprecated
422 static HConnection createConnection(final Configuration conf, final boolean managed)
423 throws IOException {
424 UserProvider provider = UserProvider.instantiate(conf);
425 return createConnection(conf, managed, null, provider.getCurrent());
426 }
427
428 @Deprecated
429 static ClusterConnection createConnection(final Configuration conf, final boolean managed,
430 final ExecutorService pool, final User user)
431 throws IOException {
432 return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
433 }
434
435
436
437
438
439
440
441
442
443 @Deprecated
444 public static void deleteConnection(Configuration conf) {
445 deleteConnection(new HConnectionKey(conf), false);
446 }
447
448
449
450
451
452
453
454
455 @Deprecated
456 public static void deleteStaleConnection(HConnection connection) {
457 deleteConnection(connection, true);
458 }
459
460
461
462
463
464
465
466 @Deprecated
467 public static void deleteAllConnections(boolean staleConnection) {
468 synchronized (CONNECTION_INSTANCES) {
469 Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
470 connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
471 for (HConnectionKey connectionKey : connectionKeys) {
472 deleteConnection(connectionKey, staleConnection);
473 }
474 CONNECTION_INSTANCES.clear();
475 }
476 }
477
478
479
480
481
482 @Deprecated
483 public static void deleteAllConnections() {
484 deleteAllConnections(false);
485 }
486
487
488 @Deprecated
489 private static void deleteConnection(HConnection connection, boolean staleConnection) {
490 synchronized (CONNECTION_INSTANCES) {
491 for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
492 if (e.getValue() == connection) {
493 deleteConnection(e.getKey(), staleConnection);
494 break;
495 }
496 }
497 }
498 }
499
500 @Deprecated
501 private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
502 synchronized (CONNECTION_INSTANCES) {
503 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
504 if (connection != null) {
505 connection.decCount();
506 if (connection.isZeroReference() || staleConnection) {
507 CONNECTION_INSTANCES.remove(connectionKey);
508 connection.internalClose();
509 }
510 } else {
511 LOG.error("Connection not found in the list, can't delete it "+
512 "(connection key=" + connectionKey + "). May be the key was modified?", new Exception());
513 }
514 }
515 }
516
517
518
519
520
521
522
523
524
525
526
527
528 @InterfaceAudience.Private
529 public static <T> T execute(HConnectable<T> connectable) throws IOException {
530 if (connectable == null || connectable.conf == null) {
531 return null;
532 }
533 Configuration conf = connectable.conf;
534 HConnection connection = getConnection(conf);
535 boolean connectSucceeded = false;
536 try {
537 T returnValue = connectable.connect(connection);
538 connectSucceeded = true;
539 return returnValue;
540 } finally {
541 try {
542 connection.close();
543 } catch (Exception e) {
544 ExceptionUtil.rethrowIfInterrupt(e);
545 if (connectSucceeded) {
546 throw new IOException("The connection to " + connection
547 + " could not be deleted.", e);
548 }
549 }
550 }
551 }
552
553
554 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
555 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
556 justification="Access to the conncurrent hash map is under a lock so should be fine.")
557 static class HConnectionImplementation implements ClusterConnection, Closeable {
558 static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
559 private final long pause;
560 private boolean useMetaReplicas;
561 private final int metaReplicaCallTimeoutScanInMicroSecond;
562 private final int numTries;
563 final int rpcTimeout;
564 private NonceGenerator nonceGenerator = null;
565 private final AsyncProcess asyncProcess;
566
567 private final ServerStatisticTracker stats;
568
569 private volatile boolean closed;
570 private volatile boolean aborted;
571
572
573 ClusterStatusListener clusterStatusListener;
574
575
576 private final Object metaRegionLock = new Object();
577
578
579
580
581
582
583 private final Object masterAndZKLock = new Object();
584
585 private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
586
587
588
589 private volatile ExecutorService batchPool = null;
590
591
592 private volatile ExecutorService metaLookupPool = null;
593 private volatile boolean cleanupPool = false;
594
595 private final Configuration conf;
596
597
598
599 private final ConnectionConfiguration connectionConfig;
600
601
602 private RpcClient rpcClient;
603
604 private MetaCache metaCache = new MetaCache();
605
606 private int refCount;
607
608
609 private boolean managed;
610
611 private User user;
612
613 private RpcRetryingCallerFactory rpcCallerFactory;
614
615 private RpcControllerFactory rpcControllerFactory;
616
617 private final RetryingCallerInterceptor interceptor;
618
619
620
621
622 Registry registry;
623
624 private final ClientBackoffPolicy backoffPolicy;
625
626 HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
627 this(conf, managed, null, null);
628 }
629
630
631
632
633
634
635
636
637
638
639
640
641 HConnectionImplementation(Configuration conf, boolean managed,
642 ExecutorService pool, User user) throws IOException {
643 this(conf);
644 this.user = user;
645 this.batchPool = pool;
646 this.managed = managed;
647 this.registry = setupRegistry();
648 retrieveClusterId();
649
650 this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
651 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
652
653
654 boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
655 HConstants.STATUS_PUBLISHED_DEFAULT);
656 Class<? extends ClusterStatusListener.Listener> listenerClass =
657 conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
658 ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
659 ClusterStatusListener.Listener.class);
660 if (shouldListen) {
661 if (listenerClass == null) {
662 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
663 ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
664 } else {
665 clusterStatusListener = new ClusterStatusListener(
666 new ClusterStatusListener.DeadServerHandler() {
667 @Override
668 public void newDead(ServerName sn) {
669 clearCaches(sn);
670 rpcClient.cancelConnections(sn);
671 }
672 }, conf, listenerClass);
673 }
674 }
675 }
676
677
678
679
680 protected HConnectionImplementation(Configuration conf) {
681 this.conf = conf;
682 this.connectionConfig = new ConnectionConfiguration(conf);
683 this.closed = false;
684 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
685 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
686 this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
687 HConstants.DEFAULT_USE_META_REPLICAS);
688 this.metaReplicaCallTimeoutScanInMicroSecond =
689 connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
690
691 this.numTries = connectionConfig.getRetriesNumber();
692 this.rpcTimeout = conf.getInt(
693 HConstants.HBASE_RPC_TIMEOUT_KEY,
694 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
695 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
696 synchronized (nonceGeneratorCreateLock) {
697 if (ConnectionManager.nonceGenerator == null) {
698 ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
699 }
700 this.nonceGenerator = ConnectionManager.nonceGenerator;
701 }
702 } else {
703 this.nonceGenerator = new NoNonceGenerator();
704 }
705 stats = ServerStatisticTracker.create(conf);
706 this.asyncProcess = createAsyncProcess(this.conf);
707 this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
708 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
709 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
710 }
711
712
713
714
715 @VisibleForTesting
716 void setUseMetaReplicas(final boolean useMetaReplicas) {
717 this.useMetaReplicas = useMetaReplicas;
718 }
719
720 @Override
721 public HTableInterface getTable(String tableName) throws IOException {
722 return getTable(TableName.valueOf(tableName));
723 }
724
725 @Override
726 public HTableInterface getTable(byte[] tableName) throws IOException {
727 return getTable(TableName.valueOf(tableName));
728 }
729
730 @Override
731 public HTableInterface getTable(TableName tableName) throws IOException {
732 return getTable(tableName, getBatchPool());
733 }
734
735 @Override
736 public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
737 return getTable(TableName.valueOf(tableName), pool);
738 }
739
740 @Override
741 public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
742 return getTable(TableName.valueOf(tableName), pool);
743 }
744
745 @Override
746 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
747 if (managed) {
748 throw new NeedUnmanagedConnectionException();
749 }
750 return new HTable(tableName, this, connectionConfig, rpcCallerFactory, rpcControllerFactory, pool);
751 }
752
753 @Override
754 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
755 if (params.getTableName() == null) {
756 throw new IllegalArgumentException("TableName cannot be null.");
757 }
758 if (params.getPool() == null) {
759 params.pool(HTable.getDefaultExecutor(getConfiguration()));
760 }
761 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
762 params.writeBufferSize(connectionConfig.getWriteBufferSize());
763 }
764 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
765 params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
766 }
767 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
768 }
769
770 @Override
771 public BufferedMutator getBufferedMutator(TableName tableName) {
772 return getBufferedMutator(new BufferedMutatorParams(tableName));
773 }
774
775 @Override
776 public RegionLocator getRegionLocator(TableName tableName) throws IOException {
777 return new HRegionLocator(tableName, this);
778 }
779
780 @Override
781 public Admin getAdmin() throws IOException {
782 if (managed) {
783 throw new NeedUnmanagedConnectionException();
784 }
785 return new HBaseAdmin(this);
786 }
787
788 private ExecutorService getBatchPool() {
789 if (batchPool == null) {
790 synchronized (this) {
791 if (batchPool == null) {
792 this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
793 conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
794 this.cleanupPool = true;
795 }
796 }
797 }
798 return this.batchPool;
799 }
800
801 private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
802 BlockingQueue<Runnable> passedWorkQueue) {
803
804 if (maxThreads == 0) {
805 maxThreads = Runtime.getRuntime().availableProcessors() * 8;
806 }
807 if (coreThreads == 0) {
808 coreThreads = Runtime.getRuntime().availableProcessors() * 8;
809 }
810 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
811 BlockingQueue<Runnable> workQueue = passedWorkQueue;
812 if (workQueue == null) {
813 workQueue =
814 new LinkedBlockingQueue<Runnable>(maxThreads *
815 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
816 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
817 }
818 ThreadPoolExecutor tpe = new ThreadPoolExecutor(
819 coreThreads,
820 maxThreads,
821 keepAliveTime,
822 TimeUnit.SECONDS,
823 workQueue,
824 Threads.newDaemonThreadFactory(toString() + nameHint));
825 tpe.allowCoreThreadTimeOut(true);
826 return tpe;
827 }
828
829 private ExecutorService getMetaLookupPool() {
830 if (this.metaLookupPool == null) {
831 synchronized (this) {
832 if (this.metaLookupPool == null) {
833
834
835
836
837 this.metaLookupPool = getThreadPool(
838 conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
839 conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
840 "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
841 }
842 }
843 }
844 return this.metaLookupPool;
845 }
846
847 protected ExecutorService getCurrentMetaLookupPool() {
848 return metaLookupPool;
849 }
850
851 protected ExecutorService getCurrentBatchPool() {
852 return batchPool;
853 }
854
855 private void shutdownPools() {
856 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
857 shutdownBatchPool(this.batchPool);
858 }
859 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
860 shutdownBatchPool(this.metaLookupPool);
861 }
862 }
863
864 private void shutdownBatchPool(ExecutorService pool) {
865 pool.shutdown();
866 try {
867 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
868 pool.shutdownNow();
869 }
870 } catch (InterruptedException e) {
871 pool.shutdownNow();
872 }
873 }
874
875
876
877
878
879 private Registry setupRegistry() throws IOException {
880 return RegistryFactory.getRegistry(this);
881 }
882
883
884
885
886 @VisibleForTesting
887 RpcClient getRpcClient() {
888 return rpcClient;
889 }
890
891
892
893
894 @Override
895 public String toString(){
896 return "hconnection-0x" + Integer.toHexString(hashCode());
897 }
898
899 protected String clusterId = null;
900
901 void retrieveClusterId() {
902 if (clusterId != null) return;
903 this.clusterId = this.registry.getClusterId();
904 if (clusterId == null) {
905 clusterId = HConstants.CLUSTER_ID_DEFAULT;
906 LOG.debug("clusterid came back null, using default " + clusterId);
907 }
908 }
909
910 @Override
911 public Configuration getConfiguration() {
912 return this.conf;
913 }
914
915 private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
916 throws MasterNotRunningException {
917 String errorMsg;
918 try {
919 if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
920 errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
921 + "It should have been written by the master. "
922 + "Check the value configured in 'zookeeper.znode.parent'. "
923 + "There could be a mismatch with the one configured in the master.";
924 LOG.error(errorMsg);
925 throw new MasterNotRunningException(errorMsg);
926 }
927 } catch (KeeperException e) {
928 errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
929 LOG.error(errorMsg);
930 throw new MasterNotRunningException(errorMsg, e);
931 }
932 }
933
934
935
936
937
938
939 @Deprecated
940 @Override
941 public boolean isMasterRunning()
942 throws MasterNotRunningException, ZooKeeperConnectionException {
943
944
945
946 MasterKeepAliveConnection m = getKeepAliveMasterService();
947 m.close();
948 return true;
949 }
950
951 @Override
952 public HRegionLocation getRegionLocation(final TableName tableName,
953 final byte [] row, boolean reload)
954 throws IOException {
955 return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
956 }
957
958 @Override
959 public HRegionLocation getRegionLocation(final byte[] tableName,
960 final byte [] row, boolean reload)
961 throws IOException {
962 return getRegionLocation(TableName.valueOf(tableName), row, reload);
963 }
964
965 @Override
966 public boolean isTableEnabled(TableName tableName) throws IOException {
967 return this.registry.isTableOnlineState(tableName, true);
968 }
969
970 @Override
971 public boolean isTableEnabled(byte[] tableName) throws IOException {
972 return isTableEnabled(TableName.valueOf(tableName));
973 }
974
975 @Override
976 public boolean isTableDisabled(TableName tableName) throws IOException {
977 return this.registry.isTableOnlineState(tableName, false);
978 }
979
980 @Override
981 public boolean isTableDisabled(byte[] tableName) throws IOException {
982 return isTableDisabled(TableName.valueOf(tableName));
983 }
984
985 @Override
986 public boolean isTableAvailable(final TableName tableName) throws IOException {
987 final AtomicBoolean available = new AtomicBoolean(true);
988 final AtomicInteger regionCount = new AtomicInteger(0);
989 MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
990 @Override
991 public boolean processRow(Result row) throws IOException {
992 HRegionInfo info = MetaScanner.getHRegionInfo(row);
993 if (info != null && !info.isSplitParent()) {
994 if (tableName.equals(info.getTable())) {
995 ServerName server = HRegionInfo.getServerName(row);
996 if (server == null) {
997 available.set(false);
998 return false;
999 }
1000 regionCount.incrementAndGet();
1001 } else if (tableName.compareTo(info.getTable()) < 0) {
1002
1003 return false;
1004 }
1005 }
1006 return true;
1007 }
1008 };
1009 MetaScanner.metaScan(this, visitor, tableName);
1010 return available.get() && (regionCount.get() > 0);
1011 }
1012
1013 @Override
1014 public boolean isTableAvailable(final byte[] tableName) throws IOException {
1015 return isTableAvailable(TableName.valueOf(tableName));
1016 }
1017
1018 @Override
1019 public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
1020 throws IOException {
1021 final AtomicBoolean available = new AtomicBoolean(true);
1022 final AtomicInteger regionCount = new AtomicInteger(0);
1023 MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
1024 @Override
1025 public boolean processRow(Result row) throws IOException {
1026 HRegionInfo info = MetaScanner.getHRegionInfo(row);
1027 if (info != null && !info.isSplitParent()) {
1028 if (tableName.equals(info.getTable())) {
1029 ServerName server = HRegionInfo.getServerName(row);
1030 if (server == null) {
1031 available.set(false);
1032 return false;
1033 }
1034 if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
1035 for (byte[] splitKey : splitKeys) {
1036
1037 if (Bytes.equals(info.getStartKey(), splitKey)) {
1038 regionCount.incrementAndGet();
1039 break;
1040 }
1041 }
1042 } else {
1043
1044 regionCount.incrementAndGet();
1045 }
1046 } else if (tableName.compareTo(info.getTable()) < 0) {
1047
1048 return false;
1049 }
1050 }
1051 return true;
1052 }
1053 };
1054 MetaScanner.metaScan(this, visitor, tableName);
1055
1056 return available.get() && (regionCount.get() == splitKeys.length + 1);
1057 }
1058
1059 @Override
1060 public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
1061 throws IOException {
1062 return isTableAvailable(TableName.valueOf(tableName), splitKeys);
1063 }
1064
1065 @Override
1066 public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
1067 RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
1068 HRegionInfo.getStartKey(regionName), false, true);
1069 return locations == null ? null : locations.getRegionLocation();
1070 }
1071
1072 @Override
1073 public boolean isDeadServer(ServerName sn) {
1074 if (clusterStatusListener == null) {
1075 return false;
1076 } else {
1077 return clusterStatusListener.isDeadServer(sn);
1078 }
1079 }
1080
1081 @Override
1082 public List<HRegionLocation> locateRegions(final TableName tableName)
1083 throws IOException {
1084 return locateRegions (tableName, false, true);
1085 }
1086
1087 @Override
1088 public List<HRegionLocation> locateRegions(final byte[] tableName)
1089 throws IOException {
1090 return locateRegions(TableName.valueOf(tableName));
1091 }
1092
1093 @Override
1094 public List<HRegionLocation> locateRegions(final TableName tableName,
1095 final boolean useCache, final boolean offlined) throws IOException {
1096 NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
1097 final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
1098 for (HRegionInfo regionInfo : regions.keySet()) {
1099 RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
1100 if (list != null) {
1101 for (HRegionLocation loc : list.getRegionLocations()) {
1102 if (loc != null) {
1103 locations.add(loc);
1104 }
1105 }
1106 }
1107 }
1108 return locations;
1109 }
1110
1111 @Override
1112 public List<HRegionLocation> locateRegions(final byte[] tableName,
1113 final boolean useCache, final boolean offlined) throws IOException {
1114 return locateRegions(TableName.valueOf(tableName), useCache, offlined);
1115 }
1116
1117 @Override
1118 public HRegionLocation locateRegion(
1119 final TableName tableName, final byte[] row) throws IOException{
1120 RegionLocations locations = locateRegion(tableName, row, true, true);
1121 return locations == null ? null : locations.getRegionLocation();
1122 }
1123
1124 @Override
1125 public HRegionLocation locateRegion(final byte[] tableName,
1126 final byte [] row)
1127 throws IOException{
1128 return locateRegion(TableName.valueOf(tableName), row);
1129 }
1130
1131 @Override
1132 public HRegionLocation relocateRegion(final TableName tableName,
1133 final byte [] row) throws IOException{
1134 RegionLocations locations = relocateRegion(tableName, row,
1135 RegionReplicaUtil.DEFAULT_REPLICA_ID);
1136 return locations == null ? null :
1137 locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
1138 }
1139
1140 @Override
1141 public RegionLocations relocateRegion(final TableName tableName,
1142 final byte [] row, int replicaId) throws IOException{
1143
1144
1145
1146 if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
1147 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
1148 }
1149
1150 return locateRegion(tableName, row, false, true, replicaId);
1151 }
1152
1153 @Override
1154 public HRegionLocation relocateRegion(final byte[] tableName,
1155 final byte [] row) throws IOException {
1156 return relocateRegion(TableName.valueOf(tableName), row);
1157 }
1158
1159 @Override
1160 public RegionLocations locateRegion(final TableName tableName,
1161 final byte [] row, boolean useCache, boolean retry)
1162 throws IOException {
1163 return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
1164 }
1165
1166 @Override
1167 public RegionLocations locateRegion(final TableName tableName,
1168 final byte [] row, boolean useCache, boolean retry, int replicaId)
1169 throws IOException {
1170 if (this.closed) throw new DoNotRetryIOException(toString() + " closed");
1171 if (tableName== null || tableName.getName().length == 0) {
1172 throw new IllegalArgumentException(
1173 "table name cannot be null or zero length");
1174 }
1175 if (tableName.equals(TableName.META_TABLE_NAME)) {
1176 return locateMeta(tableName, useCache, replicaId);
1177 } else {
1178
1179 return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
1180 }
1181 }
1182
1183 private RegionLocations locateMeta(final TableName tableName,
1184 boolean useCache, int replicaId) throws IOException {
1185
1186
1187
1188 byte[] metaCacheKey = HConstants.EMPTY_START_ROW;
1189 RegionLocations locations = null;
1190 if (useCache) {
1191 locations = getCachedLocation(tableName, metaCacheKey);
1192 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1193 return locations;
1194 }
1195 }
1196
1197
1198 synchronized (metaRegionLock) {
1199
1200
1201 if (useCache) {
1202 locations = getCachedLocation(tableName, metaCacheKey);
1203 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1204 return locations;
1205 }
1206 }
1207
1208
1209 locations = this.registry.getMetaRegionLocation();
1210 if (locations != null) {
1211 cacheLocation(tableName, locations);
1212 }
1213 }
1214 return locations;
1215 }
1216
1217
1218
1219
1220
1221 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
1222 boolean useCache, boolean retry, int replicaId) throws IOException {
1223
1224
1225
1226 if (useCache) {
1227 RegionLocations locations = getCachedLocation(tableName, row);
1228 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1229 return locations;
1230 }
1231 }
1232
1233
1234
1235
1236 byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
1237
1238 Scan s = new Scan();
1239 s.setReversed(true);
1240 s.setStartRow(metaKey);
1241 s.setSmall(true);
1242 s.setCaching(1);
1243 if (this.useMetaReplicas) {
1244 s.setConsistency(Consistency.TIMELINE);
1245 }
1246
1247 int localNumRetries = (retry ? numTries : 1);
1248
1249 for (int tries = 0; true; tries++) {
1250 if (tries >= localNumRetries) {
1251 throw new NoServerForRegionException("Unable to find region for "
1252 + Bytes.toStringBinary(row) + " in " + tableName +
1253 " after " + localNumRetries + " tries.");
1254 }
1255 if (useCache) {
1256 RegionLocations locations = getCachedLocation(tableName, row);
1257 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1258 return locations;
1259 }
1260 } else {
1261
1262
1263
1264 metaCache.clearCache(tableName, row, replicaId);
1265 }
1266
1267
1268 try {
1269 Result regionInfoRow = null;
1270 ReversedClientScanner rcs = null;
1271 try {
1272 rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
1273 rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
1274 metaReplicaCallTimeoutScanInMicroSecond);
1275 regionInfoRow = rcs.next();
1276 } finally {
1277 if (rcs != null) {
1278 rcs.close();
1279 }
1280 }
1281
1282 if (regionInfoRow == null) {
1283 throw new TableNotFoundException(tableName);
1284 }
1285
1286
1287 RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
1288 if (locations == null || locations.getRegionLocation(replicaId) == null) {
1289 throw new IOException("HRegionInfo was null in " +
1290 tableName + ", row=" + regionInfoRow);
1291 }
1292 HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
1293 if (regionInfo == null) {
1294 throw new IOException("HRegionInfo was null or empty in " +
1295 TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
1296 }
1297
1298
1299 if (!regionInfo.getTable().equals(tableName)) {
1300 throw new TableNotFoundException(
1301 "Table '" + tableName + "' was not found, got: " +
1302 regionInfo.getTable() + ".");
1303 }
1304 if (regionInfo.isSplit()) {
1305 throw new RegionOfflineException("the only available region for" +
1306 " the required row is a split parent," +
1307 " the daughters should be online soon: " +
1308 regionInfo.getRegionNameAsString());
1309 }
1310 if (regionInfo.isOffline()) {
1311 throw new RegionOfflineException("the region is offline, could" +
1312 " be caused by a disable table call: " +
1313 regionInfo.getRegionNameAsString());
1314 }
1315
1316 ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1317 if (serverName == null) {
1318 throw new NoServerForRegionException("No server address listed " +
1319 "in " + TableName.META_TABLE_NAME + " for region " +
1320 regionInfo.getRegionNameAsString() + " containing row " +
1321 Bytes.toStringBinary(row));
1322 }
1323
1324 if (isDeadServer(serverName)){
1325 throw new RegionServerStoppedException("hbase:meta says the region "+
1326 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1327 ", but it is dead.");
1328 }
1329
1330 cacheLocation(tableName, locations);
1331 return locations;
1332 } catch (TableNotFoundException e) {
1333
1334
1335
1336 throw e;
1337 } catch (IOException e) {
1338 ExceptionUtil.rethrowIfInterrupt(e);
1339
1340 if (e instanceof RemoteException) {
1341 e = ((RemoteException)e).unwrapRemoteException();
1342 }
1343 if (tries < localNumRetries - 1) {
1344 if (LOG.isDebugEnabled()) {
1345 LOG.debug("locateRegionInMeta parentTable=" +
1346 TableName.META_TABLE_NAME + ", metaLocation=" +
1347 ", attempt=" + tries + " of " +
1348 localNumRetries + " failed; retrying after sleep of " +
1349 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1350 }
1351 } else {
1352 throw e;
1353 }
1354
1355 if(!(e instanceof RegionOfflineException ||
1356 e instanceof NoServerForRegionException)) {
1357 relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1358 }
1359 }
1360 try{
1361 Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1362 } catch (InterruptedException e) {
1363 throw new InterruptedIOException("Giving up trying to location region in " +
1364 "meta: thread is interrupted.");
1365 }
1366 }
1367 }
1368
1369
1370
1371
1372
1373
1374 @Override
1375 public void cacheLocation(final TableName tableName, final RegionLocations location) {
1376 metaCache.cacheLocation(tableName, location);
1377 }
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387 RegionLocations getCachedLocation(final TableName tableName,
1388 final byte [] row) {
1389 return metaCache.getCachedLocation(tableName, row);
1390 }
1391
1392 public void clearRegionCache(final TableName tableName, byte[] row) {
1393 metaCache.clearCache(tableName, row);
1394 }
1395
1396
1397
1398
1399 @Override
1400 public void clearCaches(final ServerName serverName) {
1401 metaCache.clearCache(serverName);
1402 }
1403
1404 @Override
1405 public void clearRegionCache() {
1406 metaCache.clearCache();
1407 }
1408
1409 @Override
1410 public void clearRegionCache(final TableName tableName) {
1411 metaCache.clearCache(tableName);
1412 }
1413
1414 @Override
1415 public void clearRegionCache(final byte[] tableName) {
1416 clearRegionCache(TableName.valueOf(tableName));
1417 }
1418
1419
1420
1421
1422
1423
1424
1425 private void cacheLocation(final TableName tableName, final ServerName source,
1426 final HRegionLocation location) {
1427 metaCache.cacheLocation(tableName, source, location);
1428 }
1429
1430
1431 private final ConcurrentHashMap<String, Object> stubs =
1432 new ConcurrentHashMap<String, Object>();
1433
1434 private final ConcurrentHashMap<String, String> connectionLock =
1435 new ConcurrentHashMap<String, String>();
1436
1437
1438
1439
1440 static class MasterServiceState {
1441 HConnection connection;
1442 MasterService.BlockingInterface stub;
1443 int userCount;
1444
1445 MasterServiceState (final HConnection connection) {
1446 super();
1447 this.connection = connection;
1448 }
1449
1450 @Override
1451 public String toString() {
1452 return "MasterService";
1453 }
1454
1455 Object getStub() {
1456 return this.stub;
1457 }
1458
1459 void clearStub() {
1460 this.stub = null;
1461 }
1462
1463 boolean isMasterRunning() throws ServiceException {
1464 IsMasterRunningResponse response =
1465 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1466 return response != null? response.getIsMasterRunning(): false;
1467 }
1468 }
1469
1470
1471
1472
1473
1474
1475 abstract class StubMaker {
1476
1477
1478
1479 protected abstract String getServiceName();
1480
1481
1482
1483
1484
1485 protected abstract Object makeStub(final BlockingRpcChannel channel);
1486
1487
1488
1489
1490
1491 protected abstract void isMasterRunning() throws ServiceException;
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501 private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1502 ZooKeeperKeepAliveConnection zkw;
1503 try {
1504 zkw = getKeepAliveZooKeeperWatcher();
1505 } catch (IOException e) {
1506 ExceptionUtil.rethrowIfInterrupt(e);
1507 throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1508 }
1509 try {
1510 checkIfBaseNodeAvailable(zkw);
1511 ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1512 if (sn == null) {
1513 String msg = "ZooKeeper available but no active master location found";
1514 LOG.info(msg);
1515 throw new MasterNotRunningException(msg);
1516 }
1517 if (isDeadServer(sn)) {
1518 throw new MasterNotRunningException(sn + " is dead.");
1519 }
1520
1521 String key = getStubKey(getServiceName(), sn.getHostname(), sn.getPort());
1522 connectionLock.putIfAbsent(key, key);
1523 Object stub = null;
1524 synchronized (connectionLock.get(key)) {
1525 stub = stubs.get(key);
1526 if (stub == null) {
1527 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1528 stub = makeStub(channel);
1529 isMasterRunning();
1530 stubs.put(key, stub);
1531 }
1532 }
1533 return stub;
1534 } finally {
1535 zkw.close();
1536 }
1537 }
1538
1539
1540
1541
1542
1543
1544 Object makeStub() throws IOException {
1545
1546
1547 synchronized (masterAndZKLock) {
1548 Exception exceptionCaught = null;
1549 if (!closed) {
1550 try {
1551 return makeStubNoRetries();
1552 } catch (IOException e) {
1553 exceptionCaught = e;
1554 } catch (KeeperException e) {
1555 exceptionCaught = e;
1556 } catch (ServiceException e) {
1557 exceptionCaught = e;
1558 }
1559
1560 throw new MasterNotRunningException(exceptionCaught);
1561 } else {
1562 throw new DoNotRetryIOException("Connection was closed while trying to get master");
1563 }
1564 }
1565 }
1566 }
1567
1568
1569
1570
1571 class MasterServiceStubMaker extends StubMaker {
1572 private MasterService.BlockingInterface stub;
1573 @Override
1574 protected String getServiceName() {
1575 return MasterService.getDescriptor().getName();
1576 }
1577
1578 @Override
1579 MasterService.BlockingInterface makeStub() throws IOException {
1580 return (MasterService.BlockingInterface)super.makeStub();
1581 }
1582
1583 @Override
1584 protected Object makeStub(BlockingRpcChannel channel) {
1585 this.stub = MasterService.newBlockingStub(channel);
1586 return this.stub;
1587 }
1588
1589 @Override
1590 protected void isMasterRunning() throws ServiceException {
1591 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1592 }
1593 }
1594
1595 @Override
1596 public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1597 throws IOException {
1598 return getAdmin(serverName, false);
1599 }
1600
1601 @Override
1602
1603 public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1604 final boolean master)
1605 throws IOException {
1606 if (isDeadServer(serverName)) {
1607 throw new RegionServerStoppedException(serverName + " is dead.");
1608 }
1609 String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1610 serverName.getHostname(), serverName.getPort());
1611 this.connectionLock.putIfAbsent(key, key);
1612 AdminService.BlockingInterface stub = null;
1613 synchronized (this.connectionLock.get(key)) {
1614 stub = (AdminService.BlockingInterface)this.stubs.get(key);
1615 if (stub == null) {
1616 BlockingRpcChannel channel =
1617 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1618 stub = AdminService.newBlockingStub(channel);
1619 this.stubs.put(key, stub);
1620 }
1621 }
1622 return stub;
1623 }
1624
1625 @Override
1626 public ClientService.BlockingInterface getClient(final ServerName sn)
1627 throws IOException {
1628 if (isDeadServer(sn)) {
1629 throw new RegionServerStoppedException(sn + " is dead.");
1630 }
1631 String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
1632 sn.getPort());
1633 this.connectionLock.putIfAbsent(key, key);
1634 ClientService.BlockingInterface stub = null;
1635 synchronized (this.connectionLock.get(key)) {
1636 stub = (ClientService.BlockingInterface)this.stubs.get(key);
1637 if (stub == null) {
1638 BlockingRpcChannel channel =
1639 this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1640 stub = ClientService.newBlockingStub(channel);
1641
1642
1643 this.stubs.put(key, stub);
1644 }
1645 }
1646 return stub;
1647 }
1648
1649 static String getStubKey(final String serviceName, final String rsHostname, int port) {
1650
1651
1652
1653
1654
1655 InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
1656 String address = rsHostname;
1657 if (i != null) {
1658 address = i.getHostAddress() + "-" + rsHostname;
1659 }
1660 return serviceName + "@" + address + ":" + port;
1661 }
1662
1663 private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1664 private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1665 private boolean canCloseZKW = true;
1666
1667
1668 private static final long keepAlive = 5 * 60 * 1000;
1669
1670
1671
1672
1673
1674 ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1675 throws IOException {
1676 synchronized (masterAndZKLock) {
1677 if (keepAliveZookeeper == null) {
1678 if (this.closed) {
1679 throw new IOException(toString() + " closed");
1680 }
1681
1682
1683 keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1684 }
1685 keepAliveZookeeperUserCount.addAndGet(1);
1686 keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1687 return keepAliveZookeeper;
1688 }
1689 }
1690
1691 void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1692 if (zkw == null){
1693 return;
1694 }
1695 if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0 ){
1696 keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1697 }
1698 }
1699
1700 private void closeZooKeeperWatcher() {
1701 synchronized (masterAndZKLock) {
1702 if (keepAliveZookeeper != null) {
1703 LOG.info("Closing zookeeper sessionid=0x" +
1704 Long.toHexString(
1705 keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1706 keepAliveZookeeper.internalClose();
1707 keepAliveZookeeper = null;
1708 }
1709 keepAliveZookeeperUserCount.set(0);
1710 }
1711 }
1712
1713 final MasterServiceState masterServiceState = new MasterServiceState(this);
1714
1715 @Override
1716 public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
1717 return getKeepAliveMasterService();
1718 }
1719
1720 private void resetMasterServiceState(final MasterServiceState mss) {
1721 mss.userCount++;
1722 }
1723
1724 @Override
1725 public MasterKeepAliveConnection getKeepAliveMasterService()
1726 throws MasterNotRunningException {
1727 synchronized (masterAndZKLock) {
1728 if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1729 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1730 try {
1731 this.masterServiceState.stub = stubMaker.makeStub();
1732 } catch (MasterNotRunningException ex) {
1733 throw ex;
1734 } catch (IOException e) {
1735
1736 throw new MasterNotRunningException(e);
1737 }
1738 }
1739 resetMasterServiceState(this.masterServiceState);
1740 }
1741
1742 final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1743 return new MasterKeepAliveConnection() {
1744 MasterServiceState mss = masterServiceState;
1745 @Override
1746 public MasterProtos.AbortProcedureResponse abortProcedure(
1747 RpcController controller,
1748 MasterProtos.AbortProcedureRequest request) throws ServiceException {
1749 return stub.abortProcedure(controller, request);
1750 }
1751 @Override
1752 public MasterProtos.ListProceduresResponse listProcedures(
1753 RpcController controller,
1754 MasterProtos.ListProceduresRequest request) throws ServiceException {
1755 return stub.listProcedures(controller, request);
1756 }
1757 @Override
1758 public MasterProtos.BackupTablesResponse backupTables(
1759 RpcController controller,
1760 MasterProtos.BackupTablesRequest request) throws ServiceException {
1761 return stub.backupTables(controller, request);
1762 }
1763
1764 @Override
1765 public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1766 throws ServiceException {
1767 return stub.addColumn(controller, request);
1768 }
1769
1770 @Override
1771 public DeleteColumnResponse deleteColumn(RpcController controller,
1772 DeleteColumnRequest request)
1773 throws ServiceException {
1774 return stub.deleteColumn(controller, request);
1775 }
1776
1777 @Override
1778 public ModifyColumnResponse modifyColumn(RpcController controller,
1779 ModifyColumnRequest request)
1780 throws ServiceException {
1781 return stub.modifyColumn(controller, request);
1782 }
1783
1784 @Override
1785 public MoveRegionResponse moveRegion(RpcController controller,
1786 MoveRegionRequest request) throws ServiceException {
1787 return stub.moveRegion(controller, request);
1788 }
1789
1790 @Override
1791 public DispatchMergingRegionsResponse dispatchMergingRegions(
1792 RpcController controller, DispatchMergingRegionsRequest request)
1793 throws ServiceException {
1794 return stub.dispatchMergingRegions(controller, request);
1795 }
1796
1797 @Override
1798 public AssignRegionResponse assignRegion(RpcController controller,
1799 AssignRegionRequest request) throws ServiceException {
1800 return stub.assignRegion(controller, request);
1801 }
1802
1803 @Override
1804 public UnassignRegionResponse unassignRegion(RpcController controller,
1805 UnassignRegionRequest request) throws ServiceException {
1806 return stub.unassignRegion(controller, request);
1807 }
1808
1809 @Override
1810 public OfflineRegionResponse offlineRegion(RpcController controller,
1811 OfflineRegionRequest request) throws ServiceException {
1812 return stub.offlineRegion(controller, request);
1813 }
1814
1815 @Override
1816 public DeleteTableResponse deleteTable(RpcController controller,
1817 DeleteTableRequest request) throws ServiceException {
1818 return stub.deleteTable(controller, request);
1819 }
1820
1821 @Override
1822 public TruncateTableResponse truncateTable(RpcController controller,
1823 TruncateTableRequest request) throws ServiceException {
1824 return stub.truncateTable(controller, request);
1825 }
1826
1827 @Override
1828 public EnableTableResponse enableTable(RpcController controller,
1829 EnableTableRequest request) throws ServiceException {
1830 return stub.enableTable(controller, request);
1831 }
1832
1833 @Override
1834 public DisableTableResponse disableTable(RpcController controller,
1835 DisableTableRequest request) throws ServiceException {
1836 return stub.disableTable(controller, request);
1837 }
1838
1839 @Override
1840 public ModifyTableResponse modifyTable(RpcController controller,
1841 ModifyTableRequest request) throws ServiceException {
1842 return stub.modifyTable(controller, request);
1843 }
1844
1845 @Override
1846 public CreateTableResponse createTable(RpcController controller,
1847 CreateTableRequest request) throws ServiceException {
1848 return stub.createTable(controller, request);
1849 }
1850
1851 @Override
1852 public ShutdownResponse shutdown(RpcController controller,
1853 ShutdownRequest request) throws ServiceException {
1854 return stub.shutdown(controller, request);
1855 }
1856
1857 @Override
1858 public StopMasterResponse stopMaster(RpcController controller,
1859 StopMasterRequest request) throws ServiceException {
1860 return stub.stopMaster(controller, request);
1861 }
1862
1863 @Override
1864 public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1865 final RpcController controller,
1866 final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
1867 return stub.isMasterInMaintenanceMode(controller, request);
1868 }
1869
1870 @Override
1871 public BalanceResponse balance(RpcController controller,
1872 BalanceRequest request) throws ServiceException {
1873 return stub.balance(controller, request);
1874 }
1875
1876 @Override
1877 public SetBalancerRunningResponse setBalancerRunning(
1878 RpcController controller, SetBalancerRunningRequest request)
1879 throws ServiceException {
1880 return stub.setBalancerRunning(controller, request);
1881 }
1882
1883 @Override
1884 public NormalizeResponse normalize(RpcController controller,
1885 NormalizeRequest request) throws ServiceException {
1886 return stub.normalize(controller, request);
1887 }
1888
1889 @Override
1890 public SetNormalizerRunningResponse setNormalizerRunning(
1891 RpcController controller, SetNormalizerRunningRequest request)
1892 throws ServiceException {
1893 return stub.setNormalizerRunning(controller, request);
1894 }
1895
1896 @Override
1897 public RunCatalogScanResponse runCatalogScan(RpcController controller,
1898 RunCatalogScanRequest request) throws ServiceException {
1899 return stub.runCatalogScan(controller, request);
1900 }
1901
1902 @Override
1903 public EnableCatalogJanitorResponse enableCatalogJanitor(
1904 RpcController controller, EnableCatalogJanitorRequest request)
1905 throws ServiceException {
1906 return stub.enableCatalogJanitor(controller, request);
1907 }
1908
1909 @Override
1910 public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1911 RpcController controller, IsCatalogJanitorEnabledRequest request)
1912 throws ServiceException {
1913 return stub.isCatalogJanitorEnabled(controller, request);
1914 }
1915
1916 @Override
1917 public CoprocessorServiceResponse execMasterService(
1918 RpcController controller, CoprocessorServiceRequest request)
1919 throws ServiceException {
1920 return stub.execMasterService(controller, request);
1921 }
1922
1923 @Override
1924 public SnapshotResponse snapshot(RpcController controller,
1925 SnapshotRequest request) throws ServiceException {
1926 return stub.snapshot(controller, request);
1927 }
1928
1929 @Override
1930 public GetCompletedSnapshotsResponse getCompletedSnapshots(
1931 RpcController controller, GetCompletedSnapshotsRequest request)
1932 throws ServiceException {
1933 return stub.getCompletedSnapshots(controller, request);
1934 }
1935
1936 @Override
1937 public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1938 DeleteSnapshotRequest request) throws ServiceException {
1939 return stub.deleteSnapshot(controller, request);
1940 }
1941
1942 @Override
1943 public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1944 IsSnapshotDoneRequest request) throws ServiceException {
1945 return stub.isSnapshotDone(controller, request);
1946 }
1947
1948 @Override
1949 public RestoreSnapshotResponse restoreSnapshot(
1950 RpcController controller, RestoreSnapshotRequest request)
1951 throws ServiceException {
1952 return stub.restoreSnapshot(controller, request);
1953 }
1954
1955 @Override
1956 public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1957 RpcController controller, IsRestoreSnapshotDoneRequest request)
1958 throws ServiceException {
1959 return stub.isRestoreSnapshotDone(controller, request);
1960 }
1961
1962 @Override
1963 public ExecProcedureResponse execProcedure(
1964 RpcController controller, ExecProcedureRequest request)
1965 throws ServiceException {
1966 return stub.execProcedure(controller, request);
1967 }
1968
1969 @Override
1970 public ExecProcedureResponse execProcedureWithRet(
1971 RpcController controller, ExecProcedureRequest request)
1972 throws ServiceException {
1973 return stub.execProcedureWithRet(controller, request);
1974 }
1975
1976 @Override
1977 public IsProcedureDoneResponse isProcedureDone(RpcController controller,
1978 IsProcedureDoneRequest request) throws ServiceException {
1979 return stub.isProcedureDone(controller, request);
1980 }
1981
1982 @Override
1983 public GetProcedureResultResponse getProcedureResult(RpcController controller,
1984 GetProcedureResultRequest request) throws ServiceException {
1985 return stub.getProcedureResult(controller, request);
1986 }
1987
1988 @Override
1989 public IsMasterRunningResponse isMasterRunning(
1990 RpcController controller, IsMasterRunningRequest request)
1991 throws ServiceException {
1992 return stub.isMasterRunning(controller, request);
1993 }
1994
1995 @Override
1996 public ModifyNamespaceResponse modifyNamespace(RpcController controller,
1997 ModifyNamespaceRequest request)
1998 throws ServiceException {
1999 return stub.modifyNamespace(controller, request);
2000 }
2001
2002 @Override
2003 public CreateNamespaceResponse createNamespace(
2004 RpcController controller, CreateNamespaceRequest request) throws ServiceException {
2005 return stub.createNamespace(controller, request);
2006 }
2007
2008 @Override
2009 public DeleteNamespaceResponse deleteNamespace(
2010 RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
2011 return stub.deleteNamespace(controller, request);
2012 }
2013
2014 @Override
2015 public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
2016 GetNamespaceDescriptorRequest request) throws ServiceException {
2017 return stub.getNamespaceDescriptor(controller, request);
2018 }
2019
2020 @Override
2021 public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
2022 ListNamespaceDescriptorsRequest request) throws ServiceException {
2023 return stub.listNamespaceDescriptors(controller, request);
2024 }
2025
2026 @Override
2027 public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
2028 RpcController controller, ListTableDescriptorsByNamespaceRequest request)
2029 throws ServiceException {
2030 return stub.listTableDescriptorsByNamespace(controller, request);
2031 }
2032
2033 @Override
2034 public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
2035 RpcController controller, ListTableNamesByNamespaceRequest request)
2036 throws ServiceException {
2037 return stub.listTableNamesByNamespace(controller, request);
2038 }
2039
2040 @Override
2041 public void close() {
2042 release(this.mss);
2043 }
2044
2045 @Override
2046 public GetSchemaAlterStatusResponse getSchemaAlterStatus(
2047 RpcController controller, GetSchemaAlterStatusRequest request)
2048 throws ServiceException {
2049 return stub.getSchemaAlterStatus(controller, request);
2050 }
2051
2052 @Override
2053 public GetTableDescriptorsResponse getTableDescriptors(
2054 RpcController controller, GetTableDescriptorsRequest request)
2055 throws ServiceException {
2056 return stub.getTableDescriptors(controller, request);
2057 }
2058
2059 @Override
2060 public GetTableNamesResponse getTableNames(
2061 RpcController controller, GetTableNamesRequest request)
2062 throws ServiceException {
2063 return stub.getTableNames(controller, request);
2064 }
2065
2066 @Override
2067 public GetClusterStatusResponse getClusterStatus(
2068 RpcController controller, GetClusterStatusRequest request)
2069 throws ServiceException {
2070 return stub.getClusterStatus(controller, request);
2071 }
2072
2073 @Override
2074 public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
2075 throws ServiceException {
2076 return stub.setQuota(controller, request);
2077 }
2078
2079 @Override
2080 public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
2081 RpcController controller, MajorCompactionTimestampRequest request)
2082 throws ServiceException {
2083 return stub.getLastMajorCompactionTimestamp(controller, request);
2084 }
2085
2086 @Override
2087 public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
2088 RpcController controller, MajorCompactionTimestampForRegionRequest request)
2089 throws ServiceException {
2090 return stub.getLastMajorCompactionTimestampForRegion(controller, request);
2091 }
2092
2093 @Override
2094 public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
2095 IsBalancerEnabledRequest request) throws ServiceException {
2096 return stub.isBalancerEnabled(controller, request);
2097 }
2098
2099 @Override
2100 public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
2101 RpcController controller,
2102 MasterProtos.SetSplitOrMergeEnabledRequest request) throws ServiceException {
2103 return stub.setSplitOrMergeEnabled(controller, request);
2104 }
2105
2106 @Override
2107 public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
2108 RpcController controller,
2109 MasterProtos.IsSplitOrMergeEnabledRequest request) throws ServiceException {
2110 return stub.isSplitOrMergeEnabled(controller, request);
2111 }
2112
2113 @Override
2114 public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
2115 IsNormalizerEnabledRequest request) throws ServiceException {
2116 return stub.isNormalizerEnabled(controller, request);
2117 }
2118
2119 @Override
2120 public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
2121 RpcController controller, GetSpaceQuotaRegionSizesRequest request)
2122 throws ServiceException {
2123 return stub.getSpaceQuotaRegionSizes(controller, request);
2124 }
2125
2126 @Override
2127 public GetQuotaStatesResponse getQuotaStates(RpcController controller,
2128 GetQuotaStatesRequest request) throws ServiceException {
2129 return stub.getQuotaStates(controller, request);
2130 }
2131 };
2132 }
2133
2134
2135 private static void release(MasterServiceState mss) {
2136 if (mss != null && mss.connection != null) {
2137 ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2138 }
2139 }
2140
2141 private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2142 if (mss.getStub() == null){
2143 return false;
2144 }
2145 try {
2146 return mss.isMasterRunning();
2147 } catch (UndeclaredThrowableException e) {
2148
2149
2150 LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2151 return false;
2152 } catch (ServiceException se) {
2153 LOG.warn("Checking master connection", se);
2154 return false;
2155 }
2156 }
2157
2158 void releaseMaster(MasterServiceState mss) {
2159 if (mss.getStub() == null) return;
2160 synchronized (masterAndZKLock) {
2161 --mss.userCount;
2162 }
2163 }
2164
2165 private void closeMasterService(MasterServiceState mss) {
2166 if (mss.getStub() != null) {
2167 LOG.info("Closing master protocol: " + mss);
2168 mss.clearStub();
2169 }
2170 mss.userCount = 0;
2171 }
2172
2173
2174
2175
2176
2177 private void closeMaster() {
2178 synchronized (masterAndZKLock) {
2179 closeMasterService(masterServiceState);
2180 }
2181 }
2182
2183 void updateCachedLocation(HRegionInfo hri, ServerName source,
2184 ServerName serverName, long seqNum) {
2185 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2186 cacheLocation(hri.getTable(), source, newHrl);
2187 }
2188
2189 @Override
2190 public void deleteCachedRegionLocation(final HRegionLocation location) {
2191 metaCache.clearCache(location);
2192 }
2193
2194 @Override
2195 public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2196 final Object exception, final HRegionLocation source) {
2197 assert source != null;
2198 updateCachedLocations(tableName, source.getRegionInfo().getRegionName()
2199 , rowkey, exception, source.getServerName());
2200 }
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210 @Override
2211 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2212 final Object exception, final ServerName source) {
2213 if (rowkey == null || tableName == null) {
2214 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
2215 ", tableName=" + (tableName == null ? "null" : tableName));
2216 return;
2217 }
2218
2219 if (source == null) {
2220
2221 return;
2222 }
2223
2224 if (regionName == null) {
2225
2226 metaCache.clearCache(tableName, rowkey, source);
2227 return;
2228 }
2229
2230
2231 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2232 HRegionLocation oldLocation = null;
2233 if (oldLocations != null) {
2234 oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2235 }
2236 if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2237
2238
2239 return;
2240 }
2241
2242 HRegionInfo regionInfo = oldLocation.getRegionInfo();
2243 Throwable cause = ClientExceptionsUtil.findException(exception);
2244 if (cause != null) {
2245 if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2246
2247 return;
2248 }
2249
2250 if (cause instanceof RegionMovedException) {
2251 RegionMovedException rme = (RegionMovedException) cause;
2252 if (LOG.isTraceEnabled()) {
2253 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2254 rme.getHostname() + ":" + rme.getPort() +
2255 " according to " + source.getHostAndPort());
2256 }
2257
2258
2259 updateCachedLocation(
2260 regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2261 return;
2262 }
2263 }
2264
2265
2266
2267 metaCache.clearCache(regionInfo);
2268 }
2269
2270 @Override
2271 public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2272 final Object exception, final HRegionLocation source) {
2273 updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2274 }
2275
2276 @Override
2277 @Deprecated
2278 public void processBatch(List<? extends Row> list,
2279 final TableName tableName,
2280 ExecutorService pool,
2281 Object[] results) throws IOException, InterruptedException {
2282
2283
2284
2285 if (results.length != list.size()) {
2286 throw new IllegalArgumentException(
2287 "argument results must be the same size as argument list");
2288 }
2289 processBatchCallback(list, tableName, pool, results, null);
2290 }
2291
2292 @Override
2293 @Deprecated
2294 public void processBatch(List<? extends Row> list,
2295 final byte[] tableName,
2296 ExecutorService pool,
2297 Object[] results) throws IOException, InterruptedException {
2298 processBatch(list, TableName.valueOf(tableName), pool, results);
2299 }
2300
2301
2302
2303
2304
2305
2306
2307
2308 @Override
2309 @Deprecated
2310 public <R> void processBatchCallback(
2311 List<? extends Row> list,
2312 TableName tableName,
2313 ExecutorService pool,
2314 Object[] results,
2315 Batch.Callback<R> callback)
2316 throws IOException, InterruptedException {
2317
2318 AsyncRequestFuture ars = this.asyncProcess.submitAll(
2319 pool, tableName, list, callback, results);
2320 ars.waitUntilDone();
2321 if (ars.hasError()) {
2322 throw ars.getErrors();
2323 }
2324 }
2325
2326 @Override
2327 @Deprecated
2328 public <R> void processBatchCallback(
2329 List<? extends Row> list,
2330 byte[] tableName,
2331 ExecutorService pool,
2332 Object[] results,
2333 Batch.Callback<R> callback)
2334 throws IOException, InterruptedException {
2335 processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2336 }
2337
2338
2339 protected AsyncProcess createAsyncProcess(Configuration conf) {
2340
2341 return new AsyncProcess(this, conf, this.batchPool,
2342 RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
2343 RpcControllerFactory.instantiate(conf));
2344 }
2345
2346 @Override
2347 public AsyncProcess getAsyncProcess() {
2348 return asyncProcess;
2349 }
2350
2351 @Override
2352 public ServerStatisticTracker getStatisticsTracker() {
2353 return this.stats;
2354 }
2355
2356 @Override
2357 public ClientBackoffPolicy getBackoffPolicy() {
2358 return this.backoffPolicy;
2359 }
2360
2361
2362
2363
2364
2365 @VisibleForTesting
2366 int getNumberOfCachedRegionLocations(final TableName tableName) {
2367 return metaCache.getNumberOfCachedRegionLocations(tableName);
2368 }
2369
2370 @Override
2371 @Deprecated
2372 public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
2373 }
2374
2375 @Override
2376 @Deprecated
2377 public void setRegionCachePrefetch(final byte[] tableName,
2378 final boolean enable) {
2379 }
2380
2381 @Override
2382 @Deprecated
2383 public boolean getRegionCachePrefetch(TableName tableName) {
2384 return false;
2385 }
2386
2387 @Override
2388 @Deprecated
2389 public boolean getRegionCachePrefetch(byte[] tableName) {
2390 return false;
2391 }
2392
2393 @Override
2394 public void abort(final String msg, Throwable t) {
2395 if (t instanceof KeeperException.SessionExpiredException
2396 && keepAliveZookeeper != null) {
2397 synchronized (masterAndZKLock) {
2398 if (keepAliveZookeeper != null) {
2399 LOG.warn("This client just lost it's session with ZooKeeper," +
2400 " closing it." +
2401 " It will be recreated next time someone needs it", t);
2402 closeZooKeeperWatcher();
2403 }
2404 }
2405 } else {
2406 if (t != null) {
2407 LOG.fatal(msg, t);
2408 } else {
2409 LOG.fatal(msg);
2410 }
2411 this.aborted = true;
2412 close();
2413 this.closed = true;
2414 }
2415 }
2416
2417 @Override
2418 public boolean isClosed() {
2419 return this.closed;
2420 }
2421
2422 @Override
2423 public boolean isAborted(){
2424 return this.aborted;
2425 }
2426
2427 @Override
2428 public int getCurrentNrHRS() throws IOException {
2429 return this.registry.getCurrentNrHRS();
2430 }
2431
2432
2433
2434
2435 void incCount() {
2436 ++refCount;
2437 }
2438
2439
2440
2441
2442 void decCount() {
2443 if (refCount > 0) {
2444 --refCount;
2445 }
2446 }
2447
2448
2449
2450
2451
2452
2453 boolean isZeroReference() {
2454 return refCount == 0;
2455 }
2456
2457 void internalClose() {
2458 if (this.closed) {
2459 return;
2460 }
2461 closeMaster();
2462 shutdownPools();
2463 this.closed = true;
2464 closeZooKeeperWatcher();
2465 this.stubs.clear();
2466 if (clusterStatusListener != null) {
2467 clusterStatusListener.close();
2468 }
2469 if (rpcClient != null) {
2470 rpcClient.close();
2471 }
2472 }
2473
2474 @Override
2475 public void close() {
2476 if (managed) {
2477 if (aborted) {
2478 ConnectionManager.deleteStaleConnection(this);
2479 } else {
2480 ConnectionManager.deleteConnection(this, false);
2481 }
2482 } else {
2483 internalClose();
2484 }
2485 }
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498 @Override
2499 protected void finalize() throws Throwable {
2500 super.finalize();
2501
2502 refCount = 1;
2503 close();
2504 }
2505
2506
2507
2508
2509 @Deprecated
2510 @Override
2511 public HTableDescriptor[] listTables() throws IOException {
2512 MasterKeepAliveConnection master = getKeepAliveMasterService();
2513 try {
2514 GetTableDescriptorsRequest req =
2515 RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2516 return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2517 } catch (ServiceException se) {
2518 throw ProtobufUtil.getRemoteException(se);
2519 } finally {
2520 master.close();
2521 }
2522 }
2523
2524
2525
2526
2527 @Deprecated
2528 @Override
2529 public String[] getTableNames() throws IOException {
2530 TableName[] tableNames = listTableNames();
2531 String result[] = new String[tableNames.length];
2532 for (int i = 0; i < tableNames.length; i++) {
2533 result[i] = tableNames[i].getNameAsString();
2534 }
2535 return result;
2536 }
2537
2538
2539
2540
2541 @Deprecated
2542 @Override
2543 public TableName[] listTableNames() throws IOException {
2544 MasterKeepAliveConnection master = getKeepAliveMasterService();
2545 try {
2546 return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2547 GetTableNamesRequest.newBuilder().build())
2548 .getTableNamesList());
2549 } catch (ServiceException se) {
2550 throw ProtobufUtil.getRemoteException(se);
2551 } finally {
2552 master.close();
2553 }
2554 }
2555
2556
2557
2558
2559 @Deprecated
2560 @Override
2561 public HTableDescriptor[] getHTableDescriptorsByTableName(
2562 List<TableName> tableNames) throws IOException {
2563 if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
2564 MasterKeepAliveConnection master = getKeepAliveMasterService();
2565 try {
2566 GetTableDescriptorsRequest req =
2567 RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2568 return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2569 } catch (ServiceException se) {
2570 throw ProtobufUtil.getRemoteException(se);
2571 } finally {
2572 master.close();
2573 }
2574 }
2575
2576
2577
2578
2579 @Deprecated
2580 @Override
2581 public HTableDescriptor[] getHTableDescriptors(
2582 List<String> names) throws IOException {
2583 List<TableName> tableNames = new ArrayList<TableName>(names.size());
2584 for(String name : names) {
2585 tableNames.add(TableName.valueOf(name));
2586 }
2587
2588 return getHTableDescriptorsByTableName(tableNames);
2589 }
2590
2591 @Override
2592 public NonceGenerator getNonceGenerator() {
2593 return this.nonceGenerator;
2594 }
2595
2596
2597
2598
2599
2600
2601
2602
2603 @Deprecated
2604 @Override
2605 public HTableDescriptor getHTableDescriptor(final TableName tableName)
2606 throws IOException {
2607 if (tableName == null) return null;
2608 MasterKeepAliveConnection master = getKeepAliveMasterService();
2609 GetTableDescriptorsResponse htds;
2610 try {
2611 GetTableDescriptorsRequest req =
2612 RequestConverter.buildGetTableDescriptorsRequest(tableName);
2613 htds = master.getTableDescriptors(null, req);
2614 } catch (ServiceException se) {
2615 throw ProtobufUtil.getRemoteException(se);
2616 } finally {
2617 master.close();
2618 }
2619 if (!htds.getTableSchemaList().isEmpty()) {
2620 return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
2621 }
2622 throw new TableNotFoundException(tableName.getNameAsString());
2623 }
2624
2625
2626
2627
2628 @Deprecated
2629 @Override
2630 public HTableDescriptor getHTableDescriptor(final byte[] tableName)
2631 throws IOException {
2632 return getHTableDescriptor(TableName.valueOf(tableName));
2633 }
2634
2635 @Override
2636 public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2637 return RpcRetryingCallerFactory
2638 .instantiate(conf, this.interceptor, this.getStatisticsTracker());
2639 }
2640
2641 @Override
2642 public boolean isManaged() {
2643 return managed;
2644 }
2645
2646 @Override
2647 public ConnectionConfiguration getConnectionConfiguration() {
2648 return this.connectionConfig;
2649 }
2650
2651 @Override
2652 public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
2653 return this.rpcCallerFactory;
2654 }
2655
2656 @Override
2657 public RpcControllerFactory getRpcControllerFactory() {
2658 return this.rpcControllerFactory;
2659 }
2660
2661 @Override
2662 public boolean hasCellBlockSupport() {
2663 return this.rpcClient.hasCellBlockSupport();
2664 }
2665 }
2666
2667
2668
2669
2670 static class ServerErrorTracker {
2671
2672 private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
2673 new ConcurrentHashMap<ServerName, ServerErrors>();
2674 private final long canRetryUntil;
2675 private final int maxRetries;
2676 private final long startTrackingTime;
2677
2678 public ServerErrorTracker(long timeout, int maxRetries) {
2679 this.maxRetries = maxRetries;
2680 this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
2681 this.startTrackingTime = new Date().getTime();
2682 }
2683
2684
2685
2686
2687 boolean canRetryMore(int numRetry) {
2688
2689 return numRetry < maxRetries || (maxRetries > 1 &&
2690 EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
2691 }
2692
2693
2694
2695
2696
2697
2698
2699
2700 long calculateBackoffTime(ServerName server, long basePause) {
2701 long result;
2702 ServerErrors errorStats = errorsByServer.get(server);
2703 if (errorStats != null) {
2704 result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
2705 } else {
2706 result = 0;
2707 }
2708 return result;
2709 }
2710
2711
2712
2713
2714
2715
2716 void reportServerError(ServerName server) {
2717 ServerErrors errors = errorsByServer.get(server);
2718 if (errors != null) {
2719 errors.addError();
2720 } else {
2721 errors = errorsByServer.putIfAbsent(server, new ServerErrors());
2722 if (errors != null){
2723 errors.addError();
2724 }
2725 }
2726 }
2727
2728 long getStartTrackingTime() {
2729 return startTrackingTime;
2730 }
2731
2732
2733
2734
2735 private static class ServerErrors {
2736 public final AtomicInteger retries = new AtomicInteger(0);
2737
2738 public void addError() {
2739 retries.incrementAndGet();
2740 }
2741 }
2742 }
2743 }