View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collections;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.HashSet;
31  import java.util.TreeSet;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.CopyOnWriteArrayList;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.ProcedureInfo;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
48  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
49  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
50  import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
51  import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
52  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
53  import org.apache.hadoop.hbase.security.User;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
56  import org.apache.hadoop.hbase.util.NonceKey;
57  import org.apache.hadoop.hbase.util.Pair;
58  import org.apache.hadoop.hbase.util.Threads;
59  
60  import com.google.common.base.Preconditions;
61  
62  /**
63   * Thread Pool that executes the submitted procedures.
64   * The executor has a ProcedureStore associated.
65   * Each operation is logged and on restart the pending procedures are resumed.
66   *
67   * Unless the Procedure code throws an error (e.g. invalid user input)
68   * the procedure will complete (at some point in time). On restart the pending
69   * procedures are resumed and the ones that failed will be rolled back.
70   *
71   * The user can add procedures to the executor via submitProcedure(proc)
72   * check for the finished state via isFinished(procId)
73   * and get the result via getResult(procId)
74   */
75  @InterfaceAudience.Private
76  @InterfaceStability.Evolving
77  public class ProcedureExecutor<TEnvironment> {
78    private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
79  
80    Testing testing = null;
81    public static class Testing {
82      protected boolean killBeforeStoreUpdate = false;
83      protected boolean toggleKillBeforeStoreUpdate = false;
84  
85      protected boolean shouldKillBeforeStoreUpdate() {
86        final boolean kill = this.killBeforeStoreUpdate;
87        if (this.toggleKillBeforeStoreUpdate) {
88          this.killBeforeStoreUpdate = !kill;
89          LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
90        }
91        return kill;
92      }
93    }
94  
/**
 * Callback for procedure lifecycle events; attach one with registerListener().
 */
public interface ProcedureExecutorListener {
  /** Invoked for a root procedure that was restored from the store and queued to run. */
  void procedureLoaded(long procId);

  /** Invoked after a new root procedure has been submitted. */
  void procedureAdded(long procId);

  /** Invoked when a procedure finishes (presumably root procedures only; confirm at call site). */
  void procedureFinished(long procId);
}
100 
101   /**
102    * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure
103    */
104   private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
105     @Override
106     public long getTimeout(Procedure proc) {
107       return proc.getTimeRemaining();
108     }
109 
110     @Override
111     public TimeUnit getTimeUnit(Procedure proc) {
112       return TimeUnit.MILLISECONDS;
113     }
114   }
115 
116   /**
117    * Internal cleaner that removes the completed procedure results after a TTL.
118    * NOTE: This is a special case handled in timeoutLoop().
119    *
120    * Since the client code looks more or less like:
121    *   procId = master.doOperation()
122    *   while (master.getProcResult(procId) == ProcInProgress);
123    * The master should not throw away the proc result as soon as the procedure is done
124    * but should wait a result request from the client (see executor.removeResult(procId))
125    * The client will call something like master.isProcDone() or master.getProcResult()
126    * which will return the result/state to the client, and it will mark the completed
127    * proc as ready to delete. note that the client may not receive the response from
128    * the master (e.g. master failover) so, if we delay a bit the real deletion of
129    * the proc result the client will be able to get the result the next try.
130    */
131   private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
132     private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
133 
134     private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
135     private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
136 
137     private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
138     private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
139 
140     private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
141     private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
142 
143     private final Map<Long, ProcedureInfo> completed;
144     private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
145     private final ProcedureStore store;
146     private final Configuration conf;
147 
148     public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
149         final Map<Long, ProcedureInfo> completedMap,
150         final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
151       // set the timeout interval that triggers the periodic-procedure
152       setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
153       this.completed = completedMap;
154       this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
155       this.store = store;
156       this.conf = conf;
157     }
158 
159     public void periodicExecute(final TEnvironment env) {
160       if (completed.isEmpty()) {
161         if (LOG.isDebugEnabled()) {
162           LOG.debug("No completed procedures to cleanup.");
163         }
164         return;
165       }
166 
167       final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
168       final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
169 
170       long now = EnvironmentEdgeManager.currentTime();
171       Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
172       while (it.hasNext() && store.isRunning()) {
173         Map.Entry<Long, ProcedureInfo> entry = it.next();
174         ProcedureInfo result = entry.getValue();
175 
176         // TODO: Select TTL based on Procedure type
177         if ((result.hasClientAckTime() && (now - result.getClientAckTime()) >= evictAckTtl) ||
178             (now - result.getLastUpdate()) >= evictTtl) {
179           if (LOG.isDebugEnabled()) {
180             LOG.debug("Evict completed procedure " + entry.getKey());
181           }
182           store.delete(entry.getKey());
183           it.remove();
184 
185           NonceKey nonceKey = result.getNonceKey();
186           if (nonceKey != null) {
187             nonceKeysToProcIdsMap.remove(nonceKey);
188           }
189         }
190       }
191     }
192 
193     @Override
194     protected Procedure[] execute(final TEnvironment env) {
195       throw new UnsupportedOperationException();
196     }
197 
198     @Override
199     protected void rollback(final TEnvironment env) {
200       throw new UnsupportedOperationException();
201     }
202 
203     @Override
204     protected boolean abort(final TEnvironment env) {
205       throw new UnsupportedOperationException();
206     }
207 
208     @Override
209     public void serializeStateData(final OutputStream stream) {
210       throw new UnsupportedOperationException();
211     }
212 
213     @Override
214     public void deserializeStateData(final InputStream stream) {
215       throw new UnsupportedOperationException();
216     }
217   }
218 
219   /**
220    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the ProcedureInfo.
221    * Once a Root-Procedure completes (success or failure), the result will be added to this map.
222    * The user of ProcedureExecutor should call getResult(procId) to get the result.
223    */
224   private final ConcurrentHashMap<Long, ProcedureInfo> completed =
225     new ConcurrentHashMap<Long, ProcedureInfo>();
226 
227   /**
228    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
229    * The RootProcedureState contains the execution stack of the Root-Procedure,
230    * It is added to the map by submitProcedure() and removed on procedure completion.
231    */
232   private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
233     new ConcurrentHashMap<Long, RootProcedureState>();
234 
235   /**
236    * Helper map to lookup the live procedures by ID.
237    * This map contains every procedure. root-procedures and subprocedures.
238    */
239   private final ConcurrentHashMap<Long, Procedure> procedures =
240     new ConcurrentHashMap<Long, Procedure>();
241 
242   /**
243    * Helper map to lookup whether the procedure already issued from the same client.
244    * This map contains every root procedure.
245    */
246   private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
247       new ConcurrentHashMap<NonceKey, Long>();
248 
249   /**
250    * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
251    * or periodic procedures.
252    */
253   private final TimeoutBlockingQueue<Procedure> waitingTimeout =
254     new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
255 
256   /**
257    * Queue that contains runnable procedures.
258    */
259   private final ProcedureRunnableSet runnables;
260 
261   // TODO
262   private final ReentrantLock submitLock = new ReentrantLock();
263   private final AtomicLong lastProcId = new AtomicLong(-1);
264 
265   private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
266     new CopyOnWriteArrayList<ProcedureExecutorListener>();
267 
268   private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
269   private final AtomicBoolean running = new AtomicBoolean(false);
270   private final TEnvironment environment;
271   private final ProcedureStore store;
272   private final Configuration conf;
273 
274   private Thread[] threads;
275 
/**
 * Creates an executor that schedules work on a default ProcedureSimpleRunQueue.
 * @param conf configuration, also read by the completed-procedure cleaner
 * @param environment environment handed to every procedure
 * @param store store used to persist procedure state
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store) {
  this(conf, environment, store, new ProcedureSimpleRunQueue());
}

/**
 * Creates an executor that schedules work on the supplied run queue.
 * @param conf configuration, also read by the completed-procedure cleaner
 * @param environment environment handed to every procedure
 * @param store store used to persist procedure state
 * @param runqueue queue of runnable procedures polled by the executor threads
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store, final ProcedureRunnableSet runqueue) {
  this.conf = conf;
  this.store = store;
  this.runnables = runqueue;
  this.environment = environment;
}
288 
/**
 * Rebuilds the in-memory executor state from the procedures persisted in the store.
 * start() calls it after recovering the store lease and before starting any thread.
 * All internal maps and queues must be empty on entry (Preconditions.checkArgument).
 *
 * Steps: (1) read every procedure from store.load(), index it in procedures and
 * nonceKeysToProcIdsMap, give each unfinished root a RootProcedureState, and set lastProcId
 * to the highest loaded id; (2) rebuild the root stacks, move finished roots into completed
 * and classify the rest by state; (3) drop every stack that fails
 * RootProcedureState.isValid(); (4) queue the runnable procedures.
 *
 * @return the invalid (corrupted) root stacks that were removed, or null when there are
 *   none or the store returned no loader
 * @throws IOException as declared by the method; not thrown directly by this body
 * @throws UnsupportedOperationException if a procedure reaching the step-2 state switch is
 *   ROLLEDBACK, INITIALIZING, or FINISHED without an exception
 */
289   private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
290     Preconditions.checkArgument(completed.isEmpty());
291     Preconditions.checkArgument(rollbackStack.isEmpty());
292     Preconditions.checkArgument(procedures.isEmpty());
293     Preconditions.checkArgument(waitingTimeout.isEmpty());
294     Preconditions.checkArgument(runnables.size() == 0);
295 
296     // 1. Load the procedures
297     Iterator<Procedure> loader = store.load();
298     if (loader == null) {
299       lastProcId.set(0);
300       return null;
301     }
302 
303     long logMaxProcId = 0;
    // NOTE(review): runnablesCount is incremented below but never read in this method.
304     int runnablesCount = 0;
305     while (loader.hasNext()) {
306       Procedure proc = loader.next();
307       proc.beforeReplay(getEnvironment());
308       procedures.put(proc.getProcId(), proc);
309       logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
310       if (LOG.isDebugEnabled()) {
311         LOG.debug("Loading procedure state=" + proc.getState() +
312             " isFailed=" + proc.hasException() + ": " + proc);
313       }
      // Only unfinished root procedures get a rollback stack; finished roots are
      // moved into 'completed' in step 2.
314       if (!proc.hasParent() && !proc.isFinished()) {
315         rollbackStack.put(proc.getProcId(), new RootProcedureState());
316       }
317 
318       // add the nonce to the map
319       if (proc.getNonceKey() != null) {
320         nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
321       }
322 
323       if (proc.getState() == ProcedureState.RUNNABLE) {
324         runnablesCount++;
325       }
326     }
    // lastProcId must still hold its "not loaded" value (-1); new ids continue after
    // the highest id found in the store.
327     assert lastProcId.get() < 0;
328     lastProcId.set(logMaxProcId);
329 
330     // 2. Initialize the stacks
331     TreeSet<Procedure> runnableSet = null;
332     HashSet<Procedure> waitingSet = null;
333     for (final Procedure proc: procedures.values()) {
334       Long rootProcId = getRootProcedureId(proc);
335       if (rootProcId == null) {
336         // The 'proc' was ready to run but the root procedure was rolledback?
337         runnables.addBack(proc);
338         continue;
339       }
340 
341       if (!proc.hasParent() && proc.isFinished()) {
342         if (LOG.isDebugEnabled()) {
343           LOG.debug("The procedure is completed state=" + proc.getState() +
344               " isFailed=" + proc.hasException() + ": " + proc);
345         }
346         assert !rollbackStack.containsKey(proc.getProcId());
347 
348         completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
349 
350         continue;
351       }
352 
353       if (proc.hasParent() && !proc.isFinished()) {
354         Procedure parent = procedures.get(proc.getParentProcId());
355         // corrupted procedures are handled later at step 3
356         if (parent != null) {
357           parent.incChildrenLatch();
358         }
359       }
360 
      // NOTE(review): assumes rollbackStack has an entry for rootProcId. A child of a
      // finished root would get a null procStack here; TODO confirm what
      // getRootProcedureId() returns in that case.
361       RootProcedureState procStack = rollbackStack.get(rootProcId);
362       procStack.loadStack(proc);
363 
364       switch (proc.getState()) {
365         case RUNNABLE:
366           if (runnableSet == null) {
367             runnableSet = new TreeSet<Procedure>();
368           }
369           runnableSet.add(proc);
370           break;
371         case WAITING_TIMEOUT:
372           if (waitingSet == null) {
373             waitingSet = new HashSet<Procedure>();
374           }
375           waitingSet.add(proc);
376           break;
377         case FINISHED:
378           if (proc.hasException()) {
379             // add the proc to the runnables to perform the rollback
380             runnables.addBack(proc);
381             break;
382           }
          // No break: FINISHED without an exception falls through and is reported
          // as an unexpected state.
383         case ROLLEDBACK:
384         case INITIALIZING:
385           String msg = "Unexpected " + proc.getState() + " state for " + proc;
386           LOG.error(msg);
387           throw new UnsupportedOperationException(msg);
388         default:
389           break;
390       }
391     }
392 
393     // 3. Validate the stacks
394     List<Map.Entry<Long, RootProcedureState>> corrupted = null;
395     Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
396     while (itStack.hasNext()) {
397       Map.Entry<Long, RootProcedureState> entry = itStack.next();
398       RootProcedureState procStack = entry.getValue();
399       if (procStack.isValid()) continue;
400 
      // Purge the invalid stack's procedures from the collections built in step 2.
      // NOTE(review): procedures already pushed to 'runnables' in step 2 (orphans and
      // failed FINISHED ones) are not removed here.
401       for (Procedure proc: procStack.getSubprocedures()) {
402         procedures.remove(proc.getProcId());
403         if (runnableSet != null) runnableSet.remove(proc);
404         if (waitingSet != null) waitingSet.remove(proc);
405       }
406       itStack.remove();
407       if (corrupted == null) {
408         corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
409       }
410       corrupted.add(entry);
411     }
412 
    // NOTE(review): waitingSet is only pruned above. This method never adds
    // WAITING_TIMEOUT procedures to waitingTimeout; confirm they are rescheduled elsewhere.
413     // 4. Push the runnables
414     if (runnableSet != null) {
415       // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure
416       // may be started way before this stuff.
      // TreeSet iteration follows Procedure's natural ordering (compareTo).
417       for (Procedure proc: runnableSet) {
        // Only root procedures trigger the "loaded" listener notification.
418         if (!proc.hasParent()) {
419           sendProcedureLoadedNotification(proc.getProcId());
420         }
421         runnables.addBack(proc);
422       }
423     }
424     return corrupted;
425   }
426 
427   public void start(int numThreads) throws IOException {
428     if (running.getAndSet(true)) {
429       LOG.warn("Already running");
430       return;
431     }
432 
433     // We have numThreads executor + one timer thread used for timing out
434     // procedures and triggering periodic procedures.
435     threads = new Thread[numThreads + 1];
436     LOG.info("Starting procedure executor threads=" + threads.length);
437 
438     // Initialize procedures executor
439     for (int i = 0; i < numThreads; ++i) {
440       threads[i] = new Thread("ProcedureExecutorThread-" + i) {
441         @Override
442         public void run() {
443           execLoop();
444         }
445       };
446     }
447 
448     // Initialize procedures timeout handler (this is the +1 thread)
449     threads[numThreads] = new Thread("ProcedureExecutorTimeout") {
450       @Override
451       public void run() {
452         timeoutLoop();
453       }
454     };
455 
456     // Acquire the store lease.
457     store.recoverLease();
458 
459     // TODO: Split in two steps.
460     // TODO: Handle corrupted procedure returned (probably just a WARN)
461     // The first one will make sure that we have the latest id,
462     // so we can start the threads and accept new procedures.
463     // The second step will do the actual load of old procedures.
464     load();
465 
466     // Start the executors. Here we must have the lastProcId set.
467     for (int i = 0; i < threads.length; ++i) {
468       threads[i].start();
469     }
470 
471     // Add completed cleaner
472     waitingTimeout.add(
473       new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
474   }
475 
476   public void stop() {
477     if (!running.getAndSet(false)) {
478       return;
479     }
480 
481     LOG.info("Stopping the procedure executor");
482     runnables.signalAll();
483     waitingTimeout.signalAll();
484   }
485 
486   public void join() {
487     boolean interrupted = false;
488 
489     for (int i = 0; i < threads.length; ++i) {
490       try {
491         threads[i].join();
492       } catch (InterruptedException ex) {
493         interrupted = true;
494       }
495     }
496 
497     if (interrupted) {
498       Thread.currentThread().interrupt();
499     }
500 
501     completed.clear();
502     rollbackStack.clear();
503     procedures.clear();
504     nonceKeysToProcIdsMap.clear();
505     waitingTimeout.clear();
506     runnables.clear();
507     lastProcId.set(-1);
508   }
509 
510   public boolean isRunning() {
511     return running.get();
512   }
513 
514   /**
515    * @return the number of execution threads.
516    */
517   public int getNumThreads() {
518     return threads == null ? 0 : (threads.length - 1);
519   }
520 
521   public int getActiveExecutorCount() {
522     return activeExecutorCount.get();
523   }
524 
525   public TEnvironment getEnvironment() {
526     return this.environment;
527   }
528 
529   public ProcedureStore getStore() {
530     return this.store;
531   }
532 
533   public void registerListener(ProcedureExecutorListener listener) {
534     this.listeners.add(listener);
535   }
536 
537   public boolean unregisterListener(ProcedureExecutorListener listener) {
538     return this.listeners.remove(listener);
539   }
540 
541   /**
542    * List procedures.
543    * @return the procedures in a list
544    */
545   public List<ProcedureInfo> listProcedures() {
546     List<ProcedureInfo> procedureLists =
547         new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
548     for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
549       procedureLists.add(Procedure.createProcedureInfo(p.getValue(), null));
550     }
551     for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
552       // Note: The procedure could show up twice in the list with different state, as
553       // it could complete after we walk through procedures list and insert into
554       // procedureList - it is ok, as we will use the information in the ProcedureInfo
555       // to figure it out; to prevent this would increase the complexity of the logic.
556       procedureLists.add(e.getValue());
557     }
558     return procedureLists;
559   }
560 
561   // ==========================================================================
562   //  Nonce Procedure helpers
563   // ==========================================================================
564   /**
565    * Create a NoneKey from the specified nonceGroup and nonce.
566    * @param nonceGroup
567    * @param nonce
568    * @return the generated NonceKey
569    */
570   public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
571     return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
572   }
573 
574   /**
575    * Register a nonce for a procedure that is going to be submitted.
576    * A procId will be reserved and on submitProcedure(),
577    * the procedure with the specified nonce will take the reserved ProcId.
578    * If someone already reserved the nonce, this method will return the procId reserved,
579    * otherwise an invalid procId will be returned. and the caller should procede
580    * and submit the procedure.
581    *
582    * @param nonceKey A unique identifier for this operation from the client or process.
583    * @return the procId associated with the nonce, if any otherwise an invalid procId.
584    */
585   public long registerNonce(final NonceKey nonceKey) {
586     if (nonceKey == null) return -1;
587 
588     // check if we have already a Reserved ID for the nonce
589     Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
590     if (oldProcId == null) {
591       // reserve a new Procedure ID, this will be associated with the nonce
592       // and the procedure submitted with the specified nonce will use this ID.
593       final long newProcId = nextProcId();
594       oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
595       if (oldProcId == null) return -1;
596     }
597 
598     // we found a registered nonce, but the procedure may not have been submitted yet.
599     // since the client expect the procedure to be submitted, spin here until it is.
600     final boolean isTraceEnabled = LOG.isTraceEnabled();
601     while (isRunning() &&
602            !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
603            nonceKeysToProcIdsMap.containsKey(nonceKey)) {
604       if (isTraceEnabled) {
605         LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted");
606       }
607       Threads.sleep(100);
608     }
609     return oldProcId.longValue();
610   }
611 
612   /**
613    * Remove the NonceKey if the procedure was not submitted to the executor.
614    * @param nonceKey A unique identifier for this operation from the client or process.
615    */
616   public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
617     if (nonceKey == null) return;
618 
619     final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
620     if (procId == null) return;
621 
622     // if the procedure was not submitted, remove the nonce
623     if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
624       nonceKeysToProcIdsMap.remove(nonceKey);
625     }
626   }
627 
628   /**
629    * If the failure failed before submitting it, we may want to give back the
630    * same error to the requests with the same nonceKey.
631    *
632    * @param nonceKey A unique identifier for this operation from the client or process
633    * @param procName name of the procedure, used to inform the user
634    * @param procOwner name of the owner of the procedure, used to inform the user
635    * @param exception the failure to report to the user
636    */
637   public void setFailureResultForNonce(final NonceKey nonceKey, final String procName,
638       final User procOwner, final IOException exception) {
639     if (nonceKey == null) return;
640 
641     final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
642     if (procId == null || completed.containsKey(procId)) return;
643 
644     final long currentTime = EnvironmentEdgeManager.currentTime();
645     final ProcedureInfo result = new ProcedureInfo(
646       procId.longValue(),
647       procName,
648       procOwner != null ? procOwner.getShortName() : null,
649       ProcedureState.ROLLEDBACK,
650       -1,
651       nonceKey,
652       ForeignExceptionUtil.toProtoForeignException("ProcedureExecutor", exception),
653       currentTime,
654       currentTime,
655       null);
656     completed.putIfAbsent(procId, result);
657   }
658 
659   // ==========================================================================
660   //  Submit/Abort Procedure
661   // ==========================================================================
662   /**
663    * Add a new root-procedure to the executor.
664    * @param proc the new procedure to execute.
665    * @return the procedure id, that can be used to monitor the operation
666    */
667   public long submitProcedure(final Procedure proc) {
668     return submitProcedure(proc, null);
669   }
670 
671   /**
672    * Add a new root-procedure to the executor.
673    * @param proc the new procedure to execute.
674    * @param nonceKey the registered unique identifier for this operation from the client or process.
675    * @return the procedure id, that can be used to monitor the operation
676    */
677   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
678       justification = "FindBugs is blind to the check-for-null")
679   public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
680     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
681     Preconditions.checkArgument(isRunning(), "executor not running");
682     Preconditions.checkArgument(lastProcId.get() >= 0);
683     Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
684 
685     final Long currentProcId;
686     if (nonceKey != null) {
687       currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
688       Preconditions.checkArgument(currentProcId != null,
689         "expected nonceKey=" + nonceKey + " to be reserved, use registerNonce()");
690     } else {
691       currentProcId = nextProcId();
692     }
693 
694     // Initialize the procedure
695     proc.setNonceKey(nonceKey);
696     proc.setProcId(currentProcId.longValue());
697 
698     // Commit the transaction
699     store.insert(proc, null);
700     if (LOG.isDebugEnabled()) {
701       LOG.debug("Procedure " + proc + " added to the store.");
702     }
703 
704     // Create the rollback stack for the procedure
705     RootProcedureState stack = new RootProcedureState();
706     rollbackStack.put(currentProcId, stack);
707 
708     // Submit the new subprocedures
709     assert !procedures.containsKey(currentProcId);
710     procedures.put(currentProcId, proc);
711     sendProcedureAddedNotification(currentProcId);
712     runnables.addBack(proc);
713     return currentProcId;
714   }
715 
716   public ProcedureInfo getResult(final long procId) {
717     return completed.get(procId);
718   }
719 
720   /**
721    * Return true if the procedure is finished.
722    * The state may be "completed successfully" or "failed and rolledback".
723    * Use getResult() to check the state or get the result data.
724    * @param procId the ID of the procedure to check
725    * @return true if the procedure execution is finished, otherwise false.
726    */
727   public boolean isFinished(final long procId) {
728     return completed.containsKey(procId);
729   }
730 
731   /**
732    * Return true if the procedure is started.
733    * @param procId the ID of the procedure to check
734    * @return true if the procedure execution is started, otherwise false.
735    */
736   public boolean isStarted(final long procId) {
737     Procedure proc = procedures.get(procId);
738     if (proc == null) {
739       return completed.get(procId) != null;
740     }
741     return proc.wasExecuted();
742   }
743 
744   /**
745    * Mark the specified completed procedure, as ready to remove.
746    * @param procId the ID of the procedure to remove
747    */
748   public void removeResult(final long procId) {
749     ProcedureInfo result = completed.get(procId);
750     if (result == null) {
751       assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
752       if (LOG.isDebugEnabled()) {
753         LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
754       }
755       return;
756     }
757 
758     // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
759     result.setClientAckTime(EnvironmentEdgeManager.currentTime());
760   }
761 
762   /**
763    * Send an abort notification the specified procedure.
764    * Depending on the procedure implementation the abort can be considered or ignored.
765    * @param procId the procedure to abort
766    * @return true if the procedure exist and has received the abort, otherwise false.
767    */
768   public boolean abort(final long procId) {
769     return abort(procId, true);
770   }
771 
772   /**
773    * Send an abort notification the specified procedure.
774    * Depending on the procedure implementation the abort can be considered or ignored.
775    * @param procId the procedure to abort
776    * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
777    * @return true if the procedure exist and has received the abort, otherwise false.
778    */
779   public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
780     Procedure proc = procedures.get(procId);
781     if (proc != null) {
782       if (!mayInterruptIfRunning && proc.wasExecuted()) {
783         return false;
784       } else {
785         return proc.abort(getEnvironment());
786       }
787     }
788     return false;
789   }
790 
791   /**
792    * Check if the user is this procedure's owner
793    * @param procId the target procedure
794    * @param user the user
795    * @return true if the user is the owner of the procedure,
796    *   false otherwise or the owner is unknown.
797    */
798   public boolean isProcedureOwner(final long procId, final User user) {
799     if (user == null) {
800       return false;
801     }
802 
803     Procedure proc = procedures.get(procId);
804     if (proc != null) {
805       return proc.getOwner().equals(user.getShortName());
806     }
807     ProcedureInfo procInfo = completed.get(procId);
808     if (procInfo == null) {
809       // Procedure either does not exist or has already completed and got cleaned up.
810       // At this time, we cannot check the owner of the procedure
811       return false;
812     }
813     return ProcedureInfo.isProcedureOwner(procInfo, user);
814   }
815 
816   public Map<Long, ProcedureInfo> getResults() {
817     return Collections.unmodifiableMap(completed);
818   }
819 
820   public Procedure getProcedure(final long procId) {
821     return procedures.get(procId);
822   }
823 
824   protected ProcedureRunnableSet getRunnableSet() {
825     return runnables;
826   }
827 
828   /**
829    * Execution loop (N threads)
830    * while the executor is in a running state,
831    * fetch a procedure from the runnables queue and start the execution.
832    */
833   private void execLoop() {
834     while (isRunning()) {
835       Long procId = runnables.poll();
836       Procedure proc = procId != null ? procedures.get(procId) : null;
837       if (proc == null) continue;
838 
839       try {
840         activeExecutorCount.incrementAndGet();
841         execLoop(proc);
842       } finally {
843         activeExecutorCount.decrementAndGet();
844       }
845     }
846   }
847 
  /**
   * Runs one scheduling attempt for {@code proc} on the calling executor thread.
   * <p>
   * If the root procedure id cannot be resolved, {@code proc} is rolled back directly.
   * Otherwise the root's {@link RootProcedureState} coordinates the attempt:
   * <ul>
   *   <li>if execution cannot be acquired and the rollback lock can be taken, the rollback
   *       of the whole root stack starts (and is re-queued if it cannot complete now);</li>
   *   <li>if execution cannot be acquired and the rollback lock cannot be taken, only a
   *       never-executed {@code proc} is rolled back on its own;</li>
   *   <li>if execution is acquired, the procedure lock is taken and
   *       {@link #execProcedure} runs the procedure (or it is yielded if the lock is busy).</li>
   * </ul>
   * The loop repeats while the root state reports a failure, so a failure raised by this
   * execution can be handled by the next iteration (presumably through the rollback branch
   * above, i.e. acquire() refuses a failed stack — TODO confirm).
   * @param proc the procedure polled from the runnables queue
   */
  private void execLoop(Procedure proc) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Trying to start the execution of " + proc);
    }

    Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // The 'proc' was ready to run but the root procedure was rolledback
      executeRollback(proc);
      return;
    }

    // No rollback/execution state for the root: nothing can be coordinated, drop the attempt.
    RootProcedureState procStack = rollbackStack.get(rootProcId);
    if (procStack == null) return;

    do {
      // Try to acquire the execution
      if (!procStack.acquire(proc)) {
        if (procStack.setRollback()) {
          // we have the 'rollback-lock' we can start rollingback
          if (!executeRollback(rootProcId, procStack)) {
            // Rollback could not complete now (lock busy, step failed, or stopping):
            // give the rollback-lock back and retry later.
            procStack.unsetRollback();
            runnables.yield(proc);
          }
        } else {
          // if we can't rollback means that some child is still running.
          // the rollback will be executed after all the children are done.
          // If the procedure was never executed, remove and mark it as rolledback.
          if (!proc.wasExecuted()) {
            if (!executeRollback(proc)) {
              runnables.yield(proc);
            }
          }
        }
        break;
      }

      // Execute the procedure
      // NOTE(review): releaseLock() is not in a finally block; if execProcedure() throws
      // (it catches errors from doExecute(), but not e.g. from store calls) the lock stays held.
      assert proc.getState() == ProcedureState.RUNNABLE;
      if (proc.acquireLock(getEnvironment())) {
        execProcedure(procStack, proc);
        proc.releaseLock(getEnvironment());
      } else {
        runnables.yield(proc);
      }
      procStack.release(proc);

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (testing != null && !isRunning()) {
        break;
      }

      if (proc.isSuccess()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Procedure completed in " +
              StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
        }
        // Finalize the procedure state
        // Only the root procedure is finalized here; a finished child instead wakes its
        // parent from execProcedure().
        if (proc.getProcId() == rootProcId) {
          procedureFinished(proc);
        }
        break;
      }
    } while (procStack.isFailed());
  }
914 
915   private void timeoutLoop() {
916     while (isRunning()) {
917       Procedure proc = waitingTimeout.poll();
918       if (proc == null) continue;
919 
920       if (proc.getTimeRemaining() > 100) {
921         // got an early wake, maybe a stop?
922         // re-enqueue the task in case was not a stop or just a signal
923         waitingTimeout.add(proc);
924         continue;
925       }
926 
927       // ----------------------------------------------------------------------------
928       // TODO-MAYBE: Should we provide a notification to the store with the
929       // full set of procedures pending and completed to write a compacted
930       // version of the log (in case is a log)?
931       // In theory no, procedures are have a short life, so at some point the store
932       // will have the tracker saying everything is in the last log.
933       // ----------------------------------------------------------------------------
934 
935       // The CompletedProcedureCleaner is a special case, and it acts as a chore.
936       // instead of bringing the Chore class in, we reuse this timeout thread for
937       // this special case.
938       if (proc instanceof CompletedProcedureCleaner) {
939         try {
940           ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
941         } catch (Throwable e) {
942           LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
943         }
944         proc.setStartTime(EnvironmentEdgeManager.currentTime());
945         waitingTimeout.add(proc);
946         continue;
947       }
948 
949       // The procedure received an "abort-timeout", call abort() and
950       // add the procedure back in the queue for rollback.
951       if (proc.setTimeoutFailure()) {
952         long rootProcId = Procedure.getRootProcedureId(procedures, proc);
953         RootProcedureState procStack = rollbackStack.get(rootProcId);
954         procStack.abort();
955         store.update(proc);
956         runnables.addFront(proc);
957         continue;
958       }
959     }
960   }
961 
  /**
   * Execute the rollback of the full procedure stack.
   * Once the procedure is rolledback, the root-procedure will be visible as
   * finished to user, and the result will be the fatal exception.
   * <p>
   * Steps are undone from the most recent to the oldest. Each step is removed from the stack
   * only after its rollback succeeded, so a later call resumes where this one stopped.
   * @param rootProcId id of the root procedure being rolled back
   * @param procStack the root's execution state holding the executed steps
   * @return true if every step was rolled back and the root procedure was finalized;
   *   false if the rollback has to be retried later (a procedure lock was unavailable,
   *   a step rollback failed, or the executor/store is no longer running)
   */
  private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
    // NOTE(review): assumes the root procedure is still tracked; a null here would NPE.
    Procedure rootProc = procedures.get(rootProcId);
    RemoteProcedureException exception = rootProc.getException();
    if (exception == null) {
      // Propagate the stack failure to the root procedure and persist it before rolling back.
      exception = procStack.getException();
      rootProc.setFailure(exception);
      store.update(rootProc);
    }

    List<Procedure> subprocStack = procStack.getSubprocedures();
    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

    int stackTail = subprocStack.size();
    boolean reuseLock = false;
    // "stackTail --> 0" is "stackTail-- > 0": walk the stack from the last step to the first.
    while (stackTail --> 0) {
      final Procedure proc = subprocStack.get(stackTail);

      if (!reuseLock && !proc.acquireLock(getEnvironment())) {
        // can't take a lock on the procedure, add the root-proc back on the
        // queue waiting for the lock availability
        return false;
      }

      boolean abortRollback = !executeRollback(proc);
      abortRollback |= !isRunning() || !store.isRunning();

      // If the next procedure is the same to this one
      // (e.g. StateMachineProcedure reuse the same instance)
      // we can avoid to lock/unlock each step
      reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
      if (!reuseLock) {
        proc.releaseLock(getEnvironment());
      }

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (abortRollback) {
        return false;
      }

      // stackTail is an int, so this is List.remove(int): drop the step just rolled back.
      subprocStack.remove(stackTail);
    }

    // Finalize the procedure state
    // NOTE(review): 'exception' may still be null if procStack.getException() returned null,
    // in which case exception.getMessage() below would throw — TODO confirm it cannot be.
    LOG.info("Rolledback procedure " + rootProc +
             " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
             " exception=" + exception.getMessage());
    procedureFinished(rootProc);
    return true;
  }
1017 
1018   /**
1019    * Execute the rollback of the procedure step.
1020    * It updates the store with the new state (stack index)
1021    * or will remove completly the procedure in case it is a child.
1022    */
1023   private boolean executeRollback(final Procedure proc) {
1024     try {
1025       proc.doRollback(getEnvironment());
1026     } catch (IOException e) {
1027       if (LOG.isDebugEnabled()) {
1028         LOG.debug("rollback attempt failed for " + proc, e);
1029       }
1030       return false;
1031     } catch (Throwable e) {
1032       // Catch NullPointerExceptions or similar errors...
1033       LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
1034     }
1035 
1036     // allows to kill the executor before something is stored to the wal.
1037     // useful to test the procedure recovery.
1038     if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1039       if (LOG.isDebugEnabled()) {
1040         LOG.debug("TESTING: Kill before store update");
1041       }
1042       stop();
1043       return false;
1044     }
1045 
1046     if (proc.removeStackIndex()) {
1047       proc.setState(ProcedureState.ROLLEDBACK);
1048       if (proc.hasParent()) {
1049         store.delete(proc.getProcId());
1050         procedures.remove(proc.getProcId());
1051       } else {
1052         store.update(proc);
1053       }
1054     } else {
1055       store.update(proc);
1056     }
1057     return true;
1058   }
1059 
  /**
   * Executes the specified procedure
   *  - calls the doExecute() of the procedure
   *  - if the procedure execution didn't fail (e.g. invalid user input)
   *     - ...and returned subprocedures
   *        - the subprocedures are initialized.
   *        - the subprocedures are added to the store
   *        - the subprocedures are added to the runnable queue
   *        - the procedure is now in a WAITING state, waiting for the subprocedures to complete
   *     - ...if there are no subprocedure
   *        - the procedure completed successfully
   *        - if there is a parent (WAITING)
   *            - the parent state will be set to RUNNABLE
   *  - in case of failure
   *    - the store is updated with the new state
   *    - the executor (caller of this method) will start the rollback of the procedure
   *
   * A procedure that returns itself as its only subprocedure (state-machine style) is
   * executed again in place, without going back through the runnables queue.
   * A ProcedureYieldException puts the procedure back on the runnables queue and returns.
   * Every executed step is recorded on procStack so that it can be rolled back later.
   *
   * @param procStack the execution/rollback state of the procedure's root
   * @param procedure the procedure to execute; must be in the RUNNABLE state
   */
  private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);

    // Execute the procedure
    boolean reExecute = false;
    Procedure[] subprocs = null;
    do {
      reExecute = false;
      try {
        subprocs = procedure.doExecute(getEnvironment());
        // Normalize "no children" to null so the checks below only need a null test.
        if (subprocs != null && subprocs.length == 0) {
          subprocs = null;
        }
      } catch (ProcedureYieldException e) {
        // The procedure asked to give the executor thread back: requeue it and stop here.
        if (LOG.isTraceEnabled()) {
          LOG.trace("Yield procedure: " + procedure);
        }
        runnables.yield(procedure);
        return;
      } catch (Throwable e) {
        // Catch NullPointerExceptions or similar errors...
        // The error becomes the procedure failure; the caller then starts the rollback.
        String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
        LOG.error(msg, e);
        procedure.setFailure(new RemoteProcedureException(msg, e));
      }

      if (!procedure.isFailed()) {
        if (subprocs != null) {
          if (subprocs.length == 1 && subprocs[0] == procedure) {
            // quick-shortcut for a state machine like procedure
            subprocs = null;
            reExecute = true;
          } else {
            // yield the current procedure, and make the subprocedure runnable
            for (int i = 0; i < subprocs.length; ++i) {
              Procedure subproc = subprocs[i];
              if (subproc == null) {
                // A null child fails the parent; children that already got an id in this
                // loop are discarded (subprocs is cleared, so they are never stored).
                String msg = "subproc[" + i + "] is null, aborting the procedure";
                procedure.setFailure(new RemoteProcedureException(msg,
                  new IllegalArgumentIOException(msg)));
                subprocs = null;
                break;
              }

              assert subproc.getState() == ProcedureState.INITIALIZING;
              subproc.setParentProcId(procedure.getProcId());
              subproc.setProcId(nextProcId());
            }

            if (!procedure.isFailed()) {
              // The parent waits for all of its children; a parent already in
              // WAITING_TIMEOUT is (also) registered with the timeout queue.
              procedure.setChildrenLatch(subprocs.length);
              switch (procedure.getState()) {
                case RUNNABLE:
                  procedure.setState(ProcedureState.WAITING);
                  break;
                case WAITING_TIMEOUT:
                  waitingTimeout.add(procedure);
                  break;
                default:
                  break;
              }
            }
          }
        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
          waitingTimeout.add(procedure);
        } else {
          // No subtask, so we are done
          procedure.setState(ProcedureState.FINISHED);
        }
      }

      // Add the procedure to the stack
      procStack.addRollbackStep(procedure);

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("TESTING: Kill before store update");
        }
        stop();
        return;
      }

      // Commit the transaction
      // With new children, parent state and children are written in a single insert call;
      // otherwise only the procedure state is updated.
      if (subprocs != null && !procedure.isFailed()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
        }
        store.insert(procedure, subprocs);
      } else {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Store update " + procedure);
        }
        store.update(procedure);
      }

      // if the store is not running we are aborting
      if (!store.isRunning()) {
        return;
      }

      assert (reExecute && subprocs == null) || !reExecute;
    } while (reExecute);

    // Submit the new subprocedures
    // Children only become visible to executor threads after they have been persisted above.
    if (subprocs != null && !procedure.isFailed()) {
      for (int i = 0; i < subprocs.length; ++i) {
        Procedure subproc = subprocs[i];
        assert !procedures.containsKey(subproc.getProcId());
        procedures.put(subproc.getProcId(), subproc);
        runnables.addFront(subproc);
      }
    }

    if (procedure.isFinished() && procedure.hasParent()) {
      Procedure parent = procedures.get(procedure.getParentProcId());
      if (parent == null) {
        // The parent is no longer tracked; this is only expected during a rollback.
        assert procStack.isRollingback();
        return;
      }

      // If this procedure is the last child awake the parent procedure
      if (LOG.isTraceEnabled()) {
        LOG.trace(parent + " child is done: " + procedure);
      }
      if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
        parent.setState(ProcedureState.RUNNABLE);
        store.update(parent);
        runnables.addFront(parent);
        if (LOG.isTraceEnabled()) {
          LOG.trace(parent + " all the children finished their work, resume.");
        }
        return;
      }
    }
  }
1214 
1215   private void sendProcedureLoadedNotification(final long procId) {
1216     if (!this.listeners.isEmpty()) {
1217       for (ProcedureExecutorListener listener: this.listeners) {
1218         try {
1219           listener.procedureLoaded(procId);
1220         } catch (Throwable e) {
1221           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1222         }
1223       }
1224     }
1225   }
1226 
1227   private void sendProcedureAddedNotification(final long procId) {
1228     if (!this.listeners.isEmpty()) {
1229       for (ProcedureExecutorListener listener: this.listeners) {
1230         try {
1231           listener.procedureAdded(procId);
1232         } catch (Throwable e) {
1233           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1234         }
1235       }
1236     }
1237   }
1238 
1239   private void sendProcedureFinishedNotification(final long procId) {
1240     if (!this.listeners.isEmpty()) {
1241       for (ProcedureExecutorListener listener: this.listeners) {
1242         try {
1243           listener.procedureFinished(procId);
1244         } catch (Throwable e) {
1245           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1246         }
1247       }
1248     }
1249   }
1250 
1251   private long nextProcId() {
1252     long procId = lastProcId.incrementAndGet();
1253     if (procId < 0) {
1254       while (!lastProcId.compareAndSet(procId, 0)) {
1255         procId = lastProcId.get();
1256         if (procId >= 0)
1257           break;
1258       }
1259       while (procedures.containsKey(procId)) {
1260         procId = lastProcId.incrementAndGet();
1261       }
1262     }
1263     return procId;
1264   }
1265 
1266   private Long getRootProcedureId(Procedure proc) {
1267     return Procedure.getRootProcedureId(procedures, proc);
1268   }
1269 
1270   private void procedureFinished(final Procedure proc) {
1271     // call the procedure completion cleanup handler
1272     try {
1273       proc.completionCleanup(getEnvironment());
1274     } catch (Throwable e) {
1275       // Catch NullPointerExceptions or similar errors...
1276       LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1277     }
1278 
1279     // update the executor internal state maps
1280     completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
1281     rollbackStack.remove(proc.getProcId());
1282     procedures.remove(proc.getProcId());
1283 
1284     // call the runnableSet completion cleanup handler
1285     try {
1286       runnables.completionCleanup(proc);
1287     } catch (Throwable e) {
1288       // Catch NullPointerExceptions or similar errors...
1289       LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
1290     }
1291 
1292     // Notify the listeners
1293     sendProcedureFinishedNotification(proc.getProcId());
1294   }
1295 
1296   public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
1297     ProcedureInfo result = completed.get(procId);
1298     Procedure proc = null;
1299     if (result == null) {
1300       proc = procedures.get(procId);
1301       if (proc == null) {
1302         result = completed.get(procId);
1303       }
1304     }
1305     return new Pair(result, proc);
1306   }
1307 }