View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.Iterator;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.fs.FSDataInputStream;
29  import org.apache.hadoop.fs.FSDataOutputStream;
30  import org.apache.hadoop.hbase.io.util.StreamUtils;
31  import org.apache.hadoop.hbase.procedure2.Procedure;
32  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
33  import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
34  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
35  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
36  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
37  
38  import com.google.protobuf.InvalidProtocolBufferException;
39  
40  /**
41   * Helper class that contains the WAL serialization utils.
42   */
43  @InterfaceAudience.Private
44  @InterfaceStability.Evolving
45  public final class ProcedureWALFormat {
46    static final byte LOG_TYPE_STREAM = 0;
47    static final byte LOG_TYPE_COMPACTED = 1;
48    static final byte LOG_TYPE_MAX_VALID = 1;
49  
50    static final byte HEADER_VERSION = 1;
51    static final byte TRAILER_VERSION = 1;
52    static final long HEADER_MAGIC = 0x31764c4157637250L;
53    static final long TRAILER_MAGIC = 0x50726357414c7631L;
54  
55    @InterfaceAudience.Private
56    public static class InvalidWALDataException extends IOException {
57      public InvalidWALDataException(String s) {
58        super(s);
59      }
60  
61      public InvalidWALDataException(Throwable t) {
62        super(t);
63      }
64    }
65  
66    interface Loader {
67      void markCorruptedWAL(ProcedureWALFile log, IOException e);
68    }
69  
70    private ProcedureWALFormat() {}
71  
72    public static Iterator<Procedure> load(final Iterator<ProcedureWALFile> logs,
73        final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
74      ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
75      tracker.setKeepDeletes(true);
76      try {
77        while (logs.hasNext()) {
78          ProcedureWALFile log = logs.next();
79          log.open();
80          try {
81            reader.read(log, loader);
82          } finally {
83            log.close();
84          }
85        }
86        // The tracker is now updated with all the procedures read from the logs
87        tracker.setPartialFlag(false);
88        tracker.resetUpdates();
89      } finally {
90        tracker.setKeepDeletes(false);
91      }
92      // TODO: Write compacted version?
93      return reader.getProcedures();
94    }
95  
96    public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
97        throws IOException {
98      header.writeDelimitedTo(stream);
99    }
100 
101   /*
102    * +-----------------+
103    * | END OF WAL DATA | <---+
104    * +-----------------+     |
105    * |                 |     |
106    * |     Tracker     |     |
107    * |                 |     |
108    * +-----------------+     |
109    * |     version     |     |
110    * +-----------------+     |
111    * |  TRAILER_MAGIC  |     |
112    * +-----------------+     |
113    * |      offset     |-----+
114    * +-----------------+
115    */
116   public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
117       throws IOException {
118     long offset = stream.getPos();
119 
120     // Write EOF Entry
121     ProcedureWALEntry.newBuilder()
122       .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF)
123       .build().writeDelimitedTo(stream);
124 
125     // Write Tracker
126     tracker.writeTo(stream);
127 
128     stream.write(TRAILER_VERSION);
129     StreamUtils.writeLong(stream, TRAILER_MAGIC);
130     StreamUtils.writeLong(stream, offset);
131     return stream.getPos() - offset;
132   }
133 
134   public static ProcedureWALHeader readHeader(InputStream stream)
135       throws IOException {
136     ProcedureWALHeader header;
137     try {
138       header = ProcedureWALHeader.parseDelimitedFrom(stream);
139     } catch (InvalidProtocolBufferException e) {
140       throw new InvalidWALDataException(e);
141     }
142 
143     if (header == null) {
144       throw new InvalidWALDataException("No data available to read the Header");
145     }
146 
147     if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
148       throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
149           " expected " + HEADER_VERSION);
150     }
151 
152     if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
153       throw new InvalidWALDataException("Invalid header type. got " + header.getType());
154     }
155 
156     return header;
157   }
158 
159   public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
160       throws IOException {
161     long trailerPos = size - 17; // Beginning of the Trailer Jump
162 
163     if (trailerPos < startPos) {
164       throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
165     }
166 
167     stream.seek(trailerPos);
168     int version = stream.read();
169     if (version != TRAILER_VERSION) {
170       throw new InvalidWALDataException("Invalid Trailer version. got " + version +
171           " expected " + TRAILER_VERSION);
172     }
173 
174     long magic = StreamUtils.readLong(stream);
175     if (magic != TRAILER_MAGIC) {
176       throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
177           " expected " + TRAILER_MAGIC);
178     }
179 
180     long trailerOffset = StreamUtils.readLong(stream);
181     stream.seek(trailerOffset);
182 
183     ProcedureWALEntry entry = readEntry(stream);
184     if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) {
185       throw new InvalidWALDataException("Invalid Trailer begin");
186     }
187 
188     ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
189       .setVersion(version)
190       .setTrackerPos(stream.getPos())
191       .build();
192     return trailer;
193   }
194 
195   public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
196     return ProcedureWALEntry.parseDelimitedFrom(stream);
197   }
198 
199   public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
200       Procedure proc, Procedure[] subprocs) throws IOException {
201     ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
202     builder.setType(type);
203     builder.addProcedure(Procedure.convert(proc));
204     if (subprocs != null) {
205       for (int i = 0; i < subprocs.length; ++i) {
206         builder.addProcedure(Procedure.convert(subprocs[i]));
207       }
208     }
209     builder.build().writeDelimitedTo(slot);
210   }
211 
212   public static void writeInsert(ByteSlot slot, Procedure proc)
213       throws IOException {
214     writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
215   }
216 
217   public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
218       throws IOException {
219     writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
220   }
221 
222   public static void writeUpdate(ByteSlot slot, Procedure proc)
223       throws IOException {
224     writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
225   }
226 
227   public static void writeDelete(ByteSlot slot, long procId)
228       throws IOException {
229     ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
230     builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
231     builder.setProcId(procId);
232     builder.build().writeDelimitedTo(slot);
233   }
234 }