1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.Arrays;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.errorhandling.ForeignException;
28 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
29 import org.apache.hadoop.hbase.util.Bytes;
30 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32 import org.apache.zookeeper.KeeperException;
33
34 import com.google.protobuf.InvalidProtocolBufferException;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
56 private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
57
58 private final ZKProcedureUtil zkController;
59
60 protected ProcedureMember member;
61 private String memberName;
62
63
64
65
66
67
68
69
70 public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
71 throws IOException {
72 try {
73 this.zkController = new ZKProcedureUtil(watcher, procType) {
74 @Override
75 public void nodeCreated(String path) {
76 if (!isInProcedurePath(path)) {
77 return;
78 }
79
80 LOG.info("Received created event:" + path);
81
82 if (isAcquiredNode(path)) {
83 waitForNewProcedures();
84 return;
85 } else if (isAbortNode(path)) {
86 watchForAbortedProcedures();
87 return;
88 }
89 String parent = ZKUtil.getParent(path);
90
91 if (isReachedNode(parent)) {
92 receivedReachedGlobalBarrier(path);
93 return;
94 } else if (isAbortNode(parent)) {
95 abort(path);
96 return;
97 } else if (isAcquiredNode(parent)) {
98 startNewSubprocedure(path);
99 } else {
100 LOG.debug("Ignoring created notification for node:" + path);
101 }
102 }
103
104 @Override
105 public void nodeChildrenChanged(String path) {
106 if (path.equals(this.acquiredZnode)) {
107 LOG.info("Received procedure start children changed event: " + path);
108 waitForNewProcedures();
109 } else if (path.equals(this.abortZnode)) {
110 LOG.info("Received procedure abort children changed event: " + path);
111 watchForAbortedProcedures();
112 }
113 }
114 };
115 } catch (KeeperException e) {
116 throw new IOException(e);
117 }
118 }
119
120 public ZKProcedureUtil getZkController() {
121 return zkController;
122 }
123
124 @Override
125 public String getMemberName() {
126 return memberName;
127 }
128
129
130
131
132
133 private void receivedReachedGlobalBarrier(String path) {
134 LOG.debug("Recieved reached global barrier:" + path);
135 String procName = ZKUtil.getNodeName(path);
136 this.member.receivedReachedGlobalBarrier(procName);
137 }
138
139 private void watchForAbortedProcedures() {
140 LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
141 try {
142
143 for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
144 zkController.getAbortZnode())) {
145 String abortNode = ZKUtil.joinZNode(zkController.getAbortZnode(), node);
146 abort(abortNode);
147 }
148 } catch (KeeperException e) {
149 member.controllerConnectionFailure("Failed to list children for abort node:"
150 + zkController.getAbortZnode(), e, null);
151 }
152 }
153
154 private void waitForNewProcedures() {
155
156 LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
157 List<String> runningProcedures = null;
158 try {
159 runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
160 zkController.getAcquiredBarrier());
161 if (runningProcedures == null) {
162 LOG.debug("No running procedures.");
163 return;
164 }
165 } catch (KeeperException e) {
166 member.controllerConnectionFailure("General failure when watching for new procedures",
167 e, null);
168 }
169 if (runningProcedures == null) {
170 LOG.debug("No running procedures.");
171 return;
172 }
173 for (String procName : runningProcedures) {
174
175 String path = ZKUtil.joinZNode(zkController.getAcquiredBarrier(), procName);
176 startNewSubprocedure(path);
177 }
178 }
179
180
181
182
183
184
185
186
187 private synchronized void startNewSubprocedure(String path) {
188 LOG.debug("Found procedure znode: " + path);
189 String opName = ZKUtil.getNodeName(path);
190
191 String abortZNode = zkController.getAbortZNode(opName);
192 try {
193 if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
194 LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
195 return;
196 }
197 } catch (KeeperException e) {
198 member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
199 + ") for procedure :" + opName, e, opName);
200 return;
201 }
202
203
204 Subprocedure subproc = null;
205 try {
206 byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
207 if (!ProtobufUtil.isPBMagicPrefix(data)) {
208 String msg = "Data in for starting procuedure " + opName +
209 " is illegally formatted (no pb magic). " +
210 "Killing the procedure: " + Bytes.toString(data);
211 LOG.error(msg);
212 throw new IllegalArgumentException(msg);
213 }
214 LOG.debug("start proc data length is " + data.length);
215 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
216 LOG.debug("Found data for znode:" + path);
217 subproc = member.createSubprocedure(opName, data);
218 member.submitSubprocedure(subproc);
219 } catch (IllegalArgumentException iae ) {
220 LOG.error("Illegal argument exception", iae);
221 sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
222 } catch (IllegalStateException ise) {
223 LOG.error("Illegal state exception ", ise);
224 sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
225 } catch (KeeperException e) {
226 member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
227 e, opName);
228 } catch (InterruptedException e) {
229 member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
230 e, opName);
231 Thread.currentThread().interrupt();
232 }
233 }
234
235
236
237
238
239
240
241 @Override
242 public void sendMemberAcquired(Subprocedure sub) throws IOException {
243 String procName = sub.getName();
244 try {
245 LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
246 + ") in zk");
247 String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
248 zkController, procName), memberName);
249 ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
250
251
252 String reachedBarrier = zkController.getReachedBarrierNode(procName);
253 LOG.debug("Watch for global barrier reached:" + reachedBarrier);
254 if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
255 receivedReachedGlobalBarrier(reachedBarrier);
256 }
257 } catch (KeeperException e) {
258 member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
259 + procName + " and member: " + memberName, e, procName);
260 }
261 }
262
263
264
265
266 @Override
267 public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
268 String procName = sub.getName();
269 LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName
270 + "' in zk");
271 String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
272
273 if (data == null) {
274 data = new byte[0];
275 }
276 try {
277 ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
278 ProtobufUtil.prependPBMagic(data));
279 } catch (KeeperException e) {
280 member.controllerConnectionFailure("Failed to post zk node:" + joinPath
281 + " to join procedure barrier.", e, procName);
282 }
283 }
284
285
286
287
288
289 @Override
290 public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
291 if (sub == null) {
292 LOG.error("Failed due to null subprocedure", ee);
293 return;
294 }
295 String procName = sub.getName();
296 LOG.debug("Aborting procedure (" + procName + ") in zk");
297 String procAbortZNode = zkController.getAbortZNode(procName);
298 try {
299 String source = (ee.getSource() == null) ? memberName: ee.getSource();
300 byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
301 ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
302 LOG.debug("Finished creating abort znode:" + procAbortZNode);
303 } catch (KeeperException e) {
304
305
306 zkController.logZKTree(zkController.getBaseZnode());
307 member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
308 + " to abort procedure", e, procName);
309 }
310 }
311
312
313
314
315
316 protected void abort(String abortZNode) {
317 LOG.debug("Aborting procedure member for znode " + abortZNode);
318 String opName = ZKUtil.getNodeName(abortZNode);
319 try {
320 byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
321
322
323 ForeignException ee;
324 try {
325 if (data == null || data.length == 0) {
326
327 return;
328 } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
329 String msg = "Illegally formatted data in abort node for proc " + opName
330 + ". Killing the procedure.";
331 LOG.error(msg);
332
333 ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
334 } else {
335 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
336 ee = ForeignException.deserialize(data);
337 }
338 } catch (InvalidProtocolBufferException e) {
339 LOG.warn("Got an error notification for op:" + opName
340 + " but we can't read the information. Killing the procedure.");
341
342 ee = new ForeignException(getMemberName(), e);
343 }
344
345 this.member.receiveAbortProcedure(opName, ee);
346 } catch (KeeperException e) {
347 member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
348 + zkController.getAbortZnode(), e, opName);
349 } catch (InterruptedException e) {
350 LOG.warn("abort already in progress", e);
351 Thread.currentThread().interrupt();
352 }
353 }
354
355 public void start(final String memberName, final ProcedureMember listener) {
356 LOG.debug("Starting procedure member '" + memberName + "'");
357 this.member = listener;
358 this.memberName = memberName;
359 watchForAbortedProcedures();
360 waitForNewProcedures();
361 }
362
363 @Override
364 public void close() throws IOException {
365 zkController.close();
366 }
367
368 }