1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.lang.reflect.Constructor;
25 import java.lang.reflect.Modifier;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.Map;
29
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.ProcedureInfo;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
36 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
37 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
38 import org.apache.hadoop.hbase.util.ByteStringer;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40 import org.apache.hadoop.hbase.util.NonceKey;
41
42 import com.google.common.annotations.VisibleForTesting;
43 import com.google.common.base.Preconditions;
44 import com.google.protobuf.ByteString;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 @InterfaceStability.Evolving
67 public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
68
69 private String owner = null;
70 private Long parentProcId = null;
71 private Long procId = null;
72 private long startTime;
73
74
75 private ProcedureState state = ProcedureState.INITIALIZING;
76 private Integer timeout = null;
77 private int[] stackIndexes = null;
78 private int childrenLatch = 0;
79 private long lastUpdate;
80
81 private RemoteProcedureException exception = null;
82 private byte[] result = null;
83
84 private NonceKey nonceKey = null;
85
86
87
88
89
90
91
92
93
94
95 protected abstract Procedure[] execute(TEnvironment env)
96 throws ProcedureYieldException, InterruptedException;
97
98
99
100
101
102
103
104
105
106
107
108
109 protected abstract void rollback(TEnvironment env)
110 throws IOException;
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126 protected abstract boolean abort(TEnvironment env);
127
128
129
130
131
132
133
134 protected abstract void serializeStateData(final OutputStream stream)
135 throws IOException;
136
137
138
139
140
141
142 protected abstract void deserializeStateData(final InputStream stream)
143 throws IOException;
144
145
146
147
148
149
150
151
152
153
154
155 protected boolean acquireLock(final TEnvironment env) {
156 return true;
157 }
158
159
160
161
162
163
164 protected void releaseLock(final TEnvironment env) {
165
166 }
167
168
169
170
171
172
173
174
175
176 protected void beforeReplay(final TEnvironment env) {
177
178 }
179
180
181
182
183
184
185
186
187 protected void completionCleanup(final TEnvironment env) {
188
189 }
190
191 @Override
192 public String toString() {
193
194 return toStringSimpleSB().toString();
195 }
196
197
198
199
200
201
202 protected StringBuilder toStringSimpleSB() {
203 StringBuilder sb = new StringBuilder();
204 toStringClassDetails(sb);
205
206 if (procId != null) {
207 sb.append(" id=");
208 sb.append(getProcId());
209 }
210
211 if (hasParent()) {
212 sb.append(" parent=");
213 sb.append(getParentProcId());
214 }
215
216 if (hasOwner()) {
217 sb.append(" owner=");
218 sb.append(getOwner());
219 }
220
221 sb.append(" state=");
222 sb.append(getState());
223
224 return sb;
225 }
226
227
228
229
230
231 public String toStringDetails() {
232 StringBuilder sb = toStringSimpleSB();
233
234 sb.append(" startTime=");
235 sb.append(getStartTime());
236
237 sb.append(" lastUpdate=");
238 sb.append(getLastUpdate());
239
240 if (stackIndexes != null) {
241 sb.append("\n");
242 sb.append("stackIndexes=");
243 sb.append(Arrays.toString(getStackIndexes()));
244 }
245
246 return sb.toString();
247 }
248
249 protected String toStringClass() {
250 StringBuilder sb = new StringBuilder();
251 toStringClassDetails(sb);
252
253 return sb.toString();
254 }
255
256
257
258
259
260
261 protected void toStringClassDetails(StringBuilder builder) {
262 builder.append(getClass().getName());
263 }
264
265
266
267
268 public byte[] getResult() {
269 return result;
270 }
271
272
273
274
275
276 protected void setResult(final byte[] result) {
277 this.result = result;
278 }
279
280 public long getProcId() {
281 return procId;
282 }
283
284 public boolean hasParent() {
285 return parentProcId != null;
286 }
287
288 public boolean hasException() {
289 return exception != null;
290 }
291
292 public boolean hasTimeout() {
293 return timeout != null;
294 }
295
296 public long getParentProcId() {
297 return parentProcId;
298 }
299
300 public NonceKey getNonceKey() {
301 return nonceKey;
302 }
303
304
305
306
307
308 public synchronized boolean isFailed() {
309 return exception != null || state == ProcedureState.ROLLEDBACK;
310 }
311
312
313
314
315 public synchronized boolean isSuccess() {
316 return state == ProcedureState.FINISHED && exception == null;
317 }
318
319
320
321
322
323 public synchronized boolean isFinished() {
324 switch (state) {
325 case ROLLEDBACK:
326 return true;
327 case FINISHED:
328 return exception == null;
329 default:
330 break;
331 }
332 return false;
333 }
334
335
336
337
338 public synchronized boolean isWaiting() {
339 switch (state) {
340 case WAITING:
341 case WAITING_TIMEOUT:
342 return true;
343 default:
344 break;
345 }
346 return false;
347 }
348
349 public synchronized RemoteProcedureException getException() {
350 return exception;
351 }
352
353 public long getStartTime() {
354 return startTime;
355 }
356
357 public synchronized long getLastUpdate() {
358 return lastUpdate;
359 }
360
361 public synchronized long elapsedTime() {
362 return lastUpdate - startTime;
363 }
364
365
366
367
368 protected void setTimeout(final int timeout) {
369 this.timeout = timeout;
370 }
371
372
373
374
375 public int getTimeout() {
376 return timeout;
377 }
378
379
380
381
382 public long getTimeRemaining() {
383 return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
384 }
385
386
387
388
389 @VisibleForTesting
390 @InterfaceAudience.Private
391 public void setOwner(final String owner) {
392 this.owner = StringUtils.isEmpty(owner) ? null : owner;
393 }
394
395 public String getOwner() {
396 return owner;
397 }
398
399 public boolean hasOwner() {
400 return owner != null;
401 }
402
403
404
405
406 @VisibleForTesting
407 @InterfaceAudience.Private
408 protected synchronized void setState(final ProcedureState state) {
409 this.state = state;
410 updateTimestamp();
411 }
412
413 @InterfaceAudience.Private
414 protected synchronized ProcedureState getState() {
415 return state;
416 }
417
418
419
420
421
422 protected void setFailure(final String source, final Throwable cause) {
423 setFailure(new RemoteProcedureException(source, cause));
424 }
425
426
427
428
429 protected synchronized void setFailure(final RemoteProcedureException exception) {
430 this.exception = exception;
431 if (!isFinished()) {
432 setState(ProcedureState.FINISHED);
433 }
434 }
435
436
437
438
439
440 protected void setAbortFailure(final String source, final String msg) {
441 setFailure(source, new ProcedureAbortedException(msg));
442 }
443
444 @InterfaceAudience.Private
445 protected synchronized boolean setTimeoutFailure() {
446 if (state == ProcedureState.WAITING_TIMEOUT) {
447 long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
448 setFailure("ProcedureExecutor", new TimeoutIOException(
449 "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
450 return true;
451 }
452 return false;
453 }
454
455
456
457
458 @VisibleForTesting
459 @InterfaceAudience.Private
460 protected void setProcId(final long procId) {
461 this.procId = procId;
462 this.startTime = EnvironmentEdgeManager.currentTime();
463 setState(ProcedureState.RUNNABLE);
464 }
465
466
467
468
469
470
471 @InterfaceAudience.Private
472 protected void setParentProcId(final long parentProcId) {
473 this.parentProcId = parentProcId;
474 }
475
476
477
478
479
480
481 @VisibleForTesting
482 @InterfaceAudience.Private
483 protected void setNonceKey(final NonceKey nonceKey) {
484 this.nonceKey = nonceKey;
485 }
486
487
488
489
490
491
492
493
494
495 @InterfaceAudience.Private
496 protected Procedure[] doExecute(final TEnvironment env)
497 throws ProcedureYieldException, InterruptedException {
498 try {
499 updateTimestamp();
500 return execute(env);
501 } finally {
502 updateTimestamp();
503 }
504 }
505
506
507
508
509
510
511 @InterfaceAudience.Private
512 protected void doRollback(final TEnvironment env) throws IOException {
513 try {
514 updateTimestamp();
515 rollback(env);
516 } finally {
517 updateTimestamp();
518 }
519 }
520
521
522
523
524
525
526 @InterfaceAudience.Private
527 protected void setStartTime(final long startTime) {
528 this.startTime = startTime;
529 }
530
531
532
533
534
535
536 private synchronized void setLastUpdate(final long lastUpdate) {
537 this.lastUpdate = lastUpdate;
538 }
539
540 protected synchronized void updateTimestamp() {
541 this.lastUpdate = EnvironmentEdgeManager.currentTime();
542 }
543
544
545
546
547
548 @InterfaceAudience.Private
549 protected synchronized void setChildrenLatch(final int numChildren) {
550 this.childrenLatch = numChildren;
551 }
552
553
554
555
556 @InterfaceAudience.Private
557 protected synchronized void incChildrenLatch() {
558
559 this.childrenLatch++;
560 }
561
562
563
564
565
566 @InterfaceAudience.Private
567 protected synchronized boolean childrenCountDown() {
568 assert childrenLatch > 0;
569 return --childrenLatch == 0;
570 }
571
572
573
574
575
576
577 @InterfaceAudience.Private
578 protected synchronized void addStackIndex(final int index) {
579 if (stackIndexes == null) {
580 stackIndexes = new int[] { index };
581 } else {
582 int count = stackIndexes.length;
583 stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
584 stackIndexes[count] = index;
585 }
586 }
587
588 @InterfaceAudience.Private
589 protected synchronized boolean removeStackIndex() {
590 if (stackIndexes.length > 1) {
591 stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
592 return false;
593 } else {
594 stackIndexes = null;
595 return true;
596 }
597 }
598
599
600
601
602
603
604 @InterfaceAudience.Private
605 protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
606 this.stackIndexes = new int[stackIndexes.size()];
607 for (int i = 0; i < this.stackIndexes.length; ++i) {
608 this.stackIndexes[i] = stackIndexes.get(i);
609 }
610 }
611
612 @InterfaceAudience.Private
613 protected synchronized boolean wasExecuted() {
614 return stackIndexes != null;
615 }
616
617 @InterfaceAudience.Private
618 protected synchronized int[] getStackIndexes() {
619 return stackIndexes;
620 }
621
622
623
624
625 @Override
626 public int compareTo(final Procedure other) {
627 long diff = getProcId() - other.getProcId();
628 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
629 }
630
631
632
633
634
635
636 @InterfaceAudience.Private
637 protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
638 while (proc.hasParent()) {
639 proc = procedures.get(proc.getParentProcId());
640 if (proc == null) return null;
641 }
642 return proc.getProcId();
643 }
644
645
646
647
648
649 protected static Procedure newInstance(final String className) throws IOException {
650 try {
651 Class<?> clazz = Class.forName(className);
652 if (!Modifier.isPublic(clazz.getModifiers())) {
653 throw new Exception("the " + clazz + " class is not public");
654 }
655
656 Constructor<?> ctor = clazz.getConstructor();
657 assert ctor != null : "no constructor found";
658 if (!Modifier.isPublic(ctor.getModifiers())) {
659 throw new Exception("the " + clazz + " constructor is not public");
660 }
661 return (Procedure)ctor.newInstance();
662 } catch (Exception e) {
663 throw new IOException("The procedure class " + className +
664 " must be accessible and have an empty constructor", e);
665 }
666 }
667
668
669
670
671
672 protected static void validateClass(final Procedure proc) throws IOException {
673 try {
674 Class<?> clazz = proc.getClass();
675 if (!Modifier.isPublic(clazz.getModifiers())) {
676 throw new Exception("the " + clazz + " class is not public");
677 }
678
679 Constructor<?> ctor = clazz.getConstructor();
680 assert ctor != null;
681 if (!Modifier.isPublic(ctor.getModifiers())) {
682 throw new Exception("the " + clazz + " constructor is not public");
683 }
684 } catch (Exception e) {
685 throw new IOException("The procedure class " + proc.getClass().getName() +
686 " must be accessible and have an empty constructor", e);
687 }
688 }
689
690
691
692
693
694
695 @InterfaceAudience.Private
696 public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
697 RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
698 return new ProcedureInfo(
699 proc.getProcId(),
700 proc.toStringClass(),
701 proc.getOwner(),
702 proc.getState(),
703 proc.hasParent() ? proc.getParentProcId() : -1,
704 nonceKey,
705 exception != null ?
706 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
707 proc.getLastUpdate(),
708 proc.getStartTime(),
709 proc.getResult());
710 }
711
712
713
714
715
716
717 @InterfaceAudience.Private
718 public static ProcedureProtos.Procedure convert(final Procedure proc)
719 throws IOException {
720 Preconditions.checkArgument(proc != null);
721 validateClass(proc);
722
723 ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
724 .setClassName(proc.getClass().getName())
725 .setProcId(proc.getProcId())
726 .setState(proc.getState())
727 .setStartTime(proc.getStartTime())
728 .setLastUpdate(proc.getLastUpdate());
729
730 if (proc.hasParent()) {
731 builder.setParentId(proc.getParentProcId());
732 }
733
734 if (proc.hasTimeout()) {
735 builder.setTimeout(proc.getTimeout());
736 }
737
738 if (proc.hasOwner()) {
739 builder.setOwner(proc.getOwner());
740 }
741
742 int[] stackIds = proc.getStackIndexes();
743 if (stackIds != null) {
744 for (int i = 0; i < stackIds.length; ++i) {
745 builder.addStackId(stackIds[i]);
746 }
747 }
748
749 if (proc.hasException()) {
750 RemoteProcedureException exception = proc.getException();
751 builder.setException(
752 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
753 }
754
755 byte[] result = proc.getResult();
756 if (result != null) {
757 builder.setResult(ByteStringer.wrap(result));
758 }
759
760 ByteString.Output stateStream = ByteString.newOutput();
761 proc.serializeStateData(stateStream);
762 if (stateStream.size() > 0) {
763 builder.setStateData(stateStream.toByteString());
764 }
765
766 if (proc.getNonceKey() != null) {
767 builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
768 builder.setNonce(proc.getNonceKey().getNonce());
769 }
770
771 return builder.build();
772 }
773
774
775
776
777
778
779
780
781
782
783
784
785 @InterfaceAudience.Private
786 public static Procedure convert(final ProcedureProtos.Procedure proto)
787 throws IOException {
788
789 Procedure proc = Procedure.newInstance(proto.getClassName());
790
791
792 proc.setProcId(proto.getProcId());
793 proc.setState(proto.getState());
794 proc.setStartTime(proto.getStartTime());
795 proc.setLastUpdate(proto.getLastUpdate());
796
797 if (proto.hasParentId()) {
798 proc.setParentProcId(proto.getParentId());
799 }
800
801 if (proto.hasOwner()) {
802 proc.setOwner(proto.getOwner());
803 }
804
805 if (proto.hasTimeout()) {
806 proc.setTimeout(proto.getTimeout());
807 }
808
809 if (proto.getStackIdCount() > 0) {
810 proc.setStackIndexes(proto.getStackIdList());
811 }
812
813 if (proto.hasException()) {
814 assert proc.getState() == ProcedureState.FINISHED ||
815 proc.getState() == ProcedureState.ROLLEDBACK :
816 "The procedure must be failed (waiting to rollback) or rolledback";
817 proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
818 }
819
820 if (proto.hasResult()) {
821 proc.setResult(proto.getResult().toByteArray());
822 }
823
824 if (proto.getNonce() != HConstants.NO_NONCE) {
825 NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
826 proc.setNonceKey(nonceKey);
827 }
828
829
830 proc.deserializeStateData(proto.getStateData().newInput());
831
832 return proc;
833 }
834 }