View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.FileSystem;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.fs.PathFilter;
38  import org.apache.hadoop.fs.permission.FsPermission;
39  import org.apache.hadoop.hbase.ClusterId;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.HColumnDescriptor;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.InvalidFamilyOperationException;
46  import org.apache.hadoop.hbase.RemoteExceptionHandler;
47  import org.apache.hadoop.hbase.Server;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.backup.HFileArchiver;
50  import org.apache.hadoop.hbase.exceptions.DeserializationException;
51  import org.apache.hadoop.hbase.fs.HFileSystem;
52  import org.apache.hadoop.hbase.mob.MobConstants;
53  import org.apache.hadoop.hbase.mob.MobUtils;
54  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
55  import org.apache.hadoop.hbase.regionserver.HRegion;
56  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
57  import org.apache.hadoop.hbase.wal.WALSplitter;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60  import org.apache.hadoop.hbase.util.FSTableDescriptors;
61  import org.apache.hadoop.hbase.util.FSUtils;
62  
63  /**
64   * This class abstracts a bunch of operations the HMaster needs to interact with
65   * the underlying file system, including splitting log files, checking file
66   * system status, etc.
67   */
68  @InterfaceAudience.Private
69  public class MasterFileSystem {
70    private static final Log LOG = LogFactory.getLog(MasterFileSystem.class);
71  
72    /** Parameter name for HBase instance root directory permission*/
73    public static final String HBASE_DIR_PERMS = "hbase.rootdir.perms";
74  
75    /** Parameter name for HBase WAL directory permission*/
76    public static final String HBASE_WAL_DIR_PERMS = "hbase.wal.dir.perms";
77  
78    // HBase configuration
79    Configuration conf;
80    // master status
81    Server master;
82    // metrics for master
83    private final MetricsMasterFileSystem metricsMasterFilesystem = new MetricsMasterFileSystem();
84    // Persisted unique cluster ID
85    private ClusterId clusterId;
86    // Keep around for convenience.
87    private final FileSystem fs;
88    private final FileSystem walFs;
89    // root WAL directory
90    private final Path walRootDir;
91    // Is the fileystem ok?
92    private volatile boolean walFsOk = true;
93    // The Path to the old logs dir
94    private final Path oldLogDir;
95    // root hbase directory on the FS
96    private final Path rootdir;
97    // hbase temp directory used for table construction and deletion
98    private final Path tempdir;
99    // create the split log lock
100   final Lock splitLogLock = new ReentrantLock();
101   final boolean distributedLogReplay;
102   final SplitLogManager splitLogManager;
103   private final MasterServices services;
104 
105   final static PathFilter META_FILTER = new PathFilter() {
106     @Override
107     public boolean accept(Path p) {
108       return DefaultWALProvider.isMetaFile(p);
109     }
110   };
111 
112   final static PathFilter NON_META_FILTER = new PathFilter() {
113     @Override
114     public boolean accept(Path p) {
115       return !DefaultWALProvider.isMetaFile(p);
116     }
117   };
118 
119   public MasterFileSystem(Server master, MasterServices services)
120   throws IOException {
121     this.conf = master.getConfiguration();
122     this.master = master;
123     this.services = services;
124     // Set filesystem to be that of this.rootdir else we get complaints about
125     // mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
126     // default localfs.  Presumption is that rootdir is fully-qualified before
127     // we get to here with appropriate fs scheme.
128     this.rootdir = FSUtils.getRootDir(conf);
129     this.tempdir = new Path(this.rootdir, HConstants.HBASE_TEMP_DIRECTORY);
130     // Cover both bases, the old way of setting default fs and the new.
131     // We're supposed to run on 0.20 and 0.21 anyways.
132     this.fs = this.rootdir.getFileSystem(conf);
133     this.walRootDir = FSUtils.getWALRootDir(conf);
134     this.walFs = FSUtils.getWALFileSystem(conf);
135     FSUtils.setFsDefault(conf, new Path(this.walFs.getUri()));
136     walFs.setConf(conf);
137     FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
138     // make sure the fs has the same conf
139     fs.setConf(conf);
140     // setup the filesystem variable
141     // set up the archived logs path
142     this.oldLogDir = createInitialFileSystemLayout();
143     HFileSystem.addLocationsOrderInterceptor(conf);
144     this.splitLogManager =
145         new SplitLogManager(master, master.getConfiguration(), master, services,
146             master.getServerName());
147     this.distributedLogReplay = this.splitLogManager.isLogReplaying();
148   }
149 
150   /**
151    * Create initial layout in filesystem.
152    * <ol>
153    * <li>Check if the meta region exists and is readable, if not create it.
154    * Create hbase.version and the hbase:meta directory if not one.
155    * </li>
156    * <li>Create a log archive directory for RS to put archived logs</li>
157    * </ol>
158    * Idempotent.
159    */
160   private Path createInitialFileSystemLayout() throws IOException {
161 
162     checkRootDir(this.rootdir, conf, this.fs, HConstants.HBASE_DIR, HBASE_DIR_PERMS);
163     // if the log directory is different from root, check if it exists
164     if (!this.walRootDir.equals(this.rootdir)) {
165       checkRootDir(this.walRootDir, conf, this.walFs, HFileSystem.HBASE_WAL_DIR, HBASE_WAL_DIR_PERMS);
166     }
167 
168     // check if temp directory exists and clean it
169     checkTempDir(this.tempdir, conf, this.fs);
170 
171     Path oldLogDir = new Path(this.walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
172 
173     // Make sure the region servers can archive their old logs
174     if(!this.walFs.exists(oldLogDir)) {
175       this.walFs.mkdirs(oldLogDir);
176     }
177 
178     return oldLogDir;
179   }
180 
181   public FileSystem getFileSystem() {
182     return this.fs;
183   }
184 
185   /**
186    * Get the directory where old logs go
187    * @return the dir
188    */
189   public Path getOldLogDir() {
190     return this.oldLogDir;
191   }
192 
193   /**
194    * Checks to see if the file system is still accessible.
195    * If not, sets closed
196    * @return false if file system is not available
197    */
198   public boolean checkFileSystem() {
199     if (this.walFsOk) {
200       try {
201         FSUtils.checkFileSystemAvailable(this.walFs);
202         FSUtils.checkDfsSafeMode(this.conf);
203       } catch (IOException e) {
204         master.abort("Shutting down HBase cluster: file system not available", e);
205         this.walFsOk = false;
206       }
207     }
208     return this.walFsOk;
209   }
210 
211   protected FileSystem getWALFileSystem() {
212     return this.walFs;
213   }
214 
215   public Configuration getConfiguration() {
216     return this.conf;
217   }
218 
219   /**
220    * @return HBase root dir.
221    */
222   public Path getRootDir() {
223     return this.rootdir;
224   }
225 
226   /**
227    * @return HBase root log dir.
228    */
229   public Path getWALRootDir() { return this.walRootDir; }
230 
231   /**
232    * @return HBase temp dir.
233    */
234   public Path getTempDir() {
235     return this.tempdir;
236   }
237 
238   /**
239    * @return The unique identifier generated for this cluster
240    */
241   public ClusterId getClusterId() {
242     return clusterId;
243   }
244 
245   /**
246    * Inspect the log directory to find dead servers which need recovery work
247    * @return A set of ServerNames which aren't running but still have WAL files left in file system
248    */
249   Set<ServerName> getFailedServersFromLogFolders() {
250     boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
251         WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
252 
253     Set<ServerName> serverNames = new HashSet<ServerName>();
254     Path logsDirPath = new Path(this.walRootDir, HConstants.HREGION_LOGDIR_NAME);
255 
256     do {
257       if (master.isStopped()) {
258         LOG.warn("Master stopped while trying to get failed servers.");
259         break;
260       }
261       try {
262         if (!this.walFs.exists(logsDirPath)) return serverNames;
263         FileStatus[] logFolders = FSUtils.listStatus(this.walFs, logsDirPath, null);
264         // Get online servers after getting log folders to avoid log folder deletion of newly
265         // checked in region servers . see HBASE-5916
266         Set<ServerName> onlineServers = ((HMaster) master).getServerManager().getOnlineServers()
267             .keySet();
268 
269         if (logFolders == null || logFolders.length == 0) {
270           LOG.debug("No log files to split, proceeding...");
271           return serverNames;
272         }
273         for (FileStatus status : logFolders) {
274           FileStatus[] curLogFiles = FSUtils.listStatus(this.walFs, status.getPath(), null);
275           if (curLogFiles == null || curLogFiles.length == 0) {
276             // Empty log folder. No recovery needed
277             continue;
278           }
279           final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(
280               status.getPath());
281           if (null == serverName) {
282             LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " +
283                 "region server name; leaving in place. If you see later errors about missing " +
284                 "write ahead logs they may be saved in this location.");
285           } else if (!onlineServers.contains(serverName)) {
286             LOG.info("Log folder " + status.getPath() + " doesn't belong "
287                 + "to a known region server, splitting");
288             serverNames.add(serverName);
289           } else {
290             LOG.info("Log folder " + status.getPath() + " belongs to an existing region server");
291           }
292         }
293         retrySplitting = false;
294       } catch (IOException ioe) {
295         LOG.warn("Failed getting failed servers to be recovered.", ioe);
296         if (!checkFileSystem()) {
297           LOG.warn("Bad Filesystem, exiting");
298           Runtime.getRuntime().halt(1);
299         }
300         try {
301           if (retrySplitting) {
302             Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
303           }
304         } catch (InterruptedException e) {
305           LOG.warn("Interrupted, aborting since cannot return w/o splitting");
306           Thread.currentThread().interrupt();
307           retrySplitting = false;
308           Runtime.getRuntime().halt(1);
309         }
310       }
311     } while (retrySplitting);
312 
313     return serverNames;
314   }
315 
316   public void splitLog(final ServerName serverName) throws IOException {
317     Set<ServerName> serverNames = new HashSet<ServerName>();
318     serverNames.add(serverName);
319     splitLog(serverNames);
320   }
321 
322   /**
323    * Specialized method to handle the splitting for meta WAL
324    * @param serverName
325    * @throws IOException
326    */
327   public void splitMetaLog(final ServerName serverName) throws IOException {
328     Set<ServerName> serverNames = new HashSet<ServerName>();
329     serverNames.add(serverName);
330     splitMetaLog(serverNames);
331   }
332 
333   /**
334    * Specialized method to handle the splitting for meta WAL
335    * @param serverNames
336    * @throws IOException
337    */
338   public void splitMetaLog(final Set<ServerName> serverNames) throws IOException {
339     splitLog(serverNames, META_FILTER);
340   }
341 
342   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
343       "We only release this lock when we set it. Updates to code that uses it should verify use " +
344       "of the guard boolean.")
345   private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
346     List<Path> logDirs = new ArrayList<Path>();
347     boolean needReleaseLock = false;
348     if (!this.services.isInitialized()) {
349       // during master initialization, we could have multiple places splitting a same wal
350       this.splitLogLock.lock();
351       needReleaseLock = true;
352     }
353     try {
354       for (ServerName serverName : serverNames) {
355         Path logDir = new Path(this.walRootDir,
356             DefaultWALProvider.getWALDirectoryName(serverName.toString()));
357         Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
358         // Rename the directory so a rogue RS doesn't create more WALs
359         if (walFs.exists(logDir)) {
360           if (!this.walFs.rename(logDir, splitDir)) {
361             throw new IOException("Failed fs.rename for log split: " + logDir);
362           }
363           logDir = splitDir;
364           LOG.debug("Renamed region directory: " + splitDir);
365         } else if (!walFs.exists(splitDir)) {
366           LOG.info("Log dir for server " + serverName + " does not exist");
367           continue;
368         }
369         logDirs.add(splitDir);
370       }
371     } catch (IOException ioe) {
372       if (!checkFileSystem()) {
373         this.services.abort("Aborting due to filesystem unavailable", ioe);
374         throw ioe;
375       }
376     } finally {
377       if (needReleaseLock) {
378         this.splitLogLock.unlock();
379       }
380     }
381     return logDirs;
382   }
383 
384   /**
385    * Mark regions in recovering state when distributedLogReplay are set true
386    * @param serverName Failed region server whose wals to be replayed
387    * @param regions Set of regions to be recovered
388    * @throws IOException
389    */
390   public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
391     if (!this.distributedLogReplay) {
392       return;
393     }
394     // mark regions in recovering state
395     if (regions == null || regions.isEmpty()) {
396       return;
397     }
398     this.splitLogManager.markRegionsRecovering(serverName, regions);
399   }
400 
401   public void splitLog(final Set<ServerName> serverNames) throws IOException {
402     splitLog(serverNames, NON_META_FILTER);
403   }
404 
405   /**
406    * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)}
407    * @param failedServers
408    * @throws IOException
409    */
410   void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
411       throws IOException, InterruptedIOException {
412     this.splitLogManager.removeStaleRecoveringRegions(failedServers);
413   }
414 
415   /**
416    * This method is the base split method that splits WAL files matching a filter. Callers should
417    * pass the appropriate filter for meta and non-meta WALs.
418    * @param serverNames logs belonging to these servers will be split; this will rename the log
419    *                    directory out from under a soft-failed server
420    * @param filter
421    * @throws IOException
422    */
423   public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
424     long splitTime = 0, splitLogSize = 0;
425     List<Path> logDirs = getLogDirs(serverNames);
426 
427     splitLogManager.handleDeadWorkers(serverNames);
428     splitTime = EnvironmentEdgeManager.currentTime();
429     splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
430     splitTime = EnvironmentEdgeManager.currentTime() - splitTime;
431 
432     if (this.metricsMasterFilesystem != null) {
433       if (filter == META_FILTER) {
434         this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
435       } else {
436         this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
437       }
438     }
439   }
440 
441   /**
442    * Get the rootdir.  Make sure its wholesome and exists before returning.
443    * @param rd
444    * @param c
445    * @param fs
446    * @return hbase.rootdir (after checks for existence and bootstrapping if
447    * needed populating the directory with necessary bootup files).
448    * @throws IOException
449    */
450   @SuppressWarnings("deprecation")
451   private Path checkRootDir(final Path rd, final Configuration c,
452     final FileSystem fs, final String dirConfKey, final String dirPermsConfName)
453   throws IOException {
454     // If FS is in safe mode wait till out of it.
455     FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
456 
457     boolean isSecurityEnabled = "kerberos".equalsIgnoreCase(c.get("hbase.security.authentication"));
458     FsPermission dirPerms = new FsPermission(c.get(dirPermsConfName, "700"));
459 
460     // Filesystem is good. Go ahead and check for rootdir.
461     try {
462       if (!fs.exists(rd)) {
463         if (isSecurityEnabled) {
464           fs.mkdirs(rd, dirPerms);
465         } else {
466           fs.mkdirs(rd);
467         }
468 
469         // HBASE-17437 updates createInitialFileSystemLayout() to re-use checkRootDir()
470         // to check hbase.wal.dir after checking hbase.rootdir.
471         // But FSUtils.setVersion() is supposed to be called only when checking hbase.rootdir,
472         // while it is supposed to be bypassed when checking hbase.wal.dir.
473         if (dirConfKey.equals(HConstants.HBASE_DIR)) {
474           // DFS leaves safe mode with 0 DNs when there are 0 blocks.
475           // We used to handle this by checking the current DN count and waiting until
476           // it is nonzero. With security, the check for datanode count doesn't work --
477           // it is a privileged op. So instead we adopt the strategy of the jobtracker
478           // and simply retry file creation during bootstrap indefinitely. As soon as
479           // there is one datanode it will succeed. Permission problems should have
480           // already been caught by mkdirs above.
481           FSUtils.setVersion(fs, rd,
482             c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000),
483             c.getInt(HConstants.VERSION_FILE_WRITE_ATTEMPTS, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
484         }
485       } else {
486         if (!fs.isDirectory(rd)) {
487           throw new IllegalArgumentException(rd.toString() + " is not a directory");
488         }
489         if (isSecurityEnabled && !dirPerms.equals(fs.getFileStatus(rd).getPermission())) {
490           // check whether the permission match
491           LOG.warn("Found rootdir permissions NOT matching expected \"" + dirPermsConfName + "\" for "
492               + "rootdir=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission()
493               + " and  \"" + dirPermsConfName + "\" configured as "
494               + c.get(dirPermsConfName, "700") + ". Automatically setting the permissions. You"
495               + " can change the permissions by setting \"" + dirPermsConfName + "\" in hbase-site.xml "
496               + "and restarting the master");
497           fs.setPermission(rd, dirPerms);
498         }
499 
500         // HBASE-17437 updates createInitialFileSystemLayout() to re-use checkRootDir()
501         // to check hbase.wal.dir after checking hbase.rootdir.
502         // But FSUtils.checkVersion() is supposed to be called only when checking hbase.rootdir,
503         // while it is supposed to be bypassed when checking hbase.wal.dir.
504         if (dirConfKey.equals(HConstants.HBASE_DIR)) {
505           FSUtils.checkVersion(fs, rd, true,
506             c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000),
507             c.getInt(HConstants.VERSION_FILE_WRITE_ATTEMPTS, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
508         }
509       }
510     } catch (DeserializationException de) {
511       LOG.fatal("Please fix invalid configuration for " + dirConfKey, de);
512       IOException ioe = new IOException();
513       ioe.initCause(de);
514       throw ioe;
515     } catch (IllegalArgumentException iae) {
516       LOG.fatal("Please fix invalid configuration for "
517         + dirConfKey + " " + rd.toString(), iae);
518       throw iae;
519     }
520 
521     if (dirConfKey.equals(HConstants.HBASE_DIR)) {
522       // Make sure cluster ID exists
523       if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
524           HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
525         FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
526       }
527       clusterId = FSUtils.getClusterId(fs, rd);
528 
529       // Make sure the meta region directory exists!
530       if (!FSUtils.metaRegionExists(fs, rd)) {
531         bootstrap(rd, c);
532       } else {
533         // Migrate table descriptor files if necessary
534         org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
535             .migrateFSTableDescriptorsIfNecessary(fs, rd);
536       }
537 
538       // Create tableinfo-s for hbase:meta if not already there.
539 
540       // meta table is a system table, so descriptors are predefined,
541       // we should get them from registry.
542       FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd);
543       fsd.createTableDescriptor(
544           new HTableDescriptor(fsd.get(TableName.META_TABLE_NAME)));
545     }
546 
547     return rd;
548   }
549 
550   /**
551    * Make sure the hbase temp directory exists and is empty.
552    * NOTE that this method is only executed once just after the master becomes the active one.
553    */
554   private void checkTempDir(final Path tmpdir, final Configuration c, final FileSystem fs)
555       throws IOException {
556     // If the temp directory exists, clear the content (left over, from the previous run)
557     if (fs.exists(tmpdir)) {
558       // Archive table in temp, maybe left over from failed deletion,
559       // if not the cleaner will take care of them.
560       for (Path tabledir: FSUtils.getTableDirs(fs, tmpdir)) {
561         for (Path regiondir: FSUtils.getRegionDirs(fs, tabledir)) {
562           HFileArchiver.archiveRegion(fs, this.rootdir, tabledir, regiondir);
563         }
564       }
565       if (!fs.delete(tmpdir, true)) {
566         throw new IOException("Unable to clean the temp directory: " + tmpdir);
567       }
568     }
569 
570     // Create the temp directory
571     if (!fs.mkdirs(tmpdir)) {
572       throw new IOException("HBase temp directory '" + tmpdir + "' creation failure.");
573     }
574   }
575 
576   private static void bootstrap(final Path rd, final Configuration c)
577   throws IOException {
578     LOG.info("BOOTSTRAP: creating hbase:meta region");
579     try {
580       // Bootstrapping, make sure blockcache is off.  Else, one will be
581       // created here in bootstrap and it'll need to be cleaned up.  Better to
582       // not make it in first place.  Turn off block caching for bootstrap.
583       // Enable after.
584       HRegionInfo metaHRI = new HRegionInfo(HRegionInfo.FIRST_META_REGIONINFO);
585       HTableDescriptor metaDescriptor = new FSTableDescriptors(c).get(TableName.META_TABLE_NAME);
586       setInfoFamilyCachingForMeta(metaDescriptor, false);
587       HRegion meta = HRegion.createHRegion(metaHRI, rd, c, metaDescriptor);
588       setInfoFamilyCachingForMeta(metaDescriptor, true);
589       HRegion.closeHRegion(meta);
590     } catch (IOException e) {
591       e = RemoteExceptionHandler.checkIOException(e);
592       LOG.error("bootstrap", e);
593       throw e;
594     }
595   }
596 
597   /**
598    * Enable in memory caching for hbase:meta
599    */
600   public static void setInfoFamilyCachingForMeta(final HTableDescriptor metaDescriptor,
601       final boolean b) {
602     for (HColumnDescriptor hcd: metaDescriptor.getColumnFamilies()) {
603       if (Bytes.equals(hcd.getName(), HConstants.CATALOG_FAMILY)) {
604         hcd.setBlockCacheEnabled(b);
605         hcd.setInMemory(b);
606       }
607     }
608   }
609 
610   public void deleteRegion(HRegionInfo region) throws IOException {
611     HFileArchiver.archiveRegion(conf, fs, region);
612   }
613 
614   public void deleteTable(TableName tableName) throws IOException {
615     fs.delete(FSUtils.getTableDir(rootdir, tableName), true);
616   }
617 
618   /**
619    * Move the specified table to the hbase temp directory
620    * @param tableName Table name to move
621    * @return The temp location of the table moved
622    * @throws IOException in case of file-system failure
623    */
624   public Path moveTableToTemp(TableName tableName) throws IOException {
625     Path srcPath = FSUtils.getTableDir(rootdir, tableName);
626     Path tempPath = FSUtils.getTableDir(this.tempdir, tableName);
627 
628     // Ensure temp exists
629     if (!fs.exists(tempPath.getParent()) && !fs.mkdirs(tempPath.getParent())) {
630       throw new IOException("HBase temp directory '" + tempPath.getParent() + "' creation failure.");
631     }
632 
633     if (!fs.rename(srcPath, tempPath)) {
634       throw new IOException("Unable to move '" + srcPath + "' to temp '" + tempPath + "'");
635     }
636 
637     return tempPath;
638   }
639 
640   public void updateRegionInfo(HRegionInfo region) {
641     // TODO implement this.  i think this is currently broken in trunk i don't
642     //      see this getting updated.
643     //      @see HRegion.checkRegioninfoOnFilesystem()
644   }
645 
646   public void deleteFamilyFromFS(HRegionInfo region, byte[] familyName, boolean hasMob)
647       throws IOException {
648     // archive family store files
649     Path tableDir = FSUtils.getTableDir(rootdir, region.getTable());
650     HFileArchiver.archiveFamily(fs, conf, region, tableDir, familyName);
651 
652     // delete the family folder
653     Path familyDir = new Path(tableDir,
654       new Path(region.getEncodedName(), Bytes.toString(familyName)));
655     if (fs.delete(familyDir, true) == false) {
656       if (fs.exists(familyDir)) {
657         throw new IOException("Could not delete family "
658             + Bytes.toString(familyName) + " from FileSystem for region "
659             + region.getRegionNameAsString() + "(" + region.getEncodedName()
660             + ")");
661       }
662     }
663 
664     // archive and delete mob files
665     if (hasMob) {
666       Path mobTableDir =
667           FSUtils.getTableDir(new Path(getRootDir(), MobConstants.MOB_DIR_NAME), region.getTable());
668       HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(region.getTable());
669       Path mobFamilyDir =
670           new Path(mobTableDir,
671               new Path(mobRegionInfo.getEncodedName(), Bytes.toString(familyName)));
672       // archive mob family store files
673       MobUtils.archiveMobStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, familyName);
674 
675       if (!fs.delete(mobFamilyDir, true)) {
676         throw new IOException("Could not delete mob store files for family "
677             + Bytes.toString(familyName) + " from FileSystem region "
678             + mobRegionInfo.getRegionNameAsString() + "(" + mobRegionInfo.getEncodedName() + ")");
679       }
680     }
681   }
682 
683   public void stop() {
684     if (splitLogManager != null) {
685       this.splitLogManager.stop();
686     }
687   }
688 
689   /**
690    * Delete column of a table
691    * @param tableName
692    * @param familyName
693    * @return Modified HTableDescriptor with requested column deleted.
694    * @throws IOException
695    */
696   public HTableDescriptor deleteColumn(TableName tableName, byte[] familyName)
697       throws IOException {
698     LOG.info("DeleteColumn. Table = " + tableName
699         + " family = " + Bytes.toString(familyName));
700     HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
701     htd.removeFamily(familyName);
702     this.services.getTableDescriptors().add(htd);
703     return htd;
704   }
705 
706   /**
707    * Modify Column of a table
708    * @param tableName
709    * @param hcd HColumnDesciptor
710    * @return Modified HTableDescriptor with the column modified.
711    * @throws IOException
712    */
713   public HTableDescriptor modifyColumn(TableName tableName, HColumnDescriptor hcd)
714       throws IOException {
715     LOG.info("AddModifyColumn. Table = " + tableName
716         + " HCD = " + hcd.toString());
717 
718     HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
719     byte [] familyName = hcd.getName();
720     if(!htd.hasFamily(familyName)) {
721       throw new InvalidFamilyOperationException("Family '" +
722         Bytes.toString(familyName) + "' doesn't exists so cannot be modified");
723     }
724     htd.modifyFamily(hcd);
725     this.services.getTableDescriptors().add(htd);
726     return htd;
727   }
728 
729   /**
730    * Add column to a table
731    * @param tableName
732    * @param hcd
733    * @return Modified HTableDescriptor with new column added.
734    * @throws IOException
735    */
736   public HTableDescriptor addColumn(TableName tableName, HColumnDescriptor hcd)
737       throws IOException {
738     LOG.info("AddColumn. Table = " + tableName + " HCD = " +
739       hcd.toString());
740     HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
741     if (htd == null) {
742       throw new InvalidFamilyOperationException("Family '" +
743         hcd.getNameAsString() + "' cannot be modified as HTD is null");
744     }
745     htd.addFamily(hcd);
746     this.services.getTableDescriptors().add(htd);
747     return htd;
748   }
749 
750   /**
751    * The function is used in SSH to set recovery mode based on configuration after all outstanding
752    * log split tasks drained.
753    * @throws IOException
754    */
755   public void setLogRecoveryMode() throws IOException {
756       this.splitLogManager.setRecoveryMode(false);
757   }
758 
759   public RecoveryMode getLogRecoveryMode() {
760     return this.splitLogManager.getRecoveryMode();
761   }
762 }