1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileStatus;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36 import org.apache.hadoop.hbase.wal.WAL;
37 import org.apache.hadoop.hbase.wal.WAL.Entry;
38 import org.apache.hadoop.hbase.wal.WAL.Reader;
39 import org.apache.hadoop.hbase.wal.WALFactory;
40 import org.apache.hadoop.hbase.wal.WALKey;
41 import org.apache.hadoop.io.Writable;
42 import org.apache.hadoop.mapreduce.InputFormat;
43 import org.apache.hadoop.mapreduce.InputSplit;
44 import org.apache.hadoop.mapreduce.JobContext;
45 import org.apache.hadoop.mapreduce.RecordReader;
46 import org.apache.hadoop.mapreduce.TaskAttemptContext;
47 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
48 import org.apache.hadoop.util.StringUtils;
49
50
51
52
53 @InterfaceAudience.Public
54 public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
55 private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
56
57 public static final String START_TIME_KEY = "wal.start.time";
58 public static final String END_TIME_KEY = "wal.end.time";
59
60
61
62
63
64 static class WALSplit extends InputSplit implements Writable {
65 private String logFileName;
66 private long fileSize;
67 private long startTime;
68 private long endTime;
69
70
71 public WALSplit() {}
72
73
74
75
76
77
78
79
80
81
82 public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
83 this.logFileName = logFileName;
84 this.fileSize = fileSize;
85 this.startTime = startTime;
86 this.endTime = endTime;
87 }
88
89 @Override
90 public long getLength() throws IOException, InterruptedException {
91 return fileSize;
92 }
93
94 @Override
95 public String[] getLocations() throws IOException, InterruptedException {
96
97 return new String[] {};
98 }
99
100 public String getLogFileName() {
101 return logFileName;
102 }
103
104 public long getStartTime() {
105 return startTime;
106 }
107
108 public long getEndTime() {
109 return endTime;
110 }
111
112 @Override
113 public void readFields(DataInput in) throws IOException {
114 logFileName = in.readUTF();
115 fileSize = in.readLong();
116 startTime = in.readLong();
117 endTime = in.readLong();
118 }
119
120 @Override
121 public void write(DataOutput out) throws IOException {
122 out.writeUTF(logFileName);
123 out.writeLong(fileSize);
124 out.writeLong(startTime);
125 out.writeLong(endTime);
126 }
127
128 @Override
129 public String toString() {
130 return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
131 }
132 }
133
134
135
136
137
138 static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
139 private Reader reader = null;
140
141 Entry currentEntry = new Entry();
142 private long startTime;
143 private long endTime;
144
145 @Override
146 public void initialize(InputSplit split, TaskAttemptContext context)
147 throws IOException, InterruptedException {
148 WALSplit hsplit = (WALSplit)split;
149 Path logFile = new Path(hsplit.getLogFileName());
150 Configuration conf = context.getConfiguration();
151 LOG.info("Opening reader for "+split);
152 try {
153 this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
154 } catch (EOFException x) {
155 LOG.info("Ignoring corrupted WAL file: " + logFile
156 + " (This is normal when a RegionServer crashed.)");
157 this.reader = null;
158 }
159 this.startTime = hsplit.getStartTime();
160 this.endTime = hsplit.getEndTime();
161 }
162
163 @Override
164 public boolean nextKeyValue() throws IOException, InterruptedException {
165 if (reader == null) return false;
166
167 Entry temp;
168 long i = -1;
169 do {
170
171 try {
172 temp = reader.next(currentEntry);
173 i++;
174 } catch (EOFException x) {
175 LOG.info("Corrupted entry detected. Ignoring the rest of the file."
176 + " (This is normal when a RegionServer crashed.)");
177 return false;
178 }
179 }
180 while(temp != null && temp.getKey().getWriteTime() < startTime);
181
182 if (temp == null) {
183 if (i > 0) LOG.info("Skipped " + i + " entries.");
184 LOG.info("Reached end of file.");
185 return false;
186 } else if (i > 0) {
187 LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
188 }
189 boolean res = temp.getKey().getWriteTime() <= endTime;
190 if (!res) {
191 LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
192 }
193 return res;
194 }
195
196 @Override
197 public WALEdit getCurrentValue() throws IOException, InterruptedException {
198 return currentEntry.getEdit();
199 }
200
201 @Override
202 public float getProgress() throws IOException, InterruptedException {
203
204 return 0;
205 }
206
207 @Override
208 public void close() throws IOException {
209 LOG.info("Closing reader");
210 if (reader != null) this.reader.close();
211 }
212 }
213
214
215
216
217
218 static class WALKeyRecordReader extends WALRecordReader<WALKey> {
219 @Override
220 public WALKey getCurrentKey() throws IOException, InterruptedException {
221 return currentEntry.getKey();
222 }
223 }
224
225 @Override
226 public List<InputSplit> getSplits(JobContext context) throws IOException,
227 InterruptedException {
228 return getSplits(context, START_TIME_KEY, END_TIME_KEY);
229 }
230
231
232
233
234 List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
235 throws IOException, InterruptedException {
236 Configuration conf = context.getConfiguration();
237
238 Path[] inputPaths = getInputPaths(conf);
239
240 long startTime = conf.getLong(startKey, Long.MIN_VALUE);
241 long endTime = conf.getLong(endKey, Long.MAX_VALUE);
242
243 List<FileStatus> allFiles = new ArrayList<FileStatus>();
244 for(Path inputPath: inputPaths){
245 FileSystem fs = inputPath.getFileSystem(conf);
246 List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
247 allFiles.addAll(files);
248 }
249 List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
250 for (FileStatus file : allFiles) {
251 splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
252 }
253 return splits;
254 }
255
256 private Path[] getInputPaths(Configuration conf) {
257 String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
258 return StringUtils.stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
259 }
260
261 private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
262 throws IOException {
263 List<FileStatus> result = new ArrayList<FileStatus>();
264 LOG.debug("Scanning " + dir.toString() + " for WAL files");
265
266 FileStatus[] files = fs.listStatus(dir);
267 if (files == null) return Collections.emptyList();
268 for (FileStatus file : files) {
269 if (file.isDirectory()) {
270
271 result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
272 } else {
273 String name = file.getPath().toString();
274 int idx = name.lastIndexOf('.');
275 if (idx > 0) {
276 try {
277 long fileStartTime = Long.parseLong(name.substring(idx+1));
278 if (fileStartTime <= endTime) {
279 LOG.info("Found: " + name);
280 result.add(file);
281 }
282 } catch (NumberFormatException x) {
283 idx = 0;
284 }
285 }
286 if (idx == 0) {
287 LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
288 }
289 }
290 }
291 return result;
292 }
293
294 @Override
295 public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
296 TaskAttemptContext context) throws IOException, InterruptedException {
297 return new WALKeyRecordReader();
298 }
299 }