View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.fs.FSDataInputStream;
26  import org.apache.hadoop.fs.FileStatus;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
32  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
33  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
34  
35  /**
36   * Describes a WAL File
37   */
38  @InterfaceAudience.Private
39  @InterfaceStability.Evolving
40  public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
41    private static final Log LOG = LogFactory.getLog(ProcedureWALFile.class);
42  
43    private ProcedureWALHeader header;
44    private FSDataInputStream stream;
45    private FileSystem fs;
46    private Path logFile;
47    private long startPos;
48    private long minProcId;
49    private long maxProcId;
50    private long logSize;
51    private long timestamp;
52  
53    public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
54      this.fs = fs;
55      this.logFile = logStatus.getPath();
56      this.logSize = logStatus.getLen();
57      this.timestamp = logStatus.getModificationTime();
58    }
59  
60    public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header,
61        long startPos, long timestamp) {
62      this.fs = fs;
63      this.header = header;
64      this.logFile = logFile;
65      this.startPos = startPos;
66      this.logSize = startPos;
67      this.timestamp = timestamp;
68    }
69  
70    public void open() throws IOException {
71      if (stream == null) {
72        stream = fs.open(logFile);
73      }
74  
75      if (header == null) {
76        header = ProcedureWALFormat.readHeader(stream);
77        startPos = stream.getPos();
78      } else {
79        stream.seek(startPos);
80      }
81    }
82  
83    public ProcedureWALTrailer readTrailer() throws IOException {
84      try {
85        return ProcedureWALFormat.readTrailer(stream, startPos, logSize);
86      } finally {
87        stream.seek(startPos);
88      }
89    }
90  
91    public void readTracker(ProcedureStoreTracker tracker) throws IOException {
92      ProcedureWALTrailer trailer = readTrailer();
93      try {
94        stream.seek(trailer.getTrackerPos());
95        tracker.readFrom(stream);
96      } finally {
97        stream.seek(startPos);
98      }
99    }
100 
101   public void close() {
102     if (stream == null) return;
103     try {
104       stream.close();
105     } catch (IOException e) {
106       LOG.warn("unable to close the wal file: " + logFile, e);
107     } finally {
108       stream = null;
109     }
110   }
111 
112   public FSDataInputStream getStream() {
113     return stream;
114   }
115 
116   public ProcedureWALHeader getHeader() {
117     return header;
118   }
119 
120   public long getTimestamp() {
121     return timestamp;
122   }
123 
124   public boolean isCompacted() {
125     return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
126   }
127 
128   public long getLogId() {
129     return header.getLogId();
130   }
131 
132   public long getSize() {
133     return logSize;
134   }
135 
136   /**
137    * Used to update in-progress log sizes. the FileStatus will report 0 otherwise.
138    */
139   void addToSize(long size) {
140     this.logSize += size;
141   }
142 
143   public void removeFile() throws IOException {
144     close();
145     fs.delete(logFile, false);
146   }
147 
148   public void setProcIds(long minId, long maxId) {
149     this.minProcId = minId;
150     this.maxProcId = maxId;
151   }
152 
153   public long getMinProcId() {
154     return minProcId;
155   }
156 
157   public long getMaxProcId() {
158     return maxProcId;
159   }
160 
161   @Override
162   public int compareTo(final ProcedureWALFile other) {
163     long diff = header.getLogId() - other.header.getLogId();
164     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
165   }
166 
167   @Override
168   public boolean equals(Object o) {
169     if (this == o) return true;
170     if (!(o instanceof ProcedureWALFile)) return false;
171     return compareTo((ProcedureWALFile)o) == 0;
172   }
173 
174   @Override
175   public int hashCode() {
176     return logFile.hashCode();
177   }
178 
179   @Override
180   public String toString() {
181     return logFile.toString();
182   }
183 }