1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Comparator;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.NavigableMap;
34  import java.util.Random;
35  import java.util.Set;
36  import java.util.TreeMap;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.ConcurrentHashMap;
39  import java.util.concurrent.CopyOnWriteArrayList;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.atomic.AtomicInteger;
44  import java.util.concurrent.locks.Lock;
45  import java.util.concurrent.locks.ReentrantLock;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.FileSystem;
51  import org.apache.hadoop.fs.Path;
52  import org.apache.hadoop.hbase.CoordinatedStateException;
53  import org.apache.hadoop.hbase.HBaseIOException;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.HRegionLocation;
57  import org.apache.hadoop.hbase.HTableDescriptor;
58  import org.apache.hadoop.hbase.MetaTableAccessor;
59  import org.apache.hadoop.hbase.NotServingRegionException;
60  import org.apache.hadoop.hbase.RegionLocations;
61  import org.apache.hadoop.hbase.RegionTransition;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.TableStateManager;
66  import org.apache.hadoop.hbase.classification.InterfaceAudience;
67  import org.apache.hadoop.hbase.client.Admin;
68  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
69  import org.apache.hadoop.hbase.client.Result;
70  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
71  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
72  import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
73  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
74  import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
75  import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
76  import org.apache.hadoop.hbase.exceptions.DeserializationException;
77  import org.apache.hadoop.hbase.executor.EventHandler;
78  import org.apache.hadoop.hbase.executor.EventType;
79  import org.apache.hadoop.hbase.executor.ExecutorService;
80  import org.apache.hadoop.hbase.ipc.FailedServerException;
81  import org.apache.hadoop.hbase.ipc.RpcClient;
82  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
83  import org.apache.hadoop.hbase.master.RegionState.State;
84  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
85  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
86  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
87  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
88  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
89  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
90  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
91  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
92  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
93  import org.apache.hadoop.hbase.quotas.RegionStateListener;
94  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
95  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
96  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
97  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
98  import org.apache.hadoop.hbase.util.ConfigUtil;
99  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100 import org.apache.hadoop.hbase.util.FSUtils;
101 import org.apache.hadoop.hbase.util.KeyLocker;
102 import org.apache.hadoop.hbase.util.Pair;
103 import org.apache.hadoop.hbase.util.PairOfSameType;
104 import org.apache.hadoop.hbase.util.Threads;
105 import org.apache.hadoop.hbase.util.Triple;
106 import org.apache.hadoop.hbase.util.VersionInfo;
107 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
108 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
109 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
110 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
111 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
112 import org.apache.hadoop.ipc.RemoteException;
113 import org.apache.hadoop.util.StringUtils;
114 import org.apache.zookeeper.AsyncCallback;
115 import org.apache.zookeeper.KeeperException;
116 import org.apache.zookeeper.KeeperException.NoNodeException;
117 import org.apache.zookeeper.KeeperException.NodeExistsException;
118 import org.apache.zookeeper.data.Stat;
119 
120 import com.google.common.annotations.VisibleForTesting;
121 import com.google.common.collect.LinkedHashMultimap;
122 
123 /**
124  * Manages and performs region assignment.
125  * <p>
126  * Monitors ZooKeeper for events related to regions in transition.
127  * <p>
128  * Handles existing regions in transition during master failover.
129  */
130 @InterfaceAudience.Private
131 public class AssignmentManager extends ZooKeeperListener {
132   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
133 
134   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
135       -1, -1L);
136 
137   static final String ALREADY_IN_TRANSITION_WAITTIME
138     = "hbase.assignment.already.intransition.waittime";
139   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
140 
141   protected final MasterServices server;
142 
143   private ServerManager serverManager;
144 
145   private boolean shouldAssignRegionsWithFavoredNodes;
146 
147   private LoadBalancer balancer;
148 
149   private final MetricsAssignmentManager metricsAssignmentManager;
150 
151   private final TableLockManager tableLockManager;
152 
153   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
154 
155   final private KeyLocker<String> locker = new KeyLocker<String>();
156 
157   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
158 
159   /**
160    * Map of regions to reopen after the schema of a table is changed. Key -
161    * encoded region name, value - HRegionInfo
162    */
163   private final Map <String, HRegionInfo> regionsToReopen;
164 
165   /*
166    * Maximum times we recurse an assignment/unassignment.
167    * See below in {@link #assign()} and {@link #unassign()}.
168    */
169   private final int maximumAttempts;
170 
171   /**
172    * Map from the region to be created to the pair of regions being merged into it.
173    */
174   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
175     = new HashMap<String, PairOfSameType<HRegionInfo>>();
176 
177   private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
178   = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
179 
180   /**
181    * The sleep time before retrying an hbase:meta assignment that failed because
182    * no region plan was available or the region plan was bad.
183    */
184   private final long sleepTimeBeforeRetryingMetaAssignment;
185 
186   /** Plans for region movement. Key is the encoded version of a region name*/
187   // TODO: When do plans get cleaned out?  Ever? In server open and in server
188   // shutdown processing -- St.Ack
189   // All access to this Map must be synchronized.
190   final NavigableMap<String, RegionPlan> regionPlans =
191     new TreeMap<String, RegionPlan>();
192 
193   private final TableStateManager tableStateManager;
194 
195   private final ExecutorService executorService;
196 
197   // For unit tests, keep track of calls to ClosedRegionHandler
198   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
199 
200   // For unit tests, keep track of calls to OpenedRegionHandler
201   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
202 
203   // Thread pool executor service for timeout monitor
204   private java.util.concurrent.ExecutorService threadPoolExecutorService;
205 
206   // A bunch of ZK events workers. Each is a single thread executor service
207   private final java.util.concurrent.ExecutorService zkEventWorkers;
208 
209   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
210       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
211 
212   private final RegionStates regionStates;
213 
214   // The threshold for using bulk assignment. Bulk assignment is used
215   // only if we are assigning at least this many regions to at least this
216   // many servers. If assigning fewer regions to fewer servers,
217   // bulk assignment may not be as efficient.
218   private final int bulkAssignThresholdRegions;
219   private final int bulkAssignThresholdServers;
220   private final int bulkPerRegionOpenTimeGuesstimate;
221 
222   // Should bulk assignment wait till all regions are assigned,
223   // or until it times out?  This is useful for measuring bulk assignment
224   // performance, but not needed in most use cases.
225   private final boolean bulkAssignWaitTillAllAssigned;
226 
227   /**
228    * Indicator that AssignmentManager has recovered the region states so
229    * that ServerShutdownHandler can be fully enabled and re-assign regions
230    * of dead servers. So that when re-assignment happens, AssignmentManager
231    * has proper region states.
232    *
233    * Protected to ease testing.
234    */
235   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
236 
237   /**
238    * A map to track the number of times in a row a region has failed to open,
239    * so that we don't try to open a region forever if the failure is
240    * unrecoverable.  We don't put this information in region states
241    * because we don't expect this to happen frequently; we don't
242    * want to copy this information over during each state transition either.
243    */
244   private final ConcurrentHashMap<String, AtomicInteger>
245     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
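  // Illustrative note (added commentary, not in the original source): entries in
  // failedOpenTracker are bumped on each RS_ZK_REGION_FAILED_OPEN event in handleRegion()
  // below, and once the count reaches maximumAttempts the region is marked FAILED_OPEN
  // and the tracker entry is removed, roughly:
  //   if (failedOpenTracker.get(encodedName).incrementAndGet() >= maximumAttempts) {
  //     regionStates.updateRegionState(rt, State.FAILED_OPEN);
  //     failedOpenTracker.remove(encodedName);
  //   }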
246 
247   // A flag to indicate if we are using ZK for region assignment
248   private final boolean useZKForAssignment;
249 
250   // If not using ZK for region assignment, region states
251   // are persisted in meta with a state store
252   private final RegionStateStore regionStateStore;
253 
254   /**
255    * For testing only!  Set to true to skip handling of split.
256    */
257   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
258   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
259 
260   /** Listeners that are called on assignment events. */
261   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
262 
263   private RegionStateListener regionStateListener;
264 
265   public enum ServerHostRegion {
266     NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
267   }
268 
269   private final Object checkIfShouldMoveSystemRegionLock = new Object();
270 
271   /**
272    * Constructs a new assignment manager.
273    *
274    * @param server instance of HMaster this AM is running inside
275    * @param serverManager serverManager for associated HMaster
276    * @param balancer implementation of {@link LoadBalancer}
277    * @param service Executor service
278    * @param metricsMaster metrics manager
279    * @param tableLockManager TableLock manager
280    * @throws KeeperException
281    * @throws IOException
282    */
283   public AssignmentManager(MasterServices server, ServerManager serverManager,
284       final LoadBalancer balancer,
285       final ExecutorService service, MetricsMaster metricsMaster,
286       final TableLockManager tableLockManager) throws KeeperException,
287         IOException, CoordinatedStateException {
288     super(server.getZooKeeper());
289     this.server = server;
290     this.serverManager = serverManager;
291     this.executorService = service;
292     this.regionStateStore = new RegionStateStore(server);
293     this.regionsToReopen = Collections.synchronizedMap
294                            (new HashMap<String, HRegionInfo> ());
295     Configuration conf = server.getConfiguration();
296     // Only read favored nodes if using the favored nodes load balancer.
297     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
298            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
299            FavoredNodeLoadBalancer.class);
300     try {
301       if (server.getCoordinatedStateManager() != null) {
302         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
303       } else {
304         this.tableStateManager = null;
305       }
306     } catch (InterruptedException e) {
307       throw new InterruptedIOException();
308     }
309     // This is the max attempts, not retries, so it should be at least 1.
310     this.maximumAttempts = Math.max(1,
311       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
312     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
313         "hbase.meta.assignment.retry.sleeptime", 1000l);
314     this.balancer = balancer;
315     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
316     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
317       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
318     this.regionStates = new RegionStates(
319       server, tableStateManager, serverManager, regionStateStore);
320 
321     this.bulkAssignWaitTillAllAssigned =
322       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
323     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
324     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
325     this.bulkPerRegionOpenTimeGuesstimate =
326       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
327 
328     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
329     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
330     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
331             TimeUnit.SECONDS, threadFactory);
332     this.tableLockManager = tableLockManager;
333 
334     this.metricsAssignmentManager = new MetricsAssignmentManager();
335     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
336   }
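  // Configuration sketch (illustrative, not part of the original source): the constructor
  // above reads its tunables from the master Configuration, so tests or operators can
  // override them before the AssignmentManager is built, for example:
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.assignment.maximum.attempts", 3);          // maximumAttempts
  //   conf.setInt("hbase.bulk.assignment.threshold.regions", 10);   // bulkAssignThresholdRegions
  //   conf.setInt("hbase.bulk.assignment.threshold.servers", 5);    // bulkAssignThresholdServers
  //   conf.setInt("hbase.assignment.zkevent.workers", 10);          // zkEventWorkers pool size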
337 
338   MetricsAssignmentManager getAssignmentManagerMetrics() {
339     return this.metricsAssignmentManager;
340   }
341 
342   /**
343    * Add the listener to the notification list.
344    * @param listener The AssignmentListener to register
345    */
346   public void registerListener(final AssignmentListener listener) {
347     this.listeners.add(listener);
348   }
349 
350   /**
351    * Remove the listener from the notification list.
352    * @param listener The AssignmentListener to unregister
353    */
354   public boolean unregisterListener(final AssignmentListener listener) {
355     return this.listeners.remove(listener);
356   }
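  // Usage sketch (illustrative; the AssignmentListener callback names are assumptions
  // based on the interface, which is not shown in this file): a component interested in
  // assignment events registers itself and later unregisters, e.g.:
  //   AssignmentListener listener = new AssignmentListener() {
  //     @Override public void regionOpened(HRegionInfo regionInfo, ServerName serverName) { }
  //     @Override public void regionClosed(HRegionInfo regionInfo) { }
  //   };
  //   am.registerListener(listener);
  //   ...
  //   am.unregisterListener(listener);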
357 
358   /**
359    * @return Instance of ZKTableStateManager.
360    */
361   public TableStateManager getTableStateManager() {
362     // These are 'expensive' to make since they involve a trip to the zk
363     // ensemble, so allow sharing.
364     return this.tableStateManager;
365   }
366 
367   /**
368    * This SHOULD not be public. It is public now
369    * because of some unit tests.
370    *
371    * TODO: make it package private and keep RegionStates in the master package
372    */
373   public RegionStates getRegionStates() {
374     return regionStates;
375   }
376 
377   /**
378    * Used in some tests to mock up region state in meta
379    */
380   @VisibleForTesting
381   RegionStateStore getRegionStateStore() {
382     return regionStateStore;
383   }
384 
385   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
386     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
387   }
388 
389   /**
390    * Add a regionPlan for the specified region.
391    * @param encodedName
392    * @param plan
393    */
394   public void addPlan(String encodedName, RegionPlan plan) {
395     synchronized (regionPlans) {
396       regionPlans.put(encodedName, plan);
397     }
398   }
399 
400   /**
401    * Add a map of region plans.
402    */
403   public void addPlans(Map<String, RegionPlan> plans) {
404     synchronized (regionPlans) {
405       regionPlans.putAll(plans);
406     }
407   }
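  // Usage sketch (illustrative): a RegionPlan names the region plus its source and
  // destination servers, keyed in regionPlans by the encoded region name, e.g.:
  //   RegionPlan plan = new RegionPlan(hri, currentServer, destinationServer);
  //   am.addPlan(hri.getEncodedName(), plan);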
408 
409   /**
410    * Set the list of regions that will be reopened
411    * because of an update in table schema
412    *
413    * @param regions
414    *          list of regions that should be tracked for reopen
415    */
416   public void setRegionsToReopen(List <HRegionInfo> regions) {
417     for(HRegionInfo hri : regions) {
418       regionsToReopen.put(hri.getEncodedName(), hri);
419     }
420   }
421 
422   /**
423    * Used by the client to check whether all regions have received the schema updates
424    *
425    * @param tableName
426    * @return Pair indicating the status of the alter command
427    * @throws IOException
428    */
429   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
430       throws IOException {
431     List<HRegionInfo> hris;
432     if (TableName.META_TABLE_NAME.equals(tableName)) {
433       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
434     } else {
435       hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
436         server.getConnection(), tableName, true);
437     }
438 
439     Integer pending = 0;
440     for (HRegionInfo hri : hris) {
441       String name = hri.getEncodedName();
442       // no lock concurrent access ok: sequential consistency respected.
443       if (regionsToReopen.containsKey(name)
444           || regionStates.isRegionInTransition(name)) {
445         pending++;
446       }
447     }
448     return new Pair<Integer, Integer>(pending, hris.size());
449   }
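  // Usage sketch (illustrative): the returned pair is (regions still pending reopen,
  // total regions of the table), so a caller can poll until the first value reaches zero:
  //   Pair<Integer, Integer> status = am.getReopenStatus(tableName);
  //   boolean reopenDone = status.getFirst() == 0;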
450 
451   /**
452    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
453    * the failover cleanup before re-assigning regions of dead servers. So that
454    * when re-assignment happens, AssignmentManager has proper region states.
455    */
456   public boolean isFailoverCleanupDone() {
457     return failoverCleanupDone.get();
458   }
459 
460   /**
461    * To avoid racing with AM, external entities may need to lock a region,
462    * for example, when SSH checks what regions to skip re-assigning.
463    */
464   public Lock acquireRegionLock(final String encodedName) {
465     return locker.acquireLock(encodedName);
466   }
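  // Usage sketch (illustrative): callers should release the lock in a finally block, e.g.:
  //   Lock lock = am.acquireRegionLock(hri.getEncodedName());
  //   try {
  //     // inspect or update state for this region without racing the AM
  //   } finally {
  //     lock.unlock();
  //   }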
467 
468   /**
469    * Now that failover cleanup is completed, notify the server manager to
470    * process any queued-up dead servers.
471    */
472   void failoverCleanupDone() {
473     failoverCleanupDone.set(true);
474     serverManager.processQueuedDeadServers();
475   }
476 
477   /**
478    * Called on startup.
479    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
480    * @throws IOException
481    * @throws KeeperException
482    * @throws InterruptedException
483    * @throws CoordinatedStateException
484    */
485   void joinCluster() throws IOException,
486       KeeperException, InterruptedException, CoordinatedStateException {
487     long startTime = System.currentTimeMillis();
488     // Concurrency note: In the below the accesses on regionsInTransition are
489     // outside of a synchronization block where usually all accesses to RIT are
490     // synchronized.  The presumption is that in this case it is safe since this
491     // method is being run by a single thread on startup.
492 
493     // TODO: Regions that have a null location and are not in regionsInTransitions
494     // need to be handled.
495 
496     // Scan hbase:meta to build list of existing regions, servers, and assignment
497     // Returns servers who have not checked in (assumed dead) that some regions
498     // were assigned to (according to the meta)
499     Set<ServerName> deadServers = rebuildUserRegions();
500 
501     // This method will assign all user regions if a clean server startup or
502     // it will reconstruct master state and cleanup any leftovers from
503     // previous master process.
504     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
505 
506     if (!useZKForAssignment) {
507       // We no longer use ZK for assignment, so remove the ZNode
508       ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
509     }
510     recoverTableInDisablingState();
511     recoverTableInEnablingState();
512     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
513       - startTime) + "ms, failover=" + failover);
514   }
515 
516   /**
517    * Processes all regions that are in transition in zookeeper and also
518    * processes the list of dead servers by scanning the META.
519    * Used by the master when joining a cluster.  If we figure this is a clean cluster
520    * startup, will assign all user regions.
521    * @param deadServers
522    *          Set of dead servers. Can be null.
523    * @throws KeeperException
524    * @throws IOException
525    * @throws InterruptedException
526    */
527   boolean processDeadServersAndRegionsInTransition(
528       final Set<ServerName> deadServers) throws KeeperException,
529         IOException, InterruptedException, CoordinatedStateException {
530     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
531       watcher.assignmentZNode);
532 
533     if (useZKForAssignment && nodes == null) {
534       String errorMessage = "Failed to get the children from ZK";
535       server.abort(errorMessage, new IOException(errorMessage));
536       return true; // Doesn't matter in this case
537     }
538 
539     boolean failover = !serverManager.getDeadServers().isEmpty();
540     if (failover) {
541       // This may not be a failover actually, especially if meta is on this master.
542       if (LOG.isDebugEnabled()) {
543         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
544       }
545     } else {
546       // If any one region except meta is assigned, it's a failover.
547       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
548       for (Map.Entry<HRegionInfo, ServerName> en:
549           regionStates.getRegionAssignments().entrySet()) {
550         HRegionInfo hri = en.getKey();
551         if (!hri.isMetaTable()
552             && onlineServers.contains(en.getValue())) {
553           LOG.debug("Found " + hri + " out on cluster");
554           failover = true;
555           break;
556         }
557       }
558       if (!failover && nodes != null) {
559         // If any one region except meta is in transition, it's a failover.
560         for (String encodedName: nodes) {
561           RegionState regionState = regionStates.getRegionState(encodedName);
562           if (regionState != null && !regionState.getRegion().isMetaRegion()) {
563             LOG.debug("Found " + regionState + " in RITs");
564             failover = true;
565             break;
566           }
567         }
568       }
569     }
570     if (!failover && !useZKForAssignment) {
571       // If any region except meta is in transition on a live server, it's a failover.
572       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
573       if (!regionsInTransition.isEmpty()) {
574         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
575         for (RegionState regionState: regionsInTransition.values()) {
576           ServerName serverName = regionState.getServerName();
577           if (!regionState.getRegion().isMetaRegion()
578               && serverName != null && onlineServers.contains(serverName)) {
579             LOG.debug("Found " + regionState + " in RITs");
580             failover = true;
581             break;
582           }
583         }
584       }
585     }
586     if (!failover) {
587       // If we get here, we have a full cluster restart. It is a failover only
588       // if some WALs have not been split yet. Meta WALs, if any, should have
589       // been split already. We can walk through the queued dead servers:
590       // if they don't have any WALs, this restart should be considered a clean one.
591       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
592       if (!queuedDeadServers.isEmpty()) {
593         Configuration conf = server.getConfiguration();
594         Path walRootDir = FSUtils.getWALRootDir(conf);
595         FileSystem walFs = FSUtils.getWALFileSystem(conf);
596         for (ServerName serverName: queuedDeadServers) {
597           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
598           // removed empty directories.
599           Path walDir = new Path(walRootDir,
600               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
601           Path splitDir = walDir.suffix(DefaultWALProvider.SPLITTING_EXT);
602           if (walFs.exists(walDir) || walFs.exists(splitDir)) {
603             LOG.debug("Found queued dead server " + serverName);
604             failover = true;
605             break;
606           }
607         }
608         if (!failover) {
609           // We figured that it's not a failover, so no need to
610           // work on these re-queued dead servers any more.
611           LOG.info("AM figured that it's not a failover and cleaned up "
612             + queuedDeadServers.size() + " queued dead servers");
613           serverManager.removeRequeuedDeadServers();
614         }
615       }
616     }
617 
618     Set<TableName> disabledOrDisablingOrEnabling = null;
619     Map<HRegionInfo, ServerName> allRegions = null;
620 
621     if (!failover) {
622       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
623         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
624         ZooKeeperProtos.Table.State.ENABLING);
625 
626       // Clean re/start, mark all user regions closed before reassignment
627       allRegions = regionStates.closeAllUserRegions(
628         disabledOrDisablingOrEnabling);
629     }
630 
631     // Now region states are restored
632     regionStateStore.start();
633 
634     // If we found user regions out on the cluster, it's a failover.
635     if (failover) {
636       LOG.info("Found regions out on cluster or in RIT; presuming failover");
637       // Process list of dead servers and regions in RIT.
638       // See HBASE-4580 for more information.
639       processDeadServersAndRecoverLostRegions(deadServers);
640     }
641 
642     if (!failover && useZKForAssignment) {
643       // Cleanup any existing ZK nodes and start watching
644       ZKAssign.deleteAllNodes(watcher);
645       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
646         this.watcher.assignmentZNode);
647     }
648 
649     // Now we can safely claim failover cleanup completed and enable
650     // ServerShutdownHandler for further processing. The nodes (below)
651     // in transition, if any, are for regions not related to those
652     // dead servers at all, and can be done in parallel to SSH.
653     failoverCleanupDone();
654     if (!failover) {
655       // Fresh cluster startup.
656       LOG.info("Clean cluster startup. Assigning user regions");
657       assignAllUserRegions(allRegions);
658     }
659     // Unassign replicas of the split parents and the merged regions.
660     // The daughter replicas are opened in assignAllUserRegions if they were
661     // not already opened.
662     for (HRegionInfo h : replicasToClose) {
663       unassign(h);
664     }
665     replicasToClose.clear();
666     return failover;
667   }
668 
669   /**
670    * If region is up in zk in transition, then do fixup and block and wait until
671    * the region is assigned and out of transition.  Used on startup for
672    * catalog regions.
673    * @param hri Region to look for.
674    * @return True if we processed a region in transition else false if region
675    * was not up in zk in transition.
676    * @throws InterruptedException
677    * @throws KeeperException
678    * @throws IOException
679    */
680   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
681       throws InterruptedException, KeeperException, IOException {
682     String encodedRegionName = hri.getEncodedName();
683     if (!processRegionInTransition(encodedRegionName, hri)) {
684       return false; // The region is not in transition
685     }
686     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
687     while (!this.server.isStopped() &&
688         this.regionStates.isRegionInTransition(encodedRegionName)) {
689       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
690       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
691         // The region is not in transition, or not in transition on an online
692         // server. Doesn't help to block here any more. Caller needs to
693         // verify the region is actually assigned.
694         break;
695       }
696       this.regionStates.waitForUpdate(100);
697     }
698     return true;
699   }
700 
701   /**
702    * Process failover of new master for region <code>encodedRegionName</code>
703    * up in zookeeper.
704    * @param encodedRegionName Region to process failover for.
705    * @param regionInfo If null we'll go get it from meta table.
706    * @return True if we processed <code>regionInfo</code> as a RIT.
707    * @throws KeeperException
708    * @throws IOException
709    */
710   boolean processRegionInTransition(final String encodedRegionName,
711       final HRegionInfo regionInfo) throws KeeperException, IOException {
712     // We need a lock here to ensure that we will not put the same region twice
713     // It has no reason to be a lock shared with the other operations.
714     // We can do the lock on the region only, instead of a global lock: what we want to ensure
715     // is that we don't have two threads working on the same region.
716     Lock lock = locker.acquireLock(encodedRegionName);
717     try {
718       Stat stat = new Stat();
719       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
720       if (data == null) return false;
721       RegionTransition rt;
722       try {
723         rt = RegionTransition.parseFrom(data);
724       } catch (DeserializationException e) {
725         LOG.warn("Failed parse znode data", e);
726         return false;
727       }
728       HRegionInfo hri = regionInfo;
729       if (hri == null) {
730         // The region info is not passed in. We will try to find the region
731         // from region states map/meta based on the encoded region name. But we
732         // may not be able to find it. This is valid for online merge, where
733         // the region may not have been created yet if the merge is not completed.
734         // Therefore, it is not in meta at master recovery time.
735         hri = regionStates.getRegionInfo(rt.getRegionName());
736         EventType et = rt.getEventType();
737         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
738             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
739           LOG.warn("Couldn't find the region in recovering " + rt);
740           return false;
741         }
742       }
743 
744       // TODO: This code is tied to ZK anyway, so for now leaving it as is,
745       // will refactor when whole region assignment will be abstracted from ZK
746       BaseCoordinatedStateManager cp =
747         (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
748       OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
749 
750       ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
751         new ZkOpenRegionCoordination.ZkOpenRegionDetails();
752       zkOrd.setVersion(stat.getVersion());
753       zkOrd.setServerName(cp.getServer().getServerName());
754 
755       return processRegionsInTransition(
756         rt, hri, openRegionCoordination, zkOrd);
757     } finally {
758       lock.unlock();
759     }
760   }
761 
762   /**
763    * This call is invoked only (1) when the master assigns meta, and
764    * (2) during failover-mode startup, when processing zk assignment nodes.
765    * The locker is set in the caller. It returns true if the region
766    * is in transition for sure, false otherwise.
767    *
768    * It should be private but it is used by some tests too.
769    */
770   boolean processRegionsInTransition(
771       final RegionTransition rt, final HRegionInfo regionInfo,
772       OpenRegionCoordination coordination,
773       final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
774     EventType et = rt.getEventType();
775     // Get ServerName.  Cannot be null.
776     final ServerName sn = rt.getServerName();
777     final byte[] regionName = rt.getRegionName();
778     final String encodedName = HRegionInfo.encodeRegionName(regionName);
779     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
780     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
781 
782     if (regionStates.isRegionInTransition(encodedName)
783         && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
784       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
785         + et + ", does nothing since the region is already in transition "
786         + regionStates.getRegionTransitionState(encodedName));
787       // Just return
788       return true;
789     }
790     if (!serverManager.isServerOnline(sn)) {
791       // It was transitioning on a dead server, so it's closed now.
792       // Force to OFFLINE and put it in transition, but do not assign it
793       // since log splitting for the dead server is not done yet.
794       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
795         " was on deadserver; forcing offline");
796       if (regionStates.isRegionOnline(regionInfo)) {
797         // Meta could still show the region is assigned to the previous
798         // server. If that server is online, when we reload the meta, the
799         // region is put back online, so we need to offline it.
800         regionStates.regionOffline(regionInfo);
801         sendRegionClosedNotification(regionInfo);
802       }
803       // Put it back in transition so that SSH can re-assign it
804       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
805 
806       if (regionInfo.isMetaRegion()) {
807         // If it's meta region, reset the meta location.
808         // So that master knows the right meta region server.
809         MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
810       } else {
811         // No matter whether the previous server is online or offline,
812         // we need to reset the last region server of the region.
813         regionStates.setLastRegionServerOfRegion(sn, encodedName);
814         // Make sure we know the server is dead.
815         if (!serverManager.isServerDead(sn)) {
816           serverManager.expireServer(sn);
817         }
818       }
819       return false;
820     }
821     switch (et) {
822       case M_ZK_REGION_CLOSING:
823         // Insert into RIT & resend the query to the region server: maybe the previous master
824         // died before sending the query the first time.
825         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
826         this.executorService.submit(
827           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
828             @Override
829             public void process() throws IOException {
830               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
831               try {
832                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
833                   .getVersion();
834                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
835                 if (regionStates.isRegionOffline(regionInfo)) {
836                   assign(regionInfo, true);
837                 }
838               } finally {
839                 lock.unlock();
840               }
841             }
842           });
843         break;
844 
845       case RS_ZK_REGION_CLOSED:
846       case RS_ZK_REGION_FAILED_OPEN:
847         // Region is closed, insert into RIT and handle it
848         regionStates.setRegionStateTOCLOSED(regionInfo, sn);
849         if (!replicasToClose.contains(regionInfo)) {
850           invokeAssign(regionInfo);
851         } else {
852           offlineDisabledRegion(regionInfo);
853         }
854         break;
855 
856       case M_ZK_REGION_OFFLINE:
857         // Insert in RIT and resend to the regionserver
858         regionStates.updateRegionState(rt, State.OFFLINE);
859         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
860         this.executorService.submit(
861           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
862             @Override
863             public void process() throws IOException {
864               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
865               try {
866                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
867                 addPlan(encodedName, plan);
868                 assign(rsOffline, true, false);
869               } finally {
870                 lock.unlock();
871               }
872             }
873           });
874         break;
875 
876       case RS_ZK_REGION_OPENING:
877         regionStates.updateRegionState(rt, State.OPENING);
878         break;
879 
880       case RS_ZK_REGION_OPENED:
881         // Region is opened, insert into RIT and handle it
882         // This could be done asynchronously, but then we would need to acquire the lock in the
883         //  handler.
884         regionStates.updateRegionState(rt, State.OPEN);
885         new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
886         break;
887       case RS_ZK_REQUEST_REGION_SPLIT:
888       case RS_ZK_REGION_SPLITTING:
889       case RS_ZK_REGION_SPLIT:
890         // Splitting region should be online. We could have skipped it during
891         // user region rebuilding since we may have considered the split completed.
892         // Put it in SPLITTING state to avoid complications.
893         regionStates.regionOnline(regionInfo, sn);
894         regionStates.updateRegionState(rt, State.SPLITTING);
895         if (!handleRegionSplitting(
896             rt, encodedName, prettyPrintedRegionName, sn)) {
897           deleteSplittingNode(encodedName, sn);
898         }
899         break;
900       case RS_ZK_REQUEST_REGION_MERGE:
901       case RS_ZK_REGION_MERGING:
902       case RS_ZK_REGION_MERGED:
903         if (!handleRegionMerging(
904             rt, encodedName, prettyPrintedRegionName, sn)) {
905           deleteMergingNode(encodedName, sn);
906         }
907         break;
908       default:
909         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
910     }
911     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
912       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
913       + "server: " + sn);
914     return true;
915   }
916 
917   /**
918    * When a region is closed, it should be removed from the regionsToReopen
919    * @param hri HRegionInfo of the region which was closed
920    */
921   public void removeClosedRegion(HRegionInfo hri) {
922     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
923       LOG.debug("Removed region from reopening regions because it was closed");
924     }
925   }
926 
927   /**
928    * Handles various states an unassigned node can be in.
929    * <p>
930    * Method is called when a state change is suspected for an unassigned node.
931    * <p>
932    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
933    * yet).
934    * @param rt region transition
935    * @param coordination coordination for opening region
936    * @param ord details about opening region
937    */
938   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
939                     OpenRegionCoordination.OpenRegionDetails ord) {
940     if (rt == null) {
941       LOG.warn("Unexpected NULL input for RegionTransition rt");
942       return;
943     }
944     final ServerName sn = rt.getServerName();
945     // Check if this is a special HBCK transition
946     if (sn.equals(HBCK_CODE_SERVERNAME)) {
947       handleHBCK(rt);
948       return;
949     }
950     final long createTime = rt.getCreateTime();
951     final byte[] regionName = rt.getRegionName();
952     String encodedName = HRegionInfo.encodeRegionName(regionName);
953     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
954     // Verify this is a known server
955     if (!serverManager.isServerOnline(sn)
956       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
957       LOG.warn("Attempted to handle region transition for server but " +
958         "it is not online: " + prettyPrintedRegionName + ", " + rt);
959       return;
960     }
961 
962     RegionState regionState =
963       regionStates.getRegionState(encodedName);
964     long startTime = System.currentTimeMillis();
965     if (LOG.isDebugEnabled()) {
966       boolean lateEvent = createTime < (startTime - 15000);
967       LOG.debug("Handling " + rt.getEventType() +
968         ", server=" + sn + ", region=" +
969         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
970         (lateEvent ? ", which is more than 15 seconds late" : "") +
971         ", current_state=" + regionState);
972     }
973     // We don't do anything for this event,
974     // so separate it out, no need to lock/unlock anything
975     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
976       return;
977     }
978 
979     // We need a lock on the region as we could update it
980     Lock lock = locker.acquireLock(encodedName);
981     try {
982       RegionState latestState =
983         regionStates.getRegionState(encodedName);
984       if ((regionState == null && latestState != null)
985           || (regionState != null && latestState == null)
986           || (regionState != null && latestState != null
987             && latestState.getState() != regionState.getState())) {
988         LOG.warn("Region state changed from " + regionState + " to "
989           + latestState + ", while acquiring lock");
990       }
991       long waitedTime = System.currentTimeMillis() - startTime;
992       if (waitedTime > 5000) {
993         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
994       }
995       regionState = latestState;
996       switch (rt.getEventType()) {
997       case RS_ZK_REQUEST_REGION_SPLIT:
998       case RS_ZK_REGION_SPLITTING:
999       case RS_ZK_REGION_SPLIT:
1000         if (!handleRegionSplitting(
1001             rt, encodedName, prettyPrintedRegionName, sn)) {
1002           deleteSplittingNode(encodedName, sn);
1003         }
1004         break;
1005 
1006       case RS_ZK_REQUEST_REGION_MERGE:
1007       case RS_ZK_REGION_MERGING:
1008       case RS_ZK_REGION_MERGED:
1009         // Merged region is a new region, we can't find it in the region states now.
1010         // However, the two merging regions are not new. They should be in state for merging.
1011         if (!handleRegionMerging(
1012             rt, encodedName, prettyPrintedRegionName, sn)) {
1013           deleteMergingNode(encodedName, sn);
1014         }
1015         break;
1016 
1017       case M_ZK_REGION_CLOSING:
1018         // Should see CLOSING after we have asked it to CLOSE or additional
1019         // times after already being in state of CLOSING
1020         if (regionState == null
1021             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1022           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
1023             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1024             + regionStates.getRegionState(encodedName));
1025           return;
1026         }
1027         // Transition to CLOSING (or update stamp if already CLOSING)
1028         regionStates.updateRegionState(rt, State.CLOSING);
1029         break;
1030 
1031       case RS_ZK_REGION_CLOSED:
1032         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
1033         if (regionState == null
1034             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1035           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1036             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1037             + regionStates.getRegionState(encodedName));
1038           return;
1039         }
1040         // Handle CLOSED by assigning elsewhere or stopping if a disable
1041         // If we got here all is good.  Need to update RegionState -- else
1042         // what follows will fail because not in expected state.
1043         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1044         updateClosedRegionHandlerTracker(regionState.getRegion());
1045         break;
1046 
1047         case RS_ZK_REGION_FAILED_OPEN:
1048           if (regionState == null
1049               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1050             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1051               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1052               + regionStates.getRegionState(encodedName));
1053             return;
1054           }
1055           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1056           if (failedOpenCount == null) {
1057             failedOpenCount = new AtomicInteger();
1058             // No need to use putIfAbsent, or extra synchronization since
1059             // this whole handleRegion block is locked on the encoded region
1060             // name, and failedOpenTracker is updated only in this block
1061             failedOpenTracker.put(encodedName, failedOpenCount);
1062           }
1063           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1064             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1065             // remove the tracking info to save memory, also reset
1066             // the count for next open initiative
1067             failedOpenTracker.remove(encodedName);
1068           } else {
1069             // Handle this the same as if it were opened and then closed.
1070             regionState = regionStates.setRegionStateTOCLOSED(rt.getRegionName(), sn);
1071             if (regionState != null) {
1072               // When there is more than one region server, a new RS is selected as the
1073               // destination and the region plan is updated accordingly. (HBASE-5546)
1074               getRegionPlan(regionState.getRegion(), sn, true);
1075               new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1076             }
1077           }
1078           break;
1079 
1080         case RS_ZK_REGION_OPENING:
1081           // Should see OPENING after we have asked it to OPEN or additional
1082           // times after already being in state of OPENING
1083           if (regionState == null
1084               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1085             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1086               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1087               + regionStates.getRegionState(encodedName));
1088             return;
1089           }
1090           // Transition to OPENING (or update stamp if already OPENING)
1091           regionStates.updateRegionState(rt, State.OPENING);
1092           break;
1093 
1094         case RS_ZK_REGION_OPENED:
1095           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1096           if (regionState == null
1097               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1098             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1099               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1100               + regionStates.getRegionState(encodedName));
1101 
1102             if (regionState != null) {
1103               if(regionState.isOpened() && regionState.getServerName().equals(sn)) {
1104                 // If this region was opened before on this rs, we don't have to unassign it. It won't cause
1105                 // a double assign. One possible scenario of what happened is HBASE-17275.
1106                 failedOpenTracker.remove(encodedName); // reset the count, if any
1107                 new OpenedRegionHandler(
1108                     server, this, regionState.getRegion(), coordination, ord).process();
1109                 updateOpenedRegionHandlerTracker(regionState.getRegion());
1110               } else {
1111                 // Close it without updating the internal region states,
1112                 // so as not to create double assignments in unlucky scenarios
1113                 // mentioned in OpenRegionHandler#process
1114                 unassign(regionState.getRegion(), null, -1, null, false, sn);
1115               }
1116             }
1117             return;
1118           }
1119           // Handle OPENED by removing from transition and deleting the zk node
1120           regionState =
1121               regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1122           if (regionState != null) {
1123             failedOpenTracker.remove(encodedName); // reset the count, if any
1124             new OpenedRegionHandler(
1125               server, this, regionState.getRegion(), coordination, ord).process();
1126             updateOpenedRegionHandlerTracker(regionState.getRegion());
1127           }
1128           break;
1129 
1130         default:
1131           throw new IllegalStateException("Received event is not valid.");
1132       }
1133     } finally {
1134       lock.unlock();
1135     }
1136   }
1137 
1138   //For unit tests only
1139   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1140     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1141     // compareAndSet to be sure that unit tests don't see stale values. This means
1142     // we will return true exactly once unless the handler code resets this value
1143     // to true again.
1144     return b == null ? false : b.compareAndSet(true, false);
1145   }
1146 
1147   //For unit tests only
1148   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1149     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1150     // compareAndSet to be sure that unit tests don't see stale values. This means
1151     // we will return true exactly once unless the handler code resets this value
1152     // to true again.
1153     return b == null ? false : b.compareAndSet(true, false);
1154   }
1155 
1156   //For unit tests only
1157   void initializeHandlerTrackers() {
1158     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1159     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1160   }
1161 
1162   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1163     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1164       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1165     }
1166   }
1167 
1168   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1169     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1170       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1171     }
1172   }
1173 
1174   // TODO: processFavoredNodes might throw an exception, e.g., if the
1175   // meta could not be contacted/updated. We need to see how seriously to treat
1176   // this problem. Should we fail the current assignment? We should be able
1177   // to recover from this problem eventually (if the meta couldn't be updated
1178   // things should work normally and eventually get fixed up).
1179   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1180     if (!shouldAssignRegionsWithFavoredNodes) return;
1181     // The AM gets the favored nodes info for each region and updates the meta
1182     // table with that info
1183     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1184         new HashMap<HRegionInfo, List<ServerName>>();
1185     for (HRegionInfo region : regions) {
1186       regionToFavoredNodes.put(region,
1187           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1188     }
1189     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1190       this.server.getConnection());
1191   }
1192 
1193   /**
1194    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1195    * <p>
1196    * This is handled in a separate code path because it breaks the normal rules.
1197    * @param rt
1198    */
1199   @SuppressWarnings("deprecation")
1200   private void handleHBCK(RegionTransition rt) {
1201     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1202     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1203       ", server=" + rt.getServerName() + ", region=" +
1204       HRegionInfo.prettyPrint(encodedName));
1205     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1206     switch (rt.getEventType()) {
1207       case M_ZK_REGION_OFFLINE:
1208         HRegionInfo regionInfo;
1209         if (regionState != null) {
1210           regionInfo = regionState.getRegion();
1211         } else {
1212           try {
1213             byte [] name = rt.getRegionName();
1214             Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1215               this.server.getConnection(), name);
1216             regionInfo = p.getFirst();
1217           } catch (IOException e) {
1218             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1219             return;
1220           }
1221         }
1222         LOG.info("HBCK repair is triggering assignment of region=" +
1223             regionInfo.getRegionNameAsString());
1224         // trigger assign, node is already in OFFLINE so don't need to update ZK
1225         assign(regionInfo, false);
1226         break;
1227 
1228       default:
1229         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1230         break;
1231     }
1232 
1233   }
1234 
1235   // ZooKeeper events
1236 
1237   /**
1238    * New unassigned node has been created.
1239    *
1240    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1241    * creating an unassigned node.
1242    *
1243    * <p>When this happens we must:
1244    * <ol>
1245    *   <li>Watch the node for further events</li>
1246    *   <li>Read and handle the state in the node</li>
1247    * </ol>
1248    */
1249   @Override
1250   public void nodeCreated(String path) {
1251     handleAssignmentEvent(path);
1252   }
1253 
1254   /**
1255    * Existing unassigned node has had data changed.
1256    *
1257    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1258    * OPENING/OPENED and CLOSING/CLOSED.
1259    *
1260    * <p>When this happens we must:
1261    * <ol>
1262    *   <li>Watch the node for further events</li>
1263    *   <li>Read and handle the state in the node</li>
1264    * </ol>
1265    */
1266   @Override
1267   public void nodeDataChanged(String path) {
1268     handleAssignmentEvent(path);
1269   }
1270 
1271 
1272   // We don't want to have two events on the same region managed simultaneously.
1273   // For this reason, we need to wait if an event on the same region is currently in progress.
1274   // So we track the region names of the events in progress, and we keep a waiting list.
1275   private final Set<String> regionsInProgress = new HashSet<String>();
1276   // In a LinkedHashMultimap, insertion order is preserved when we retrieve the values. We need
1277   //  this as we want the events to be handled in the same order as we received them.
1278   private final LinkedHashMultimap <String, RegionRunnable>
1279       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1280 
1281   /**
1282    * A specific runnable that works only on a region.
1283    */
1284   private interface RegionRunnable extends Runnable {
1285     /**
1286      * @return the name of the region it works on.
1287      */
1288     String getRegionName();
1289   }
1290 
1291   /**
1292    * Submit a task, ensuring that there is only one task at a time working on a given region.
1293    * Order is respected.
1294    */
1295   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1296 
1297     synchronized (regionsInProgress) {
1298       // If there is already a task for this region, we add it to the
1299       //  waiting list and return.
1300       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1301         synchronized (zkEventWorkerWaitingList){
1302           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1303         }
1304         return;
1305       }
1306 
1307       // No event in progress on this region => we can submit a new task immediately.
1308       regionsInProgress.add(regRunnable.getRegionName());
1309       zkEventWorkers.submit(new Runnable() {
1310         @Override
1311         public void run() {
1312           try {
1313             regRunnable.run();
1314           } finally {
1315             // now that we have finished, let's see if there is an event for the same region in the
1316             //  waiting list. If so, we can now submit it to the pool.
1317             synchronized (regionsInProgress) {
1318               regionsInProgress.remove(regRunnable.getRegionName());
1319               synchronized (zkEventWorkerWaitingList) {
1320                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1321                     regRunnable.getRegionName());
1322                 if (!waiting.isEmpty()) {
1323                   // We want the first object only. The only way to get it is through an iterator.
1324                   RegionRunnable toSubmit = waiting.iterator().next();
1325                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1326                   zkEventWorkersSubmit(toSubmit);
1327                 }
1328               }
1329             }
1330           }
1331         }
1332       });
1333     }
1334   }
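       // Illustrative note (not part of the original code): the submit/waiting-list handoff
       // above guarantees per-region ordering. Assuming a hypothetical factory
       // "runnableFor(String region)" that builds a RegionRunnable:
       //
       //   zkEventWorkersSubmit(runnableFor("r1"));  // runs on the pool right away
       //   zkEventWorkersSubmit(runnableFor("r1"));  // parked in zkEventWorkerWaitingList,
       //                                             // resubmitted from the finally block above
       //   zkEventWorkersSubmit(runnableFor("r2"));  // different region, runs independently
       //                                             // (subject to pool capacity)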
1335 
1336   @Override
1337   public void nodeDeleted(final String path) {
1338     if (path.startsWith(watcher.assignmentZNode)) {
1339       final String regionName = ZKAssign.getRegionName(watcher, path);
1340       zkEventWorkersSubmit(new RegionRunnable() {
1341         @Override
1342         public String getRegionName() {
1343           return regionName;
1344         }
1345 
1346         @Override
1347         public void run() {
1348           Lock lock = locker.acquireLock(regionName);
1349           try {
1350             RegionState rs = regionStates.getRegionTransitionState(regionName);
1351             if (rs == null) {
1352               rs = regionStates.getRegionState(regionName);
1353               if (rs == null || !rs.isMergingNew()) {
1354                 // MergingNew is an offline state
1355                 return;
1356               }
1357             }
1358 
1359             HRegionInfo regionInfo = rs.getRegion();
1360             String regionNameStr = regionInfo.getRegionNameAsString();
1361             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1362 
1363             boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1364                 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1365 
1366             ServerName serverName = rs.getServerName();
1367             if (serverManager.isServerOnline(serverName)) {
1368               if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1369                 synchronized (regionStates) {
1370                   regionOnline(regionInfo, serverName);
1371                   if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1372                     // Check if the daughter regions are still there; if they are present,
1373                     // offline them, as this is the case of a rollback.
1374                     HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1375                     HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1376                     if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1377                       LOG.warn("Split daughter region not in transition " + hri_a);
1378                     }
1379                     if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1380                       LOG.warn("Split daughter region not in transition" + hri_b);
1381                     }
1382                     regionOffline(hri_a);
1383                     regionOffline(hri_b);
1384                     splitRegions.remove(regionInfo);
1385                   }
1386                   if (disabled) {
1387                     // if server is offline, no hurt to unassign again
1388                     LOG.info("Opened " + regionNameStr
1389                         + "but this table is disabled, triggering close of region");
1390                     unassign(regionInfo);
1391                   }
1392                 }
1393               } else if (rs.isMergingNew()) {
1394                 synchronized (regionStates) {
1395                   String p = regionInfo.getEncodedName();
1396                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1397                   if (regions != null) {
1398                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1399                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1400                   }
1401                 }
1402               }
1403             }
1404           } finally {
1405             lock.unlock();
1406           }
1407         }
1408 
1409         private void onlineMergingRegion(boolean disabled,
1410             final HRegionInfo hri, final ServerName serverName) {
1411           RegionState regionState = regionStates.getRegionState(hri);
1412           if (regionState != null && regionState.isMerging()
1413               && regionState.isOnServer(serverName)) {
1414             regionOnline(regionState.getRegion(), serverName);
1415             if (disabled) {
1416               unassign(hri);
1417             }
1418           }
1419         }
1420       });
1421     }
1422   }
1423 
1424   /**
1425    * New unassigned node has been created.
1426    *
1427    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1428    * region by creating a znode.
1429    *
1430    * <p>When this happens we must:
1431    * <ol>
1432    *   <li>Watch the node for further children changed events</li>
1433    *   <li>Watch all new children for changed events</li>
1434    * </ol>
1435    */
1436   @Override
1437   public void nodeChildrenChanged(String path) {
1438     if (path.equals(watcher.assignmentZNode)) {
1439       zkEventWorkers.submit(new Runnable() {
1440         @Override
1441         public void run() {
1442           try {
1443             // Just make sure we see the changes for the new znodes
1444             List<String> children =
1445               ZKUtil.listChildrenAndWatchForNewChildren(
1446                 watcher, watcher.assignmentZNode);
1447             if (children != null) {
1448               Stat stat = new Stat();
1449               for (String child : children) {
1450                 // if region is in transition, we already have a watch
1451                 // on it, so no need to watch it again. As far as we know for now,
1452                 // this is needed to watch splitting nodes only.
1453                 if (!regionStates.isRegionInTransition(child)) {
1454                   ZKAssign.getDataAndWatch(watcher, child, stat);
1455                 }
1456               }
1457             }
1458           } catch (KeeperException e) {
1459             server.abort("Unexpected ZK exception reading unassigned children", e);
1460           }
1461         }
1462       });
1463     }
1464   }
1465 
1466 
1467   /**
1468    * Marks the region as online.  Removes it from regions in transition and
1469    * updates the in-memory assignment information.
1470    * <p>
1471    * Used when a region has been successfully opened on a region server.
1472    * @param regionInfo region that has been opened
1473    * @param sn server the region is now on
1474    */
1475   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1476     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1477   }
1478 
1479   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1480     numRegionsOpened.incrementAndGet();
1481     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1482 
1483     // Remove plan if one.
1484     clearRegionPlan(regionInfo);
1485     balancer.regionOnline(regionInfo, sn);
1486 
1487     // Tell our listeners that a region was opened
1488     sendRegionOpenedNotification(regionInfo, sn);
1489   }
1490 
1491   /**
1492    * Pass the assignment event to a worker for processing.
1493    * Each worker is a single thread executor service.  The reason
1494    * for just one thread is to make sure all events for a given
1495    * region are processed in order.
1496    *
1497    * @param path znode path of the assignment event
1498    */
1499   private void handleAssignmentEvent(final String path) {
1500     if (path.startsWith(watcher.assignmentZNode)) {
1501       final String regionName = ZKAssign.getRegionName(watcher, path);
1502 
1503       zkEventWorkersSubmit(new RegionRunnable() {
1504         @Override
1505         public String getRegionName() {
1506           return regionName;
1507         }
1508 
1509         @Override
1510         public void run() {
1511           try {
1512             Stat stat = new Stat();
1513             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1514             if (data == null) return;
1515 
1516             RegionTransition rt = RegionTransition.parseFrom(data);
1517 
1518             // TODO: This code is tied to ZK anyway, so for now leaving it as is,
1519             // will refactor when whole region assignment will be abstracted from ZK
1520             BaseCoordinatedStateManager csm =
1521               (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1522             OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1523 
1524             ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1525               new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1526             zkOrd.setVersion(stat.getVersion());
1527             zkOrd.setServerName(csm.getServer().getServerName());
1528 
1529             handleRegion(rt, openRegionCoordination, zkOrd);
1530           } catch (KeeperException e) {
1531             server.abort("Unexpected ZK exception reading unassigned node data", e);
1532           } catch (DeserializationException e) {
1533             server.abort("Unexpected exception deserializing node data", e);
1534           }
1535         }
1536       });
1537     }
1538   }
1539 
1540   /**
1541    * Marks the region as offline.  Removes it from regions in transition and
1542    * removes in-memory assignment information.
1543    * <p>
1544    * Used when a region has been closed and should remain closed.
1545    * @param regionInfo region to mark offline
1546    */
1547   public void regionOffline(final HRegionInfo regionInfo) {
1548     regionOffline(regionInfo, null);
1549   }
1550 
1551   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1552     if (useZKForAssignment) {
1553       // Disabling so should not be reassigned, just delete the CLOSED node
1554       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1555         "regions in transition, skipping assignment of region " +
1556           regionInfo.getRegionNameAsString());
1557       String encodedName = regionInfo.getEncodedName();
1558       deleteNodeInStates(encodedName, "closed", null,
1559         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1560     }
1561     replicasToClose.remove(regionInfo);
1562     regionOffline(regionInfo);
1563   }
1564 
1565   // Assignment methods
1566 
1567   /**
1568    * Assigns the specified region.
1569    * <p>
1570    * If a RegionPlan is available with a valid destination then it will be used
1571    * to determine which server the region is assigned to.  If no RegionPlan is
1572    * available, region will be assigned to a random available server.
1573    * <p>
1574    * Updates the RegionState and sends the OPEN RPC.
1575    * <p>
1576    * This will only succeed if the region is in transition and in a CLOSED or
1577    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1578    * chosen server is up and running (It may have just crashed!).  If the
1579    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1580    *
1581    * @param region region to be assigned
1582    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1583    *                       OFFLINE state before assigning the region
1584    */
1585   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1586     assign(region, setOfflineInZK, false);
1587   }
1588 
1589   /**
1590    * Use care with forceNewPlan. It could cause double assignment.
1591    */
1592   @VisibleForTesting
1593   public void assign(HRegionInfo region,
1594       boolean setOfflineInZK, boolean forceNewPlan) {
1595     if (isDisabledorDisablingRegionInRIT(region)) {
1596       return;
1597     }
1598     String encodedName = region.getEncodedName();
1599     Lock lock = locker.acquireLock(encodedName);
1600     try {
1601       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1602       if (state != null) {
1603         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1604           LOG.info("Skip assigning " + region.getRegionNameAsString()
1605             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1606             + " is dead but not processed yet");
1607           return;
1608         }
1609         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1610       }
1611     } finally {
1612       lock.unlock();
1613     }
1614   }
1615 
1616   /**
1617    * Bulk assign regions to <code>destination</code>.
1618    * @param destination Server to assign the regions to.
1619    * @param regions Regions to assign.
1620    * @return true if successful
1621    */
1622   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1623     throws InterruptedException {
1624     long startTime = EnvironmentEdgeManager.currentTime();
1625     try {
1626       int regionCount = regions.size();
1627       if (regionCount == 0) {
1628         return true;
1629       }
1630       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1631       Set<String> encodedNames = new HashSet<String>(regionCount);
1632       for (HRegionInfo region : regions) {
1633         encodedNames.add(region.getEncodedName());
1634       }
1635 
1636       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1637       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1638       try {
1639         AtomicInteger counter = new AtomicInteger(0);
1640         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1641         OfflineCallback cb = new OfflineCallback(
1642           watcher, destination, counter, offlineNodesVersions);
1643         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1644         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1645         for (HRegionInfo region : regions) {
1646           String encodedName = region.getEncodedName();
1647           if (!isDisabledorDisablingRegionInRIT(region)) {
1648             RegionState state = forceRegionStateToOffline(region, false);
1649             boolean onDeadServer = false;
1650             if (state != null) {
1651               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1652                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1653                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1654                   + " is dead but not processed yet");
1655                 onDeadServer = true;
1656               } else if (!useZKForAssignment
1657                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1658                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1659                 plans.put(encodedName, plan);
1660                 states.add(state);
1661                 continue;
1662               }
1663             }
1664             // Reassign if the region wasn't on a dead server
1665             if (!onDeadServer) {
1666               LOG.info("failed to force region state to offline or "
1667                 + "failed to set it offline in ZK, will reassign later: " + region);
1668               failedToOpenRegions.add(region); // assign individually later
1669             }
1670           }
1671           // Release the lock, this region is excluded from bulk assign because
1672           // we can't update its state, or set its znode to offline.
1673           Lock lock = locks.remove(encodedName);
1674           lock.unlock();
1675         }
1676 
1677         if (useZKForAssignment) {
1678           // Wait until all unassigned nodes have been put up and watchers set.
1679           int total = states.size();
1680           for (int oldCounter = 0; !server.isStopped();) {
1681             int count = counter.get();
1682             if (oldCounter != count) {
1683               LOG.debug(destination.toString() + " unassigned znodes=" + count +
1684                 " of total=" + total + "; oldCounter=" + oldCounter);
1685               oldCounter = count;
1686             }
1687             if (count >= total) break;
1688             Thread.sleep(5);
1689           }
1690         }
1691 
1692         if (server.isStopped()) {
1693           return false;
1694         }
1695 
1696       // Add region plans, so we can update timers when a region is opened,
1697       // reducing unnecessary RIT timeouts.
1698         this.addPlans(plans);
1699 
1700         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1701           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1702         for (RegionState state: states) {
1703           HRegionInfo region = state.getRegion();
1704           String encodedRegionName = region.getEncodedName();
1705           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1706           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1707             LOG.warn("failed to offline in zookeeper: " + region);
1708             failedToOpenRegions.add(region); // assign individually later
1709             Lock lock = locks.remove(encodedRegionName);
1710             lock.unlock();
1711           } else {
1712             regionStates.updateRegionState(
1713               region, State.PENDING_OPEN, destination);
1714             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1715             if (this.shouldAssignRegionsWithFavoredNodes) {
1716               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1717             }
1718             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1719               region, nodeVersion, favoredNodes));
1720           }
1721         }
1722 
1723         // Move on to open regions.
1724         try {
1725           // Send OPEN RPC. If it fails on an IOE or RemoteException,
1726           // regions will be assigned individually.
1727           long maxWaitTime = System.currentTimeMillis() +
1728             this.server.getConfiguration().
1729               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1730           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1731             try {
1732               // regionOpenInfos is empty if all regions are in failedToOpenRegions list
1733               if (regionOpenInfos.isEmpty()) {
1734                 break;
1735               }
1736               List<RegionOpeningState> regionOpeningStateList = serverManager
1737                 .sendRegionOpen(destination, regionOpenInfos);
1738               if (regionOpeningStateList == null) {
1739                 // Failed getting RPC connection to this server
1740                 return false;
1741               }
1742               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1743                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1744                 if (openingState != RegionOpeningState.OPENED) {
1745                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1746                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1747                     processAlreadyOpenedRegion(region, destination);
1748                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1749                     // Failed opening this region, reassign it later
1750                     failedToOpenRegions.add(region);
1751                   } else {
1752                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1753                       + openingState + " in assigning region " + region);
1754                   }
1755                 }
1756               }
1757               break;
1758             } catch (IOException e) {
1759               if (e instanceof RemoteException) {
1760                 e = ((RemoteException)e).unwrapRemoteException();
1761               }
1762               if (e instanceof RegionServerStoppedException) {
1763                 LOG.warn("The region server was shut down, ", e);
1764                 // No need to retry, the region server is a goner.
1765                 return false;
1766               } else if (e instanceof ServerNotRunningYetException) {
1767                 long now = System.currentTimeMillis();
1768                 if (now < maxWaitTime) {
1769                   LOG.debug("Server is not yet up; waiting up to " +
1770                     (maxWaitTime - now) + "ms", e);
1771                   Thread.sleep(100);
1772                   i--; // reset the try count
1773                   continue;
1774                 }
1775               } else if (e instanceof java.net.SocketTimeoutException
1776                   && this.serverManager.isServerOnline(destination)) {
1777                 // In case socket is timed out and the region server is still online,
1778                 // the openRegion RPC could have been accepted by the server and
1779                 // just the response didn't go through.  So we will retry to
1780                 // open the region on the same server.
1781                 if (LOG.isDebugEnabled()) {
1782                   LOG.debug("Bulk assigner openRegion() to " + destination
1783                     + " has timed out, but the regions might"
1784                     + " already be opened on it.", e);
1785                 }
1786                 // wait and reset the re-try count, server might be just busy.
1787                 Thread.sleep(100);
1788                 i--;
1789                 continue;
1790               }
1791               throw e;
1792             }
1793           }
1794         } catch (IOException e) {
1795           // Can be a socket timeout, EOF, NoRouteToHost, etc
1796           LOG.info("Unable to communicate with " + destination
1797             + " in order to assign regions, ", e);
1798           return false;
1799         }
1800       } finally {
1801         for (Lock lock : locks.values()) {
1802           lock.unlock();
1803         }
1804       }
1805 
1806       if (!failedToOpenRegions.isEmpty()) {
1807         for (HRegionInfo region : failedToOpenRegions) {
1808           if (!regionStates.isRegionOnline(region)) {
1809             invokeAssign(region);
1810           }
1811         }
1812       }
1813 
1814       // wait for assignment completion
1815       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
1816       for (HRegionInfo region: regions) {
1817         if (!region.getTable().isSystemTable()) {
1818           userRegionSet.add(region);
1819         }
1820       }
1821       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1822             System.currentTimeMillis())) {
1823         LOG.debug("some user regions are still in transition: " + userRegionSet);
1824       }
1825       LOG.debug("Bulk assigning done for " + destination);
1826       return true;
1827     } finally {
1828       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
1829     }
1830   }
1831 
1832   /**
1833    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1834    *
1835    * The RPC will be sent only to the region server found in the region state
1836    * if it is passed in, otherwise, to the src server specified. If region
1837    * state is not specified, we don't update region state at all; instead
1838    * we just send the RPC call. This is useful for some cleanup without
1839    * messing with the region states (see handleRegion and the scenario where
1840    * a region is opened on an unexpected server, for an example).
1841    */
1842   private void unassign(final HRegionInfo region,
1843       final RegionState state, final int versionOfClosingNode,
1844       final ServerName dest, final boolean transitionInZK,
1845       final ServerName src) {
1846     ServerName server = src;
1847     if (state != null) {
1848       server = state.getServerName();
1849     }
1850     long maxWaitTime = -1;
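         // Retry loop below: exceptions from sendRegionClose are classified in the catch block.
         // A server that is aborting/stopping/not yet running gets a FAILED_SERVER_EXPIRY sleep
         // and another attempt; NotServingRegionException means the region is already gone from
         // that server, so we offline it; RegionAlreadyInTransitionException means the RS is
         // already closing it, so we only wait (up to ALREADY_IN_TRANSITION_WAITTIME) without
         // burning an attempt.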
1851     for (int i = 1; i <= this.maximumAttempts; i++) {
1852       if (this.server.isStopped() || this.server.isAborted()) {
1853         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1854         return;
1855       }
1856       // ClosedRegionHandler can remove the server from this.regions
1857       if (!serverManager.isServerOnline(server)) {
1858         LOG.debug("Offline " + region.getRegionNameAsString()
1859           + ", no need to unassign since it's on a dead server: " + server);
1860         if (transitionInZK) {
1861           // delete the node. if no node exists need not bother.
1862           deleteClosingOrClosedNode(region, server);
1863         }
1864         if (state != null) {
1865           regionOffline(region);
1866         }
1867         return;
1868       }
1869       try {
1870         // Send CLOSE RPC
1871         if (serverManager.sendRegionClose(server, region,
1872           versionOfClosingNode, dest, transitionInZK)) {
1873           LOG.debug("Sent CLOSE to " + server + " for region " +
1874             region.getRegionNameAsString());
1875           if (useZKForAssignment && !transitionInZK && state != null) {
1876             // Retry to make sure the region is
1877             // closed so as to avoid double assignment.
1878             unassign(region, state, versionOfClosingNode,
1879               dest, transitionInZK, src);
1880           }
1881           return;
1882         }
1883         // This never happens. Currently regionserver close always returns true.
1884         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
1885         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1886           region.getRegionNameAsString());
1887       } catch (Throwable t) {
1888         long sleepTime = 0;
1889         Configuration conf = this.server.getConfiguration();
1890         if (t instanceof RemoteException) {
1891           t = ((RemoteException)t).unwrapRemoteException();
1892         }
1893         boolean logRetries = true;
1894         if (t instanceof RegionServerAbortedException
1895             || t instanceof RegionServerStoppedException
1896             || t instanceof ServerNotRunningYetException) {
1897           // RS is aborting or stopping, we cannot offline the region since the region may need
1898           // to do WAL recovery. Until we see the RS expiration, we should retry.
1899           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1900             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1901 
1902         } else if (t instanceof NotServingRegionException) {
1903           LOG.debug("Offline " + region.getRegionNameAsString()
1904             + ", it's not any more on " + server, t);
1905           if (transitionInZK) {
1906             deleteClosingOrClosedNode(region, server);
1907           }
1908           if (state != null) {
1909             regionOffline(region);
1910           }
1911           return;
1912         } else if ((t instanceof FailedServerException) || (state != null &&
1913             t instanceof RegionAlreadyInTransitionException)) {
1914           if(t instanceof FailedServerException) {
1915             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1916                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1917           } else {
1918             // RS is already processing this region, only need to update the timestamp
1919             LOG.debug("update " + state + " the timestamp.");
1920             state.updateTimestampToNow();
1921             if (maxWaitTime < 0) {
1922               maxWaitTime =
1923                   EnvironmentEdgeManager.currentTime()
1924                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1925                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1926             }
1927             long now = EnvironmentEdgeManager.currentTime();
1928             if (now < maxWaitTime) {
1929               LOG.debug("Region is already in transition; "
1930                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1931               sleepTime = 100;
1932               i--; // reset the try count
1933               logRetries = false;
1934             }
1935           }
1936         }
1937 
1938         try {
1939           if (sleepTime > 0) {
1940             Thread.sleep(sleepTime);
1941           }
1942         } catch (InterruptedException ie) {
1943           LOG.warn("Failed to unassign "
1944             + region.getRegionNameAsString() + " since interrupted", ie);
1945           Thread.currentThread().interrupt();
1946           if (state != null) {
1947             regionStates.updateRegionState(region, State.FAILED_CLOSE);
1948           }
1949           return;
1950         }
1951 
1952         if (logRetries) {
1953           LOG.info("Server " + server + " returned " + t + " for "
1954             + region.getRegionNameAsString() + ", try=" + i
1955             + " of " + this.maximumAttempts, t);
1956           // Presume retry or server will expire.
1957         }
1958       }
1959     }
1960     // Run out of attempts
1961     if (state != null) {
1962       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1963     }
1964   }
1965 
1966   /**
1967    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1968    */
1969   private RegionState forceRegionStateToOffline(
1970       final HRegionInfo region, final boolean forceNewPlan) {
1971     RegionState state = regionStates.getRegionState(region);
1972     if (state == null) {
1973       LOG.warn("Assigning but not in region states: " + region);
1974       state = regionStates.createRegionState(region);
1975     }
1976 
1977     ServerName sn = state.getServerName();
1978     if (forceNewPlan && LOG.isDebugEnabled()) {
1979       LOG.debug("Force region state offline " + state);
1980     }
1981 
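         // Note: the switch below relies on intentional fall-through. An in-transition state
         // (with forceNewPlan) falls into FAILED_CLOSE/FAILED_OPEN and gets unassigned first,
         // then into the OFFLINE dead-server check, and finally into CLOSED, which breaks out
         // with a state that is safe to assign.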
1982     switch (state.getState()) {
1983     case OPEN:
1984     case OPENING:
1985     case PENDING_OPEN:
1986     case CLOSING:
1987     case PENDING_CLOSE:
1988       if (!forceNewPlan) {
1989         LOG.debug("Skip assigning " +
1990           region + ", it is already " + state);
1991         return null;
1992       }
1993     case FAILED_CLOSE:
1994     case FAILED_OPEN:
1995       unassign(region, state, -1, null, false, null);
1996       state = regionStates.getRegionState(region);
1997       if (state.isFailedClose()) {
1998         // If we can't close the region, we can't re-assign
1999         // it so as to avoid possible double assignment/data loss.
2000         LOG.info("Skip assigning " +
2001           region + ", we couldn't close it: " + state);
2002         return null;
2003       }
2004     case OFFLINE:
2005       // This region could have been open on this server
2006       // for a while. If the server is dead and not processed
2007       // yet, we can move on only if the meta shows the
2008       // region is not on this server actually, or on a server
2009       // not dead, or dead and processed already.
2010       // In case not using ZK, we don't need this check because
2011       // we have the latest info in memory, and the caller
2012       // will do another round checking any way.
2013       if (useZKForAssignment
2014           && regionStates.isServerDeadAndNotProcessed(sn)
2015           && wasRegionOnDeadServerByMeta(region, sn)) {
2016         if (!regionStates.isRegionInTransition(region)) {
2017           LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
2018           regionStates.updateRegionState(region, State.OFFLINE);
2019         }
2020         LOG.info("Skip assigning " + region.getRegionNameAsString()
2021             + ", it is on a dead but not processed yet server: " + sn);
2022         return null;
2023       }
2024     case CLOSED:
2025       break;
2026     default:
2027       LOG.error("Trying to assign region " + region
2028         + ", which is " + state);
2029       return null;
2030     }
2031     return state;
2032   }
2033 
2034   @SuppressWarnings("deprecation")
2035   protected boolean wasRegionOnDeadServerByMeta(
2036       final HRegionInfo region, final ServerName sn) {
2037     try {
2038       if (region.isMetaRegion()) {
2039         ServerName server = this.server.getMetaTableLocator().
2040           getMetaRegionLocation(this.server.getZooKeeper());
2041         return regionStates.isServerDeadAndNotProcessed(server);
2042       }
2043       while (!server.isStopped()) {
2044         try {
2045           this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2046           Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2047             region.getRegionName());
2048           if (r == null || r.isEmpty()) return false;
2049           ServerName server = HRegionInfo.getServerName(r);
2050           return regionStates.isServerDeadAndNotProcessed(server);
2051         } catch (IOException ioe) {
2052           LOG.info("Received exception accessing hbase:meta during force assign "
2053             + region.getRegionNameAsString() + ", retrying", ioe);
2054         }
2055       }
2056     } catch (InterruptedException e) {
2057       Thread.currentThread().interrupt();
2058       LOG.info("Interrupted accessing hbase:meta", e);
2059     }
2060     // Call is interrupted or server is stopped.
2061     return regionStates.isServerDeadAndNotProcessed(sn);
2062   }
2063 
2064   /**
2065    * Caller must hold lock on the passed <code>state</code> object.
2066    * @param state
2067    * @param setOfflineInZK
2068    * @param forceNewPlan
2069    */
2070   public void assign(RegionState state,
2071       boolean setOfflineInZK, final boolean forceNewPlan) {
2072     long startTime = EnvironmentEdgeManager.currentTime();
2073     try {
2074       Configuration conf = server.getConfiguration();
2075       RegionState currentState = state;
2076       int versionOfOfflineNode = -1;
2077       RegionPlan plan = null;
2078       long maxWaitTime = -1;
2079       HRegionInfo region = state.getRegion();
2080       RegionOpeningState regionOpenState;
2081       Throwable previousException = null;
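           // Each attempt below: pick (or keep) a plan, optionally force the znode OFFLINE,
           // then send the OPEN RPC. On failure we either wait on the same server ("hold",
           // for ServerNotRunningYet/RegionAlreadyInTransition), retry the same server
           // (socket timeout with the RS still online), or force a new plan elsewhere.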
2082       for (int i = 1; i <= maximumAttempts; i++) {
2083         if (server.isStopped() || server.isAborted()) {
2084           LOG.info("Skip assigning " + region.getRegionNameAsString()
2085             + ", the server is stopped/aborted");
2086           return;
2087         }
2088 
2089         if (plan == null) { // Get a server for the region at first
2090           try {
2091             plan = getRegionPlan(region, forceNewPlan);
2092           } catch (HBaseIOException e) {
2093             LOG.warn("Failed to get region plan", e);
2094           }
2095         }
2096 
2097         if (plan == null) {
2098           LOG.warn("Unable to determine a plan to assign " + region);
2099 
2100           // For meta region, we have to keep retrying until succeeding
2101           if (region.isMetaRegion()) {
2102             if (i == maximumAttempts) {
2103               i = 0; // re-set attempt count to 0 for at least 1 retry
2104 
2105               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2106                 " after maximumAttempts (" + this.maximumAttempts +
2107                 "). Reset attempts count and continue retrying.");
2108             }
2109             waitForRetryingMetaAssignment();
2110             continue;
2111           }
2112 
2113           regionStates.updateRegionState(region, State.FAILED_OPEN);
2114           return;
2115         }
2116         if (setOfflineInZK && versionOfOfflineNode == -1) {
2117           LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2118           // get the version of the znode after setting it to OFFLINE.
2119           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2120           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2121           if (versionOfOfflineNode != -1) {
2122             if (isDisabledorDisablingRegionInRIT(region)) {
2123               return;
2124             }
2125             // In case of assignment from EnableTableHandler, the table state is ENABLING.
2126             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
2127             // set ENABLED directly, the client API may think the table is already enabled.
2128             // When all the regions are added directly into hbase:meta and we call assignRegion,
2129             // we need to make the table ENABLED. In such a case the table will not be in
2130             // ENABLING or ENABLED state.
2131             TableName tableName = region.getTable();
2132             if (!tableStateManager.isTableState(tableName,
2133               ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2134               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2135               setEnabledTable(tableName);
2136             }
2137           }
2138         }
2139         if (setOfflineInZK && versionOfOfflineNode == -1) {
2140           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2141           // Setting offline in ZK must have failed due to ZK racing or some
2142           // exception which may make the server abort. If it is ZK racing,
2143           // we should retry since we already reset the region state,
2144           // existing (re)assignment will fail anyway.
2145           if (!server.isAborted()) {
2146             continue;
2147           }
2148         }
2149         LOG.info("Assigning " + region.getRegionNameAsString() +
2150             " to " + plan.getDestination());
2151         // Transition RegionState to PENDING_OPEN
2152         currentState = regionStates.updateRegionState(region,
2153           State.PENDING_OPEN, plan.getDestination());
2154 
2155         boolean needNewPlan;
2156         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2157             " to " + plan.getDestination();
2158         try {
2159           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2160           if (this.shouldAssignRegionsWithFavoredNodes) {
2161             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2162           }
2163           regionOpenState = serverManager.sendRegionOpen(
2164               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2165 
2166           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2167             // Failed opening this region, looping again on a new server.
2168             needNewPlan = true;
2169             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2170                 " trying to assign elsewhere instead; " +
2171                 "try=" + i + " of " + this.maximumAttempts);
2172           } else {
2173             // we're done
2174             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2175               processAlreadyOpenedRegion(region, plan.getDestination());
2176             }
2177             return;
2178           }
2179 
2180         } catch (Throwable t) {
2181           if (t instanceof RemoteException) {
2182             t = ((RemoteException) t).unwrapRemoteException();
2183           }
2184           previousException = t;
2185 
2186           // Should we wait a little before retrying? If the server is starting, yes.
2187           // If the region is already in transition, it's yes as well: we want to be sure that
2188           //  the region will get opened but we don't want a double assignment.
2189           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2190               t instanceof ServerNotRunningYetException);
2191 
2192           // In case socket is timed out and the region server is still online,
2193           // the openRegion RPC could have been accepted by the server and
2194           // just the response didn't go through.  So we will retry to
2195           // open the region on the same server to avoid possible
2196           // double assignment.
2197           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2198               && this.serverManager.isServerOnline(plan.getDestination()));
2199 
2200 
2201           if (hold) {
2202             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2203               "try=" + i + " of " + this.maximumAttempts, t);
2204 
2205             if (maxWaitTime < 0) {
2206               if (t instanceof RegionAlreadyInTransitionException) {
2207                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2208                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2209                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2210               } else {
2211                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2212                   + this.server.getConfiguration().getLong(
2213                     "hbase.regionserver.rpc.startup.waittime", 60000);
2214               }
2215             }
2216             try {
2217               needNewPlan = false;
2218               long now = EnvironmentEdgeManager.currentTime();
2219               if (now < maxWaitTime) {
2220                 LOG.debug("Server is not yet up or region is already in transition; "
2221                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2222                 Thread.sleep(100);
2223                 i--; // reset the try count
2224               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2225                 LOG.debug("Server is not up for a while; try a new one", t);
2226                 needNewPlan = true;
2227               }
2228             } catch (InterruptedException ie) {
2229               LOG.warn("Failed to assign "
2230                   + region.getRegionNameAsString() + " since interrupted", ie);
2231               regionStates.updateRegionState(region, State.FAILED_OPEN);
2232               Thread.currentThread().interrupt();
2233               return;
2234             }
2235           } else if (retry) {
2236             needNewPlan = false;
2237             i--; // we want to retry as many times as needed as long as the RS is not dead.
2238             LOG.warn(assignMsg + ", trying to assign to the same region server due ", t);
2239           } else {
2240             needNewPlan = true;
2241             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2242                 " try=" + i + " of " + this.maximumAttempts, t);
2243           }
2244         }
2245 
2246         if (i == this.maximumAttempts) {
2247           // For meta region, we have to keep retrying until succeeding
2248           if (region.isMetaRegion()) {
2249             i = 0; // re-set attempt count to 0 for at least 1 retry
2250             LOG.warn(assignMsg +
2251                 ", trying to assign a hbase:meta region reached to maximumAttempts (" +
2252                 this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
2253             waitForRetryingMetaAssignment();
2254           }
2255           else {
2256             // Don't reset the region state or get a new plan any more.
2257             // This is the last try.
2258             continue;
2259           }
2260         }
2261 
2262         // If region opened on destination of present plan, reassigning to new
2263         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
2264         // reassigning to same RS.
2265         if (needNewPlan) {
2266           // Force a new plan and reassign. Will return null if no servers.
2267           // The new plan could be the same as the existing plan since we don't
2268           // exclude the server of the original plan, which should not be
2269           // excluded since it could be the only server up now.
2270           RegionPlan newPlan = null;
2271           try {
2272             newPlan = getRegionPlan(region, true);
2273           } catch (HBaseIOException e) {
2274             LOG.warn("Failed to get region plan", e);
2275           }
2276           if (newPlan == null) {
2277             regionStates.updateRegionState(region, State.FAILED_OPEN);
2278             LOG.warn("Unable to find a viable location to assign region " +
2279                 region.getRegionNameAsString());
2280             return;
2281           }
2282 
2283           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2284             // Clean out the plan we failed to execute and one that doesn't look like it'll
2285             // succeed anyway; we need a new plan!
2286             // Transition back to OFFLINE
2287             LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2288                 + newPlan.getDestination() + " server.");
2289             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2290             versionOfOfflineNode = -1;
2291             if (useZKForAssignment) {
2292               setOfflineInZK = true;
2293             }
2294             plan = newPlan;
2295           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2296               previousException instanceof FailedServerException) {
2297             try {
2298               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2299                 " to the same failed server.");
2300               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2301                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2302             } catch (InterruptedException ie) {
2303               LOG.warn("Failed to assign "
2304                   + region.getRegionNameAsString() + " since interrupted", ie);
2305               regionStates.updateRegionState(region, State.FAILED_OPEN);
2306               Thread.currentThread().interrupt();
2307               return;
2308             }
2309           }
2310         }
2311       }
2312       // Run out of attempts
2313       regionStates.updateRegionState(region, State.FAILED_OPEN);
2314     } finally {
2315       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2316     }
2317   }
2318 
2319   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2320     // Remove region from in-memory transition and unassigned node from ZK
2321     // While trying to enable the table the regions of the table were
2322     // already enabled.
2323     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2324       + " to " + sn);
2325     String encodedName = region.getEncodedName();
2326 
2327     // If using ZK for assignment, the region-already-opened event should not be handled here;
2328     // leave it to the zk event. See HBASE-14407.
2329     if (useZKForAssignment) {
2330       String node = ZKAssign.getNodeName(watcher, encodedName);
2331       Stat stat = new Stat();
2332       try {
2333         byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
2334         if (existingBytes != null) {
2335           RegionTransition rt = RegionTransition.parseFrom(existingBytes);
2336           EventType et = rt.getEventType();
2337           if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
2338             LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2339               + " and node in " + et + " state");
2340             return;
2341           }
2342         }
2343       } catch (KeeperException ke) {
2344         LOG.warn("Unexpected ZK exception getData " + node
2345           + " node for the region " + encodedName, ke);
2346       } catch (DeserializationException e) {
2347         LOG.warn("Get RegionTransition from zk deserialization failed! ", e);
2348       }
2349       deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2350     }
2351 
2352     regionStates.regionOnline(region, sn);
2353   }
2354 
2355   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2356     if (this.tableStateManager.isTableState(region.getTable(),
2357         ZooKeeperProtos.Table.State.DISABLED,
2358         ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2359       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2360         + " skipping assign of " + region.getRegionNameAsString());
2361       offlineDisabledRegion(region);
2362       return true;
2363     }
2364     return false;
2365   }
2366 
2367   /**
2368    * Set the region as OFFLINE up in zookeeper.
2369    *
2370    * @param state current in-memory state of the region
2371    * @return the version of the offline node if setting of the OFFLINE node was
2372    *         successful, -1 otherwise.
2373    */
2374   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2375     if (!state.isClosed() && !state.isOffline()) {
2376       String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2377       this.server.abort(msg, new IllegalStateException(msg));
2378       return -1;
2379     }
2380     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2381     int versionOfOfflineNode;
2382     try {
2383       // get the version after setting the znode to OFFLINE
2384       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2385         state.getRegion(), destination);
2386       if (versionOfOfflineNode == -1) {
2387         LOG.warn("Attempted to create/force node into OFFLINE state before "
2388             + "completing assignment but failed to do so for " + state);
2389         return -1;
2390       }
2391     } catch (KeeperException e) {
2392       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2393       return -1;
2394     }
2395     return versionOfOfflineNode;
2396   }
2397 
2398   /**
2399    * Get a list of servers that system table regions can not be assigned to.
2400    * System table regions must be assigned to a server with the highest version.
2401    * An RS reports to the master before it registers on zk, and we only know its version once it has
2402    * registered on zk. So in fact we will never assign a system region to an RS not registered on zk.
2403    */
2404   public List<ServerName> getExcludedServersForSystemTable() {
2405     List<Pair<ServerName, String>> serverList = new ArrayList<>();
2406     for (ServerName s : serverManager.getOnlineServersList()) {
2407       serverList.add(new Pair<>(s, server.getRegionServerVersion(s)));
2408     }
2409     if (serverList.isEmpty()) {
2410       return new ArrayList<>();
2411     }
2412     String highestVersion = Collections.max(serverList, new Comparator<Pair<ServerName, String>>() {
2413       @Override
2414       public int compare(Pair<ServerName, String> o1, Pair<ServerName, String> o2) {
2415         return VersionInfo.compareVersion(o1.getSecond(), o2.getSecond());
2416       }
2417     }).getSecond();
2418     List<ServerName> res = new ArrayList<>();
2419     for (Pair<ServerName, String> pair : serverList) {
2420       if (!pair.getSecond().equals(highestVersion)) {
2421         res.add(pair.getFirst());
2422       }
2423     }
2424     return res;
2425   }
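       // Worked example (illustrative only, hypothetical server names): with online servers
       //   rs1 -> "1.2.7", rs2 -> "1.3.1", rs3 -> "1.3.1"
       // the highest reported version is "1.3.1", so the returned exclusion list is [rs1]
       // and system table regions will only be planned onto rs2 or rs3.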
2426 
2427 
2428   /**
2429    * @param region the region to assign
2430    * @return Plan for passed <code>region</code> (if none exists, one is created;
2431    * if there are no servers to assign to, null is returned).
2432    */
2433   private RegionPlan getRegionPlan(final HRegionInfo region,
2434       final boolean forceNewPlan)  throws HBaseIOException {
2435     return getRegionPlan(region, null, forceNewPlan);
2436   }
2437 
2438   /**
2439    * @param region the region to assign
2440    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
2441    * all servers are thought to be assignable.
2442    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2443    * will be generated.
2444    * @return Plan for passed <code>region</code> (if none exists, one is created;
2445    * if there are no servers to assign to, null is returned).
2446    */
2447   private RegionPlan getRegionPlan(final HRegionInfo region,
2448       final ServerName serverToExclude, final boolean forceNewPlan) {
2449     // Pickup existing plan or make a new one
2450     final String encodedName = region.getEncodedName();
2451     List<ServerName> exclude = new ArrayList<>();
2452     if (region.isSystemTable()) {
2453       exclude.addAll(getExcludedServersForSystemTable());
2454     }
2455     if (serverToExclude != null) {
2456       exclude.add(serverToExclude);
2457     }
2458     final List<ServerName> destServers =
2459       serverManager.createDestinationServersList(exclude);
2460 
2461     if (destServers.isEmpty()) {
2462       LOG.warn("Can't move " + encodedName +
2463         ", there is no destination server available.");
2464       return null;
2465     }
2466 
2467     RegionPlan randomPlan = null;
2468     boolean newPlan = false;
2469     RegionPlan existingPlan;
2470 
2471     synchronized (this.regionPlans) {
2472       existingPlan = this.regionPlans.get(encodedName);
2473 
2474       if (existingPlan != null && existingPlan.getDestination() != null) {
2475         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2476           + " destination server is " + existingPlan.getDestination() +
2477             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2478       }
2479 
2480       if (forceNewPlan
2481           || existingPlan == null
2482           || existingPlan.getDestination() == null
2483           || !destServers.contains(existingPlan.getDestination())) {
2484         newPlan = true;
2485       }
2486     }
2487 
2488     if (newPlan) {
2489       ServerName destination = null;
2490       try {
2491         destination = balancer.randomAssignment(region, destServers);
2492       } catch (HBaseIOException e) {
2493         LOG.warn(e);
2494       }
2495       if (destination == null) {
2496         LOG.warn("Can't find a destination for " + encodedName);
2497         return null;
2498       }
2499       synchronized (this.regionPlans) {
2500         randomPlan = new RegionPlan(region, null, destination);
2501         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2502           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2503           regions.add(region);
2504           try {
2505             processFavoredNodes(regions);
2506           } catch (IOException ie) {
2507             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2508           }
2509         }
2510         this.regionPlans.put(encodedName, randomPlan);
2511       }
2512       LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
2513           + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
2514           + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
2515           + ") available servers, forceNewPlan=" + forceNewPlan);
2516       return randomPlan;
2517     }
2518     LOG.debug("Using pre-existing plan for " +
2519       region.getRegionNameAsString() + "; plan=" + existingPlan);
2520     return existingPlan;
2521   }
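
  /*
   * Illustrative summary (not part of the original source) of the plan-reuse decision above,
   * assuming a plan for the region is already cached in this.regionPlans:
   *
   *   // The existing plan is reused only when forceNewPlan is false, the plan has a non-null
   *   // destination, and that destination is still present in createDestinationServersList().
   *   // Otherwise balancer.randomAssignment(region, destServers) picks a fresh destination and
   *   // the new RegionPlan replaces the cached one.
   */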
2522 
2523   /**
2524    * Wait for some time before retrying meta table region assignment
2525    */
2526   private void waitForRetryingMetaAssignment() {
2527     try {
2528       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2529     } catch (InterruptedException e) {
2530       LOG.error("Got exception while waiting for hbase:meta assignment", e);
2531       Thread.currentThread().interrupt();
2532     }
2533   }
2534 
2535   /**
2536    * Start a new thread to check whether there are region servers whose versions are higher than others'.
2537    * If so, move all system table regions to the RS with the highest version to keep compatibility.
2538    * The reason is that an RS on a newer version may not be able to talk to an RS on an older version
2539    * when there are incompatible changes.
2540    */
2541   public void checkIfShouldMoveSystemRegionAsync() {
2542     new Thread(new Runnable() {
2543       @Override
2544       public void run() {
2545         try {
2546           synchronized (checkIfShouldMoveSystemRegionLock) {
2547             // An RS registers on ZK after it reports startup to the master
2548             List<HRegionInfo> regionsShouldMove = new ArrayList<>();
2549             for (ServerName server : getExcludedServersForSystemTable()) {
2550               regionsShouldMove.addAll(getCarryingSystemTables(server));
2551             }
2552             if (!regionsShouldMove.isEmpty()) {
2553               List<RegionPlan> plans = new ArrayList<>();
2554               for (HRegionInfo regionInfo : regionsShouldMove) {
2555                 RegionPlan plan = getRegionPlan(regionInfo, true);
2556                 if (regionInfo.isMetaRegion()) {
2557                   // Must move meta region first.
2558                   balance(plan);
2559                 } else {
2560                   plans.add(plan);
2561                 }
2562               }
2563               for (RegionPlan plan : plans) {
2564                 balance(plan);
2565               }
2566             }
2567           }
2568         } catch (Throwable t) {
2569           LOG.error(t);
2570         }
2571       }
2572     }).start();
2573   }
2574 
2575 
2576   /**
2577    * Unassigns the specified region.
2578    * <p>
2579    * Updates the RegionState and sends the CLOSE RPC unless region is being
2580    * split by regionserver; then the unassign fails (silently) because we
2581    * presume the region being unassigned no longer exists (its been split out
2582    * of existence). TODO: What to do if split fails and is rolled back and
2583    * parent is revivified?
2584    * <p>
2585    * If a RegionPlan is already set, it will remain.
2586    *
2587    * @param region region to be unassigned
2588    */
2589   public void unassign(HRegionInfo region) {
2590     unassign(region, false);
2591   }
2592 
2593 
2594   /**
2595    * Unassigns the specified region.
2596    * <p>
2597    * Updates the RegionState and sends the CLOSE RPC unless the region is being
2598    * split by the regionserver; in that case the unassign fails (silently) because we
2599    * presume the region being unassigned no longer exists (it's been split out
2600    * of existence). TODO: What to do if split fails and is rolled back and
2601    * parent is revivified?
2602    * <p>
2603    * If a RegionPlan is already set, it will remain.
2604    *
2605    * @param region region to be unassigned
2606    * @param force if region should be closed even if already closing
2607    */
2608   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2609     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2610     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2611       + " (offlining), current state: " + regionStates.getRegionState(region));
2612 
2613     String encodedName = region.getEncodedName();
2614     // Grab the state of this region and synchronize on it
2615     int versionOfClosingNode = -1;
2616     // We need a lock here as we're going to do a put later and we don't want multiple states
2617     //  creation
2618     ReentrantLock lock = locker.acquireLock(encodedName);
2619     RegionState state = regionStates.getRegionTransitionState(encodedName);
2620     boolean reassign = true;
2621     try {
2622       if (state == null) {
2623         // Region is not in transition.
2624         // We can unassign it only if it's not SPLIT/MERGED.
2625         state = regionStates.getRegionState(encodedName);
2626         if (state != null && state.isUnassignable()) {
2627           LOG.info("Attempting to unassign " + state + ", ignored");
2628           // Offline region will be reassigned below
2629           return;
2630         }
2631         // Create the znode in CLOSING state
2632         try {
2633           if (state == null || state.getServerName() == null) {
2634             // We don't know where the region is, offline it.
2635             // No need to send CLOSE RPC
2636             LOG.warn("Attempting to unassign a region not in RegionStates "
2637               + region.getRegionNameAsString() + ", offlined");
2638             regionOffline(region);
2639             return;
2640           }
2641           if (useZKForAssignment) {
2642             versionOfClosingNode = ZKAssign.createNodeClosing(
2643               watcher, region, state.getServerName());
2644             if (versionOfClosingNode == -1) {
2645               LOG.info("Attempting to unassign " +
2646                 region.getRegionNameAsString() + " but ZK closing node "
2647                 + "can't be created.");
2648               reassign = false; // not unassigned at all
2649               return;
2650             }
2651           }
2652         } catch (KeeperException e) {
2653           if (e instanceof NodeExistsException) {
2654             // Handle race between master initiated close and regionserver
2655             // orchestrated splitting. See if existing node is in a
2656             // SPLITTING or SPLIT state.  If so, the regionserver started
2657             // an op on node before we could get our CLOSING in.  Deal.
2658             NodeExistsException nee = (NodeExistsException)e;
2659             String path = nee.getPath();
2660             try {
2661               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2662                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2663                   "skipping unassign because region no longer exists -- its split or merge");
2664                 reassign = false; // no need to reassign for split/merged region
2665                 return;
2666               }
2667             } catch (KeeperException.NoNodeException ke) {
2668               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2669                 "; presuming split and that the region to unassign, " +
2670                 encodedName + ", no longer exists -- confirm", ke);
2671               return;
2672             } catch (KeeperException ke) {
2673               LOG.error("Unexpected zk state", ke);
2674             } catch (DeserializationException de) {
2675               LOG.error("Failed parse", de);
2676             }
2677           }
2678           // If we get here, we don't understand what's going on -- abort.
2679           server.abort("Unexpected ZK exception creating node CLOSING", e);
2680           reassign = false; // heading out already
2681           return;
2682         }
2683         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2684       } else if (state.isFailedOpen()) {
2685         // The region is not open yet
2686         regionOffline(region);
2687         return;
2688       } else if (force && state.isPendingCloseOrClosing()) {
2689         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2690           " which is already " + state.getState()  +
2691           " but forcing to send a CLOSE RPC again ");
2692         if (state.isFailedClose()) {
2693           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2694         }
2695         state.updateTimestampToNow();
2696       } else {
2697         LOG.debug("Attempting to unassign " +
2698           region.getRegionNameAsString() + " but it is " +
2699           "already in transition (" + state.getState() + ", force=" + force + ")");
2700         return;
2701       }
2702 
2703       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2704     } finally {
2705       lock.unlock();
2706 
2707       // Region is expected to be reassigned afterwards
2708       if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
2709         assign(region, true);
2710       }
2711     }
2712   }
2713 
2714   public void unassign(HRegionInfo region, boolean force) {
2715     unassign(region, force, null);
2716   }
2717 
2718   /**
2719    * @param region regioninfo of znode to be deleted.
2720    */
2721   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2722     String encodedName = region.getEncodedName();
2723     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2724       EventType.RS_ZK_REGION_CLOSED);
2725   }
2726 
2727   /**
2728    * @param path
2729    * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
2730    * @throws KeeperException Can happen if the znode went away in the meantime.
2731    * @throws DeserializationException
2732    */
2733   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2734       throws KeeperException, DeserializationException {
2735     boolean result = false;
2736     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2737     // cleaned up before we can get data from it.
2738     byte [] data = ZKAssign.getData(watcher, path);
2739     if (data == null) {
2740       LOG.info("Node " + path + " is gone");
2741       return false;
2742     }
2743     RegionTransition rt = RegionTransition.parseFrom(data);
2744     switch (rt.getEventType()) {
2745     case RS_ZK_REQUEST_REGION_SPLIT:
2746     case RS_ZK_REGION_SPLIT:
2747     case RS_ZK_REGION_SPLITTING:
2748     case RS_ZK_REQUEST_REGION_MERGE:
2749     case RS_ZK_REGION_MERGED:
2750     case RS_ZK_REGION_MERGING:
2751       result = true;
2752       break;
2753     default:
2754       LOG.info("Node " + path + " is in " + rt.getEventType());
2755       break;
2756     }
2757     return result;
2758   }
2759 
2760   /**
2761    * Used by unit tests. Return the number of regions opened so far in the life
2762    * of the master. Increases by one every time the master opens a region
2763    * @return the counter value of the number of regions opened so far
2764    */
2765   public int getNumRegionsOpened() {
2766     return numRegionsOpened.get();
2767   }
2768 
2769   /**
2770    * Waits until the specified region has completed assignment.
2771    * <p>
2772    * If the region is already assigned, returns immediately.  Otherwise, method
2773    * blocks until the region is assigned.
2774    * @param regionInfo region to wait on assignment for
2775    * @return true if the region is assigned, false otherwise.
2776    * @throws InterruptedException
2777    */
2778   public boolean waitForAssignment(HRegionInfo regionInfo)
2779       throws InterruptedException {
2780     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2781     regionSet.add(regionInfo);
2782     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2783   }
2784 
2785   /**
2786    * Waits until the specified region has completed assignment, or the deadline is reached.
2787    */
2788   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2789       final boolean waitTillAllAssigned, final int reassigningRegions,
2790       final long minEndTime) throws InterruptedException {
2791     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2792     if (deadline < 0) { // Overflow
2793       deadline = Long.MAX_VALUE; // wait forever
2794     }
2795     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2796   }
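
  /*
   * Illustrative arithmetic (not part of the original source) for the deadline computed above,
   * using hypothetical values: with bulkPerRegionOpenTimeGuesstimate = 10000 ms,
   * reassigningRegions = 3 and minEndTime = now, the wait deadline is
   * now + 10000 * (3 + 1) = now + 40 seconds; if the sum overflows, the deadline is clamped
   * to Long.MAX_VALUE, i.e. wait forever.
   */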
2797 
2798   /**
2799    * Waits until the specified region has completed assignment, or the deadline is reached.
2800    * @param regionSet set of regions to wait on. The set is modified: assigned regions are removed.
2801    * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
2802    * @param deadline the timestamp after which the wait is aborted
2803    * @return true if all the regions are assigned, false otherwise.
2804    * @throws InterruptedException
2805    */
2806   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2807       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2808     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
2809     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2810       int failedOpenCount = 0;
2811       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2812       while (regionInfoIterator.hasNext()) {
2813         HRegionInfo hri = regionInfoIterator.next();
2814         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2815             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2816           regionInfoIterator.remove();
2817         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2818           failedOpenCount++;
2819         }
2820       }
2821       if (!waitTillAllAssigned) {
2822         // No need to wait, let assignment go on asynchronously
2823         break;
2824       }
2825       if (!regionSet.isEmpty()) {
2826         if (failedOpenCount == regionSet.size()) {
2827           // all the regions we are waiting on had an error on open.
2828           break;
2829         }
2830         regionStates.waitForUpdate(100);
2831       }
2832     }
2833     return regionSet.isEmpty();
2834   }
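
  /*
   * Illustrative usage sketch (not part of the original source); 'submittedRegions' is a
   * hypothetical collection of regions a caller has just queued for assignment:
   *
   *   Collection<HRegionInfo> pending = new ArrayList<HRegionInfo>(submittedRegions);
   *   long deadline = System.currentTimeMillis() + 60000; // give up after one minute
   *   boolean allAssigned = waitForAssignment(pending, true, deadline);
   *   // The collection is mutated: regions that came online (or were split/merged away) are
   *   // removed, so whatever is left in 'pending' is still unassigned or failed to open.
   */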
2835 
2836   /**
2837    * Assigns the hbase:meta region or a replica.
2838    * <p>
2839    * Assumes that hbase:meta is currently closed and is not being actively served by
2840    * any RegionServer.
2841    * <p>
2842    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2843    * hbase:meta to a random RegionServer.
2844    * @param hri TODO
2845    * @throws KeeperException
2846    */
2847   public void assignMeta(HRegionInfo hri) throws KeeperException {
2848     this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
2849     assign(hri, true);
2850   }
2851 
2852   /**
2853    * Assigns specified regions retaining assignments, if any.
2854    * <p>
2855    * This is a synchronous call and will return once every region has been
2856    * assigned.  If anything fails, an exception is thrown
2857    * @throws InterruptedException
2858    * @throws IOException
2859    */
2860   public void assign(Map<HRegionInfo, ServerName> regions)
2861         throws IOException, InterruptedException {
2862     if (regions == null || regions.isEmpty()) {
2863       return;
2864     }
2865     List<ServerName> servers = serverManager.createDestinationServersList();
2866     if (servers == null || servers.isEmpty()) {
2867       throw new IOException("Found no destination server to assign region(s)");
2868     }
2869 
2870     // Reuse existing assignment info
2871     Map<ServerName, List<HRegionInfo>> bulkPlan =
2872       balancer.retainAssignment(regions, servers);
2873     if (bulkPlan == null) {
2874       throw new IOException("Unable to determine a plan to assign region(s)");
2875     }
2876 
2877     processBogusAssignments(bulkPlan);
2878 
2879     assign(regions.size(), servers.size(),
2880       "retainAssignment=true", bulkPlan);
2881   }
2882 
2883   /**
2884    * Assigns specified regions round robin, if any.
2885    * <p>
2886    * This is a synchronous call and will return once every region has been
2887    * assigned.  If anything fails, an exception is thrown
2888    * @throws InterruptedException
2889    * @throws IOException
2890    */
2891   public void assign(List<HRegionInfo> regions)
2892         throws IOException, InterruptedException {
2893     if (regions == null || regions.isEmpty()) {
2894       return;
2895     }
2896 
2897     List<ServerName> servers = serverManager.createDestinationServersList();
2898     if (servers == null || servers.isEmpty()) {
2899       throw new IOException("Found no destination server to assign region(s)");
2900     }
2901 
2902     // Generate a round-robin bulk assignment plan
2903     Map<ServerName, List<HRegionInfo>> bulkPlan
2904       = balancer.roundRobinAssignment(regions, servers);
2905     if (bulkPlan == null) {
2906       throw new IOException("Unable to determine a plan to assign region(s)");
2907     }
2908 
2909     processBogusAssignments(bulkPlan);
2910 
2911     processFavoredNodes(regions);
2912     assign(regions.size(), servers.size(),
2913       "round-robin=true", bulkPlan);
2914   }
2915 
2916   private void assign(int regions, int totalServers,
2917       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2918           throws InterruptedException, IOException {
2919 
2920     int servers = bulkPlan.size();
2921     if (servers == 1 || (regions < bulkAssignThresholdRegions
2922         && servers < bulkAssignThresholdServers)) {
2923 
2924       // Not using bulk assignment. This can be more efficient in a small
2925       // cluster, especially a mini cluster for testing, so that tests won't time out
2926       if (LOG.isTraceEnabled()) {
2927         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2928           " region(s) to " + servers + " server(s)");
2929       }
2930 
2931       // invoke assignment (async)
2932       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
2933       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2934         if (!assign(plan.getKey(), plan.getValue())) {
2935           for (HRegionInfo region: plan.getValue()) {
2936             if (!regionStates.isRegionOnline(region)) {
2937               invokeAssign(region);
2938               if (!region.getTable().isSystemTable()) {
2939                 userRegionSet.add(region);
2940               }
2941             }
2942           }
2943         }
2944       }
2945 
2946       // wait for assignment completion
2947       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
2948             System.currentTimeMillis())) {
2949         LOG.debug("some user regions are still in transition: " + userRegionSet);
2950       }
2951     } else {
2952       LOG.info("Bulk assigning " + regions + " region(s) across "
2953         + totalServers + " server(s), " + message);
2954 
2955       // Use fixed count thread pool assigning.
2956       BulkAssigner ba = new GeneralBulkAssigner(
2957         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2958       ba.bulkAssign();
2959       LOG.info("Bulk assigning done");
2960     }
2961   }
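
  /*
   * Illustrative summary (not part of the original source) of the bulk-vs-single decision above:
   *
   *   // A plan targeting a single server, or fewer regions than bulkAssignThresholdRegions AND
   *   // fewer servers than bulkAssignThresholdServers, is assigned region-by-region through
   *   // invokeAssign() and then waited on; anything larger is handed to GeneralBulkAssigner,
   *   // which assigns on a fixed-size thread pool.
   */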
2962 
2963   /**
2964    * Assigns all user regions, if any exist.  Used during cluster startup.
2965    * <p>
2966    * This is a synchronous call and will return once every region has been
2967    * assigned.  If anything fails, an exception is thrown and the cluster
2968    * should be shutdown.
2969    * @throws InterruptedException
2970    * @throws IOException
2971    */
2972   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2973       throws IOException, InterruptedException {
2974     if (allRegions == null || allRegions.isEmpty()) return;
2975 
2976     // Determine what type of assignment to do on startup
2977     boolean retainAssignment = server.getConfiguration().
2978       getBoolean("hbase.master.startup.retainassign", true);
2979 
2980     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2981     if (retainAssignment) {
2982       assign(allRegions);
2983     } else {
2984       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2985       assign(regions);
2986     }
2987 
2988     for (HRegionInfo hri : regionsFromMetaScan) {
2989       TableName tableName = hri.getTable();
2990       if (!tableStateManager.isTableState(tableName,
2991           ZooKeeperProtos.Table.State.ENABLED)) {
2992         setEnabledTable(tableName);
2993       }
2994     }
2995     // assign all the replicas that were not recorded in the meta
2996     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
2997   }
2998 
2999   /**
3000    * Get a list of replica regions that are
3001    * not recorded in meta yet. We might not have recorded the locations
3002    * for the replicas because the replicas may not have been online yet, the master restarted
3003    * in the middle of assigning, ZK was erased, etc.
3004    * @param regionsRecordedInMeta the list of regions we know are recorded in meta
3005    * either as a default, or, as the location of a replica
3006    * @param master
3007    * @return list of replica regions
3008    * @throws IOException
3009    */
3010   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
3011       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
3012     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
3013     for (HRegionInfo hri : regionsRecordedInMeta) {
3014       TableName table = hri.getTable();
3015       HTableDescriptor htd = master.getTableDescriptors().get(table);
3016       // look at the HTD for the replica count. That's the source of truth
3017       int desiredRegionReplication = htd.getRegionReplication();
3018       for (int i = 0; i < desiredRegionReplication; i++) {
3019         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
3020         if (regionsRecordedInMeta.contains(replica)) continue;
3021         regionsNotRecordedInMeta.add(replica);
3022       }
3023     }
3024     return regionsNotRecordedInMeta;
3025   }
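
  /*
   * Illustrative sketch (not part of the original source): for a table whose HTableDescriptor
   * declares a region replication of 3, and a primary region R recorded in meta without its
   * replicas, the loop above yields the two missing replica HRegionInfo instances:
   *
   *   // RegionReplicaUtil.getRegionInfoForReplica(R, 0) == R           -> already in meta, skipped
   *   // RegionReplicaUtil.getRegionInfoForReplica(R, 1) == R_replica1  -> added to the result
   *   // RegionReplicaUtil.getRegionInfoForReplica(R, 2) == R_replica2  -> added to the result
   */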
3026 
3027   /**
3028    * Wait until no regions in transition.
3029    * @param timeout How long to wait.
3030    * @return True if nothing in regions in transition.
3031    * @throws InterruptedException
3032    */
3033   boolean waitUntilNoRegionsInTransition(final long timeout)
3034       throws InterruptedException {
3035     // Blocks until there are no regions in transition. It is possible that
3036     // there are regions in transition immediately after this returns, but it
3037     // guarantees that if it returns without an exception there was a period
3038     // of time with no regions in transition from the point-of-view of the
3039     // in-memory
3040     // state of the Master.
3041     final long endTime = System.currentTimeMillis() + timeout;
3042 
3043     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
3044         && endTime > System.currentTimeMillis()) {
3045       regionStates.waitForUpdate(100);
3046     }
3047 
3048     return !regionStates.isRegionsInTransition();
3049   }
3050 
3051   /**
3052    * Rebuild the list of user regions and assignment information.
3053    * <p>
3054    * Returns a set of servers that are not found to be online that hosted
3055    * some regions.
3056    * @return set of servers not online that hosted some regions per meta
3057    * @throws IOException
3058    */
3059   Set<ServerName> rebuildUserRegions() throws
3060       IOException, KeeperException, CoordinatedStateException {
3061     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
3062       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
3063 
3064     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
3065       ZooKeeperProtos.Table.State.DISABLED,
3066       ZooKeeperProtos.Table.State.DISABLING,
3067       ZooKeeperProtos.Table.State.ENABLING);
3068 
3069     // Region assignment from META
3070     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
3071     // Get any new but slow to checkin region server that joined the cluster
3072     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
3073     // Set of offline servers to be returned
3074     Set<ServerName> offlineServers = new HashSet<ServerName>();
3075     // Iterate regions in META
3076     for (Result result : results) {
3077       if (result == null) {
3078         LOG.debug("null result from meta - ignoring but this is strange.");
3079         continue;
3080       }
3081       // Keep track of replicas to close. These were the replicas of the original
3082       // (pre-merge) regions. The master might have closed them before, but it might
3083       // not have, maybe because it crashed.
3084       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
3085       if (p.getFirst() != null && p.getSecond() != null) {
3086         int numReplicas = server.getTableDescriptors().get(p.getFirst().
3087             getTable()).getRegionReplication();
3088         for (HRegionInfo merge : p) {
3089           for (int i = 1; i < numReplicas; i++) {
3090             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
3091           }
3092         }
3093       }
3094       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
3095       if (rl == null) continue;
3096       HRegionLocation[] locations = rl.getRegionLocations();
3097       if (locations == null) continue;
3098       for (HRegionLocation hrl : locations) {
3099         if (hrl == null) continue;
3100         HRegionInfo regionInfo = hrl.getRegionInfo();
3101         if (regionInfo == null) continue;
3102         int replicaId = regionInfo.getReplicaId();
3103         State state = RegionStateStore.getRegionState(result, replicaId);
3104         // Keep track of replicas to close. These were the replicas of the split parents
3105         // from the previous life of the master. The master should have closed them before,
3106         // but it couldn't, maybe because it crashed.
3107         if (replicaId == 0 && state.equals(State.SPLIT)) {
3108           for (HRegionLocation h : locations) {
3109             replicasToClose.add(h.getRegionInfo());
3110           }
3111         }
3112         ServerName lastHost = hrl.getServerName();
3113         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3114         if (tableStateManager.isTableState(regionInfo.getTable(),
3115              ZooKeeperProtos.Table.State.DISABLED)) {
3116           // Force the region to forget its hosts for disabled/disabling tables.
3117           // see HBASE-13326
3118           lastHost = null;
3119           regionLocation = null;
3120         }
3121         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3122         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3123           // Region is not open (either offline or in transition), skip
3124           continue;
3125         }
3126         TableName tableName = regionInfo.getTable();
3127         if (!onlineServers.contains(regionLocation)) {
3128           // Region is located on a server that isn't online
3129           offlineServers.add(regionLocation);
3130           if (useZKForAssignment) {
3131             regionStates.regionOffline(regionInfo);
3132           }
3133         } else if (!disabledOrEnablingTables.contains(tableName)) {
3134           // Region is being served and on an active server
3135           // add only if region not in disabled or enabling table
3136           regionStates.regionOnline(regionInfo, regionLocation);
3137           balancer.regionOnline(regionInfo, regionLocation);
3138         } else if (useZKForAssignment) {
3139           regionStates.regionOffline(regionInfo);
3140         }
3141         // need to enable the table if not disabled or disabling or enabling
3142         // this will be used in rolling restarts
3143         if (!disabledOrDisablingOrEnabling.contains(tableName)
3144           && !getTableStateManager().isTableState(tableName,
3145             ZooKeeperProtos.Table.State.ENABLED)) {
3146           setEnabledTable(tableName);
3147         }
3148       }
3149     }
3150     return offlineServers;
3151   }
3152 
3153   /**
3154    * Recover the tables that were not fully moved to DISABLED state. These
3155    * tables were in DISABLING state when the master restarted/switched.
3156    *
3157    * @throws KeeperException
3158    * @throws TableNotFoundException
3159    * @throws IOException
3160    */
3161   private void recoverTableInDisablingState()
3162       throws KeeperException, IOException, CoordinatedStateException {
3163     Set<TableName> disablingTables =
3164       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3165     if (disablingTables.size() != 0) {
3166       for (TableName tableName : disablingTables) {
3167         // Recover by calling DisableTableHandler
3168         LOG.info("The table " + tableName
3169             + " is in DISABLING state.  Hence recovering by moving the table"
3170             + " to DISABLED state.");
3171         new DisableTableHandler(this.server, tableName,
3172             this, tableLockManager, true).prepare().process();
3173       }
3174     }
3175   }
3176 
3177   /**
3178    * Recover the tables that were not fully moved to ENABLED state. These tables
3179    * were in ENABLING state when the master restarted/switched
3180    *
3181    * @throws KeeperException
3182    * @throws org.apache.hadoop.hbase.TableNotFoundException
3183    * @throws IOException
3184    */
3185   private void recoverTableInEnablingState()
3186       throws KeeperException, IOException, CoordinatedStateException {
3187     Set<TableName> enablingTables = tableStateManager.
3188       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3189     if (enablingTables.size() != 0) {
3190       for (TableName tableName : enablingTables) {
3191         // Recover by calling EnableTableHandler
3192         LOG.info("The table " + tableName
3193             + " is in ENABLING state.  Hence recovering by moving the table"
3194             + " to ENABLED state.");
3195         // enableTable in sync way during master startup,
3196         // no need to invoke coprocessor
3197         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3198           this, tableLockManager, true);
3199         try {
3200           eth.prepare();
3201         } catch (TableNotFoundException e) {
3202           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3203           continue;
3204         }
3205         eth.process();
3206       }
3207     }
3208   }
3209 
3210   /**
3211    * Processes list of dead servers from result of hbase:meta scan and regions in RIT
3212    * <p>
3213    * This is used for failover to recover the lost regions that belonged to
3214    * RegionServers which failed while there was no active master or regions
3215    * that were in RIT.
3216    * <p>
3217    *
3218    *
3219    * @param deadServers
3220    *          The list of dead servers which failed while there was no active
3221    *          master. Can be null.
3222    * @throws IOException
3223    * @throws KeeperException
3224    */
3225   private void processDeadServersAndRecoverLostRegions(
3226       Set<ServerName> deadServers) throws IOException, KeeperException {
3227     if (deadServers != null && !deadServers.isEmpty()) {
3228       for (ServerName serverName: deadServers) {
3229         if (!serverManager.isServerDead(serverName)) {
3230           serverManager.expireServer(serverName); // Let SSH do region re-assign
3231         }
3232       }
3233     }
3234 
3235     List<String> nodes = useZKForAssignment ?
3236       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3237       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3238     if (nodes != null && !nodes.isEmpty()) {
3239       for (String encodedRegionName : nodes) {
3240         processRegionInTransition(encodedRegionName, null);
3241       }
3242     } else if (!useZKForAssignment) {
3243       processRegionInTransitionZkLess();
3244     }
3245   }
3246 
3247   void processRegionInTransitionZkLess() {
3248     // We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions
3249     // in case the RPC call is not sent out yet before the master was shut down
3250     // since we update the state before we send the RPC call. We can't update
3251     // the state after the RPC call. Otherwise, we don't know what's happened
3252     // to the region if the master dies right after the RPC call is out.
3253     Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3254     for (RegionState regionState : rits.values()) {
3255       LOG.info("Processing " + regionState);
3256       ServerName serverName = regionState.getServerName();
3257       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
3258       // case, try assigning it here.
3259       if (serverName != null
3260           && !serverManager.getOnlineServers().containsKey(serverName)) {
3261         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3262         continue;
3263       }
3264       HRegionInfo regionInfo = regionState.getRegion();
3265       State state = regionState.getState();
3266 
3267       switch (state) {
3268       case CLOSED:
3269         invokeAssign(regionInfo);
3270         break;
3271       case PENDING_OPEN:
3272         retrySendRegionOpen(regionState);
3273         break;
3274       case PENDING_CLOSE:
3275         retrySendRegionClose(regionState);
3276         break;
3277       case FAILED_CLOSE:
3278       case FAILED_OPEN:
3279         invokeUnAssign(regionInfo);
3280         break;
3281       default:
3282         // No process for other states
3283       }
3284     }
3285   }
3286 
3287   /**
3288    * At master failover, for pending_open region, make sure
3289    * sendRegionOpen RPC call is sent to the target regionserver
3290    */
3291   private void retrySendRegionOpen(final RegionState regionState) {
3292     this.executorService.submit(
3293       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3294         @Override
3295         public void process() throws IOException {
3296           HRegionInfo hri = regionState.getRegion();
3297           ServerName serverName = regionState.getServerName();
3298           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3299           try {
3300             for (int i = 1; i <= maximumAttempts; i++) {
3301               if (!serverManager.isServerOnline(serverName)
3302                   || server.isStopped() || server.isAborted()) {
3303                 return; // No need any more
3304               }
3305               try {
3306                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3307                   return; // Region is not in the expected state any more
3308                 }
3309                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
3310                 if (shouldAssignRegionsWithFavoredNodes) {
3311                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
3312                 }
3313                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
3314                   serverName, hri, -1, favoredNodes);
3315 
3316                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
3317                   // Failed opening this region, this means the target server didn't get
3318                   // the original region open RPC, so re-assign it with a new plan
3319                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
3320                     + regionState + ", re-assign it");
3321                   invokeAssign(hri, true);
3322                 }
3323                 return; // Done.
3324               } catch (Throwable t) {
3325                 if (t instanceof RemoteException) {
3326                   t = ((RemoteException) t).unwrapRemoteException();
3327                 }
3328                 // In case SocketTimeoutException/FailedServerException, retry
3329                 if (t instanceof java.net.SocketTimeoutException
3330                     || t instanceof FailedServerException) {
3331                   Threads.sleep(100);
3332                   continue;
3333                 }
3334                 // For other exceptions, re-assign it
3335                 LOG.debug("Got exception in retry sendRegionOpen for "
3336                   + regionState + ", re-assign it", t);
3337                 invokeAssign(hri);
3338                 return; // Done.
3339               }
3340             }
3341           } finally {
3342             lock.unlock();
3343           }
3344         }
3345       });
3346   }
3347 
3348   /**
3349    * At master failover, for pending_close region, make sure
3350    * sendRegionClose RPC call is sent to the target regionserver
3351    */
3352   private void retrySendRegionClose(final RegionState regionState) {
3353     this.executorService.submit(
3354       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3355         @Override
3356         public void process() throws IOException {
3357           HRegionInfo hri = regionState.getRegion();
3358           ServerName serverName = regionState.getServerName();
3359           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3360           try {
3361             for (int i = 1; i <= maximumAttempts; i++) {
3362               if (!serverManager.isServerOnline(serverName)
3363                   || server.isStopped() || server.isAborted()) {
3364                 return; // No need any more
3365               }
3366               try {
3367                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3368                   return; // Region is not in the expected state any more
3369                 }
3370                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3371                   // This means the region is still on the target server
3372                   LOG.debug("Got false in retry sendRegionClose for "
3373                     + regionState + ", re-close it");
3374                   invokeUnAssign(hri);
3375                 }
3376                 return; // Done.
3377               } catch (Throwable t) {
3378                 if (t instanceof RemoteException) {
3379                   t = ((RemoteException) t).unwrapRemoteException();
3380                 }
3381                 // In case SocketTimeoutException/FailedServerException, retry
3382                 if (t instanceof java.net.SocketTimeoutException
3383                     || t instanceof FailedServerException) {
3384                   Threads.sleep(100);
3385                   continue;
3386                 }
3387                 if (!(t instanceof NotServingRegionException
3388                     || t instanceof RegionAlreadyInTransitionException)) {
3389                   // NotServingRegionException/RegionAlreadyInTransitionException
3390                   // means the target server got the original region close request.
3391                   // For other exceptions, re-close it
3392                   LOG.debug("Got exception in retry sendRegionClose for "
3393                     + regionState + ", re-close it", t);
3394                   invokeUnAssign(hri);
3395                 }
3396                 return; // Done.
3397               }
3398             }
3399           } finally {
3400             lock.unlock();
3401           }
3402         }
3403       });
3404   }
3405 
3406   /**
3407    * Set Regions in transitions metrics.
3408    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3409    * This iterator is not fail fast, which may lead to stale reads; but that's better than
3410    * creating a copy of the map for metrics computation, as this method will be invoked
3411    * on a frequent interval.
3412    */
3413   public void updateRegionsInTransitionMetrics() {
3414     long currentTime = System.currentTimeMillis();
3415     int totalRITs = 0;
3416     int totalRITsOverThreshold = 0;
3417     long oldestRITTime = 0;
3418     int ritThreshold = this.server.getConfiguration().
3419       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3420     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3421       totalRITs++;
3422       long ritTime = currentTime - state.getStamp();
3423       if (ritTime > ritThreshold) { // more than the threshold
3424         totalRITsOverThreshold++;
3425       }
3426       if (oldestRITTime < ritTime) {
3427         oldestRITTime = ritTime;
3428       }
3429     }
3430     if (this.metricsAssignmentManager != null) {
3431       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3432       this.metricsAssignmentManager.updateRITCount(totalRITs);
3433       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3434     }
3435   }
3436 
3437   /**
3438    * @param region Region whose plan we are to clear.
3439    */
3440   void clearRegionPlan(final HRegionInfo region) {
3441     synchronized (this.regionPlans) {
3442       this.regionPlans.remove(region.getEncodedName());
3443     }
3444   }
3445 
3446   /**
3447    * Wait on region to clear regions-in-transition.
3448    * @param hri Region to wait on.
3449    * @throws IOException
3450    */
3451   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3452       throws IOException, InterruptedException {
3453     waitOnRegionToClearRegionsInTransition(hri, -1L);
3454   }
3455 
3456   /**
3457    * Wait on region to clear regions-in-transition or time out
3458    * @param hri
3459    * @param timeOut Milliseconds to wait for current region to be out of transition state.
3460    * @return True when a region clears regions-in-transition before timeout otherwise false
3461    * @throws InterruptedException
3462    */
3463   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3464       throws InterruptedException {
3465     if (!regionStates.isRegionInTransition(hri)) return true;
3466     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3467         + timeOut;
3468     // There is already a timeout monitor on regions in transition so I
3469     // should not have to have one here too?
3470     LOG.info("Waiting for " + hri.getEncodedName() +
3471         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3472     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3473       regionStates.waitForUpdate(100);
3474       if (EnvironmentEdgeManager.currentTime() > end) {
3475         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3476         return false;
3477       }
3478     }
3479     if (this.server.isStopped()) {
3480       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3481       return false;
3482     }
3483     return true;
3484   }
3485 
3486   void invokeAssign(HRegionInfo regionInfo) {
3487     invokeAssign(regionInfo, true);
3488   }
3489 
3490   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3491     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3492   }
3493 
3494   void invokeUnAssign(HRegionInfo regionInfo) {
3495     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3496   }
3497 
3498   public ServerHostRegion isCarryingMeta(ServerName serverName) {
3499     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3500   }
3501 
3502   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3503     return isCarryingRegion(serverName,
3504         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3505   }
3506 
3507   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
3508     return isCarryingRegion(serverName, metaHri);
3509   }
3510 
3511   private List<HRegionInfo> getCarryingSystemTables(ServerName serverName) {
3512     Set<HRegionInfo> regions = this.getRegionStates().getServerRegions(serverName);
3513     if (regions == null) {
3514       return new ArrayList<>();
3515     }
3516     List<HRegionInfo> list = new ArrayList<>();
3517     for (HRegionInfo region : regions) {
3518       if (region.isSystemTable()) {
3519         list.add(region);
3520       }
3521     }
3522     return list;
3523   }
3524 
3525   /**
3526    * Check if the shutdown server carries the specific region.
3527    * We have a bunch of places that store the region location, and
3528    * those values aren't consistent: there is a delay of notification.
3529    * The location from the zookeeper unassigned node has the most recent data,
3530    * but the node could be deleted after the region is opened by the AM.
3531    * The AM's info could be stale if OpenedRegionHandler
3532    * processing hasn't finished by the time the server shutdown occurs.
3533    * @return whether the serverName currently hosts the region
3534    */
3535   private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3536     RegionTransition rt = null;
3537     try {
3538       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3539       // This call can legitimately return null
3540       rt = data == null? null: RegionTransition.parseFrom(data);
3541     } catch (KeeperException e) {
3542       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3543     } catch (DeserializationException e) {
3544       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3545     }
3546 
3547     ServerName addressFromZK = rt != null ? rt.getServerName() : null;
3548     if (addressFromZK != null) {
3549       // if we get something from ZK, we will use the data
3550       boolean matchZK = addressFromZK.equals(serverName);
3551       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3552         " current=" + serverName + ", matches=" + matchZK);
3553       return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3554     }
3555 
3556     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3557     if (addressFromAM != null) {
3558       boolean matchAM = addressFromAM.equals(serverName);
3559       LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3560         " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3561         " server being checked: " + serverName);
3562       return matchAM ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3563     }
3564 
3565     if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
3566       // For the Meta region (default replica), we can do one more check on MetaTableLocator
3567       final ServerName serverNameInZK =
3568           server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
3569       LOG.debug("Based on MetaTableLocator, the META region is on server="
3570           + (serverNameInZK == null ? "null" : serverNameInZK)
3571           + " server being checked: " + serverName);
3572       if (serverNameInZK != null) {
3573         return serverNameInZK.equals(serverName) ?
3574             ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3575       }
3576     }
3577     // Checked everywhere; if we reach here, we could not positively determine
3578     // whether the server is carrying the region.
3579     return ServerHostRegion.UNKNOWN;
3580   }
3581 
3582   /**
3583    * Process shutdown server removing any assignments.
3584    * @param sn Server that went down.
3585    * @return list of regions in transition on this server
3586    */
3587   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3588     // Clean out any existing assignment plans for this server
3589     synchronized (this.regionPlans) {
3590       for (Iterator <Map.Entry<String, RegionPlan>> i =
3591           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3592         Map.Entry<String, RegionPlan> e = i.next();
3593         ServerName otherSn = e.getValue().getDestination();
3594         // The name will be null if the region is planned for a random assign.
3595         if (otherSn != null && otherSn.equals(sn)) {
3596           // Use iterator's remove else we'll get CME
3597           i.remove();
3598         }
3599       }
3600     }
3601     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3602     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3603       HRegionInfo hri = it.next();
3604       String encodedName = hri.getEncodedName();
3605 
3606       // We need a lock on the region as we could update it
3607       Lock lock = locker.acquireLock(encodedName);
3608       try {
3609         RegionState regionState =
3610           regionStates.getRegionTransitionState(encodedName);
3611         if (regionState == null
3612             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3613             || !(regionState.isFailedClose() || regionState.isOffline()
3614               || regionState.isPendingOpenOrOpening())) {
3615           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3616             + " on the dead server any more: " + sn);
3617           it.remove();
3618         } else {
3619           try {
3620             // Delete the ZNode if exists
3621             ZKAssign.deleteNodeFailSilent(watcher, hri);
3622           } catch (KeeperException ke) {
3623             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3624           }
3625           if (tableStateManager.isTableState(hri.getTable(),
3626               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3627             regionStates.regionOffline(hri);
3628             it.remove();
3629             continue;
3630           }
3631           // Mark the region offline and assign it again by SSH
3632           regionStates.updateRegionState(hri, State.OFFLINE);
3633         }
3634       } finally {
3635         lock.unlock();
3636       }
3637     }
3638     return regions;
3639   }
3640 
3641   /**
3642    * @param plan Plan to execute.
3643    */
3644   public void balance(final RegionPlan plan) {
3645 
3646     HRegionInfo hri = plan.getRegionInfo();
3647     TableName tableName = hri.getTable();
3648     if (tableStateManager.isTableState(tableName,
3649       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3650       LOG.info("Ignored moving region of disabling/disabled table "
3651         + tableName);
3652       return;
3653     }
3654 
3655     // Move the region only if it's assigned
3656     String encodedName = hri.getEncodedName();
3657     ReentrantLock lock = locker.acquireLock(encodedName);
3658     try {
3659       if (!regionStates.isRegionOnline(hri)) {
3660         RegionState state = regionStates.getRegionState(encodedName);
3661         LOG.info("Ignored moving region not assigned: " + hri + ", "
3662           + (state == null ? "not in region states" : state));
3663         return;
3664       }
3665       synchronized (this.regionPlans) {
3666         this.regionPlans.put(plan.getRegionName(), plan);
3667       }
3668       unassign(hri, false, plan.getDestination());
3669     } finally {
3670       lock.unlock();
3671     }
3672   }
3673 
3674   public void stop() {
3675     shutdown(); // Stop executor service, etc
3676   }
3677 
3678   /**
3679    * Shutdown the threadpool executor service
3680    */
3681   public void shutdown() {
3682     // It's an immediate shutdown, so we're clearing the remaining tasks.
3683     synchronized (zkEventWorkerWaitingList){
3684       zkEventWorkerWaitingList.clear();
3685     }
3686 
3687     // Shutdown the threadpool executor service
3688     threadPoolExecutorService.shutdownNow();
3689     zkEventWorkers.shutdownNow();
3690     regionStateStore.stop();
3691   }
3692 
3693   protected void setEnabledTable(TableName tableName) {
3694     try {
3695       this.tableStateManager.setTableState(tableName,
3696         ZooKeeperProtos.Table.State.ENABLED);
3697     } catch (CoordinatedStateException e) {
3698       // here we can abort as it is the start up flow
3699       String errorMsg = "Unable to ensure that the table " + tableName
3700           + " will be" + " enabled because of a ZooKeeper issue";
3701       LOG.error(errorMsg);
3702       this.server.abort(errorMsg, e);
3703     }
3704   }
3705 
3706   /**
3707    * Set region as OFFLINED up in zookeeper asynchronously.
3708    * @param state
3709    * @return True if we succeeded, false otherwise (State was incorrect or failed
3710    * updating zk).
3711    */
3712   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3713       final AsyncCallback.StringCallback cb, final ServerName destination) {
3714     if (!state.isClosed() && !state.isOffline()) {
3715       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3716         new IllegalStateException());
3717       return false;
3718     }
3719     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3720     try {
3721       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3722         destination, cb, state);
3723     } catch (KeeperException e) {
3724       if (e instanceof NodeExistsException) {
3725         LOG.warn("Node for " + state.getRegion() + " already exists");
3726       } else {
3727         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3728       }
3729       return false;
3730     }
3731     return true;
3732   }
3733 
3734   private boolean deleteNodeInStates(String encodedName,
3735       String desc, ServerName sn, EventType... types) {
3736     try {
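           // Try each candidate event type in order; the delete only succeeds when the
           // node is currently in that transition state.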
3737       for (EventType et: types) {
3738         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3739           return true;
3740         }
3741       }
3742       LOG.info("Failed to delete the " + desc + " node for "
3743         + encodedName + ". The node type may not match");
3744     } catch (NoNodeException e) {
3745       if (LOG.isDebugEnabled()) {
3746         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3747       }
3748     } catch (KeeperException ke) {
3749       server.abort("Unexpected ZK exception deleting " + desc
3750         + " node for the region " + encodedName, ke);
3751     }
3752     return false;
3753   }
3754 
3755   private void deleteMergingNode(String encodedName, ServerName sn) {
3756     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3757       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3758   }
3759 
3760   private void deleteSplittingNode(String encodedName, ServerName sn) {
3761     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3762       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3763   }
3764 
3765   private void onRegionFailedOpen(
3766       final HRegionInfo hri, final ServerName sn) {
3767     String encodedName = hri.getEncodedName();
3768     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3769     if (failedOpenCount == null) {
3770       failedOpenCount = new AtomicInteger();
3771       // No need to use putIfAbsent, or extra synchronization since
3772       // this whole handleRegion block is locked on the encoded region
3773       // name, and failedOpenTracker is updated only in this block
3774       failedOpenTracker.put(encodedName, failedOpenCount);
3775     }
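         // A non-meta region that has failed to open maximumAttempts times is parked in
         // FAILED_OPEN and not retried automatically; hbase:meta is never given up on.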
3776     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3777       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3778       // remove the tracking info to save memory, also reset
3779       // the count for next open initiative
3780       failedOpenTracker.remove(encodedName);
3781     } else {
3782       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3783         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
3784         // so that we are aware of potential problem if it persists for a long time.
3785         LOG.warn("Failed to open the hbase:meta region " +
3786             hri.getRegionNameAsString() + " after " +
3787             failedOpenCount.get() + " retries. Continue retrying.");
3788       }
3789 
3790       // Handle this the same as if it were opened and then closed.
3791       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3792       if (regionState != null) {
3793         // When there is more than one region server, a new RS is selected as the
3794         // destination and the region plan is updated accordingly. (HBASE-5546)
3795         if (getTableStateManager().isTableState(hri.getTable(),
3796             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3797             replicasToClose.contains(hri)) {
3798           offlineDisabledRegion(hri);
3799           return;
3800         }
3801         // ZK Node is in CLOSED state, assign it.
3802         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3803         // This below has to do w/ online enable/disable of a table
3804         removeClosedRegion(hri);
3805         getRegionPlan(hri, sn, true);
3806         invokeAssign(hri, false);
3807       }
3808     }
3809   }
3810 
3811   private void onRegionOpen(
3812       final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3813     regionOnline(hri, sn, openSeqNum);
3814     if (useZKForAssignment) {
3815       try {
3816         // Delete the ZNode if exists
3817         ZKAssign.deleteNodeFailSilent(watcher, hri);
3818       } catch (KeeperException ke) {
3819         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3820       }
3821     }
3822 
3823     // reset the count, if any
3824     failedOpenTracker.remove(hri.getEncodedName());
3825     if (getTableStateManager().isTableState(hri.getTable(),
3826         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3827       invokeUnAssign(hri);
3828     }
3829   }
3830 
3831   private void onRegionClosed(final HRegionInfo hri) {
3832     if (getTableStateManager().isTableState(hri.getTable(),
3833         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3834         replicasToClose.contains(hri)) {
3835       offlineDisabledRegion(hri);
3836       return;
3837     }
3838     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3839     sendRegionClosedNotification(hri);
3840     // This below has to do w/ online enable/disable of a table
3841     removeClosedRegion(hri);
3842     invokeAssign(hri, false);
3843   }
3844 
3845   private String onRegionSplit(ServerName sn, TransitionCode code,
3846       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3847     final RegionState rs_p = regionStates.getRegionState(p);
3848     RegionState rs_a = regionStates.getRegionState(a);
3849     RegionState rs_b = regionStates.getRegionState(b);
3850     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3851         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3852         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3853       return "Not in state good for split";
3854     }
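         // Record the in-memory state first, for every split transition code: the daughters
         // become SPLITTING_NEW on the reporting server and the parent becomes SPLITTING.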
3855     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3856     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3857     regionStates.updateRegionState(p, State.SPLITTING);
3858 
3859     if (code == TransitionCode.SPLIT) {
3860       if (TEST_SKIP_SPLIT_HANDLING) {
3861         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3862       }
3863       regionOffline(p, State.SPLIT);
3864       regionOnline(a, sn, 1);
3865       regionOnline(b, sn, 1);
3866 
3867       // User could disable the table before master knows the new region.
3868       if (getTableStateManager().isTableState(p.getTable(),
3869           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3870         invokeUnAssign(a);
3871         invokeUnAssign(b);
3872       } else {
3873         Callable<Object> splitReplicasCallable = new Callable<Object>() {
3874           @Override
3875           public Object call() {
3876             doSplittingOfReplicas(p, a, b);
3877             return null;
3878           }
3879         };
3880         threadPoolExecutorService.submit(splitReplicasCallable);
3881       }
3882     } else if (code == TransitionCode.SPLIT_PONR) {
3883       try {
3884         regionStates.splitRegion(p, a, b, sn);
3885       } catch (IOException ioe) {
3886         LOG.info("Failed to record split region " + p.getShortNameToLog());
3887         return "Failed to record the splitting in meta";
3888       }
3889     } else if (code == TransitionCode.SPLIT_REVERTED) {
3890       // Always bring the parent back online. Even if it's not offline,
3891       // there's no harm in making it online again.
3892       regionOnline(p, sn);
3893 
3894       // Only offline the daughter regions if they are known to exist.
3895       RegionState regionStateA = regionStates.getRegionState(a);
3896       RegionState regionStateB = regionStates.getRegionState(b);
3897       if (regionStateA != null) {
3898         regionOffline(a);
3899       }
3900       if (regionStateB != null) {
3901         regionOffline(b);
3902       }
3903 
3904       if (getTableStateManager().isTableState(p.getTable(),
3905           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3906         invokeUnAssign(p);
3907       }
3908     }
3909     return null;
3910   }
3911 
3912   private String onRegionMerge(ServerName sn, TransitionCode code,
3913       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3914     RegionState rs_p = regionStates.getRegionState(p);
3915     RegionState rs_a = regionStates.getRegionState(a);
3916     RegionState rs_b = regionStates.getRegionState(b);
3917     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3918         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3919       return "Not in state good for merge";
3920     }
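         // Record the in-memory state first, for every merge transition code: the regions
         // being merged become MERGING and the new merged region becomes MERGING_NEW.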
3921     regionStates.updateRegionState(a, State.MERGING);
3922     regionStates.updateRegionState(b, State.MERGING);
3923     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3924 
3925     String encodedName = p.getEncodedName();
3926     if (code == TransitionCode.READY_TO_MERGE) {
3927       mergingRegions.put(encodedName,
3928         new PairOfSameType<HRegionInfo>(a, b));
3929     } else if (code == TransitionCode.MERGED) {
3930       mergingRegions.remove(encodedName);
3931       regionOffline(a, State.MERGED);
3932       regionOffline(b, State.MERGED);
3933       regionOnline(p, sn, 1);
3934 
3935       // User could disable the table before master knows the new region.
3936       if (getTableStateManager().isTableState(p.getTable(),
3937           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3938         invokeUnAssign(p);
3939       } else {
3940         Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3941           @Override
3942           public Object call() {
3943             doMergingOfReplicas(p, a, b);
3944             return null;
3945           }
3946         };
3947         threadPoolExecutorService.submit(mergeReplicasCallable);
3948       }
3949     } else if (code == TransitionCode.MERGE_PONR) {
3950       try {
3951         regionStates.mergeRegions(p, a, b, sn);
3952       } catch (IOException ioe) {
3953         LOG.info("Failed to record merged region " + p.getShortNameToLog());
3954         return "Failed to record the merging in meta";
3955       }
3956     }
3957     return null;
3958   }
3959 
3960   private String onRegionMergeReverted(ServerName sn, TransitionCode code,
3961       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3962     RegionState rs_p = regionStates.getRegionState(p);
3963     String encodedName = p.getEncodedName();
3964     mergingRegions.remove(encodedName);
3965 
3966     // Always bring the children back online. Even if they are not offline
3967     // there's no harm in making them online again.
3968     regionOnline(a, sn);
3969     regionOnline(b, sn);
3970 
3971     // Only offline the merging region if it is known to exist.
3972     if (rs_p != null) {
3973       regionOffline(p);
3974     }
3975 
3976     if (getTableStateManager().isTableState(p.getTable(),
3977         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3978       invokeUnAssign(a);
3979       invokeUnAssign(b);
3980     }
3981 
3982     return null;
3983   }
3984 
3985   /**
3986    * A helper to handle region merging transition event.
3987    * It transitions merging regions to MERGING state.
3988    */
3989   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3990       final String prettyPrintedRegionName, final ServerName sn) {
3991     if (!serverManager.isServerOnline(sn)) {
3992       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3993       return false;
3994     }
3995     byte [] payloadOfMerging = rt.getPayload();
3996     List<HRegionInfo> mergingRegions;
3997     try {
3998       mergingRegions = HRegionInfo.parseDelimitedFrom(
3999         payloadOfMerging, 0, payloadOfMerging.length);
4000     } catch (IOException e) {
4001       LOG.error("Dropped merging! Failed reading "  + rt.getEventType()
4002         + " payload for " + prettyPrintedRegionName);
4003       return false;
4004     }
4005     assert mergingRegions.size() == 3;
4006     HRegionInfo p = mergingRegions.get(0);
4007     HRegionInfo hri_a = mergingRegions.get(1);
4008     HRegionInfo hri_b = mergingRegions.get(2);
4009 
4010     RegionState rs_p = regionStates.getRegionState(p);
4011     RegionState rs_a = regionStates.getRegionState(hri_a);
4012     RegionState rs_b = regionStates.getRegionState(hri_b);
4013 
4014     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
4015         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
4016         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
4017       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
4018         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4019       return false;
4020     }
4021 
4022     EventType et = rt.getEventType();
4023     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
4024       try {
4025         RegionMergeCoordination.RegionMergeDetails std =
4026             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4027                 .getRegionMergeCoordination().getDefaultDetails();
4028         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4029             .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
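             // A znode version of -1 means the pending_merge node was not transitioned here;
             // re-read it to see whether the region server already moved it to MERGING/MERGED.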
4030         if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
4031           byte[] data = ZKAssign.getData(watcher, encodedName);
4032           EventType currentType = null;
4033           if (data != null) {
4034             RegionTransition newRt = RegionTransition.parseFrom(data);
4035             currentType = newRt.getEventType();
4036           }
4037           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
4038               && currentType != EventType.RS_ZK_REGION_MERGING)) {
4039             LOG.warn("Failed to transition pending_merge node "
4040               + encodedName + " to merging, it's now " + currentType);
4041             return false;
4042           }
4043         }
4044       } catch (Exception e) {
4045         LOG.warn("Failed to transition pending_merge node "
4046           + encodedName + " to merging", e);
4047         return false;
4048       }
4049     }
4050 
4051     synchronized (regionStates) {
4052       regionStates.updateRegionState(hri_a, State.MERGING);
4053       regionStates.updateRegionState(hri_b, State.MERGING);
4054       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
4055 
4056       if (et != EventType.RS_ZK_REGION_MERGED) {
4057         this.mergingRegions.put(encodedName,
4058           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4059       } else {
4060         this.mergingRegions.remove(encodedName);
4061         regionOffline(hri_a, State.MERGED);
4062         regionOffline(hri_b, State.MERGED);
4063         regionOnline(p, sn);
4064       }
4065     }
4066 
4067     if (et == EventType.RS_ZK_REGION_MERGED) {
4068       doMergingOfReplicas(p, hri_a, hri_b);
4069       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
4070       // Remove region from ZK
4071       try {
4072         boolean successful = false;
4073         while (!successful) {
4074           // It's possible that the RS touches the znode between our read and
4075           // the delete, so it's safe to retry.
4076           successful = ZKAssign.deleteNode(watcher, encodedName,
4077             EventType.RS_ZK_REGION_MERGED, sn);
4078         }
4079       } catch (KeeperException e) {
4080         if (e instanceof NoNodeException) {
4081           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4082           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
4083         } else {
4084           server.abort("Error deleting MERGED node " + encodedName, e);
4085         }
4086       }
4087       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
4088         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
4089         + hri_b.getRegionNameAsString() + ", on " + sn);
4090 
4091       // User could disable the table before master knows the new region.
4092       if (tableStateManager.isTableState(p.getTable(),
4093           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4094         unassign(p);
4095       }
4096     }
4097     return true;
4098   }
4099 
4100   /**
4101    * A helper to handle region splitting transition event.
4102    */
4103   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
4104       final String prettyPrintedRegionName, final ServerName sn) {
4105     if (!serverManager.isServerOnline(sn)) {
4106       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
4107       return false;
4108     }
4109     byte [] payloadOfSplitting = rt.getPayload();
4110     List<HRegionInfo> splittingRegions;
4111     try {
4112       splittingRegions = HRegionInfo.parseDelimitedFrom(
4113         payloadOfSplitting, 0, payloadOfSplitting.length);
4114     } catch (IOException e) {
4115       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
4116         + " payload for " + prettyPrintedRegionName);
4117       return false;
4118     }
4119     assert splittingRegions.size() == 2;
4120     HRegionInfo hri_a = splittingRegions.get(0);
4121     HRegionInfo hri_b = splittingRegions.get(1);
4122 
4123     RegionState rs_p = regionStates.getRegionState(encodedName);
4124     RegionState rs_a = regionStates.getRegionState(hri_a);
4125     RegionState rs_b = regionStates.getRegionState(hri_b);
4126 
4127     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
4128         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
4129         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
4130       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
4131         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4132       return false;
4133     }
4134 
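         // The master may have no in-memory state for the splitting parent yet; mark it
         // open on the reporting server before handling the split transition.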
4135     if (rs_p == null) {
4136       // Splitting region should be online
4137       rs_p = regionStates.updateRegionState(rt, State.OPEN);
4138       if (rs_p == null) {
4139         LOG.warn("Received splitting for region " + prettyPrintedRegionName
4140           + " from server " + sn + " but it doesn't exist anymore,"
4141           + " probably already processed its split");
4142         return false;
4143       }
4144       regionStates.regionOnline(rs_p.getRegion(), sn);
4145     }
4146 
4147     HRegionInfo p = rs_p.getRegion();
4148     EventType et = rt.getEventType();
4149     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
4150       try {
4151         SplitTransactionDetails std =
4152             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4153                 .getSplitTransactionCoordination().getDefaultDetails();
4154         if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4155             .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
4156           byte[] data = ZKAssign.getData(watcher, encodedName);
4157           EventType currentType = null;
4158           if (data != null) {
4159             RegionTransition newRt = RegionTransition.parseFrom(data);
4160             currentType = newRt.getEventType();
4161           }
4162           if (currentType == null
4163               || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4164             LOG.warn("Failed to transition pending_split node " + encodedName
4165                 + " to splitting, it's now " + currentType);
4166             return false;
4167           }
4168         }
4169       } catch (Exception e) {
4170         LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
4171         return false;
4172       }
4173     }
4174 
4175     synchronized (regionStates) {
4176       splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4177       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4178       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4179       regionStates.updateRegionState(rt, State.SPLITTING);
4180 
4181       // The below is for testing ONLY!  We can't do fault injection easily, so
4182       // resort to this kinda ugliness -- St.Ack 02/25/2011.
4183       if (TEST_SKIP_SPLIT_HANDLING) {
4184         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4185         return true; // return true so that the splitting node stays
4186       }
4187 
4188       if (et == EventType.RS_ZK_REGION_SPLIT) {
4189         regionOffline(p, State.SPLIT);
4190         regionOnline(hri_a, sn);
4191         regionOnline(hri_b, sn);
4192         splitRegions.remove(p);
4193       }
4194     }
4195 
4196     if (et == EventType.RS_ZK_REGION_SPLIT) {
4197       // split replicas
4198       doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
4199       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4200       // Remove region from ZK
4201       try {
4202         boolean successful = false;
4203         while (!successful) {
4204           // It's possible that the RS touches the znode between our read and
4205           // the delete, so it's safe to retry.
4206           successful = ZKAssign.deleteNode(watcher, encodedName,
4207             EventType.RS_ZK_REGION_SPLIT, sn);
4208         }
4209       } catch (KeeperException e) {
4210         if (e instanceof NoNodeException) {
4211           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4212           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
4213         } else {
4214           server.abort("Error deleting SPLIT node " + encodedName, e);
4215         }
4216       }
4217       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4218         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4219         + hri_b.getRegionNameAsString() + ", on " + sn);
4220 
4221       // User could disable the table before master knows the new region.
4222       if (tableStateManager.isTableState(p.getTable(),
4223           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4224         unassign(hri_a);
4225         unassign(hri_b);
4226       }
4227     }
4228     return true;
4229   }
4230 
4231   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4232       final HRegionInfo hri_b) {
4233     // Close replicas for the original unmerged regions. create/assign new replicas
4234     // for the merged parent.
4235     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4236     unmergedRegions.add(hri_a);
4237     unmergedRegions.add(hri_b);
4238     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4239     Collection<List<HRegionInfo>> c = map.values();
4240     for (List<HRegionInfo> l : c) {
4241       for (HRegionInfo h : l) {
4242         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4243           LOG.debug("Unassigning un-merged replica " + h);
4244           unassign(h);
4245         }
4246       }
4247     }
4248     int numReplicas = 1;
4249     try {
4250       numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
4251           getRegionReplication();
4252     } catch (IOException e) {
4253       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4254           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4255           "will not be done");
4256     }
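         // Replica id 0 is the default (primary) merged region itself; only the additional
         // replicas (ids 1..numReplicas-1) need to be created and assigned here.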
4257     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4258     for (int i = 1; i < numReplicas; i++) {
4259       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4260     }
4261     try {
4262       assign(regions);
4263     } catch (IOException ioe) {
4264       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4265                 ioe.getMessage());
4266     } catch (InterruptedException ie) {
4267       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
4268                 ie.getMessage());
4269     }
4270   }
4271 
4272   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4273       final HRegionInfo hri_b) {
4274     // create new regions for the replica, and assign them to match with the
4275     // current replica assignments. If replica1 of parent is assigned to RS1,
4276     // the replica1s of daughters will be on the same machine
4277     int numReplicas = 1;
4278     try {
4279       numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
4280           getRegionReplication();
4281     } catch (IOException e) {
4282       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4283           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4284           "replicas will not be done");
4285     }
4286     // unassign the old replicas
4287     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4288     parentRegion.add(parentHri);
4289     Map<ServerName, List<HRegionInfo>> currentAssign =
4290         regionStates.getRegionAssignments(parentRegion);
4291     Collection<List<HRegionInfo>> c = currentAssign.values();
4292     for (List<HRegionInfo> l : c) {
4293       for (HRegionInfo h : l) {
4294         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4295           LOG.debug("Unassigning parent's replica " + h);
4296           unassign(h);
4297         }
4298       }
4299     }
4300     // assign daughter replicas
4301     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4302     for (int i = 1; i < numReplicas; i++) {
4303       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4304       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4305     }
4306     try {
4307       assign(map);
4308     } catch (IOException e) {
4309       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4310     } catch (InterruptedException e) {
4311       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4312     }
4313   }
4314 
4315   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4316       int replicaId, Map<HRegionInfo, ServerName> map) {
4317     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4318     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4319         replicaId);
4320     LOG.debug("Created replica region for daughter " + daughterReplica);
4321     ServerName sn;
4322     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4323       map.put(daughterReplica, sn);
4324     } else {
4325       List<ServerName> servers = serverManager.getOnlineServersList();
4326       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4327       map.put(daughterReplica, sn);
4328     }
4329   }
4330 
4331   public Set<HRegionInfo> getReplicasToClose() {
4332     return replicasToClose;
4333   }
4334 
4335   /**
4336    * A region is offline.  The new state should be the specified one,
4337    * if not null.  If the specified state is null, the new state is Offline.
4338    * The specified state can be Split/Merged/Offline/null only.
4339    */
4340   private void regionOffline(final HRegionInfo regionInfo, final State state) {
4341     regionStates.regionOffline(regionInfo, state);
4342     removeClosedRegion(regionInfo);
4343     // remove the region plan as well just in case.
4344     clearRegionPlan(regionInfo);
4345     balancer.regionOffline(regionInfo);
4346 
4347     // Tell our listeners that a region was closed
4348     sendRegionClosedNotification(regionInfo);
4349     // also note that all the replicas of the primary should be closed
4350     if (state != null && state.equals(State.SPLIT)) {
4351       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4352       c.add(regionInfo);
4353       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4354       Collection<List<HRegionInfo>> allReplicas = map.values();
4355       for (List<HRegionInfo> list : allReplicas) {
4356         replicasToClose.addAll(list);
4357       }
4358     }
4359     else if (state != null && state.equals(State.MERGED)) {
4360       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4361       c.add(regionInfo);
4362       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4363       Collection<List<HRegionInfo>> allReplicas = map.values();
4364       for (List<HRegionInfo> list : allReplicas) {
4365         replicasToClose.addAll(list);
4366       }
4367     }
4368   }
4369 
4370   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4371       final ServerName serverName) {
4372     if (!this.listeners.isEmpty()) {
4373       for (AssignmentListener listener : this.listeners) {
4374         listener.regionOpened(regionInfo, serverName);
4375       }
4376     }
4377   }
4378 
4379   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4380     if (!this.listeners.isEmpty()) {
4381       for (AssignmentListener listener : this.listeners) {
4382         listener.regionClosed(regionInfo);
4383       }
4384     }
4385   }
4386 
4387   /**
4388    * Try to update some region states. If the state machine prevents
4389    * such update, an error message is returned to explain the reason.
4390    *
4391    * It's expected that each transition carries just one
4392    * region for opening/closing, and 3 regions for splitting/merging.
4393    * These regions should be on the server that requested the change.
4394    *
4395    * Region state machine. Only these transitions
4396    * are expected to be triggered by a region server.
4397    *
4398    * On the state transition:
4399    *  (1) Open/Close should be initiated by master
4400    *      (a) Master sets the region to pending_open/pending_close
4401    *        in memory and hbase:meta after sending the request
4402    *        to the region server
4403    *      (b) Region server reports back to the master
4404    *        after open/close is done (either success/failure)
4405    *      (c) If the region server cannot report the status
4406    *        to the master, it must be because the master is down or there is
4407    *        a temporary network issue. Otherwise, the region server should
4408    *        abort since it must be a bug. If the master is not accessible,
4409    *        the region server should keep trying until the server is
4410    *        stopped or the status has been reported to the (new) master
4411    *      (d) If region server dies in the middle of opening/closing
4412    *        a region, SSH picks it up and finishes it
4413    *      (e) If master dies in the middle, the new master recovers
4414    *        the state during initialization from hbase:meta. Region server
4415    *        can report any transition that has not been reported to
4416    *        the previous active master yet
4417    *  (2) Split/merge is initiated by region servers
4418    *      (a) To split a region, a region server sends a request
4419    *        to master to try to set a region to splitting, together with
4420    *        two daughters (to be created) to splitting new. If approved
4421    *        by the master, the splitting can then move ahead
4422    *      (b) To merge two regions, a region server sends a request to
4423    *        master to try to set the new merged region (to be created) to
4424    *        merging_new, together with two regions (to be merged) to merging.
4425    *        If it is ok with the master, the merge can then move ahead
4426    *      (c) Once the splitting/merging is done, the region server
4427    *        reports the status back to the master either success/failure.
4428    *      (d) Other scenarios should be handled similarly as for
4429    *        region open/close
4430    */
4431   protected String onRegionTransition(final ServerName serverName,
4432       final RegionStateTransition transition) {
4433     TransitionCode code = transition.getTransitionCode();
4434     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4435     RegionState current = regionStates.getRegionState(hri);
4436     if (LOG.isDebugEnabled()) {
4437       LOG.debug("Got transition " + code + " for "
4438         + (current != null ? current.toString() : hri.getShortNameToLog())
4439         + " from " + serverName);
4440     }
4441     String errorMsg = null;
4442     switch (code) {
4443     case OPENED:
4444       if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4445         LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4446             + serverName);
4447         break;
4448       }
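           // Intentional fall-through into FAILED_OPEN: both codes share the pending-open
           // check below and are told apart by the transition code.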
4449     case FAILED_OPEN:
4450       if (current == null
4451           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4452         errorMsg = hri.getShortNameToLog()
4453           + " is not pending open on " + serverName;
4454       } else if (code == TransitionCode.FAILED_OPEN) {
4455         onRegionFailedOpen(hri, serverName);
4456       } else {
4457         long openSeqNum = HConstants.NO_SEQNUM;
4458         if (transition.hasOpenSeqNum()) {
4459           openSeqNum = transition.getOpenSeqNum();
4460         }
4461         if (openSeqNum < 0) {
4462           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4463         } else {
4464           onRegionOpen(hri, serverName, openSeqNum);
4465         }
4466       }
4467       break;
4468 
4469     case CLOSED:
4470       if (current == null
4471           || !current.isPendingCloseOrClosingOnServer(serverName)) {
4472         errorMsg = hri.getShortNameToLog()
4473           + " is not pending close on " + serverName;
4474       } else {
4475         onRegionClosed(hri);
4476       }
4477       break;
4478 
4479     case READY_TO_SPLIT:
4480       try {
4481         regionStateListener.onRegionSplit(hri);
4482         if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
4483                 Admin.MasterSwitchType.SPLIT)) {
4484           errorMsg = "split switch is off!";
4485         }
4486       } catch (IOException exp) {
4487         errorMsg = StringUtils.stringifyException(exp);
4488       }
4489       break;
4490     case SPLIT_PONR:
4491     case SPLIT:
4492     case SPLIT_REVERTED:
4493       errorMsg =
4494           onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4495             HRegionInfo.convert(transition.getRegionInfo(2)));
4496       if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4497         try {
4498           regionStateListener.onRegionSplitReverted(hri);
4499         } catch (IOException exp) {
4500           LOG.warn(StringUtils.stringifyException(exp));
4501         }
4502       }
4503       break;
4504     case READY_TO_MERGE:
4505       if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
4506               Admin.MasterSwitchType.MERGE)) {
4507         errorMsg = "merge switch is off!";
4508       }
4509       break;
4510     case MERGE_PONR:
4511     case MERGED:
4512       errorMsg = onRegionMerge(serverName, code, hri,
4513         HRegionInfo.convert(transition.getRegionInfo(1)),
4514         HRegionInfo.convert(transition.getRegionInfo(2)));
4515       if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4516         try {
4517           regionStateListener.onRegionMerged(hri);
4518         } catch (IOException exp) {
4519           errorMsg = StringUtils.stringifyException(exp);
4520         }
4521       }
4522       break;
4523     case MERGE_REVERTED:
4524         errorMsg = onRegionMergeReverted(serverName, code, hri,
4525                 HRegionInfo.convert(transition.getRegionInfo(1)),
4526                 HRegionInfo.convert(transition.getRegionInfo(2)));
4527       break;
4528 
4529     default:
4530       errorMsg = "Unexpected transition code " + code;
4531     }
4532     if (errorMsg != null) {
4533       LOG.error("Failed to transtion region from " + current + " to "
4534         + code + " by " + serverName + ": " + errorMsg);
4535     }
4536     return errorMsg;
4537   }
4538 
4539   private void processBogusAssignments(Map<ServerName, List<HRegionInfo>> bulkPlan) {
4540     if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
4541       // Found no plan for some regions, put those regions in RIT
4542       for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
4543         regionStates.updateRegionState(hri, State.FAILED_OPEN);
4544       }
4545       bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
4546     }
4547   }
4548 
4549   /**
4550    * @return Instance of load balancer
4551    */
4552   public LoadBalancer getBalancer() {
4553     return this.balancer;
4554   }
4555 
4556   public Map<ServerName, List<HRegionInfo>>
4557     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4558     return getRegionStates().getRegionAssignments(infos);
4559   }
4560 
4561   void setRegionStateListener(RegionStateListener listener) {
4562     this.regionStateListener = listener;
4563   }
4564 }