1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.HadoopIllegalArgumentException;
55 import org.apache.hadoop.conf.Configuration;
56 import org.apache.hadoop.fs.BlockLocation;
57 import org.apache.hadoop.fs.FSDataInputStream;
58 import org.apache.hadoop.fs.FSDataOutputStream;
59 import org.apache.hadoop.fs.FileStatus;
60 import org.apache.hadoop.fs.FileSystem;
61 import org.apache.hadoop.fs.Path;
62 import org.apache.hadoop.fs.PathFilter;
63 import org.apache.hadoop.fs.permission.FsAction;
64 import org.apache.hadoop.fs.permission.FsPermission;
65 import org.apache.hadoop.hbase.ClusterId;
66 import org.apache.hadoop.hbase.HColumnDescriptor;
67 import org.apache.hadoop.hbase.HConstants;
68 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
69 import org.apache.hadoop.hbase.HRegionInfo;
70 import org.apache.hadoop.hbase.RemoteExceptionHandler;
71 import org.apache.hadoop.hbase.TableName;
72 import org.apache.hadoop.hbase.classification.InterfaceAudience;
73 import org.apache.hadoop.hbase.exceptions.DeserializationException;
74 import org.apache.hadoop.hbase.fs.HFileSystem;
75 import org.apache.hadoop.hbase.io.HFileLink;
76 import org.apache.hadoop.hbase.master.HMaster;
77 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
78 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
79 import org.apache.hadoop.hbase.regionserver.HRegion;
80 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
81 import org.apache.hadoop.hbase.security.AccessDeniedException;
82 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
83 import org.apache.hadoop.hdfs.DistributedFileSystem;
84 import org.apache.hadoop.hdfs.protocol.FSConstants;
85 import org.apache.hadoop.io.IOUtils;
86 import org.apache.hadoop.io.SequenceFile;
87 import org.apache.hadoop.ipc.RemoteException;
88 import org.apache.hadoop.security.UserGroupInformation;
89 import org.apache.hadoop.util.Progressable;
90 import org.apache.hadoop.util.ReflectionUtils;
91 import org.apache.hadoop.util.StringUtils;
92
93 import com.google.common.annotations.VisibleForTesting;
94 import com.google.common.base.Throwables;
95 import com.google.common.collect.Iterators;
96 import com.google.common.primitives.Ints;
97
98 import edu.umd.cs.findbugs.annotations.CheckForNull;
99
100
101 import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
102
103
104
105
106 @InterfaceAudience.Private
107 public abstract class FSUtils {
private static final Log LOG = LogFactory.getLog(FSUtils.class);

/** Full access permissions (starting point for a umask). */
public static final String FULL_RWX_PERMISSIONS = "777";
/** Configuration key for the locality-check thread pool size. */
private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
/** Default size of the locality-check thread pool. */
private static final int DEFAULT_THREAD_POOLSIZE = 2;

/** Set to true on Windows platforms (os.name starts with "Windows"). */
public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

// Protected: this is an abstract utility class; concrete implementations are
// obtained through getInstance().
protected FSUtils() {
  super();
}
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
139 final Path path, final String policyKey, final String defaultPolicy) {
140 String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
141 if (storagePolicy.equals(defaultPolicy)) {
142 if (LOG.isTraceEnabled()) {
143 LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
144 }
145 return;
146 }
147 if (fs instanceof DistributedFileSystem) {
148 DistributedFileSystem dfs = (DistributedFileSystem)fs;
149
150 Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
151 Method m = null;
152 try {
153 m = dfsClass.getDeclaredMethod("setStoragePolicy",
154 new Class<?>[] { Path.class, String.class });
155 m.setAccessible(true);
156 } catch (NoSuchMethodException e) {
157 LOG.info("FileSystem doesn't support"
158 + " setStoragePolicy; --HDFS-6584 not available");
159 } catch (SecurityException e) {
160 LOG.info("Doesn't have access to setStoragePolicy on "
161 + "FileSystems --HDFS-6584 not available", e);
162 m = null;
163 }
164 if (m != null) {
165 try {
166 m.invoke(dfs, path, storagePolicy);
167 LOG.info("set " + storagePolicy + " for " + path);
168 } catch (Exception e) {
169
170 boolean probablyBadPolicy = false;
171 if (e instanceof InvocationTargetException) {
172 final Throwable exception = e.getCause();
173 if (exception instanceof RemoteException &&
174 HadoopIllegalArgumentException.class.getName().equals(
175 ((RemoteException)exception).getClassName())) {
176 LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
177 "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
178 "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
179 "more information see the 'ArchivalStorage' docs for your Hadoop release.");
180 LOG.debug("More information about the invalid storage policy.", exception);
181 probablyBadPolicy = true;
182 }
183 }
184 if (!probablyBadPolicy) {
185
186
187 LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
188 }
189 }
190 }
191 } else {
192 LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
193 "support setStoragePolicy.");
194 }
195 }
196
197
198
199
200
201 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
202 FileSystem fileSystem = fs;
203
204
205 if (fs instanceof HFileSystem) {
206 fileSystem = ((HFileSystem)fs).getBackingFs();
207 }
208 return fileSystem instanceof DistributedFileSystem;
209 }
210
211
212
213
214
215
216
217
218 public static boolean isStartingWithPath(final Path rootPath, final String path) {
219 String uriRootPath = rootPath.toUri().getPath();
220 String tailUriPath = (new Path(path)).toUri().getPath();
221 return tailUriPath.startsWith(uriRootPath);
222 }
223
224
225
226
227
228
229
230
231
232 public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
233 return isMatchingTail(pathToSearch, new Path(pathTail));
234 }
235
236
237
238
239
240
241
242
243
/**
 * Compares the two paths component-by-component starting from the leaf and
 * walking upward via getParent(). Schemes/authorities are ignored because only
 * getName() of each level is compared. Returns true only when both paths have
 * the same depth and no compared component pair mismatched before the tail ran
 * out of components.
 * @param pathToSearch path whose tail is inspected
 * @param pathTail the tail to look for
 * @return true if the trailing components of {@code pathToSearch} equal {@code pathTail}
 */
public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
  // Fast reject: differing depths can never match component-for-component.
  if (pathToSearch.depth() != pathTail.depth()) return false;
  Path tailPath = pathTail;
  String tailName;
  Path toSearch = pathToSearch;
  String toSearchName;
  boolean result = false;
  do {
    tailName = tailPath.getName();
    // Tail exhausted without any mismatch: it matches.
    if (tailName == null || tailName.length() <= 0) {
      result = true;
      break;
    }
    toSearchName = toSearch.getName();
    // Searched path exhausted before the tail did: no match.
    if (toSearchName == null || toSearchName.length() <= 0) break;
    // Move up a level on both sides for the next comparison.
    tailPath = tailPath.getParent();
    toSearch = toSearch.getParent();
  } while(tailName.equals(toSearchName));
  return result;
}
265
266 public static FSUtils getInstance(FileSystem fs, Configuration conf) {
267 String scheme = fs.getUri().getScheme();
268 if (scheme == null) {
269 LOG.warn("Could not find scheme for uri " +
270 fs.getUri() + ", default to hdfs");
271 scheme = "hdfs";
272 }
273 Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
274 scheme + ".impl", FSHDFSUtils.class);
275 FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
276 return fsUtils;
277 }
278
279
280
281
282
283
284
285
286 public static boolean deleteDirectory(final FileSystem fs, final Path dir)
287 throws IOException {
288 return fs.exists(dir) && fs.delete(dir, true);
289 }
290
291
292
293
294
295
296
297
298 public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
299 throws IOException {
300 Path rootDir = getRootDir(conf);
301 FileSystem fs = rootDir.getFileSystem(conf);
302 return deleteDirectory(fs,
303 new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
304 }
305
306
307
308
309
310
311
312
313
314
315
316
317 public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
318 Method m = null;
319 Class<? extends FileSystem> cls = fs.getClass();
320 try {
321 m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
322 } catch (NoSuchMethodException e) {
323 LOG.info("FileSystem doesn't support getDefaultBlockSize");
324 } catch (SecurityException e) {
325 LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
326 m = null;
327 }
328 if (m == null) {
329 return fs.getDefaultBlockSize(path);
330 } else {
331 try {
332 Object ret = m.invoke(fs, path);
333 return ((Long)ret).longValue();
334 } catch (Exception e) {
335 throw new IOException(e);
336 }
337 }
338 }
339
340
341
342
343
344
345
346
347
348
349
350
351 public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
352 Method m = null;
353 Class<? extends FileSystem> cls = fs.getClass();
354 try {
355 m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
356 } catch (NoSuchMethodException e) {
357 LOG.info("FileSystem doesn't support getDefaultReplication");
358 } catch (SecurityException e) {
359 LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
360 m = null;
361 }
362 if (m == null) {
363 return fs.getDefaultReplication(path);
364 } else {
365 try {
366 Object ret = m.invoke(fs, path);
367 return ((Number)ret).shortValue();
368 } catch (Exception e) {
369 throw new IOException(e);
370 }
371 }
372 }
373
374
375
376
377
378
379
380
381
382
383
384 public static int getDefaultBufferSize(final FileSystem fs) {
385 return fs.getConf().getInt("io.file.buffer.size", 4096);
386 }
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
/**
 * Creates {@code path} on the filesystem, attempting to place its blocks on the
 * given favored DataNodes when the backing filesystem is HDFS and the client
 * supports the favored-nodes create() overload; otherwise falls back to a plain
 * overwrite-create.
 * @param fs filesystem to create the file on
 * @param path file to create
 * @param perm permissions for the new file
 * @param favoredNodes preferred DataNode addresses for the file's blocks
 * @return output stream to the newly created file
 * @throws IOException if the create fails
 */
public static FSDataOutputStream create(FileSystem fs, Path path,
    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
  if (fs instanceof HFileSystem) {
    FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
    if (backingFs instanceof DistributedFileSystem) {
      // The favored-nodes create() overload is looked up reflectively --
      // NOTE(review): presumably because not all supported Hadoop versions
      // declare it; confirm for the deployed version.
      try {
        return (FSDataOutputStream) (DistributedFileSystem.class
          .getDeclaredMethod("create", Path.class, FsPermission.class,
            boolean.class, int.class, short.class, long.class,
            Progressable.class, InetSocketAddress[].class)
            .invoke(backingFs, path, perm, true,
              getDefaultBufferSize(backingFs),
              getDefaultReplication(backingFs, path),
              getDefaultBlockSize(backingFs, path),
              null, favoredNodes));
      } catch (InvocationTargetException ite) {
        // Unwrap: the cause is the exception thrown by create() itself.
        throw new IOException(ite.getCause());
      } catch (NoSuchMethodException e) {
        LOG.debug("DFS Client does not support most favored nodes create; using default create");
        if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
      } catch (IllegalArgumentException e) {
        LOG.debug("Ignoring (most likely Reflection related exception) " + e);
      } catch (SecurityException e) {
        LOG.debug("Ignoring (most likely Reflection related exception) " + e);
      } catch (IllegalAccessException e) {
        LOG.debug("Ignoring (most likely Reflection related exception) " + e);
      }
    }
  }
  // Fallback: regular create with overwrite enabled, no favored nodes.
  return create(fs, path, perm, true);
}
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458 public static FSDataOutputStream create(FileSystem fs, Path path,
459 FsPermission perm, boolean overwrite) throws IOException {
460 if (LOG.isTraceEnabled()) {
461 LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
462 }
463 return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
464 getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
465 }
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480 public static FsPermission getFilePermissions(final FileSystem fs,
481 final Configuration conf, final String permssionConfKey) {
482 boolean enablePermissions = conf.getBoolean(
483 HConstants.ENABLE_DATA_FILE_UMASK, false);
484
485 if (enablePermissions) {
486 try {
487 FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
488
489 String mask = conf.get(permssionConfKey);
490 if (mask == null)
491 return FsPermission.getFileDefault();
492
493 FsPermission umask = new FsPermission(mask);
494 return perm.applyUMask(umask);
495 } catch (IllegalArgumentException e) {
496 LOG.warn(
497 "Incorrect umask attempted to be created: "
498 + conf.get(permssionConfKey)
499 + ", using default file permissions.", e);
500 return FsPermission.getFileDefault();
501 }
502 }
503 return FsPermission.getFileDefault();
504 }
505
506
507
508
509
510
511
512 public static void checkFileSystemAvailable(final FileSystem fs)
513 throws IOException {
514 if (!(fs instanceof DistributedFileSystem)) {
515 return;
516 }
517 IOException exception = null;
518 DistributedFileSystem dfs = (DistributedFileSystem) fs;
519 try {
520 if (dfs.exists(new Path("/"))) {
521 return;
522 }
523 } catch (IOException e) {
524 exception = RemoteExceptionHandler.checkIOException(e);
525 }
526 try {
527 fs.close();
528 } catch (Exception e) {
529 LOG.error("file system close failed: ", e);
530 }
531 IOException io = new IOException("File system is not available");
532 io.initCause(exception);
533 throw io;
534 }
535
536
537
538
539
540
541
542
543
/**
 * Returns whether the given HDFS instance is currently in safe mode. Prefers
 * the two-argument setSafeMode(SafeModeAction, boolean) overload, looked up
 * reflectively since not every Hadoop version declares it, and falls back to
 * the one-argument form on any non-IO failure.
 * @throws IOException when the safe-mode query itself fails
 */
private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
  boolean inSafeMode = false;
  try {
    Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
      org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
    // SAFEMODE_GET only queries state; the boolean is the overload's second
    // parameter -- NOTE(review): presumably the 'isChecked' flag; confirm
    // against the deployed Hadoop version's javadoc.
    inSafeMode = (Boolean) m.invoke(dfs,
      org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
  } catch (Exception e) {
    if (e instanceof IOException) throw (IOException) e;
    // Overload missing or not invocable: use the single-argument variant.
    inSafeMode = dfs.setSafeMode(
      org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
  }
  return inSafeMode;
}
560
561
562
563
564
565
566 public static void checkDfsSafeMode(final Configuration conf)
567 throws IOException {
568 boolean isInSafeMode = false;
569 FileSystem fs = FileSystem.get(conf);
570 if (fs instanceof DistributedFileSystem) {
571 DistributedFileSystem dfs = (DistributedFileSystem)fs;
572 isInSafeMode = isInSafeMode(dfs);
573 }
574 if (isInSafeMode) {
575 throw new IOException("File system is in safemode, it can't be written now");
576 }
577 }
578
579
580
581
582
583
584
585
586
587
/**
 * Reads the content of the hbase.version file under {@code rootdir}.
 * Understands both the current pb-magic-prefixed format and the legacy
 * DataOutput UTF format.
 * @return the version string, or null when the file does not exist
 * @throws IOException on filesystem error
 * @throws DeserializationException when pb-prefixed content fails to parse
 */
public static String getVersion(FileSystem fs, Path rootdir)
throws IOException, DeserializationException {
  Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
  FileStatus[] status = null;
  try {
    // listStatus also yields the file length used to size the read buffer.
    status = fs.listStatus(versionFile);
  } catch (FileNotFoundException fnfe) {
    return null;
  }
  if (status == null || status.length == 0) return null;
  String version = null;
  byte [] content = new byte [(int)status[0].getLen()];
  FSDataInputStream s = fs.open(versionFile);
  try {
    IOUtils.readFully(s, content, 0, content.length);
    if (ProtobufUtil.isPBMagicPrefix(content)) {
      // Current format: pb-magic-prefixed protobuf message.
      version = parseVersionFrom(content);
    } else {
      // Legacy format: a single UTF string written via DataOutput.
      InputStream is = new ByteArrayInputStream(content);
      DataInputStream dis = new DataInputStream(is);
      try {
        version = dis.readUTF();
      } finally {
        dis.close();
      }
    }
  } catch (EOFException eof) {
    // An empty version file yields EOF on read; caller may rewrite it.
    LOG.warn("Version file was empty, odd, will try to set it.");
  } finally {
    s.close();
  }
  return version;
}
624
625
626
627
628
629
630
631 static String parseVersionFrom(final byte [] bytes)
632 throws DeserializationException {
633 ProtobufUtil.expectPBMagicPrefix(bytes);
634 int pblen = ProtobufUtil.lengthOfPBMagic();
635 FSProtos.HBaseVersionFileContent.Builder builder =
636 FSProtos.HBaseVersionFileContent.newBuilder();
637 try {
638 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
639 return builder.getVersion();
640 } catch (IOException e) {
641
642 throw new DeserializationException(e);
643 }
644 }
645
646
647
648
649
650
651 static byte [] toVersionByteArray(final String version) {
652 FSProtos.HBaseVersionFileContent.Builder builder =
653 FSProtos.HBaseVersionFileContent.newBuilder();
654 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
655 }
656
657
658
659
660
661
662
663
664
665
666
667 public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
668 throws IOException, DeserializationException {
669 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
670 }
671
672
673
674
675
676
677
678
679
680
681
682
683
/**
 * Verifies the version file matches {@link HConstants#FILE_SYSTEM_VERSION}.
 * A missing version file on an otherwise empty rootdir (no hbase:meta region)
 * is treated as a fresh install and the file is created; any other mismatch
 * results in a FileSystemVersionException.
 * @param message when true, also print the failure message to stdout
 * @param wait ms to sleep between version-file write attempts
 * @param retries number of version-file write attempts
 */
public static void checkVersion(FileSystem fs, Path rootdir,
  boolean message, int wait, int retries)
throws IOException, DeserializationException {
  String version = getVersion(fs, rootdir);
  if (version == null) {
    if (!metaRegionExists(fs, rootdir)) {
      // rootDir is empty (no version file, no meta region): new cluster.
      // Just create the version file and return.
      setVersion(fs, rootdir, wait, retries);
      return;
    }
    // else: meta exists but the version file is gone -- fall through to the
    // mismatch error below (version stays null in the message).
  } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
  // Version missing-with-data or out of date: refuse to proceed.
  String msg = "HBase file layout needs to be upgraded."
    + " You have version " + version
    + " and I want version " + HConstants.FILE_SYSTEM_VERSION
    + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
    + " Is your hbase.rootdir valid? If so, you may need to run "
    + "'hbase hbck -fixVersionFile'.";
  if (message) {
    System.out.println("WARNING! " + msg);
  }
  throw new FileSystemVersionException(msg);
}
710
711
712
713
714
715
716
717
718 public static void setVersion(FileSystem fs, Path rootdir)
719 throws IOException {
720 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
721 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
722 }
723
724
725
726
727
728
729
730
731
732
733 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
734 throws IOException {
735 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
736 }
737
738
739
740
741
742
743
744
745
746
747
748
/**
 * Writes {@code version} into the version file, going through a temp file in
 * the hbase temp directory plus a rename so readers never observe a partial
 * file. Retries up to {@code retries} times, sleeping {@code wait} ms between
 * attempts, before rethrowing the last IOException.
 */
public static void setVersion(FileSystem fs, Path rootdir, String version,
  int wait, int retries) throws IOException {
  Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
  Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
    HConstants.VERSION_FILE_NAME);
  while (true) {
    try {
      // Write the content to the temp location first.
      FSDataOutputStream s = fs.create(tempVersionFile);
      try {
        s.write(toVersionByteArray(version));
        s.close();
        // Null out so the finally block knows the stream closed cleanly.
        s = null;
        // Move the temp file into its final place; a failed rename is an error.
        if (!fs.rename(tempVersionFile, versionFile)) {
          throw new IOException("Unable to move temp version file to " + versionFile);
        }
      } finally {
        // Only reached with s != null when write/close/rename above threw;
        // close best-effort and ignore secondary failures.
        try {
          if (s != null) s.close();
        } catch (IOException ignore) { }
      }
      LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
      return;
    } catch (IOException e) {
      if (retries > 0) {
        LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
        // Best effort: remove a possibly half-written destination file.
        fs.delete(versionFile, false);
        try {
          if (wait > 0) {
            Thread.sleep(wait);
          }
        } catch (InterruptedException ie) {
          // Surface the interrupt as an IOException subclass.
          throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
        }
        retries--;
      } else {
        throw e;
      }
    }
  }
}
797
798
799
800
801
802
803
804
805
806 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
807 int wait) throws IOException {
808 while (true) {
809 try {
810 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
811 return fs.exists(filePath);
812 } catch (IOException ioe) {
813 if (wait > 0) {
814 LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
815 ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
816 try {
817 Thread.sleep(wait);
818 } catch (InterruptedException e) {
819 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
820 }
821 } else {
822 throw ioe;
823 }
824 }
825 }
826 }
827
828
829
830
831
832
833
834
/**
 * Returns the cluster ID stored in the hbase.id file under {@code rootdir},
 * or null when the file does not exist. Legacy (non-pb) files are re-read via
 * readUTF and rewritten in pb format as a side effect.
 * @throws IOException on read failure or unparseable content
 */
public static ClusterId getClusterId(FileSystem fs, Path rootdir)
throws IOException {
  Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
  ClusterId clusterId = null;
  FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null;
  if (status != null) {
    // Size the buffer from the file length (checkedCast guards int overflow).
    int len = Ints.checkedCast(status.getLen());
    byte [] content = new byte[len];
    FSDataInputStream in = fs.open(idPath);
    try {
      in.readFully(content);
    } catch (EOFException eof) {
      LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
    } finally{
      in.close();
    }
    try {
      clusterId = ClusterId.parseFrom(content);
    } catch (DeserializationException e) {
      throw new IOException("content=" + Bytes.toString(content), e);
    }
    // If not pb'd, re-read in the legacy UTF format and rewrite the file as pb.
    if (!ProtobufUtil.isPBMagicPrefix(content)) {
      String cid = null;
      in = fs.open(idPath);
      try {
        cid = in.readUTF();
        clusterId = new ClusterId(cid);
      } catch (EOFException eof) {
        LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
      } finally {
        in.close();
      }
      rewriteAsPb(fs, rootdir, idPath, clusterId);
    }
    return clusterId;
  } else {
    LOG.warn("Cluster ID file does not exist at " + idPath.toString());
  }
  return clusterId;
}
876
877
878
879
880
881 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
882 final ClusterId cid)
883 throws IOException {
884
885
886 Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
887 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
888 setClusterId(fs, rootdir, cid, 100);
889 if (!fs.delete(movedAsideName, false)) {
890 throw new IOException("Failed delete of " + movedAsideName);
891 }
892 LOG.debug("Rewrote the hbase.id file as pb");
893 }
894
895
896
897
898
899
900
901
902
903
/**
 * Writes the cluster-id file, going through a temp file plus rename so the
 * final file appears whole. On IOException, retries every {@code wait} ms for
 * as long as wait is positive; otherwise rethrows immediately.
 */
public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
  int wait) throws IOException {
  while (true) {
    try {
      Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
      Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
        Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
      // Write the id file to a temporary location, then move into place.
      FSDataOutputStream s = fs.create(tempIdFile);
      try {
        s.write(clusterId.toByteArray());
        s.close();
        // Null out so the finally block knows the stream closed cleanly.
        s = null;
        if (!fs.rename(tempIdFile, idFile)) {
          throw new IOException("Unable to move temp version file to " + idFile);
        }
      } finally {
        // Only reached with s != null when write/close/rename above threw;
        // close best-effort and ignore secondary failures.
        try {
          if (s != null) s.close();
        } catch (IOException ignore) { }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
      }
      return;
    } catch (IOException ioe) {
      if (wait > 0) {
        LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
          ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
        try {
          Thread.sleep(wait);
        } catch (InterruptedException e) {
          // Surface the interrupt as an IOException subclass.
          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        }
      } else {
        throw ioe;
      }
    }
  }
}
947
948
949
950
951
952
953
954
955 public static Path validateRootPath(Path root) throws IOException {
956 try {
957 URI rootURI = new URI(root.toString());
958 String scheme = rootURI.getScheme();
959 if (scheme == null) {
960 throw new IOException("Root directory does not have a scheme");
961 }
962 return root;
963 } catch (URISyntaxException e) {
964 IOException io = new IOException("Root directory path is not a valid " +
965 "URI -- check your " + HBASE_DIR + " configuration");
966 io.initCause(e);
967 throw io;
968 }
969 }
970
971
972
973
974
975
976
977
978
979 public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
980 Path root = getWALRootDir(conf);
981 String pathStr = path.toString();
982
983 if (!pathStr.startsWith(root.toString())) return pathStr;
984
985 return pathStr.substring(root.toString().length() + 1);
986 }
987
988
989
990
991
992
993
994 public static void waitOnSafeMode(final Configuration conf,
995 final long wait)
996 throws IOException {
997 FileSystem fs = FileSystem.get(conf);
998 if (!(fs instanceof DistributedFileSystem)) return;
999 DistributedFileSystem dfs = (DistributedFileSystem)fs;
1000
1001 while (isInSafeMode(dfs)) {
1002 LOG.info("Waiting for dfs to exit safe mode...");
1003 try {
1004 Thread.sleep(wait);
1005 } catch (InterruptedException e) {
1006 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1007 }
1008 }
1009 }
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021 public static String getPath(Path p) {
1022 return p.toUri().getPath();
1023 }
1024
1025
1026
1027
1028
1029
1030
1031 public static Path getRootDir(final Configuration c) throws IOException {
1032 Path p = new Path(c.get(HBASE_DIR));
1033 FileSystem fs = p.getFileSystem(c);
1034 return p.makeQualified(fs);
1035 }
1036
1037 public static void setRootDir(final Configuration c, final Path root) throws IOException {
1038 c.set(HBASE_DIR, root.toString());
1039 }
1040
1041 public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1042 c.set("fs.defaultFS", root.toString());
1043 }
1044
1045 public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
1046 Path p = getRootDir(c);
1047 return p.getFileSystem(c);
1048 }
1049
1050
1051
1052
1053
1054
1055
1056 public static Path getWALRootDir(final Configuration c) throws IOException {
1057 Path p = new Path(c.get(HFileSystem.HBASE_WAL_DIR, c.get(HBASE_DIR)));
1058 if (!isValidWALRootDir(p, c)) {
1059 return FSUtils.getRootDir(c);
1060 }
1061 FileSystem fs = p.getFileSystem(c);
1062 return p.makeQualified(fs);
1063 }
1064
1065 @VisibleForTesting
1066 public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
1067 c.set(HFileSystem.HBASE_WAL_DIR, root.toString());
1068 }
1069
1070 public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
1071 Path p = getWALRootDir(c);
1072 return p.getFileSystem(c);
1073 }
1074
1075 private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
1076 Path rootDir = FSUtils.getRootDir(c);
1077 if (walDir != rootDir) {
1078 if (walDir.toString().startsWith(rootDir.toString() + "/")) {
1079 throw new IllegalStateException("Illegal WAL directory specified. " +
1080 "WAL directories are not permitted to be under the root directory if set.");
1081 }
1082 }
1083 return true;
1084 }
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094 @SuppressWarnings("deprecation")
1095 public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1096 throws IOException {
1097 Path metaRegionDir =
1098 HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1099 return fs.exists(metaRegionDir);
1100 }
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1111 final FileSystem fs, FileStatus status, long start, long length)
1112 throws IOException {
1113 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1114 BlockLocation [] blockLocations =
1115 fs.getFileBlockLocations(status, start, length);
1116 for(BlockLocation bl : blockLocations) {
1117 String [] hosts = bl.getHosts();
1118 long len = bl.getLength();
1119 blocksDistribution.addHostsAndBlockWeight(hosts, len);
1120 }
1121
1122 return blocksDistribution;
1123 }
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136 public static boolean isMajorCompacted(final FileSystem fs,
1137 final Path hbaseRootDir)
1138 throws IOException {
1139 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1140 PathFilter regionFilter = new RegionDirFilter(fs);
1141 PathFilter familyFilter = new FamilyDirFilter(fs);
1142 for (Path d : tableDirs) {
1143 FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1144 for (FileStatus regionDir : regionDirs) {
1145 Path dd = regionDir.getPath();
1146
1147 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1148 for (FileStatus familyDir : familyDirs) {
1149 Path family = familyDir.getPath();
1150
1151 FileStatus[] familyStatus = fs.listStatus(family);
1152 if (familyStatus.length > 1) {
1153 LOG.debug(family.toString() + " has " + familyStatus.length +
1154 " files.");
1155 return false;
1156 }
1157 }
1158 }
1159 }
1160 return true;
1161 }
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172 public static int getTotalTableFragmentation(final HMaster master)
1173 throws IOException {
1174 Map<String, Integer> map = getTableFragmentation(master);
1175 return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1176 }
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188 public static Map<String, Integer> getTableFragmentation(
1189 final HMaster master)
1190 throws IOException {
1191 Path path = getRootDir(master.getConfiguration());
1192
1193 FileSystem fs = path.getFileSystem(master.getConfiguration());
1194 return getTableFragmentation(fs, path);
1195 }
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
/**
 * Walks the rootdir and computes, per table, the percentage of column families
 * whose directory holds more than one file. An extra "-TOTAL-" entry carries
 * the percentage across all tables.
 * @return map of table name (plus "-TOTAL-") to fragmentation percent
 */
public static Map<String, Integer> getTableFragmentation(
  final FileSystem fs, final Path hbaseRootDir)
throws IOException {
  Map<String, Integer> frags = new HashMap<String, Integer>();
  int cfCountTotal = 0; // families seen across all tables
  int cfFragTotal = 0;  // families with >1 file across all tables
  PathFilter regionFilter = new RegionDirFilter(fs);
  PathFilter familyFilter = new FamilyDirFilter(fs);
  List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
  for (Path d : tableDirs) {
    int cfCount = 0;
    int cfFrag = 0;
    FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
    for (FileStatus regionDir : regionDirs) {
      Path dd = regionDir.getPath();
      // Iterate over this region's column-family directories.
      FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
      for (FileStatus familyDir : familyDirs) {
        cfCount++;
        cfCountTotal++;
        Path family = familyDir.getPath();
        // A family holding more than one file counts as fragmented.
        FileStatus[] familyStatus = fs.listStatus(family);
        if (familyStatus.length > 1) {
          cfFrag++;
          cfFragTotal++;
        }
      }
    }
    // Store this table's fragmentation percentage (0 for empty tables).
    frags.put(FSUtils.getTableName(d).getNameAsString(),
      cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
  }
  // Cluster-wide percentage across all tables.
  frags.put("-TOTAL-",
    cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
  return frags;
}
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254 public static Path getTableDir(Path rootdir, final TableName tableName) {
1255 return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1256 tableName.getQualifierAsString());
1257 }
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267 public static TableName getTableName(Path tablePath) {
1268 return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1269 }
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279 public static Path getNamespaceDir(Path rootdir, final String namespace) {
1280 return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1281 new Path(namespace)));
1282 }
1283
1284
1285
1286
  /**
   * Filter that accepts only regular files; directories and paths that cannot
   * be verified (IOException while statting) are rejected.
   */
  static class FileFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;

    public FileFilter(final FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        // isDir is a possibly-null cached directory flag; isFile() decides
        // whether a filesystem lookup is needed — see AbstractFileStatusFilter.
        return isFile(fs, isDir, p);
      } catch (IOException e) {
        // Treat unverifiable paths as non-files rather than failing the scan.
        LOG.warn("unable to verify if path=" + p + " is a regular file", e);
        return false;
      }
    }
  }
1304
1305
1306
1307
1308 public static class BlackListDirFilter extends AbstractFileStatusFilter {
1309 private final FileSystem fs;
1310 private List<String> blacklist;
1311
1312
1313
1314
1315
1316
1317
1318 @SuppressWarnings("unchecked")
1319 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1320 this.fs = fs;
1321 blacklist =
1322 (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1323 : directoryNameBlackList);
1324 }
1325
1326 @Override
1327 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1328 if (!isValidName(p.getName())) {
1329 return false;
1330 }
1331
1332 try {
1333 return isDirectory(fs, isDir, p);
1334 } catch (IOException e) {
1335 LOG.warn("An error occurred while verifying if [" + p.toString()
1336 + "] is a valid directory. Returning 'not valid' and continuing.", e);
1337 return false;
1338 }
1339 }
1340
1341 protected boolean isValidName(final String name) {
1342 return !blacklist.contains(name);
1343 }
1344 }
1345
1346
1347
1348
  /**
   * A filter that only allows directories — a {@link BlackListDirFilter}
   * with an empty blacklist.
   */
  public static class DirFilter extends BlackListDirFilter {

    public DirFilter(FileSystem fs) {
      // null blacklist means "exclude nothing".
      super(fs, null);
    }
  }
1355
1356
1357
1358
1359
1360 public static class UserTableDirFilter extends BlackListDirFilter {
1361 public UserTableDirFilter(FileSystem fs) {
1362 super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1363 }
1364
1365 protected boolean isValidName(final String name) {
1366 if (!super.isValidName(name))
1367 return false;
1368
1369 try {
1370 TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1371 } catch (IllegalArgumentException e) {
1372 LOG.info("INVALID NAME " + name);
1373 return false;
1374 }
1375 return true;
1376 }
1377 }
1378
1379
1380
1381
1382
1383
1384
1385
1386 public static boolean isAppendSupported(final Configuration conf) {
1387 boolean append = conf.getBoolean("dfs.support.append", false);
1388 if (append) {
1389 try {
1390
1391
1392
1393 SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1394 append = true;
1395 } catch (SecurityException e) {
1396 } catch (NoSuchMethodException e) {
1397 append = false;
1398 }
1399 }
1400 if (!append) {
1401
1402 try {
1403 FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1404 append = true;
1405 } catch (NoSuchMethodException e) {
1406 append = false;
1407 }
1408 }
1409 return append;
1410 }
1411
1412
1413
1414
1415
1416
1417 public static boolean isHDFS(final Configuration conf) throws IOException {
1418 FileSystem fs = FileSystem.get(conf);
1419 String scheme = fs.getUri().getScheme();
1420 return scheme.equalsIgnoreCase("hdfs");
1421 }
1422
1423
1424
1425
1426
1427
1428
1429
1430
  /**
   * Recover the lease on the given file so it can safely be re-opened.
   * Implementations are filesystem-specific and supplied by concrete
   * subclasses; this class only declares the contract.
   *
   * @param fs filesystem the file lives on
   * @param p path of the file whose lease should be recovered
   * @param conf configuration for the recovery implementation
   * @param reporter progress/cancellation hook; semantics are up to the
   *          implementation — TODO confirm against concrete subclass
   * @throws IOException when recovery fails
   */
  public abstract void recoverFileLease(final FileSystem fs, final Path p,
      Configuration conf, CancelableProgressable reporter) throws IOException;
1433
1434 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1435 throws IOException {
1436 List<Path> tableDirs = new LinkedList<Path>();
1437
1438 for(FileStatus status :
1439 fs.globStatus(new Path(rootdir,
1440 new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1441 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1442 }
1443 return tableDirs;
1444 }
1445
1446
1447
1448
1449
1450
1451
1452
1453 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1454 throws IOException {
1455
1456 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1457 List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1458 for (FileStatus dir: dirs) {
1459 tabledirs.add(dir.getPath());
1460 }
1461 return tabledirs;
1462 }
1463
1464
1465
1466
1467
1468
1469 public static boolean isRecoveredEdits(Path path) {
1470 return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1471 }
1472
1473
1474
1475
  /**
   * Filter that accepts only directories whose name matches a region's
   * encoded-name format (lower-case hex digits).
   */
  public static class RegionDirFilter extends AbstractFileStatusFilter {
    // Matches region encoded names. Note "*" also matches the empty string —
    // assumed intentional; TODO confirm.
    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
    final FileSystem fs;

    public RegionDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      // Cheap name check first; only stat the path when the name qualifies.
      if (!regionDirPattern.matcher(p.getName()).matches()) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Skip entries we cannot verify instead of failing the whole listing.
        LOG.warn("Skipping file " + p +" due to IOException", ioe);
        return false;
      }
    }
  }
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1510
1511 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1512 if (rds == null) {
1513 return new ArrayList<Path>();
1514 }
1515 List<Path> regionDirs = new ArrayList<Path>(rds.size());
1516 for (FileStatus rdfs: rds) {
1517 Path rdPath = rdfs.getPath();
1518 regionDirs.add(rdPath);
1519 }
1520 return regionDirs;
1521 }
1522
1523
1524
1525
1526
  /**
   * Filter that accepts only directories whose name is a legal column family
   * name.
   */
  public static class FamilyDirFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public FamilyDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        // isLegalFamilyName throws IllegalArgumentException for invalid names.
        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
      } catch (IllegalArgumentException iae) {
        // Not a valid family name — exclude quietly.
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Skip entries we cannot verify instead of failing the whole listing.
        LOG.warn("Skipping file " + p +" due to IOException", ioe);
        return false;
      }
    }
  }
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1563
1564 FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1565 List<Path> familyDirs = new ArrayList<Path>(fds.length);
1566 for (FileStatus fdfs: fds) {
1567 Path fdPath = fdfs.getPath();
1568 familyDirs.add(fdPath);
1569 }
1570 return familyDirs;
1571 }
1572
1573 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1574 List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1575 if (fds == null) {
1576 return new ArrayList<Path>();
1577 }
1578 List<Path> referenceFiles = new ArrayList<Path>(fds.size());
1579 for (FileStatus fdfs: fds) {
1580 Path fdPath = fdfs.getPath();
1581 referenceFiles.add(fdPath);
1582 }
1583 return referenceFiles;
1584 }
1585
1586
1587
1588
  /**
   * Filter that accepts only regular files whose name passes
   * {@link StoreFileInfo#isHFile(Path)}.
   */
  public static class HFileFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public HFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      // Cheap name-based check first; only stat the path if the name qualifies.
      if (!StoreFileInfo.isHFile(p)) {
        return false;
      }

      try {
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Skip entries we cannot verify instead of failing the whole listing.
        LOG.warn("Skipping file " + p +" due to IOException", ioe);
        return false;
      }
    }
  }
1611
1612
1613
1614
1615
  /**
   * Path filter that accepts HFile links. Purely name-based: no filesystem
   * access is performed, so it does not distinguish files from directories.
   */
  public static class HFileLinkFilter implements PathFilter {

    @Override
    public boolean accept(Path p) {
      return HFileLink.isHFileLink(p);
    }
  }
1623
  /**
   * Filter that accepts only regular files whose name passes
   * {@link StoreFileInfo#isReference(Path)}.
   */
  public static class ReferenceFileFilter extends AbstractFileStatusFilter {

    private final FileSystem fs;

    public ReferenceFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      // Cheap name-based check first.
      if (!StoreFileInfo.isReference(p)) {
        return false;
      }

      try {
        // Only regular files can be references.
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Skip entries we cannot verify instead of failing the whole listing.
        LOG.warn("Skipping file " + p +" due to IOException", ioe);
        return false;
      }
    }
  }
1648
1649
1650
1651
1652
1653
1654
1655 public static FileSystem getCurrentFileSystem(Configuration conf)
1656 throws IOException {
1657 return getRootDir(conf).getFileSystem(conf);
1658 }
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
  /**
   * Builds a reverse lookup map from store file name to its full {@link Path}
   * for a single table. Convenience overload that delegates to the
   * seven-argument variant with no filter, no executor, and no error reporter.
   *
   * @param map map to add values to; if null, the delegate creates one
   * @param fs the file system to use
   * @param hbaseRootDir the root directory to scan
   * @param tableName name of the table to scan
   * @return map keyed by store file name with the full path as value
   * @throws IOException when scanning the directory fails
   * @throws InterruptedException if the calling thread is interrupted
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
  throws IOException, InterruptedException {
    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
  }
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
  /**
   * Builds a reverse lookup map from store file name to its full {@link Path}
   * for one table, optionally in parallel on the supplied executor. Regions
   * that disappear or have no family directories mid-scan are skipped with a
   * warning. The first exception raised by any per-region task is rethrown
   * after all submitted tasks settle.
   *
   * @param resultMap map to populate; if null a ConcurrentHashMap is created and returned
   * @param fs the file system to use
   * @param hbaseRootDir the root directory to scan
   * @param tableName name of the table to scan
   * @param sfFilter optional path filter applied to each store file; null accepts all
   * @param executor optional executor to parallelize per-region scanning; null runs serially
   * @param errors optional progress reporter; null disables progress callbacks
   * @return the populated map keyed by store file name
   * @throws IOException when scanning fails
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public static Map<String, Path> getTableStoreFilePathMap(
    Map<String, Path> resultMap,
    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
    ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {

    // The per-region tasks below may run on multiple executor threads and all
    // write into this map, hence the concurrent default.
    final Map<String, Path> finalResultMap =
        resultMap == null ? new ConcurrentHashMap<String, Path>(128, 0.75f, 32) : resultMap;

    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);

    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
    // Vector is synchronized, so tasks on different threads can record
    // exceptions without extra locking.
    final Vector<Exception> exceptions = new Vector<Exception>();

    try {
      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
      if (regionDirs == null) {
        return finalResultMap;
      }

      final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.size());

      for (FileStatus regionDir : regionDirs) {
        if (null != errors) {
          errors.progress();
        }
        final Path dd = regionDir.getPath();

        // Stop submitting new work as soon as any task has failed.
        if (!exceptions.isEmpty()) {
          break;
        }

        Runnable getRegionStoreFileMapCall = new Runnable() {
          @Override
          public void run() {
            try {
              HashMap<String,Path> regionStoreFileMap = new HashMap<String, Path>();
              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
              if (familyDirs == null) {
                // Distinguish a region deleted mid-scan from one with no families.
                if (!fs.exists(dd)) {
                  LOG.warn("Skipping region because it no longer exists: " + dd);
                } else {
                  LOG.warn("Skipping region because it has no family dirs: " + dd);
                }
                return;
              }
              for (FileStatus familyDir : familyDirs) {
                if (null != errors) {
                  errors.progress();
                }
                Path family = familyDir.getPath();
                // Recovered-edits dirs live beside family dirs; not store files.
                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
                  continue;
                }
                // Inside a family dir: collect each store file, applying the
                // optional filter.
                FileStatus[] familyStatus = fs.listStatus(family);
                for (FileStatus sfStatus : familyStatus) {
                  if (null != errors) {
                    errors.progress();
                  }
                  Path sf = sfStatus.getPath();
                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put( sf.getName(), sf);
                  }
                }
              }
              // Merge this region's results into the shared map in one call.
              finalResultMap.putAll(regionStoreFileMap);
            } catch (Exception e) {
              LOG.error("Could not get region store file map for region: " + dd, e);
              exceptions.add(e);
            }
          }
        };

        // With an executor, submit asynchronously; otherwise run the task
        // inline so the same Future-based bookkeeping applies.
        if (executor != null) {
          Future<?> future = executor.submit(getRegionStoreFileMapCall);
          futures.add(future);
        } else {
          FutureTask<?> future = new FutureTask<Object>(getRegionStoreFileMapCall, null);
          future.run();
          futures.add(future);
        }
      }

      // Wait for all submitted tasks, bailing early once a failure is recorded.
      for (Future<?> f : futures) {
        if (!exceptions.isEmpty()) {
          break;
        }
        try {
          f.get();
        } catch (ExecutionException e) {
          // Task bodies catch their own exceptions into 'exceptions', so this
          // should not happen; log just in case.
          LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);

        }
      }
    } catch (IOException e) {
      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
      exceptions.add(e);
    } finally {
      if (!exceptions.isEmpty()) {
        // Rethrow only the first recorded exception; the rest were already
        // logged. IOExceptions are rethrown as-is, anything else wrapped.
        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
        throw Throwables.propagate(exceptions.firstElement());
      }
    }

    return finalResultMap;
  }
1819
1820 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1821 int result = 0;
1822 try {
1823 for (Path familyDir:getFamilyDirs(fs, p)){
1824 result += getReferenceFilePaths(fs, familyDir).size();
1825 }
1826 } catch (IOException e) {
1827 LOG.warn("Error Counting reference files.", e);
1828 }
1829 return result;
1830 }
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
  /**
   * Builds a reverse lookup map from store file name to full {@link Path} for
   * every table under the HBase root. Convenience overload that delegates to
   * the filtered variant with no filter, executor, or error reporter.
   *
   * @param fs the file system to use
   * @param hbaseRootDir the root directory to scan
   * @return map keyed by store file name with the full path as value
   * @throws IOException when scanning the directory fails
   * @throws InterruptedException if the calling thread is interrupted
   */
  public static Map<String, Path> getTableStoreFilePathMap(
    final FileSystem fs, final Path hbaseRootDir)
  throws IOException, InterruptedException {
    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
  }
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869 public static Map<String, Path> getTableStoreFilePathMap(
1870 final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1871 ExecutorService executor, ErrorReporter errors)
1872 throws IOException, InterruptedException {
1873 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<String, Path>(1024, 0.75f, 32);
1874
1875
1876
1877
1878
1879 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1880 getTableStoreFilePathMap(map, fs, hbaseRootDir,
1881 FSUtils.getTableName(tableDir), sfFilter, executor, errors);
1882 }
1883 return map;
1884 }
1885
1886
1887
1888
1889
1890
1891
1892
1893 public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1894 FileStatusFilter filter) {
1895 if (input == null) return null;
1896 return filterFileStatuses(Iterators.forArray(input), filter);
1897 }
1898
1899
1900
1901
1902
1903
1904
1905
1906 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1907 FileStatusFilter filter) {
1908 if (input == null) return null;
1909 ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1910 while (input.hasNext()) {
1911 FileStatus f = input.next();
1912 if (filter.accept(f)) {
1913 results.add(f);
1914 }
1915 }
1916 return results;
1917 }
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1931 final Path dir, final FileStatusFilter filter) throws IOException {
1932 FileStatus [] status = null;
1933 try {
1934 status = fs.listStatus(dir);
1935 } catch (FileNotFoundException fnfe) {
1936
1937 if (LOG.isTraceEnabled()) {
1938 LOG.trace(dir + " doesn't exist");
1939 }
1940 }
1941
1942 if (status == null || status.length < 1) {
1943 return null;
1944 }
1945
1946 if (filter == null) {
1947 return Arrays.asList(status);
1948 } else {
1949 List<FileStatus> status2 = filterFileStatuses(status, filter);
1950 if (status2 == null || status2.isEmpty()) {
1951 return null;
1952 } else {
1953 return status2;
1954 }
1955 }
1956 }
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972 public static FileStatus [] listStatus(final FileSystem fs,
1973 final Path dir, final PathFilter filter) throws IOException {
1974 FileStatus [] status = null;
1975 try {
1976 status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1977 } catch (FileNotFoundException fnfe) {
1978
1979 if (LOG.isTraceEnabled()) {
1980 LOG.trace(dir + " doesn't exist");
1981 }
1982 }
1983 if (status == null || status.length < 1) return null;
1984 return status;
1985 }
1986
1987
1988
1989
1990
1991
1992
1993
1994
  /**
   * Calls {@code fs.listStatus()} on a directory, treating a missing directory
   * as non-fatal. Delegates to the filtered overload with a null filter.
   *
   * @param fs filesystem to list
   * @param dir directory to list
   * @return the listing, or null when the directory is missing or empty
   * @throws IOException on filesystem errors other than FileNotFoundException
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
  /**
   * Calls {@code fs.delete()} and returns its result.
   *
   * @param fs filesystem to delete from
   * @param path path to delete
   * @param recursive whether to delete the subtree rooted at {@code path}
   * @return the value returned by {@code fs.delete()}
   * @throws IOException from the underlying filesystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
      throws IOException {
    return fs.delete(path, recursive);
  }
2012
2013
2014
2015
2016
2017
2018
2019
2020
  /**
   * Calls {@code fs.exists()} and returns its result.
   *
   * @param fs filesystem to check
   * @param path path to check for existence
   * @return true when the path exists
   * @throws IOException from the underlying filesystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035 public static void checkAccess(UserGroupInformation ugi, FileStatus file,
2036 FsAction action) throws AccessDeniedException {
2037 if (ugi.getShortUserName().equals(file.getOwner())) {
2038 if (file.getPermission().getUserAction().implies(action)) {
2039 return;
2040 }
2041 } else if (contains(ugi.getGroupNames(), file.getGroup())) {
2042 if (file.getPermission().getGroupAction().implies(action)) {
2043 return;
2044 }
2045 } else if (file.getPermission().getOtherAction().implies(action)) {
2046 return;
2047 }
2048 throw new AccessDeniedException("Permission denied:" + " action=" + action
2049 + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
2050 }
2051
2052 private static boolean contains(String[] groups, String user) {
2053 for (String group : groups) {
2054 if (group.equals(user)) {
2055 return true;
2056 }
2057 }
2058 return false;
2059 }
2060
2061
2062
2063
2064
2065
2066
2067
  /**
   * Logs the current state of the filesystem from the given root directory,
   * one DEBUG line per entry.
   *
   * @param fs filesystem to investigate
   * @param root root file/directory to start logging from
   * @param LOG log to write the listing to
   * @throws IOException if listing the tree fails
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
      throws IOException {
    LOG.debug("Current file system:");
    logFSTree(LOG, fs, root, "|-");
  }
2073
2074
2075
2076
2077
2078
2079 private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
2080 throws IOException {
2081 FileStatus[] files = FSUtils.listStatus(fs, root, null);
2082 if (files == null) return;
2083
2084 for (FileStatus file : files) {
2085 if (file.isDirectory()) {
2086 LOG.debug(prefix + file.getPath().getName() + "/");
2087 logFSTree(LOG, fs, file.getPath(), prefix + "---");
2088 } else {
2089 LOG.debug(prefix + file.getPath().getName());
2090 }
2091 }
2092 }
2093
  /**
   * Sets {@code src}'s modification time to "now" and then renames it to
   * {@code dest}. Note the two steps are not atomic: the time update can
   * succeed while the rename fails.
   *
   * @param fs filesystem both paths live on
   * @param src file to stamp and rename
   * @param dest destination path
   * @return the result of {@code fs.rename(src, dest)}
   * @throws IOException from the underlying filesystem
   */
  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
      throws IOException {
    // -1 leaves the access time unchanged.
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
  /**
   * Scans the root path of the file system to compute, for each region, its
   * degree of locality on each server holding at least one of its blocks.
   * Uses the thread pool size configured under {@code THREAD_POOLSIZE}
   * (default {@code DEFAULT_THREAD_POOLSIZE}) and scans all tables.
   *
   * @param conf the configuration to use
   * @return mapping from region name to a map of server name to locality degree
   * @throws IOException in case of filesystem errors
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(
        conf, null,
        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));

  }
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
2139 final Configuration conf, final String desiredTable, int threadPoolSize)
2140 throws IOException {
2141 Map<String, Map<String, Float>> regionDegreeLocalityMapping =
2142 new ConcurrentHashMap<String, Map<String, Float>>();
2143 getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
2144 regionDegreeLocalityMapping);
2145 return regionDegreeLocalityMapping;
2146 }
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168 private static void getRegionLocalityMappingFromFS(
2169 final Configuration conf, final String desiredTable,
2170 int threadPoolSize,
2171 Map<String, String> regionToBestLocalityRSMapping,
2172 Map<String, Map<String, Float>> regionDegreeLocalityMapping)
2173 throws IOException {
2174 FileSystem fs = FileSystem.get(conf);
2175 Path rootPath = FSUtils.getRootDir(conf);
2176 long startTime = EnvironmentEdgeManager.currentTime();
2177 Path queryPath;
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
  /**
   * Performs short-circuit-read setup: warns when checksum skipping is
   * enabled and ensures the short-circuit read buffer size is set.
   *
   * @param conf the configuration to check and possibly update
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check whether the user has set dfs.client.read.shortcircuit.skip.checksum.
    boolean shortCircuitSkipChecksum =
        conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
      // Fails fast when assertions are enabled (e.g. in tests); a no-op otherwise.
      assert !shortCircuitSkipChecksum;
    }
    checkShortCircuitReadBufferSize(conf);
  }
2291
2292
2293
2294
2295
2296 public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2297 final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2298 final int notSet = -1;
2299
2300 final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2301 int size = conf.getInt(dfsKey, notSet);
2302
2303 if (size != notSet) return;
2304
2305 int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2306 conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2307 }
2308 }