1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23 import java.util.Map;
24 import java.util.HashMap;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.fs.FSDataInputStream;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.hbase.ProcedureInfo;
32 import org.apache.hadoop.hbase.procedure2.Procedure;
33 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
34 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
35 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
36
37 import com.google.protobuf.InvalidProtocolBufferException;
38
39
40
41
42 @InterfaceAudience.Private
43 @InterfaceStability.Evolving
44 public class ProcedureWALFormatReader {
45 private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
46 private static final String SERVER_CRASH_PROCEDURE_CLASS_NAME =
47 "org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure";
48
49 private final ProcedureStoreTracker tracker;
50
51
52 private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
53 private final Map<Long, ProcedureProtos.Procedure> localProcedures =
54 new HashMap<Long, ProcedureProtos.Procedure>();
55
56 private long maxProcId = 0;
57
58 public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
59 this.tracker = tracker;
60 }
61
62 public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
63 FSDataInputStream stream = log.getStream();
64 try {
65 boolean hasMore = true;
66 while (hasMore) {
67 ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
68 if (entry == null) {
69 LOG.warn("nothing left to decode. exiting with missing EOF");
70 hasMore = false;
71 break;
72 }
73 switch (entry.getType()) {
74 case PROCEDURE_WAL_INIT:
75 readInitEntry(entry);
76 break;
77 case PROCEDURE_WAL_INSERT:
78 readInsertEntry(entry);
79 break;
80 case PROCEDURE_WAL_UPDATE:
81 case PROCEDURE_WAL_COMPACT:
82 readUpdateEntry(entry);
83 break;
84 case PROCEDURE_WAL_DELETE:
85 readDeleteEntry(entry);
86 break;
87 case PROCEDURE_WAL_EOF:
88 hasMore = false;
89 break;
90 default:
91 throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
92 }
93 }
94 } catch (InvalidProtocolBufferException e) {
95 LOG.error("got an exception while reading the procedure WAL: " + log, e);
96 loader.markCorruptedWAL(log, e);
97 }
98
99 if (!localProcedures.isEmpty()) {
100 Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
101 localProcedures.entrySet().iterator();
102 long minProcId = Long.MAX_VALUE;
103 long maxProcId = Long.MIN_VALUE;
104 while (itd.hasNext()) {
105 Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
106 itd.remove();
107
108 long procId = entry.getKey();
109 minProcId = Math.min(minProcId, procId);
110 maxProcId = Math.max(maxProcId, procId);
111
112 deserializeAndStoreProcedure(procedures, entry.getValue(), procId);
113 }
114
115
116
117 log.setProcIds(minProcId, maxProcId);
118 }
119 }
120
121 @SuppressWarnings("rawtypes")
122 void deserializeAndStoreProcedure(
123 Map<Long,Procedure> procedures, ProcedureProtos.Procedure procedure,
124 Long procId) throws IOException {
125
126
127
128
129
130
131
132
133
134 if (!SERVER_CRASH_PROCEDURE_CLASS_NAME.equals(procedure.getClassName())) {
135
136 Procedure proc = convert(procedure);
137 procedures.put(procId, proc);
138 } else {
139 LOG.info("Skipping known, missing ServerCrashProcedure implementation");
140 }
141 }
142
143 Procedure convert(ProcedureProtos.Procedure procedure) throws IOException {
144 return Procedure.convert(procedure);
145 }
146
147 public Iterator<Procedure> getProcedures() {
148 return procedures.values().iterator();
149 }
150
151 private void loadEntries(final ProcedureWALEntry entry) {
152 for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
153 maxProcId = Math.max(maxProcId, proc.getProcId());
154 if (isRequired(proc.getProcId())) {
155 if (LOG.isTraceEnabled()) {
156 LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
157 }
158 localProcedures.put(proc.getProcId(), proc);
159 tracker.setDeleted(proc.getProcId(), false);
160 }
161 }
162 }
163
164 private void readInitEntry(final ProcedureWALEntry entry)
165 throws IOException {
166 assert entry.getProcedureCount() == 1 : "Expected only one procedure";
167
168 loadEntries(entry);
169 }
170
171 private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
172 assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
173 loadEntries(entry);
174 }
175
176 private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
177 assert entry.getProcedureCount() == 1 : "Expected only one procedure";
178 loadEntries(entry);
179 }
180
181 private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
182 assert entry.getProcedureCount() == 0 : "Expected no procedures";
183 assert entry.hasProcId() : "expected ProcID";
184 if (LOG.isTraceEnabled()) {
185 LOG.trace("read delete entry " + entry.getProcId());
186 }
187 maxProcId = Math.max(maxProcId, entry.getProcId());
188 localProcedures.remove(entry.getProcId());
189 assert !procedures.containsKey(entry.getProcId());
190 tracker.setDeleted(entry.getProcId(), true);
191 }
192
193 private boolean isDeleted(final long procId) {
194 return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
195 }
196
197 private boolean isRequired(final long procId) {
198 return !isDeleted(procId) && !procedures.containsKey(procId);
199 }
200 }