1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.util.Deque;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ConcurrentLinkedDeque;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Abortable;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30
31
32
33
34
35
36
37
38 @InterfaceAudience.Private
39 public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
40
41
42
43
44
45 private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
46
47 public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
48 final int numQueues, final int maxQueueLength, final Configuration conf,
49 final Abortable abortable) {
50 super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
51 maxQueueLength);
52 }
53
54 public FastPathBalancedQueueRpcExecutor(String name, int handlerCount,
55 int numCallQueues,
56 Configuration conf,
57 Abortable abortable,
58 Class<? extends BlockingQueue> queueClass,
59 Object... args) {
60 super(name, handlerCount, numCallQueues, conf, abortable, queueClass, args);
61 }
62
63 @Override
64 protected Handler getHandler(String name, double handlerFailureThreshhold,
65 BlockingQueue<CallRunner> q) {
66 return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
67 }
68
69 @Override
70 public boolean dispatch(CallRunner callTask) throws InterruptedException {
71 FastPathHandler handler = popReadyHandler();
72 return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
73 }
74
75
76
77
78 private FastPathHandler popReadyHandler() {
79 return this.fastPathHandlerStack.poll();
80 }
81
82 class FastPathHandler extends Handler {
83
84
85 final Deque<FastPathHandler> fastPathHandlerStack;
86
87 private Semaphore semaphore = new Semaphore(0);
88
89 private CallRunner loadedCallRunner;
90
91 FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
92 final Deque<FastPathHandler> fastPathHandlerStack) {
93 super(name, handlerFailureThreshhold, q);
94 this.fastPathHandlerStack = fastPathHandlerStack;
95 }
96
97 protected CallRunner getCallRunner() throws InterruptedException {
98
99 CallRunner cr = this.q.poll();
100 if (cr == null) {
101
102
103 if (this.fastPathHandlerStack != null) {
104 this.fastPathHandlerStack.push(this);
105 this.semaphore.acquire();
106 cr = this.loadedCallRunner;
107 this.loadedCallRunner = null;
108 } else {
109
110 cr = super.getCallRunner();
111 }
112 }
113 return cr;
114 }
115
116
117
118
119
120 boolean loadCallRunner(final CallRunner cr) {
121 this.loadedCallRunner = cr;
122 this.semaphore.release();
123 return true;
124 }
125 }
126 }