View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36  import org.apache.hadoop.hbase.wal.WAL;
37  import org.apache.hadoop.hbase.wal.WAL.Entry;
38  import org.apache.hadoop.hbase.wal.WAL.Reader;
39  import org.apache.hadoop.hbase.wal.WALFactory;
40  import org.apache.hadoop.hbase.wal.WALKey;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.hadoop.mapreduce.InputFormat;
43  import org.apache.hadoop.mapreduce.InputSplit;
44  import org.apache.hadoop.mapreduce.JobContext;
45  import org.apache.hadoop.mapreduce.RecordReader;
46  import org.apache.hadoop.mapreduce.TaskAttemptContext;
47  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
48  import org.apache.hadoop.util.StringUtils;
49  
50  /**
51   * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
52   */
53  @InterfaceAudience.Public
54  public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
55    private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
56  
57    public static final String START_TIME_KEY = "wal.start.time";
58    public static final String END_TIME_KEY = "wal.end.time";
59  
60    /**
61     * {@link InputSplit} for {@link WAL} files. Each split represent
62     * exactly one log file.
63     */
64    static class WALSplit extends InputSplit implements Writable {
65      private String logFileName;
66      private long fileSize;
67      private long startTime;
68      private long endTime;
69  
70      /** for serialization */
71      public WALSplit() {}
72  
73      /**
74       * Represent an WALSplit, i.e. a single WAL file.
75       * Start- and EndTime are managed by the split, so that WAL files can be
76       * filtered before WALEdits are passed to the mapper(s).
77       * @param logFileName
78       * @param fileSize
79       * @param startTime
80       * @param endTime
81       */
82      public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
83        this.logFileName = logFileName;
84        this.fileSize = fileSize;
85        this.startTime = startTime;
86        this.endTime = endTime;
87      }
88  
89      @Override
90      public long getLength() throws IOException, InterruptedException {
91        return fileSize;
92      }
93  
94      @Override
95      public String[] getLocations() throws IOException, InterruptedException {
96        // TODO: Find the data node with the most blocks for this WAL?
97        return new String[] {};
98      }
99  
100     public String getLogFileName() {
101       return logFileName;
102     }
103 
104     public long getStartTime() {
105       return startTime;
106     }
107 
108     public long getEndTime() {
109       return endTime;
110     }
111 
112     @Override
113     public void readFields(DataInput in) throws IOException {
114       logFileName = in.readUTF();
115       fileSize = in.readLong();
116       startTime = in.readLong();
117       endTime = in.readLong();
118     }
119 
120     @Override
121     public void write(DataOutput out) throws IOException {
122       out.writeUTF(logFileName);
123       out.writeLong(fileSize);
124       out.writeLong(startTime);
125       out.writeLong(endTime);
126     }
127 
128     @Override
129     public String toString() {
130       return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
131     }
132   }
133 
134   /**
135    * {@link RecordReader} for an {@link WAL} file.
136    * Implementation shared with deprecated HLogInputFormat.
137    */
138   static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
139     private Reader reader = null;
140     // visible until we can remove the deprecated HLogInputFormat
141     Entry currentEntry = new Entry();
142     private long startTime;
143     private long endTime;
144 
145     @Override
146     public void initialize(InputSplit split, TaskAttemptContext context)
147         throws IOException, InterruptedException {
148       WALSplit hsplit = (WALSplit)split;
149       Path logFile = new Path(hsplit.getLogFileName());
150       Configuration conf = context.getConfiguration();
151       LOG.info("Opening reader for "+split);
152       try {
153         this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
154       } catch (EOFException x) {
155         LOG.info("Ignoring corrupted WAL file: " + logFile
156             + " (This is normal when a RegionServer crashed.)");
157         this.reader = null;
158       }
159       this.startTime = hsplit.getStartTime();
160       this.endTime = hsplit.getEndTime();
161     }
162 
163     @Override
164     public boolean nextKeyValue() throws IOException, InterruptedException {
165       if (reader == null) return false;
166 
167       Entry temp;
168       long i = -1;
169       do {
170         // skip older entries
171         try {
172           temp = reader.next(currentEntry);
173           i++;
174         } catch (EOFException x) {
175           LOG.info("Corrupted entry detected. Ignoring the rest of the file."
176               + " (This is normal when a RegionServer crashed.)");
177           return false;
178         }
179       }
180       while(temp != null && temp.getKey().getWriteTime() < startTime);
181 
182       if (temp == null) {
183         if (i > 0) LOG.info("Skipped " + i + " entries.");
184         LOG.info("Reached end of file.");
185         return false;
186       } else if (i > 0) {
187         LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
188       }
189       boolean res = temp.getKey().getWriteTime() <= endTime;
190       if (!res) {
191         LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
192       }
193       return res;
194     }
195 
196     @Override
197     public WALEdit getCurrentValue() throws IOException, InterruptedException {
198       return currentEntry.getEdit();
199     }
200 
201     @Override
202     public float getProgress() throws IOException, InterruptedException {
203       // N/A depends on total number of entries, which is unknown
204       return 0;
205     }
206 
207     @Override
208     public void close() throws IOException {
209       LOG.info("Closing reader");
210       if (reader != null) this.reader.close();
211     }
212   }
213 
214   /**
215    * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer
216    * need to support HLogInputFormat.
217    */
218   static class WALKeyRecordReader extends WALRecordReader<WALKey> {
219     @Override
220     public WALKey getCurrentKey() throws IOException, InterruptedException {
221       return currentEntry.getKey();
222     }
223   }
224 
225   @Override
226   public List<InputSplit> getSplits(JobContext context) throws IOException,
227       InterruptedException {
228     return getSplits(context, START_TIME_KEY, END_TIME_KEY);
229   }
230 
231   /**
232    * implementation shared with deprecated HLogInputFormat
233    */
234   List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
235       throws IOException, InterruptedException {
236     Configuration conf = context.getConfiguration();
237 
238     Path[] inputPaths = getInputPaths(conf);
239 
240     long startTime = conf.getLong(startKey, Long.MIN_VALUE);
241     long endTime = conf.getLong(endKey, Long.MAX_VALUE);
242 
243     List<FileStatus> allFiles = new ArrayList<FileStatus>();
244     for(Path inputPath: inputPaths){
245       FileSystem fs = inputPath.getFileSystem(conf);
246       List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
247       allFiles.addAll(files);
248     }
249     List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
250     for (FileStatus file : allFiles) {
251       splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
252     }
253     return splits;
254   }
255 
256   private Path[] getInputPaths(Configuration conf) {
257     String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
258     return StringUtils.stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
259   }
260 
261   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
262       throws IOException {
263     List<FileStatus> result = new ArrayList<FileStatus>();
264     LOG.debug("Scanning " + dir.toString() + " for WAL files");
265 
266     FileStatus[] files = fs.listStatus(dir);
267     if (files == null) return Collections.emptyList();
268     for (FileStatus file : files) {
269       if (file.isDirectory()) {
270         // recurse into sub directories
271         result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
272       } else {
273         String name = file.getPath().toString();
274         int idx = name.lastIndexOf('.');
275         if (idx > 0) {
276           try {
277             long fileStartTime = Long.parseLong(name.substring(idx+1));
278             if (fileStartTime <= endTime) {
279               LOG.info("Found: " + name);
280               result.add(file);
281             }
282           } catch (NumberFormatException x) {
283             idx = 0;
284           }
285         }
286         if (idx == 0) {
287           LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
288         }
289       }
290     }
291     return result;
292   }
293 
294   @Override
295   public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
296       TaskAttemptContext context) throws IOException, InterruptedException {
297     return new WALKeyRecordReader();
298   }
299 }