/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;

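/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, and of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. Dead
 * servers are handled by a ServerShutdownHandler.  If the handler is not
 * enabled yet, servers to expire are queued up and processed once it is.
 */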
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Log LOG = LogFactory.getLog(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private volatile boolean clusterShutdown = false;

  /** The last flushed sequence id for a region. */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  /** The last flushed sequence id for a store in a region. */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
    storeFlushedSequenceIdsByRegion =
    new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load. */
  private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
    new ConcurrentHashMap<ServerName, ServerLoad>();

  /**
   * Map of admin interfaces per registered regionserver; these interfaces we use to control
   * regionservers out on the cluster.
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
    new HashMap<ServerName, AdminService.BlockingInterface>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers =
    new ArrayList<ServerName>();

  private final Server master;
  private final MasterServices services;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RetryCounterFactory pingRetryCounterFactory;
  private final RpcControllerFactory rpcControllerFactory;
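  /**
   * Set of region servers which are dead but not processed immediately. If one
   * server died before master enables ServerShutdownHandler, the server will be
   * added to this set and will be processed through calling
   * {@link ServerManager#processQueuedDeadServers()} by master.
   * <p>
   * A dead server is a server instance known to be dead, not listed in the /hbase/rs
   * znode any more. It may have not been submitted to ServerShutdownHandler yet
   * because the handler is not enabled.
   * <p>
   * So this is a set of region servers known to be dead but not submitted to
   * ServerShutdownHandler for processing yet.
   */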
  private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
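  /**
   * Map of region servers which are dead and submitted to ServerShutdownHandler to
   * process but not fully processed immediately.
   * <p>
   * If one server died before assignment manager finished the failover cleanup, the
   * server will be added to this map and will be processed through calling
   * {@link ServerManager#processQueuedDeadServers()} by assignment manager.
   * <p>
   * The Boolean value indicates whether log split is needed inside ServerShutdownHandler.
   * <p>
   * ServerShutdownHandler processes a dead server submitted to the handler after the
   * handler is enabled. It may not be able to complete the processing because the
   * failover cleanup is not done yet. In this case, the dead server is parked in this
   * map temporarily.
   */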
  private Map<ServerName, Boolean> requeuedDeadServers
    = new ConcurrentHashMap<ServerName, Boolean>();

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
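  /**
   * Constructor.
   * @param master
   * @param services
   * @throws ZooKeeperConnectionException
   */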
  public ServerManager(final Server master, final MasterServices services)
      throws IOException {
    this(master, services, true);
  }

  ServerManager(final Server master, final MasterServices services,
      final boolean connect) throws IOException {
    this.master = master;
    this.services = services;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null;
    int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.maximum.ping.server.attempts", 10));
    int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.ping.server.retry.sleep.interval", 100));
    this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
    this.rpcControllerFactory = this.connection == null
        ? null
        : connection.getRpcControllerFactory();
  }
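  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */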
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }
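  /**
   * Let the server manager know a new regionserver has come online
   * @param request the startup request
   * @param ia the InetAddress from which request is received
   * @return The ServerName we know this server as.
   * @throws IOException
   */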
  ServerName regionServerStartup(RegionServerStartupRequest request, InetAddress ia)
      throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not yet run, or from a server whose fail we are currently processing.
    // Test its host+port combo is present in serverAddressToServerInfo.  If it
    // is, reject the server and trigger its expiration. The next time it comes
    // in, it should have been removed from serverAddressToServerInfo and queued
    // for processing by ProcessServerShutdown.
    final String hostname = request.hasUseThisHostnameInstead() ?
      request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(),
      request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
        + " could not record the server: " + sn);
    }
    return sn;
  }

  private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
      byte[] regionName) {
    ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
      storeFlushedSequenceIdsByRegion.get(regionName);
    if (storeFlushedSequenceId != null) {
      return storeFlushedSequenceId;
    }
    storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    ConcurrentNavigableMap<byte[], Long> alreadyPut =
      storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
    return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
  }
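  /**
   * Updates last flushed sequence Ids for the regions on server sn
   * @param sn
   * @param hsl
   */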
  private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
    Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
    for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompleteSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
          + l + ") that is less than the previous last flushed sequence id ("
          + existingValue + ") for region " + Bytes.toString(entry.getKey()) + ". Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
        getOrCreateStoreFlushedSequenceId(encodedRegionName);
      for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
        byte[] family = storeSeqId.getFamilyName().toByteArray();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getSequenceId();
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  void regionServerReport(ServerName sn,
      ServerLoad sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and its just different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of reportServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }
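  /**
   * Check if a server of the same host and port already exists;
   * if not, or the existing one has a smaller start code, record it.
   *
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise, false
   */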
  boolean checkAndRecordNewServer(
      final ServerName serverName, final ServerLoad sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
          + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume that same ts means same server, and don't expire in that case.
    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
        existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }
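  /**
   * Checks the clock skew between the server and the master. If the clock skew exceeds the
   * configured max, it will throw an exception; if it exceeds the configured warning threshold,
   * it will log a warning but start normally.
   * @param serverName Incoming server's name
   * @param serverCurrentTime
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */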
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master. " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }
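  /**
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param serverName
   * @param what START or REPORT
   * @throws YouAreDeadException
   */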
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // host name, port and start code all match with an existing entry of the
      // dead server list. The checkin is from a dead server.
      String message = "Server " + what + " rejected; currently processing " +
        serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove dead server with same hostname and port of newly checking in rs after master
    // initialization. See HBASE-5916 for more information.
    if ((this.services == null || ((HMaster) this.services).isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
        " removed it from the dead servers list");
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port.
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    for (ServerName sn: this.onlineServers.keySet()) {
      if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
    }
    return null;
  }

  /**
   * Adds the server to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
   * @param sl
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
    LOG.info("Registering server=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
          .setFamilyName(ByteString.copyFrom(entry.getKey()))
          .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @param serverName
   * @return ServerLoad if serverName is known else null
   */
  public ServerLoad getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerLoad sl: this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getNumberOfRegions();
    }
    return numServers == 0 ? 0 :
      (double)totalLoad / (double)numServers;
  }

  /** @return the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /**
   * @return Read-only map of servers to serverinfo
   */
  public Map<ServerName, ServerLoad> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZooKeeperWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      // Log progress at most once a second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Master will delete itself later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's ok here not to sync on onlineServers - merely logging
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
        if (servers == null || servers.size() == 0 || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // Master could have lost some ZK events, no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }
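  /*
   * Expire the passed server.  Add it to the list of dead servers and queue a
   * shutdown processing.
   */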
  public synchronized void expireServer(final ServerName serverName) {
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return;
    }
    if (!services.isServerShutdownHandlerEnabled()) {
      LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
        + "delay expiring server " + serverName);
      this.queuedDeadServers.add(serverName);
      return;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      // TODO: Can this happen?  It shouldn't be online in this case?
      LOG.warn("Expiration of " + serverName +
        " but server shutdown already in progress");
      return;
    }
    synchronized (onlineServers) {
      if (!this.onlineServers.containsKey(serverName)) {
        LOG.warn("Expiration of " + serverName + " but server not online");
      }
      // Remove the server from the known servers lists and update load info BUT
      // add to deadservers first; do this so it'll show in dead servers list if
      // not in online servers list.
      this.deadservers.add(serverName);
      this.onlineServers.remove(serverName);
      onlineServers.notifyAll();
    }
    this.rsAdmins.remove(serverName);
    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server
    if (this.clusterShutdown) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return;
    }

    boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName) ==
      AssignmentManager.ServerHostRegion.HOSTING_REGION;
    if (carryingMeta) {
      this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName));
    } else {
      this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName, true));
    }
    LOG.debug("Added=" + serverName +
      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);

    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
  }

  public synchronized void processDeadServer(final ServerName serverName) {
    this.processDeadServer(serverName, false);
  }

  public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
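    // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
    // in-memory region states, region servers could be down. Meta table can and
    // should be re-assigned, log splitting can be done too. However, it is better to
    // wait till the cleanup is done before re-assigning user regions.
    //
    // We should not wait in the server shutdown handler thread since it can clog
    // up the handler threads and meta table could not be re-assigned in case
    // the corresponding server is down. So we queue them up here instead.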
    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      requeuedDeadServers.put(serverName, shouldSplitWal);
      return;
    }

    this.deadservers.add(serverName);
    this.services.getExecutorService().submit(
      new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
        shouldSplitWal));
  }
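  /**
   * Process the servers which died during master's initialization. It will be
   * called after HMaster#assignMeta and AssignmentManager#joinCluster.
   */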
  synchronized void processQueuedDeadServers() {
    if (!services.isServerShutdownHandlerEnabled()) {
      LOG.info("Master hasn't enabled ServerShutdownHandler");
    }
    Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
    while (serverIterator.hasNext()) {
      ServerName tmpServerName = serverIterator.next();
      expireServer(tmpServerName);
      serverIterator.remove();
      requeuedDeadServers.remove(tmpServerName);
    }

    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
    }

    for (ServerName tmpServerName : requeuedDeadServers.keySet()) {
      processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
    }
    requeuedDeadServers.clear();
  }

  /*
   * Remove the server from the drain list.
   */
  public boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
        "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers list.
    return this.drainingServers.remove(sn);
  }

  /*
   * Add the server to the drain list.
   */
  public boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
        "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers list, if it's not already in it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
        "Ignoring request to add it again.");
      return false;
    }
    return this.drainingServers.add(sn);
  }
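  /**
   * Sends an OPEN RPC to the specified server to open the specified region.
   * <p>
   * Open should not fail but can if server just crashed.
   * <p>
   * @param server server to open a region
   * @param region region to open
   * @param versionOfOfflineNode that needs to be present in the offline node
   * when RS tries to change the state from OFFLINE to other states.
   * @param favoredNodes
   */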
  public RegionOpeningState sendRegionOpen(final ServerName server,
      HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return RegionOpeningState.FAILED_OPENING;
    }
    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
      region, versionOfOfflineNode, favoredNodes,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningState(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
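  /**
   * Sends an OPEN RPC to the specified server to open the specified region.
   * <p>
   * Open should not fail but can if server just crashed.
   * <p>
   * @param server server to open a region
   * @param regionOpenInfos info of a list of regions to open
   * @return a list of region opening states
   */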
  public List<RegionOpeningState> sendRegionOpen(ServerName server,
      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return null;
    }

    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningStateList(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  private PayloadCarryingRpcController newRpcController() {
    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
  }
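  /**
   * Sends a CLOSE RPC to the specified server to close the specified region.
   * <p>
   * A region server could reject the close request because it either does not
   * have the specified region or the region is being split.
   * @param server server to close a region
   * @param region region to close
   * @param versionOfClosingNode
   *   the version of znode to compare when RS transitions the znode from
   *   CLOSING state.
   * @param dest if the region is moved to another server, the destination server. null otherwise.
   * @param transitionInZK true if to transition the znode in ZK
   * @return true if server acknowledged close, false if not
   * @throws IOException
   */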
  public boolean sendRegionClose(ServerName server, HRegionInfo region,
      int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
    if (server == null) throw new NullPointerException("Passed server is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send CLOSE RPC to server " +
        server.toString() + " for region " +
        region.getRegionNameAsString() +
        " failed because no RPC connection found to this server");
    }
    PayloadCarryingRpcController controller = newRpcController();
    return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(),
      versionOfClosingNode, dest, transitionInZK);
  }

  public boolean sendRegionClose(ServerName server,
      HRegionInfo region, int versionOfClosingNode) throws IOException {
    return sendRegionClose(server, region, versionOfClosingNode, null, true);
  }
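  /**
   * Sends a WARMUP RPC to the specified server to warmup the specified region.
   * <p>
   * A region server could reject the request because it either does not
   * have the specified region or the region is being split.
   * @param server server to warmup a region
   * @param region region to warmup
   */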
  public void sendRegionWarmup(ServerName server,
      HRegionInfo region) {
    if (server == null) return;
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      PayloadCarryingRpcController controller = newRpcController();
      ProtobufUtil.warmupRegion(controller, admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " +
        server + " region: " + region +
        " exception: " + e);
    }
  }
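  /**
   * Contacts a region server and waits up to timeout ms
   * to close the region.  This bypasses the active hmaster.
   */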
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
      ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName(), false);
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      try {
        HRegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException) {
          // The region is gone from the server; no need to keep polling.
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
      + " timeout " + timeout);
  }
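  /**
   * Sends a MERGE REGIONS RPC to the specified server to merge the specified
   * regions.
   * <p>
   * A region server could reject the merge request because it does not have
   * either of the specified regions.
   * @param server server to merge regions
   * @param region_a region to merge
   * @param region_b region to merge
   * @param forcible true if do a compulsory merge, otherwise we will only merge
   *          two adjacent regions
   * @throws IOException
   */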
  public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
      HRegionInfo region_b, boolean forcible) throws IOException {
    if (server == null)
      throw new NullPointerException("Passed server is null");
    if (region_a == null || region_b == null)
      throw new NullPointerException("Passed region is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send MERGE REGIONS RPC to server "
        + server.toString() + " for region "
        + region_a.getRegionNameAsString() + ","
        + region_b.getRegionNameAsString()
        + " failed because no RPC connection found to this server");
    }
    PayloadCarryingRpcController controller = newRpcController();
    ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible);
  }
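  /**
   * Check if a region server is reachable and has the expected start code
   */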
  public boolean isServerReachable(ServerName server) {
    if (server == null) throw new NullPointerException("Passed server is null");

    synchronized (this.onlineServers) {
      if (this.deadservers.isDeadServer(server)) {
        return false;
      }
    }

    RetryCounter retryCounter = pingRetryCounterFactory.create();
    while (retryCounter.shouldRetry()) {
      try {
        PayloadCarryingRpcController controller = newRpcController();
        AdminService.BlockingInterface admin = getRsAdmin(server);
        if (admin != null) {
          ServerInfo info = ProtobufUtil.getServerInfo(controller, admin);
          return info != null && info.hasServerName()
            && server.getStartcode() == info.getServerName().getStartCode();
        }
      } catch (IOException ioe) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes() + " of "
            + retryCounter.getMaxAttempts(), ioe);
        }
        try {
          retryCounter.sleepUntilNextRetry();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return false;
  }
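  /**
   * @param sn
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if failed
   */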
  private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
      throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // A master is also a region server now, see HBASE-10569 for details
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }
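  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    there have been no new region servers in for
   *    'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
   * @throws InterruptedException
   */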
  public void waitForRegionServers(MonitoredTask status)
      throws InterruptedException {
    final long interval = this.master.getConfiguration().
      getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
      getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    int defaultMinToStart = 1;
    if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
      // If we assign regions to master, we'd like to start
      // at least another region server so that we don't
      // assign all regions to master if other region servers
      // don't come up in time.
      defaultMinToStart = 2;
    }
    int minToStart = this.master.getConfiguration().
      getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
    if (minToStart < 1) {
      LOG.warn(String.format(
        "The value of '%s' (%d) can not be less than 1, ignoring.",
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      minToStart = 1;
    }
    int maxToStart = this.master.getConfiguration().
      getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format(
        "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    while (!this.master.isStopped() && count < maxToStart
        && (lastCountChange + interval > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
          "Waiting for region servers count to settle; currently" +
            " checked in " + count + ", slept for " + slept + " ms," +
            " expecting minimum of " + minToStart + ", maximum of " + maxToStart +
            ", timeout of " + timeout + " ms, interval of " + interval + " ms.";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }

    LOG.info("Finished waiting for region servers count to settle;" +
      " checked in " + count + ", slept for " + slept + " ms," +
      " expecting minimum of " + minToStart + ", maximum of " + maxToStart + "," +
      " master is " + (this.master.isStopped() ? "stopped." : "running")
    );
  }

  /** @return A copy of the internal list of online servers. */
  public List<ServerName> getOnlineServersList() {
    // TODO: optimize the load balancer call so we don't need to make a new list
    // TODO: FIX. THIS IS POPULAR CALL.
    return new ArrayList<ServerName>(this.onlineServers.keySet());
  }

  /** @return A copy of the internal list of draining servers. */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<ServerName>(this.drainingServers);
  }

  /** @return A copy of the internal set of deadNotExpired servers. */
  Set<ServerName> getDeadNotExpiredServers() {
    return new HashSet<ServerName>(this.queuedDeadServers);
  }
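  /**
   * During startup, if we figure it is not a failover, i.e. there are
   * no more HLog files to split, we won't try to recover these dead servers.
   * So we just remove them from the queue. Use caution in calling this.
   */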
  void removeRequeuedDeadServers() {
    requeuedDeadServers.clear();
  }

  /**
   * @return A copy of the internal map of requeuedDeadServers servers and their corresponding
   *         splitlog need flag.
   */
  Map<ServerName, Boolean> getRequeuedDeadServers() {
    return Collections.unmodifiableMap(this.requeuedDeadServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  /**
   * Check whether a server is online based on hostname and port
   * @return true if finding a server with matching hostname and port.
   */
  public boolean isServerWithSameHostnamePortOnline(final ServerName serverName) {
    return findServerWithSameHostnamePortWithLock(serverName) != null;
  }

  /**
   * Check if a server is known to be dead.  A server can be online,
   * or known to be dead, or unknown to this manager (i.e., not online,
   * not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName)
      || queuedDeadServers.contains(serverName)
      || requeuedDeadServers.containsKey(serverName);
  }

  public void shutdownCluster() {
    this.clusterShutdown = true;
    this.master.stop("Cluster shutdown requested");
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown;
  }

  /**
   * Stop the ServerManager.  Currently closes the connection to the master.
   */
  public void stop() {
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException e) {
        LOG.error("Attempt to close connection to master failed", e);
      }
    }
  }
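  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serversToExclude can be null if there is no server to exclude
   */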
  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
    final List<ServerName> destServers = getOnlineServersList();

    if (serversToExclude != null) {
      destServers.removeAll(serversToExclude);
    }

    // Loop through the draining server list and remove them from the server list
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    if (!drainingServersCopy.isEmpty()) {
      for (final ServerName server: drainingServersCopy) {
        destServers.remove(server);
      }
    }

    // Remove the deadNotExpired servers from the server list.
    removeDeadNotExpiredServers(destServers);
    return destServers;
  }

  /**
   * Calls {@link #createDestinationServersList} without server to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }
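  /**
   * Loop through the deadNotExpired server list and remove them from the
   * servers.
   * This function should be used carefully outside of this class. You should use a high level
   * method such as {@link #createDestinationServersList()} instead of managing your own list.
   */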
  void removeDeadNotExpiredServers(List<ServerName> servers) {
    Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
    if (!deadNotExpiredServersCopy.isEmpty()) {
      for (ServerName server : deadNotExpiredServersCopy) {
        LOG.debug("Removing dead but not expired server: " + server
          + " from eligible server pool.");
        servers.remove(server);
      }
    }
  }

  /**
   * To clear any dead server with same host name and port of any online server
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a region was removed.
   */
  public void removeRegion(final HRegionInfo regionInfo) {
    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
    storeFlushedSequenceIdsByRegion.remove(encodedName);
    flushedSequenceIdByRegion.remove(encodedName);
  }

  /**
   * Called by delete table and similar to notify the ServerManager that a list of regions was
   * removed.
   */
  public void removeRegions(final List<HRegionInfo> regions) {
    for (HRegionInfo hri: regions) {
      removeRegion(hri);
    }
  }
}