1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.HashSet;
31 import java.util.TreeSet;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35 import java.util.concurrent.locks.ReentrantLock;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.CopyOnWriteArrayList;
38 import java.util.concurrent.TimeUnit;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.ProcedureInfo;
45 import org.apache.hadoop.hbase.classification.InterfaceAudience;
46 import org.apache.hadoop.hbase.classification.InterfaceStability;
47 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
48 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
49 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
50 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
51 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
52 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
53 import org.apache.hadoop.hbase.security.User;
54 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
56 import org.apache.hadoop.hbase.util.NonceKey;
57 import org.apache.hadoop.hbase.util.Pair;
58 import org.apache.hadoop.hbase.util.Threads;
59
60 import com.google.common.base.Preconditions;
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.Private
76 @InterfaceStability.Evolving
77 public class ProcedureExecutor<TEnvironment> {
78 private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
79
80 Testing testing = null;
81 public static class Testing {
82 protected boolean killBeforeStoreUpdate = false;
83 protected boolean toggleKillBeforeStoreUpdate = false;
84
85 protected boolean shouldKillBeforeStoreUpdate() {
86 final boolean kill = this.killBeforeStoreUpdate;
87 if (this.toggleKillBeforeStoreUpdate) {
88 this.killBeforeStoreUpdate = !kill;
89 LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
90 }
91 return kill;
92 }
93 }
94
/**
 * Callbacks for observing procedure lifecycle events raised by the executor.
 * Anything a listener throws is caught and logged by the executor, so a faulty
 * listener does not interrupt the others.
 */
public interface ProcedureExecutorListener {
  /** Called during the store load for each runnable procedure without a parent that is re-queued. */
  void procedureLoaded(long procId);

  /** Called from submitProcedure() once the procedure is registered, just before it is queued. */
  void procedureAdded(long procId);

  /** Called after a root procedure's result has been published in the completed map. */
  void procedureFinished(long procId);
}
100
101
102
103
104 private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
105 @Override
106 public long getTimeout(Procedure proc) {
107 return proc.getTimeRemaining();
108 }
109
110 @Override
111 public TimeUnit getTimeUnit(Procedure proc) {
112 return TimeUnit.MILLISECONDS;
113 }
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
132 private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
133
134 private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
135 private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000;
136
137 private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
138 private static final int DEFAULT_EVICT_TTL = 15 * 60000;
139
140 private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
141 private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000;
142
143 private final Map<Long, ProcedureInfo> completed;
144 private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
145 private final ProcedureStore store;
146 private final Configuration conf;
147
148 public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
149 final Map<Long, ProcedureInfo> completedMap,
150 final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
151
152 setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
153 this.completed = completedMap;
154 this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
155 this.store = store;
156 this.conf = conf;
157 }
158
159 public void periodicExecute(final TEnvironment env) {
160 if (completed.isEmpty()) {
161 if (LOG.isDebugEnabled()) {
162 LOG.debug("No completed procedures to cleanup.");
163 }
164 return;
165 }
166
167 final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
168 final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
169
170 long now = EnvironmentEdgeManager.currentTime();
171 Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
172 while (it.hasNext() && store.isRunning()) {
173 Map.Entry<Long, ProcedureInfo> entry = it.next();
174 ProcedureInfo result = entry.getValue();
175
176
177 if ((result.hasClientAckTime() && (now - result.getClientAckTime()) >= evictAckTtl) ||
178 (now - result.getLastUpdate()) >= evictTtl) {
179 if (LOG.isDebugEnabled()) {
180 LOG.debug("Evict completed procedure " + entry.getKey());
181 }
182 store.delete(entry.getKey());
183 it.remove();
184
185 NonceKey nonceKey = result.getNonceKey();
186 if (nonceKey != null) {
187 nonceKeysToProcIdsMap.remove(nonceKey);
188 }
189 }
190 }
191 }
192
193 @Override
194 protected Procedure[] execute(final TEnvironment env) {
195 throw new UnsupportedOperationException();
196 }
197
198 @Override
199 protected void rollback(final TEnvironment env) {
200 throw new UnsupportedOperationException();
201 }
202
203 @Override
204 protected boolean abort(final TEnvironment env) {
205 throw new UnsupportedOperationException();
206 }
207
208 @Override
209 public void serializeStateData(final OutputStream stream) {
210 throw new UnsupportedOperationException();
211 }
212
213 @Override
214 public void deserializeStateData(final InputStream stream) {
215 throw new UnsupportedOperationException();
216 }
217 }
218
219
220
221
222
223
/**
 * Results of finished root procedures, keyed by procId. Filled when a root procedure
 * finishes (or is found finished while loading the store) and by
 * setFailureResultForNonce(); entries are evicted by the CompletedProcedureCleaner.
 */
private final ConcurrentHashMap<Long, ProcedureInfo> completed =
  new ConcurrentHashMap<Long, ProcedureInfo>();

/**
 * Execution/rollback state of each live root procedure, keyed by the root procId.
 * An entry is created on submit (or load) and removed when the root finishes.
 */
private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
  new ConcurrentHashMap<Long, RootProcedureState>();

/** Live procedures (roots and sub-procedures) by procId. */
private final ConcurrentHashMap<Long, Procedure> procedures =
  new ConcurrentHashMap<Long, Procedure>();

/** Maps a client nonce key to the procId reserved for it by registerNonce(). */
private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
  new ConcurrentHashMap<NonceKey, Long>();

/**
 * Queue polled by the timeout thread. Holds procedures in WAITING_TIMEOUT state and
 * the periodic CompletedProcedureCleaner.
 */
private final TimeoutBlockingQueue<Procedure> waitingTimeout =
  new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());

/** Queue of procedures ready to be picked up by the executor threads. */
private final ProcedureRunnableSet runnables;

// NOTE(review): submitLock is not referenced anywhere in the visible code.
private final ReentrantLock submitLock = new ReentrantLock();
/** Last procId handed out; -1 until load() has run (and again after join()). */
private final AtomicLong lastProcId = new AtomicLong(-1);

/** Registered listeners; iterated on every loaded/added/finished notification. */
private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
  new CopyOnWriteArrayList<ProcedureExecutorListener>();

/** Number of executor threads currently inside execLoop(Procedure). */
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
/** Set by start(), cleared by stop(); every worker loop checks it. */
private final AtomicBoolean running = new AtomicBoolean(false);
private final TEnvironment environment;
private final ProcedureStore store;
private final Configuration conf;

/** numThreads executor threads followed by one timeout thread; null until start(). */
private Thread[] threads;
275
/**
 * Creates an executor that schedules runnable procedures with a
 * {@link ProcedureSimpleRunQueue}.
 *
 * @param conf configuration handed to the completed-procedure cleaner
 * @param environment environment object passed to every procedure callback
 * @param store store used to persist procedure state
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store) {
  this(conf, environment, store, new ProcedureSimpleRunQueue());
}
280
/**
 * Creates an executor with a caller-supplied run queue. Nothing is started or loaded
 * here; call {@link #start(int)} for that.
 *
 * @param conf configuration handed to the completed-procedure cleaner
 * @param environment environment object passed to every procedure callback
 * @param store store used to persist procedure state
 * @param runqueue queue from which executor threads pick runnable procedures
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store, final ProcedureRunnableSet runqueue) {
  this.conf = conf;
  this.store = store;
  this.runnables = runqueue;
  this.environment = environment;
}
288
289 private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
290 Preconditions.checkArgument(completed.isEmpty());
291 Preconditions.checkArgument(rollbackStack.isEmpty());
292 Preconditions.checkArgument(procedures.isEmpty());
293 Preconditions.checkArgument(waitingTimeout.isEmpty());
294 Preconditions.checkArgument(runnables.size() == 0);
295
296
297 Iterator<Procedure> loader = store.load();
298 if (loader == null) {
299 lastProcId.set(0);
300 return null;
301 }
302
303 long logMaxProcId = 0;
304 int runnablesCount = 0;
305 while (loader.hasNext()) {
306 Procedure proc = loader.next();
307 proc.beforeReplay(getEnvironment());
308 procedures.put(proc.getProcId(), proc);
309 logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
310 if (LOG.isDebugEnabled()) {
311 LOG.debug("Loading procedure state=" + proc.getState() +
312 " isFailed=" + proc.hasException() + ": " + proc);
313 }
314 if (!proc.hasParent() && !proc.isFinished()) {
315 rollbackStack.put(proc.getProcId(), new RootProcedureState());
316 }
317
318
319 if (proc.getNonceKey() != null) {
320 nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
321 }
322
323 if (proc.getState() == ProcedureState.RUNNABLE) {
324 runnablesCount++;
325 }
326 }
327 assert lastProcId.get() < 0;
328 lastProcId.set(logMaxProcId);
329
330
331 TreeSet<Procedure> runnableSet = null;
332 HashSet<Procedure> waitingSet = null;
333 for (final Procedure proc: procedures.values()) {
334 Long rootProcId = getRootProcedureId(proc);
335 if (rootProcId == null) {
336
337 runnables.addBack(proc);
338 continue;
339 }
340
341 if (!proc.hasParent() && proc.isFinished()) {
342 if (LOG.isDebugEnabled()) {
343 LOG.debug("The procedure is completed state=" + proc.getState() +
344 " isFailed=" + proc.hasException() + ": " + proc);
345 }
346 assert !rollbackStack.containsKey(proc.getProcId());
347
348 completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
349
350 continue;
351 }
352
353 if (proc.hasParent() && !proc.isFinished()) {
354 Procedure parent = procedures.get(proc.getParentProcId());
355
356 if (parent != null) {
357 parent.incChildrenLatch();
358 }
359 }
360
361 RootProcedureState procStack = rollbackStack.get(rootProcId);
362 procStack.loadStack(proc);
363
364 switch (proc.getState()) {
365 case RUNNABLE:
366 if (runnableSet == null) {
367 runnableSet = new TreeSet<Procedure>();
368 }
369 runnableSet.add(proc);
370 break;
371 case WAITING_TIMEOUT:
372 if (waitingSet == null) {
373 waitingSet = new HashSet<Procedure>();
374 }
375 waitingSet.add(proc);
376 break;
377 case FINISHED:
378 if (proc.hasException()) {
379
380 runnables.addBack(proc);
381 break;
382 }
383 case ROLLEDBACK:
384 case INITIALIZING:
385 String msg = "Unexpected " + proc.getState() + " state for " + proc;
386 LOG.error(msg);
387 throw new UnsupportedOperationException(msg);
388 default:
389 break;
390 }
391 }
392
393
394 List<Map.Entry<Long, RootProcedureState>> corrupted = null;
395 Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
396 while (itStack.hasNext()) {
397 Map.Entry<Long, RootProcedureState> entry = itStack.next();
398 RootProcedureState procStack = entry.getValue();
399 if (procStack.isValid()) continue;
400
401 for (Procedure proc: procStack.getSubprocedures()) {
402 procedures.remove(proc.getProcId());
403 if (runnableSet != null) runnableSet.remove(proc);
404 if (waitingSet != null) waitingSet.remove(proc);
405 }
406 itStack.remove();
407 if (corrupted == null) {
408 corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
409 }
410 corrupted.add(entry);
411 }
412
413
414 if (runnableSet != null) {
415
416
417 for (Procedure proc: runnableSet) {
418 if (!proc.hasParent()) {
419 sendProcedureLoadedNotification(proc.getProcId());
420 }
421 runnables.addBack(proc);
422 }
423 }
424 return corrupted;
425 }
426
427 public void start(int numThreads) throws IOException {
428 if (running.getAndSet(true)) {
429 LOG.warn("Already running");
430 return;
431 }
432
433
434
435 threads = new Thread[numThreads + 1];
436 LOG.info("Starting procedure executor threads=" + threads.length);
437
438
439 for (int i = 0; i < numThreads; ++i) {
440 threads[i] = new Thread("ProcedureExecutorThread-" + i) {
441 @Override
442 public void run() {
443 execLoop();
444 }
445 };
446 }
447
448
449 threads[numThreads] = new Thread("ProcedureExecutorTimeout") {
450 @Override
451 public void run() {
452 timeoutLoop();
453 }
454 };
455
456
457 store.recoverLease();
458
459
460
461
462
463
464 load();
465
466
467 for (int i = 0; i < threads.length; ++i) {
468 threads[i].start();
469 }
470
471
472 waitingTimeout.add(
473 new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
474 }
475
476 public void stop() {
477 if (!running.getAndSet(false)) {
478 return;
479 }
480
481 LOG.info("Stopping the procedure executor");
482 runnables.signalAll();
483 waitingTimeout.signalAll();
484 }
485
486 public void join() {
487 boolean interrupted = false;
488
489 for (int i = 0; i < threads.length; ++i) {
490 try {
491 threads[i].join();
492 } catch (InterruptedException ex) {
493 interrupted = true;
494 }
495 }
496
497 if (interrupted) {
498 Thread.currentThread().interrupt();
499 }
500
501 completed.clear();
502 rollbackStack.clear();
503 procedures.clear();
504 nonceKeysToProcIdsMap.clear();
505 waitingTimeout.clear();
506 runnables.clear();
507 lastProcId.set(-1);
508 }
509
/** @return true between a successful entry into start() and the next stop() */
public boolean isRunning() {
  return running.get();
}
513
514
515
516
517 public int getNumThreads() {
518 return threads == null ? 0 : (threads.length - 1);
519 }
520
/** @return how many executor threads are currently processing a procedure */
public int getActiveExecutorCount() {
  return activeExecutorCount.get();
}
524
525 public TEnvironment getEnvironment() {
526 return this.environment;
527 }
528
529 public ProcedureStore getStore() {
530 return this.store;
531 }
532
533 public void registerListener(ProcedureExecutorListener listener) {
534 this.listeners.add(listener);
535 }
536
537 public boolean unregisterListener(ProcedureExecutorListener listener) {
538 return this.listeners.remove(listener);
539 }
540
541
542
543
544
545 public List<ProcedureInfo> listProcedures() {
546 List<ProcedureInfo> procedureLists =
547 new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
548 for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
549 procedureLists.add(Procedure.createProcedureInfo(p.getValue(), null));
550 }
551 for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
552
553
554
555
556 procedureLists.add(e.getValue());
557 }
558 return procedureLists;
559 }
560
561
562
563
564
565
566
567
568
569
570 public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
571 return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
572 }
573
574
575
576
577
578
579
580
581
582
583
584
585 public long registerNonce(final NonceKey nonceKey) {
586 if (nonceKey == null) return -1;
587
588
589 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
590 if (oldProcId == null) {
591
592
593 final long newProcId = nextProcId();
594 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
595 if (oldProcId == null) return -1;
596 }
597
598
599
600 final boolean isTraceEnabled = LOG.isTraceEnabled();
601 while (isRunning() &&
602 !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
603 nonceKeysToProcIdsMap.containsKey(nonceKey)) {
604 if (isTraceEnabled) {
605 LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted");
606 }
607 Threads.sleep(100);
608 }
609 return oldProcId.longValue();
610 }
611
612
613
614
615
616 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
617 if (nonceKey == null) return;
618
619 final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
620 if (procId == null) return;
621
622
623 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
624 nonceKeysToProcIdsMap.remove(nonceKey);
625 }
626 }
627
628
629
630
631
632
633
634
635
636
637 public void setFailureResultForNonce(final NonceKey nonceKey, final String procName,
638 final User procOwner, final IOException exception) {
639 if (nonceKey == null) return;
640
641 final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
642 if (procId == null || completed.containsKey(procId)) return;
643
644 final long currentTime = EnvironmentEdgeManager.currentTime();
645 final ProcedureInfo result = new ProcedureInfo(
646 procId.longValue(),
647 procName,
648 procOwner != null ? procOwner.getShortName() : null,
649 ProcedureState.ROLLEDBACK,
650 -1,
651 nonceKey,
652 ForeignExceptionUtil.toProtoForeignException("ProcedureExecutor", exception),
653 currentTime,
654 currentTime,
655 null);
656 completed.putIfAbsent(procId, result);
657 }
658
659
660
661
662
663
664
665
666
/**
 * Submits a new root procedure without a nonce key.
 *
 * @param proc the procedure to execute
 * @return the procId assigned to the procedure
 */
public long submitProcedure(final Procedure proc) {
  return submitProcedure(proc, null);
}
670
671
672
673
674
675
676
/**
 * Submits a new root procedure: assigns its procId, persists it, creates its
 * rollback stack and queues it for execution.
 *
 * @param proc the procedure to execute; must be INITIALIZING and have no parent
 * @param nonceKey the nonce key previously reserved with registerNonce(), or
 *        {@code null} to draw a fresh procId
 * @return the procId assigned to the procedure
 * @throws IllegalArgumentException if the executor is not running, the procedure is
 *         not in the expected state, or the nonce key was not reserved
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
    justification = "FindBugs is blind to the check-for-null")
public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
  Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
  Preconditions.checkArgument(isRunning(), "executor not running");
  Preconditions.checkArgument(lastProcId.get() >= 0);
  Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);

  // With a nonce the procId was already reserved by registerNonce().
  final Long currentProcId;
  if (nonceKey != null) {
    currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
    Preconditions.checkArgument(currentProcId != null,
      "expected nonceKey=" + nonceKey + " to be reserved, use registerNonce()");
  } else {
    currentProcId = nextProcId();
  }

  // Initialize the procedure
  proc.setNonceKey(nonceKey);
  proc.setProcId(currentProcId.longValue());

  // Persist first: the procedure becomes visible in memory only after it is stored.
  store.insert(proc, null);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Procedure " + proc + " added to the store.");
  }

  // Create the rollback stack for the procedure
  RootProcedureState stack = new RootProcedureState();
  rollbackStack.put(currentProcId, stack);

  // Register, notify, and only then make the procedure runnable.
  assert !procedures.containsKey(currentProcId);
  procedures.put(currentProcId, proc);
  sendProcedureAddedNotification(currentProcId);
  runnables.addBack(proc);
  return currentProcId;
}
715
/**
 * @param procId the id of a root procedure
 * @return the completed result, or {@code null} if there is none in the completed map
 */
public ProcedureInfo getResult(final long procId) {
  return completed.get(procId);
}
719
720
721
722
723
724
725
726
/**
 * @param procId the id of a root procedure
 * @return true if a result for the procedure is present in the completed map
 */
public boolean isFinished(final long procId) {
  return completed.containsKey(procId);
}
730
731
732
733
734
735
736 public boolean isStarted(final long procId) {
737 Procedure proc = procedures.get(procId);
738 if (proc == null) {
739 return completed.get(procId) != null;
740 }
741 return proc.wasExecuted();
742 }
743
744
745
746
747
748 public void removeResult(final long procId) {
749 ProcedureInfo result = completed.get(procId);
750 if (result == null) {
751 assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
752 if (LOG.isDebugEnabled()) {
753 LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
754 }
755 return;
756 }
757
758
759 result.setClientAckTime(EnvironmentEdgeManager.currentTime());
760 }
761
762
763
764
765
766
767
/**
 * Requests the abort of a procedure, even if it already started executing.
 *
 * @param procId the id of the procedure to abort
 * @return true if the procedure exists and accepted the abort request
 */
public boolean abort(final long procId) {
  return abort(procId, true);
}
771
772
773
774
775
776
777
778
779 public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
780 Procedure proc = procedures.get(procId);
781 if (proc != null) {
782 if (!mayInterruptIfRunning && proc.wasExecuted()) {
783 return false;
784 } else {
785 return proc.abort(getEnvironment());
786 }
787 }
788 return false;
789 }
790
791
792
793
794
795
796
797
798 public boolean isProcedureOwner(final long procId, final User user) {
799 if (user == null) {
800 return false;
801 }
802
803 Procedure proc = procedures.get(procId);
804 if (proc != null) {
805 return proc.getOwner().equals(user.getShortName());
806 }
807 ProcedureInfo procInfo = completed.get(procId);
808 if (procInfo == null) {
809
810
811 return false;
812 }
813 return ProcedureInfo.isProcedureOwner(procInfo, user);
814 }
815
/**
 * @return a read-only view of the completed map; it is a view, not a copy, so it
 *         reflects later completions and evictions
 */
public Map<Long, ProcedureInfo> getResults() {
  return Collections.unmodifiableMap(completed);
}
819
/**
 * @param procId the id of the procedure
 * @return the live procedure with that id, or {@code null} if it is not live
 */
public Procedure getProcedure(final long procId) {
  return procedures.get(procId);
}
823
/** @return the queue from which executor threads pick runnable procedures */
protected ProcedureRunnableSet getRunnableSet() {
  return runnables;
}
827
828
829
830
831
832
833 private void execLoop() {
834 while (isRunning()) {
835 Long procId = runnables.poll();
836 Procedure proc = procId != null ? procedures.get(procId) : null;
837 if (proc == null) continue;
838
839 try {
840 activeExecutorCount.incrementAndGet();
841 execLoop(proc);
842 } finally {
843 activeExecutorCount.decrementAndGet();
844 }
845 }
846 }
847
/**
 * Runs one scheduling step for {@code proc}: either executes it, or - when its
 * root stack refuses the execution - drives the rollback. The step repeats while
 * the root stack reports a failure, so a failed execution is followed by the
 * rollback attempt in the same call.
 *
 * @param proc a live procedure taken from the runnable queue
 */
private void execLoop(Procedure proc) {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Trying to start the execution of " + proc);
  }

  Long rootProcId = getRootProcedureId(proc);
  if (rootProcId == null) {
    // No root can be resolved for this procedure: only a rollback makes sense.
    executeRollback(proc);
    return;
  }

  RootProcedureState procStack = rollbackStack.get(rootProcId);
  if (procStack == null) return;

  do {
    // Try to acquire the execution slot on the root stack.
    if (!procStack.acquire(proc)) {
      if (procStack.setRollback()) {
        // This thread obtained the right to roll back the whole stack.
        if (!executeRollback(rootProcId, procStack)) {
          // Rollback could not complete now: release it and retry later.
          procStack.unsetRollback();
          runnables.yield(proc);
        }
      } else {
        // Rollback cannot start yet (presumably something on the stack is still
        // running - confirm in RootProcedureState). A procedure that never
        // executed can be rolled back on its own right away.
        if (!proc.wasExecuted()) {
          if (!executeRollback(proc)) {
            runnables.yield(proc);
          }
        }
      }
      break;
    }

    // Execute the procedure
    assert proc.getState() == ProcedureState.RUNNABLE;
    if (proc.acquireLock(getEnvironment())) {
      execProcedure(procStack, proc);
      proc.releaseLock(getEnvironment());
    } else {
      // Lock not available: put the procedure back and let others run.
      runnables.yield(proc);
    }
    procStack.release(proc);

    // Test hook: when a test killed the executor, stop before finalizing anything.
    if (testing != null && !isRunning()) {
      break;
    }

    if (proc.isSuccess()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Procedure completed in " +
            StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
      }

      // Only a root procedure publishes a result.
      if (proc.getProcId() == rootProcId) {
        procedureFinished(proc);
      }
      break;
    }
  } while (procStack.isFailed());
}
914
/**
 * Main loop of the timeout thread. Polls the timeout queue until the executor
 * stops and handles two kinds of entries: the CompletedProcedureCleaner, which is
 * run and re-armed, and regular procedures, which are marked as timed out and
 * queued for rollback.
 */
private void timeoutLoop() {
  while (isRunning()) {
    Procedure proc = waitingTimeout.poll();
    if (proc == null) continue;

    if (proc.getTimeRemaining() > 100) {
      // Woken up well before the deadline (e.g. by a signal): re-queue and wait.
      waitingTimeout.add(proc);
      continue;
    }

    // The cleaner is not a real procedure: it reuses this thread as a periodic
    // task. Its failures are logged and ignored so the thread keeps running.
    if (proc instanceof CompletedProcedureCleaner) {
      try {
        ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
      } catch (Throwable e) {
        LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
      }
      // Restart its clock and schedule the next run.
      proc.setStartTime(EnvironmentEdgeManager.currentTime());
      waitingTimeout.add(proc);
      continue;
    }

    // Regular procedure whose timeout expired: flag the failure, abort its root
    // stack, persist, and queue it at the front so the rollback starts promptly.
    // NOTE(review): neither the root id nor the stack is null-checked here.
    if (proc.setTimeoutFailure()) {
      long rootProcId = Procedure.getRootProcedureId(procedures, proc);
      RootProcedureState procStack = rollbackStack.get(rootProcId);
      procStack.abort();
      store.update(proc);
      runnables.addFront(proc);
      continue;
    }
  }
}
961
962
963
964
965
966
/**
 * Rolls back every step recorded on a root procedure's stack, from the most recent
 * one backwards, and finalizes the root once the stack is empty.
 *
 * @param rootProcId the id of the root procedure
 * @param procStack the root's stack
 * @return true if the whole stack was rolled back and the root finalized; false if
 *         the rollback had to stop (lock unavailable, a step failed, or the
 *         executor/store is no longer running) and must be retried
 */
private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
  Procedure rootProc = procedures.get(rootProcId);
  RemoteProcedureException exception = rootProc.getException();
  if (exception == null) {
    // The failure came from the stack (e.g. a child): record it on the root.
    exception = procStack.getException();
    rootProc.setFailure(exception);
    store.update(rootProc);
  }

  List<Procedure> subprocStack = procStack.getSubprocedures();
  assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

  int stackTail = subprocStack.size();
  boolean reuseLock = false;
  while (stackTail --> 0) {
    final Procedure proc = subprocStack.get(stackTail);

    if (!reuseLock && !proc.acquireLock(getEnvironment())) {
      // Lock not available: give up for now, the caller retries later.
      return false;
    }

    boolean abortRollback = !executeRollback(proc);
    abortRollback |= !isRunning() || !store.isRunning();

    // When the next entry down the stack is the same instance, keep holding the
    // lock instead of releasing and re-acquiring it for every step.
    reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
    if (!reuseLock) {
      proc.releaseLock(getEnvironment());
    }

    // Stop here without popping the step, so it is retried next time.
    if (abortRollback) {
      return false;
    }

    subprocStack.remove(stackTail);
  }

  // Finalize the procedure state
  LOG.info("Rolledback procedure " + rootProc +
           " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
           " exception=" + exception.getMessage());
  procedureFinished(rootProc);
  return true;
}
1017
1018
1019
1020
1021
1022
/**
 * Rolls back one step of a single procedure and persists the outcome.
 *
 * @param proc the procedure to roll back
 * @return true if the step was rolled back and stored; false if the rollback threw
 *         an IOException (to be retried) or a test hook stopped the executor
 */
private boolean executeRollback(final Procedure proc) {
  try {
    proc.doRollback(getEnvironment());
  } catch (IOException e) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("rollback attempt failed for " + proc, e);
    }
    return false;
  } catch (Throwable e) {
    // Unexpected failure in the procedure's code: logged, and the step is still
    // treated as rolled back so the stack can make progress.
    LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
  }

  // Test hook: simulate a crash after the rollback ran but before it is stored.
  if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("TESTING: Kill before store update");
    }
    stop();
    return false;
  }

  if (proc.removeStackIndex()) {
    // removeStackIndex() returned true: mark the procedure ROLLEDBACK. A child is
    // deleted altogether; a root is kept and stored.
    proc.setState(ProcedureState.ROLLEDBACK);
    if (proc.hasParent()) {
      store.delete(proc.getProcId());
      procedures.remove(proc.getProcId());
    } else {
      store.update(proc);
    }
  } else {
    // Otherwise just persist the progress made so far.
    store.update(proc);
  }
  return true;
}
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
/**
 * Executes one or more steps of a procedure and persists the result.
 * <ul>
 * <li>Calls the procedure's {@code doExecute()}.</li>
 * <li>If it did not fail and returned sub-procedures, they are given ids, stored
 *     together with the parent, registered and queued; a RUNNABLE parent moves to
 *     WAITING until its children finish.</li>
 * <li>If it returned itself as the only sub-procedure, it is re-executed in place
 *     after its state has been stored.</li>
 * <li>If it returned nothing, it becomes FINISHED (or is parked in the timeout
 *     queue when in WAITING_TIMEOUT); the last child to finish wakes its parent.</li>
 * <li>On failure only the store is updated; the caller starts the rollback.</li>
 * </ul>
 *
 * @param procStack the stack of the procedure's root
 * @param procedure the procedure to execute; must be RUNNABLE
 */
private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
  Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);

  // Execute the procedure
  boolean reExecute = false;
  Procedure[] subprocs = null;
  do {
    reExecute = false;
    try {
      subprocs = procedure.doExecute(getEnvironment());
      // An empty array means the same as no sub-procedures.
      if (subprocs != null && subprocs.length == 0) {
        subprocs = null;
      }
    } catch (ProcedureYieldException e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Yield procedure: " + procedure);
      }
      runnables.yield(procedure);
      return;
    } catch (Throwable e) {
      // Unexpected failure in the procedure's code: convert it into a procedure
      // failure so the normal rollback path handles it.
      String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
      LOG.error(msg, e);
      procedure.setFailure(new RemoteProcedureException(msg, e));
    }

    if (!procedure.isFailed()) {
      if (subprocs != null) {
        if (subprocs.length == 1 && subprocs[0] == procedure) {
          // The procedure returned itself: run its next step in this same loop.
          subprocs = null;
          reExecute = true;
        } else {
          // Prepare the children: link them to the parent and assign their ids.
          for (int i = 0; i < subprocs.length; ++i) {
            Procedure subproc = subprocs[i];
            if (subproc == null) {
              String msg = "subproc[" + i + "] is null, aborting the procedure";
              procedure.setFailure(new RemoteProcedureException(msg,
                new IllegalArgumentIOException(msg)));
              subprocs = null;
              break;
            }

            assert subproc.getState() == ProcedureState.INITIALIZING;
            subproc.setParentProcId(procedure.getProcId());
            subproc.setProcId(nextProcId());
          }

          if (!procedure.isFailed()) {
            procedure.setChildrenLatch(subprocs.length);
            switch (procedure.getState()) {
              case RUNNABLE:
                procedure.setState(ProcedureState.WAITING);
                break;
              case WAITING_TIMEOUT:
                waitingTimeout.add(procedure);
                break;
              default:
                break;
            }
          }
        }
      } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
        waitingTimeout.add(procedure);
      } else {
        // No sub-procedures and not waiting: the procedure is done.
        procedure.setState(ProcedureState.FINISHED);
      }
    }

    // Record the step on the root stack so it can be rolled back.
    procStack.addRollbackStep(procedure);

    // Test hook: simulate a crash after the step ran but before it is stored.
    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("TESTING: Kill before store update");
      }
      stop();
      return;
    }

    // Persist the step: parent and new children together, or the parent alone.
    if (subprocs != null && !procedure.isFailed()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
      }
      store.insert(procedure, subprocs);
    } else {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Store update " + procedure);
      }
      store.update(procedure);
    }

    // A stopped store means the executor is going down: do not go any further.
    if (!store.isRunning()) {
      return;
    }

    assert (reExecute && subprocs == null) || !reExecute;
  } while (reExecute);

  // Make the stored children visible and runnable (front of the queue).
  if (subprocs != null && !procedure.isFailed()) {
    for (int i = 0; i < subprocs.length; ++i) {
      Procedure subproc = subprocs[i];
      assert !procedures.containsKey(subproc.getProcId());
      procedures.put(subproc.getProcId(), subproc);
      runnables.addFront(subproc);
    }
  }

  if (procedure.isFinished() && procedure.hasParent()) {
    Procedure parent = procedures.get(procedure.getParentProcId());
    if (parent == null) {
      // The parent is gone; this is only expected while the stack is rolling back.
      assert procStack.isRollingback();
      return;
    }

    // The last child to finish makes a WAITING parent runnable again.
    if (LOG.isTraceEnabled()) {
      LOG.trace(parent + " child is done: " + procedure);
    }
    if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
      parent.setState(ProcedureState.RUNNABLE);
      store.update(parent);
      runnables.addFront(parent);
      if (LOG.isTraceEnabled()) {
        LOG.trace(parent + " all the children finished their work, resume.");
      }
      return;
    }
  }
}
1214
1215 private void sendProcedureLoadedNotification(final long procId) {
1216 if (!this.listeners.isEmpty()) {
1217 for (ProcedureExecutorListener listener: this.listeners) {
1218 try {
1219 listener.procedureLoaded(procId);
1220 } catch (Throwable e) {
1221 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1222 }
1223 }
1224 }
1225 }
1226
1227 private void sendProcedureAddedNotification(final long procId) {
1228 if (!this.listeners.isEmpty()) {
1229 for (ProcedureExecutorListener listener: this.listeners) {
1230 try {
1231 listener.procedureAdded(procId);
1232 } catch (Throwable e) {
1233 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1234 }
1235 }
1236 }
1237 }
1238
1239 private void sendProcedureFinishedNotification(final long procId) {
1240 if (!this.listeners.isEmpty()) {
1241 for (ProcedureExecutorListener listener: this.listeners) {
1242 try {
1243 listener.procedureFinished(procId);
1244 } catch (Throwable e) {
1245 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1246 }
1247 }
1248 }
1249 }
1250
/**
 * Draws the next procId from the shared counter.
 * <p>
 * The common case is a plain increment. A negative result means the counter has
 * gone below zero (with the visible callers, only after wrapping past
 * {@code Long.MAX_VALUE}); the counter is then restarted from 0 and ids still
 * used by a live procedure are skipped.
 *
 * @return a non-negative procId
 */
private long nextProcId() {
  long procId = lastProcId.incrementAndGet();
  if (procId < 0) {
    // Try to reset the counter to 0. If another thread already brought it back to
    // a non-negative value, adopt that value instead of resetting again.
    while (!lastProcId.compareAndSet(procId, 0)) {
      procId = lastProcId.get();
      if (procId >= 0)
        break;
    }
    // After restarting, old ids may still be alive: skip them.
    // NOTE(review): only 'procedures' is consulted here, not 'completed'.
    while (procedures.containsKey(procId)) {
      procId = lastProcId.incrementAndGet();
    }
  }
  return procId;
}
1265
/**
 * Resolves the root of a procedure against the live-procedures map.
 *
 * @param proc the procedure whose root is wanted
 * @return the root procId, or {@code null} (callers treat null as "no root
 *         available" and fall back to a rollback)
 */
private Long getRootProcedureId(Procedure proc) {
  return Procedure.getRootProcedureId(procedures, proc);
}
1269
/**
 * Finalizes a root procedure: runs its cleanup hook, publishes its result, drops
 * it from the live maps, lets the run queue clean up and notifies the listeners.
 * Failures in either cleanup hook are logged and do not stop the finalization.
 *
 * @param proc the root procedure that finished (successfully or rolled back)
 */
private void procedureFinished(final Procedure proc) {
  // Let the procedure release whatever it still holds.
  try {
    proc.completionCleanup(getEnvironment());
  } catch (Throwable e) {
    // A faulty cleanup must not prevent the result from being published.
    LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
  }

  // Publish the result before removing the live entries, so the procedure is
  // always visible in at least one of the maps.
  completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
  rollbackStack.remove(proc.getProcId());
  procedures.remove(proc.getProcId());

  // Let the run queue drop any state it keeps for the procedure.
  try {
    runnables.completionCleanup(proc);
  } catch (Throwable e) {
    // Same policy as above: log and carry on.
    LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
  }

  // Notify the listeners
  sendProcedureFinishedNotification(proc.getProcId());
}
1295
1296 public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
1297 ProcedureInfo result = completed.get(procId);
1298 Procedure proc = null;
1299 if (result == null) {
1300 proc = procedures.get(procId);
1301 if (proc == null) {
1302 result = completed.get(procId);
1303 }
1304 }
1305 return new Pair(result, proc);
1306 }
1307 }