View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.Modifier;
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.Map;
29  
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.ProcedureInfo;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.classification.InterfaceStability;
34  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
36  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
38  import org.apache.hadoop.hbase.util.ByteStringer;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  import org.apache.hadoop.hbase.util.NonceKey;
41  
42  import com.google.common.annotations.VisibleForTesting;
43  import com.google.common.base.Preconditions;
44  import com.google.protobuf.ByteString;
45  
46  /**
47   * Base Procedure class responsible to handle the Procedure Metadata
48   * e.g. state, startTime, lastUpdate, stack-indexes, ...
49   *
50   * execute() is called each time the procedure is executed.
51   * it may be called multiple times in case of failure and restart, so the
52   * code must be idempotent.
53   * the return is a set of sub-procedures or null in case the procedure doesn't
54   * have sub-procedures. Once the sub-procedures are successfully completed
55   * the execute() method is called again, you should think at it as a stack:
56   *  -> step 1
57   *  ---> step 2
58   *  -> step 1
59   *
60   * rollback() is called when the procedure or one of the sub-procedures is failed.
61   * the rollback step is supposed to cleanup the resources created during the
62   * execute() step. in case of failure and restart rollback() may be called
63   * multiple times, so the code must be idempotent.
64   */
65  @InterfaceAudience.Private
66  @InterfaceStability.Evolving
67  public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
68    // unchanged after initialization
69    private String owner = null;
70    private Long parentProcId = null;
71    private Long procId = null;
72    private long startTime;
73  
74    // runtime state, updated every operation
75    private ProcedureState state = ProcedureState.INITIALIZING;
76    private Integer timeout = null;
77    private int[] stackIndexes = null;
78    private int childrenLatch = 0;
79    private long lastUpdate;
80  
81    private RemoteProcedureException exception = null;
82    private byte[] result = null;
83  
84    private NonceKey nonceKey = null;
85  
86    /**
87     * The main code of the procedure. It must be idempotent since execute()
88     * may be called multiple time in case of machine failure in the middle
89     * of the execution.
90     * @param env the environment passed to the ProcedureExecutor
91     * @return a set of sub-procedures or null if there is nothing else to execute.
92     * @throws ProcedureYieldException the procedure will be added back to the queue and retried later
93     * @throws InterruptedException the procedure will be added back to the queue and retried later
94     */
95    protected abstract Procedure[] execute(TEnvironment env)
96      throws ProcedureYieldException, InterruptedException;
97  
98    /**
99     * The code to undo what done by the execute() code.
100    * It is called when the procedure or one of the sub-procedure failed or an
101    * abort was requested. It should cleanup all the resources created by
102    * the execute() call. The implementation must be idempotent since rollback()
103    * may be called multiple time in case of machine failure in the middle
104    * of the execution.
105    *
106    * @param env the environment passed to the ProcedureExecutor
107    * @throws IOException temporary failure, the rollback will retry later
108    */
109   protected abstract void rollback(TEnvironment env)
110     throws IOException;
111 
112   /**
113    * The abort() call is asynchronous and each procedure must decide how to deal
114    * with that, if they want to be abortable. The simplest implementation
115    * is to have an AtomicBoolean set in the abort() method and then the execute()
116    * will check if the abort flag is set or not.
117    * abort() may be called multiple times from the client, so the implementation
118    * must be idempotent.
119    *
120    * NOTE: abort() is not like Thread.interrupt() it is just a notification
121    * that allows the procedure implementor where to abort to avoid leak and
122    * have a better control on what was executed and what not.
123    *
124    * @param env the environment passed to the ProcedureExecutor
125    */
126   protected abstract boolean abort(TEnvironment env);
127 
128   /**
129    * The user-level code of the procedure may have some state to
130    * persist (e.g. input arguments) to be able to resume on failure.
131    * @param stream the stream that will contain the user serialized data
132    * @throws IOException failure to stream data
133    */
134   protected abstract void serializeStateData(final OutputStream stream)
135     throws IOException;
136 
137   /**
138    * Called on store load to allow the user to decode the previously serialized
139    * state.
140    * @param stream the stream that contains the user serialized data
141    */
142   protected abstract void deserializeStateData(final InputStream stream)
143     throws IOException;
144 
145   /**
146    * The user should override this method, and try to take a lock if necessary.
147    * A lock can be anything, and it is up to the implementor.
148    * Example: in our Master we can execute request in parallel for different tables
149    *          create t1 and create t2 can be executed at the same time.
150    *          anything else on t1/t2 is queued waiting that specific table create to happen.
151    *
152    * @param env the environment passed to the ProcedureExecutor
153    * @return true if the lock was acquired and false otherwise
154    */
155   protected boolean acquireLock(final TEnvironment env) {
156     return true;
157   }
158 
159   /**
160    * The user should override this method, and release lock if necessary.
161    *
162    * @param env the environment passed to the ProcedureExecutor
163    */
164   protected void releaseLock(final TEnvironment env) {
165     // no-op
166   }
167 
168   /**
169    * Called when the procedure is loaded for replay.
170    * The procedure implementor may use this method to perform some quick
171    * operation before replay.
172    * e.g. failing the procedure if the state on replay may be unknown.
173    *
174    * @param env the environment passed to the ProcedureExecutor
175    */
176   protected void beforeReplay(final TEnvironment env) {
177     // no-op
178   }
179 
180   /**
181    * Called when the procedure is marked as completed (success or rollback).
182    * The procedure implementor may use this method to cleanup in-memory states.
183    * This operation will not be retried on failure.
184    *
185    * @param env the environment passed to the ProcedureExecutor
186    */
187   protected void completionCleanup(final TEnvironment env) {
188     // no-op
189   }
190 
191   @Override
192   public String toString() {
193     // Return the simple String presentation of the procedure.
194     return toStringSimpleSB().toString();
195   }
196 
197   /**
198    * Build the StringBuilder for the simple form of
199    * procedure string.
200    * @return the StringBuilder
201    */
202   protected StringBuilder toStringSimpleSB() {
203     StringBuilder sb = new StringBuilder();
204     toStringClassDetails(sb);
205 
206     if (procId != null) {
207       sb.append(" id=");
208       sb.append(getProcId());
209     }
210 
211     if (hasParent()) {
212       sb.append(" parent=");
213       sb.append(getParentProcId());
214     }
215 
216     if (hasOwner()) {
217       sb.append(" owner=");
218       sb.append(getOwner());
219     }
220 
221     sb.append(" state=");
222     sb.append(getState());
223 
224     return sb;
225   }
226 
227   /**
228    * Extend the toString() information with more procedure
229    * details
230    */
231   public String toStringDetails() {
232     StringBuilder sb = toStringSimpleSB();
233 
234     sb.append(" startTime=");
235     sb.append(getStartTime());
236 
237     sb.append(" lastUpdate=");
238     sb.append(getLastUpdate());
239 
240     if (stackIndexes != null) {
241       sb.append("\n");
242       sb.append("stackIndexes=");
243       sb.append(Arrays.toString(getStackIndexes()));
244     }
245 
246     return sb.toString();
247   }
248 
249   protected String toStringClass() {
250     StringBuilder sb = new StringBuilder();
251     toStringClassDetails(sb);
252 
253     return sb.toString();
254   }
255 
256   /**
257    * Extend the toString() information with the procedure details
258    * e.g. className and parameters
259    * @param builder the string builder to use to append the proc specific information
260    */
261   protected void toStringClassDetails(StringBuilder builder) {
262     builder.append(getClass().getName());
263   }
264 
265   /**
266    * @return the serialized result if any, otherwise null
267    */
268   public byte[] getResult() {
269     return result;
270   }
271 
272   /**
273    * The procedure may leave a "result" on completion.
274    * @param result the serialized result that will be passed to the client
275    */
276   protected void setResult(final byte[] result) {
277     this.result = result;
278   }
279 
280   public long getProcId() {
281     return procId;
282   }
283 
284   public boolean hasParent() {
285     return parentProcId != null;
286   }
287 
288   public boolean hasException() {
289     return exception != null;
290   }
291 
292   public boolean hasTimeout() {
293     return timeout != null;
294   }
295 
296   public long getParentProcId() {
297     return parentProcId;
298   }
299 
300   public NonceKey getNonceKey() {
301     return nonceKey;
302   }
303 
304   /**
305    * @return true if the procedure has failed.
306    *         true may mean failed but not yet rolledback or failed and rolledback.
307    */
308   public synchronized boolean isFailed() {
309     return exception != null || state == ProcedureState.ROLLEDBACK;
310   }
311 
312   /**
313    * @return true if the procedure is finished successfully.
314    */
315   public synchronized boolean isSuccess() {
316     return state == ProcedureState.FINISHED && exception == null;
317   }
318 
319   /**
320    * @return true if the procedure is finished. The Procedure may be completed
321    *         successfuly or failed and rolledback.
322    */
323   public synchronized boolean isFinished() {
324     switch (state) {
325       case ROLLEDBACK:
326         return true;
327       case FINISHED:
328         return exception == null;
329       default:
330         break;
331     }
332     return false;
333   }
334 
335   /**
336    * @return true if the procedure is waiting for a child to finish or for an external event.
337    */
338   public synchronized boolean isWaiting() {
339     switch (state) {
340       case WAITING:
341       case WAITING_TIMEOUT:
342         return true;
343       default:
344         break;
345     }
346     return false;
347   }
348 
349   public synchronized RemoteProcedureException getException() {
350     return exception;
351   }
352 
353   public long getStartTime() {
354     return startTime;
355   }
356 
357   public synchronized long getLastUpdate() {
358     return lastUpdate;
359   }
360 
361   public synchronized long elapsedTime() {
362     return lastUpdate - startTime;
363   }
364 
365   /**
366    * @param timeout timeout in msec
367    */
368   protected void setTimeout(final int timeout) {
369     this.timeout = timeout;
370   }
371 
372   /**
373    * @return the timeout in msec
374    */
375   public int getTimeout() {
376     return timeout;
377   }
378 
379   /**
380    * @return the remaining time before the timeout
381    */
382   public long getTimeRemaining() {
383     return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
384   }
385 
386   /**
387    * @param owner the owner passed in
388    */
389   @VisibleForTesting
390   @InterfaceAudience.Private
391   public void setOwner(final String owner) {
392     this.owner = StringUtils.isEmpty(owner) ? null : owner;
393   }
394 
395   public String getOwner() {
396     return owner;
397   }
398 
399   public boolean hasOwner() {
400     return owner != null;
401   }
402 
403   /**
404    * @param state current procedure state
405    */
406   @VisibleForTesting
407   @InterfaceAudience.Private
408   protected synchronized void setState(final ProcedureState state) {
409     this.state = state;
410     updateTimestamp();
411   }
412 
413   @InterfaceAudience.Private
414   protected synchronized ProcedureState getState() {
415     return state;
416   }
417 
418   /**
419    * @param source exception string
420    * @param cause the cause of failure
421    */
422   protected void setFailure(final String source, final Throwable cause) {
423     setFailure(new RemoteProcedureException(source, cause));
424   }
425 
426   /**
427    * @param exception exception thrown
428    */
429   protected synchronized void setFailure(final RemoteProcedureException exception) {
430     this.exception = exception;
431     if (!isFinished()) {
432       setState(ProcedureState.FINISHED);
433     }
434   }
435 
436   /**
437    * @param source exception string
438    * @param msg message to pass on
439    */
440   protected void setAbortFailure(final String source, final String msg) {
441     setFailure(source, new ProcedureAbortedException(msg));
442   }
443 
444   @InterfaceAudience.Private
445   protected synchronized boolean setTimeoutFailure() {
446     if (state == ProcedureState.WAITING_TIMEOUT) {
447       long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
448       setFailure("ProcedureExecutor", new TimeoutIOException(
449         "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
450       return true;
451     }
452     return false;
453   }
454 
455   /**
456    * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
457    */
458   @VisibleForTesting
459   @InterfaceAudience.Private
460   protected void setProcId(final long procId) {
461     this.procId = procId;
462     this.startTime = EnvironmentEdgeManager.currentTime();
463     setState(ProcedureState.RUNNABLE);
464   }
465 
466   /**
467    * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
468    *
469    * @param parentProcId parent procedure Id
470    */
471   @InterfaceAudience.Private
472   protected void setParentProcId(final long parentProcId) {
473     this.parentProcId = parentProcId;
474   }
475 
476   /**
477    * Called by the ProcedureExecutor to set the value to the newly created procedure.
478    *
479    * @param nonceKey the key to detect duplicate call
480    */
481   @VisibleForTesting
482   @InterfaceAudience.Private
483   protected void setNonceKey(final NonceKey nonceKey) {
484     this.nonceKey = nonceKey;
485   }
486 
487   /**
488    * Internal method called by the ProcedureExecutor that starts the
489    * user-level code execute().
490    * @param env the environment passed to the ProcedureExecutor
491    * @return a set of sub-procedures or null if there is nothing else to execute.
492    * @throws ProcedureYieldException the procedure will be added back to the queue and retried later
493    * @throws InterruptedException
494    */
495   @InterfaceAudience.Private
496   protected Procedure[] doExecute(final TEnvironment env)
497       throws ProcedureYieldException, InterruptedException {
498     try {
499       updateTimestamp();
500       return execute(env);
501     } finally {
502       updateTimestamp();
503     }
504   }
505 
506   /**
507    * Internal method called by the ProcedureExecutor that starts the
508    * user-level code rollback().
509    * @param env the environment passed to the ProcedureExecutor
510    */
511   @InterfaceAudience.Private
512   protected void doRollback(final TEnvironment env) throws IOException {
513     try {
514       updateTimestamp();
515       rollback(env);
516     } finally {
517       updateTimestamp();
518     }
519   }
520 
521   /**
522    * Called on store load to initialize the Procedure internals after
523    * the creation/deserialization.
524    * @param startTime procedure start time
525    */
526   @InterfaceAudience.Private
527   protected void setStartTime(final long startTime) {
528     this.startTime = startTime;
529   }
530 
531   /**
532    * Called on store load to initialize the Procedure internals after
533    * the creation/deserialization.
534    * @param lastUpdate last time to update procedure
535    */
536   private synchronized void setLastUpdate(final long lastUpdate) {
537     this.lastUpdate = lastUpdate;
538   }
539 
540   protected synchronized void updateTimestamp() {
541     this.lastUpdate = EnvironmentEdgeManager.currentTime();
542   }
543 
544   /**
545    * Called by the ProcedureExecutor on procedure-load to restore the latch state
546    * @param numChildren children count
547    */
548   @InterfaceAudience.Private
549   protected synchronized void setChildrenLatch(final int numChildren) {
550     this.childrenLatch = numChildren;
551   }
552 
553   /**
554    * Called by the ProcedureExecutor on procedure-load to restore the latch state
555    */
556   @InterfaceAudience.Private
557   protected synchronized void incChildrenLatch() {
558     // TODO: can this be inferred from the stack? I think so...
559     this.childrenLatch++;
560   }
561 
562   /**
563    * Called by the ProcedureExecutor to notify that one of the sub-procedures
564    * has completed.
565    */
566   @InterfaceAudience.Private
567   protected synchronized boolean childrenCountDown() {
568     assert childrenLatch > 0;
569     return --childrenLatch == 0;
570   }
571 
572   /**
573    * Called by the RootProcedureState on procedure execution.
574    * Each procedure store its stack-index positions.
575    * @param index the place where procedure is in
576    */
577   @InterfaceAudience.Private
578   protected synchronized void addStackIndex(final int index) {
579     if (stackIndexes == null) {
580       stackIndexes = new int[] { index };
581     } else {
582       int count = stackIndexes.length;
583       stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
584       stackIndexes[count] = index;
585     }
586   }
587 
588   @InterfaceAudience.Private
589   protected synchronized boolean removeStackIndex() {
590     if (stackIndexes.length > 1) {
591       stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
592       return false;
593     } else {
594       stackIndexes = null;
595       return true;
596     }
597   }
598 
599   /**
600    * Called on store load to initialize the Procedure internals after
601    * the creation/deserialization.
602    * @param stackIndexes the list of positions of procedures
603    */
604   @InterfaceAudience.Private
605   protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
606     this.stackIndexes = new int[stackIndexes.size()];
607     for (int i = 0; i < this.stackIndexes.length; ++i) {
608       this.stackIndexes[i] = stackIndexes.get(i);
609     }
610   }
611 
612   @InterfaceAudience.Private
613   protected synchronized boolean wasExecuted() {
614     return stackIndexes != null;
615   }
616 
617   @InterfaceAudience.Private
618   protected synchronized int[] getStackIndexes() {
619     return stackIndexes;
620   }
621 
622   /**
623    * @param other the procedure to compare to
624    */
625   @Override
626   public int compareTo(final Procedure other) {
627     long diff = getProcId() - other.getProcId();
628     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
629   }
630 
631   /*
632    * Helper to lookup the root Procedure ID given a specified procedure.
633    * @param procedures list of procedure
634    * @param the procedure to look for
635    */
636   @InterfaceAudience.Private
637   protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
638     while (proc.hasParent()) {
639       proc = procedures.get(proc.getParentProcId());
640       if (proc == null) return null;
641     }
642     return proc.getProcId();
643   }
644 
645   /*
646    * @param className procedure class name
647    * @throws IOException failure
648    */
649   protected static Procedure newInstance(final String className) throws IOException {
650     try {
651       Class<?> clazz = Class.forName(className);
652       if (!Modifier.isPublic(clazz.getModifiers())) {
653         throw new Exception("the " + clazz + " class is not public");
654       }
655 
656       Constructor<?> ctor = clazz.getConstructor();
657       assert ctor != null : "no constructor found";
658       if (!Modifier.isPublic(ctor.getModifiers())) {
659         throw new Exception("the " + clazz + " constructor is not public");
660       }
661       return (Procedure)ctor.newInstance();
662     } catch (Exception e) {
663       throw new IOException("The procedure class " + className +
664           " must be accessible and have an empty constructor", e);
665     }
666   }
667 
668   /*
669    * @param proc procedure
670    * @throws IOException failure
671    */
672   protected static void validateClass(final Procedure proc) throws IOException {
673     try {
674       Class<?> clazz = proc.getClass();
675       if (!Modifier.isPublic(clazz.getModifiers())) {
676         throw new Exception("the " + clazz + " class is not public");
677       }
678 
679       Constructor<?> ctor = clazz.getConstructor();
680       assert ctor != null;
681       if (!Modifier.isPublic(ctor.getModifiers())) {
682         throw new Exception("the " + clazz + " constructor is not public");
683       }
684     } catch (Exception e) {
685       throw new IOException("The procedure class " + proc.getClass().getName() +
686           " must be accessible and have an empty constructor", e);
687     }
688   }
689 
690   /**
691    * Helper to create the ProcedureInfo from Procedure.
692    * @param proc procedure
693    * @param nonceKey the key to detect duplicate call
694    */
695   @InterfaceAudience.Private
696   public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
697     RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
698     return new ProcedureInfo(
699       proc.getProcId(),
700       proc.toStringClass(),
701       proc.getOwner(),
702       proc.getState(),
703       proc.hasParent() ? proc.getParentProcId() : -1,
704       nonceKey,
705       exception != null ?
706           RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
707       proc.getLastUpdate(),
708       proc.getStartTime(),
709       proc.getResult());
710   }
711 
712   /**
713    * Helper to convert the procedure to protobuf.
714    * Used by ProcedureStore implementations.
715    * @param proc procedure
716    */
717   @InterfaceAudience.Private
718   public static ProcedureProtos.Procedure convert(final Procedure proc)
719       throws IOException {
720     Preconditions.checkArgument(proc != null);
721     validateClass(proc);
722 
723     ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
724       .setClassName(proc.getClass().getName())
725       .setProcId(proc.getProcId())
726       .setState(proc.getState())
727       .setStartTime(proc.getStartTime())
728       .setLastUpdate(proc.getLastUpdate());
729 
730     if (proc.hasParent()) {
731       builder.setParentId(proc.getParentProcId());
732     }
733 
734     if (proc.hasTimeout()) {
735       builder.setTimeout(proc.getTimeout());
736     }
737 
738     if (proc.hasOwner()) {
739       builder.setOwner(proc.getOwner());
740     }
741 
742     int[] stackIds = proc.getStackIndexes();
743     if (stackIds != null) {
744       for (int i = 0; i < stackIds.length; ++i) {
745         builder.addStackId(stackIds[i]);
746       }
747     }
748 
749     if (proc.hasException()) {
750       RemoteProcedureException exception = proc.getException();
751       builder.setException(
752         RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
753     }
754 
755     byte[] result = proc.getResult();
756     if (result != null) {
757       builder.setResult(ByteStringer.wrap(result));
758     }
759 
760     ByteString.Output stateStream = ByteString.newOutput();
761     proc.serializeStateData(stateStream);
762     if (stateStream.size() > 0) {
763       builder.setStateData(stateStream.toByteString());
764     }
765 
766     if (proc.getNonceKey() != null) {
767       builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
768       builder.setNonce(proc.getNonceKey().getNonce());
769     }
770 
771     return builder.build();
772   }
773 
774   /**
775    * Helper to convert the protobuf procedure.
776    * Used by ProcedureStore implementations.
777    *
778    * TODO: OPTIMIZATION: some of the field never change during the execution
779    *                     (e.g. className, procId, parentId, ...).
780    *                     We can split in 'data' and 'state', and the store
781    *                     may take advantage of it by storing the data only on insert().
782    *
783    * @param proto procedure protobuf
784    */
785   @InterfaceAudience.Private
786   public static Procedure convert(final ProcedureProtos.Procedure proto)
787       throws IOException {
788     // Procedure from class name
789     Procedure proc = Procedure.newInstance(proto.getClassName());
790 
791     // set fields
792     proc.setProcId(proto.getProcId());
793     proc.setState(proto.getState());
794     proc.setStartTime(proto.getStartTime());
795     proc.setLastUpdate(proto.getLastUpdate());
796 
797     if (proto.hasParentId()) {
798       proc.setParentProcId(proto.getParentId());
799     }
800 
801     if (proto.hasOwner()) {
802       proc.setOwner(proto.getOwner());
803     }
804 
805     if (proto.hasTimeout()) {
806       proc.setTimeout(proto.getTimeout());
807     }
808 
809     if (proto.getStackIdCount() > 0) {
810       proc.setStackIndexes(proto.getStackIdList());
811     }
812 
813     if (proto.hasException()) {
814       assert proc.getState() == ProcedureState.FINISHED ||
815              proc.getState() == ProcedureState.ROLLEDBACK :
816              "The procedure must be failed (waiting to rollback) or rolledback";
817       proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
818     }
819 
820     if (proto.hasResult()) {
821       proc.setResult(proto.getResult().toByteArray());
822     }
823 
824     if (proto.getNonce() != HConstants.NO_NONCE) {
825       NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
826       proc.setNonceKey(nonceKey);
827     }
828 
829     // we want to call deserialize even when the stream is empty, mainly for testing.
830     proc.deserializeStateData(proto.getStateData().newInput());
831 
832     return proc;
833   }
834 }