
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentNavigableMap;
34  import java.util.concurrent.ConcurrentSkipListMap;
35  import java.util.concurrent.CopyOnWriteArrayList;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ClockOutOfSyncException;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.NotServingRegionException;
44  import org.apache.hadoop.hbase.RegionLoad;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.ServerLoad;
47  import org.apache.hadoop.hbase.ServerName;
48  import org.apache.hadoop.hbase.YouAreDeadException;
49  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.client.ClusterConnection;
52  import org.apache.hadoop.hbase.client.ConnectionFactory;
53  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
54  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
55  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
56  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
57  import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
58  import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
59  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
60  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61  import org.apache.hadoop.hbase.protobuf.RequestConverter;
62  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
63  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
64  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
65  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
66  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
67  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
68  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
69  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
70  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
71  import org.apache.hadoop.hbase.regionserver.HRegionServer;
72  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.Triple;
75  import org.apache.hadoop.hbase.util.RetryCounter;
76  import org.apache.hadoop.hbase.util.RetryCounterFactory;
77  import org.apache.hadoop.hbase.util.Threads;
78  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
79  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
80  import org.apache.zookeeper.KeeperException;
81  
82  import com.google.common.annotations.VisibleForTesting;
83  import com.google.protobuf.ByteString;
84  import com.google.protobuf.ServiceException;
85  
86  /**
87   * The ServerManager class manages info about region servers.
88   * <p>
89   * Maintains lists of online and dead servers.  Processes the startups,
90   * shutdowns, and deaths of region servers.
91   * <p>
92   * Servers are distinguished in two different ways.  A given server has a
93   * location, specified by hostname and port, of which there can be only one
94   * online at any given time.  A server instance is specified by the location
95   * (hostname and port) as well as the startcode (timestamp from when the server
96   * was started).  This is used to differentiate a restarted instance of a given
97   * server from the original instance.
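     * For example (illustrative host, port and startcode values, not taken from a live cluster):
     * <pre>{@code
     * ServerName original  = ServerName.valueOf("rs1.example.com", 16020, 1400000000000L);
     * ServerName restarted = ServerName.valueOf("rs1.example.com", 16020, 1400000100000L);
     * // Same location (hostname and port) ...
     * ServerName.isSameHostnameAndPort(original, restarted);   // true
     * // ... but a different server instance, because the startcodes differ.
     * original.equals(restarted);                              // false
     * }</pre>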
98   * <p>
99   * If a server is known not to be running any more, it is called dead. The dead
100  * server needs to be handled by a ServerShutdownHandler.  If the handler is not
101  * enabled yet, the server can't be handled right away, so it is queued up.
102  * After the handler is enabled, the server is submitted to it for processing.
103  * However, the handler may be only partially enabled.  If so,
104  * the server cannot be fully processed and is queued up for further processing.
105  * A server is fully processed only after the handler is fully enabled
106  * and has completed the handling.
107  */
108 @InterfaceAudience.Private
109 public class ServerManager {
110   public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
111       "hbase.master.wait.on.regionservers.maxtostart";
112 
113   public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
114       "hbase.master.wait.on.regionservers.mintostart";
115 
116   public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
117       "hbase.master.wait.on.regionservers.timeout";
118 
119   public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
120       "hbase.master.wait.on.regionservers.interval";
121 
122   private static final Log LOG = LogFactory.getLog(ServerManager.class);
123 
124   // Set if we are to shutdown the cluster.
125   private volatile boolean clusterShutdown = false;
126 
127   private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
128     new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
129 
130   private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
131     storeFlushedSequenceIdsByRegion =
132     new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);
133 
134   /** Map of registered servers to their current load */
135   private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
136     new ConcurrentHashMap<ServerName, ServerLoad>();
137 
138   /**
139   * Map of admin interfaces per registered regionserver; we use these interfaces to control
140   * regionservers out on the cluster.
141    */
142   private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
143     new HashMap<ServerName, AdminService.BlockingInterface>();
144 
145   /**
146   * List of region servers ({@link ServerName}) that should not get any more new
147    * regions.
148    */
149   private final ArrayList<ServerName> drainingServers =
150     new ArrayList<ServerName>();
151 
152   private final Server master;
153   private final MasterServices services;
154   private final ClusterConnection connection;
155 
156   private final DeadServer deadservers = new DeadServer();
157 
158   private final long maxSkew;
159   private final long warningSkew;
160 
161   private final RetryCounterFactory pingRetryCounterFactory;
162   private final RpcControllerFactory rpcControllerFactory;
163 
164   /**
165    * Set of region servers which are dead but not processed immediately. If a
166    * server dies before the master enables the ServerShutdownHandler, the server is
167    * added to this set and later processed by the master via
168    * {@link ServerManager#processQueuedDeadServers()}.
169    * <p>
170    * A dead server is a server instance known to be dead, not listed in the /hbase/rs
171    * znode any more. It may not have been submitted to ServerShutdownHandler yet
172    * because the handler is not enabled.
173    * <p>
174    * A dead server, which has been submitted to ServerShutdownHandler while the
175    * handler is not enabled, is queued up.
176    * <p>
177    * So this is a set of region servers known to be dead but not submitted to
178    * ServerShutdownHandler for processing yet.
179    */
180   private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
181 
182   /**
183    * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
184    * fully processed immediately.
185    * <p>
186    * If a server dies before the assignment manager finishes failover cleanup, the server is
187    * added to this set and later processed through calling
188    * {@link ServerManager#processQueuedDeadServers()} by the assignment manager.
189    * <p>
190    * The Boolean value indicates whether log splitting is needed inside ServerShutdownHandler.
191    * <p>
192    * ServerShutdownHandler processes a dead server submitted to the handler after the handler is
193    * enabled. It may not be able to complete the processing because meta is not yet online or master
194    * is currently in startup mode. In this case, the dead server will be parked in this set
195    * temporarily.
196    */
197   private Map<ServerName, Boolean> requeuedDeadServers
198     = new ConcurrentHashMap<ServerName, Boolean>();
199 
200   /** Listeners that are called on server events. */
201   private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
202 
203   /**
204    * Constructor.
205    * @param master the master this manager is for
206    * @param services the master services
207    * @throws IOException if the cluster connection cannot be created
208    */
209   public ServerManager(final Server master, final MasterServices services)
210       throws IOException {
211     this(master, services, true);
212   }
213 
214   ServerManager(final Server master, final MasterServices services,
215       final boolean connect) throws IOException {
216     this.master = master;
217     this.services = services;
218     Configuration c = master.getConfiguration();
219     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
220     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
221     this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null;
222     int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
223       "hbase.master.maximum.ping.server.attempts", 10));
224     int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
225       "hbase.master.ping.server.retry.sleep.interval", 100));
226     this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
227     this.rpcControllerFactory = this.connection == null
228         ? null
229         : connection.getRpcControllerFactory();
230   }
231 
232   /**
233    * Add the listener to the notification list.
234    * @param listener The ServerListener to register
235    */
236   public void registerListener(final ServerListener listener) {
237     this.listeners.add(listener);
238   }
239 
240   /**
241    * Remove the listener from the notification list.
242    * @param listener The ServerListener to unregister
243    */
244   public boolean unregisterListener(final ServerListener listener) {
245     return this.listeners.remove(listener);
246   }
247 
248   /**
249    * Let the server manager know a new regionserver has come online
250    * @param request the startup request
251    * @param ia the InetAddress from which the request was received
252    * @return The ServerName we know this server as.
253    * @throws IOException
254    */
255   ServerName regionServerStartup(RegionServerStartupRequest request, InetAddress ia)
256       throws IOException {
257     // Test for case where we get a region startup message from a regionserver
258     // that has been quickly restarted but whose znode expiration handler has
259     // not yet run, or from a server whose fail we are currently processing.
260  // Test its host+port combo is present in serverAddressToServerInfo.  If it
261     // is, reject the server and trigger its expiration. The next time it comes
262     // in, it should have been removed from serverAddressToServerInfo and queued
263     // for processing by ProcessServerShutdown.
264 
265     final String hostname = request.hasUseThisHostnameInstead() ?
266        request.getUseThisHostnameInstead() : ia.getHostName();
267     ServerName sn = ServerName.valueOf(hostname, request.getPort(),
268       request.getServerStartCode());
269     checkClockSkew(sn, request.getServerCurrentTime());
270     checkIsDead(sn, "STARTUP");
271     if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
272       LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
273         + " could not record the server: " + sn);
274     }
275     return sn;
276   }
277 
278   private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
279     byte[] regionName) {
280     ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
281         storeFlushedSequenceIdsByRegion.get(regionName);
282     if (storeFlushedSequenceId != null) {
283       return storeFlushedSequenceId;
284     }
285     storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
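        // Several reporting threads may race to create the map for a region; putIfAbsent returns
        // the mapping that won the race (or null if ours did), so prefer that one below.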
286     ConcurrentNavigableMap<byte[], Long> alreadyPut =
287         storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
288     return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
289   }
290   /**
291    * Updates last flushed sequence ids for the regions on server sn.
292    * @param sn the server that reported the load
293    * @param hsl the reported server load, including per-region flush information
294    */
295   private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
296     Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
297     for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
298       byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
299       Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
300       long l = entry.getValue().getCompleteSequenceId();
301       // Don't let smaller sequence ids override greater sequence ids.
302       if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
303         flushedSequenceIdByRegion.put(encodedRegionName, l);
304       } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
305         LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
306             + l + ") that is less than the previous last flushed sequence id ("
307             + existingValue + ") for region " + Bytes.toString(entry.getKey()) + ". Ignoring.");
308       }
309       ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
310           getOrCreateStoreFlushedSequenceId(encodedRegionName);
311       for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
312         byte[] family = storeSeqId.getFamilyName().toByteArray();
313         existingValue = storeFlushedSequenceId.get(family);
314         l = storeSeqId.getSequenceId();
315         // Don't let smaller sequence ids override greater sequence ids.
316         if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
317           storeFlushedSequenceId.put(family, l);
318         }
319       }
320     }
321   }
322 
323   void regionServerReport(ServerName sn,
324       ServerLoad sl) throws YouAreDeadException {
325     checkIsDead(sn, "REPORT");
326     if (null == this.onlineServers.replace(sn, sl)) {
327       // Already have this host+port combo and it's just a different start code?
328       // Just let the server in. Presume master joining a running cluster.
329       // recordNewServer is what happens at the end of reportServerStartup.
330       // The only thing we are skipping is passing back to the regionserver
331       // the ServerName to use. Here we presume a master has already done
332       // that so we'll press on with whatever it gave us for ServerName.
333       if (!checkAndRecordNewServer(sn, sl)) {
334         LOG.info("RegionServerReport ignored, could not record the server: " + sn);
335         return; // Not recorded, so no need to move on
336       }
337     }
338     updateLastFlushedSequenceIds(sn, sl);
339   }
340 
341   /**
342    * Checks if a server with the same host and port already exists;
343    * if not, or if the existing one has a smaller start code, records it.
344    *
345    * @param serverName the server to check and record
346    * @param sl the server load on the server
347    * @return true if the server is recorded, otherwise, false
348    */
349   boolean checkAndRecordNewServer(
350       final ServerName serverName, final ServerLoad sl) {
351     ServerName existingServer = null;
352     synchronized (this.onlineServers) {
353       existingServer = findServerWithSameHostnamePortWithLock(serverName);
354       if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
355         LOG.info("Server serverName=" + serverName + " rejected; we already have "
356             + existingServer.toString() + " registered with same hostname and port");
357         return false;
358       }
359       recordNewServerWithLock(serverName, sl);
360     }
361 
362     // Tell our listeners that a server was added
363     if (!this.listeners.isEmpty()) {
364       for (ServerListener listener : this.listeners) {
365         listener.serverAdded(serverName);
366       }
367     }
368 
369     // Note that we assume that same ts means same server, and don't expire in that case.
370     //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
371     if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
372       LOG.info("Triggering server recovery; existingServer " +
373           existingServer + " looks stale, new server:" + serverName);
374       expireServer(existingServer);
375     }
376     return true;
377   }
378 
379   /**
380   * Checks the clock skew between the server and the master. If the clock skew exceeds the
381    * configured max, it will throw an exception; if it exceeds the configured warning threshold,
382    * it will log a warning but start normally.
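      * <p>
      * Both thresholds come from the master configuration (read in the constructor). A sketch of
      * how they could be tuned; the property names are the ones used by this class, the values
      * are purely illustrative:
      * <pre>{@code
      * Configuration conf = master.getConfiguration();
      * conf.setLong("hbase.master.maxclockskew", 30000);      // reject servers skewed by more than 30s
      * conf.setLong("hbase.master.warningclockskew", 10000);  // only warn once skew exceeds 10s
      * }</pre>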
383   * @param serverName Incoming server's name
384   * @param serverCurrentTime the current time, in milliseconds, reported by the server
385    * @throws ClockOutOfSyncException if the skew exceeds the configured max value
386    */
387   private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
388   throws ClockOutOfSyncException {
389     long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
390     if (skew > maxSkew) {
391       String message = "Server " + serverName + " has been " +
392         "rejected; Reported time is too far out of sync with master.  " +
393         "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
394       LOG.warn(message);
395       throw new ClockOutOfSyncException(message);
396     } else if (skew > warningSkew){
397       String message = "Reported time for server " + serverName + " is out of sync with master " +
398         "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
399         "error threshold is " + maxSkew + "ms)";
400       LOG.warn(message);
401     }
402   }
403 
404   /**
405    * If this server is on the dead list, reject it with a YouAreDeadException.
406    * If it was dead but came back with a new start code, remove the old entry
407    * from the dead list.
408    * @param serverName the server to check
409    * @param what STARTUP or REPORT
410    * @throws org.apache.hadoop.hbase.YouAreDeadException
411    */
412   private void checkIsDead(final ServerName serverName, final String what)
413       throws YouAreDeadException {
414     if (this.deadservers.isDeadServer(serverName)) {
415       // host name, port and start code all match with existing one of the
416       // dead servers. So, this server must be dead.
417       String message = "Server " + what + " rejected; currently processing " +
418           serverName + " as dead server";
419       LOG.debug(message);
420       throw new YouAreDeadException(message);
421     }
422     // Remove a dead server with the same hostname and port as the newly checking-in RS after
423     // master initialization. See HBASE-5916 for more information.
424     if ((this.services == null || ((HMaster) this.services).isInitialized())
425         && this.deadservers.cleanPreviousInstance(serverName)) {
426       // This server has now become alive after we marked it as dead.
427       // We removed its previous entry from the dead list to reflect it.
428       LOG.debug(what + ":" + " Server " + serverName + " came back up," +
429           " removed it from the dead servers list");
430     }
431   }
432 
433   /**
434    * Assumes onlineServers is locked.
435    * @return ServerName with matching hostname and port.
436    */
437   private ServerName findServerWithSameHostnamePortWithLock(
438       final ServerName serverName) {
439     for (ServerName sn: this.onlineServers.keySet()) {
440       if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
441     }
442     return null;
443   }
444 
445   /**
446    * Adds to the onlineServers list. onlineServers should be locked.
447    * @param serverName The remote server's name.
448    * @param sl the load reported by the server
450    */
451   @VisibleForTesting
452   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
453     LOG.info("Registering server=" + serverName);
454     this.onlineServers.put(serverName, sl);
455     this.rsAdmins.remove(serverName);
456   }
457 
458   public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
459     RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
460     Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
461     builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
462     Map<byte[], Long> storeFlushedSequenceId =
463         storeFlushedSequenceIdsByRegion.get(encodedRegionName);
464     if (storeFlushedSequenceId != null) {
465       for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
466         builder.addStoreSequenceId(StoreSequenceId.newBuilder()
467             .setFamilyName(ByteString.copyFrom(entry.getKey()))
468             .setSequenceId(entry.getValue().longValue()).build());
469       }
470     }
471     return builder.build();
472   }
473 
474   /**
475    * @param serverName
476    * @return ServerLoad if serverName is known else null
477    */
478   public ServerLoad getLoad(final ServerName serverName) {
479     return this.onlineServers.get(serverName);
480   }
481 
482   /**
483    * Compute the average load across all region servers.
484    * Currently, this uses a very naive computation - just uses the number of
485    * regions being served, ignoring stats about number of requests.
486    * @return the average load
487    */
488   public double getAverageLoad() {
489     int totalLoad = 0;
490     int numServers = 0;
491     for (ServerLoad sl: this.onlineServers.values()) {
492         numServers++;
493         totalLoad += sl.getNumberOfRegions();
494     }
495     return numServers == 0 ? 0 :
496       (double)totalLoad / (double)numServers;
497   }
498 
499   /** @return the count of active regionservers */
500   public int countOfRegionServers() {
501     // Presumes onlineServers is a concurrent map
502     return this.onlineServers.size();
503   }
504 
505   /**
506    * @return Read-only map of servers to their current load
507    */
508   public Map<ServerName, ServerLoad> getOnlineServers() {
509     // Presumption is that iterating the returned Map is OK.
510     synchronized (this.onlineServers) {
511       return Collections.unmodifiableMap(this.onlineServers);
512     }
513   }
514 
515 
516   public DeadServer getDeadServers() {
517     return this.deadservers;
518   }
519 
520   /**
521    * Checks if any dead servers are currently being processed.
522    * @return true if any RS are being processed as dead, false if not
523    */
524   public boolean areDeadServersInProgress() {
525     return this.deadservers.areDeadServersInProgress();
526   }
527 
528   void letRegionServersShutdown() {
529     long previousLogTime = 0;
530     ServerName sn = master.getServerName();
531     ZooKeeperWatcher zkw = master.getZooKeeper();
532     int onlineServersCt;
533     while ((onlineServersCt = onlineServers.size()) > 0){
534 
535       if (System.currentTimeMillis() > (previousLogTime + 1000)) {
536         Set<ServerName> remainingServers = onlineServers.keySet();
537         synchronized (onlineServers) {
538           if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
539             // Master will delete itself later.
540             return;
541           }
542         }
543         StringBuilder sb = new StringBuilder();
544         // It's ok here to not sync on onlineServers - merely logging
545         for (ServerName key : remainingServers) {
546           if (sb.length() > 0) {
547             sb.append(", ");
548           }
549           sb.append(key);
550         }
551         LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
552         previousLogTime = System.currentTimeMillis();
553       }
554 
555       try {
556         List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
557         if (servers == null || servers.size() == 0 || (servers.size() == 1
558             && servers.contains(sn.toString()))) {
559           LOG.info("ZK shows there is only the master self online, exiting now");
560           // Master could have lost some ZK events, no need to wait more.
561           break;
562         }
563       } catch (KeeperException ke) {
564         LOG.warn("Failed to list regionservers", ke);
565         // ZK is malfunctioning, don't hang here
566         break;
567       }
568       synchronized (onlineServers) {
569         try {
570           if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
571         } catch (InterruptedException ignored) {
572           // continue
573         }
574       }
575     }
576   }
577 
578   /*
579    * Expire the passed server.  Add it to the list of dead servers and queue its
580    * shutdown processing.
581    */
582   public synchronized void expireServer(final ServerName serverName) {
583     if (serverName.equals(master.getServerName())) {
584       if (!(master.isAborted() || master.isStopped())) {
585         master.stop("We lost our znode?");
586       }
587       return;
588     }
589     if (!services.isServerShutdownHandlerEnabled()) {
590       LOG.info("Master hasn't enabled ServerShutdownHandler during initialization; "
591           + "delaying expiration of server " + serverName);
592       this.queuedDeadServers.add(serverName);
593       return;
594     }
595     if (this.deadservers.isDeadServer(serverName)) {
596       // TODO: Can this happen?  It shouldn't be online in this case?
597       LOG.warn("Expiration of " + serverName +
598           " but server shutdown already in progress");
599       return;
600     }
601     synchronized (onlineServers) {
602       if (!this.onlineServers.containsKey(serverName)) {
603         LOG.warn("Expiration of " + serverName + " but server not online");
604       }
605       // Remove the server from the known servers lists and update load info BUT
606       // add to deadservers first; do this so it'll show in dead servers list if
607       // not in online servers list.
608       this.deadservers.add(serverName);
609       this.onlineServers.remove(serverName);
610       onlineServers.notifyAll();
611     }
612     this.rsAdmins.remove(serverName);
613     // If cluster is going down, yes, servers are going to be expiring; don't
614     // process as a dead server
615     if (this.clusterShutdown) {
616       LOG.info("Cluster shutdown set; " + serverName +
617         " expired; onlineServers=" + this.onlineServers.size());
618       if (this.onlineServers.isEmpty()) {
619         master.stop("Cluster shutdown set; onlineServer=0");
620       }
621       return;
622     }
623 
624     boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName) ==
625         AssignmentManager.ServerHostRegion.HOSTING_REGION;
626     if (carryingMeta) {
627       this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
628         this.services, this.deadservers, serverName));
629     } else {
630       this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
631         this.services, this.deadservers, serverName, true));
632     }
633     LOG.debug("Added=" + serverName +
634       " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
635 
636     // Tell our listeners that a server was removed
637     if (!this.listeners.isEmpty()) {
638       for (ServerListener listener : this.listeners) {
639         listener.serverRemoved(serverName);
640       }
641     }
642   }
643 
644   public synchronized void processDeadServer(final ServerName serverName) {
645     this.processDeadServer(serverName, false);
646   }
647 
648   public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
649     // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
650     // in-memory region states, region servers could be down. Meta table can and
651     // should be re-assigned, log splitting can be done too. However, it is better to
652     // wait till the cleanup is done before re-assigning user regions.
653     //
654     // We should not wait in the server shutdown handler thread since it can clog
655     // the handler threads and meta table could not be re-assigned in case
656     // the corresponding server is down. So we queue them up here instead.
657     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
658       requeuedDeadServers.put(serverName, shouldSplitWal);
659       return;
660     }
661 
662     this.deadservers.add(serverName);
663     this.services.getExecutorService().submit(
664       new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
665           shouldSplitWal));
666   }
667 
668   /**
669    * Process the servers which died during master's initialization. It will be
670    * called after HMaster#assignMeta and AssignmentManager#joinCluster.
671    */
672   synchronized void processQueuedDeadServers() {
673     if (!services.isServerShutdownHandlerEnabled()) {
674       LOG.info("Master hasn't enabled ServerShutdownHandler");
675     }
676     Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
677     while (serverIterator.hasNext()) {
678       ServerName tmpServerName = serverIterator.next();
679       expireServer(tmpServerName);
680       serverIterator.remove();
681       requeuedDeadServers.remove(tmpServerName);
682     }
683 
684     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
685       LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
686     }
687 
688     for(ServerName tmpServerName : requeuedDeadServers.keySet()){
689       processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
690     }
691     requeuedDeadServers.clear();
692   }
693 
694   /*
695    * Remove the server from the drain list.
696    */
697   public boolean removeServerFromDrainList(final ServerName sn) {
698     // Warn if the server (sn) is not online.  ServerName is of the form:
699     // <hostname> , <port> , <startcode>
700 
701     if (!this.isServerOnline(sn)) {
702       LOG.warn("Server " + sn + " is not currently online. " +
703                "Removing from draining list anyway, as requested.");
704     }
705     // Remove the server from the draining servers lists.
706     return this.drainingServers.remove(sn);
707   }
708 
709   /*
710    * Add the server to the drain list.
711    */
712   public boolean addServerToDrainList(final ServerName sn) {
713     // Warn if the server (sn) is not online.  ServerName is of the form:
714     // <hostname> , <port> , <startcode>
715 
716     if (!this.isServerOnline(sn)) {
717       LOG.warn("Server " + sn + " is not currently online. " +
718                "Ignoring request to add it to draining list.");
719       return false;
720     }
721     // Add the server to the draining servers lists, if it's not already in
722     // it.
723     if (this.drainingServers.contains(sn)) {
724       LOG.warn("Server " + sn + " is already in the draining server list." +
725                " Ignoring request to add it again.");
726       return false;
727     }
728     return this.drainingServers.add(sn);
729   }
730 
731   // RPC methods to region servers
732 
733   /**
734    * Sends an OPEN RPC to the specified server to open the specified region.
735    * <p>
736    * Open should not fail but can if server just crashed.
737    * <p>
738    * @param server server to open a region
739    * @param region region to open
740    * @param versionOfOfflineNode that needs to be present in the offline node
741    * when RS tries to change the state from OFFLINE to other states.
742    * @param favoredNodes the favored nodes for the region, if any
743    */
744   public RegionOpeningState sendRegionOpen(final ServerName server,
745       HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
746   throws IOException {
747     AdminService.BlockingInterface admin = getRsAdmin(server);
748     if (admin == null) {
749       LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
750         " failed because no RPC connection found to this server");
751       return RegionOpeningState.FAILED_OPENING;
752     }
753     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, 
754       region, versionOfOfflineNode, favoredNodes, 
755       (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
756     try {
757       OpenRegionResponse response = admin.openRegion(null, request);
758       return ResponseConverter.getRegionOpeningState(response);
759     } catch (ServiceException se) {
760       throw ProtobufUtil.getRemoteException(se);
761     }
762   }
763 
764   /**
765    * Sends an OPEN RPC to the specified server to open the specified region.
766    * <p>
767    * Open should not fail but can if server just crashed.
768    * <p>
769    * @param server server to open a region
770    * @param regionOpenInfos info of a list of regions to open
771    * @return a list of region opening states
772    */
773   public List<RegionOpeningState> sendRegionOpen(ServerName server,
774       List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
775   throws IOException {
776     AdminService.BlockingInterface admin = getRsAdmin(server);
777     if (admin == null) {
778       LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
779         " failed because no RPC connection found to this server");
780       return null;
781     }
782 
783     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
784       (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
785     try {
786       OpenRegionResponse response = admin.openRegion(null, request);
787       return ResponseConverter.getRegionOpeningStateList(response);
788     } catch (ServiceException se) {
789       throw ProtobufUtil.getRemoteException(se);
790     }
791   }
792 
793   private PayloadCarryingRpcController newRpcController() {
794     return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
795   }
796 
797   /**
798    * Sends a CLOSE RPC to the specified server to close the specified region.
799    * <p>
800    * A region server could reject the close request because it either does not
801    * have the specified region or the region is being split.
802    * @param server server to close a region on
803    * @param region region to close
804    * @param versionOfClosingNode
805    *   the version of znode to compare when RS transitions the znode from
806    *   CLOSING state.
807    * @param dest - if the region is moved to another server, the destination server. null otherwise.
808    * @return true if server acknowledged close, false if not
809    * @throws IOException
810    */
811   public boolean sendRegionClose(ServerName server, HRegionInfo region,
812     int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
813     if (server == null) throw new NullPointerException("Passed server is null");
814     AdminService.BlockingInterface admin = getRsAdmin(server);
815     if (admin == null) {
816       throw new IOException("Attempting to send CLOSE RPC to server " +
817         server.toString() + " for region " +
818         region.getRegionNameAsString() +
819         " failed because no RPC connection found to this server");
820     }
821     PayloadCarryingRpcController controller = newRpcController();
822     return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(),
823       versionOfClosingNode, dest, transitionInZK);
824   }
825 
826   public boolean sendRegionClose(ServerName server,
827       HRegionInfo region, int versionOfClosingNode) throws IOException {
828     return sendRegionClose(server, region, versionOfClosingNode, null, true);
829   }
830 
831   /**
832    * Sends a WARMUP RPC to the specified server to warm up the specified region.
833    * <p>
834    * A region server could reject the warmup request because it either does not
835    * have the specified region or the region is being split.
836    * @param server server to warm up a region on
837    * @param region region to warm up
838    */
839   public void sendRegionWarmup(ServerName server,
840       HRegionInfo region) {
841     if (server == null) return;
842     try {
843       AdminService.BlockingInterface admin = getRsAdmin(server);
844       PayloadCarryingRpcController controller = newRpcController();
845       ProtobufUtil.warmupRegion(controller, admin, region);
846     } catch (IOException e) {
847       LOG.error("Received exception in RPC for warmup server: " +
848         server + ", region: " + region +
849         ", exception: " + e);
850     }
851   }
852 
853   /**
854    * Contacts a region server and waits up to timeout ms
855   * to close the region.  This bypasses the active HMaster.
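      * <p>
      * A minimal usage sketch (the connection, server name and region are assumed to already be
      * in hand, and the caller handles the declared IOException/InterruptedException):
      * <pre>{@code
      * // Ask the region server directly to close the region, waiting up to 30 seconds.
      * ServerManager.closeRegionSilentlyAndWait(connection, serverName, regionInfo, 30000);
      * }</pre>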
856    */
857   public static void closeRegionSilentlyAndWait(ClusterConnection connection,
858     ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
859     AdminService.BlockingInterface rs = connection.getAdmin(server);
860     PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController();
861     try {
862       ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName(), false);
863     } catch (IOException e) {
864       LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
865     }
866     long expiration = timeout + System.currentTimeMillis();
867     while (System.currentTimeMillis() < expiration) {
868       try {
869         HRegionInfo rsRegion =
870           ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
871         if (rsRegion == null) return;
872       } catch (IOException ioe) {
873         if (ioe instanceof NotServingRegionException) // no need to retry again
874           return;
875         LOG.warn("Exception when retrieving regioninfo from: "
876           + region.getRegionNameAsString(), ioe);
877       }
878       Thread.sleep(1000);
879     }
880     throw new IOException("Region " + region + " failed to close within"
881         + " timeout " + timeout);
882   }
883 
884   /**
885    * Sends a MERGE REGIONS RPC to the specified server to merge the specified
886    * regions.
887    * <p>
888    * A region server could reject the merge request because it does not
889    * have one of the specified regions.
890    * @param server server to merge regions
891    * @param region_a region to merge
892    * @param region_b region to merge
893    * @param forcible true if doing a compulsory merge, otherwise we will only merge
894    *          two adjacent regions
895    * @throws IOException
896    */
897   public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
898       HRegionInfo region_b, boolean forcible) throws IOException {
899     if (server == null)
900       throw new NullPointerException("Passed server is null");
901     if (region_a == null || region_b == null)
902       throw new NullPointerException("Passed region is null");
903     AdminService.BlockingInterface admin = getRsAdmin(server);
904     if (admin == null) {
905       throw new IOException("Attempting to send MERGE REGIONS RPC to server "
906           + server.toString() + " for region "
907           + region_a.getRegionNameAsString() + ","
908           + region_b.getRegionNameAsString()
909           + " failed because no RPC connection found to this server");
910     }
911     PayloadCarryingRpcController controller = newRpcController();
912     ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible);
913   }
914 
915   /**
916   * Check if a region server is reachable and has the expected start code.
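      * <p>
      * The ping is retried according to {@code hbase.master.maximum.ping.server.attempts}
      * (default 10) and {@code hbase.master.ping.server.retry.sleep.interval} (default 100 ms),
      * both read from the master configuration in the constructor.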
917    */
918   public boolean isServerReachable(ServerName server) {
919     if (server == null) throw new NullPointerException("Passed server is null");
920 
921     synchronized (this.onlineServers) {
922       if (this.deadservers.isDeadServer(server)) {
923         return false;
924       }
925     }
926 
927 
928     RetryCounter retryCounter = pingRetryCounterFactory.create();
929     while (retryCounter.shouldRetry()) {
930       try {
931         PayloadCarryingRpcController controller = newRpcController();
932         AdminService.BlockingInterface admin = getRsAdmin(server);
933         if (admin != null) {
934           ServerInfo info = ProtobufUtil.getServerInfo(controller, admin);
935           return info != null && info.hasServerName()
936             && server.getStartcode() == info.getServerName().getStartCode();
937         }
938       } catch (IOException ioe) {
939         if (LOG.isDebugEnabled()) {
940           LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes() + " of "
941               + retryCounter.getMaxAttempts(), ioe);
942         }
943         try {
944           retryCounter.sleepUntilNextRetry();
945         } catch(InterruptedException ie) {
946           Thread.currentThread().interrupt();
947           break;
948         }
949       }
950     }
951     return false;
952   }
953 
954   /**
955    * @param sn the server name
956    * @return Admin interface for the remote regionserver named <code>sn</code>
957    * @throws IOException
958    * @throws RetriesExhaustedException wrapping a ConnectException if failed
959    */
960   private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
961   throws IOException {
962     AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
963     if (admin == null) {
964       LOG.debug("New admin connection to " + sn.toString());
965       if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
966         // A master is also a region server now, see HBASE-10569 for details
967         admin = ((HRegionServer)master).getRSRpcServices();
968       } else {
969         admin = this.connection.getAdmin(sn);
970       }
971       this.rsAdmins.put(sn, admin);
972     }
973     return admin;
974   }
975 
976   /**
977    * Wait for the region servers to report in.
978    * We will wait until one of these conditions is met:
979    *  - the master is stopped
980    *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
981    *    region servers is reached
982    *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
983    *    no new region server has checked in for
984    *    'hbase.master.wait.on.regionservers.interval' time AND
985    *    the 'hbase.master.wait.on.regionservers.timeout' is reached
986    *
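       * A sketch of how the wait could be tuned via the constants declared at the top of this
       * class (the values below are illustrative, not the defaults):
       * <pre>{@code
       * Configuration conf = master.getConfiguration();
       * conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);    // wait for at least 3 RSs
       * conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_TIMEOUT, 10000);  // but no longer than 10s
       * conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_INTERVAL, 1500);  // settle interval in ms
       * }</pre>
       *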
987    * @throws InterruptedException
988    */
989   public void waitForRegionServers(MonitoredTask status)
990   throws InterruptedException {
991     final long interval = this.master.getConfiguration().
992       getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
993     final long timeout = this.master.getConfiguration().
994       getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
995     int defaultMinToStart = 1;
996     if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
997       // If we assign regions to master, we'd like to start
998       // at least another region server so that we don't
999       // assign all regions to master if other region servers
1000       // don't come up in time.
1001       defaultMinToStart = 2;
1002     }
1003     int minToStart = this.master.getConfiguration().
1004       getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
1005     if (minToStart < 1) {
1006       LOG.warn(String.format(
1007         "The value of '%s' (%d) can not be less than 1, ignoring.",
1008         WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
1009       minToStart = 1;
1010     }
1011     int maxToStart = this.master.getConfiguration().
1012       getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
1013     if (maxToStart < minToStart) {
1014         LOG.warn(String.format(
1015             "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
1016             WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
1017             WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
1018         maxToStart = Integer.MAX_VALUE;
1019     }
1020 
1021     long now =  System.currentTimeMillis();
1022     final long startTime = now;
1023     long slept = 0;
1024     long lastLogTime = 0;
1025     long lastCountChange = startTime;
1026     int count = countOfRegionServers();
1027     int oldCount = 0;
1028     while (!this.master.isStopped() && count < maxToStart
1029         && (lastCountChange+interval > now || timeout > slept || count < minToStart)) {
1030       // Log some info at every interval time or if there is a change
1031       if (oldCount != count || lastLogTime+interval < now){
1032         lastLogTime = now;
1033         String msg =
1034           "Waiting for region servers count to settle; currently"+
1035             " checked in " + count + ", slept for " + slept + " ms," +
1036             " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+
1037             ", timeout of "+timeout+" ms, interval of "+interval+" ms.";
1038         LOG.info(msg);
1039         status.setStatus(msg);
1040       }
1041 
1042       // We sleep for some time
1043       final long sleepTime = 50;
1044       Thread.sleep(sleepTime);
1045       now =  System.currentTimeMillis();
1046       slept = now - startTime;
1047 
1048       oldCount = count;
1049       count = countOfRegionServers();
1050       if (count != oldCount) {
1051         lastCountChange = now;
1052       }
1053     }
1054 
1055     LOG.info("Finished waiting for region servers count to settle;" +
1056       " checked in " + count + ", slept for " + slept + " ms," +
1057       " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
1058       " master is "+ (this.master.isStopped() ? "stopped.": "running")
1059     );
1060   }
1061 
1062   /**
1063    * @return A copy of the internal list of online servers.
1064    */
1065   public List<ServerName> getOnlineServersList() {
1066     // TODO: optimize the load balancer call so we don't need to make a new list
1067     // TODO: FIX. THIS IS POPULAR CALL.
1068     return new ArrayList<ServerName>(this.onlineServers.keySet());
1069   }
1070 
1071   /**
1072    * @return A copy of the internal list of draining servers.
1073    */
1074   public List<ServerName> getDrainingServersList() {
1075     return new ArrayList<ServerName>(this.drainingServers);
1076   }
1077 
1078   /**
1079    * @return A copy of the internal set of deadNotExpired servers.
1080    */
1081   Set<ServerName> getDeadNotExpiredServers() {
1082     return new HashSet<ServerName>(this.queuedDeadServers);
1083   }
1084 
1085   /**
1086   * During startup, if we figure it is not a failover, i.e. there are
1087    * no more WAL files to split, we won't try to recover these dead servers.
1088    * So we just remove them from the queue. Use caution in calling this.
1089    */
1090   void removeRequeuedDeadServers() {
1091     requeuedDeadServers.clear();
1092   }
1093 
1094   /**
1095   * @return An unmodifiable view of the internal map of requeued dead servers and their
1096   *         corresponding splitlog need flag.
1097    */
1098   Map<ServerName, Boolean> getRequeuedDeadServers() {
1099     return Collections.unmodifiableMap(this.requeuedDeadServers);
1100   }
1101 
1102   public boolean isServerOnline(ServerName serverName) {
1103     return serverName != null && onlineServers.containsKey(serverName);
1104   }
1105 
1106   /**
1107   * Check whether a server is online based on hostname and port.
1108   * @return true if a server with matching hostname and port is found.
1109    */
1110   public boolean isServerWithSameHostnamePortOnline(final ServerName serverName) {
1111     return findServerWithSameHostnamePortWithLock(serverName) != null;
1112   }
1113 
1114   /**
1115    * Check if a server is known to be dead.  A server can be online,
1116   * or known to be dead, or unknown to this manager (i.e., not online,
1117   * not known to be dead either; it is simply not tracked by the
1118    * master any more, for example, a very old previous instance).
1119    */
1120   public synchronized boolean isServerDead(ServerName serverName) {
1121     return serverName == null || deadservers.isDeadServer(serverName)
1122       || queuedDeadServers.contains(serverName)
1123       || requeuedDeadServers.containsKey(serverName);
1124   }
1125 
1126   public void shutdownCluster() {
1127     this.clusterShutdown = true;
1128     this.master.stop("Cluster shutdown requested");
1129   }
1130 
1131   public boolean isClusterShutdown() {
1132     return this.clusterShutdown;
1133   }
1134 
1135   /**
1136    * Stop the ServerManager.  Currently closes the connection to the master.
1137    */
1138   public void stop() {
1139     if (connection != null) {
1140       try {
1141         connection.close();
1142       } catch (IOException e) {
1143         LOG.error("Attempt to close connection to master failed", e);
1144       }
1145     }
1146   }
1147 
1148   /**
1149    * Creates a list of possible destinations for a region. It contains the online servers, but not
1150    *  the draining or dying servers.
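        * <p>
        * A hypothetical usage sketch (the caller and variable names are illustrative):
        * <pre>{@code
        * // null means there is nothing to exclude beyond draining and dead-not-expired servers.
        * List<ServerName> candidates = serverManager.createDestinationServersList(null);
        * }</pre>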
1151    *  @param serversToExclude can be null if there is no server to exclude
1152    */
1153   public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude){
1154     final List<ServerName> destServers = getOnlineServersList();
1155 
1156     if (serversToExclude != null){
1157       destServers.removeAll(serversToExclude);
1158     }
1159 
1160     // Loop through the draining server list and remove them from the server list
1161     final List<ServerName> drainingServersCopy = getDrainingServersList();
1162     if (!drainingServersCopy.isEmpty()) {
1163       for (final ServerName server: drainingServersCopy) {
1164         destServers.remove(server);
1165       }
1166     }
1167 
1168     // Remove the deadNotExpired servers from the server list.
1169     removeDeadNotExpiredServers(destServers);
1170     return destServers;
1171   }
1172 
1173   /**
1174    * Calls {@link #createDestinationServersList} without server to exclude.
1175    */
1176   public List<ServerName> createDestinationServersList(){
1177     return createDestinationServersList(null);
1178   }
1179 
1180   /**
1181    * Loop through the deadNotExpired server list and remove them from the
1182    * servers.
1183    * This function should be used carefully outside of this class. You should use a high level
1184    * method such as {@link #createDestinationServersList()} instead of managing your own list.
1185    */
1186   void removeDeadNotExpiredServers(List<ServerName> servers) {
1187     Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
1188     if (!deadNotExpiredServersCopy.isEmpty()) {
1189       for (ServerName server : deadNotExpiredServersCopy) {
1190         LOG.debug("Removing dead but not expired server: " + server
1191           + " from eligible server pool.");
1192         servers.remove(server);
1193       }
1194     }
1195   }
1196 
1197   /**
1198   * Clears any dead server that has the same host name and port as an online server.
1199    */
1200   void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1201     for (ServerName serverName : getOnlineServersList()) {
1202       deadservers.cleanAllPreviousInstances(serverName);
1203     }
1204   }
1205 
1206   /**
1207    * Called by delete table and similar to notify the ServerManager that a region was removed.
1208    */
1209   public void removeRegion(final HRegionInfo regionInfo) {
1210     final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
1211     storeFlushedSequenceIdsByRegion.remove(encodedName);
1212     flushedSequenceIdByRegion.remove(encodedName);
1213   }
1214 
1215   /**
1216   * Called by delete table and similar to notify the ServerManager that regions were removed.
1217    */
1218   public void removeRegions(final List<HRegionInfo> regions) {
1219     for (HRegionInfo hri: regions) {
1220       removeRegion(hri);
1221     }
1222   }
1223 }