
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.Collections;
36  import java.util.HashMap;
37  import java.util.Iterator;
38  import java.util.LinkedList;
39  import java.util.List;
40  import java.util.Map;
41  import java.util.Vector;
42  import java.util.concurrent.ArrayBlockingQueue;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.ExecutorService;
46  import java.util.concurrent.Future;
47  import java.util.concurrent.FutureTask;
48  import java.util.concurrent.ThreadPoolExecutor;
49  import java.util.concurrent.TimeUnit;
50  import java.util.regex.Pattern;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.HadoopIllegalArgumentException;
55  import org.apache.hadoop.conf.Configuration;
56  import org.apache.hadoop.fs.BlockLocation;
57  import org.apache.hadoop.fs.FSDataInputStream;
58  import org.apache.hadoop.fs.FSDataOutputStream;
59  import org.apache.hadoop.fs.FileStatus;
60  import org.apache.hadoop.fs.FileSystem;
61  import org.apache.hadoop.fs.Path;
62  import org.apache.hadoop.fs.PathFilter;
63  import org.apache.hadoop.fs.permission.FsAction;
64  import org.apache.hadoop.fs.permission.FsPermission;
65  import org.apache.hadoop.hbase.ClusterId;
66  import org.apache.hadoop.hbase.HColumnDescriptor;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
69  import org.apache.hadoop.hbase.HRegionInfo;
70  import org.apache.hadoop.hbase.RemoteExceptionHandler;
71  import org.apache.hadoop.hbase.TableName;
72  import org.apache.hadoop.hbase.classification.InterfaceAudience;
73  import org.apache.hadoop.hbase.exceptions.DeserializationException;
74  import org.apache.hadoop.hbase.fs.HFileSystem;
75  import org.apache.hadoop.hbase.io.HFileLink;
76  import org.apache.hadoop.hbase.master.HMaster;
77  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
78  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
79  import org.apache.hadoop.hbase.regionserver.HRegion;
80  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
81  import org.apache.hadoop.hbase.security.AccessDeniedException;
82  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
83  import org.apache.hadoop.hdfs.DistributedFileSystem;
84  import org.apache.hadoop.hdfs.protocol.FSConstants;
85  import org.apache.hadoop.io.IOUtils;
86  import org.apache.hadoop.io.SequenceFile;
87  import org.apache.hadoop.ipc.RemoteException;
88  import org.apache.hadoop.security.UserGroupInformation;
89  import org.apache.hadoop.util.Progressable;
90  import org.apache.hadoop.util.ReflectionUtils;
91  import org.apache.hadoop.util.StringUtils;
92  
93  import com.google.common.annotations.VisibleForTesting;
94  import com.google.common.base.Throwables;
95  import com.google.common.collect.Iterators;
96  import com.google.common.primitives.Ints;
97  
98  import edu.umd.cs.findbugs.annotations.CheckForNull;
99  
100 
101 import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
102 
103 /**
104  * Utility methods for interacting with the underlying file system.
105  */
106 @InterfaceAudience.Private
107 public abstract class FSUtils {
108   private static final Log LOG = LogFactory.getLog(FSUtils.class);
109 
110   /** Full access permissions (starting point for a umask) */
111   public static final String FULL_RWX_PERMISSIONS = "777";
112   private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
113   private static final int DEFAULT_THREAD_POOLSIZE = 2;
114 
115   /** Set to true on Windows platforms */
116   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
117 
118   protected FSUtils() {
119     super();
120   }
121 
122   /**
123    * Sets storage policy for given path according to config setting.
124    * If the passed path is a directory, we'll set the storage policy for all files
125    * created in the future in said directory. Note that this change in storage
126    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
127    * If we're running on a version of HDFS that doesn't support the given storage policy
128    * (or storage policies at all), then we'll issue a log message and continue.
129    *
130    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
131    *
132    * @param fs We only do anything if an instance of DistributedFileSystem
133    * @param conf used to look up storage policy with given key; not modified.
134    * @param path the Path whose storage policy is to be set
135    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
136    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
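   * <p>Illustrative usage (a sketch only; the WAL directory path below is a placeholder):
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * FileSystem fs = FileSystem.get(conf);
   * FSUtils.setStoragePolicy(fs, conf, new Path("/hbase/WALs"),
   *     HConstants.WAL_STORAGE_POLICY, "NONE");
   * </pre>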
137    */
138   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
139       final Path path, final String policyKey, final String defaultPolicy) {
140     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
141     if (storagePolicy.equals(defaultPolicy)) {
142       if (LOG.isTraceEnabled()) {
143         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
144       }
145       return;
146     }
147     if (fs instanceof DistributedFileSystem) {
148       DistributedFileSystem dfs = (DistributedFileSystem)fs;
149       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
150       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
151       Method m = null;
152       try {
153         m = dfsClass.getDeclaredMethod("setStoragePolicy",
154             new Class<?>[] { Path.class, String.class });
155         m.setAccessible(true);
156       } catch (NoSuchMethodException e) {
157         LOG.info("FileSystem doesn't support"
158             + " setStoragePolicy; --HDFS-6584 not available");
159       } catch (SecurityException e) {
160         LOG.info("Doesn't have access to setStoragePolicy on "
161             + "FileSystems --HDFS-6584 not available", e);
162         m = null; // could happen on setAccessible()
163       }
164       if (m != null) {
165         try {
166           m.invoke(dfs, path, storagePolicy);
167           LOG.info("set " + storagePolicy + " for " + path);
168         } catch (Exception e) {
169           // check for lack of HDFS-7228
170           boolean probablyBadPolicy = false;
171           if (e instanceof InvocationTargetException) {
172             final Throwable exception = e.getCause();
173             if (exception instanceof RemoteException &&
174                 HadoopIllegalArgumentException.class.getName().equals(
175                     ((RemoteException)exception).getClassName())) {
176               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
177                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
178                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
179                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
180               LOG.debug("More information about the invalid storage policy.", exception);
181               probablyBadPolicy = true;
182             }
183           }
184           if (!probablyBadPolicy) {
185             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
186             // misuse than a runtime problem with HDFS.
187             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
188           }
189         }
190       }
191     } else {
192       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
193           "support setStoragePolicy.");
194     }
195   }
196 
197   /**
198    * @return True if <code>fs</code> is an instance of DistributedFileSystem
199    * @throws IOException
200    */
201   public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
202     FileSystem fileSystem = fs;
203     // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
204     // Check its backing fs for dfs-ness.
205     if (fs instanceof HFileSystem) {
206       fileSystem = ((HFileSystem)fs).getBackingFs();
207     }
208     return fileSystem instanceof DistributedFileSystem;
209   }
210 
211   /**
212    * Compares path components only. Does not consider the scheme; i.e. even if the schemes differ,
213    * if <code>path</code> starts with <code>rootPath</code>, the function returns true.
214    * @param rootPath
215    * @param path
216    * @return True if <code>path</code> starts with <code>rootPath</code>
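   * <p>For example (host and port below are placeholders; only the path component is compared):
   * <pre>
   * FSUtils.isStartingWithPath(new Path("hdfs://nn:8020/hbase"), "/hbase/data/default/t1"); // true
   * FSUtils.isStartingWithPath(new Path("hdfs://nn:8020/hbase"), "/tmp/t1");                // false
   * </pre>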
217    */
218   public static boolean isStartingWithPath(final Path rootPath, final String path) {
219     String uriRootPath = rootPath.toUri().getPath();
220     String tailUriPath = (new Path(path)).toUri().getPath();
221     return tailUriPath.startsWith(uriRootPath);
222   }
223 
224   /**
225    * Compares the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares the
226    * '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or subpath
227    * matches, the two are considered equal.
228    * @param pathToSearch Path we will be trying to match.
229    * @param pathTail
230    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
231    */
232   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
233     return isMatchingTail(pathToSearch, new Path(pathTail));
234   }
235 
236   /**
237    * Compares the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares the
238    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not consider
239    * the scheme; i.e. if the schemes differ but the path or subpath matches, the two are considered equal.
240    * @param pathToSearch Path we will be trying to match.
241    * @param pathTail
242    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
243    */
244   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
245     if (pathToSearch.depth() != pathTail.depth()) return false;
246     Path tailPath = pathTail;
247     String tailName;
248     Path toSearch = pathToSearch;
249     String toSearchName;
250     boolean result = false;
251     do {
252       tailName = tailPath.getName();
253       if (tailName == null || tailName.length() <= 0) {
254         result = true;
255         break;
256       }
257       toSearchName = toSearch.getName();
258       if (toSearchName == null || toSearchName.length() <= 0) break;
259       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
260       tailPath = tailPath.getParent();
261       toSearch = toSearch.getParent();
262     } while(tailName.equals(toSearchName));
263     return result;
264   }
265 
266   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
267     String scheme = fs.getUri().getScheme();
268     if (scheme == null) {
269       LOG.warn("Could not find scheme for uri " +
270           fs.getUri() + ", default to hdfs");
271       scheme = "hdfs";
272     }
273     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
274         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
275     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
276     return fsUtils;
277   }
278 
279   /**
280    * Delete if exists.
281    * @param fs filesystem object
282    * @param dir directory to delete
283    * @return True if deleted <code>dir</code>
284    * @throws IOException e
285    */
286   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
287   throws IOException {
288     return fs.exists(dir) && fs.delete(dir, true);
289   }
290 
291   /**
292    * Delete the region directory if exists.
293    * @param conf
294    * @param hri
295    * @return True if deleted the region directory.
296    * @throws IOException
297    */
298   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
299   throws IOException {
300     Path rootDir = getRootDir(conf);
301     FileSystem fs = rootDir.getFileSystem(conf);
302     return deleteDirectory(fs,
303       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
304   }
305 
306   /**
307    * Return the number of bytes that large input files should optimally
308    * be split into to minimize I/O time.
309    *
310    * Uses reflection to look for getDefaultBlockSize(Path f);
311    * if that method doesn't exist, falls back to getDefaultBlockSize().
312    *
313    * @param fs filesystem object
314    * @return the default block size for the path's filesystem
315    * @throws IOException e
316    */
317   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
318     Method m = null;
319     Class<? extends FileSystem> cls = fs.getClass();
320     try {
321       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
322     } catch (NoSuchMethodException e) {
323       LOG.info("FileSystem doesn't support getDefaultBlockSize");
324     } catch (SecurityException e) {
325       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
326       m = null; // could happen on setAccessible()
327     }
328     if (m == null) {
329       return fs.getDefaultBlockSize(path);
330     } else {
331       try {
332         Object ret = m.invoke(fs, path);
333         return ((Long)ret).longValue();
334       } catch (Exception e) {
335         throw new IOException(e);
336       }
337     }
338   }
339 
340   /**
341    * Get the default replication.
342    *
343    * Uses reflection to look for getDefaultReplication(Path f);
344    * if that method doesn't exist, falls back to getDefaultReplication().
345    *
346    * @param fs filesystem object
347    * @param f path of file
348    * @return default replication for the path's filesystem
349    * @throws IOException e
350    */
351   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
352     Method m = null;
353     Class<? extends FileSystem> cls = fs.getClass();
354     try {
355       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
356     } catch (NoSuchMethodException e) {
357       LOG.info("FileSystem doesn't support getDefaultReplication");
358     } catch (SecurityException e) {
359       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
360       m = null; // could happen on setAccessible()
361     }
362     if (m == null) {
363       return fs.getDefaultReplication(path);
364     } else {
365       try {
366         Object ret = m.invoke(fs, path);
367         return ((Number)ret).shortValue();
368       } catch (Exception e) {
369         throw new IOException(e);
370       }
371     }
372   }
373 
374   /**
375    * Returns the default buffer size to use during writes.
376    *
377    * The size of the buffer should probably be a multiple of hardware
378    * page size (4096 on Intel x86), and it determines how much data is
379    * buffered during read and write operations.
380    *
381    * @param fs filesystem object
382    * @return default buffer size to use during writes
383    */
384   public static int getDefaultBufferSize(final FileSystem fs) {
385     return fs.getConf().getInt("io.file.buffer.size", 4096);
386   }
387 
388   /**
389    * Create the specified file on the filesystem. By default, this will:
390    * <ol>
391    * <li>overwrite the file if it exists</li>
392    * <li>apply the umask in the configuration (if it is enabled)</li>
393    * <li>use the fs configured buffer size (or 4096 if not set)</li>
394    * <li>use the default replication</li>
395    * <li>use the default block size</li>
396    * <li>not track progress</li>
397    * </ol>
398    *
399    * @param fs {@link FileSystem} on which to write the file
400    * @param path {@link Path} to the file to write
401    * @param perm permissions
402    * @param favoredNodes preferred DataNode locations for the new file's block replicas; may be null
403    * @return output stream to the created file
404    * @throws IOException if the file cannot be created
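   * <p>Sketch of a call that supplies favored nodes (host name and port are illustrative only):
   * <pre>
   * InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
   *     new InetSocketAddress("datanode-1.example.com", 50010) };
   * FSDataOutputStream out =
   *     FSUtils.create(fs, path, FsPermission.getFileDefault(), favoredNodes);
   * </pre>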
405    */
406   public static FSDataOutputStream create(FileSystem fs, Path path,
407       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
408     if (fs instanceof HFileSystem) {
409       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
410       if (backingFs instanceof DistributedFileSystem) {
411         // Try to use the favoredNodes version via reflection to allow backwards-
412         // compatibility.
413         try {
414           return (FSDataOutputStream) (DistributedFileSystem.class
415               .getDeclaredMethod("create", Path.class, FsPermission.class,
416                   boolean.class, int.class, short.class, long.class,
417                   Progressable.class, InetSocketAddress[].class)
418                   .invoke(backingFs, path, perm, true,
419                       getDefaultBufferSize(backingFs),
420                       getDefaultReplication(backingFs, path),
421                       getDefaultBlockSize(backingFs, path),
422                       null, favoredNodes));
423         } catch (InvocationTargetException ite) {
424           // Function was properly called, but threw its own exception.
425           throw new IOException(ite.getCause());
426         } catch (NoSuchMethodException e) {
427           LOG.debug("DFS Client does not support most favored nodes create; using default create");
428           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
429         } catch (IllegalArgumentException e) {
430           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
431         } catch (SecurityException e) {
432           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
433         } catch (IllegalAccessException e) {
434           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
435         }
436       }
437     }
438     return create(fs, path, perm, true);
439   }
440 
441   /**
442    * Create the specified file on the filesystem. By default, this will:
443    * <ol>
444    * <li>apply the umask in the configuration (if it is enabled)</li>
445    * <li>use the fs configured buffer size (or 4096 if not set)</li>
446    * <li>use the default replication</li>
447    * <li>use the default block size</li>
448    * <li>not track progress</li>
449    * </ol>
450    *
451    * @param fs {@link FileSystem} on which to write the file
452    * @param path {@link Path} to the file to write
453    * @param perm permissions to apply to the new file
454    * @param overwrite Whether or not the created file should be overwritten.
455    * @return output stream to the created file
456    * @throws IOException if the file cannot be created
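   * <p>Minimal sketch (the umask key shown is an example, not a value this method requires):
   * <pre>
   * FsPermission perm = FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
   * FSDataOutputStream out = FSUtils.create(fs, path, perm, true);
   * </pre>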
457    */
458   public static FSDataOutputStream create(FileSystem fs, Path path,
459       FsPermission perm, boolean overwrite) throws IOException {
460     if (LOG.isTraceEnabled()) {
461       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
462     }
463     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
464         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
465   }
466 
467   /**
468    * Get the file permissions specified in the configuration, if they are
469    * enabled.
470    *
471    * @param fs filesystem that the file will be created on.
472    * @param conf configuration to read for determining if permissions are
473    *          enabled and which to use
474    * @param permssionConfKey property key in the configuration to use when
475    *          finding the permission
476    * @return the permission to use when creating a new file on the fs. If
477    *         special permissions are not specified in the configuration, then
478    *         the default permissions on the fs will be returned.
479    */
480   public static FsPermission getFilePermissions(final FileSystem fs,
481       final Configuration conf, final String permssionConfKey) {
482     boolean enablePermissions = conf.getBoolean(
483         HConstants.ENABLE_DATA_FILE_UMASK, false);
484 
485     if (enablePermissions) {
486       try {
487         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
488         // make sure that we have a mask, if not, go default.
489         String mask = conf.get(permssionConfKey);
490         if (mask == null)
491           return FsPermission.getFileDefault();
492         // apply the umask
493         FsPermission umask = new FsPermission(mask);
494         return perm.applyUMask(umask);
495       } catch (IllegalArgumentException e) {
496         LOG.warn(
497             "Incorrect umask attempted to be created: "
498                 + conf.get(permssionConfKey)
499                 + ", using default file permissions.", e);
500         return FsPermission.getFileDefault();
501       }
502     }
503     return FsPermission.getFileDefault();
504   }
505 
506   /**
507    * Checks to see if the specified file system is available
508    *
509    * @param fs filesystem
510    * @throws IOException e
511    */
512   public static void checkFileSystemAvailable(final FileSystem fs)
513   throws IOException {
514     if (!(fs instanceof DistributedFileSystem)) {
515       return;
516     }
517     IOException exception = null;
518     DistributedFileSystem dfs = (DistributedFileSystem) fs;
519     try {
520       if (dfs.exists(new Path("/"))) {
521         return;
522       }
523     } catch (IOException e) {
524       exception = RemoteExceptionHandler.checkIOException(e);
525     }
526     try {
527       fs.close();
528     } catch (Exception e) {
529       LOG.error("file system close failed: ", e);
530     }
531     IOException io = new IOException("File system is not available");
532     io.initCause(exception);
533     throw io;
534   }
535 
536   /**
537    * We use reflection because {@link DistributedFileSystem#setSafeMode(
538    * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
539    *
540    * @param dfs
541    * @return whether we're in safe mode
542    * @throws IOException
543    */
544   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
545     boolean inSafeMode = false;
546     try {
547       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
548           org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
549       inSafeMode = (Boolean) m.invoke(dfs,
550         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
551     } catch (Exception e) {
552       if (e instanceof IOException) throw (IOException) e;
553 
554       // Check whether dfs is on safemode.
555       inSafeMode = dfs.setSafeMode(
556         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
557     }
558     return inSafeMode;
559   }
560 
561   /**
562    * Check whether dfs is in safemode.
563    * @param conf
564    * @throws IOException
565    */
566   public static void checkDfsSafeMode(final Configuration conf)
567   throws IOException {
568     boolean isInSafeMode = false;
569     FileSystem fs = FileSystem.get(conf);
570     if (fs instanceof DistributedFileSystem) {
571       DistributedFileSystem dfs = (DistributedFileSystem)fs;
572       isInSafeMode = isInSafeMode(dfs);
573     }
574     if (isInSafeMode) {
575       throw new IOException("File system is in safemode, it can't be written now");
576     }
577   }
578 
579   /**
580    * Verifies current version of file system
581    *
582    * @param fs filesystem object
583    * @param rootdir root hbase directory
584    * @return null if no version file exists, version string otherwise.
585    * @throws IOException e
586    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
587    */
588   public static String getVersion(FileSystem fs, Path rootdir)
589   throws IOException, DeserializationException {
590     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
591     FileStatus[] status = null;
592     try {
593       // hadoop 2.0 throws FNFE if directory does not exist.
594       // hadoop 1.0 returns null if directory does not exist.
595       status = fs.listStatus(versionFile);
596     } catch (FileNotFoundException fnfe) {
597       return null;
598     }
599     if (status == null || status.length == 0) return null;
600     String version = null;
601     byte [] content = new byte [(int)status[0].getLen()];
602     FSDataInputStream s = fs.open(versionFile);
603     try {
604       IOUtils.readFully(s, content, 0, content.length);
605       if (ProtobufUtil.isPBMagicPrefix(content)) {
606         version = parseVersionFrom(content);
607       } else {
608         // Presume it is in the pre-pb format.
609         InputStream is = new ByteArrayInputStream(content);
610         DataInputStream dis = new DataInputStream(is);
611         try {
612           version = dis.readUTF();
613         } finally {
614           dis.close();
615         }
616       }
617     } catch (EOFException eof) {
618       LOG.warn("Version file was empty, odd, will try to set it.");
619     } finally {
620       s.close();
621     }
622     return version;
623   }
624 
625   /**
626    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
627    * @param bytes The byte content of the hbase.version file.
628    * @return The version found in the file as a String.
629    * @throws DeserializationException
630    */
631   static String parseVersionFrom(final byte [] bytes)
632   throws DeserializationException {
633     ProtobufUtil.expectPBMagicPrefix(bytes);
634     int pblen = ProtobufUtil.lengthOfPBMagic();
635     FSProtos.HBaseVersionFileContent.Builder builder =
636       FSProtos.HBaseVersionFileContent.newBuilder();
637     try {
638       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
639       return builder.getVersion();
640     } catch (IOException e) {
641       // Convert
642       throw new DeserializationException(e);
643     }
644   }
645 
646   /**
647    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
648    * @param version Version to persist
649    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
650    */
651   static byte [] toVersionByteArray(final String version) {
652     FSProtos.HBaseVersionFileContent.Builder builder =
653       FSProtos.HBaseVersionFileContent.newBuilder();
654     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
655   }
656 
657   /**
658    * Verifies current version of file system
659    *
660    * @param fs file system
661    * @param rootdir root directory of HBase installation
662    * @param message if true, issues a message on System.out
663    *
664    * @throws IOException e
665    * @throws DeserializationException
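   * <p>Typical startup-style check (a sketch):
   * <pre>
   * Path rootDir = FSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * FSUtils.checkVersion(fs, rootDir, true); // throws FileSystemVersionException on mismatch
   * </pre>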
666    */
667   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
668   throws IOException, DeserializationException {
669     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
670   }
671 
672   /**
673    * Verifies current version of file system
674    *
675    * @param fs file system
676    * @param rootdir root directory of HBase installation
677    * @param message if true, issues a message on System.out
678    * @param wait wait interval
679    * @param retries number of times to retry
680    *
681    * @throws IOException e
682    * @throws DeserializationException
683    */
684   public static void checkVersion(FileSystem fs, Path rootdir,
685       boolean message, int wait, int retries)
686   throws IOException, DeserializationException {
687     String version = getVersion(fs, rootdir);
688     if (version == null) {
689       if (!metaRegionExists(fs, rootdir)) {
690         // rootDir is empty (no version file and no root region)
691         // just create new version file (HBASE-1195)
692         setVersion(fs, rootdir, wait, retries);
693         return;
694       }
695     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
696 
697     // version is deprecated; require migration
698     // Output on stdout so user sees it in terminal.
699     String msg = "HBase file layout needs to be upgraded."
700       + " You have version " + version
701       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
702       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
703       + " Is your hbase.rootdir valid? If so, you may need to run "
704       + "'hbase hbck -fixVersionFile'.";
705     if (message) {
706       System.out.println("WARNING! " + msg);
707     }
708     throw new FileSystemVersionException(msg);
709   }
710 
711   /**
712    * Sets version of file system
713    *
714    * @param fs filesystem object
715    * @param rootdir hbase root
716    * @throws IOException e
717    */
718   public static void setVersion(FileSystem fs, Path rootdir)
719   throws IOException {
720     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
721       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
722   }
723 
724   /**
725    * Sets version of file system
726    *
727    * @param fs filesystem object
728    * @param rootdir hbase root
729    * @param wait time to wait for retry
730    * @param retries number of times to retry before failing
731    * @throws IOException e
732    */
733   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
734   throws IOException {
735     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
736   }
737 
738 
739   /**
740    * Sets version of file system
741    *
742    * @param fs filesystem object
743    * @param rootdir hbase root directory
744    * @param version version to set
745    * @param wait time to wait for retry
746    * @param retries number of times to retry before throwing an IOException
747    * @throws IOException e
748    */
749   public static void setVersion(FileSystem fs, Path rootdir, String version,
750       int wait, int retries) throws IOException {
751     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
752     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
753       HConstants.VERSION_FILE_NAME);
754     while (true) {
755       try {
756         // Write the version to a temporary file
757         FSDataOutputStream s = fs.create(tempVersionFile);
758         try {
759           s.write(toVersionByteArray(version));
760           s.close();
761           s = null;
762           // Move the temp version file to its normal location. Returns false
763           // if the rename failed. Throw an IOE in that case.
764           if (!fs.rename(tempVersionFile, versionFile)) {
765             throw new IOException("Unable to move temp version file to " + versionFile);
766           }
767         } finally {
768           // Cleaning up the temporary file if the rename failed would be trying
769           // too hard. We'll unconditionally create it again the next time
770           // through anyway; files are overwritten by default by create().
771 
772           // Attempt to close the stream on the way out if it is still open.
773           try {
774             if (s != null) s.close();
775           } catch (IOException ignore) { }
776         }
777         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
778         return;
779       } catch (IOException e) {
780         if (retries > 0) {
781           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
782           fs.delete(versionFile, false);
783           try {
784             if (wait > 0) {
785               Thread.sleep(wait);
786             }
787           } catch (InterruptedException ie) {
788             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
789           }
790           retries--;
791         } else {
792           throw e;
793         }
794       }
795     }
796   }
797 
798   /**
799    * Checks that a cluster ID file exists in the HBase root directory
800    * @param fs the root directory FileSystem
801    * @param rootdir the HBase root directory in HDFS
802    * @param wait how long to wait between retries
803    * @return <code>true</code> if the file exists, otherwise <code>false</code>
804    * @throws IOException if checking the FileSystem fails
805    */
806   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
807       int wait) throws IOException {
808     while (true) {
809       try {
810         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
811         return fs.exists(filePath);
812       } catch (IOException ioe) {
813         if (wait > 0) {
814           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
815               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
816           try {
817             Thread.sleep(wait);
818           } catch (InterruptedException e) {
819             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
820           }
821         } else {
822           throw ioe;
823         }
824       }
825     }
826   }
827 
828   /**
829    * Returns the value of the unique cluster ID stored for this HBase instance.
830    * @param fs the root directory FileSystem
831    * @param rootdir the path to the HBase root directory
832    * @return the unique cluster identifier
833    * @throws IOException if reading the cluster ID file fails
834    */
835   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
836   throws IOException {
837     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
838     ClusterId clusterId = null;
839     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
840     if (status != null) {
841       int len = Ints.checkedCast(status.getLen());
842       byte [] content = new byte[len];
843       FSDataInputStream in = fs.open(idPath);
844       try {
845         in.readFully(content);
846       } catch (EOFException eof) {
847         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
848       } finally{
849         in.close();
850       }
851       try {
852         clusterId = ClusterId.parseFrom(content);
853       } catch (DeserializationException e) {
854         throw new IOException("content=" + Bytes.toString(content), e);
855       }
856       // If not pb'd, make it so.
857       if (!ProtobufUtil.isPBMagicPrefix(content)) {
858         String cid = null;
859         in = fs.open(idPath);
860         try {
861           cid = in.readUTF();
862           clusterId = new ClusterId(cid);
863         } catch (EOFException eof) {
864           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
865         } finally {
866           in.close();
867         }
868         rewriteAsPb(fs, rootdir, idPath, clusterId);
869       }
870       return clusterId;
871     } else {
872       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
873     }
874     return clusterId;
875   }
876 
877   /**
878    * @param cid
879    * @throws IOException
880    */
881   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
882       final ClusterId cid)
883   throws IOException {
884     // Rewrite the file as pb.  Move aside the old one first, write new
885     // then delete the moved-aside file.
886     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
887     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
888     setClusterId(fs, rootdir, cid, 100);
889     if (!fs.delete(movedAsideName, false)) {
890       throw new IOException("Failed delete of " + movedAsideName);
891     }
892     LOG.debug("Rewrote the hbase.id file as pb");
893   }
894 
895   /**
896    * Writes a new unique identifier for this cluster to the "hbase.id" file
897    * in the HBase root directory
898    * @param fs the root directory FileSystem
899    * @param rootdir the path to the HBase root directory
900    * @param clusterId the unique identifier to store
901    * @param wait how long (in milliseconds) to wait between retries
902    * @throws IOException if writing to the FileSystem fails and no wait value is set for retrying
903    */
904   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
905       int wait) throws IOException {
906     while (true) {
907       try {
908         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
909         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
910           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
911         // Write the id file to a temporary location
912         FSDataOutputStream s = fs.create(tempIdFile);
913         try {
914           s.write(clusterId.toByteArray());
915           s.close();
916           s = null;
917           // Move the temporary file to its normal location. Throw an IOE if
918           // the rename failed
919           if (!fs.rename(tempIdFile, idFile)) {
920             throw new IOException("Unable to move temp version file to " + idFile);
921           }
922         } finally {
923           // Attempt to close the stream if still open on the way out
924           try {
925             if (s != null) s.close();
926           } catch (IOException ignore) { }
927         }
928         if (LOG.isDebugEnabled()) {
929           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
930         }
931         return;
932       } catch (IOException ioe) {
933         if (wait > 0) {
934           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
935               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
936           try {
937             Thread.sleep(wait);
938           } catch (InterruptedException e) {
939             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
940           }
941         } else {
942           throw ioe;
943         }
944       }
945     }
946   }
947 
948   /**
949    * Verifies root directory path is a valid URI with a scheme
950    *
951    * @param root root directory path
952    * @return Passed <code>root</code> argument.
953    * @throws IOException if not a valid URI with a scheme
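   * <p>For example (namenode host and port are placeholders):
   * <pre>
   * FSUtils.validateRootPath(new Path("hdfs://namenode:8020/hbase")); // returns the Path
   * FSUtils.validateRootPath(new Path("/hbase"));                     // throws IOException: no scheme
   * </pre>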
954    */
955   public static Path validateRootPath(Path root) throws IOException {
956     try {
957       URI rootURI = new URI(root.toString());
958       String scheme = rootURI.getScheme();
959       if (scheme == null) {
960         throw new IOException("Root directory does not have a scheme");
961       }
962       return root;
963     } catch (URISyntaxException e) {
964       IOException io = new IOException("Root directory path is not a valid " +
965         "URI -- check your " + HBASE_DIR + " configuration");
966       io.initCause(e);
967       throw io;
968     }
969   }
970 
971   /**
972    * Checks for the presence of the WAL root path (as resolved from the provided conf object) in the given
973    * path. If present, this method removes it and returns the String representation of the remaining relative path.
974    * @param path
975    * @param conf
976    * @return String representation of the remaining relative path
977    * @throws IOException
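   * <p>Illustration, assuming the WAL root resolves to hdfs://namenode:8020/hbase:
   * <pre>
   * FSUtils.removeWALRootPath(new Path("hdfs://namenode:8020/hbase/WALs/rs1"), conf); // "WALs/rs1"
   * FSUtils.removeWALRootPath(new Path("/some/other/dir"), conf);                     // "/some/other/dir"
   * </pre>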
978    */
979   public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
980     Path root = getWALRootDir(conf);
981     String pathStr = path.toString();
982     // check that the path is absolute... it has the root path in it.
983     if (!pathStr.startsWith(root.toString())) return pathStr;
984     // if not, return as it is.
985     return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
986   }
987 
988   /**
989    * If DFS, check safe mode and if so, wait until we clear it.
990    * @param conf configuration
991    * @param wait Sleep between retries
992    * @throws IOException e
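   * <p>Sketch of a typical call (the 10 second sleep interval is an arbitrary example):
   * <pre>
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L); // blocks until HDFS leaves safe mode
   * </pre>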
993    */
994   public static void waitOnSafeMode(final Configuration conf,
995     final long wait)
996   throws IOException {
997     FileSystem fs = FileSystem.get(conf);
998     if (!(fs instanceof DistributedFileSystem)) return;
999     DistributedFileSystem dfs = (DistributedFileSystem)fs;
1000     // Make sure dfs is not in safe mode
1001     while (isInSafeMode(dfs)) {
1002       LOG.info("Waiting for dfs to exit safe mode...");
1003       try {
1004         Thread.sleep(wait);
1005       } catch (InterruptedException e) {
1006         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1007       }
1008     }
1009   }
1010 
1011   /**
1012    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
1013    * method returns the 'path' component of a Path's URI: e.g. If a Path is
1014    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
1015    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
1016    * This method is useful if you want to print out a Path without the qualifying
1017    * Filesystem instance.
1018    * @param p Filesystem Path whose 'path' component we are to return.
1019    * @return the 'path' portion of the Path's URI
1020    */
1021   public static String getPath(Path p) {
1022     return p.toUri().getPath();
1023   }
1024 
1025   /**
1026    * @param c configuration
1027    * @return {@link Path} to hbase root directory: i.e. {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} from
1028    * configuration as a qualified Path.
1029    * @throws IOException e
1030    */
1031   public static Path getRootDir(final Configuration c) throws IOException {
1032     Path p = new Path(c.get(HBASE_DIR));
1033     FileSystem fs = p.getFileSystem(c);
1034     return p.makeQualified(fs);
1035   }
1036 
1037   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1038     c.set(HBASE_DIR, root.toString());
1039   }
1040 
1041   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1042     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1043   }
1044 
1045   public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
1046     Path p = getRootDir(c);
1047     return p.getFileSystem(c);
1048   }
1049 
1050   /**
1051    * @param c configuration
1052    * @return {@link Path} to hbase log root directory: i.e. {@value org.apache.hadoop.hbase.fs.HFileSystem#HBASE_WAL_DIR} from
1053    * configuration as a qualified Path. Defaults to {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR}
1054    * @throws IOException e
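   * <p>Sketch (the URI is a placeholder; note the WAL root must not live under the HBase root directory):
   * <pre>
   * conf.set(HFileSystem.HBASE_WAL_DIR, "hdfs://namenode:8020/hbasewal");
   * Path walRoot = FSUtils.getWALRootDir(conf); // hdfs://namenode:8020/hbasewal
   * </pre>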
1055    */
1056   public static Path getWALRootDir(final Configuration c) throws IOException {
1057     Path p = new Path(c.get(HFileSystem.HBASE_WAL_DIR, c.get(HBASE_DIR)));
1058     if (!isValidWALRootDir(p, c)) {
1059       return FSUtils.getRootDir(c);
1060     }
1061     FileSystem fs = p.getFileSystem(c);
1062     return p.makeQualified(fs);
1063   }
1064 
1065   @VisibleForTesting
1066   public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
1067     c.set(HFileSystem.HBASE_WAL_DIR, root.toString());
1068   }
1069 
1070   public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
1071     Path p = getWALRootDir(c);
1072     return p.getFileSystem(c);
1073   }
1074 
1075   private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
1076     Path rootDir = FSUtils.getRootDir(c);
1077     if (!walDir.equals(rootDir)) {
1078       if (walDir.toString().startsWith(rootDir.toString() + "/")) {
1079         throw new IllegalStateException("Illegal WAL directory specified. " +
1080             "WAL directories are not permitted to be under the root directory if set.");
1081       }
1082     }
1083     return true;
1084   }
1085 
1086   /**
1087    * Checks if meta region exists
1088    *
1089    * @param fs file system
1090    * @param rootdir root directory of HBase installation
1091    * @return true if exists
1092    * @throws IOException e
1093    */
1094   @SuppressWarnings("deprecation")
1095   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1096   throws IOException {
1097     Path metaRegionDir =
1098       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1099     return fs.exists(metaRegionDir);
1100   }
1101 
1102   /**
1103    * Compute HDFS blocks distribution of a given file, or a portion of the file
1104    * @param fs file system
1105    * @param status file status of the file
1106    * @param start start position of the portion
1107    * @param length length of the portion
1108    * @return The HDFS blocks distribution
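   * <p>For example, to compute the distribution of a whole file (a sketch; hfilePath is a placeholder):
   * <pre>
   * FileStatus status = fs.getFileStatus(hfilePath);
   * HDFSBlocksDistribution dist =
   *     FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   * </pre>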
1109    */
1110   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1111     final FileSystem fs, FileStatus status, long start, long length)
1112     throws IOException {
1113     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1114     BlockLocation [] blockLocations =
1115       fs.getFileBlockLocations(status, start, length);
1116     for(BlockLocation bl : blockLocations) {
1117       String [] hosts = bl.getHosts();
1118       long len = bl.getLength();
1119       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1120     }
1121 
1122     return blocksDistribution;
1123   }
1124 
1125 
1126 
1127   /**
1128    * Runs through the hbase rootdir and checks all stores have only
1129    * one file in them -- that is, they've been major compacted.  Looks
1130    * at root and meta tables too.
1131    * @param fs filesystem
1132    * @param hbaseRootDir hbase root directory
1133    * @return True if this hbase install is major compacted.
1134    * @throws IOException e
1135    */
1136   public static boolean isMajorCompacted(final FileSystem fs,
1137       final Path hbaseRootDir)
1138   throws IOException {
1139     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1140     PathFilter regionFilter = new RegionDirFilter(fs);
1141     PathFilter familyFilter = new FamilyDirFilter(fs);
1142     for (Path d : tableDirs) {
1143       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1144       for (FileStatus regionDir : regionDirs) {
1145         Path dd = regionDir.getPath();
1146         // Else it's a region name.  Now look in the region for families.
1147         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1148         for (FileStatus familyDir : familyDirs) {
1149           Path family = familyDir.getPath();
1150           // Now in family make sure only one file.
1151           FileStatus[] familyStatus = fs.listStatus(family);
1152           if (familyStatus.length > 1) {
1153             LOG.debug(family.toString() + " has " + familyStatus.length +
1154                 " files.");
1155             return false;
1156           }
1157         }
1158       }
1159     }
1160     return true;
1161   }
1162 
1163   // TODO move this method OUT of FSUtils. No dependencies on HMaster
1164   /**
1165    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1166    * -ROOT- as well.
1167    *
1168    * @param master  The master defining the HBase root and file system.
1169    * @return The overall fragmentation percentage, or -1 if it cannot be determined.
1170    * @throws IOException When scanning the directory fails.
1171    */
1172   public static int getTotalTableFragmentation(final HMaster master)
1173   throws IOException {
1174     Map<String, Integer> map = getTableFragmentation(master);
1175     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1176   }
1177 
1178   /**
1179    * Runs through the HBase rootdir and checks how many stores for each table
1180    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1181    * percentage across all tables is stored under the special key "-TOTAL-".
1182    *
1183    * @param master  The master defining the HBase root and file system.
1184    * @return A map for each table and its percentage.
1185    *
1186    * @throws IOException When scanning the directory fails.
1187    */
1188   public static Map<String, Integer> getTableFragmentation(
1189     final HMaster master)
1190   throws IOException {
1191     Path path = getRootDir(master.getConfiguration());
1192     // since HMaster.getFileSystem() is package private
1193     FileSystem fs = path.getFileSystem(master.getConfiguration());
1194     return getTableFragmentation(fs, path);
1195   }
1196 
1197   /**
1198    * Runs through the HBase rootdir and checks how many stores for each table
1199    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1200    * percentage across all tables is stored under the special key "-TOTAL-".
1201    *
1202    * @param fs  The file system to use.
1203    * @param hbaseRootDir  The root directory to scan.
1204    * @return A map for each table and its percentage.
1205    * @throws IOException When scanning the directory fails.
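   * <p>Usage sketch:
   * <pre>
   * Map&lt;String, Integer&gt; frags = FSUtils.getTableFragmentation(fs, FSUtils.getRootDir(conf));
   * int overallPercent = frags.get("-TOTAL-");
   * </pre>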
1206    */
1207   public static Map<String, Integer> getTableFragmentation(
1208     final FileSystem fs, final Path hbaseRootDir)
1209   throws IOException {
1210     Map<String, Integer> frags = new HashMap<String, Integer>();
1211     int cfCountTotal = 0;
1212     int cfFragTotal = 0;
1213     PathFilter regionFilter = new RegionDirFilter(fs);
1214     PathFilter familyFilter = new FamilyDirFilter(fs);
1215     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1216     for (Path d : tableDirs) {
1217       int cfCount = 0;
1218       int cfFrag = 0;
1219       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1220       for (FileStatus regionDir : regionDirs) {
1221         Path dd = regionDir.getPath();
1222         // else it's a region name, now look in the region for families
1223         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1224         for (FileStatus familyDir : familyDirs) {
1225           cfCount++;
1226           cfCountTotal++;
1227           Path family = familyDir.getPath();
1228           // now in family make sure only one file
1229           FileStatus[] familyStatus = fs.listStatus(family);
1230           if (familyStatus.length > 1) {
1231             cfFrag++;
1232             cfFragTotal++;
1233           }
1234         }
1235       }
1236       // compute percentage per table and store in result list
1237       frags.put(FSUtils.getTableName(d).getNameAsString(),
1238         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1239     }
1240     // set overall percentage for all tables
1241     frags.put("-TOTAL-",
1242       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1243     return frags;
1244   }
1245 
1246   /**
1247    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1248    * path rootdir
1249    *
1250    * @param rootdir qualified path of HBase root directory
1251    * @param tableName name of table
1252    * @return {@link org.apache.hadoop.fs.Path} for table
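   * <p>For example (the root URI is a placeholder; tables live under the namespace directory):
   * <pre>
   * Path tableDir = FSUtils.getTableDir(new Path("hdfs://nn:8020/hbase"),
   *     TableName.valueOf("ns1", "t1"));
   * // typically yields hdfs://nn:8020/hbase/data/ns1/t1
   * </pre>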
1253    */
1254   public static Path getTableDir(Path rootdir, final TableName tableName) {
1255     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1256         tableName.getQualifierAsString());
1257   }
1258 
1259   /**
1260    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
1261    * the table directory under
1262    * path rootdir
1263    *
1264    * @param tablePath path of table
1265    * @return {@link org.apache.hadoop.fs.Path} for table
1266    */
1267   public static TableName getTableName(Path tablePath) {
1268     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1269   }
1270 
1271   /**
1272    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1273    * the namespace directory under path rootdir
1274    *
1275    * @param rootdir qualified path of HBase root directory
1276    * @param namespace namespace name
1277    * @return {@link org.apache.hadoop.fs.Path} for table
1278    */
1279   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1280     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1281         new Path(namespace)));
1282   }
1283 
1284   /**
1285    * A {@link PathFilter} that returns only regular files.
1286    */
1287   static class FileFilter extends AbstractFileStatusFilter {
1288     private final FileSystem fs;
1289 
1290     public FileFilter(final FileSystem fs) {
1291       this.fs = fs;
1292     }
1293 
1294     @Override
1295     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1296       try {
1297         return isFile(fs, isDir, p);
1298       } catch (IOException e) {
1299         LOG.warn("unable to verify if path=" + p + " is a regular file", e);
1300         return false;
1301       }
1302     }
1303   }
1304 
1305   /**
1306    * Directory filter that doesn't include any of the directories in the specified blacklist
1307    */
1308   public static class BlackListDirFilter extends AbstractFileStatusFilter {
1309     private final FileSystem fs;
1310     private List<String> blacklist;
1311 
1312     /**
1313      * Create a filter on the given filesystem with the specified blacklist
1314      * @param fs filesystem to filter
1315      * @param directoryNameBlackList list of the names of the directories to filter. If
1316      *          <tt>null</tt>, all directories are returned
1317      */
1318     @SuppressWarnings("unchecked")
1319     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1320       this.fs = fs;
1321       blacklist =
1322         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1323           : directoryNameBlackList);
1324     }
1325 
1326     @Override
1327     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1328       if (!isValidName(p.getName())) {
1329         return false;
1330       }
1331 
1332       try {
1333         return isDirectory(fs, isDir, p);
1334       } catch (IOException e) {
1335         LOG.warn("An error occurred while verifying if [" + p.toString()
1336             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1337         return false;
1338       }
1339     }
1340 
1341     protected boolean isValidName(final String name) {
1342       return !blacklist.contains(name);
1343     }
1344   }
1345 
1346   /**
1347    * A {@link PathFilter} that only allows directories.
1348    */
1349   public static class DirFilter extends BlackListDirFilter {
1350 
1351     public DirFilter(FileSystem fs) {
1352       super(fs, null);
1353     }
1354   }
1355 
1356   /**
1357    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1358    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1359    */
1360   public static class UserTableDirFilter extends BlackListDirFilter {
1361     public UserTableDirFilter(FileSystem fs) {
1362       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1363     }
1364 
1365     protected boolean isValidName(final String name) {
1366       if (!super.isValidName(name))
1367         return false;
1368 
1369       try {
1370         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1371       } catch (IllegalArgumentException e) {
1372         LOG.info("INVALID NAME " + name);
1373         return false;
1374       }
1375       return true;
1376     }
1377   }
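
  // Illustrative usage sketch, not part of the original source: the filter above can be handed
  // straight to FileSystem.listStatus() as a PathFilter (as getLocalTableDirs() below does) to
  // keep only user-table directories. The method and variable names here are hypothetical.
  private static List<Path> exampleListUserTableDirs(FileSystem fs, Path namespaceDir)
      throws IOException {
    List<Path> tables = new ArrayList<Path>();
    for (FileStatus status : fs.listStatus(namespaceDir, new UserTableDirFilter(fs))) {
      tables.add(status.getPath());
    }
    return tables;
  }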
1378 
1379   /**
1380    * Heuristic to determine whether it is safe to open a file for append.
1381    * Looks for dfs.support.append and uses reflection to search
1382    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
1383    * @param conf
1384    * @return True if append is supported
1385    */
1386   public static boolean isAppendSupported(final Configuration conf) {
1387     boolean append = conf.getBoolean("dfs.support.append", false);
1388     if (append) {
1389       try {
1390         // TODO: The implementation that comes back when we do a createWriter
1391         // may not be using SequenceFile so the below is not a definitive test.
1392         // Will do for now (hdfs-200).
1393         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1394         append = true;
1395       } catch (SecurityException e) {
1396       } catch (NoSuchMethodException e) {
1397         append = false;
1398       }
1399     }
1400     if (!append) {
1401       // Look for the 0.21, 0.22, new-style append evidence.
1402       try {
1403         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1404         append = true;
1405       } catch (NoSuchMethodException e) {
1406         append = false;
1407       }
1408     }
1409     return append;
1410   }
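
  // Illustrative usage sketch, not part of the original source: a typical guard built on
  // isAppendSupported(); the warning text is hypothetical.
  private static void exampleWarnIfAppendUnsupported(Configuration conf) {
    if (!isAppendSupported(conf)) {
      LOG.warn("Append/hflush does not appear to be supported by the underlying filesystem; "
          + "WAL durability may be compromised.");
    }
  }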
1411 
1412   /**
1413    * @param conf
1414    * @return True if the configured filesystem's scheme is 'hdfs'.
1415    * @throws IOException
1416    */
1417   public static boolean isHDFS(final Configuration conf) throws IOException {
1418     FileSystem fs = FileSystem.get(conf);
1419     String scheme = fs.getUri().getScheme();
1420     return scheme.equalsIgnoreCase("hdfs");
1421   }
1422 
1423   /**
1424    * Recover file lease. Used when a file is suspected
1425    * to have been left open by another process.
1426    * @param fs FileSystem handle
1427    * @param p Path of file to recover lease
1428    * @param conf Configuration handle
1429    * @throws IOException
1430    */
1431   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1432       Configuration conf, CancelableProgressable reporter) throws IOException;
1433 
1434   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1435       throws IOException {
1436     List<Path> tableDirs = new LinkedList<Path>();
1437 
1438     for(FileStatus status :
1439         fs.globStatus(new Path(rootdir,
1440             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1441       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1442     }
1443     return tableDirs;
1444   }
1445 
1446   /**
1447    * @param fs
1448    * @param rootdir
1449    * @return All the table directories under <code>rootdir</code>. Ignores non-table
1450    * hbase folders such as .logs, .oldlogs and .corrupt.
1451    * @throws IOException
1452    */
1453   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1454       throws IOException {
1455     // presumes any directory under hbase.rootdir is a table
1456     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1457     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1458     for (FileStatus dir: dirs) {
1459       tabledirs.add(dir.getPath());
1460     }
1461     return tabledirs;
1462   }
1463 
1464   /**
1465    * Checks if the given path contains a 'recovered.edits' directory.
1466    * @param path
1467    * @return True if the path contains the recovered edits directory
1468    */
1469   public static boolean isRecoveredEdits(Path path) {
1470     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1471   }
1472 
1473   /**
1474    * Filter that matches region directories: hex-named dirs (and older numeric region dir names).
1475    */
1476   public static class RegionDirFilter extends AbstractFileStatusFilter {
1477     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1478     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1479     final FileSystem fs;
1480 
1481     public RegionDirFilter(FileSystem fs) {
1482       this.fs = fs;
1483     }
1484 
1485     @Override
1486     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1487       if (!regionDirPattern.matcher(p.getName()).matches()) {
1488         return false;
1489       }
1490 
1491       try {
1492         return isDirectory(fs, isDir, p);
1493       } catch (IOException ioe) {
1494         // Maybe the file was moved or the fs was disconnected.
1495         LOG.warn("Skipping file " + p +" due to IOException", ioe);
1496         return false;
1497       }
1498     }
1499   }
1500 
1501   /**
1502    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1503    * .tableinfo
1504    * @param fs A file system for the Path
1505    * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
1506    * @return List of paths to valid region directories in table dir.
1507    * @throws IOException
1508    */
1509   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1510     // assumes we are in a table dir.
1511     List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1512     if (rds == null) {
1513       return new ArrayList<Path>();
1514     }
1515     List<Path> regionDirs = new ArrayList<Path>(rds.size());
1516     for (FileStatus rdfs: rds) {
1517       Path rdPath = rdfs.getPath();
1518       regionDirs.add(rdPath);
1519     }
1520     return regionDirs;
1521   }
1522 
1523   /**
1524    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1525    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
1526    */
1527   public static class FamilyDirFilter extends AbstractFileStatusFilter {
1528     final FileSystem fs;
1529 
1530     public FamilyDirFilter(FileSystem fs) {
1531       this.fs = fs;
1532     }
1533 
1534     @Override
1535     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1536       try {
1537         // throws IAE if invalid
1538         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1539       } catch (IllegalArgumentException iae) {
1540         // path name is an invalid family name and thus is excluded.
1541         return false;
1542       }
1543 
1544       try {
1545         return isDirectory(fs, isDir, p);
1546       } catch (IOException ioe) {
1547         // Maybe the file was moved or the fs was disconnected.
1548         LOG.warn("Skipping file " + p +" due to IOException", ioe);
1549         return false;
1550       }
1551     }
1552   }
1553 
1554   /**
1555    * Given a particular region dir, return all the familydirs inside it
1556    *
1557    * @param fs A file system for the Path
1558    * @param regionDir Path to a specific region directory
1559    * @return List of paths to valid family directories in region dir.
1560    * @throws IOException
1561    */
1562   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1563     // assumes we are in a region dir.
1564     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1565     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1566     for (FileStatus fdfs: fds) {
1567       Path fdPath = fdfs.getPath();
1568       familyDirs.add(fdPath);
1569     }
1570     return familyDirs;
1571   }
1572 
1573   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1574     List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1575     if (fds == null) {
1576       return new ArrayList<Path>();
1577     }
1578     List<Path> referenceFiles = new ArrayList<Path>(fds.size());
1579     for (FileStatus fdfs: fds) {
1580       Path fdPath = fdfs.getPath();
1581       referenceFiles.add(fdPath);
1582     }
1583     return referenceFiles;
1584   }
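
  // Illustrative usage sketch, not part of the original source: chaining getRegionDirs(),
  // getFamilyDirs() and getReferenceFilePaths() to enumerate every reference file of a table.
  // The method name is hypothetical.
  private static List<Path> exampleListTableReferenceFiles(FileSystem fs, Path tableDir)
      throws IOException {
    List<Path> refs = new ArrayList<Path>();
    for (Path regionDir : getRegionDirs(fs, tableDir)) {
      for (Path familyDir : getFamilyDirs(fs, regionDir)) {
        refs.addAll(getReferenceFilePaths(fs, familyDir));
      }
    }
    return refs;
  }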
1585 
1586   /**
1587    * Filter for HFiles that excludes reference files.
1588    */
1589   public static class HFileFilter extends AbstractFileStatusFilter {
1590     final FileSystem fs;
1591 
1592     public HFileFilter(FileSystem fs) {
1593       this.fs = fs;
1594     }
1595 
1596     @Override
1597     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1598       if (!StoreFileInfo.isHFile(p)) {
1599         return false;
1600       }
1601 
1602       try {
1603         return isFile(fs, isDir, p);
1604       } catch (IOException ioe) {
1605         // Maybe the file was moved or the fs was disconnected.
1606         LOG.warn("Skipping file " + p +" due to IOException", ioe);
1607         return false;
1608       }
1609     }
1610   }
1611 
1612   /**
1613    * Filter for HFileLinks (StoreFiles and HFiles not included).
1614    * The filter itself does not consider whether a link is a file or not.
1615    */
1616   public static class HFileLinkFilter implements PathFilter {
1617 
1618     @Override
1619     public boolean accept(Path p) {
1620       return HFileLink.isHFileLink(p);
1621     }
1622   }
1623 
1624   public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1625 
1626     private final FileSystem fs;
1627 
1628     public ReferenceFileFilter(FileSystem fs) {
1629       this.fs = fs;
1630     }
1631 
1632     @Override
1633     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1634       if (!StoreFileInfo.isReference(p)) {
1635         return false;
1636       }
1637 
1638       try {
1639         // only files can be references.
1640         return isFile(fs, isDir, p);
1641       } catch (IOException ioe) {
1642         // Maybe the file was moved or the fs was disconnected.
1643         LOG.warn("Skipping file " + p +" due to IOException", ioe);
1644         return false;
1645       }
1646     }
1647   }
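
  // Illustrative usage sketch, not part of the original source: the three filters above can be
  // used to pick plain HFiles, HFileLinks and reference files out of a family directory. The
  // method name and debug message are hypothetical.
  private static void exampleClassifyFamilyDir(FileSystem fs, Path familyDir) throws IOException {
    FileStatus[] hfiles = fs.listStatus(familyDir, new HFileFilter(fs));
    FileStatus[] links = fs.listStatus(familyDir, new HFileLinkFilter());
    FileStatus[] references = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
    LOG.debug(familyDir + ": " + hfiles.length + " hfiles, " + links.length + " links, "
        + references.length + " references");
  }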
1648 
1649 
1650   /**
1651    * @param conf
1652    * @return The filesystem of the hbase rootdir.
1653    * @throws IOException
1654    */
1655   public static FileSystem getCurrentFileSystem(Configuration conf)
1656   throws IOException {
1657     return getRootDir(conf).getFileSystem(conf);
1658   }
1659 
1660 
1661   /**
1662    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1663    * table StoreFile names to the full Path.
1664    * <br>
1665    * Example...<br>
1666    * Key = 3944417774205889744  <br>
1667    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1668    *
1669    * @param map map to add values.  If null, this method will create and populate one to return
1670    * @param fs  The file system to use.
1671    * @param hbaseRootDir  The root directory to scan.
1672    * @param tableName name of the table to scan.
1673    * @return Map keyed by StoreFile name with a value of the full Path.
1674    * @throws IOException When scanning the directory fails.
1675    * @throws InterruptedException
1676    */
1677   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1678   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1679   throws IOException, InterruptedException {
1680     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
1681   }
1682 
1683   /**
1684    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1685    * table StoreFile names to the full Path.  Note that because this method can be called
1686    * on a 'live' HBase system, we will skip files that no longer exist by the time
1687    * we traverse them; similarly, the caller should expect that some entries in the
1688    * result map may no longer exist by the time this call completes.
1689    * <br>
1690    * Example...<br>
1691    * Key = 3944417774205889744  <br>
1692    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1693    *
1694    * @param resultMap map to add values.  If null, this method will create and populate one to return
1695    * @param fs  The file system to use.
1696    * @param hbaseRootDir  The root directory to scan.
1697    * @param tableName name of the table to scan.
1698    * @param sfFilter optional path filter to apply to store files
1699    * @param executor optional executor service to parallelize this operation
1700    * @param errors ErrorReporter instance or null
1701    * @return Map keyed by StoreFile name with a value of the full Path.
1702    * @throws IOException When scanning the directory fails.
1703    * @throws InterruptedException
1704    */
1705   public static Map<String, Path> getTableStoreFilePathMap(
1706       Map<String, Path> resultMap,
1707       final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1708       ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {
1709 
1710     final Map<String, Path> finalResultMap =
1711         resultMap == null ? new ConcurrentHashMap<String, Path>(128, 0.75f, 32) : resultMap;
1712 
1713     // only include the directory paths to tables
1714     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1715     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1716     // should be regions.
1717     final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1718     final Vector<Exception> exceptions = new Vector<Exception>();
1719 
1720     try {
1721       List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1722       if (regionDirs == null) {
1723         return finalResultMap;
1724       }
1725 
1726       final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.size());
1727 
1728       for (FileStatus regionDir : regionDirs) {
1729         if (null != errors) {
1730           errors.progress();
1731         }
1732         final Path dd = regionDir.getPath();
1733 
1734         if (!exceptions.isEmpty()) {
1735           break;
1736         }
1737 
1738         Runnable getRegionStoreFileMapCall = new Runnable() {
1739           @Override
1740           public void run() {
1741             try {
1742               HashMap<String,Path> regionStoreFileMap = new HashMap<String, Path>();
1743               List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1744               if (familyDirs == null) {
1745                 if (!fs.exists(dd)) {
1746                   LOG.warn("Skipping region because it no longer exists: " + dd);
1747                 } else {
1748                   LOG.warn("Skipping region because it has no family dirs: " + dd);
1749                 }
1750                 return;
1751               }
1752               for (FileStatus familyDir : familyDirs) {
1753                 if (null != errors) {
1754                   errors.progress();
1755                 }
1756                 Path family = familyDir.getPath();
1757                 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1758                   continue;
1759                 }
1760                 // now in family, iterate over the StoreFiles and
1761                 // put in map
1762                 FileStatus[] familyStatus = fs.listStatus(family);
1763                 for (FileStatus sfStatus : familyStatus) {
1764                   if (null != errors) {
1765                     errors.progress();
1766                   }
1767                   Path sf = sfStatus.getPath();
1768                   if (sfFilter == null || sfFilter.accept(sf)) {
1769                     regionStoreFileMap.put( sf.getName(), sf);
1770                   }
1771                 }
1772               }
1773               finalResultMap.putAll(regionStoreFileMap);
1774             } catch (Exception e) {
1775               LOG.error("Could not get region store file map for region: " + dd, e);
1776               exceptions.add(e);
1777             }
1778           }
1779         };
1780 
1781         // If executor is available, submit async tasks to exec concurrently, otherwise
1782         // just do serial sync execution
1783         if (executor != null) {
1784           Future<?> future = executor.submit(getRegionStoreFileMapCall);
1785           futures.add(future);
1786         } else {
1787           FutureTask<?> future = new FutureTask<Object>(getRegionStoreFileMapCall, null);
1788           future.run();
1789           futures.add(future);
1790         }
1791       }
1792 
1793       // Ensure all pending tasks are complete (or that we run into an exception)
1794       for (Future<?> f : futures) {
1795         if (!exceptions.isEmpty()) {
1796           break;
1797         }
1798         try {
1799           f.get();
1800         } catch (ExecutionException e) {
1801           LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1802           // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1803         }
1804       }
1805     } catch (IOException e) {
1806       LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1807       exceptions.add(e);
1808     } finally {
1809       if (!exceptions.isEmpty()) {
1810         // Just throw the first exception as an indication something bad happened
1811         // Don't need to propagate all the exceptions, we already logged them all anyway
1812         Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1813         throw Throwables.propagate(exceptions.firstElement());
1814       }
1815     }
1816 
1817     return finalResultMap;
1818   }
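
  // Illustrative usage sketch, not part of the original source: building the store-file lookup
  // map for one table with a small thread pool. The pool size and table name are hypothetical;
  // sfFilter and errors are left null since both are optional.
  private static Map<String, Path> exampleStoreFileMap(FileSystem fs, Path hbaseRootDir)
      throws IOException, InterruptedException {
    ExecutorService pool = java.util.concurrent.Executors.newFixedThreadPool(4);
    try {
      return getTableStoreFilePathMap(null, fs, hbaseRootDir, TableName.valueOf("default", "t1"),
          null, pool, null);
    } finally {
      pool.shutdown();
    }
  }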
1819 
1820   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1821     int result = 0;
1822     try {
1823       for (Path familyDir:getFamilyDirs(fs, p)){
1824         result += getReferenceFilePaths(fs, familyDir).size();
1825       }
1826     } catch (IOException e) {
1827       LOG.warn("Error Counting reference files.", e);
1828     }
1829     return result;
1830   }
1831 
1832   /**
1833    * Runs through the HBase rootdir and creates a reverse lookup map for
1834    * table StoreFile names to the full Path.
1835    * <br>
1836    * Example...<br>
1837    * Key = 3944417774205889744  <br>
1838    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1839    *
1840    * @param fs  The file system to use.
1841    * @param hbaseRootDir  The root directory to scan.
1842    * @return Map keyed by StoreFile name with a value of the full Path.
1843    * @throws IOException When scanning the directory fails.
1844    * @throws InterruptedException
1845    */
1846   public static Map<String, Path> getTableStoreFilePathMap(
1847     final FileSystem fs, final Path hbaseRootDir)
1848   throws IOException, InterruptedException {
1849     return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
1850   }
1851 
1852   /**
1853    * Runs through the HBase rootdir and creates a reverse lookup map for
1854    * table StoreFile names to the full Path.
1855    * <br>
1856    * Example...<br>
1857    * Key = 3944417774205889744  <br>
1858    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1859    *
1860    * @param fs  The file system to use.
1861    * @param hbaseRootDir  The root directory to scan.
1862    * @param sfFilter optional path filter to apply to store files
1863    * @param executor optional executor service to parallelize this operation
1864    * @param errors ErrorReporter instance or null
1865    * @return Map keyed by StoreFile name with a value of the full Path.
1866    * @throws IOException When scanning the directory fails.
1867    * @throws InterruptedException
1868    */
1869   public static Map<String, Path> getTableStoreFilePathMap(
1870     final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1871     ExecutorService executor, ErrorReporter errors)
1872   throws IOException, InterruptedException {
1873     ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<String, Path>(1024, 0.75f, 32);
1874 
1875     // if this method looks similar to 'getTableFragmentation' that is because
1876     // it was borrowed from it.
1877 
1878     // only include the directory paths to tables
1879     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1880       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1881           FSUtils.getTableName(tableDir), sfFilter, executor, errors);
1882     }
1883     return map;
1884   }
1885 
1886   /**
1887    * Filters FileStatuses in an array and returns a list
1888    *
1889    * @param input   An array of FileStatuses
1890    * @param filter  A required filter to filter the array
1891    * @return        A list of FileStatuses
1892    */
1893   public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1894       FileStatusFilter filter) {
1895     if (input == null) return null;
1896     return filterFileStatuses(Iterators.forArray(input), filter);
1897   }
1898 
1899   /**
1900    * Filters FileStatuses in an iterator and returns a list
1901    *
1902    * @param input   An iterator of FileStatuses
1903    * @param filter  A required filter to filter the array
1904    * @return        A list of FileStatuses
1905    */
1906   public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1907       FileStatusFilter filter) {
1908     if (input == null) return null;
1909     ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1910     while (input.hasNext()) {
1911       FileStatus f = input.next();
1912       if (filter.accept(f)) {
1913         results.add(f);
1914       }
1915     }
1916     return results;
1917   }
1918 
1919   /**
1920    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1921    * This accommodates differences between Hadoop versions: Hadoop 1
1922    * does not throw a FileNotFoundException but returns an empty FileStatus[],
1923    * while Hadoop 2 throws FileNotFoundException.
1924    *
1925    * @param fs file system
1926    * @param dir directory
1927    * @param filter file status filter
1928    * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1929    */
1930   public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1931       final Path dir, final FileStatusFilter filter) throws IOException {
1932     FileStatus [] status = null;
1933     try {
1934       status = fs.listStatus(dir);
1935     } catch (FileNotFoundException fnfe) {
1936       // if directory doesn't exist, return null
1937       if (LOG.isTraceEnabled()) {
1938         LOG.trace(dir + " doesn't exist");
1939       }
1940     }
1941 
1942     if (status == null || status.length < 1)  {
1943       return null;
1944     }
1945 
1946     if (filter == null) {
1947       return Arrays.asList(status);
1948     } else {
1949       List<FileStatus> status2 = filterFileStatuses(status, filter);
1950       if (status2 == null || status2.isEmpty()) {
1951         return null;
1952       } else {
1953         return status2;
1954       }
1955     }
1956   }
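
  // Illustrative usage sketch, not part of the original source: the null return above covers both
  // "directory does not exist" and "nothing matched", so callers null-check instead of iterating
  // directly (compare getRegionDirs() above). The method name is hypothetical.
  private static int exampleCountRegionDirs(FileSystem fs, Path tableDir) throws IOException {
    List<FileStatus> regions = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    return regions == null ? 0 : regions.size();
  }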
1957 
1958   /**
1959    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1960    * This accommodates differences between Hadoop versions: Hadoop 1
1961    * does not throw a FileNotFoundException but returns an empty FileStatus[],
1962    * while Hadoop 2 throws FileNotFoundException.
1963    *
1964    * Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem,
1965    * Path, FileStatusFilter)} instead.
1966    *
1967    * @param fs file system
1968    * @param dir directory
1969    * @param filter path filter
1970    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1971    */
1972   public static FileStatus [] listStatus(final FileSystem fs,
1973       final Path dir, final PathFilter filter) throws IOException {
1974     FileStatus [] status = null;
1975     try {
1976       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1977     } catch (FileNotFoundException fnfe) {
1978       // if directory doesn't exist, return null
1979       if (LOG.isTraceEnabled()) {
1980         LOG.trace(dir + " doesn't exist");
1981       }
1982     }
1983     if (status == null || status.length < 1) return null;
1984     return status;
1985   }
1986 
1987   /**
1988    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1989    * This accommodates differences between Hadoop versions.
1990    *
1991    * @param fs file system
1992    * @param dir directory
1993    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1994    */
1995   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1996     return listStatus(fs, dir, null);
1997   }
1998 
1999   /**
2000    * Calls fs.delete() and returns the value returned by fs.delete()
2001    *
2002    * @param fs
2003    * @param path
2004    * @param recursive
2005    * @return the value returned by the fs.delete()
2006    * @throws IOException
2007    */
2008   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
2009       throws IOException {
2010     return fs.delete(path, recursive);
2011   }
2012 
2013   /**
2014    * Calls fs.exists(). Checks if the specified path exists
2015    *
2016    * @param fs
2017    * @param path
2018    * @return the value returned by fs.exists()
2019    * @throws IOException
2020    */
2021   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
2022     return fs.exists(path);
2023   }
2024 
2025   /**
2026    * Throw an exception if an action is not permitted by a user on a file.
2027    *
2028    * @param ugi
2029    *          the user
2030    * @param file
2031    *          the file
2032    * @param action
2033    *          the action
2034    */
2035   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
2036       FsAction action) throws AccessDeniedException {
2037     if (ugi.getShortUserName().equals(file.getOwner())) {
2038       if (file.getPermission().getUserAction().implies(action)) {
2039         return;
2040       }
2041     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
2042       if (file.getPermission().getGroupAction().implies(action)) {
2043         return;
2044       }
2045     } else if (file.getPermission().getOtherAction().implies(action)) {
2046       return;
2047     }
2048     throw new AccessDeniedException("Permission denied:" + " action=" + action
2049         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
2050   }
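
  // Illustrative usage sketch, not part of the original source: guarding a read with
  // checkAccess() for the current Hadoop user. The method name and log message are hypothetical.
  private static boolean exampleCanRead(FileSystem fs, Path p) throws IOException {
    FileStatus status = fs.getFileStatus(p);
    try {
      checkAccess(UserGroupInformation.getCurrentUser(), status, FsAction.READ);
      return true;
    } catch (AccessDeniedException e) {
      LOG.warn("Current user may not read " + p, e);
      return false;
    }
  }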
2051 
2052   private static boolean contains(String[] groups, String user) {
2053     for (String group : groups) {
2054       if (group.equals(user)) {
2055         return true;
2056       }
2057     }
2058     return false;
2059   }
2060 
2061   /**
2062    * Log the current state of the filesystem from a certain root directory
2063    * @param fs filesystem to investigate
2064    * @param root root file/directory to start logging from
2065    * @param LOG log to output information
2066    * @throws IOException if an unexpected exception occurs
2067    */
2068   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
2069       throws IOException {
2070     LOG.debug("Current file system:");
2071     logFSTree(LOG, fs, root, "|-");
2072   }
2073 
2074   /**
2075    * Recursive helper to log the state of the FS
2076    *
2077    * @see #logFileSystemState(FileSystem, Path, Log)
2078    */
2079   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
2080       throws IOException {
2081     FileStatus[] files = FSUtils.listStatus(fs, root, null);
2082     if (files == null) return;
2083 
2084     for (FileStatus file : files) {
2085       if (file.isDirectory()) {
2086         LOG.debug(prefix + file.getPath().getName() + "/");
2087         logFSTree(LOG, fs, file.getPath(), prefix + "---");
2088       } else {
2089         LOG.debug(prefix + file.getPath().getName());
2090       }
2091     }
2092   }
2093 
2094   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
2095       throws IOException {
2096     // set the modify time for TimeToLive Cleaner
2097     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
2098     return fs.rename(src, dest);
2099   }
2100 
2101   /**
2102    * Scans the root path of the file system to get the
2103    * degree of locality for each region on each of the servers having at least
2104    * one block of that region.
2105    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
2106    *
2107    * @param conf
2108    *          the configuration to use
2109    * @return the mapping from region encoded name to a map of server names to
2110    *           locality fraction
2111    * @throws IOException
2112    *           in case of file system errors or interrupts
2113    */
2114   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
2115       final Configuration conf) throws IOException {
2116     return getRegionDegreeLocalityMappingFromFS(
2117         conf, null,
2118         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
2119 
2120   }
2121 
2122   /**
2123    * Scans the root path of the file system to get the
2124    * degree of locality for each region on each of the servers having at least
2125    * one block of that region.
2126    *
2127    * @param conf
2128    *          the configuration to use
2129    * @param desiredTable
2130    *          the table you wish to scan locality for
2131    * @param threadPoolSize
2132    *          the thread pool size to use
2133    * @return the mapping from region encoded name to a map of server names to
2134    *           locality fraction
2135    * @throws IOException
2136    *           in case of file system errors or interrupts
2137    */
2138   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
2139       final Configuration conf, final String desiredTable, int threadPoolSize)
2140       throws IOException {
2141     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
2142         new ConcurrentHashMap<String, Map<String, Float>>();
2143     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
2144         regionDegreeLocalityMapping);
2145     return regionDegreeLocalityMapping;
2146   }
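
  // Illustrative usage sketch, not part of the original source: reading the nested locality map.
  // The outer key is the region encoded name, the inner key a server name, and the value the
  // fraction of the region's blocks local to that server. The 0.5f threshold is hypothetical.
  private static void exampleLogLowLocality(Configuration conf, String table) throws IOException {
    Map<String, Map<String, Float>> locality =
        getRegionDegreeLocalityMappingFromFS(conf, table, DEFAULT_THREAD_POOLSIZE);
    for (Map.Entry<String, Map<String, Float>> region : locality.entrySet()) {
      for (Map.Entry<String, Float> server : region.getValue().entrySet()) {
        if (server.getValue().floatValue() < 0.5f) {
          LOG.info("Region " + region.getKey() + " is only " + server.getValue() + " local on "
              + server.getKey());
        }
      }
    }
  }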
2147 
2148   /**
2149    * Scans the root path of the file system to get either the
2150    * mapping between the region name and its best locality region server or the
2151    * degree of locality of each region on each of the servers having at least
2152    * one block of that region. The output map parameters are both optional.
2153    *
2154    * @param conf
2155    *          the configuration to use
2156    * @param desiredTable
2157    *          the table you wish to scan locality for
2158    * @param threadPoolSize
2159    *          the thread pool size to use
2160    * @param regionToBestLocalityRSMapping
2161    *          the map into which to put the best locality mapping or null
2162    * @param regionDegreeLocalityMapping
2163    *          the map into which to put the locality degree mapping or null,
2164    *          must be a thread-safe implementation
2165    * @throws IOException
2166    *           in case of file system errors or interrupts
2167    */
2168   private static void getRegionLocalityMappingFromFS(
2169       final Configuration conf, final String desiredTable,
2170       int threadPoolSize,
2171       Map<String, String> regionToBestLocalityRSMapping,
2172       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
2173       throws IOException {
2174     FileSystem fs =  FileSystem.get(conf);
2175     Path rootPath = FSUtils.getRootDir(conf);
2176     long startTime = EnvironmentEdgeManager.currentTime();
2177     Path queryPath;
2178     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
2179     if (null == desiredTable) {
2180       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
2181     } else {
2182       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
2183     }
2184 
2185     // reject all paths that are not appropriate
2186     PathFilter pathFilter = new PathFilter() {
2187       @Override
2188       public boolean accept(Path path) {
2189         // this is the region name; it may get some noise data
2190         if (null == path) {
2191           return false;
2192         }
2193 
2194         // no parent?
2195         Path parent = path.getParent();
2196         if (null == parent) {
2197           return false;
2198         }
2199 
2200         String regionName = path.getName();
2201         if (null == regionName) {
2202           return false;
2203         }
2204 
2205         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
2206           return false;
2207         }
2208         return true;
2209       }
2210     };
2211 
2212     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
2213 
2214     if (null == statusList) {
2215       return;
2216     } else {
2217       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
2218           statusList.length);
2219     }
2220 
2221     // lower the number of threads in case we have very few expected regions
2222     threadPoolSize = Math.min(threadPoolSize, statusList.length);
2223 
2224     // run in multiple threads
2225     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
2226         threadPoolSize, 60, TimeUnit.SECONDS,
2227         new ArrayBlockingQueue<Runnable>(statusList.length));
2228     try {
2229       // ignore all file status items that are not of interest
2230       for (FileStatus regionStatus : statusList) {
2231         if (null == regionStatus) {
2232           continue;
2233         }
2234 
2235         if (!regionStatus.isDirectory()) {
2236           continue;
2237         }
2238 
2239         Path regionPath = regionStatus.getPath();
2240         if (null == regionPath) {
2241           continue;
2242         }
2243 
2244         tpe.execute(new FSRegionScanner(fs, regionPath,
2245             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
2246       }
2247     } finally {
2248       tpe.shutdown();
2249       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
2250           60 * 1000);
2251       try {
2252         // here we wait until TPE terminates, which is either naturally or by
2253         // exceptions in the execution of the threads
2254         while (!tpe.awaitTermination(threadWakeFrequency,
2255             TimeUnit.MILLISECONDS)) {
2256           // printing out rough estimate, so as to not introduce
2257           // AtomicInteger
2258           LOG.info("Locality checking is underway: { Scanned Regions : "
2259               + tpe.getCompletedTaskCount() + "/"
2260               + tpe.getTaskCount() + " }");
2261         }
2262       } catch (InterruptedException e) {
2263         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
2264       }
2265     }
2266 
2267     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
2268     String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
2269 
2270     LOG.info(overheadMsg);
2271   }
2272 
2273   /**
2274    * Do our short circuit read setup.
2275    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
2276    * @param conf
2277    */
2278   public static void setupShortCircuitRead(final Configuration conf) {
2279     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
2280     boolean shortCircuitSkipChecksum =
2281       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
2282     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
2283     if (shortCircuitSkipChecksum) {
2284       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
2285         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
2286         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
2287       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
2288     }
2289     checkShortCircuitReadBufferSize(conf);
2290   }
2291 
2292   /**
2293    * Check if short circuit read buffer size is set and if not, set it to hbase value.
2294    * @param conf
2295    */
2296   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2297     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2298     final int notSet = -1;
2299     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
2300     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2301     int size = conf.getInt(dfsKey, notSet);
2302     // If a size is set, return -- we will use it.
2303     if (size != notSet) return;
2304     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
2305     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2306     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2307   }
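
  // Illustrative usage sketch, not part of the original source: how the two hooks above are
  // typically exercised against a Configuration. The buffer-size value shown is hypothetical;
  // if neither key is set, checkShortCircuitReadBufferSize() falls back to
  // 2 * HConstants.DEFAULT_BLOCKSIZE.
  private static void exampleShortCircuitSetup(Configuration conf) {
    conf.setIfUnset("hbase.dfs.client.read.shortcircuit.buffer.size", "131072");
    setupShortCircuitRead(conf);
  }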
2308 }