1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.cache.Cache;
23  import com.google.common.cache.CacheBuilder;
24  import com.google.common.collect.Lists;
25  import com.google.common.collect.Maps;
26  import com.google.protobuf.Descriptors;
27  import com.google.protobuf.Service;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.lang.reflect.Constructor;
32  import java.lang.reflect.InvocationTargetException;
33  import java.net.InetAddress;
34  import java.net.InetSocketAddress;
35  import java.net.UnknownHostException;
36  import java.util.ArrayList;
37  import java.util.Collection;
38  import java.util.Collections;
39  import java.util.Comparator;
40  import java.util.HashSet;
41  import java.util.Iterator;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.Map.Entry;
45  import java.util.Set;
46  import java.util.concurrent.CountDownLatch;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.atomic.AtomicInteger;
49  import java.util.concurrent.atomic.AtomicReference;
50  import java.util.regex.Pattern;
51  
52  import javax.servlet.ServletException;
53  import javax.servlet.http.HttpServlet;
54  import javax.servlet.http.HttpServletRequest;
55  import javax.servlet.http.HttpServletResponse;
56  
57  import org.apache.commons.logging.Log;
58  import org.apache.commons.logging.LogFactory;
59  import org.apache.hadoop.conf.Configuration;
60  import org.apache.hadoop.fs.FileSystem;
61  import org.apache.hadoop.fs.Path;
62  import org.apache.hadoop.hbase.ClusterStatus;
63  import org.apache.hadoop.hbase.CoordinatedStateException;
64  import org.apache.hadoop.hbase.CoordinatedStateManager;
65  import org.apache.hadoop.hbase.DoNotRetryIOException;
66  import org.apache.hadoop.hbase.HBaseIOException;
67  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
68  import org.apache.hadoop.hbase.HColumnDescriptor;
69  import org.apache.hadoop.hbase.HConstants;
70  import org.apache.hadoop.hbase.HRegionInfo;
71  import org.apache.hadoop.hbase.HTableDescriptor;
72  import org.apache.hadoop.hbase.MasterNotRunningException;
73  import org.apache.hadoop.hbase.MetaMigrationConvertingToPB;
74  import org.apache.hadoop.hbase.MetaTableAccessor;
75  import org.apache.hadoop.hbase.NamespaceDescriptor;
76  import org.apache.hadoop.hbase.NamespaceNotFoundException;
77  import org.apache.hadoop.hbase.PleaseHoldException;
78  import org.apache.hadoop.hbase.ProcedureInfo;
79  import org.apache.hadoop.hbase.ScheduledChore;
80  import org.apache.hadoop.hbase.Server;
81  import org.apache.hadoop.hbase.ServerLoad;
82  import org.apache.hadoop.hbase.ServerName;
83  import org.apache.hadoop.hbase.TableDescriptors;
84  import org.apache.hadoop.hbase.TableName;
85  import org.apache.hadoop.hbase.TableNotDisabledException;
86  import org.apache.hadoop.hbase.TableNotFoundException;
87  import org.apache.hadoop.hbase.TableStateManager;
88  import org.apache.hadoop.hbase.UnknownRegionException;
89  import org.apache.hadoop.hbase.backup.BackupType;
90  import org.apache.hadoop.hbase.backup.HBackupFileSystem;
91  import org.apache.hadoop.hbase.backup.impl.BackupManager;
92  import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants;
93  import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
94  import org.apache.hadoop.hbase.backup.master.FullTableBackupProcedure;
95  import org.apache.hadoop.hbase.backup.master.IncrementalTableBackupProcedure;
96  import org.apache.hadoop.hbase.classification.InterfaceAudience;
97  import org.apache.hadoop.hbase.client.Admin;
98  import org.apache.hadoop.hbase.client.MetaScanner;
99  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
100 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
101 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
102 import org.apache.hadoop.hbase.client.Result;
103 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
104 import org.apache.hadoop.hbase.exceptions.DeserializationException;
105 import org.apache.hadoop.hbase.executor.ExecutorType;
106 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
107 import org.apache.hadoop.hbase.ipc.RpcServer;
108 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
109 import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
110 import org.apache.hadoop.hbase.master.RegionState.State;
111 import org.apache.hadoop.hbase.master.balancer.BalancerChore;
112 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
113 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
114 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
115 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
116 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
117 import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
118 import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
119 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
120 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
121 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
122 import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
123 import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
124 import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
125 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
126 import org.apache.hadoop.hbase.master.handler.TruncateTableHandler;
127 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
128 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
129 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
130 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
131 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
132 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
133 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
134 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
135 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
136 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
137 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
138 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
139 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
140 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
141 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
142 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
143 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
144 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
145 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
146 import org.apache.hadoop.hbase.mob.MobConstants;
147 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
148 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
149 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
150 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
151 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
152 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
153 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
154 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
156 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
157 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
158 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
159 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
160 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
161 import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver;
162 import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
163 import org.apache.hadoop.hbase.quotas.QuotaUtil;
164 import org.apache.hadoop.hbase.quotas.RegionStateListener;
165 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
166 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
167 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
168 import org.apache.hadoop.hbase.regionserver.HRegionServer;
169 import org.apache.hadoop.hbase.regionserver.HStore;
170 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
171 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
172 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
173 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
174 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
175 import org.apache.hadoop.hbase.replication.regionserver.Replication;
176 import org.apache.hadoop.hbase.security.AccessDeniedException;
177 import org.apache.hadoop.hbase.security.UserProvider;
178 import org.apache.hadoop.hbase.util.Addressing;
179 import org.apache.hadoop.hbase.util.Bytes;
180 import org.apache.hadoop.hbase.util.CompressionTest;
181 import org.apache.hadoop.hbase.util.ConfigUtil;
182 import org.apache.hadoop.hbase.util.EncryptionTest;
183 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
184 import org.apache.hadoop.hbase.util.FSUtils;
185 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
186 import org.apache.hadoop.hbase.util.HasThread;
187 import org.apache.hadoop.hbase.util.IdLock;
188 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
189 import org.apache.hadoop.hbase.util.Pair;
190 import org.apache.hadoop.hbase.util.Threads;
191 import org.apache.hadoop.hbase.util.VersionInfo;
192 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
193 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
194 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
195 import org.apache.hadoop.hbase.zookeeper.MasterMaintenanceModeTracker;
196 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
197 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
198 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
199 import org.apache.hadoop.hbase.zookeeper.SplitOrMergeTracker;
200 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
201 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
202 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
203 import org.apache.zookeeper.KeeperException;
204 import org.mortbay.jetty.Connector;
205 import org.mortbay.jetty.nio.SelectChannelConnector;
206 import org.mortbay.jetty.servlet.Context;
207 
208 /**
209  * HMaster is the "master server" for HBase. An HBase cluster has one active
210  * master.  If many masters are started, all compete.  Whichever wins goes on to
211  * run the cluster.  All others park themselves in their constructor until
212  * master or cluster shutdown or until the active master loses its lease in
213  * zookeeper.  Thereafter, all running masters jostle to take over the master role.
214  *
215  * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
216  * this case it will tell all regionservers to go down and then wait on them
217  * all reporting in that they are down.  This master will then shut itself down.
218  *
219  * <p>You can also shut down just this master.  Call {@link #stopMaster()}.
220  *
221  * @see org.apache.zookeeper.Watcher
222  */
223 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
224 @SuppressWarnings("deprecation")
225 public class HMaster extends HRegionServer implements MasterServices, Server {
226   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
227 
228   /**
229    * Protection against a zombie master. Started once the Master accepts the active role and
230    * begins taking over responsibilities. Allows a finite time window before giving up ownership.
231    */
232   private static class InitializationMonitor extends HasThread {
233     /** The amount of time in milliseconds to sleep before checking initialization status. */
234     public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
235     public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
236 
237     /**
238      * When the timeout expires and initialization has not completed, calls
239      * {@link System#exit(int)} if true; does nothing otherwise.
240      */
241     public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
242     public static final boolean HALT_DEFAULT = false;
243 
244     private final HMaster master;
245     private final long timeout;
246     private final boolean haltOnTimeout;
247 
248     /** Creates a Thread that monitors the {@link #isInitialized()} state. */
249     InitializationMonitor(HMaster master) {
250       super("MasterInitializationMonitor");
251       this.master = master;
252       this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
253       this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
254       this.setDaemon(true);
255     }
256 
257     @Override
258     public void run() {
259       try {
260         while (!master.isStopped() && master.isActiveMaster()) {
261           Thread.sleep(timeout);
262           if (master.isInitialized()) {
263             LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
264           } else {
265             LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
266                 + " consider submitting a bug report including a thread dump of this process.");
267             if (haltOnTimeout) {
268               LOG.error("Zombie Master exiting. Thread dump to stdout");
269               Threads.printThreadInfo(System.out, "Zombie HMaster");
270               System.exit(-1);
271             }
272           }
273         }
274       } catch (InterruptedException ie) {
275         LOG.trace("InitMonitor thread interrupted. Exiting.");
276       }
277     }
278   }
279 
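  // A minimal sketch of tuning this monitor, using only the configuration keys
  // defined above (the values here are illustrative, not recommendations):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   // Allow 30 minutes for initialization instead of the 15-minute default.
  //   conf.setLong("hbase.master.initializationmonitor.timeout",
  //       TimeUnit.MINUTES.toMillis(30));
  //   // Halt the process (System.exit) if initialization still has not completed.
  //   conf.setBoolean("hbase.master.initializationmonitor.haltontimeout", true);
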
280   // MASTER is the name of the webapp and the attribute name used for stuffing this
281   // instance into the web context.
282   public static final String MASTER = "master";
283 
284   // Manager and zk listener for master election
285   private final ActiveMasterManager activeMasterManager;
286   // Region server tracker
287   RegionServerTracker regionServerTracker;
288   // Draining region server tracker
289   private DrainingServerTracker drainingServerTracker;
290   // Tracker for load balancer state
291   LoadBalancerTracker loadBalancerTracker;
292 
293   // Tracker for split and merge state
294   private SplitOrMergeTracker splitOrMergeTracker;
295 
296   // Tracker for region normalizer state
297   private RegionNormalizerTracker regionNormalizerTracker;
298 
299   /** Namespace stuff */
300   private TableNamespaceManager tableNamespaceManager;
301 
302   // Tracker for the master maintenance mode setting
303   private MasterMaintenanceModeTracker maintenanceModeTracker;
304 
305   // Metrics for the HMaster
306   final MetricsMaster metricsMaster;
307   // file system manager for the master FS operations
308   private MasterFileSystem fileSystemManager;
309 
310   // server manager to deal with region server info
311   volatile ServerManager serverManager;
312 
313   // manager of assignment nodes in zookeeper
314   AssignmentManager assignmentManager;
315 
316   // buffer for "fatal error" notices from region servers
317   // in the cluster. This is only used for assisting
318   // operations/debugging.
319   MemoryBoundedLogMessageBuffer rsFatals;
320 
321   // flag set after we become the active master (used for testing)
322   private volatile boolean isActiveMaster = false;
323 
324   // flag set after we complete initialization once active,
325   // it is not private since it's used in unit tests
326   volatile boolean initialized = false;
327 
328   // flag set after master services are started,
329   // initialization may not have completed yet.
330   volatile boolean serviceStarted = false;
331 
332   // flag set after we complete assignMeta.
333   private volatile boolean serverShutdownHandlerEnabled = false;
334 
335   LoadBalancer balancer;
336   private RegionNormalizer normalizer;
337   private BalancerChore balancerChore;
338   private RegionNormalizerChore normalizerChore;
339   private ClusterStatusChore clusterStatusChore;
340   private ClusterStatusPublisher clusterStatusPublisherChore = null;
341   private PeriodicDoMetrics periodicDoMetricsChore = null;
342 
343   CatalogJanitor catalogJanitorChore;
344   private LogCleaner logCleaner;
345   private HFileCleaner hfileCleaner;
346   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
347   private MobCompactionChore mobCompactChore;
348   private MasterMobCompactionThread mobCompactThread;
349   // used to synchronize the mobCompactionStates
350   private final IdLock mobCompactionLock = new IdLock();
351   // saves information about mob compactions per table.
352   // the key is the table name; the value is the number of compactions in that table.
353   private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();
354 
355   MasterCoprocessorHost cpHost;
356 
357   private final boolean preLoadTableDescriptors;
358 
359   // Time stamp for when this hmaster became active
360   private long masterActiveTime;
361 
362   // should we check the compression codec type at master side, default true, HBASE-6370
363   private final boolean masterCheckCompression;
364 
365   // should we check encryption settings at master side, default true
366   private final boolean masterCheckEncryption;
367 
368   // This allows falling back to the code from the 1.0 release.
369   private enum ProcedureConf {
370     PROCEDURE_ENABLED, // default
371     HANDLER_USED, // handler code executed for DDL, procedure executor still starts
372     PROCEDURE_FULLY_DISABLED, // procedure fully disabled
373   }
374   private final ProcedureConf procedureConf;
375 
376   Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
377 
378   // monitor for snapshot of hbase tables
379   SnapshotManager snapshotManager;
380   // monitor for distributed procedures
381   private MasterProcedureManagerHost mpmHost;
382 
383   // it is assigned after the 'initialized' guard is set to true, so it should be volatile
384   private volatile MasterQuotaManager quotaManager;
385   private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
386   private QuotaObserverChore quotaObserverChore;
387 
388   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
389   private WALProcedureStore procedureStore;
390 
391   /** flag used in test cases in order to simulate RS failures during master initialization */
392   private volatile boolean initializationBeforeMetaAssignment = false;
393 
394   /** jetty server for master to redirect requests to regionserver infoServer */
395   private org.mortbay.jetty.Server masterJettyServer;
396 
397   public static class RedirectServlet extends HttpServlet {
398     private static final long serialVersionUID = 2894774810058302472L;
399     private static int regionServerInfoPort;
400 
401     @Override
402     public void doGet(HttpServletRequest request,
403         HttpServletResponse response) throws ServletException, IOException {
404       String redirectUrl = request.getScheme() + "://"
405         + request.getServerName() + ":" + regionServerInfoPort
406         + request.getRequestURI();
407       response.sendRedirect(redirectUrl);
408     }
409   }
410 
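  // For example (illustrative request/response; the ports shown are just examples):
  // a browser hitting
  //   GET http://master-host:16010/master-status
  // would receive a redirect to
  //   http://master-host:16030/master-status
  // since the servlet rebuilds the URL from the scheme, server name, the tracked
  // region server info port, and the original request URI.
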
411   private static class PeriodicDoMetrics extends ScheduledChore {
412     private final HMaster server;
413     public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) {
414       super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval);
415       this.server = server;
416     }
417 
418     @Override
419     protected void chore() {
420       server.doMetrics();
421     }
422   }
423 
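  // PeriodicDoMetrics is a plain ScheduledChore; a hedged sketch of how such a
  // chore gets wired up (this mirrors what finishActiveMasterInitialization does
  // below; the interval value is illustrative):
  //
  //   PeriodicDoMetrics chore = new PeriodicDoMetrics(3 * 1000, master);
  //   master.getChoreService().scheduleChore(chore);
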
424   /**
425    * Initializes the HMaster. The steps are as follows:
426    * <p>
427    * <ol>
428    * <li>Initialize the local HRegionServer
429    * <li>Start the ActiveMasterManager.
430    * </ol>
431    * <p>
432    * Remaining steps of initialization occur in
433    * #finishActiveMasterInitialization(MonitoredTask) after
434    * the master becomes the active one.
435    *
436    * @throws InterruptedException
437    * @throws KeeperException
438    * @throws IOException
439    */
440   public HMaster(final Configuration conf, CoordinatedStateManager csm)
441       throws IOException, KeeperException, InterruptedException {
442     super(conf, csm);
443     this.rsFatals = new MemoryBoundedLogMessageBuffer(
444       conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
445 
446     LOG.info("hbase.rootdir=" + getRootDir() +
447       ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
448 
449     // Disable usage of meta replicas in the master
450     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
451 
452     Replication.decorateMasterConfiguration(this.conf);
453     BackupManager.decorateMasterConfiguration(this.conf);
454 
455     // Hack! Maps DFSClient => Master for logs.  HDFS made this
456     // config param for task trackers, but we can piggyback off of it.
457     if (this.conf.get("mapreduce.task.attempt.id") == null) {
458       this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
459     }
460 
461     // should we check the compression codec type at master side, default true, HBASE-6370
462     this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
463 
464     // should we check encryption settings at master side, default true
465     this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
466 
467     this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
468 
469     // Check configuration to see whether procedure is disabled (not executed at all),
470     // unused (not used to execute DDL, but the executor starts to complete unfinished
471     // operations in the procedure store), or enabled (default behavior).
472     String procedureConfString = conf.get("hbase.master.procedure.tableddl", "enabled");
473     if (procedureConfString.equalsIgnoreCase("disabled")) {
474       LOG.info("Master will use handler for new table DDL"
475         + " and all unfinished table DDLs in procedure store will be disgarded.");
476       this.procedureConf = ProcedureConf.PROCEDURE_FULLY_DISABLED;
477     } else if (procedureConfString.equalsIgnoreCase("unused")) {
478       LOG.info("Master will use handler for new table DDL"
479         + " and all unfinished table DDLs in procedure store will continue to execute.");
480       this.procedureConf = ProcedureConf.HANDLER_USED;
481     } else {
482       this.procedureConf = ProcedureConf.PROCEDURE_ENABLED;
483     }
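    // For example, to keep using the pre-procedure handler path for new DDLs while
    // still letting the procedure executor drain unfinished work from the store,
    // one would set (illustrative snippet, mirroring the parsing above):
    //
    //   conf.set("hbase.master.procedure.tableddl", "unused");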
484     // preload table descriptor at startup
485     this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
486 
487     // Do we publish the status?
488 
489     boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
490         HConstants.STATUS_PUBLISHED_DEFAULT);
491     Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
492         conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
493             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
494             ClusterStatusPublisher.Publisher.class);
495 
496     if (shouldPublish) {
497       if (publisherClass == null) {
498         LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
499             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
500             " is not set - not publishing status");
501       } else {
502         clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
503         getChoreService().scheduleChore(clusterStatusPublisherChore);
504       }
505     }
506 
507     // Some unit tests don't need a cluster, so no zookeeper at all
508     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
509       setInitLatch(new CountDownLatch(1));
510       activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
511       int infoPort = putUpJettyServer();
512       startActiveMasterManager(infoPort);
513     } else {
514       activeMasterManager = null;
515     }
516   }
517 
518   // return the actual infoPort; -1 means the info server is disabled.
519   private int putUpJettyServer() throws IOException {
520     if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
521       return -1;
522     }
523     int infoPort = conf.getInt("hbase.master.info.port.orig",
524       HConstants.DEFAULT_MASTER_INFOPORT);
525     // -1 is for disabling info server, so no redirecting
526     if (infoPort < 0 || infoServer == null) {
527       return -1;
528     }
529     String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
530     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
531       String msg =
532           "Failed to start redirecting jetty server. Address " + addr
533               + " does not belong to this host. Correct configuration parameter: "
534               + "hbase.master.info.bindAddress";
535       LOG.error(msg);
536       throw new IOException(msg);
537     }
538 
539     RedirectServlet.regionServerInfoPort = infoServer.getPort();
540     if(RedirectServlet.regionServerInfoPort == infoPort) {
541       return infoPort;
542     }
543     masterJettyServer = new org.mortbay.jetty.Server();
544     Connector connector = new SelectChannelConnector();
545     connector.setHost(addr);
546     connector.setPort(infoPort);
547     masterJettyServer.addConnector(connector);
548     masterJettyServer.setStopAtShutdown(true);
549     Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
550     context.addServlet(RedirectServlet.class, "/*");
551     try {
552       masterJettyServer.start();
553     } catch (Exception e) {
554       throw new IOException("Failed to start redirecting jetty server", e);
555     }
556     return connector.getLocalPort();
557   }
558 
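  // A hedged configuration sketch for this redirecting server, using only the keys
  // read above (values are illustrative):
  //
  //   conf.setBoolean("hbase.master.infoserver.redirect", true); // false disables it
  //   conf.set("hbase.master.info.bindAddress", "0.0.0.0");      // must be a local address
  //   // hbase.master.info.port.orig carries the original master info port when the
  //   // master web UI has been moved; a value of -1 disables redirection.
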
559   /**
560    * For compatibility, if login fails with the regionserver credentials, try the master ones.
561    */
562   @Override
563   protected void login(UserProvider user, String host) throws IOException {
564     try {
565       super.login(user, host);
566     } catch (IOException ie) {
567       user.login("hbase.master.keytab.file",
568         "hbase.master.kerberos.principal", host);
569     }
570   }
571 
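  // A minimal sketch of the fallback this enables, assuming standard Kerberos
  // settings (the principal and keytab path below are illustrative):
  //
  //   conf.set("hbase.master.keytab.file", "/etc/security/keytabs/hbase.keytab");
  //   conf.set("hbase.master.kerberos.principal", "hbase/_HOST@EXAMPLE.COM");
  //
  // If the regionserver-style login in super.login() fails, these master-specific
  // keys are used instead.
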
572   /**
573    * If configured to put regions on active master,
574    * wait till a backup master becomes active.
575    * Otherwise, loop till the server is stopped or aborted.
576    */
577   @Override
578   protected void waitForMasterActive(){
579     boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
580     while (!(tablesOnMaster && isActiveMaster)
581         && !isStopped() && !isAborted()) {
582       sleeper.sleep();
583     }
584   }
585 
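  // Whether regions are hosted on the active master is decided by the balancer
  // config consulted via BaseLoadBalancer.tablesOnMaster(conf). A hedged sketch;
  // the key name below is an assumption from the 1.x BaseLoadBalancer, so verify
  // it against your version:
  //
  //   conf.setBoolean("hbase.balancer.tablesOnMaster", true);
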
586   @VisibleForTesting
587   public MasterRpcServices getMasterRpcServices() {
588     return (MasterRpcServices)rpcServices;
589   }
590 
591   public boolean balanceSwitch(final boolean b) throws IOException {
592     return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
593   }
594 
595   @Override
596   protected String getProcessName() {
597     return MASTER;
598   }
599 
600   @Override
601   protected boolean canCreateBaseZNode() {
602     return true;
603   }
604 
605   @Override
606   protected boolean canUpdateTableDescriptor() {
607     return true;
608   }
609 
610   @Override
611   protected RSRpcServices createRpcServices() throws IOException {
612     return new MasterRpcServices(this);
613   }
614 
615   @Override
616   protected void configureInfoServer() {
617     infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
618     infoServer.setAttribute(MASTER, this);
619     if (BaseLoadBalancer.tablesOnMaster(conf)) {
620       super.configureInfoServer();
621     }
622   }
623 
624   @Override
625   protected Class<? extends HttpServlet> getDumpServlet() {
626     return MasterDumpServlet.class;
627   }
628 
629   /**
630    * Emit the HMaster metrics, such as region in transition metrics.
631    * Wrapped in a try block just to be sure metrics can't abort the HMaster.
632    */
633   private void doMetrics() {
634     try {
635       if (assignmentManager != null) {
636         assignmentManager.updateRegionsInTransitionMetrics();
637       }
638     } catch (Throwable e) {
639       LOG.error("Couldn't update metrics: " + e.getMessage());
640     }
641   }
642 
643   MetricsMaster getMasterMetrics() {
644     return metricsMaster;
645   }
646 
647   /**
648    * Initialize all ZK based system trackers.
649    * @throws IOException
650    * @throws InterruptedException
651    * @throws KeeperException
652    * @throws CoordinatedStateException
653    */
654   void initializeZKBasedSystemTrackers() throws IOException,
655       InterruptedException, KeeperException, CoordinatedStateException {
656     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
657     this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
658     this.normalizer.setMasterServices(this);
659     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
660     this.loadBalancerTracker.start();
661 
662     this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
663     this.regionNormalizerTracker.start();
664 
665     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
666     this.splitOrMergeTracker.start();
667 
668     this.assignmentManager = new AssignmentManager(this, serverManager,
669       this.balancer, this.service, this.metricsMaster,
670       this.tableLockManager);
671     zooKeeper.registerListenerFirst(assignmentManager);
672 
673     this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
674         this.serverManager);
675     this.regionServerTracker.start();
676 
677     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
678       this.serverManager);
679     this.drainingServerTracker.start();
680 
681     this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper);
682     this.maintenanceModeTracker.start();
683 
684     // Set the cluster as up.  If new RSs, they'll be waiting on this before
685     // going ahead with their startup.
686     boolean wasUp = this.clusterStatusTracker.isClusterUp();
687     if (!wasUp) this.clusterStatusTracker.setClusterUp();
688 
689     LOG.info("Server active/primary master=" + this.serverName +
690         ", sessionid=0x" +
691         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
692         ", setting cluster-up flag (Was=" + wasUp + ")");
693 
694     // create/initialize the snapshot manager and other procedure managers
695     this.snapshotManager = new SnapshotManager();
696     this.mpmHost = new MasterProcedureManagerHost();
697     this.mpmHost.register(this.snapshotManager);
698     this.mpmHost.register(new MasterFlushTableProcedureManager());
699     this.mpmHost.loadProcedures(conf);
700     this.mpmHost.initialize(this, this.metricsMaster);
701   }
702 
703   /**
704    * Finish initialization of HMaster after becoming the primary master.
705    *
706    * <ol>
707    * <li>Initialize master components - file system manager, server manager,
708    *     assignment manager, region server tracker, etc</li>
709    * <li>Start necessary service threads - balancer, catalog janitor,
710    *     executor services, etc</li>
711    * <li>Set cluster as UP in ZooKeeper</li>
712    * <li>Wait for RegionServers to check-in</li>
713    * <li>Split logs and perform data recovery, if necessary</li>
714    * <li>Ensure assignment of meta/namespace regions</li>
715    * <li>Handle either fresh cluster start or master failover</li>
716    * </ol>
717    *
718    * @throws IOException
719    * @throws InterruptedException
720    * @throws KeeperException
721    * @throws CoordinatedStateException
722    */
723   private void finishActiveMasterInitialization(MonitoredTask status)
724       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
725 
726     isActiveMaster = true;
727     Thread zombieDetector = new Thread(new InitializationMonitor(this));
728     zombieDetector.start();
729 
730     /*
731      * We are active master now... go initialize components we need to run.
732      * Note, there may be dross in zk from previous runs; it'll get addressed
733      * below after we determine whether this is a cluster startup or a failover.
734      */
735 
736     status.setStatus("Initializing Master file system");
737 
738     this.masterActiveTime = System.currentTimeMillis();
739     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
740     this.fileSystemManager = new MasterFileSystem(this, this);
741 
742     // enable table descriptors cache
743     this.tableDescriptors.setCacheOn();
744     // set the META's descriptor to the correct replication
745     this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
746         conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
747     // warm-up HTDs cache on master initialization
748     if (preLoadTableDescriptors) {
749       status.setStatus("Pre-loading table descriptors");
750       this.tableDescriptors.getAll();
751     }
752 
753     // publish cluster ID
754     status.setStatus("Publishing Cluster ID in ZooKeeper");
755     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
756     this.initLatch.countDown();
757     this.serverManager = createServerManager(this, this);
758 
759     setupClusterConnection();
760 
761     // Invalidate all write locks held previously
762     this.tableLockManager.reapWriteLocks();
763 
764     status.setStatus("Initializing ZK system trackers");
765     initializeZKBasedSystemTrackers();
766 
767     // By default when quota support is enabled, add the Observer that deletes space quotas on
768     // table deletion before starting all CPs, unless the user specifically asks not to load it.
769     if (QuotaUtil.isQuotaEnabled(conf)) {
770       updateConfigurationForSpaceQuotaObserver(conf);
771     }
772     // initialize master side coprocessors before we start handling requests
773     status.setStatus("Initializing master coprocessors");
774     this.cpHost = new MasterCoprocessorHost(this, this.conf);
775 
776     // start up all service threads.
777     status.setStatus("Initializing master service threads");
778     startServiceThreads();
779 
780     // Wake up this server to check in
781     sleeper.skipSleepCycle();
782 
783     // Wait for region servers to report in
784     this.serverManager.waitForRegionServers(status);
785     // Check zk for region servers that are up but didn't register
786     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
787       // The isServerOnline check is opportunistic, correctness is handled inside
788       if (!this.serverManager.isServerOnline(sn)
789           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
790         LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
791       }
792     }
793 
794     // get a list of previously failed RSs which need log splitting work
795     // we recover hbase:meta region servers inside master initialization and
796     // handle other failed servers in SSH in order to start up the master node ASAP
797     Set<ServerName> previouslyFailedServers = this.fileSystemManager
798         .getFailedServersFromLogFolders();
799 
800     // remove stale recovering regions from previous run
801     this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
802 
803     // log splitting for hbase:meta server
804     ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
805     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
806       splitMetaLogBeforeAssignment(oldMetaServerLocation);
807       // Note: we can't remove oldMetaServerLocation from the previouslyFailedServers list
808       // because it may also host user regions
809     }
810     Set<ServerName> previouslyFailedMetaRSs = getPreviouslyFailedMetaServersFromZK();
811     // need to use the union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
812     // instead of previouslyFailedMetaRSs alone to address the following two situations:
813     // 1) the chained failure situation (recovery failed multiple times in a row).
814     // 2) the master gets killed right before it could delete the recovering hbase:meta from ZK,
815     // while the same server still has non-meta wals to be replayed, so that
816     // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region.
817     // Passing more servers into splitMetaLog is all right. If a server doesn't have a hbase:meta
818     // wal, it is a no-op for that server.
819     previouslyFailedMetaRSs.addAll(previouslyFailedServers);
820 
821     this.initializationBeforeMetaAssignment = true;
822 
823     // Wait for regionserver to finish initialization.
824     if (BaseLoadBalancer.tablesOnMaster(conf)) {
825       waitForServerOnline();
826     }
827 
828     // initialize the load balancer
829     this.balancer.setClusterStatus(getClusterStatus());
830     this.balancer.setMasterServices(this);
831     this.balancer.initialize();
832 
833     // Check if master is shutting down because of some issue
834     // in initializing the regionserver or the balancer.
835     if(isStopped()) return;
836 
837     // Make sure meta assigned before proceeding.
838     status.setStatus("Assigning Meta Region");
839     assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
840     // check if master is shutting down, because the above assignMeta call could return even if
841     // hbase:meta isn't assigned when the master is shutting down
842     if(isStopped()) return;
843 
844     status.setStatus("Submitting log splitting work for previously failed region servers");
845     // Master has recovered hbase:meta region server and we put
846     // other failed region servers in a queue to be handled later by SSH
847     for (ServerName tmpServer : previouslyFailedServers) {
848       this.serverManager.processDeadServer(tmpServer, true);
849     }
850 
851     // Update meta with new PB serialization if required, i.e. migrate all HRIs to PB serialization
852     // in meta. This must happen before we assign all user regions or else the assignment will
853     // fail.
854     if (this.conf.getBoolean("hbase.MetaMigrationConvertingToPB", true)) {
855       MetaMigrationConvertingToPB.updateMetaIfNecessary(this);
856     }
857 
858     // Fix up assignment manager status
859     status.setStatus("Starting assignment manager");
860     this.assignmentManager.joinCluster();
861 
862     // set cluster status again after user regions are assigned
863     this.balancer.setClusterStatus(getClusterStatus());
864 
865     // Start balancer and meta catalog janitor after meta and regions have
866     // been assigned.
867     status.setStatus("Starting balancer and catalog janitor");
868     this.clusterStatusChore = new ClusterStatusChore(this, balancer);
869     getChoreService().scheduleChore(clusterStatusChore);
870     this.balancerChore = new BalancerChore(this);
871     getChoreService().scheduleChore(balancerChore);
872     this.normalizerChore = new RegionNormalizerChore(this);
873     getChoreService().scheduleChore(normalizerChore);
874     this.catalogJanitorChore = new CatalogJanitor(this, this);
875     getChoreService().scheduleChore(catalogJanitorChore);
876 
877     // Do Metrics periodically
878     periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
879     getChoreService().scheduleChore(periodicDoMetricsChore);
880 
881     status.setStatus("Starting namespace manager and quota manager");
882     initNamespaceAndQuotaManager();
883 
884     if (this.cpHost != null) {
885       try {
886         this.cpHost.preMasterInitialization();
887       } catch (IOException e) {
888         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
889       }
890     }
891 
892     status.markComplete("Initialization successful");
893     LOG.info("Master has completed initialization");
894     configurationManager.registerObserver(this.balancer);
895     //configurationManager.registerObserver(this.hfileCleaner);
896     initialized = true;
897 
898     assignmentManager.checkIfShouldMoveSystemRegionAsync();
899 
900     status.setStatus("Starting quota manager");
901     if (QuotaUtil.isQuotaEnabled(conf)) {
902       // Create the quota snapshot notifier
903       spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
904       spaceQuotaSnapshotNotifier.initialize(this.clusterConnection);
905       this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
906       // Start the chore to read the region FS space reports and act on them
907       getChoreService().scheduleChore(quotaObserverChore);
908     }
909 
910     // assign the meta replicas
911     Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
912     int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
913            HConstants.DEFAULT_META_REPLICA_NUM);
914     for (int i = 1; i < numReplicas; i++) {
915       assignMeta(status, EMPTY_SET, i);
916     }
917     unassignExcessMetaReplica(zooKeeper, numReplicas);
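    // For example (illustrative): if the previous master ran with
    //   conf.setInt(HConstants.META_REPLICAS_NUM, 3);
    // and the current master uses 2, the loop above assigns replica id 1 and
    // unassignExcessMetaReplica closes the now-excess replica id 2.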
918 
919     // clear the dead servers with the same host name and port as an online server, because we
920     // are not removing a dead server with the same hostname and port as an RS which is trying
921     // to check in before master initialization. See HBASE-5916.
922     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
923 
924     // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
925     status.setStatus("Checking ZNode ACLs");
926     zooKeeper.checkAndSetZNodeAcls();
927 
928     status.setStatus("Calling postStartMaster coprocessors");
929 
930     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
931     getChoreService().scheduleChore(expiredMobFileCleanerChore);
932 
933     int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
934       MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
935     if (mobCompactionPeriod > 0) {
936       this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
937       getChoreService().scheduleChore(mobCompactChore);
938     } else {
939       LOG.info("The period is " + mobCompactionPeriod
940         + " seconds, MobCompactionChore is disabled");
941     }
942     this.mobCompactThread = new MasterMobCompactionThread(this);
943 
944     if (this.cpHost != null) {
945       // don't let cp initialization errors kill the master
946       try {
947         this.cpHost.postStartMaster();
948       } catch (IOException ioe) {
949         LOG.error("Coprocessor postStartMaster() hook failed", ioe);
950       }
951     }
952 
953     zombieDetector.interrupt();
954   }
955 
956   /**
957    * Adds the {@code MasterSpaceQuotaObserver} to the list of configured Master observers to
958    * automatically remove space quotas for a table when that table is deleted.
959    */
960   @VisibleForTesting
961   public void updateConfigurationForSpaceQuotaObserver(Configuration conf) {
962     // We're configured to not delete quotas on table deletion, so we don't need to add the observer.
963     if (!conf.getBoolean(
964           MasterSpaceQuotaObserver.REMOVE_QUOTA_ON_TABLE_DELETE,
965           MasterSpaceQuotaObserver.REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT)) {
966       return;
967     }
968     String[] masterCoprocs = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
969     final int length = null == masterCoprocs ? 0 : masterCoprocs.length;
970     String[] updatedCoprocs = new String[length + 1];
971     if (length > 0) {
972       System.arraycopy(masterCoprocs, 0, updatedCoprocs, 0, masterCoprocs.length);
973     }
974     updatedCoprocs[length] = MasterSpaceQuotaObserver.class.getName();
975     conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, updatedCoprocs);
976   }
977 
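  // Illustrative effect of the method above, assuming one observer was already
  // configured and that CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY is the usual
  // "hbase.coprocessor.master.classes" key (the first class name is made up):
  //
  //   before: hbase.coprocessor.master.classes = com.example.MyMasterObserver
  //   after:  hbase.coprocessor.master.classes = com.example.MyMasterObserver,
  //           org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver
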
978   private void initQuotaManager() throws IOException {
979     quotaManager = new MasterQuotaManager(this);
980     this.assignmentManager.setRegionStateListener((RegionStateListener) quotaManager);
981     quotaManager.start();
982   }
983 
984   /**
985    * Create a {@link ServerManager} instance.
986    * @param master
987    * @param services
988    * @return An instance of {@link ServerManager}
989    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
990    * @throws IOException
991    */
992   ServerManager createServerManager(final Server master,
993       final MasterServices services)
994   throws IOException {
995     // We put this out here in a method so we can do a Mockito.spy and stub it out
996     // w/ a mocked up ServerManager.
997     return new ServerManager(master, services);
998   }
999 
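  // A hedged test sketch of why this factory method exists (assumes Mockito on the
  // test classpath and a same-package test; the mock names are illustrative):
  //
  //   HMaster master = Mockito.spy(realMaster);
  //   ServerManager mocked = Mockito.mock(ServerManager.class);
  //   Mockito.doReturn(mocked).when(master)
  //       .createServerManager(Mockito.any(Server.class), Mockito.any(MasterServices.class));
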
1000   private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
1001     // unassign the unneeded replicas (e.g., if the previous master was configured
1002     // with a replication of 3 and now it is 2, we need to unassign the one unneeded replica)
1003     try {
1004       List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
1005       for (String metaReplicaZnode : metaReplicaZnodes) {
1006         int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
1007         if (replicaId >= numMetaReplicasConfigured) {
1008           RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
1009           LOG.info("Closing excess replica of meta region " + r.getRegion());
1010           // send a close and wait for a max of 30 seconds
1011           ServerManager.closeRegionSilentlyAndWait(getConnection(), r.getServerName(),
1012               r.getRegion(), 30000);
1013           ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
1014         }
1015       }
1016     } catch (Exception ex) {
1017       // ignore the exception since we don't want the master to be wedged due to potential
1018       // issues in the cleanup of the extra regions. We can do that cleanup via hbck or manually
1019       LOG.warn("Ignoring exception " + ex);
1020     }
1021   }
1022 
1023   /**
1024    * Check <code>hbase:meta</code> is assigned. If not, assign it.
1025    * @param status MonitoredTask
1026    * @param previouslyFailedMetaRSs
1027    * @param replicaId
1028    * @throws InterruptedException
1029    * @throws IOException
1030    * @throws KeeperException
1031    */
1032   void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
1033       throws InterruptedException, IOException, KeeperException {
1034     // Work on meta region
1035     int assigned = 0;
1036     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
1037     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
1038       status.setStatus("Assigning hbase:meta region");
1039     } else {
1040       status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
1041     }
1042     // Get current meta state from zk.
1043     RegionStates regionStates = assignmentManager.getRegionStates();
1044     RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
1045     HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
1046         replicaId);
1047     ServerName currentMetaServer = metaState.getServerName();
1048     if (!ConfigUtil.useZKForAssignment(conf)) {
1049       regionStates.createRegionState(hri, metaState.getState(),
1050         currentMetaServer, null);
1051     } else {
1052       regionStates.createRegionState(hri);
1053     }
1054     boolean rit = this.assignmentManager.
1055       processRegionInTransitionAndBlockUntilAssigned(hri);
1056     boolean metaRegionLocation = metaTableLocator.verifyMetaRegionLocation(
1057       this.getConnection(), this.getZooKeeper(), timeout, replicaId);
1058     if (!metaRegionLocation || !metaState.isOpened()) {
1059       // Meta location is not verified. It should be in transition, or offline.
1060       // We will wait for it to be assigned in enableSSHandWaitForMeta below.
1061       assigned++;
1062       if (!ConfigUtil.useZKForAssignment(conf)) {
1063         assignMetaZkLess(regionStates, metaState, timeout, previouslyFailedMetaRSs);
1064       } else if (!rit) {
1065         // Assign meta since not already in transition
1066         if (currentMetaServer != null) {
1067           // If the meta server is not known to be dead or online,
1068           // just split the meta log, and don't expire it since this
1069           // could be a full cluster restart. Otherwise, we will think
1070           // this is a failover and lose previous region locations.
1071           // If it is really a failover case, AM will find out in rebuilding
1072           // user regions. Otherwise, we are good since all logs are split
1073           // or known to be replayed before user regions are assigned.
1074           if (serverManager.isServerOnline(currentMetaServer)) {
1075             LOG.info("Forcing expire of " + currentMetaServer);
1076             serverManager.expireServer(currentMetaServer);
1077           }
1078           if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
1079             splitMetaLogBeforeAssignment(currentMetaServer);
1080             previouslyFailedMetaRSs.add(currentMetaServer);
1081           }
1082         }
1083         assignmentManager.assignMeta(hri);
1084       }
1085     } else {
1086       // Region already assigned. We didn't assign it. Add to in-memory state.
1087       regionStates.updateRegionState(hri, State.OPEN, currentMetaServer);
1088       this.assignmentManager.regionOnline(hri, currentMetaServer);
1089     }
1090 
1091     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableMeta(TableName.META_TABLE_NAME);
1092 
1093     if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
1094         && (!previouslyFailedMetaRSs.isEmpty())) {
1095       // log replay mode requires that a new hbase:meta RS be assigned first
1096       status.setStatus("replaying log for Meta Region");
1097       this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
1098     }
1099 
1100     // Make sure a hbase:meta location is set. We need to enable SSH here since
1101     // if the meta region server dies at this time, we need it to be re-assigned
1102     // by SSH so that system tables can be assigned.
1103     // No need to wait when assigned == 0, i.e. when meta was just verified.
1104     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableServerShutdownHandler(assigned != 0);
1105     LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", rit=" + rit +
1106       ", location=" + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
1107     status.setStatus("META assigned.");
1108   }
1109 
1110   private void assignMetaZkLess(RegionStates regionStates, RegionState regionState, long timeout,
1111       Set<ServerName> previouslyFailedRs) throws IOException, KeeperException {
1112     ServerName currentServer = regionState.getServerName();
1113     if (serverManager.isServerOnline(currentServer)) {
1114       LOG.info("Meta was in transition on " + currentServer);
1115       assignmentManager.processRegionInTransitionZkLess();
1116     } else {
1117       if (currentServer != null) {
1118         if (regionState.getRegion().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1119           splitMetaLogBeforeAssignment(currentServer);
1120           regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
1121           previouslyFailedRs.add(currentServer);
1122         }
1123       }
1124       LOG.info("Re-assigning hbase:meta, it was on " + currentServer);
1125       regionStates.updateRegionState(regionState.getRegion(), State.OFFLINE);
1126       assignmentManager.assignMeta(regionState.getRegion());
1127     }
1128   }
1129 
1130   /*
1131    * The main purpose is to start the namespace manager and quota manager asynchronously
1132    * to unblock overall master initialization.
1133    *
1134    * @throws IOException
1135    */
1136   private void initNamespaceAndQuotaManager() throws IOException {
1137     // create namespace manager
1138     tableNamespaceManager = new TableNamespaceManager(this);
1139 
1140     // create quota manager
1141     this.quotaManager = new MasterQuotaManager(this);
1142     this.assignmentManager.setRegionStateListener((RegionStateListener) quotaManager);
1143 
1144     if (conf.getBoolean("hbase.master.start.wait.for.namespacemanager", false)) {
1145       // If asked not to start the namespace manager asynchronously, then just block
1146       // master service startup until the namespace manager is ready.
1147       //
1148       // Note: The quota manager depends on the namespace manager.  Therefore, its start
1149       // method has to be in sync with the namespace manager's.
1150       LOG.info("Starting namespace manager and quota manager synchronously");
1151 
1152       tableNamespaceManager.start();
1153       LOG.info("Namespace manager started successfully.");
1154 
1155       quotaManager.start();
1156       LOG.info("Quota manager started successfully.");
1157     } else { // else asynchronously start namespace manager and quota manager
1158       LOG.info("Starting namespace manager and quota manager asynchronously");
1159       Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1160         @Override
1161         public void run() {
1162           // Start the namespace manager and wait for it to be fully started.
1163           try {
1164             tableNamespaceManager.start();
1165             LOG.info("Namespace manager started successfully.");
1166           } catch (IOException e) {
1167             LOG.error("Namespace manager failed to start. ", e);
1168             abort("Shutting down Master because the namespace manager failed to start. ", e);
1169           }
1170           // The quota manager depends on the namespace manager being fully initialized.
1171           try {
1172             quotaManager.start();
1173             LOG.info("Quota manager started successfully.");
1174           } catch (IOException ie) {
1175             LOG.error("Quota Manager failed to start. ", ie);
1176             abort("Shutting down Master because the Quota Manager failed to start. ", ie);
1177           }
1178         }
1179       }, "Init Namespace Manager and Quota Manager Async"));
1180     }
1181   }
1182 
1183   SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
1184     SpaceQuotaSnapshotNotifier notifier =
1185         SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
1186     notifier.initialize(getConnection());
1187     return notifier;
1188   }
1189 
1190   boolean isCatalogJanitorEnabled() {
1191     return catalogJanitorChore != null &&
1192       catalogJanitorChore.getEnabled();
1193   }
1194 
1195   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
1196     if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
1197       // In log replay mode, we mark hbase:meta region as recovering in ZK
1198       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
1199       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
1200       this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
1201     } else {
1202       // In recovered.edits mode: create recovered edits file for hbase:meta server
1203       this.fileSystemManager.splitMetaLog(currentMetaServer);
1204     }
1205   }
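
       // Which branch runs above follows the cluster-wide log recovery mode; in this
       // line of HBase that is typically driven by "hbase.master.distributed.log.replay"
       // (true selects RecoveryMode.LOG_REPLAY, false the recovered.edits split path).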
1206 
1207   private void enableServerShutdownHandler(
1208       final boolean waitForMeta) throws IOException, InterruptedException {
1209     // If ServerShutdownHandler is disabled, we enable it and expire those servers
1210     // that are dead but not yet expired. This is required so that if meta is
1211     // being assigned to a server which dies after assignMeta starts the
1212     // assignment, SSH can re-assign it. Otherwise, we would be stuck here
1213     // waiting forever if waitForMeta is specified.
1214     if (!serverShutdownHandlerEnabled) {
1215       serverShutdownHandlerEnabled = true;
1216       this.serverManager.processQueuedDeadServers();
1217     }
1218 
1219     if (waitForMeta) {
1220       metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
1221       // Above check waits for general meta availability but this does not
1222       // guarantee that the transition has completed
1223       this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
1224     }
1225   }
1226 
1227   private void enableMeta(TableName metaTableName) {
1228     if (!this.assignmentManager.getTableStateManager().isTableState(metaTableName,
1229         ZooKeeperProtos.Table.State.ENABLED)) {
1230       this.assignmentManager.setEnabledTable(metaTableName);
1231     }
1232   }
1233 
1234   /**
1235    * Returns the set of region server names recorded under the hbase:meta recovering-region ZK node.
1236    * @return Set of meta server names which were recorded in ZK
1237    * @throws KeeperException
1238    */
1239   private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1240     Set<ServerName> result = new HashSet<ServerName>();
1241     String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1242       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1243     List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1244     if (regionFailedServers == null) return result;
1245 
1246     for(String failedServer : regionFailedServers) {
1247       ServerName server = ServerName.parseServerName(failedServer);
1248       result.add(server);
1249     }
1250     return result;
1251   }
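
       // Shape of the znodes read above, as an example (assuming the default
       // zookeeper.znode.parent of "/hbase"; 1588230740 is the fixed encoded name of
       // hbase:meta's first region; hostnames are illustrative):
       //
       //   /hbase/recovering-regions/1588230740/host1.example.com,16020,1449000000000
       //   /hbase/recovering-regions/1588230740/host2.example.com,16020,1449000000001
       //
       // Each child name is a ServerName string, parsed by ServerName.parseServerName().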
1252 
1253   @Override
1254   public TableDescriptors getTableDescriptors() {
1255     return this.tableDescriptors;
1256   }
1257 
1258   @Override
1259   public ServerManager getServerManager() {
1260     return this.serverManager;
1261   }
1262 
1263   @Override
1264   public MasterFileSystem getMasterFileSystem() {
1265     return this.fileSystemManager;
1266   }
1267 
1268   /*
1269    * Start up all services. If any of these threads gets an unhandled exception
1270    * then they just die with a logged message.  This should be fine because
1271    * in general, we do not expect the master to get such unhandled exceptions
1272    * as OOMEs; it should be lightly loaded. See what HRegionServer does if we
1273    * need to install an uncaught exception handler.
1274    */
1275   private void startServiceThreads() throws IOException {
1276     // Start the executor service pools
1277     this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1278       conf.getInt("hbase.master.executor.openregion.threads", 5));
1279     this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1280       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1281     this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1282       conf.getInt("hbase.master.executor.serverops.threads", 5));
1283     this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1284       conf.getInt("hbase.master.executor.serverops.threads", 5));
1285     this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1286       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1287 
1288     // We depend on there being only one instance of this executor running
1289     // at a time.  To do concurrency, we would need fencing of enable/disable of
1290     // tables.
1291     // Any time this maxThreads is changed to > 1, please see the comment at
1292     // AccessController#postCreateTableHandler
1293     this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1294     startProcedureExecutor();
1295 
1296     // Start the log cleaner thread
1297     int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1298     this.logCleaner =
1299       new LogCleaner(cleanerInterval,
1300         this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf),
1301         getMasterFileSystem().getOldLogDir());
1302     getChoreService().scheduleChore(logCleaner);
1303 
1304     // Start the hfile archive cleaner thread
1305     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1306     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1307         .getFileSystem(), archiveDir);
1308     getChoreService().scheduleChore(hfileCleaner);
1309     serviceStarted = true;
1310     if (LOG.isTraceEnabled()) {
1311       LOG.trace("Started service threads");
1312     }
1313   }
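
       // All pool sizes above come from configuration; for example, raising the
       // server-operations pools (both MASTER_SERVER_OPERATIONS and
       // MASTER_META_SERVER_OPERATIONS read the same key here):
       //
       //   conf.setInt("hbase.master.executor.serverops.threads", 10);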
1314 
1315   @Override
1316   protected void sendShutdownInterrupt() {
1317     super.sendShutdownInterrupt();
1318     stopProcedureExecutor();
1319   }
1320 
1321   @Override
1322   protected void stopServiceThreads() {
1323     if (masterJettyServer != null) {
1324       LOG.info("Stopping master jetty server");
1325       try {
1326         masterJettyServer.stop();
1327       } catch (Exception e) {
1328         LOG.error("Failed to stop master jetty server", e);
1329       }
1330     }
1331     super.stopServiceThreads();
1332     stopChores();
1333 
1334     // Wait for all the remaining region servers to report in IFF we were
1335     // running a cluster shutdown AND we were NOT aborting.
1336     if (!isAborted() && this.serverManager != null &&
1337         this.serverManager.isClusterShutdown()) {
1338       this.serverManager.letRegionServersShutdown();
1339     }
1340     if (LOG.isDebugEnabled()) {
1341       LOG.debug("Stopping service threads");
1342     }
1343     // Clean up and close up shop
1344     if (this.logCleaner != null) this.logCleaner.cancel(true);
1345     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
1346     if (this.quotaManager != null) this.quotaManager.stop();
1347     if (this.activeMasterManager != null) this.activeMasterManager.stop();
1348     if (this.serverManager != null) this.serverManager.stop();
1349     if (this.assignmentManager != null) this.assignmentManager.stop();
1350     if (this.fileSystemManager != null) this.fileSystemManager.stop();
1351     if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
1352   }
1353 
1354   /**
1355    * Check whether the procedure executor is enabled
1356    */
1357   @Override
1358   public boolean isMasterProcedureExecutorEnabled() {
1359     return (this.procedureConf == ProcedureConf.PROCEDURE_ENABLED);
1360   }
1361 
1362   private void startProcedureExecutor() throws IOException {
1363     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1364     final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
1365         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1366 
1367     if (this.procedureConf == ProcedureConf.PROCEDURE_FULLY_DISABLED) {
1368       // Clean up the procedure store so that we will be in a clean state when the
1369       // procedure executor is enabled later.
1370       // Note: hbck might be needed for uncompleted procedures.
1371       try {
1372         fs.delete(walDir, true);
1373         LOG.warn("Procedure executor is disabled from configuration. " +
1374             "All the state logs from the procedure store were removed. " +
1375             "You should check the cluster state using HBCK.");
1376       } catch (Exception e) {
1377         // Log the error and move on.
1378         LOG.error("Removing all the state logs from the procedure store failed. " +
1379             "You should check the cluster state using HBCK.", e);
1380       }
1381       return;
1382     }
1383 
1384     procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
1385         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1386 
1387     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1388     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1389         procEnv.getProcedureQueue());
1390 
1391     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1392         Math.max(Runtime.getRuntime().availableProcessors(),
1393           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1394     procedureStore.start(numThreads);
1395     procedureExecutor.start(numThreads);
1396   }
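
       // Worked example of the thread-count computation above: with
       // MASTER_PROCEDURE_THREADS unset, an 8-core host gets
       // max(8, DEFAULT_MIN_MASTER_PROCEDURE_THREADS) threads, i.e. 8 unless the
       // default minimum is larger; the store and the executor are started with the
       // same count.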
1397 
1398   private void stopProcedureExecutor() {
1399     if (procedureExecutor != null) {
1400       procedureExecutor.stop();
1401     }
1402 
1403     if (procedureStore != null) {
1404       procedureStore.stop(isAborted());
1405     }
1406   }
1407 
1408   private void stopChores() {
1409     if (this.expiredMobFileCleanerChore != null) {
1410       this.expiredMobFileCleanerChore.cancel(true);
1411     }
1412     if (this.mobCompactChore != null) {
1413       this.mobCompactChore.cancel(true);
1414     }
1415     if (this.balancerChore != null) {
1416       this.balancerChore.cancel(true);
1417     }
1418     if (this.normalizerChore != null) {
1419       this.normalizerChore.cancel(true);
1420     }
1421     if (this.clusterStatusChore != null) {
1422       this.clusterStatusChore.cancel(true);
1423     }
1424     if (this.catalogJanitorChore != null) {
1425       this.catalogJanitorChore.cancel(true);
1426     }
1427     if (this.clusterStatusPublisherChore != null){
1428       clusterStatusPublisherChore.cancel(true);
1429     }
1430     if (this.mobCompactThread != null) {
1431       this.mobCompactThread.close();
1432     }
1433     if (this.quotaObserverChore != null) {
1434       quotaObserverChore.cancel();
1435     }
1436     if (this.periodicDoMetricsChore != null) {
1437       periodicDoMetricsChore.cancel();
1438     }
1439   }
1440 
1441   /**
1442    * @return The remote side's InetAddress
1443    * @throws UnknownHostException
1444    */
1445   InetAddress getRemoteInetAddress(final int port,
1446       final long serverStartCode) throws UnknownHostException {
1447     // Do it out here in its own little method so we can fake an address when
1448     // mocking it up in tests.
1449     InetAddress ia = RpcServer.getRemoteIp();
1450 
1451     // The call could be from the local regionserver,
1452     // in which case, there is no remote address.
1453     if (ia == null && serverStartCode == startcode) {
1454       InetSocketAddress isa = rpcServices.getSocketAddress();
1455       if (isa != null && isa.getPort() == port) {
1456         ia = isa.getAddress();
1457       }
1458     }
1459     return ia;
1460   }
1461 
1462   /**
1463    * @return Maximum time we should run balancer for
1464    */
1465   private int getBalancerCutoffTime() {
1466     int balancerCutoffTime =
1467       getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1468     if (balancerCutoffTime == -1) {
1469       // No cutoff configured, so default it to the balancer period.
1470       int balancerPeriod =
1471         getConfiguration().getInt("hbase.balancer.period", 300000);
1472       balancerCutoffTime = balancerPeriod;
1475     }
1476     return balancerCutoffTime;
1477   }
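
       // Worked example: with neither key set, "hbase.balancer.max.balancing" reads -1
       // and the cutoff falls back to the "hbase.balancer.period" default of 300000 ms,
       // so a single balancer run gets at most five minutes.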
1478 
1479   public boolean balance() throws IOException {
1480     return balance(false);
1481   }
1482 
1483   public boolean balance(boolean force) throws IOException {
1484     // if master not initialized, don't run balancer.
1485     if (!this.initialized) {
1486       LOG.debug("Master has not been initialized, don't run balancer.");
1487       return false;
1488     }
1489 
1490     if (isInMaintenanceMode()) {
1491       LOG.info("Master is in maintenanceMode mode, don't run balancer.");
1492       return false;
1493     }
1494 
1495     // Do this call outside of synchronized block.
1496     int maximumBalanceTime = getBalancerCutoffTime();
1497     synchronized (this.balancer) {
1498       // If the balancer switch is off, don't run the balancer.
1499       if (!this.loadBalancerTracker.isBalancerOn()) return false;
1500       // Only allow one balance run at a time.
1501       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1502         Map<String, RegionState> regionsInTransition =
1503           this.assignmentManager.getRegionStates().getRegionsInTransition();
1504         // if hbase:meta region is in transition, result of assignment cannot be recorded
1505         // ignore the force flag in that case
1506         boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
1507         String prefix = force && !metaInTransition ? "R" : "Not r";
1508         LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
1509           " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1510             abbreviate(regionsInTransition.toString(), 256));
1511         if (!force || metaInTransition) return false;
1512       }
1513       if (this.serverManager.areDeadServersInProgress()) {
1514         LOG.debug("Not running balancer because processing dead regionserver(s): " +
1515           this.serverManager.getDeadServers());
1516         return false;
1517       }
1518 
1519       if (this.cpHost != null) {
1520         try {
1521           if (this.cpHost.preBalance()) {
1522             LOG.debug("Coprocessor bypassing balancer request");
1523             return false;
1524           }
1525         } catch (IOException ioe) {
1526           LOG.error("Error invoking master coprocessor preBalance()", ioe);
1527           return false;
1528         }
1529       }
1530 
1531       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1532         this.assignmentManager.getRegionStates().getAssignmentsByTable();
1533 
1534       List<RegionPlan> plans = new ArrayList<RegionPlan>();
1535 
1536       //Give the balancer the current cluster state.
1537       this.balancer.setClusterStatus(getClusterStatus());
1538       for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
1539         List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
1540         if (partialPlans != null) plans.addAll(partialPlans);
1541       }
1542 
1543       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1544       int rpCount = 0;  // number of RegionPlans balanced so far
1545       long totalRegPlanExecTime = 0;
1546       if (plans != null && !plans.isEmpty()) {
1547         for (RegionPlan plan: plans) {
1548           LOG.info("balance " + plan);
1549           long balStartTime = System.currentTimeMillis();
1550           //TODO: bulk assign
1551           this.assignmentManager.balance(plan);
1552           totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1553           rpCount++;
1554           if (rpCount < plans.size() &&
1555               // if performing next balance exceeds cutoff time, exit the loop
1556               (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1557             //TODO: After balance, there should not be a cutoff time (keeping it as a safety net for now)
1558             LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1559               maximumBalanceTime);
1560             break;
1561           }
1562         }
1563       }
1564       if (this.cpHost != null) {
1565         try {
1566           this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1567         } catch (IOException ioe) {
1568           // balancing already succeeded so don't change the result
1569           LOG.error("Error invoking master coprocessor postBalance()", ioe);
1570         }
1571       }
1572     }
1573     // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
1574     // Return true indicating a success.
1575     return true;
1576   }
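
       // The cutoff inside balance() is predictive: after each plan it compares
       // now + (average plan execution time so far) against cutoffTime, stopping one
       // plan early instead of overshooting. For example, with maximumBalanceTime of
       // 300000 ms and plans averaging 2000 ms each, roughly 150 plans fit in a run.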
1577 
1578   /**
1579    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1580    *
1581    * @return true if normalization step was performed successfully, false otherwise
1582    *   (specifically, if HMaster hasn't been initialized properly or normalization
1583    *   is globally disabled)
1584    * @throws IOException
        * @throws CoordinatedStateException
1585    */
1586   public boolean normalizeRegions() throws IOException, CoordinatedStateException {
1587     if (!this.initialized) {
1588       LOG.debug("Master has not been initialized, don't run region normalizer.");
1589       return false;
1590     }
1591 
1592     if (isInMaintenanceMode()) {
1593       LOG.info("Master is in maintenance mode, don't run region normalizer.");
1594       return false;
1595     }
1596 
1597     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1598       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1599       return false;
1600     }
1601 
1602     synchronized (this.normalizer) {
1603       // Don't run the normalizer concurrently
1604       List<TableName> allEnabledTables = new ArrayList<>(
1605         this.assignmentManager.getTableStateManager().getTablesInStates(
1606           ZooKeeperProtos.Table.State.ENABLED));
1607 
1608       Collections.shuffle(allEnabledTables);
1609 
1610       for (TableName table : allEnabledTables) {
1611         if (isInMaintenanceMode()) {
1612           LOG.debug("Master is in maintenance mode, stop running region normalizer.");
1613           return false;
1614         }
1615 
1616         if (table.isSystemTable() || (getTableDescriptors().get(table) != null &&
1617             !getTableDescriptors().get(table).isNormalizationEnabled())) {
1618           LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
1619               + " table or doesn't have auto normalization turned on");
1620           continue;
1621         }
1622         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1623         if (plans != null) {
1624           for (NormalizationPlan plan : plans) {
1625             plan.execute(clusterConnection.getAdmin());
1626           }
1627         }
1628       }
1629     }
1630     // If the normalizer did not generate any plans, it means the cluster is
1631     // already balanced. Return true indicating a success.
1632     return true;
1633   }
1634 
1635   /**
1636    * @return Client info for use as prefix on an audit log string; who did an action
1637    */
1638   String getClientIdAuditPrefix() {
1639     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1640   }
1641 
1642   /**
1643    * Switch for the background CatalogJanitor thread.
1644    * Used for testing.  The thread will continue to run.  It will just be a noop
1645    * if disabled.
1646    * @param b If false, the catalog janitor won't do anything.
1647    */
1648   public void setCatalogJanitorEnabled(final boolean b) {
1649     this.catalogJanitorChore.setEnabled(b);
1650   }
1651 
1652   @Override
1653   public void dispatchMergingRegions(final HRegionInfo region_a,
1654       final HRegionInfo region_b, final boolean forcible) throws IOException {
1655     checkInitialized();
1656     this.service.submit(new DispatchMergingRegionHandler(this,
1657       this.catalogJanitorChore, region_a, region_b, forcible));
1658   }
1659 
1660   @VisibleForTesting // Public so it can be accessed by tests.
1661   public void move(final byte[] encodedRegionName,
1662       byte[] destServerName) throws HBaseIOException {
1663     RegionState regionState = assignmentManager.getRegionStates().
1664       getRegionState(Bytes.toString(encodedRegionName));
1665 
1666     HRegionInfo hri;
1667     if (regionState != null) {
1668       hri = regionState.getRegion();
1669     } else {
1670       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1671     }
1672 
1673     ServerName dest;
1674     List<ServerName> exclude = hri.isSystemTable() ? assignmentManager.getExcludedServersForSystemTable()
1675         : new ArrayList<ServerName>(1);
1676     if (destServerName != null && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))) {
1677       LOG.info(
1678           Bytes.toString(encodedRegionName) + " cannot move to " + Bytes.toString(destServerName)
1679               + " because the server is in the exclude list");
1680       destServerName = null;
1681     }
1682 
1683     if (destServerName == null || destServerName.length == 0) {
1684       LOG.info("Passed destination servername is null/empty so " +
1685         "choosing a server at random");
1686       exclude.add(regionState.getServerName());
1687       final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude);
1688       dest = balancer.randomAssignment(hri, destServers);
1689       if (dest == null) {
1690         LOG.debug("Unable to determine a plan to assign " + hri);
1691         return;
1692       }
1693     } else {
1694       ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
1695       dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
1696       if (dest == null) {
1697         LOG.debug("Unable to determine a plan to assign " + hri);
1698         return;
1699       }
1700       if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1701           && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1702         // To avoid unnecessary region moves later by the balancer, don't put user
1703         // regions on the master. A test may still put regions on the master
1704         // intentionally, however.
1705         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1706           + " to avoid unnecessary region moving later by load balancer,"
1707           + " because it should not be on master");
1708         return;
1709       }
1710     }
1711 
1712     if (dest.equals(regionState.getServerName())) {
1713       LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1714         + " because region already assigned to the same server " + dest + ".");
1715       return;
1716     }
1717 
1718     // Now we can do the move
1719     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1720 
1721     try {
1722       checkInitialized();
1723       if (this.cpHost != null) {
1724         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1725           return;
1726         }
1727       }
1728       // Warm up the region on the destination before initiating the move. This call
1729       // is synchronous and takes some time, so do it before the source region gets
1730       // closed.
1731       serverManager.sendRegionWarmup(rp.getDestination(), hri);
1732 
1733       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1734       this.assignmentManager.balance(rp);
1735       if (this.cpHost != null) {
1736         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1737       }
1738     } catch (IOException ioe) {
1739       if (ioe instanceof HBaseIOException) {
1740         throw (HBaseIOException)ioe;
1741       }
1742       throw new HBaseIOException(ioe);
1743     }
1744   }
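
       // Typical client-side entry point for the move above, as a sketch (the byte[]
       // forms mirror what this method receives; the destination ServerName string is
       // illustrative, and null/empty picks a random server):
       //
       //   Admin admin = connection.getAdmin();
       //   admin.move(Bytes.toBytes(hri.getEncodedName()),
       //       Bytes.toBytes("host1.example.com,16020,1449000000000"));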
1745 
1746   @Override
1747   public long createTable(
1748       final HTableDescriptor hTableDescriptor,
1749       final byte [][] splitKeys,
1750       final long nonceGroup,
1751       final long nonce) throws IOException {
1752     if (isStopped()) {
1753       throw new MasterNotRunningException();
1754     }
1755 
1756     TableName tableName = hTableDescriptor.getTableName();
1757     String namespace = tableName.getNamespaceAsString();
1758     ensureNamespaceExists(namespace);
1759 
1760     final HRegionInfo[] newRegions =
1761         ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1762     checkInitialized();
1763     sanityCheckTableDescriptor(hTableDescriptor);
1764 
1765     if (isMasterProcedureExecutorEnabled()) {
1766       return MasterProcedureUtil.submitProcedure(
1767         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1768         @Override
1769         protected void run() throws IOException {
1770           getMaster().getMasterCoprocessorHost().preCreateTable(hTableDescriptor, newRegions);
1771 
1772           LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1773 
1774           // TODO: We can handle/merge duplicate requests, and differentiate the case of
1775           //       TableExistsException by saying if the schema is the same or not.
1776           ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1777           submitProcedure(new CreateTableProcedure(
1778             procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch));
1779           latch.await();
1780 
1781           getMaster().getMasterCoprocessorHost().postCreateTable(hTableDescriptor, newRegions);
1782         }
1783 
1784         @Override
1785         protected String getDescription() {
1786           return "CreateTableProcedure";
1787         }
1788       });
1789     } else {
1790       try {
1791         this.quotaManager.checkNamespaceTableAndRegionQuota(tableName, newRegions.length);
1792         if (cpHost != null) {
1793           cpHost.preCreateTable(hTableDescriptor, newRegions);
1794         }
1795         LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1796         this.service.submit(new CreateTableHandler(this, this.fileSystemManager, hTableDescriptor,
1797             conf, newRegions, this).prepare());
1798         if (cpHost != null) {
1799           cpHost.postCreateTable(hTableDescriptor, newRegions);
1800         }
1801       } catch (IOException e) {
1802         this.quotaManager.removeTableFromNamespaceQuota(tableName);
1803         LOG.error("Exception occurred while creating the table " + tableName.getNameAsString(), e);
1804         throw e;
1805       }
1806       return -1;
1807     }
1808   }
1809 
1810   /**
1811    * Checks whether the table conforms to some sane limits, and configured
1812    * values (compression, etc) work. Throws an exception if something is wrong.
1813    * @throws IOException
1814    */
1815   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1816     final String CONF_KEY = "hbase.table.sanity.checks";
1817     boolean logWarn = false;
1818     if (!conf.getBoolean(CONF_KEY, true)) {
1819       logWarn = true;
1820     }
1821     String tableVal = htd.getConfigurationValue(CONF_KEY);
1822     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1823       logWarn = true;
1824     }
1825 
1826     // check max file size
1827     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1828     long maxFileSize = htd.getMaxFileSize();
1829     if (maxFileSize < 0) {
1830       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1831     }
1832     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1833       String message = "MAX_FILESIZE for table descriptor or "
1834           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1835           + ") is too small, which might cause over splitting into unmanageable "
1836           + "number of regions.";
1837       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1838     }
1839 
1840     // check flush size
1841     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1842     long flushSize = htd.getMemStoreFlushSize();
1843     if (flushSize < 0) {
1844       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1845     }
1846     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1847       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1848           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1849           + " very frequent flushing.";
1850       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1851     }
1852 
1853     // check that coprocessors and other specified plugin classes can be loaded
1854     try {
1855       checkClassLoading(conf, htd);
1856     } catch (Exception ex) {
1857       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1858     }
1859 
1860     // check compression can be loaded
1861     try {
1862       checkCompression(htd);
1863     } catch (IOException e) {
1864       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1865     }
1866 
1867     // check encryption can be loaded
1868     try {
1869       checkEncryption(conf, htd);
1870     } catch (IOException e) {
1871       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1872     }
1873     // Verify compaction policy
1874     try{
1875       checkCompactionPolicy(conf, htd);
1876     } catch(IOException e){
1877       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1878     }
1879     // check that we have at least 1 CF
1880     if (htd.getColumnFamilies().length == 0) {
1881       String message = "Table should have at least one column family.";
1882       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1883     }
1884 
1885     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1886       if (hcd.getTimeToLive() <= 0) {
1887         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1888         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1889       }
1890 
1891       // check blockSize
1892       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1893         String message = "Block size for column family " + hcd.getNameAsString()
1894             + "  must be between 1K and 16MB.";
1895         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1896       }
1897 
1898       // check versions
1899       if (hcd.getMinVersions() < 0) {
1900         String message = "Min versions for column family " + hcd.getNameAsString()
1901           + "  must be positive.";
1902         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1903       }
1904       // max versions already being checked
1905 
1906       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1907       //  does not throw IllegalArgumentException
1908       // check minVersions <= maxVersions
1909       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1910         String message = "Min versions for column family " + hcd.getNameAsString()
1911             + " must be less than or equal to the max versions.";
1912         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1913       }
1914 
1915       // check replication scope
1916       if (hcd.getScope() < 0) {
1917         String message = "Replication scope for column family "
1918           + hcd.getNameAsString() + " must be non-negative.";
1919         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1920       }
1921 
1922       // TODO: should we check coprocessors and encryption ?
1923     }
1924   }
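
       // The checks above can be demoted to warnings per table as well as globally; a
       // sketch using the same key the method reads ("hbase.table.sanity.checks"):
       //
       //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
       //   htd.setConfiguration("hbase.table.sanity.checks", "false");
       //
       // With the key set to false, failures go through warnOrThrowExceptionForFailure()
       // as warnings instead of DoNotRetryIOExceptions.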
1925 
1926   private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1927       throws IOException {
1928     // FIFO compaction has some requirements.
1929     // Note that FIFOCompactionPolicy ignores periodic major compactions.
1930     String className =
1931         htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1932     if (className == null) {
1933       className =
1934           conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1935             ExploringCompactionPolicy.class.getName());
1936     }
1937 
1938 
1939     int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1940     String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1941     if (sv != null) {
1942       blockingFileCount = Integer.parseInt(sv);
1943     } else {
1944       blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1945     }
1946 
1947     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1948       String compactionPolicy =
1949           hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1950       if (compactionPolicy == null) {
1951         compactionPolicy = className;
1952       }
1953       if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1954         continue;
1955       }
1956       // FIFOCompaction
1957       String message = null;
1958 
1959       // 1. Check TTL
1960       if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1961         message = "Default TTL is not supported for FIFO compaction";
1962         throw new IOException(message);
1963       }
1964 
1965       // 2. Check min versions
1966       if (hcd.getMinVersions() > 0) {
1967         message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1968         throw new IOException(message);
1969       }
1970 
1971       // 3. blocking file count
1972       String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1973       if (sbfc != null) {
1974         blockingFileCount = Integer.parseInt(sbfc);
1975       }
1976       if (blockingFileCount < 1000) {
1977         message =
1978             "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1979                 + " is below recommended minimum of 1000";
1980         throw new IOException(message);
1981       }
1982     }
1983   }
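
       // A column family that satisfies the FIFO requirements above would, as a sketch,
       // look like this (values are illustrative; the keys come from the checks above):
       //
       //   HColumnDescriptor hcd = new HColumnDescriptor("cf");
       //   hcd.setTimeToLive(24 * 60 * 60);  // a non-default TTL is required
       //   hcd.setMinVersions(0);            // MIN_VERSIONS must stay 0
       //   hcd.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
       //       FIFOCompactionPolicy.class.getName());
       //   htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000");  // >= 1000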
1984 
1985   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1986   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1987       String message, Exception cause) throws IOException {
1988     if (!logWarn) {
1989       throw new DoNotRetryIOException(message + " Set " + confKey +
1990           " to false in the conf or the table descriptor if you want to bypass sanity checks", cause);
1991     }
1992     LOG.warn(message);
1993   }
1994 
1995   private void startActiveMasterManager(int infoPort) throws KeeperException {
1996     String backupZNode = ZKUtil.joinZNode(
1997       zooKeeper.backupMasterAddressesZNode, serverName.toString());
1998     /*
1999      * Add a ZNode for ourselves in the backup master directory since we
2000      * may not become the active master. In that case, we want the actual
2001      * active master to know we are a backup master, so that it won't assign
2002      * regions to us if so configured.
2003      *
2004      * If we become the active master later, ActiveMasterManager will delete
2005      * this node explicitly.  If we crash before then, ZooKeeper will delete
2006      * this node for us since it is ephemeral.
2007      */
2008     LOG.info("Adding backup master ZNode " + backupZNode);
2009     if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
2010         serverName, infoPort)) {
2011       LOG.warn("Failed create of " + backupZNode + " by " + serverName);
2012     }
2013 
2014     activeMasterManager.setInfoPort(infoPort);
2015     // Start a thread to try to become the active master, so we won't block here
2016     Threads.setDaemonThreadRunning(new Thread(new Runnable() {
2017       @Override
2018       public void run() {
2019         int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
2020           HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
2021         // If we're a backup master, stall until a primary writes its address
2022         if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
2023           HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
2024           LOG.debug("HMaster started in backup mode. "
2025             + "Stalling until master znode is written.");
2026           // This will only be a minute or so while the cluster starts up,
2027           // so don't worry about setting watches on the parent znode
2028           while (!activeMasterManager.hasActiveMaster()) {
2029             LOG.debug("Waiting for master address ZNode to be written "
2030               + "(Also watching cluster state node)");
2031             Threads.sleep(timeout);
2032           }
2033         }
2034         MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
2035         status.setDescription("Master startup");
2036         try {
2037           if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
2038             finishActiveMasterInitialization(status);
2039           }
2040         } catch (Throwable t) {
2041           status.setStatus("Failed to become active: " + t.getMessage());
2042           LOG.fatal("Failed to become active master", t);
2043           // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
2044           if (t instanceof NoClassDefFoundError &&
2045               t.getMessage()
2046                   .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
2047             // improved error message for this special case
2048             abort("HBase is having a problem with its Hadoop jars.  You may need to "
2049               + "recompile HBase against Hadoop version "
2050               +  org.apache.hadoop.util.VersionInfo.getVersion()
2051               + " or change your hadoop jars to start properly", t);
2052           } else {
2053             abort("Unhandled exception. Starting shutdown.", t);
2054           }
2055         } finally {
2056           status.cleanup();
2057         }
2058       }
2059     }, getServerName().toShortString() + ".activeMasterManager"));
2060   }
2061 
2062   private void checkCompression(final HTableDescriptor htd)
2063   throws IOException {
2064     if (!this.masterCheckCompression) return;
2065     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
2066       checkCompression(hcd);
2067     }
2068   }
2069 
2070   private void checkCompression(final HColumnDescriptor hcd)
2071   throws IOException {
2072     if (!this.masterCheckCompression) return;
2073     CompressionTest.testCompression(hcd.getCompression());
2074     CompressionTest.testCompression(hcd.getCompactionCompression());
2075   }
2076 
2077   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
2078   throws IOException {
2079     if (!this.masterCheckEncryption) return;
2080     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
2081       checkEncryption(conf, hcd);
2082     }
2083   }
2084 
2085   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
2086   throws IOException {
2087     if (!this.masterCheckEncryption) return;
2088     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
2089   }
2090 
2091   private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
2092   throws IOException {
2093     RegionSplitPolicy.getSplitPolicyClass(htd, conf);
2094     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
2095   }
2096 
2097   private static boolean isCatalogTable(final TableName tableName) {
2098     return tableName.equals(TableName.META_TABLE_NAME);
2099   }
2100 
2101   @Override
2102   public long deleteTable(
2103       final TableName tableName,
2104       final long nonceGroup,
2105       final long nonce) throws IOException {
2106     checkInitialized();
2107 
2108     if (isMasterProcedureExecutorEnabled()) {
2109       return MasterProcedureUtil.submitProcedure(
2110         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2111         @Override
2112         protected void run() throws IOException {
2113           getMaster().getMasterCoprocessorHost().preDeleteTable(tableName);
2114 
2115           LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
2116 
2117           // TODO: We can handle/merge duplicate requests
2118           ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
2119           submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(),
2120               tableName, latch));
2121           latch.await();
2122 
2123           getMaster().getMasterCoprocessorHost().postDeleteTable(tableName);
2124         }
2125 
2126         @Override
2127         protected String getDescription() {
2128           return "DeleteTableProcedure";
2129         }
2130       });
2131     } else {
2132       if (cpHost != null) {
2133         cpHost.preDeleteTable(tableName);
2134       }
2135       LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
2136       this.service.submit(new DeleteTableHandler(tableName, this, this).prepare());
2137       if (cpHost != null) {
2138         cpHost.postDeleteTable(tableName);
2139       }
2140       return -1;
2141     }
2142   }
2143 
2144   @Override
2145   public void truncateTable(
2146       final TableName tableName,
2147       final boolean preserveSplits,
2148       final long nonceGroup,
2149       final long nonce) throws IOException {
2150     checkInitialized();
2151 
2152     if (isMasterProcedureExecutorEnabled()) {
2153       MasterProcedureUtil.submitProcedure(
2154         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2155         @Override
2156         protected void run() throws IOException {
2157           getMaster().getMasterCoprocessorHost().preTruncateTable(tableName);
2158 
2159           LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
2160 
2161           long procId = submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(),
2162               tableName, preserveSplits));
2163           ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2164 
2165           getMaster().getMasterCoprocessorHost().postTruncateTable(tableName);
2166         }
2167 
2168         @Override
2169         protected String getDescription() {
2170           return "TruncateTableProcedure";
2171         }
2172       });
2173     } else {
2174       if (cpHost != null) {
2175         cpHost.preTruncateTable(tableName);
2176       }
2177       LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
2178       TruncateTableHandler handler =
2179           new TruncateTableHandler(tableName, this, this, preserveSplits);
2180       handler.prepare();
2181       handler.process();
2182       if (cpHost != null) {
2183         cpHost.postTruncateTable(tableName);
2184       }
2185     }
2186   }
2187 
2188   @Override
2189   public void addColumn(
2190       final TableName tableName,
2191       final HColumnDescriptor columnDescriptor,
2192       final long nonceGroup,
2193       final long nonce)
2194       throws IOException {
2195     checkInitialized();
2196     checkCompression(columnDescriptor);
2197     checkEncryption(conf, columnDescriptor);
2198 
2199     if (isMasterProcedureExecutorEnabled()) {
2200       MasterProcedureUtil.submitProcedure(
2201         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2202         @Override
2203         protected void run() throws IOException {
2204           if (getMaster().getMasterCoprocessorHost().preAddColumn(tableName, columnDescriptor)) {
2205             return;
2206           }
2207 
2208           LOG.info(getClientIdAuditPrefix() + " add " + columnDescriptor);
2209           // Execute the operation synchronously, wait for the operation to complete before
2210           // continuing
2211           long procId = submitProcedure(new AddColumnFamilyProcedure(
2212             procedureExecutor.getEnvironment(), tableName, columnDescriptor));
2213           ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2214 
2215           getMaster().getMasterCoprocessorHost().postAddColumn(tableName, columnDescriptor);
2216         }
2217         @Override
2218         protected String getDescription() {
2219           return "AddColumnFamilyProcedure";
2220         }
2221       });
2222     } else {
2223       if (cpHost != null) {
2224         if (cpHost.preAddColumn(tableName, columnDescriptor)) {
2225           return;
2226         }
2227       }
2228       LOG.info(getClientIdAuditPrefix() + " add " + columnDescriptor);
2229       new TableAddFamilyHandler(tableName, columnDescriptor, this, this).prepare().process();
2230       if (cpHost != null) {
2231         cpHost.postAddColumn(tableName, columnDescriptor);
2232       }
2233     }
2234   }
2235 
2236   @Override
2237   public void modifyColumn(
2238       final TableName tableName,
2239       final HColumnDescriptor descriptor,
2240       final long nonceGroup,
2241       final long nonce)
2242       throws IOException {
2243     checkInitialized();
2244     checkCompression(descriptor);
2245     checkEncryption(conf, descriptor);
2246 
2247     if (isMasterProcedureExecutorEnabled()) {
2248       MasterProcedureUtil.submitProcedure(
2249         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2250         @Override
2251         protected void run() throws IOException {
2252           if (getMaster().getMasterCoprocessorHost().preModifyColumn(tableName, descriptor)) {
2253             return;
2254           }
2255 
2256           LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
2257 
2258           // Execute the operation synchronously - wait for the operation to complete before
2259           // continuing.
2260           long procId = submitProcedure(new ModifyColumnFamilyProcedure(
2261             procedureExecutor.getEnvironment(), tableName, descriptor));
2262           ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2263 
2264           getMaster().getMasterCoprocessorHost().postModifyColumn(tableName, descriptor);
2265         }
2266 
2267         @Override
2268         protected String getDescription() {
2269           return "ModifyColumnFamilyProcedure";
2270         }
2271       });
2272     } else {
2273       if (cpHost != null) {
2274         if (cpHost.preModifyColumn(tableName, descriptor)) {
2275           return;
2276         }
2277       }
2278       LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
2279       new TableModifyFamilyHandler(tableName, descriptor, this, this).prepare().process();
2280       if (cpHost != null) {
2281         cpHost.postModifyColumn(tableName, descriptor);
2282       }
2283     }
2284   }
2285 
2286   @Override
2287   public void deleteColumn(
2288       final TableName tableName,
2289       final byte[] columnName,
2290       final long nonceGroup,
2291       final long nonce)
2292       throws IOException {
2293     checkInitialized();
2294 
2295     if (isMasterProcedureExecutorEnabled()) {
2296       MasterProcedureUtil.submitProcedure(
2297         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2298         @Override
2299         protected void run() throws IOException {
2300           if (getMaster().getMasterCoprocessorHost().preDeleteColumn(tableName, columnName)) {
2301             return;
2302           }
2303 
2304           LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
2305 
2306           // Execute the operation synchronously - wait for the operation to complete before
2307           // continuing.
2308           long procId = submitProcedure(new DeleteColumnFamilyProcedure(
2309             procedureExecutor.getEnvironment(), tableName, columnName));
2310           ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2311 
2312           getMaster().getMasterCoprocessorHost().postDeleteColumn(tableName, columnName);
2313         }
2314 
2315         @Override
2316         protected String getDescription() {
2317           return "DeleteColumnFamilyProcedure";
2318         }
2319       });
2320     } else {
2321       if (cpHost != null) {
2322         if (cpHost.preDeleteColumn(tableName, columnName)) {
2323           return;
2324         }
2325       }
2326       LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
2327       new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process();
2328       if (cpHost != null) {
2329         cpHost.postDeleteColumn(tableName, columnName);
2330       }
2331     }
2332   }
2333 
2334   @Override
2335   public long enableTable(final TableName tableName, final long nonceGroup, final long nonce)
2336       throws IOException {
2337     checkInitialized();
2338     if (cpHost != null) {
2339       cpHost.preEnableTable(tableName);
2340     }
2341 
2342     // Normally, it would make sense for this authorization check to exist inside AccessController,
2343     // but because the authorization check is done based on internal state (rather than explicit
2344     // permissions) we'll do the check here instead of in the coprocessor.
2345     MasterQuotaManager quotaManager = getMasterQuotaManager();
2346     if (quotaManager != null) {
2347       if (quotaManager.isQuotaInitialized()) {
2348         Quotas quotaForTable = QuotaUtil.getTableQuota(getConnection(), tableName);
2349         if (quotaForTable != null && quotaForTable.hasSpace()) {
2350           SpaceViolationPolicy policy = quotaForTable.getSpace().getViolationPolicy();
2351           if (SpaceViolationPolicy.DISABLE == policy) {
2352             throw new AccessDeniedException("Enabling the table '" + tableName
2353                 + "' is disallowed due to a violated space quota.");
2354           }
2355         }
2356       } else if (LOG.isTraceEnabled()) {
2357         LOG.trace("Unable to check for space quotas as the manager is not enabled");
2358       }
2359     }
2360 
2361     LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2362 
2363     if (isMasterProcedureExecutorEnabled()) {
2364       return MasterProcedureUtil.submitProcedure(
2365         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2366         @Override
2367         protected void run() throws IOException {
2368           getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
2369 
2370           LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2371 
2372           // Execute the operation asynchronously - the client will check the
2373           // progress of the operation. Before returning to the client, we want to
2374           // make sure that the table is prepared to be enabled (the table is locked
2375           // and the table state is set), in case the request is from a <1.1 client.
2376           // Note: if the procedure throws an exception, we will catch it and rethrow.
2377           final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2378           submitProcedure(new EnableTableProcedure(
2379             procedureExecutor.getEnvironment(), tableName, false, prepareLatch));
2384           prepareLatch.await();
2385 
2386           getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
2387         }
2388 
2389         @Override
2390         protected String getDescription() {
2391           return "EnableTableProcedure";
2392         }
2393       });
2394     } else {
2395       if (cpHost != null) {
2396         cpHost.preEnableTable(tableName);
2397       }
2398       LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2399       this.service.submit(new EnableTableHandler(this, tableName,
2400         assignmentManager, tableLockManager, false).prepare());
2401       if (cpHost != null) {
2402         cpHost.postEnableTable(tableName);
2403       }
2404       return -1;
2405     }
2406   }
2407 
2408   @Override
2409   public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
2410       throws IOException {
2411     checkInitialized();
2412 
2413     if (isMasterProcedureExecutorEnabled()) {
2414       return MasterProcedureUtil.submitProcedure(
2415         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2416         @Override
2417         protected void run() throws IOException {
2418           getMaster().getMasterCoprocessorHost().preDisableTable(tableName);
2419 
2420           LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2421 
2422           // Execute the operation asynchronously - the client will check the
2423           // progress of the operation. Before returning to the client, we want to
2424           // make sure that the table is prepared to be disabled (the table is locked
2425           // and the table state is set), in case the request is from a <1.1 client.
2426           // Note: if the procedure throws an exception, we will catch it and rethrow.
2427           final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2428           submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(),
2429             tableName, false, prepareLatch));
2430           prepareLatch.await();
2431 
2432           getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
2433         }
2434 
2435         @Override
2436         protected String getDescription() {
2437           return "DisableTableProcedure";
2438         }
2439       });
2440     } else {
2441       if (cpHost != null) {
2442         cpHost.preDisableTable(tableName);
2443       }
2444       LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2445       this.service.submit(new DisableTableHandler(this, tableName,
2446         assignmentManager, tableLockManager, false).prepare());
2447       if (cpHost != null) {
2448         cpHost.postDisableTable(tableName);
2449       }
2450       return -1;
2451     }
2452   }
2453 
2454   /**
2455    * Return the region and current deployment for the region containing
2456    * the given row. If the region cannot be found, returns null. If it
2457    * is found, but not currently deployed, the second element of the pair
2458    * may be null.
2459    */
2460   @VisibleForTesting // Used by TestMaster.
2461   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2462       final TableName tableName, final byte [] rowKey)
2463   throws IOException {
2464     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2465       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2466 
2467     MetaScannerVisitor visitor =
2468       new MetaScannerVisitorBase() {
2469         @Override
2470         public boolean processRow(Result data) throws IOException {
2471           if (data == null || data.size() <= 0) {
2472             return true;
2473           }
2474           Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
2475           if (pair == null) {
2476             return false;
2477           }
2478           if (!pair.getFirst().getTable().equals(tableName)) {
2479             return false;
2480           }
2481           result.set(pair);
2482           return true;
2483         }
2484     };
2485 
2486     MetaScanner.metaScan(clusterConnection, visitor, tableName, rowKey, 1);
2487     return result.get();
2488   }
2489 
2490   @Override
2491   public void modifyTable(
2492       final TableName tableName,
2493       final HTableDescriptor descriptor,
2494       final long nonceGroup,
2495       final long nonce)
2496       throws IOException {
2497     checkInitialized();
2498     sanityCheckTableDescriptor(descriptor);
2499 
2500     if (isMasterProcedureExecutorEnabled()) {
2501       MasterProcedureUtil.submitProcedure(
2502         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2503         @Override
2504         protected void run() throws IOException {
2505           getMaster().getMasterCoprocessorHost().preModifyTable(tableName, descriptor);
2506           LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2507 
2508           // Execute the operation synchronously - wait for the operation to complete before continuing.
2509           long procId = submitProcedure(new ModifyTableProcedure(
2510             procedureExecutor.getEnvironment(), descriptor));
2511           ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2512 
2513           getMaster().getMasterCoprocessorHost().postModifyTable(tableName, descriptor);
2514         }
2515 
2516         @Override
2517         protected String getDescription() {
2518           return "ModifyTableProcedure";
2519         }
2520       });
2521     } else {
2522       if (cpHost != null) {
2523         cpHost.preModifyTable(tableName, descriptor);
2524       }
2525       LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2526       new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
2527       if (cpHost != null) {
2528         cpHost.postModifyTable(tableName, descriptor);
2529       }
2530     }
2531   }
2532 
2533   @Override
2534   public void checkTableModifiable(final TableName tableName)
2535       throws IOException, TableNotFoundException, TableNotDisabledException {
2536     if (isCatalogTable(tableName)) {
2537       throw new IOException("Can't modify catalog tables");
2538     }
2539     if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2540       throw new TableNotFoundException(tableName);
2541     }
2542     if (!getAssignmentManager().getTableStateManager().
2543         isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED)) {
2544       throw new TableNotDisabledException(tableName);
2545     }
2546   }
2547 
2548   /**
2549    * @return cluster status
2550    */
2551   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2552     // Build Set of backup masters from ZK nodes
2553     List<String> backupMasterStrings;
2554     try {
2555       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2556         this.zooKeeper.backupMasterAddressesZNode);
2557     } catch (KeeperException e) {
2558       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2559       backupMasterStrings = null;
2560     }
2561 
2562     List<ServerName> backupMasters = null;
2563     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2564       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2565       for (String s: backupMasterStrings) {
2566         try {
2567           byte [] bytes;
2568           try {
2569             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2570                 this.zooKeeper.backupMasterAddressesZNode, s));
2571           } catch (InterruptedException e) {
2572             throw new InterruptedIOException();
2573           }
2574           if (bytes != null) {
2575             ServerName sn;
2576             try {
2577               sn = ServerName.parseFrom(bytes);
2578             } catch (DeserializationException e) {
2579               LOG.warn("Failed to parse, skipping registration of backup server", e);
2580               continue;
2581             }
2582             backupMasters.add(sn);
2583           }
2584         } catch (KeeperException e) {
2585           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2586                    "backup servers"), e);
2587         }
2588       }
2589       Collections.sort(backupMasters, new Comparator<ServerName>() {
2590         @Override
2591         public int compare(ServerName s1, ServerName s2) {
2592           return s1.getServerName().compareTo(s2.getServerName());
2593         }});
2594     }
2595 
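         // The remaining status fields may not be available while the master is still
         // initializing; each one falls back to null/false instead of failing.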
2596     String clusterId = fileSystemManager != null ?
2597       fileSystemManager.getClusterId().toString() : null;
2598     Map<String, RegionState> regionsInTransition = assignmentManager != null ?
2599       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2600     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2601     boolean balancerOn = loadBalancerTracker != null ?
2602       loadBalancerTracker.isBalancerOn() : false;
2603     Map<ServerName, ServerLoad> onlineServers = null;
2604     Set<ServerName> deadServers = null;
2605     if (serverManager != null) {
2606       deadServers = serverManager.getDeadServers().copyServerNames();
2607       onlineServers = serverManager.getOnlineServers();
2608     }
2609     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2610       onlineServers, deadServers, serverName, backupMasters,
2611       regionsInTransition, coprocessors, balancerOn);
2612   }
2613 
2614   /**
2615    * The set of loaded coprocessors is stored in a static set. Since it's
2616    * statically allocated, it does not require that HMaster's cpHost be
2617    * initialized prior to accessing it.
2618    * @return a String representation of the set of names of the loaded
2619    * coprocessors.
2620    */
2621   public static String getLoadedCoprocessors() {
2622     return CoprocessorHost.getLoadedCoprocessors().toString();
2623   }
2624 
2625   /**
2626    * @return timestamp in millis when HMaster was started.
2627    */
2628   public long getMasterStartTime() {
2629     return startcode;
2630   }
2631 
2632   /**
2633    * @return timestamp in millis when HMaster became the active master.
2634    */
2635   public long getMasterActiveTime() {
2636     return masterActiveTime;
2637   }
2638 
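       /**
        * @return the number of active procedure-store WAL files, or 0 if the store is not started.
        */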
2639   public int getNumWALFiles() {
2640     return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
2641   }
2642 
2643   public WALProcedureStore getWalProcedureStore() {
2644     return procedureStore;
2645   }
2646 
2647   public int getRegionServerInfoPort(final ServerName sn) {
2648     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2649     if (info == null || info.getInfoPort() == 0) {
2650       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2651         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2652     }
2653     return info.getInfoPort();
2654   }
2655 
2656   public String getRegionServerVersion(final ServerName sn) {
2657     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2658     if (info != null && info.hasVersionInfo()) {
2659       return info.getVersionInfo().getVersion();
2660     }
2661     return "0.0.0"; // Lowest version, to prevent moving system regions to a region server of unknown version.
2662   }
2663 
2664   @Override
2665   public void checkIfShouldMoveSystemRegionAsync() {
2666     assignmentManager.checkIfShouldMoveSystemRegionAsync();
2667   }
2668 
2669   /**
2670    * @return array of the simple class names of loaded master coprocessors.
2671    */
2672   public String[] getMasterCoprocessors() {
2673     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2674     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2675   }
2676 
2677   @Override
2678   public void abort(final String msg, final Throwable t) {
2679     if (isAborted() || isStopped()) {
2680       return;
2681     }
2682     if (cpHost != null) {
2683       // HBASE-4014: dump a list of loaded coprocessors.
2684       LOG.fatal("Master server abort: loaded coprocessors are: " +
2685           getLoadedCoprocessors());
2686     }
2687     if (t != null) LOG.fatal(msg, t);
2688     stop(msg);
2689   }
2690 
2691   @Override
2692   public ZooKeeperWatcher getZooKeeper() {
2693     return zooKeeper;
2694   }
2695 
2696   @Override
2697   public MasterCoprocessorHost getMasterCoprocessorHost() {
2698     return cpHost;
2699   }
2700 
2701   @Override
2702   public MasterQuotaManager getMasterQuotaManager() {
2703     return quotaManager;
2704   }
2705 
2706   @Override
2707   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2708     return procedureExecutor;
2709   }
2710 
2711   @Override
2712   public ServerName getServerName() {
2713     return this.serverName;
2714   }
2715 
2716   @Override
2717   public AssignmentManager getAssignmentManager() {
2718     return this.assignmentManager;
2719   }
2720 
2721   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2722     return rsFatals;
2723   }
2724 
2725   public void shutdown() throws IOException {
2726     if (cpHost != null) {
2727       cpHost.preShutdown();
2728     }
2729 
2730     if (this.serverManager != null) {
2731       this.serverManager.shutdownCluster();
2732     }
2733     if (this.clusterStatusTracker != null){
2734       try {
2735         this.clusterStatusTracker.setClusterDown();
2736       } catch (KeeperException e) {
2737         LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2738       }
2739     }
2740   }
2741 
2742   public void stopMaster() throws IOException {
2743     if (cpHost != null) {
2744       cpHost.preStopMaster();
2745     }
2746     stop("Stopped by " + Thread.currentThread().getName());
2747   }
2748 
2749   void checkServiceStarted() throws ServerNotRunningYetException {
2750     if (!serviceStarted) {
2751       throw new ServerNotRunningYetException("Server is not running yet");
2752     }
2753   }
2754 
2755   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2756     checkServiceStarted();
2757     if (!this.initialized) {
2758       throw new PleaseHoldException("Master is initializing");
2759     }
2760   }
2761 
2762   void checkNamespaceManagerReady() throws IOException {
2763     checkInitialized();
2764     if (tableNamespaceManager == null) {
2765       throw new IOException("Table Namespace Manager not ready yet, try again later");
2766     } else if (!tableNamespaceManager.isTableAvailableAndInitialized()) {
2767       try {
2768         // Wait some time.
2769         long startTime = EnvironmentEdgeManager.currentTime();
2770         int timeout = conf.getInt("hbase.master.namespace.wait.for.ready", 30000);
2771         while (!tableNamespaceManager.isTableNamespaceManagerStarted() &&
2772             EnvironmentEdgeManager.currentTime() - startTime < timeout) {
2773           Thread.sleep(100);
2774         }
2775       } catch (InterruptedException e) {
2776         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
2777       }
2778       if (!tableNamespaceManager.isTableNamespaceManagerStarted()) {
2779         throw new IOException("Table Namespace Manager not fully initialized, try again later");
2780       }
2781     }
2782   }
2783 
2784   /**
2785    * Report whether this master is currently the active master or not.
2786    * If not active master, we are parked on ZK waiting to become active.
2787    *
2788    * This method is used for testing.
2789    *
2790    * @return true if active master, false if not.
2791    */
2792   public boolean isActiveMaster() {
2793     return isActiveMaster;
2794   }
2795 
2796   /**
2797    * Report whether this master has completed its initialization and is
2798    * ready.  If ready, the master is also the active master.  A standby master
2799    * is never ready.
2800    *
2801    * This method is used for testing.
2802    *
2803    * @return true if master is ready to go, false if not.
2804    */
2805   @Override
2806   public boolean isInitialized() {
2807     return initialized;
2808   }
2809 
2810   /**
2811    * Report whether this master has completed its initialization and the
2812    * namespace manager is ready.
2813    *
2814    * This method is used for testing.
2815    *
2816    * @return true if master is fully initialized
2817    */
2818   @Override
2819   public boolean isNamespaceManagerInitialized() {
2820     return initialized && tableNamespaceManager != null
2821       && tableNamespaceManager.isTableNamespaceManagerStarted();
2822   }
2823 
2824   /**
2825    * Report whether this master is in maintenance mode.
2826    *
2827    * @return true if master is in maintenanceMode
2828    */
2829   @Override
2830   public boolean isInMaintenanceMode() {
2831     return maintenanceModeTracker.isInMaintenanceMode();
2832   }
2833 
2834   /**
2835    * serverShutdownHandlerEnabled is kept false until assignMeta completes,
2836    * to prevent premature processing of ServerShutdownHandler.
2837    * @return true if assignMeta has completed.
2838    */
2839   @Override
2840   public boolean isServerShutdownHandlerEnabled() {
2841     return this.serverShutdownHandlerEnabled;
2842   }
2843 
2844   /**
2845    * Report whether this master has started initialization and is about to do meta region assignment.
2846    * @return true if master is in initialization and about to assign hbase:meta regions
2847    */
2848   public boolean isInitializationStartsMetaRegionAssignment() {
2849     return this.initializationBeforeMetaAssignment;
2850   }
2851 
2852   public void assignRegion(HRegionInfo hri) {
2853     assignmentManager.assign(hri, true);
2854   }
2855 
2856   /**
2857    * Compute the average load across all region servers.
2858    * Currently, this uses a very naive computation - just uses the number of
2859    * regions being served, ignoring stats about number of requests.
2860    * @return the average load
2861    */
2862   public double getAverageLoad() {
2863     if (this.assignmentManager == null) {
2864       return 0;
2865     }
2866 
2867     RegionStates regionStates = this.assignmentManager.getRegionStates();
2868     if (regionStates == null) {
2869       return 0;
2870     }
2871     return regionStates.getAverageLoad();
2872   }
2873 
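       /**
        * Registers a coprocessor protobuf {@link Service} under its service name.
        * A rough usage sketch from a master coprocessor endpoint (hypothetical generated
        * service {@code MyProtos.MyService} and implementation {@code MyServiceImpl}):
        * <pre>{@code
        * master.registerService(
        *     MyProtos.MyService.newReflectiveService(new MyServiceImpl()));
        * }</pre>
        * @return true if registered, false if a service by the same name was already present
        */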
2874   @Override
2875   public boolean registerService(Service instance) {
2876     /*
2877      * No stacking of instances is allowed for a single service name
2878      */
2879     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2880     String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
2881     if (coprocessorServiceHandlers.containsKey(serviceName)) {
2882       LOG.error("Coprocessor service "+serviceName+
2883           " already registered, rejecting request from "+instance
2884       );
2885       return false;
2886     }
2887 
2888     coprocessorServiceHandlers.put(serviceName, instance);
2889     if (LOG.isDebugEnabled()) {
2890       LOG.debug("Registered master coprocessor service: service="+serviceName);
2891     }
2892     return true;
2893   }
2894 
2895   /**
2896    * Utility for constructing an instance of the passed HMaster class.
2897    * @param masterClass the HMaster subclass to instantiate
2898    * @param conf the configuration handed to the constructor
2899    * @return HMaster instance.
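        * <p>A minimal usage sketch, assuming a {@code conf} and a
        * {@code CoordinatedStateManager csm} are already in hand (see
        * {@link HMasterCommandLine} for the real startup path):
        * <pre>{@code
        * HMaster master = HMaster.constructMaster(HMaster.class, conf, csm);
        * }</pre>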
2900    */
2901   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2902       final Configuration conf, final CoordinatedStateManager cp)  {
2903     try {
2904       Constructor<? extends HMaster> c =
2905         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2906       return c.newInstance(conf, cp);
2907     } catch (InvocationTargetException ite) {
2908       Throwable target = ite.getTargetException() != null?
2909         ite.getTargetException(): ite;
2910       if (target.getCause() != null) target = target.getCause();
2911       throw new RuntimeException("Failed construction of Master: " +
2912         masterClass.toString(), target);
2913     } catch (Exception e) {
2914       throw new RuntimeException("Failed construction of Master: " +
2915         masterClass.toString() + ((e.getCause() != null)?
2916           e.getCause().getMessage(): ""), e);
2917     }
2918   }
2919 
2920   /**
2921    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2922    */
2923   public static void main(String [] args) {
2924     VersionInfo.logVersion();
2925     new HMasterCommandLine(HMaster.class).doMain(args);
2926   }
2927 
2928   public HFileCleaner getHFileCleaner() {
2929     return this.hfileCleaner;
2930   }
2931 
2932   /**
2933    * @return the underlying snapshot manager
2934    */
2935   public SnapshotManager getSnapshotManager() {
2936     return this.snapshotManager;
2937   }
2938 
2939   /**
2940    * @return the underlying MasterProcedureManagerHost
2941    */
2942   public MasterProcedureManagerHost getMasterProcedureManagerHost() {
2943     return mpmHost;
2944   }
2945 
2946   @Override
2947   public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
2948     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2949     checkNamespaceManagerReady();
2950     if (cpHost != null) {
2951       if (cpHost.preCreateNamespace(descriptor)) {
2952         return;
2953       }
2954     }
2955     LOG.info(getClientIdAuditPrefix() + " creating " + descriptor);
2956     tableNamespaceManager.create(descriptor);
2957     if (cpHost != null) {
2958       cpHost.postCreateNamespace(descriptor);
2959     }
2960   }
2961 
2962   @Override
2963   public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
2964     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2965     checkNamespaceManagerReady();
2966     if (cpHost != null) {
2967       if (cpHost.preModifyNamespace(descriptor)) {
2968         return;
2969       }
2970     }
2971     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
2972     tableNamespaceManager.update(descriptor);
2973     if (cpHost != null) {
2974       cpHost.postModifyNamespace(descriptor);
2975     }
2976   }
2977 
2978   @Override
2979   public void deleteNamespace(String name) throws IOException {
2980     checkNamespaceManagerReady();
2981     if (cpHost != null) {
2982       if (cpHost.preDeleteNamespace(name)) {
2983         return;
2984       }
2985     }
2986     LOG.info(getClientIdAuditPrefix() + " delete " + name);
2987     tableNamespaceManager.remove(name);
2988     if (cpHost != null) {
2989       cpHost.postDeleteNamespace(name);
2990     }
2991   }
2992 
2993   /**
2994    * Ensure that the specified namespace exists; otherwise, throw a NamespaceNotFoundException.
2995    *
2996    * @param name the namespace to check
2997    * @throws IOException if the namespace manager is not ready yet.
2998    * @throws NamespaceNotFoundException if the namespace does not exist
2999    */
3000   private void ensureNamespaceExists(final String name)
3001       throws IOException, NamespaceNotFoundException {
3002     checkNamespaceManagerReady();
3003     NamespaceDescriptor nsd = tableNamespaceManager.get(name);
3004     if (nsd == null) {
3005       throw new NamespaceNotFoundException(name);
3006     }
3007   }
3008 
3009   @Override
3010   public NamespaceDescriptor getNamespaceDescriptor(String name) throws IOException {
3011     checkNamespaceManagerReady();
3012 
3013     if (cpHost != null) {
3014       cpHost.preGetNamespaceDescriptor(name);
3015     }
3016 
3017     NamespaceDescriptor nsd = tableNamespaceManager.get(name);
3018     if (nsd == null) {
3019       throw new NamespaceNotFoundException(name);
3020     }
3021 
3022     if (cpHost != null) {
3023       cpHost.postGetNamespaceDescriptor(nsd);
3024     }
3025 
3026     return nsd;
3027   }
3028 
3029   @Override
3030   public List<NamespaceDescriptor> listNamespaceDescriptors() throws IOException {
3031     checkNamespaceManagerReady();
3032 
3033     final List<NamespaceDescriptor> descriptors = new ArrayList<NamespaceDescriptor>();
3034     boolean bypass = false;
3035     if (cpHost != null) {
3036       bypass = cpHost.preListNamespaceDescriptors(descriptors);
3037     }
3038 
3039     if (!bypass) {
3040       descriptors.addAll(tableNamespaceManager.list());
3041 
3042       if (cpHost != null) {
3043         cpHost.postListNamespaceDescriptors(descriptors);
3044       }
3045     }
3046     return descriptors;
3047   }
3048 
3049   @Override
3050   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
3051       throws IOException {
3052     if (cpHost != null) {
3053       cpHost.preAbortProcedure(this.procedureExecutor, procId);
3054     }
3055 
3056     final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
3057 
3058     if (cpHost != null) {
3059       cpHost.postAbortProcedure();
3060     }
3061 
3062     return result;
3063   }
3064 
3065   @Override
3066   public List<ProcedureInfo> listProcedures() throws IOException {
3067     if (cpHost != null) {
3068       cpHost.preListProcedures();
3069     }
3070 
3071     final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();
3072 
3073     if (cpHost != null) {
3074       cpHost.postListProcedures(procInfoList);
3075     }
3076 
3077     return procInfoList;
3078   }
3079 
3080   @Override
3081   public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
3082     ensureNamespaceExists(name);
3083     return listTableDescriptors(name, null, null, true);
3084   }
3085 
3086   @Override
3087   public List<TableName> listTableNamesByNamespace(String name) throws IOException {
3088     ensureNamespaceExists(name);
3089     return listTableNames(name, null, true);
3090   }
3091 
3092   @Override
3093   public Pair<Long, String> backupTables(final BackupType type,
3094         List<TableName> tableList, final String targetRootDir, final int workers,
3095         final long bandwidth) throws IOException {
3096     long procId;
3097     String backupId = BackupRestoreConstants.BACKUPID_PREFIX + 
3098         EnvironmentEdgeManager.currentTime();
3099     if (type == BackupType.INCREMENTAL) {
3100       Set<TableName> incrTableSet = null;
3101       try (BackupSystemTable table = new BackupSystemTable(getConnection())) {
3102         incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
3103       }
3104 
3105       if (incrTableSet.isEmpty()) {
3106         LOG.warn("Incremental backup table set contains no tables.\n"
3107             + "Use 'backup create full' or 'backup stop' to\n"
3108             + "change the tables covered by incremental backup.");
3109         throw new DoNotRetryIOException("No tables covered by incremental backup.");
3110       }
3111 
3112       LOG.info("Incremental backup for the following table set: " + incrTableSet);
3113       tableList = Lists.newArrayList(incrTableSet);
3114     }
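         // Pre-flight checks below: fail fast if a target backup dir already exists,
         // then drop (incremental) or reject (full) tables that do not exist.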
3115     if (tableList != null && !tableList.isEmpty()) {
3116       for (TableName table : tableList) {
3117         String targetTableBackupDir =
3118             HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, table);
3119         Path targetTableBackupDirPath = new Path(targetTableBackupDir);
3120         FileSystem outputFs = FileSystem.get(targetTableBackupDirPath.toUri(), conf);
3121         if (outputFs.exists(targetTableBackupDirPath)) {
3122           throw new DoNotRetryIOException("Target backup directory " + targetTableBackupDir
3123             + " exists already.");
3124         }
3125       }
3126       ArrayList<TableName> nonExistingTableList = null;
3127       for (TableName tableName : tableList) {
3128         if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
3129           if (nonExistingTableList == null) {
3130             nonExistingTableList = new ArrayList<>();
3131           }
3132           nonExistingTableList.add(tableName);
3133         }
3134       }
3135       if (nonExistingTableList != null) {
3136         if (type == BackupType.INCREMENTAL) {
3137           LOG.warn("Incremental backup table set contains non-existing tables: "
3138               + nonExistingTableList);
3139           // Update the incremental backup set.
3140           tableList = excludeNonExistingTables(tableList, nonExistingTableList);
3141         } else {
3142           // Throw only in full mode - the request tries to back up non-existing tables.
3143           throw new DoNotRetryIOException("Non-existing tables found in the table list: "
3144               + nonExistingTableList);
3145         }
3146       }
3147     }
3148     if (type == BackupType.FULL) {
3149       procId = this.procedureExecutor.submitProcedure(
3150         new FullTableBackupProcedure(procedureExecutor.getEnvironment(), backupId,
3151           tableList, targetRootDir, workers, bandwidth));
3152     } else {
3153       procId = this.procedureExecutor.submitProcedure(
3154         new IncrementalTableBackupProcedure(procedureExecutor.getEnvironment(), backupId,
3155           tableList, targetRootDir, workers, bandwidth));
3156     }
3157     return new Pair<>(procId, backupId);
3158   }
3159 
3160   private List<TableName> excludeNonExistingTables(List<TableName> tableList,
3161       List<TableName> nonExistingTableList) {
3162 
3163     for (TableName table: nonExistingTableList) {
3164       tableList.remove(table);
3165     }
3166     return tableList;
3167   }
3168 
3169   /**
3170    * Returns the list of table descriptors that match the specified request
3171    *
3172    * @param namespace the namespace to query, or null if querying for all
3173    * @param regex The regular expression to match against, or null if querying for all
3174    * @param tableNameList the list of table names, or null if querying for all
3175    * @param includeSysTables False to match only against userspace tables
3176    * @return the list of table descriptors
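        * <p>For example (hypothetical names), {@code listTableDescriptors("ns1", "t.*", null, false)}
        * returns the descriptors of all user-space tables in namespace {@code ns1} whose names
        * match the pattern {@code t.*}.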
3177    */
3178   public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
3179       final List<TableName> tableNameList, final boolean includeSysTables)
3180       throws IOException {
3181     final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
3182 
3183     boolean bypass = false;
3184     if (cpHost != null) {
3185       bypass = cpHost.preGetTableDescriptors(tableNameList, descriptors);
3186       // method required for AccessController.
3187       bypass |= cpHost.preGetTableDescriptors(tableNameList, descriptors, regex);
3188     }
3189 
3190     if (!bypass) {
3191       if (tableNameList == null || tableNameList.size() == 0) {
3192         // request for all TableDescriptors
3193         Collection<HTableDescriptor> htds;
3194         if (namespace != null && namespace.length() > 0) {
3195           htds = tableDescriptors.getByNamespace(namespace).values();
3196         } else {
3197           htds = tableDescriptors.getAll().values();
3198         }
3199 
3200         for (HTableDescriptor desc: htds) {
3201           if (includeSysTables || !desc.getTableName().isSystemTable()) {
3202             descriptors.add(desc);
3203           }
3204         }
3205       } else {
3206         for (TableName s: tableNameList) {
3207           HTableDescriptor desc = tableDescriptors.get(s);
3208           if (desc != null) {
3209             descriptors.add(desc);
3210           }
3211         }
3212         if (tableNameList.size() == 1) { // For debugging
3213           LOG.info(getClientIdAuditPrefix() + " list table descriptor for table " + tableNameList.get(0)
3214             + ((descriptors.size() == 1) ? " succeeded" : " failed"));
3215         }
3216       }
3217 
3218       // Retains only those matched by regular expression.
3219       if (regex != null) {
3220         filterTablesByRegex(descriptors, Pattern.compile(regex));
3221       }
3222 
3223       if (cpHost != null) {
3224         cpHost.postGetTableDescriptors(descriptors);
3225         // method required for AccessController.
3226         cpHost.postGetTableDescriptors(tableNameList, descriptors, regex);
3227       }
3228     }
3229     return descriptors;
3230   }
3231 
3232   /**
3233    * Returns the list of table names that match the specified request
3234    * @param regex The regular expression to match against, or null if querying for all
3235    * @param namespace the namespace to query, or null if querying for all
3236    * @param includeSysTables False to match only against userspace tables
3237    * @return the list of table names
3238    */
3239   public List<TableName> listTableNames(final String namespace, final String regex,
3240       final boolean includeSysTables) throws IOException {
3241     final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
3242 
3243     boolean bypass = false;
3244     if (cpHost != null) {
3245       bypass = cpHost.preGetTableNames(descriptors, regex);
3246     }
3247 
3248     if (!bypass) {
3249       // get all descriptors
3250       Collection<HTableDescriptor> htds;
3251       if (namespace != null && namespace.length() > 0) {
3252         htds = tableDescriptors.getByNamespace(namespace).values();
3253       } else {
3254         htds = tableDescriptors.getAll().values();
3255       }
3256 
3257       for (HTableDescriptor htd: htds) {
3258         if (includeSysTables || !htd.getTableName().isSystemTable()) {
3259           descriptors.add(htd);
3260         }
3261       }
3262 
3263       // Retains only those matched by regular expression.
3264       if (regex != null) {
3265         filterTablesByRegex(descriptors, Pattern.compile(regex));
3266       }
3267 
3268       if (cpHost != null) {
3269         cpHost.postGetTableNames(descriptors, regex);
3270       }
3271     }
3272 
3273     List<TableName> result = new ArrayList<TableName>(descriptors.size());
3274     for (HTableDescriptor htd: descriptors) {
3275       result.add(htd.getTableName());
3276     }
3277     return result;
3278   }
3279 
3280 
3281   /**
3282    * Removes the table descriptors that don't match the pattern.
3283    * @param descriptors list of table descriptors to filter
3284    * @param pattern the regex to use
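        * <p>Tables in the default namespace are matched against both their plain name and the
        * {@code default:name} qualified form, so a pattern such as {@code default:t.*} still
        * retains a table named {@code t1}.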
3285    */
3286   private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
3287       final Pattern pattern) {
3288     final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
3289     Iterator<HTableDescriptor> itr = descriptors.iterator();
3290     while (itr.hasNext()) {
3291       HTableDescriptor htd = itr.next();
3292       String tableName = htd.getTableName().getNameAsString();
3293       boolean matched = pattern.matcher(tableName).matches();
3294       if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
3295         matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
3296       }
3297       if (!matched) {
3298         itr.remove();
3299       }
3300     }
3301   }
3302 
3303   @Override
3304   public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
3305     return getClusterStatus().getLastMajorCompactionTsForTable(table);
3306   }
3307 
3308   @Override
3309   public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
3310     return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
3311   }
3312 
3313   /**
3314    * Gets the mob file compaction state for a specific table.
3315    * Whether all the mob files are selected is only known while the compaction executes, but
3316    * the statistic is taken just before the compaction starts, when the compaction type cannot
3317    * yet be known; rough statistics are therefore used for the mob file compaction. Only two
3318    * compaction states are reported, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
3319    * @param tableName The current table name.
3320    * @return CompactionState.MAJOR_AND_MINOR if the table is under mob compaction now, else NONE.
3321    */
3322   public CompactionState getMobCompactionState(TableName tableName) {
3323     AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3324     if (compactionsCount != null && compactionsCount.get() != 0) {
3325       return CompactionState.MAJOR_AND_MINOR;
3326     }
3327     return CompactionState.NONE;
3328   }
3329 
3330   public void reportMobCompactionStart(TableName tableName) throws IOException {
3331     IdLock.Entry lockEntry = null;
3332     try {
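           // Serialize per-table updates: the IdLock entry is keyed by the table name's hash.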
3333       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3334       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3335       if (compactionsCount == null) {
3336         compactionsCount = new AtomicInteger(0);
3337         mobCompactionStates.put(tableName, compactionsCount);
3338       }
3339       compactionsCount.incrementAndGet();
3340     } finally {
3341       if (lockEntry != null) {
3342         mobCompactionLock.releaseLockEntry(lockEntry);
3343       }
3344     }
3345   }
3346 
3347   public void reportMobCompactionEnd(TableName tableName) throws IOException {
3348     IdLock.Entry lockEntry = null;
3349     try {
3350       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3351       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3352       if (compactionsCount != null) {
3353         int count = compactionsCount.decrementAndGet();
3354         // remove the entry if the count is 0.
3355         if (count == 0) {
3356           mobCompactionStates.remove(tableName);
3357         }
3358       }
3359     } finally {
3360       if (lockEntry != null) {
3361         mobCompactionLock.releaseLockEntry(lockEntry);
3362       }
3363     }
3364   }
3365 
3366   /**
3367    * Requests mob compaction.
3368    * @param tableName The table to compact.
3369    * @param columns The column families to compact.
3370    * @param allFiles Whether to include all mob files in the compaction.
3371    */
3372   public void requestMobCompaction(TableName tableName,
3373     List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
3374     mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
3375       tableLockManager, allFiles);
3376   }
3377 
3378   /**
3379    * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
3380    * false is returned.
3381    *
3382    * @return The state of the load balancer, or false if the load balancer isn't defined.
3383    */
3384   public boolean isBalancerOn() {
3385     if (null == loadBalancerTracker || isInMaintenanceMode()) {
3386       return false;
3387     }
3388     return loadBalancerTracker.isBalancerOn();
3389   }
3390 
3391   /**
3392    * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
3393    * false is returned.
3394    */
3395   public boolean isNormalizerOn() {
3396     return (null == regionNormalizerTracker || isInMaintenanceMode()) ?
3397         false: regionNormalizerTracker.isNormalizerOn();
3398   }
3399 
3400   /**
3401    * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
3402    * false is returned. If switchType is illegal, false is returned as well.
3403    * @param switchType see {@link org.apache.hadoop.hbase.client.Admin.MasterSwitchType}
3404    * @return The state of the switch
3405    */
3406   public boolean isSplitOrMergeEnabled(Admin.MasterSwitchType switchType) {
3407     if (null == splitOrMergeTracker || isInMaintenanceMode()) {
3408       return false;
3409     }
3410     return splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
3411   }
3412 
3413   /**
3414    * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
3415    *
3416    * @return The name of the {@link LoadBalancer} in use.
3417    */
3418   public String getLoadBalancerClassName() {
3419     return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
3420         .getDefaultLoadBalancerClass().getName());
3421   }
3422 
3423   /**
3424    * @return RegionNormalizerTracker instance
3425    */
3426   public RegionNormalizerTracker getRegionNormalizerTracker() {
3427     return regionNormalizerTracker;
3428   }
3429 
3430   public SplitOrMergeTracker getSplitOrMergeTracker() {
3431     return splitOrMergeTracker;
3432   }
3433 
3434   @Override
3435   public LoadBalancer getLoadBalancer() {
3436     return balancer;
3437   }
3438 
3439   @Override
3440   public TableStateManager getTableStateManager() {
3441     return assignmentManager.getTableStateManager();
3442   }
3443 
3444   public QuotaObserverChore getQuotaObserverChore() {
3445     return this.quotaObserverChore;
3446   }
3447 
3448   public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() {
3449     return this.spaceQuotaSnapshotNotifier;
3450   }
3451 }