1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileStatus;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.fs.PathFilter;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
32 import org.apache.hadoop.hbase.io.hfile.HFile;
33 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
34 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
35 import org.apache.hadoop.io.NullWritable;
36 import org.apache.hadoop.mapreduce.InputSplit;
37 import org.apache.hadoop.mapreduce.JobContext;
38 import org.apache.hadoop.mapreduce.RecordReader;
39 import org.apache.hadoop.mapreduce.TaskAttemptContext;
40 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
41 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45
46
47
48
49
50 @InterfaceAudience.Private
51 public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {
52
53 private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);
54
55
56
57
58
59
60 static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
61 @Override
62 public boolean accept(Path p) {
63 String name = p.getName();
64 return !name.startsWith("_") && !name.startsWith(".");
65 }
66 };
67
68
69
70
71 private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {
72
73 private Reader in;
74 protected Configuration conf;
75 private HFileScanner scanner;
76
77
78
79
80 private Cell value = null;
81 private long count;
82 private boolean seeked = false;
83
84 @Override
85 public void initialize(InputSplit split, TaskAttemptContext context)
86 throws IOException, InterruptedException {
87 FileSplit fileSplit = (FileSplit) split;
88 conf = context.getConfiguration();
89 Path path = fileSplit.getPath();
90 FileSystem fs = path.getFileSystem(conf);
91 LOG.info("Initialize HFileRecordReader for {}", path);
92 this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf);
93
94
95
96 this.in.loadFileInfo();
97 this.scanner = in.getScanner(false, false);
98
99 }
100
101
102 @Override
103 public boolean nextKeyValue() throws IOException, InterruptedException {
104 boolean hasNext;
105 if (!seeked) {
106 LOG.info("Seeking to start");
107 hasNext = scanner.seekTo();
108 seeked = true;
109 } else {
110 hasNext = scanner.next();
111 }
112 if (!hasNext) {
113 return false;
114 }
115 value = scanner.getKeyValue();
116 count++;
117 return true;
118 }
119
120 @Override
121 public NullWritable getCurrentKey() throws IOException, InterruptedException {
122 return NullWritable.get();
123 }
124
125 @Override
126 public Cell getCurrentValue() throws IOException, InterruptedException {
127 return value;
128 }
129
130 @Override
131 public float getProgress() throws IOException, InterruptedException {
132
133
134 return 1.0f * count / in.getEntries();
135 }
136
137 @Override
138 public void close() throws IOException {
139 if (in != null) {
140 in.close();
141 in = null;
142 }
143 }
144 }
145
146 @Override
147 protected List<FileStatus> listStatus(JobContext job) throws IOException {
148 List<FileStatus> result = new ArrayList<FileStatus>();
149
150
151
152
153 for (FileStatus status : super.listStatus(job)) {
154 if (status.isDirectory()) {
155 FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
156 for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
157 result.add(match);
158 }
159 } else {
160 result.add(status);
161 }
162 }
163 return result;
164 }
165
166 @Override
167 public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context)
168 throws IOException, InterruptedException {
169 return new HFileRecordReader();
170 }
171
172 @Override
173 protected boolean isSplitable(JobContext context, Path filename) {
174
175 return false;
176 }
177 }