/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MR input format for HFiles.
 * This code was borrowed from the Apache Crunch project and updated to a
 * recent version of HBase.
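 *
 * <p>A minimal usage sketch; {@code MyCellMapper} and the input path are
 * illustrative placeholders, not part of this class:
 * <pre>{@code
 * Job job = Job.getInstance(conf, "hfile-read");
 * job.setInputFormatClass(HFileInputFormat.class);
 * HFileInputFormat.addInputPath(job, new Path("/path/to/hfiles"));
 * job.setMapperClass(MyCellMapper.class); // a Mapper<NullWritable, Cell, ?, ?>
 * }</pre>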
 */
@InterfaceAudience.Private
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that removes all "hidden" files. This filter might be worth removing
   * if this were a more general-purpose utility; here it accounts for the metadata
   * files created by the way we do exports.
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
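      // Hadoop convention: names starting with '_' (e.g. _SUCCESS markers) or '.'
      // (e.g. .crc checksum files) are bookkeeping files rather than data.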
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Record reader for HFiles.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    private Reader in;
    protected Configuration conf;
    private HFileScanner scanner;

    /**
     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
     */
    private Cell value = null;
    private long count;
    private boolean seeked = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf);

      // The file info must be loaded before the scanner can be used.
      // This seems like a bug in HBase, but it's easily worked around.
      this.in.loadFileInfo();
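      // cacheBlocks=false, pread=false: a single streaming pass over the file gains
      // nothing from the block cache, and sequential reads suit a full scan.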
      this.scanner = in.getScanner(false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
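      // The scanner starts unpositioned: the first call must seekTo() the first
      // cell of the file; every later call advances one cell with next().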
      if (!seeked) {
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getKeyValue();
      count++;
      return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This is inaccurate if cells are not uniformly sized or if we have seeked to
      // the start row, but it is better than nothing.
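      // Multiplying by 1.0f forces floating-point division; the result is the
      // fraction of the file's cells returned so far.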
      return 1.0f * count / in.getEntries();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<>();

    // Explode out directories that match the original FileInputFormat filters,
    // since HFiles are written into directories whose names are the column
    // family names.
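    // e.g. given a layout of the form <inputDir>/<family>/<hfile> (illustrative),
    // matching a family directory pulls in every non-hidden HFile beneath it.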
    for (FileStatus status : super.listStatus(job)) {
      if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
          result.add(match);
        }
      } else {
        result.add(status);
      }
    }
    return result;
  }

  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // HFiles are not splittable.
    return false;
  }
}