View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.classification.InterfaceStability;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataInputStream;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.Stoppable;
46  import org.apache.hadoop.hbase.MetaTableAccessor;
47  import org.apache.hadoop.hbase.errorhandling.ForeignException;
48  import org.apache.hadoop.hbase.executor.ExecutorService;
49  import org.apache.hadoop.hbase.ipc.RpcServer;
50  import org.apache.hadoop.hbase.master.AssignmentManager;
51  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
52  import org.apache.hadoop.hbase.master.MasterFileSystem;
53  import org.apache.hadoop.hbase.master.MasterServices;
54  import org.apache.hadoop.hbase.master.MetricsMaster;
55  import org.apache.hadoop.hbase.master.SnapshotSentinel;
56  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
57  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
58  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
59  import org.apache.hadoop.hbase.procedure.Procedure;
60  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
61  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
62  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
63  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
64  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
65  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
66  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
67  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
68  import org.apache.hadoop.hbase.quotas.QuotaExceededException;
69  import org.apache.hadoop.hbase.security.AccessDeniedException;
70  import org.apache.hadoop.hbase.security.User;
71  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
72  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
73  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
74  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
75  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
76  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
77  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
78  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
79  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
80  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
81  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
82  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
83  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
84  import org.apache.hadoop.hbase.util.FSUtils;
85  import org.apache.zookeeper.KeeperException;
86  
87  /**
88   * This class manages the procedure of taking and restoring snapshots. There is only one
89   * SnapshotManager for the master.
90   * <p>
91   * The class provides methods for monitoring in-progress snapshot actions.
92   * <p>
93   * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
94   * simplification in the current implementation.
95   */
96  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
97  @InterfaceStability.Unstable
98  public class SnapshotManager extends MasterProcedureManager implements Stoppable {
99    private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
100 
101   /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
102   private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
103 
104   /**
105    * Wait time before removing a finished sentinel from the in-progress map
106    *
107    * NOTE: This is used as a safety auto cleanup.
108    * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
109    * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
110    * In case something fails on the client side and the snapshot/restore state is not reclaimed
111    * after a default timeout, the entry is removed from the in-progress map.
112    * At this point, if the user asks for the snapshot/restore status, the result will be
113    * snapshot done if exists or failed if it doesn't exists.
114    */
115   private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
116 
117   /** Enable or disable snapshot support */
118   public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
119 
120   /**
121    * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
122    * completion.
123    */
124   private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
125 
126   /** Name of the operation to use in the controller */
127   public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
128 
129   /** Conf key for # of threads used by the SnapshotManager thread pool */
130   private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
131 
132   /** number of current operations running on the master */
133   private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
134 
135   private boolean stopped;
136   private MasterServices master;  // Needed by TableEventHandlers
137   private ProcedureCoordinator coordinator;
138 
139   // Is snapshot feature enabled?
140   private boolean isSnapshotSupported = false;
141 
142   // Snapshot handlers map, with table name as key.
143   // The map is always accessed and modified under the object lock using synchronized.
144   // snapshotTable() will insert an Handler in the table.
145   // isSnapshotDone() will remove the handler requested if the operation is finished.
146   private Map<TableName, SnapshotSentinel> snapshotHandlers =
147       new HashMap<TableName, SnapshotSentinel>();
148 
149   // Restore Sentinels map, with table name as key.
150   // The map is always accessed and modified under the object lock using synchronized.
151   // restoreSnapshot()/cloneSnapshot() will insert an Handler in the table.
152   // isRestoreDone() will remove the handler requested if the operation is finished.
153   private Map<TableName, SnapshotSentinel> restoreHandlers =
154       new HashMap<TableName, SnapshotSentinel>();
155 
156   private Path rootDir;
157   private ExecutorService executorService;
158 
159   public SnapshotManager() {}
160 
161   /**
162    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
163    * @param master services for the master where the manager is running
164    * @param coordinator procedure coordinator instance.  exposed for testing.
165    * @param pool HBase ExecutorServcie instance, exposed for testing.
166    */
167   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
168       ProcedureCoordinator coordinator, ExecutorService pool)
169       throws IOException, UnsupportedOperationException {
170     this.master = master;
171 
172     this.rootDir = master.getMasterFileSystem().getRootDir();
173     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
174 
175     this.coordinator = coordinator;
176     this.executorService = pool;
177     resetTempDir();
178   }
179 
180   /**
181    * Gets the list of all completed snapshots.
182    * @return list of SnapshotDescriptions
183    * @throws IOException File system exception
184    */
185   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
186     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
187   }
188 
189   /**
190    * Gets the list of all completed snapshots.
191    * @param snapshotDir snapshot directory
192    * @return list of SnapshotDescriptions
193    * @throws IOException File system exception
194    */
195   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
196     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
197     // first create the snapshot root path and check to see if it exists
198     FileSystem fs = master.getMasterFileSystem().getFileSystem();
199     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
200 
201     // if there are no snapshots, return an empty list
202     if (!fs.exists(snapshotDir)) {
203       return snapshotDescs;
204     }
205 
206     // ignore all the snapshots in progress
207     FileStatus[] snapshots = fs.listStatus(snapshotDir,
208       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
209     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
210     // loop through all the completed snapshots
211     for (FileStatus snapshot : snapshots) {
212       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
213       // if the snapshot is bad
214       if (!fs.exists(info)) {
215         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
216         continue;
217       }
218       FSDataInputStream in = null;
219       try {
220         in = fs.open(info);
221         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
222         if (cpHost != null) {
223           try {
224             cpHost.preListSnapshot(desc);
225           } catch (AccessDeniedException e) {
226             LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
227                 + "Either you should be owner of this snapshot or admin user.");
228             // Skip this and try for next snapshot
229             continue;
230           }
231         }
232         snapshotDescs.add(desc);
233 
234         // call coproc post hook
235         if (cpHost != null) {
236           cpHost.postListSnapshot(desc);
237         }
238       } catch (IOException e) {
239         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
240       } finally {
241         if (in != null) {
242           in.close();
243         }
244       }
245     }
246     return snapshotDescs;
247   }
248 
249   /**
250    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
251    * snapshot attempts.
252    *
253    * @throws IOException if we can't reach the filesystem
254    */
255   void resetTempDir() throws IOException {
256     // cleanup any existing snapshots.
257     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
258     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
259       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
260         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
261       }
262     }
263   }
264 
265   /**
266    * Delete the specified snapshot
267    * @param snapshot
268    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
269    * @throws IOException For filesystem IOExceptions
270    */
271   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
272     // check to see if it is completed
273     if (!isSnapshotCompleted(snapshot)) {
274       throw new SnapshotDoesNotExistException(snapshot);
275     }
276 
277     String snapshotName = snapshot.getName();
278     // first create the snapshot description and check to see if it exists
279     FileSystem fs = master.getMasterFileSystem().getFileSystem();
280     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
281     // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
282     // just the "name" and it does not contains the "real" snapshot information
283     snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
284 
285     // call coproc pre hook
286     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
287     if (cpHost != null) {
288       cpHost.preDeleteSnapshot(snapshot);
289     }
290 
291     LOG.debug("Deleting snapshot: " + snapshotName);
292     // delete the existing snapshot
293     if (!fs.delete(snapshotDir, true)) {
294       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
295     }
296 
297     // call coproc post hook
298     if (cpHost != null) {
299       cpHost.postDeleteSnapshot(snapshot);
300     }
301 
302   }
303 
304   /**
305    * Check if the specified snapshot is done
306    *
307    * @param expected
308    * @return true if snapshot is ready to be restored, false if it is still being taken.
309    * @throws IOException IOException if error from HDFS or RPC
310    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
311    */
312   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
313     // check the request to make sure it has a snapshot
314     if (expected == null) {
315       throw new UnknownSnapshotException(
316          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
317     }
318 
319     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
320 
321     // check to see if the sentinel exists,
322     // and if the task is complete removes it from the in-progress snapshots map.
323     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
324 
325     // stop tracking "abandoned" handlers
326     cleanupSentinels();
327 
328     if (handler == null) {
329       // If there's no handler in the in-progress map, it means one of the following:
330       //   - someone has already requested the snapshot state
331       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
332       //   - the snapshot was never requested
333       // In those cases returns to the user the "done state" if the snapshots exists on disk,
334       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
335       if (!isSnapshotCompleted(expected)) {
336         throw new UnknownSnapshotException("Snapshot " + ssString
337             + " is not currently running or one of the known completed snapshots.");
338       }
339       // was done, return true;
340       return true;
341     }
342 
343     // pass on any failure we find in the sentinel
344     try {
345       handler.rethrowExceptionIfFailed();
346     } catch (ForeignException e) {
347       // Give some procedure info on an exception.
348       String status;
349       Procedure p = coordinator.getProcedure(expected.getName());
350       if (p != null) {
351         status = p.getStatus();
352       } else {
353         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
354       }
355       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
356           expected);
357     }
358 
359     // check to see if we are done
360     if (handler.isFinished()) {
361       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
362       return true;
363     } else if (LOG.isDebugEnabled()) {
364       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
365     }
366     return false;
367   }
368 
369   /**
370    * Check to see if there is a snapshot in progress with the same name or on the same table.
371    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
372    * don't allow snapshot with the same name.
373    * @param snapshot description of the snapshot being checked.
374    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
375    *         table.
376    */
377   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
378     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
379     if (isTakingSnapshot(snapshotTable)) {
380       return true;
381     }
382     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
383     while (it.hasNext()) {
384       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
385       SnapshotSentinel sentinel = entry.getValue();
386       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
387         return true;
388       }
389     }
390     return false;
391   }
392 
393   /**
394    * Check to see if the specified table has a snapshot in progress.  Currently we have a
395    * limitation only allowing a single snapshot per table at a time.
396    * @param tableName name of the table being snapshotted.
397    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
398    */
399   synchronized boolean isTakingSnapshot(final TableName tableName) {
400     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
401     return handler != null && !handler.isFinished();
402   }
403 
404   /**
405    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
406    * aren't already running a snapshot or restore on the requested table.
407    * @param snapshot description of the snapshot we want to start
408    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
409    */
410   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
411       throws HBaseSnapshotException {
412     FileSystem fs = master.getMasterFileSystem().getFileSystem();
413     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
414     TableName snapshotTable =
415         TableName.valueOf(snapshot.getTable());
416 
417     // make sure we aren't already running a snapshot
418     if (isTakingSnapshot(snapshot)) {
419       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
420       throw new SnapshotCreationException("Rejected taking "
421           + ClientSnapshotDescriptionUtils.toString(snapshot)
422           + " because we are already running another snapshot "
423           + (handler != null ? ("on the same table " +
424               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
425               : "with the same name"), snapshot);
426     }
427 
428     // make sure we aren't running a restore on the same table
429     if (isRestoringTable(snapshotTable)) {
430       SnapshotSentinel handler = restoreHandlers.get(snapshotTable);
431       throw new SnapshotCreationException("Rejected taking "
432           + ClientSnapshotDescriptionUtils.toString(snapshot)
433           + " because we are already have a restore in progress on the same snapshot "
434           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
435     }
436 
437     try {
438       // delete the working directory, since we aren't running the snapshot. Likely leftovers
439       // from a failed attempt.
440       fs.delete(workingDir, true);
441 
442       // recreate the working directory for the snapshot
443       if (!fs.mkdirs(workingDir)) {
444         throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
445             + ") for snapshot" , snapshot);
446       }
447     } catch (HBaseSnapshotException e) {
448       throw e;
449     } catch (IOException e) {
450       throw new SnapshotCreationException(
451           "Exception while checking to see if snapshot could be started.", e, snapshot);
452     }
453   }
454 
455   /**
456    * Take a snapshot of a disabled table.
457    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
458    * @throws HBaseSnapshotException if the snapshot could not be started
459    */
460   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
461       throws HBaseSnapshotException {
462     // setup the snapshot
463     prepareToTakeSnapshot(snapshot);
464 
465     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
466     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
467 
468     // Take the snapshot of the disabled table
469     DisabledTableSnapshotHandler handler =
470         new DisabledTableSnapshotHandler(snapshot, master);
471     snapshotTable(snapshot, handler);
472   }
473 
474   /**
475    * Take a snapshot of an enabled table.
476    * @param snapshot description of the snapshot to take.
477    * @throws HBaseSnapshotException if the snapshot could not be started
478    */
479   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
480       throws HBaseSnapshotException {
481     // setup the snapshot
482     prepareToTakeSnapshot(snapshot);
483 
484     // Take the snapshot of the enabled table
485     EnabledTableSnapshotHandler handler =
486         new EnabledTableSnapshotHandler(snapshot, master, this);
487     snapshotTable(snapshot, handler);
488   }
489 
490   /**
491    * Take a snapshot using the specified handler.
492    * On failure the snapshot temporary working directory is removed.
493    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
494    *       snapshot request if the table is busy with another snapshot/restore operation.
495    * @param snapshot the snapshot description
496    * @param handler the snapshot handler
497    */
498   private synchronized void snapshotTable(SnapshotDescription snapshot,
499       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
500     try {
501       handler.prepare();
502       this.executorService.submit(handler);
503       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
504     } catch (Exception e) {
505       // cleanup the working directory by trying to delete it from the fs.
506       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
507       try {
508         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
509           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
510               ClientSnapshotDescriptionUtils.toString(snapshot));
511         }
512       } catch (IOException e1) {
513         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
514             ClientSnapshotDescriptionUtils.toString(snapshot));
515       }
516       // fail the snapshot
517       throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
518     }
519   }
520 
521   /**
522    * Take a snapshot based on the enabled/disabled state of the table.
523    *
524    * @param snapshot
525    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
526    * @throws IOException when some sort of generic IO exception occurs.
527    */
528   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
529     // check to see if we already completed the snapshot
530     if (isSnapshotCompleted(snapshot)) {
531       throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
532           + "' already stored on the filesystem.", snapshot);
533     }
534 
535     LOG.debug("No existing snapshot, attempting snapshot...");
536 
537     // stop tracking "abandoned" handlers
538     cleanupSentinels();
539 
540     // check to see if the table exists
541     HTableDescriptor desc = null;
542     try {
543       desc = master.getTableDescriptors().get(
544           TableName.valueOf(snapshot.getTable()));
545     } catch (FileNotFoundException e) {
546       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
547       LOG.error(msg);
548       throw new SnapshotCreationException(msg, e, snapshot);
549     } catch (IOException e) {
550       throw new SnapshotCreationException("Error while geting table description for table "
551           + snapshot.getTable(), e, snapshot);
552     }
553     if (desc == null) {
554       throw new SnapshotCreationException("Table '" + snapshot.getTable()
555           + "' doesn't exist, can't take snapshot.", snapshot);
556     }
557     SnapshotDescription.Builder builder = snapshot.toBuilder();
558     // if not specified, set the snapshot format
559     if (!snapshot.hasVersion()) {
560       builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
561     }
562     User user = RpcServer.getRequestUser();
563     LOG.info("User " + user + " requested snapshot " + snapshot);
564     if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
565       builder.setOwner(user.getShortName());
566     }
567     snapshot = builder.build();
568 
569     // call pre coproc hook
570     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
571     if (cpHost != null) {
572       cpHost.preSnapshot(snapshot, desc);
573     }
574 
575     // if the table is enabled, then have the RS run actually the snapshot work
576     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
577     AssignmentManager assignmentMgr = master.getAssignmentManager();
578     if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
579         ZooKeeperProtos.Table.State.ENABLED)) {
580       LOG.debug("Table enabled, starting distributed snapshot.");
581       snapshotEnabledTable(snapshot);
582       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
583     }
584     // For disabled table, snapshot is created by the master
585     else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
586         ZooKeeperProtos.Table.State.DISABLED)) {
587       LOG.debug("Table is disabled, running snapshot entirely on master.");
588       snapshotDisabledTable(snapshot);
589       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
590     } else {
591       LOG.error("Can't snapshot table '" + snapshot.getTable()
592           + "', isn't open or closed, we don't know what to do!");
593       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
594           + " isn't fully open.");
595       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
596     }
597 
598     // call post coproc hook
599     if (cpHost != null) {
600       cpHost.postSnapshot(snapshot, desc);
601     }
602   }
603 
604   /**
605    * Set the handler for the current snapshot
606    * <p>
607    * Exposed for TESTING
608    * @param tableName
609    * @param handler handler the master should use
610    *
611    * TODO get rid of this if possible, repackaging, modify tests.
612    */
613   public synchronized void setSnapshotHandlerForTesting(
614       final TableName tableName,
615       final SnapshotSentinel handler) {
616     if (handler != null) {
617       this.snapshotHandlers.put(tableName, handler);
618     } else {
619       this.snapshotHandlers.remove(tableName);
620     }
621   }
622 
623   /**
624    * @return distributed commit coordinator for all running snapshots
625    */
626   ProcedureCoordinator getCoordinator() {
627     return coordinator;
628   }
629 
630   /**
631    * Check to see if the snapshot is one of the currently completed snapshots
632    * Returns true if the snapshot exists in the "completed snapshots folder".
633    *
634    * @param snapshot expected snapshot to check
635    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
636    *         not stored
637    * @throws IOException if the filesystem throws an unexpected exception,
638    * @throws IllegalArgumentException if snapshot name is invalid.
639    */
640   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
641     try {
642       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
643       FileSystem fs = master.getMasterFileSystem().getFileSystem();
644       // check to see if the snapshot already exists
645       return fs.exists(snapshotDir);
646     } catch (IllegalArgumentException iae) {
647       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
648     }
649   }
650 
651   /**
652    * Clone the specified snapshot into a new table.
653    * The operation will fail if the destination table has a snapshot or restore in progress.
654    *
655    * @param snapshot Snapshot Descriptor
656    * @param hTableDescriptor Table Descriptor of the table to create
657    */
658   synchronized void cloneSnapshot(final SnapshotDescription snapshot,
659       final HTableDescriptor hTableDescriptor, final boolean restoreAcl)
660       throws HBaseSnapshotException {
661     TableName tableName = hTableDescriptor.getTableName();
662 
663     // make sure we aren't running a snapshot on the same table
664     if (isTakingSnapshot(tableName)) {
665       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
666     }
667 
668     // make sure we aren't running a restore on the same table
669     if (isRestoringTable(tableName)) {
670       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
671     }
672 
673     try {
674       CloneSnapshotHandler handler =
675           new CloneSnapshotHandler(master, snapshot, hTableDescriptor, restoreAcl).prepare();
676       this.executorService.submit(handler);
677       this.restoreHandlers.put(tableName, handler);
678     } catch (Exception e) {
679       String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
680         " on table=" + tableName;
681       LOG.error(msg, e);
682       throw new RestoreSnapshotException(msg, e);
683     }
684   }
685 
686   /**
687    * Restore the specified snapshot
688    * @param reqSnapshot
689    * @throws IOException
690    */
691   public void restoreSnapshot(SnapshotDescription reqSnapshot, boolean restoreAcl)
692       throws IOException {
693     FileSystem fs = master.getMasterFileSystem().getFileSystem();
694     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
695     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
696 
697     // check if the snapshot exists
698     if (!fs.exists(snapshotDir)) {
699       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
700       throw new SnapshotDoesNotExistException(reqSnapshot);
701     }
702 
703     // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
704     // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
705     // information.
706     SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
707     SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
708         snapshotDir, snapshot);
709     HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
710     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
711 
712     // stop tracking "abandoned" handlers
713     cleanupSentinels();
714 
715     // Verify snapshot validity
716     SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
717 
718     // Execute the restore/clone operation
719     if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
720       if (master.getAssignmentManager().getTableStateManager().isTableState(
721           TableName.valueOf(snapshot.getTable()), ZooKeeperProtos.Table.State.ENABLED)) {
722         throw new UnsupportedOperationException("Table '" +
723             TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
724             "perform a restore operation" +
725             ".");
726       }
727 
728       // call coproc pre hook
729       if (cpHost != null) {
730         LOG.debug("snapshot owner " + snapshot.getOwner() + " for " + snapshot.getName());
731         cpHost.preRestoreSnapshot(snapshot, snapshotTableDesc);
732       }
733 
734       int tableRegionCount = -1;
735       try {
736         // Table already exist. Check and update the region quota for this table namespace.
737         // The region quota may not be updated correctly if there are concurrent restore snapshot
738         // requests for the same table
739 
740         tableRegionCount = getRegionCountOfTable(tableName);
741         int snapshotRegionCount = manifest.getRegionManifestsMap().size();
742 
743         // Update region quota when snapshotRegionCount is larger. If we updated the region count
744         // to a smaller value before retoreSnapshot and the retoreSnapshot fails, we may fail to
745         // reset the region count to its original value if the region quota is consumed by other
746         // tables in the namespace
747         if (tableRegionCount > 0 && tableRegionCount < snapshotRegionCount) {
748           checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
749         }
750         restoreSnapshot(snapshot, snapshotTableDesc, restoreAcl);
751         // Update the region quota if snapshotRegionCount is smaller. This step should not fail
752         // because we have reserved enough region quota before hand
753         if (tableRegionCount > 0 && tableRegionCount > snapshotRegionCount) {
754           checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
755         }
756       } catch (QuotaExceededException e) {
757         LOG.error("Region quota exceeded while restoring the snapshot " + snapshot.getName()
758           + " as table " + tableName.getNameAsString(), e);
759         // If QEE is thrown before restoreSnapshot, quota information is not updated, so we
760         // should throw the exception directly. If QEE is thrown after restoreSnapshot, there
761         // must be unexpected reasons, we also throw the exception directly
762         throw e;
763       } catch (IOException e) {
764         if (tableRegionCount > 0) {
765           // reset the region count for table
766           checkAndUpdateNamespaceRegionQuota(tableRegionCount, tableName);
767         }
768         LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
769             + " as table " + tableName.getNameAsString(), e);
770         throw e;
771       }
772       LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
773 
774       if (cpHost != null) {
775         cpHost.postRestoreSnapshot(snapshot, snapshotTableDesc);
776       }
777     } else {
778       HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc, tableName);
779       if (cpHost != null) {
780         cpHost.preCloneSnapshot(snapshot, htd);
781       }
782       try {
783         checkAndUpdateNamespaceQuota(manifest, tableName);
784         cloneSnapshot(snapshot, htd, restoreAcl);
785       } catch (IOException e) {
786         this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
787         LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
788             + " as table " + tableName.getNameAsString(), e);
789         throw e;
790       }
791       LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
792 
793       if (cpHost != null) {
794         cpHost.postCloneSnapshot(snapshot, htd);
795       }
796     }
797   }
798   
799   private void checkAndUpdateNamespaceQuota(SnapshotManifest manifest, TableName tableName)
800       throws IOException {
801     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
802       this.master.getMasterQuotaManager().checkNamespaceTableAndRegionQuota(tableName,
803         manifest.getRegionManifestsMap().size());
804     }
805   }
806 
807   private void checkAndUpdateNamespaceRegionQuota(int updatedRegionCount, TableName tableName)
808       throws IOException {
809     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
810       this.master.getMasterQuotaManager().checkAndUpdateNamespaceRegionQuota(tableName,
811         updatedRegionCount);
812     }
813   }
814 
815   /**
816    * @return cached region count, or -1 if quota manager is disabled or table status not found
817   */
818   private int getRegionCountOfTable(TableName tableName) throws IOException {
819     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
820       return this.master.getMasterQuotaManager().getRegionCountOfTable(tableName);
821     }
822     return -1;
823   }
824 
825   /**
826    * Restore the specified snapshot.
827    * The restore will fail if the destination table has a snapshot or restore in progress.
828    *
829    * @param snapshot Snapshot Descriptor
830    * @param hTableDescriptor Table Descriptor
831    */
832   private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
833       final HTableDescriptor hTableDescriptor, final boolean restoreAcl)
834       throws HBaseSnapshotException {
835     TableName tableName = hTableDescriptor.getTableName();
836 
837     // make sure we aren't running a snapshot on the same table
838     if (isTakingSnapshot(tableName)) {
839       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
840     }
841 
842     // make sure we aren't running a restore on the same table
843     if (isRestoringTable(tableName)) {
844       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
845     }
846 
847     try {
848       RestoreSnapshotHandler handler =
849         new RestoreSnapshotHandler(master, snapshot, hTableDescriptor, restoreAcl).prepare();
850       this.executorService.submit(handler);
851       restoreHandlers.put(tableName, handler);
852     } catch (Exception e) {
853       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
854           snapshot)  +
855           " on table=" + tableName;
856       LOG.error(msg, e);
857       throw new RestoreSnapshotException(msg, e);
858     }
859   }
860 
861   /**
862    * Verify if the restore of the specified table is in progress.
863    *
864    * @param tableName table under restore
865    * @return <tt>true</tt> if there is a restore in progress of the specified table.
866    */
867   private synchronized boolean isRestoringTable(final TableName tableName) {
868     SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
869     return(sentinel != null && !sentinel.isFinished());
870   }
871 
872   /**
873    * Returns the status of a restore operation.
874    * If the in-progress restore is failed throws the exception that caused the failure.
875    *
876    * @param snapshot
877    * @return false if in progress, true if restore is completed or not requested.
878    * @throws IOException if there was a failure during the restore
879    */
880   public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
881     // check to see if the sentinel exists,
882     // and if the task is complete removes it from the in-progress restore map.
883     SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
884 
885     // stop tracking "abandoned" handlers
886     cleanupSentinels();
887 
888     if (sentinel == null) {
889       // there is no sentinel so restore is not in progress.
890       return true;
891     }
892 
893     LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
894         + sentinel.getSnapshot().getName() + " table=" +
895         TableName.valueOf(snapshot.getTable()));
896 
897     // If the restore is failed, rethrow the exception
898     sentinel.rethrowExceptionIfFailed();
899 
900     // check to see if we are done
901     if (sentinel.isFinished()) {
902       LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
903           " has completed. Notifying the client.");
904       return true;
905     }
906 
907     if (LOG.isDebugEnabled()) {
908       LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
909           ClientSnapshotDescriptionUtils.toString(snapshot));
910     }
911     return false;
912   }
913 
914   /**
915    * Return the handler if it is currently live and has the same snapshot target name.
916    * The handler is removed from the sentinels map if completed.
917    * @param sentinels live handlers
918    * @param snapshot snapshot description
919    * @return null if doesn't match, else a live handler.
920    */
921   private synchronized SnapshotSentinel removeSentinelIfFinished(
922       final Map<TableName, SnapshotSentinel> sentinels,
923       final SnapshotDescription snapshot) {
924     if (!snapshot.hasTable()) {
925       return null;
926     }
927 
928     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
929     SnapshotSentinel h = sentinels.get(snapshotTable);
930     if (h == null) {
931       return null;
932     }
933 
934     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
935       // specified snapshot is to the one currently running
936       return null;
937     }
938 
939     // Remove from the "in-progress" list once completed
940     if (h.isFinished()) {
941       sentinels.remove(snapshotTable);
942     }
943 
944     return h;
945   }
946 
947   /**
948    * Removes "abandoned" snapshot/restore requests.
949    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
950    * and the in-progress maps are cleaned up when the status of a completed task is requested.
951    * To avoid having sentinels staying around for long time if something client side is failed,
952    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
953    */
954   private void cleanupSentinels() {
955     cleanupSentinels(this.snapshotHandlers);
956     cleanupSentinels(this.restoreHandlers);
957   }
958 
959   /**
960    * Remove the sentinels that are marked as finished and the completion time
961    * has exceeded the removal timeout.
962    * @param sentinels map of sentinels to clean
963    */
964   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
965     long currentTime = EnvironmentEdgeManager.currentTime();
966     Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
967         sentinels.entrySet().iterator();
968     while (it.hasNext()) {
969       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
970       SnapshotSentinel sentinel = entry.getValue();
971       if (sentinel.isFinished() &&
972           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
973       {
974         it.remove();
975       }
976     }
977   }
978 
979   //
980   // Implementing Stoppable interface
981   //
982 
983   @Override
984   public void stop(String why) {
985     // short circuit
986     if (this.stopped) return;
987     // make sure we get stop
988     this.stopped = true;
989     // pass the stop onto take snapshot handlers
990     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
991       snapshotHandler.cancel(why);
992     }
993 
994     // pass the stop onto all the restore handlers
995     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
996       restoreHandler.cancel(why);
997     }
998     try {
999       if (coordinator != null) {
1000         coordinator.close();
1001       }
1002     } catch (IOException e) {
1003       LOG.error("stop ProcedureCoordinator error", e);
1004     }
1005   }
1006 
1007   @Override
1008   public boolean isStopped() {
1009     return this.stopped;
1010   }
1011 
1012   /**
1013    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1014    * Called at the beginning of snapshot() and restoreSnapshot() methods.
1015    * @throws UnsupportedOperationException if snapshot are not supported
1016    */
1017   public void checkSnapshotSupport() throws UnsupportedOperationException {
1018     if (!this.isSnapshotSupported) {
1019       throw new UnsupportedOperationException(
1020         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
1021           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1022     }
1023   }
1024 
1025   /**
1026    * Called at startup, to verify if snapshot operation is supported, and to avoid
1027    * starting the master if there're snapshots present but the cleaners needed are missing.
1028    * Otherwise we can end up with snapshot data loss.
1029    * @param conf The {@link Configuration} object to use
1030    * @param mfs The MasterFileSystem to use
1031    * @throws IOException in case of file-system operation failure
1032    * @throws UnsupportedOperationException in case cleaners are missing and
1033    *         there're snapshot in the system
1034    */
1035   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1036       throws IOException, UnsupportedOperationException {
1037     // Verify if snapshot is disabled by the user
1038     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1039     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1040     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1041 
1042     // Extract cleaners from conf
1043     Set<String> hfileCleaners = new HashSet<String>();
1044     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1045     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1046 
1047     Set<String> logCleaners = new HashSet<String>();
1048     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1049     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1050 
1051     // check if an older version of snapshot directory was present
1052     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1053     FileSystem fs = mfs.getFileSystem();
1054     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
1055     if (ss != null && !ss.isEmpty()) {
1056       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1057       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1058     }
1059 
1060     // If the user has enabled the snapshot, we force the cleaners to be present
1061     // otherwise we still need to check if cleaners are enabled or not and verify
1062     // that there're no snapshot in the .snapshot folder.
1063     if (snapshotEnabled) {
1064       // Inject snapshot cleaners, if snapshot.enable is true
1065       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1066       hfileCleaners.add(HFileLinkCleaner.class.getName());
1067       logCleaners.add(SnapshotLogCleaner.class.getName());
1068 
1069       // Set cleaners conf
1070       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1071         hfileCleaners.toArray(new String[hfileCleaners.size()]));
1072       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1073         logCleaners.toArray(new String[logCleaners.size()]));
1074     } else {
1075       // Verify if cleaners are present
1076       snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) &&
1077         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1078         hfileCleaners.contains(HFileLinkCleaner.class.getName());
1079 
1080       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1081       if (snapshotEnabled) {
1082         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1083           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1084           (userDisabled ? "is set to 'false'." : "is not set."));
1085       }
1086     }
1087 
1088     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1089     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1090 
1091     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1092     // otherwise we end up with snapshot data loss.
1093     if (!snapshotEnabled) {
1094       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1095       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1096       if (fs.exists(snapshotDir)) {
1097         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1098           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1099         if (snapshots != null) {
1100           LOG.error("Snapshots are present, but cleaners are not enabled.");
1101           checkSnapshotSupport();
1102         }
1103       }
1104     }
1105   }
1106 
1107   @Override
1108   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1109       IOException, UnsupportedOperationException {
1110     this.master = master;
1111 
1112     this.rootDir = master.getMasterFileSystem().getRootDir();
1113     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1114 
1115     // get the configuration for the coordinator
1116     Configuration conf = master.getConfiguration();
1117     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1118     long timeoutMillis = Math.max(conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1119                     SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1120             conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1121                     SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1122     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1123 
1124     // setup the default procedure coordinator
1125     String name = master.getServerName().toString();
1126     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1127     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1128         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1129 
1130     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1131     this.executorService = master.getExecutorService();
1132     resetTempDir();
1133   }
1134 
1135   @Override
1136   public String getProcedureSignature() {
1137     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1138   }
1139 
1140   @Override
1141   public void execProcedure(ProcedureDescription desc) throws IOException {
1142     takeSnapshot(toSnapshotDescription(desc));
1143   }
1144 
1145   @Override
1146   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1147     return isSnapshotDone(toSnapshotDescription(desc));
1148   }
1149 
1150   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1151       throws IOException {
1152     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1153     if (!desc.hasInstance()) {
1154       throw new IOException("Snapshot name is not defined: " + desc.toString());
1155     }
1156     String snapshotName = desc.getInstance();
1157     List<NameStringPair> props = desc.getConfigurationList();
1158     String table = null;
1159     for (NameStringPair prop : props) {
1160       if ("table".equalsIgnoreCase(prop.getName())) {
1161         table = prop.getValue();
1162       }
1163     }
1164     if (table == null) {
1165       throw new IOException("Snapshot table is not defined: " + desc.toString());
1166     }
1167     TableName tableName = TableName.valueOf(table);
1168     builder.setTable(tableName.getNameAsString());
1169     builder.setName(snapshotName);
1170     builder.setType(SnapshotDescription.Type.FLUSH);
1171     return builder.build();
1172   }
1173 }