View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  import java.util.Map;
24  import java.util.HashMap;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.fs.FSDataInputStream;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.ProcedureInfo;
32  import org.apache.hadoop.hbase.procedure2.Procedure;
33  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
34  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
35  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
36  
37  import com.google.protobuf.InvalidProtocolBufferException;
38  
39  /**
40   * Helper class that loads the procedures stored in a WAL
41   */
42  @InterfaceAudience.Private
43  @InterfaceStability.Evolving
44  public class ProcedureWALFormatReader {
45    private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
46    private static final String SERVER_CRASH_PROCEDURE_CLASS_NAME =
47        "org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure";
48  
49    private final ProcedureStoreTracker tracker;
50    //private final long compactionLogId;
51  
52    private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
53    private final Map<Long, ProcedureProtos.Procedure> localProcedures =
54      new HashMap<Long, ProcedureProtos.Procedure>();
55  
56    private long maxProcId = 0;
57  
58    public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
59      this.tracker = tracker;
60    }
61  
62    public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
63      FSDataInputStream stream = log.getStream();
64      try {
65        boolean hasMore = true;
66        while (hasMore) {
67          ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
68          if (entry == null) {
69            LOG.warn("nothing left to decode. exiting with missing EOF");
70            hasMore = false;
71            break;
72          }
73          switch (entry.getType()) {
74            case PROCEDURE_WAL_INIT:
75              readInitEntry(entry);
76              break;
77            case PROCEDURE_WAL_INSERT:
78              readInsertEntry(entry);
79              break;
80            case PROCEDURE_WAL_UPDATE:
81            case PROCEDURE_WAL_COMPACT:
82              readUpdateEntry(entry);
83              break;
84            case PROCEDURE_WAL_DELETE:
85              readDeleteEntry(entry);
86              break;
87            case PROCEDURE_WAL_EOF:
88              hasMore = false;
89              break;
90            default:
91              throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
92          }
93        }
94      } catch (InvalidProtocolBufferException e) {
95        LOG.error("got an exception while reading the procedure WAL: " + log, e);
96        loader.markCorruptedWAL(log, e);
97      }
98  
99      if (!localProcedures.isEmpty()) {
100       Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
101         localProcedures.entrySet().iterator();
102       long minProcId = Long.MAX_VALUE;
103       long maxProcId = Long.MIN_VALUE;
104       while (itd.hasNext()) {
105         Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
106         itd.remove();
107 
108         long procId = entry.getKey();
109         minProcId = Math.min(minProcId, procId);
110         maxProcId = Math.max(maxProcId, procId);
111 
112         deserializeAndStoreProcedure(procedures, entry.getValue(), procId);
113       }
114 
115       // TODO: Some procedure may be already runnables (see readInitEntry())
116       //       (we can also check the "update map" in the log trackers)
117       log.setProcIds(minProcId, maxProcId);
118     }
119   }
120 
121   @SuppressWarnings("rawtypes") 
122   void deserializeAndStoreProcedure(
123       Map<Long,Procedure> procedures, ProcedureProtos.Procedure procedure,
124       Long procId) throws IOException {
125     /* BIG WARNING:
126      *   This conditional was added to support the migration of IOP (HBase 1.2.x)
127      *   clusters to HDP clusters (HBase 1.1.x). The newer clusters have a procedure
128      *   called ServerCrashProcedure which does not exist in HDP (handled instead
129      *   by the ServerShutdownHandler). While HDP remains on a version in which
130      *   this ServerCrashProcedure doesn't exist, we need to make sure that we
131      *   ignore such a procedure from IOP clusters. We MUST not propagate this
132      *   change to a version of HDP which DOES have the ServerCrashProcedure.
133      */
134     if (!SERVER_CRASH_PROCEDURE_CLASS_NAME.equals(procedure.getClassName())) {
135       // Deserialize the procedure
136       Procedure proc = convert(procedure);
137       procedures.put(procId, proc);
138     } else {
139       LOG.info("Skipping known, missing ServerCrashProcedure implementation");
140     }
141   }
142 
143   Procedure convert(ProcedureProtos.Procedure procedure) throws IOException {
144     return Procedure.convert(procedure);
145   }
146 
147   public Iterator<Procedure> getProcedures() {
148     return procedures.values().iterator();
149   }
150 
151   private void loadEntries(final ProcedureWALEntry entry) {
152     for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
153       maxProcId = Math.max(maxProcId, proc.getProcId());
154       if (isRequired(proc.getProcId())) {
155         if (LOG.isTraceEnabled()) {
156           LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
157         }
158         localProcedures.put(proc.getProcId(), proc);
159         tracker.setDeleted(proc.getProcId(), false);
160       }
161     }
162   }
163 
164   private void readInitEntry(final ProcedureWALEntry entry)
165       throws IOException {
166     assert entry.getProcedureCount() == 1 : "Expected only one procedure";
167     // TODO: Make it runnable, before reading other files
168     loadEntries(entry);
169   }
170 
171   private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
172     assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
173     loadEntries(entry);
174   }
175 
176   private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
177     assert entry.getProcedureCount() == 1 : "Expected only one procedure";
178     loadEntries(entry);
179   }
180 
181   private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
182     assert entry.getProcedureCount() == 0 : "Expected no procedures";
183     assert entry.hasProcId() : "expected ProcID";
184     if (LOG.isTraceEnabled()) {
185       LOG.trace("read delete entry " + entry.getProcId());
186     }
187     maxProcId = Math.max(maxProcId, entry.getProcId());
188     localProcedures.remove(entry.getProcId());
189     assert !procedures.containsKey(entry.getProcId());
190     tracker.setDeleted(entry.getProcId(), true);
191   }
192 
193   private boolean isDeleted(final long procId) {
194     return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
195   }
196 
197   private boolean isRequired(final long procId) {
198     return !isDeleted(procId) && !procedures.containsKey(procId);
199   }
200 }