1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.Iterator;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.fs.FSDataInputStream;
29 import org.apache.hadoop.fs.FSDataOutputStream;
30 import org.apache.hadoop.hbase.io.util.StreamUtils;
31 import org.apache.hadoop.hbase.procedure2.Procedure;
32 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
33 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
34 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
35 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
36 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
37
38 import com.google.protobuf.InvalidProtocolBufferException;
39
40
41
42
43 @InterfaceAudience.Private
44 @InterfaceStability.Evolving
45 public final class ProcedureWALFormat {
46 static final byte LOG_TYPE_STREAM = 0;
47 static final byte LOG_TYPE_COMPACTED = 1;
48 static final byte LOG_TYPE_MAX_VALID = 1;
49
50 static final byte HEADER_VERSION = 1;
51 static final byte TRAILER_VERSION = 1;
52 static final long HEADER_MAGIC = 0x31764c4157637250L;
53 static final long TRAILER_MAGIC = 0x50726357414c7631L;
54
55 @InterfaceAudience.Private
56 public static class InvalidWALDataException extends IOException {
57 public InvalidWALDataException(String s) {
58 super(s);
59 }
60
61 public InvalidWALDataException(Throwable t) {
62 super(t);
63 }
64 }
65
66 interface Loader {
67 void markCorruptedWAL(ProcedureWALFile log, IOException e);
68 }
69
/** Not instantiable: this class only exposes static members. */
private ProcedureWALFormat() {
}
71
72 public static Iterator<Procedure> load(final Iterator<ProcedureWALFile> logs,
73 final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
74 ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
75 tracker.setKeepDeletes(true);
76 try {
77 while (logs.hasNext()) {
78 ProcedureWALFile log = logs.next();
79 log.open();
80 try {
81 reader.read(log, loader);
82 } finally {
83 log.close();
84 }
85 }
86
87 tracker.setPartialFlag(false);
88 tracker.resetUpdates();
89 } finally {
90 tracker.setKeepDeletes(false);
91 }
92
93 return reader.getProcedures();
94 }
95
96 public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
97 throws IOException {
98 header.writeDelimitedTo(stream);
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
117 throws IOException {
118 long offset = stream.getPos();
119
120
121 ProcedureWALEntry.newBuilder()
122 .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF)
123 .build().writeDelimitedTo(stream);
124
125
126 tracker.writeTo(stream);
127
128 stream.write(TRAILER_VERSION);
129 StreamUtils.writeLong(stream, TRAILER_MAGIC);
130 StreamUtils.writeLong(stream, offset);
131 return stream.getPos() - offset;
132 }
133
134 public static ProcedureWALHeader readHeader(InputStream stream)
135 throws IOException {
136 ProcedureWALHeader header;
137 try {
138 header = ProcedureWALHeader.parseDelimitedFrom(stream);
139 } catch (InvalidProtocolBufferException e) {
140 throw new InvalidWALDataException(e);
141 }
142
143 if (header == null) {
144 throw new InvalidWALDataException("No data available to read the Header");
145 }
146
147 if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
148 throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
149 " expected " + HEADER_VERSION);
150 }
151
152 if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
153 throw new InvalidWALDataException("Invalid header type. got " + header.getType());
154 }
155
156 return header;
157 }
158
159 public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
160 throws IOException {
161 long trailerPos = size - 17;
162
163 if (trailerPos < startPos) {
164 throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
165 }
166
167 stream.seek(trailerPos);
168 int version = stream.read();
169 if (version != TRAILER_VERSION) {
170 throw new InvalidWALDataException("Invalid Trailer version. got " + version +
171 " expected " + TRAILER_VERSION);
172 }
173
174 long magic = StreamUtils.readLong(stream);
175 if (magic != TRAILER_MAGIC) {
176 throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
177 " expected " + TRAILER_MAGIC);
178 }
179
180 long trailerOffset = StreamUtils.readLong(stream);
181 stream.seek(trailerOffset);
182
183 ProcedureWALEntry entry = readEntry(stream);
184 if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) {
185 throw new InvalidWALDataException("Invalid Trailer begin");
186 }
187
188 ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
189 .setVersion(version)
190 .setTrackerPos(stream.getPos())
191 .build();
192 return trailer;
193 }
194
195 public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
196 return ProcedureWALEntry.parseDelimitedFrom(stream);
197 }
198
199 public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
200 Procedure proc, Procedure[] subprocs) throws IOException {
201 ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
202 builder.setType(type);
203 builder.addProcedure(Procedure.convert(proc));
204 if (subprocs != null) {
205 for (int i = 0; i < subprocs.length; ++i) {
206 builder.addProcedure(Procedure.convert(subprocs[i]));
207 }
208 }
209 builder.build().writeDelimitedTo(slot);
210 }
211
212 public static void writeInsert(ByteSlot slot, Procedure proc)
213 throws IOException {
214 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
215 }
216
217 public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
218 throws IOException {
219 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
220 }
221
222 public static void writeUpdate(ByteSlot slot, Procedure proc)
223 throws IOException {
224 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
225 }
226
227 public static void writeDelete(ByteSlot slot, long procId)
228 throws IOException {
229 ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
230 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
231 builder.setProcId(procId);
232 builder.build().writeDelimitedTo(slot);
233 }
234 }