1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util.hbck;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ConcurrentSkipListSet;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
44  import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
45  import org.apache.hadoop.hbase.io.hfile.HFile;
46  import org.apache.hadoop.hbase.mob.MobUtils;
47  import org.apache.hadoop.hbase.util.FSUtils;
48  import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
49  import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
50  import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
51  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
52  
53  /**
54   * This class marches through all of the hfiles under the given tables' region
55   * directories and verifies that they are valid files. Instantiate the class,
56   * call {@code checkTables(Collection<Path>)}, and then retrieve the corrupted
57   * hfiles (and the quarantined files, if running in quarantine mode).
58   *
59   * The implementation currently parallelizes at the regionDir level.
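     *
     * <p>A minimal usage sketch (the pool size and the way the table directories are
     * obtained are illustrative assumptions, not requirements of this class):
     * <pre>{@code
     * Configuration conf = HBaseConfiguration.create();
     * ExecutorService pool = Executors.newFixedThreadPool(4);
     * HFileCorruptionChecker checker = new HFileCorruptionChecker(conf, pool, false);
     * checker.checkTables(tableDirs); // tableDirs: Collection<Path> of table directories
     * Collection<Path> corrupt = checker.getCorrupted();
     * Collection<Path> missing = checker.getMissing();
     * pool.shutdown();
     * }</pre>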
60   */
61  @InterfaceAudience.Private
62  public class HFileCorruptionChecker {
63    private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);
64  
65    final Configuration conf;
66    final FileSystem fs;
67    final CacheConfig cacheConf;
68    final ExecutorService executor;
69    final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
70    final Set<Path> failures = new ConcurrentSkipListSet<Path>();
71    final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
72    final Set<Path> missing = new ConcurrentSkipListSet<Path>();
73    final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<Path>();
74    final Set<Path> failureMobFiles = new ConcurrentSkipListSet<Path>();
75    final Set<Path> missedMobFiles = new ConcurrentSkipListSet<Path>();
76    final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<Path>();
77    final boolean inQuarantineMode;
78    final AtomicInteger hfilesChecked = new AtomicInteger();
79    final AtomicInteger mobFilesChecked = new AtomicInteger();
80  
81    public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
82        boolean quarantine) throws IOException {
83      this.conf = conf;
84      this.fs = FileSystem.get(conf);
85      this.cacheConf = new CacheConfig(conf);
86      this.executor = executor;
87      this.inQuarantineMode = quarantine;
88    }
89  
90    /**
91     * Checks a path to see if it is a valid hfile.
92     *
93     * @param p
94     *          full Path to an HFile
95     * @throws IOException
96    *           This is a connectivity-related exception
97     */
98    protected void checkHFile(Path p) throws IOException {
99      HFile.Reader r = null;
100     try {
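          // Opening a reader forces the HFile trailer and basic metadata to be read,
          // so a corrupt or truncated file surfaces here as a CorruptHFileException.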
101       r = HFile.createReader(fs, p, cacheConf, conf);
102     } catch (CorruptHFileException che) {
103       LOG.warn("Found corrupt HFile " + p, che);
104       corrupted.add(p);
105       if (inQuarantineMode) {
106         Path dest = createQuarantinePath(p);
107         LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
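          // Move (not copy) the file into the quarantine location; if either the
          // mkdirs or the rename fails, the file stays where it is and is recorded
          // as a failure instead.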
108         boolean success = fs.mkdirs(dest.getParent());
109         success = success ? fs.rename(p, dest) : false;
110         if (!success) {
111           failures.add(p);
112         } else {
113           quarantined.add(dest);
114         }
115       }
116       return;
117     } catch (FileNotFoundException fnfe) {
118       LOG.warn("HFile " + p + " was missing.  Likely removed due to compaction/split?");
119       missing.add(p);
120     } finally {
121       hfilesChecked.addAndGet(1);
122       if (r != null) {
123         r.close(true);
124       }
125     }
126   }
127 
128   /**
129    * Given a path, generates a new path to where we move a corrupted hfile (bad
130    * trailer, no trailer).
131    *
132    * @param hFile
133    *          Path to a corrupt hfile (assumed to follow the
134    *          HBASE_DIR/table/region/cf/file layout)
135    * @return path to where corrupted files are stored. This should be
136    *         HBASE_DIR/.corrupt/table/region/cf/file.
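      *         For example (illustrative names only), {@code /hbase/t1/r1/cf1/hfile1}
      *         would become {@code /hbase/.corrupt/t1/r1/cf1/hfile1} under the default
      *         quarantine directory.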
137    */
138   Path createQuarantinePath(Path hFile) throws IOException {
139     // extract the normal dirs structure
140     Path cfDir = hFile.getParent();
141     Path regionDir = cfDir.getParent();
142     Path tableDir = regionDir.getParent();
143 
144     // build up the corrupted dirs structure
145     Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), conf.get(
146         "hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
147     Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
148     Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
149     Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
150     Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
151     return corruptHfile;
152   }
153 
154   /**
155    * Check all files in a column family dir.
156    *
157    * @param cfDir
158    *          column family directory
159    * @throws IOException
160    */
161   protected void checkColFamDir(Path cfDir) throws IOException {
162     FileStatus[] statuses = null;
163     try {
164       statuses = fs.listStatus(cfDir); // use same filter as scanner.
165     } catch (FileNotFoundException fnfe) {
166     // Under Hadoop 0.23+ semantics, listStatus throws an exception if the path does not exist.
167       LOG.warn("Colfam Directory " + cfDir +
168           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
169       missing.add(cfDir);
170       return;
171     }
172 
173     List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
174     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
175     if (hfs.size() == 0 && !fs.exists(cfDir)) {
176       LOG.warn("Colfam Directory " + cfDir +
177           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
178       missing.add(cfDir);
179       return;
180     }
181     for (FileStatus hfFs : hfs) {
182       Path hf = hfFs.getPath();
183       checkHFile(hf);
184     }
185   }
186 
187   /**
188    * Check all files in a mob column family dir.
189    *
190    * @param cfDir
191    *          mob column family directory
192    * @throws IOException
193    */
194   protected void checkMobColFamDir(Path cfDir) throws IOException {
195     FileStatus[] hfs = null;
196     try {
197       hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
198     } catch (FileNotFoundException fnfe) {
199       // Under Hadoop 0.23+ semantics, listStatus throws an exception if the path does not exist.
200       LOG.warn("Mob colfam Directory " + cfDir +
201           " does not exist.  Likely the table is deleted. Skipping.");
202       missedMobFiles.add(cfDir);
203       return;
204     }
205 
206     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
207     if (hfs.length == 0 && !fs.exists(cfDir)) {
208       LOG.warn("Mob colfam Directory " + cfDir +
209           " does not exist.  Likely the table is deleted. Skipping.");
210       missedMobFiles.add(cfDir);
211       return;
212     }
213     for (FileStatus hfFs : hfs) {
214       Path hf = hfFs.getPath();
215       checkMobFile(hf);
216     }
217   }
218 
219   /**
220    * Checks a path to see if it is a valid mob file.
221    *
222    * @param p
223    *          full Path to a mob file.
224    * @throws IOException
225    *           This is a connectivity-related exception
226    */
227   protected void checkMobFile(Path p) throws IOException {
228     HFile.Reader r = null;
229     try {
230       r = HFile.createReader(fs, p, cacheConf, conf);
231     } catch (CorruptHFileException che) {
232       LOG.warn("Found corrupt mob file " + p, che);
233       corruptedMobFiles.add(p);
234       if (inQuarantineMode) {
235         Path dest = createQuarantinePath(p);
236         LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
237         boolean success = fs.mkdirs(dest.getParent());
238         success = success ? fs.rename(p, dest) : false;
239         if (!success) {
240           failureMobFiles.add(p);
241         } else {
242           quarantinedMobFiles.add(dest);
243         }
244       }
245       return;
246     } catch (FileNotFoundException fnfe) {
247       LOG.warn("Mob file " + p + " was missing.  Likely removed due to compaction?");
248       missedMobFiles.add(p);
249     } finally {
250       mobFilesChecked.addAndGet(1);
251       if (r != null) {
252         r.close(true);
253       }
254     }
255   }
256 
257   /**
258    * Checks all the mob files of a table.
259    * @param regionDir The mob region directory
260    * @throws IOException
261    */
262   private void checkMobRegionDir(Path regionDir) throws IOException {
263     if (!fs.exists(regionDir)) {
264       return;
265     }
266     FileStatus[] hfs = null;
267     try {
268       hfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
269     } catch (FileNotFoundException fnfe) {
270       // Under Hadoop 0.23+ semantics, listStatus throws an exception if the path does not exist.
271       LOG.warn("Mob directory " + regionDir
272         + " does not exist.  Likely the table is deleted. Skipping.");
273       missedMobFiles.add(regionDir);
274       return;
275     }
276 
277     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
278     if (hfs.length == 0 && !fs.exists(regionDir)) {
279       LOG.warn("Mob directory " + regionDir
280         + " does not exist.  Likely the table is deleted. Skipping.");
281       missedMobFiles.add(regionDir);
282       return;
283     }
284     for (FileStatus hfFs : hfs) {
285       Path hf = hfFs.getPath();
286       checkMobColFamDir(hf);
287     }
288   }
289 
290   /**
291    * Check all column families in a region dir.
292    *
293    * @param regionDir
294    *          region directory
295    * @throws IOException
296    */
297   protected void checkRegionDir(Path regionDir) throws IOException {
298     FileStatus[] statuses = null;
299     try {
300       statuses = fs.listStatus(regionDir);
301     } catch (FileNotFoundException fnfe) {
302       // Under Hadoop 0.23+ semantics, listStatus throws an exception if the path does not exist.
303       LOG.warn("Region Directory " + regionDir +
304           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
305       missing.add(regionDir);
306       return;
307     }
308 
309     List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
310     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
311     if (cfs.size() == 0 && !fs.exists(regionDir)) {
312       LOG.warn("Region Directory " + regionDir +
313           " does not exist.  Likely due to concurrent split/compaction. Skipping.");
314       missing.add(regionDir);
315       return;
316     }
317 
318     for (FileStatus cfFs : cfs) {
319       Path cfDir = cfFs.getPath();
320       checkColFamDir(cfDir);
321     }
322   }
323 
324   /**
325    * Check all the regiondirs in the specified tableDir
326    *
327    * @param tableDir
328    *          path to a table
329    * @throws IOException
330    */
331   void checkTableDir(Path tableDir) throws IOException {
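        // listStatusWithStatusFilter returns null both when the directory is missing and
        // when nothing matches the filter, so check fs.exists() to tell the two apart.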
332     List<FileStatus> rds = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
333     if (rds == null) {
334       if (!fs.exists(tableDir)) {
335         LOG.warn("Table Directory " + tableDir +
336             " does not exist.  Likely due to concurrent delete. Skipping.");
337         missing.add(tableDir);
338       }
339       return;
340     }
341 
342     // Parallelize check at the region dir level
343     List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
344     List<Future<Void>> rdFutures;
345 
346     for (FileStatus rdFs : rds) {
347       Path rdDir = rdFs.getPath();
348       RegionDirChecker work = new RegionDirChecker(rdDir);
349       rdcs.add(work);
350     }
351 
352     // also add a checker for the table's mob region directory
353     rdcs.add(createMobRegionDirChecker(tableDir));
354     // Submit and wait for completion
355     try {
356       rdFutures = executor.invokeAll(rdcs);
357     } catch (InterruptedException ie) {
358       Thread.currentThread().interrupt();
359       LOG.warn("Region dirs checking interrupted!", ie);
360       return;
361     }
362 
363     for (int i = 0; i < rdFutures.size(); i++) {
364       Future<Void> f = rdFutures.get(i);
365       try {
366         f.get();
367       } catch (ExecutionException e) {
368         LOG.warn("Failed to quarantine an HFile in regiondir "
369             + rdcs.get(i).regionDir, e.getCause());
370         // rethrow IOExceptions
371         if (e.getCause() instanceof IOException) {
372           throw (IOException) e.getCause();
373         }
374 
375         // rethrow RuntimeExceptions
376         if (e.getCause() instanceof RuntimeException) {
377           throw (RuntimeException) e.getCause();
378         }
379 
380         // this should never happen
381         LOG.error("Unexpected exception encountered", e);
382         return; // bailing out.
383       } catch (InterruptedException ie) {
384         Thread.currentThread().interrupt();
385         LOG.warn("Region dirs check interrupted!", ie);
386         // bailing out
387         return;
388       }
389     }
390   }
391 
392   /**
393    * An individual work item for parallelized regiondir processing. This is
394    * intentionally an inner class so it can use the shared error sets and fs.
395    */
396   private class RegionDirChecker implements Callable<Void> {
397     final Path regionDir;
398 
399     RegionDirChecker(Path regionDir) {
400       this.regionDir = regionDir;
401     }
402 
403     @Override
404     public Void call() throws IOException {
405       checkRegionDir(regionDir);
406       return null;
407     }
408   }
409 
410   /**
411    * An individual work item for parallelized mob dir processing. This is
412    * intentionally an inner class so it can use the shared error sets and fs.
413    */
414   private class MobRegionDirChecker extends RegionDirChecker {
415 
416     MobRegionDirChecker(Path regionDir) {
417       super(regionDir);
418     }
419 
420     @Override
421     public Void call() throws IOException {
422       checkMobRegionDir(regionDir);
423       return null;
424     }
425   }
426 
427   /**
428    * Creates an instance of MobRegionDirChecker.
429    * @param tableDir The current table directory.
430    * @return An instance of MobRegionDirChecker.
431    */
432   private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
433     TableName tableName = FSUtils.getTableName(tableDir);
434     Path mobDir = MobUtils.getMobRegionPath(conf, tableName);
435     return new MobRegionDirChecker(mobDir);
436   }
437 
438   /**
439    * Check the specified table dirs for bad hfiles.
440    */
441   public void checkTables(Collection<Path> tables) throws IOException {
442     for (Path t : tables) {
443       checkTableDir(t);
444     }
445   }
446 
447   /**
448    * @return the set of check failure file paths after checkTables is called.
449    */
450   public Collection<Path> getFailures() {
451     return new HashSet<Path>(failures);
452   }
453 
454   /**
455    * @return the set of corrupted file paths after checkTables is called.
456    */
457   public Collection<Path> getCorrupted() {
458     return new HashSet<Path>(corrupted);
459   }
460 
461   /**
462    * @return number of hfiles checked in the last HFileCorruptionChecker run
463    */
464   public int getHFilesChecked() {
465     return hfilesChecked.get();
466   }
467 
468   /**
469    * @return the set of successfully quarantined paths after checkTables is called.
470    */
471   public Collection<Path> getQuarantined() {
472     return new HashSet<Path>(quarantined);
473   }
474 
475   /**
476    * @return the set of paths that were missing.  Likely due to deletion/moves from
477    *  compaction or flushes.
478    */
479   public Collection<Path> getMissing() {
480     return new HashSet<Path>(missing);
481   }
482 
483   /**
484    * @return the set of check failure mob file paths after checkTables is called.
485    */
486   public Collection<Path> getFailureMobFiles() {
487     return new HashSet<Path>(failureMobFiles);
488   }
489 
490   /**
491    * @return the set of corrupted mob file paths after checkTables is called.
492    */
493   public Collection<Path> getCorruptedMobFiles() {
494     return new HashSet<Path>(corruptedMobFiles);
495   }
496 
497   /**
498    * @return number of mob files checked in the last HFileCorruptionChecker run
499    */
500   public int getMobFilesChecked() {
501     return mobFilesChecked.get();
502   }
503 
504   /**
505    * @return the set of successfully quarantined mob file paths after checkTables is called.
506    */
507   public Collection<Path> getQuarantinedMobFiles() {
508     return new HashSet<Path>(quarantinedMobFiles);
509   }
510 
511   /**
512    * @return the set of mob file paths that were missing.  Likely due to table deletion or
513    *  deletion/moves from compaction.
514    */
515   public Collection<Path> getMissedMobFiles() {
516     return new HashSet<Path>(missedMobFiles);
517   }
518 
519   /**
520    * Print a human-readable summary of hfile quarantining operations.
521    * @param out the error reporter to write the summary to
522    */
523   public void report(ErrorReporter out) {
524     out.print("Checked " + hfilesChecked.get() + " hfiles for corruption");
525     out.print("  HFiles corrupted:                  " + corrupted.size());
526     if (inQuarantineMode) {
527       out.print("    HFiles successfully quarantined: " + quarantined.size());
528       for (Path sq : quarantined) {
529         out.print("      " + sq);
530       }
531       out.print("    HFiles failed quarantine:        " + failures.size());
532       for (Path fq : failures) {
533         out.print("      " + fq);
534       }
535     }
536     out.print("    HFiles moved while checking:     " + missing.size());
537     for (Path mq : missing) {
538       out.print("      " + mq);
539     }
540 
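        // The post-check state only counts as OK if every corrupted hfile was
        // successfully quarantined.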
541     String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
542     String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
543         : "CORRUPTED";
544 
545     // print mob-related report
546     if (inQuarantineMode) {
547       out.print("    Mob files successfully quarantined: " + quarantinedMobFiles.size());
548       for (Path sq : quarantinedMobFiles) {
549         out.print("      " + sq);
550       }
551       out.print("    Mob files failed quarantine:        " + failureMobFiles.size());
552       for (Path fq : failureMobFiles) {
553         out.print("      " + fq);
554       }
555     }
556     out.print("    Mob files moved while checking:     " + missedMobFiles.size());
557     for (Path mq : missedMobFiles) {
558       out.print("      " + mq);
559     }
560     String initialMobState = (corruptedMobFiles.size() == 0) ? "OK" : "CORRUPTED";
561     String fixedMobState = (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK"
562         : "CORRUPTED";
563 
564     if (inQuarantineMode) {
565       out.print("Summary: " + initialState + " => " + fixedState);
566       out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
567     } else {
568       out.print("Summary: " + initialState);
569       out.print("Mob summary: " + initialMobState);
570     }
571   }
572 }