View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.Arrays;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.errorhandling.ForeignException;
28  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32  import org.apache.zookeeper.KeeperException;
33  
34  import com.google.protobuf.InvalidProtocolBufferException;
35  
36  /**
37   * ZooKeeper based controller for a procedure member.
38   * <p>
39   * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member,
40   * since each procedure type is bound to a single set of znodes. You can have multiple
41   * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member
42   * name, but each individual rpcs is still bound to a single member name (and since they are
43   * used to determine global progress, it's important to not get this wrong).
44   * <p>
45   * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
46   * time (as long as they have different types), from the same controller, but the same node name
47   * must be used for each procedure (though there is no conflict between the two procedure as long
48   * as they have distinct names).
49   * <p>
50   * There is no real error recovery with this mechanism currently -- if the coordinator fails,
51   * its re-initialization will delete the znodes and require all in progress subprocedures to start
52   * anew.
53   */
54  @InterfaceAudience.Private
55  public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
56    private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
57  
      /** ZooKeeper helper built in the constructor; its znode callbacks drive this class. */
58    private final ZKProcedureUtil zkController;
59  
      /** Receiver of procedure notifications; assigned in start(), so null until then. */
60    protected ProcedureMember member;
      /** Name used as this member's child znode when joining barriers; assigned in start(). */
61    private String memberName;
62  
63    /**
64     * Must call {@link #start(String, ProcedureMember)} before this can be used.
65     * @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
66     *          {@link #close()}.
67     * @param procType name of the znode describing the procedure type
68     * @throws KeeperException if we can't reach zookeeper
69     */
70    public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
71        throws IOException {
72      try {
73        this.zkController = new ZKProcedureUtil(watcher, procType) {
74          @Override
75          public void nodeCreated(String path) {
76            if (!isInProcedurePath(path)) {
77              return;
78            }
79  
80            LOG.info("Received created event:" + path);
81            // if it is a simple start/end/abort then we just rewatch the node
82            if (isAcquiredNode(path)) {
83              waitForNewProcedures();
84              return;
85            } else if (isAbortNode(path)) {
86              watchForAbortedProcedures();
87              return;
88            }
89            String parent = ZKUtil.getParent(path);
90            // if its the end barrier, the procedure can be completed
91            if (isReachedNode(parent)) {
92              receivedReachedGlobalBarrier(path);
93              return;
94            } else if (isAbortNode(parent)) {
95              abort(path);
96              return;
97            } else if (isAcquiredNode(parent)) {
98              startNewSubprocedure(path);
99            } else {
100             LOG.debug("Ignoring created notification for node:" + path);
101           }
102         }
103 
104         @Override
105         public void nodeChildrenChanged(String path) {
106           if (path.equals(this.acquiredZnode)) {
107             LOG.info("Received procedure start children changed event: " + path);
108             waitForNewProcedures();
109           } else if (path.equals(this.abortZnode)) {
110             LOG.info("Received procedure abort children changed event: " + path);
111             watchForAbortedProcedures();
112           }
113         }
114       };
115     } catch (KeeperException e) {
116       throw new IOException(e);
117     }
118   }
119 
120   public ZKProcedureUtil getZkController() {
121     return zkController;
122   }
123 
124   @Override
125   public String getMemberName() {
126     return memberName;
127   }
128 
129   /**
130    * Pass along the procedure global barrier notification to any listeners
131    * @param path full znode path that cause the notification
132    */
133   private void receivedReachedGlobalBarrier(String path) {
134     LOG.debug("Recieved reached global barrier:" + path);
135     String procName = ZKUtil.getNodeName(path);
136     this.member.receivedReachedGlobalBarrier(procName);
137   }
138 
139   private void watchForAbortedProcedures() {
140     LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
141     try {
142       // this is the list of the currently aborted procedues
143       for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
144         zkController.getAbortZnode())) {
145         String abortNode = ZKUtil.joinZNode(zkController.getAbortZnode(), node);
146         abort(abortNode);
147       }
148     } catch (KeeperException e) {
149       member.controllerConnectionFailure("Failed to list children for abort node:"
150           + zkController.getAbortZnode(), e, null);
151     }
152   }
153 
154   private void waitForNewProcedures() {
155     // watch for new procedues that we need to start subprocedures for
156     LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
157     List<String> runningProcedures = null;
158     try {
159       runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
160         zkController.getAcquiredBarrier());
161       if (runningProcedures == null) {
162         LOG.debug("No running procedures.");
163         return;
164       }
165     } catch (KeeperException e) {
166       member.controllerConnectionFailure("General failure when watching for new procedures",
167         e, null);
168     }
169     if (runningProcedures == null) {
170       LOG.debug("No running procedures.");
171       return;
172     }
173     for (String procName : runningProcedures) {
174       // then read in the procedure information
175       String path = ZKUtil.joinZNode(zkController.getAcquiredBarrier(), procName);
176       startNewSubprocedure(path);
177     }
178   }
179 
180   /**
181    * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
182    * <p>
183    * Will attempt to create the same procedure multiple times if an procedure znode with the same
184    * name is created. It is left up the coordinator to ensure this doesn't occur.
185    * @param path full path to the znode for the procedure to start
186    */
187   private synchronized void startNewSubprocedure(String path) {
188     LOG.debug("Found procedure znode: " + path);
189     String opName = ZKUtil.getNodeName(path);
190     // start watching for an abort notification for the procedure
191     String abortZNode = zkController.getAbortZNode(opName);
192     try {
193       if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
194         LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
195         return;
196       }
197     } catch (KeeperException e) {
198       member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
199           + ") for procedure :" + opName, e, opName);
200       return;
201     }
202 
203     // get the data for the procedure
204     Subprocedure subproc = null;
205     try {
206       byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
207       if (!ProtobufUtil.isPBMagicPrefix(data)) {
208         String msg = "Data in for starting procuedure " + opName +
209           " is illegally formatted (no pb magic). " +
210           "Killing the procedure: " + Bytes.toString(data);
211         LOG.error(msg);
212         throw new IllegalArgumentException(msg);
213       }
214       LOG.debug("start proc data length is " + data.length);
215       data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
216       LOG.debug("Found data for znode:" + path);
217       subproc = member.createSubprocedure(opName, data);
218       member.submitSubprocedure(subproc);
219     } catch (IllegalArgumentException iae ) {
220       LOG.error("Illegal argument exception", iae);
221       sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
222     } catch (IllegalStateException ise) {
223       LOG.error("Illegal state exception ", ise);
224       sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
225     } catch (KeeperException e) {
226       member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
227         e, opName);
228     } catch (InterruptedException e) {
229       member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
230         e, opName);
231       Thread.currentThread().interrupt();
232     }
233   }
234 
235   /**
236    * This attempts to create an acquired state znode for the procedure (snapshot name).
237    *
238    * It then looks for the reached znode to trigger in-barrier execution.  If not present we
239    * have a watcher, if present then trigger the in-barrier action.
240    */
241   @Override
242   public void sendMemberAcquired(Subprocedure sub) throws IOException {
243     String procName = sub.getName();
244     try {
245       LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
246           + ") in zk");
247       String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
248         zkController, procName), memberName);
249       ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
250 
251       // watch for the complete node for this snapshot
252       String reachedBarrier = zkController.getReachedBarrierNode(procName);
253       LOG.debug("Watch for global barrier reached:" + reachedBarrier);
254       if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
255         receivedReachedGlobalBarrier(reachedBarrier);
256       }
257     } catch (KeeperException e) {
258       member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
259           + procName + " and member: " + memberName, e, procName);
260     }
261   }
262 
263   /**
264    * This acts as the ack for a completed procedure
265    */
266   @Override
267   public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
268     String procName = sub.getName();
269     LOG.debug("Marking procedure  '" + procName + "' completed for member '" + memberName
270         + "' in zk");
271     String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
272     // ProtobufUtil.prependPBMagic does not take care of null
273     if (data == null) {
274       data = new byte[0];
275     }
276     try {
277       ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
278         ProtobufUtil.prependPBMagic(data));
279     } catch (KeeperException e) {
280       member.controllerConnectionFailure("Failed to post zk node:" + joinPath
281           + " to join procedure barrier.", e, procName);
282     }
283   }
284 
285   /**
286    * This should be called by the member and should write a serialized root cause exception as
287    * to the abort znode.
288    */
289   @Override
290   public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
291     if (sub == null) {
292       LOG.error("Failed due to null subprocedure", ee);
293       return;
294     }
295     String procName = sub.getName();
296     LOG.debug("Aborting procedure (" + procName + ") in zk");
297     String procAbortZNode = zkController.getAbortZNode(procName);
298     try {
299       String source = (ee.getSource() == null) ? memberName: ee.getSource();
300       byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
301       ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
302       LOG.debug("Finished creating abort znode:" + procAbortZNode);
303     } catch (KeeperException e) {
304       // possible that we get this error for the procedure if we already reset the zk state, but in
305       // that case we should still get an error for that procedure anyways
306       zkController.logZKTree(zkController.getBaseZnode());
307       member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
308           + " to abort procedure", e, procName);
309     }
310   }
311 
312   /**
313    * Pass along the found abort notification to the listener
314    * @param abortZNode full znode path to the failed procedure information
315    */
316   protected void abort(String abortZNode) {
317     LOG.debug("Aborting procedure member for znode " + abortZNode);
318     String opName = ZKUtil.getNodeName(abortZNode);
319     try {
320       byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
321 
322       // figure out the data we need to pass
323       ForeignException ee;
324       try {
325         if (data == null || data.length == 0) {
326           // ignore
327           return;
328         } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
329           String msg = "Illegally formatted data in abort node for proc " + opName
330               + ".  Killing the procedure.";
331           LOG.error(msg);
332           // we got a remote exception, but we can't describe it so just return exn from here
333           ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
334         } else {
335           data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
336           ee = ForeignException.deserialize(data);
337         }
338       } catch (InvalidProtocolBufferException e) {
339         LOG.warn("Got an error notification for op:" + opName
340             + " but we can't read the information. Killing the procedure.");
341         // we got a remote exception, but we can't describe it so just return exn from here
342         ee = new ForeignException(getMemberName(), e);
343       }
344 
345       this.member.receiveAbortProcedure(opName, ee);
346     } catch (KeeperException e) {
347       member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
348           + zkController.getAbortZnode(), e, opName);
349     } catch (InterruptedException e) {
350       LOG.warn("abort already in progress", e);
351       Thread.currentThread().interrupt();
352     }
353   }
354 
355   public void start(final String memberName, final ProcedureMember listener) {
356     LOG.debug("Starting procedure member '" + memberName + "'");
357     this.member = listener;
358     this.memberName = memberName;
359     watchForAbortedProcedures();
360     waitForNewProcedures();
361   }
362 
363   @Override
364   public void close() throws IOException {
365     zkController.close();
366   }
367 
368 }