1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingDeque;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 @InterfaceAudience.Private
45 public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
46
47
48 private LinkedBlockingDeque<CallRunner> queue;
49
50
51 private int maxCapacity;
52
53
54 private AtomicLong numGeneralCallsDropped;
55 private AtomicLong numLifoModeSwitches;
56
57
58 private volatile int codelTargetDelay;
59 private volatile int codelInterval;
60
61
62
63 private volatile double lifoThreshold;
64
65
66 private volatile long minDelay;
67
68
69 private volatile long intervalTime = System.currentTimeMillis();
70
71
72 private AtomicBoolean resetDelay = new AtomicBoolean(true);
73
74
75 private AtomicBoolean isOverloaded = new AtomicBoolean(false);
76
77 public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
78 double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
79 this.maxCapacity = capacity;
80 this.queue = new LinkedBlockingDeque<>(capacity);
81 this.codelTargetDelay = targetDelay;
82 this.codelInterval = interval;
83 this.lifoThreshold = lifoThreshold;
84 this.numGeneralCallsDropped = numGeneralCallsDropped;
85 this.numLifoModeSwitches = numLifoModeSwitches;
86 }
87
88
89
90
91
92
93
94
95 public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
96 double newLifoThreshold) {
97 this.codelTargetDelay = newCodelTargetDelay;
98 this.codelInterval = newCodelInterval;
99 this.lifoThreshold = newLifoThreshold;
100 }
101
102
103
104
105
106
107
108
109 @Override
110 public CallRunner take() throws InterruptedException {
111 CallRunner cr;
112 while(true) {
113 if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
114 numLifoModeSwitches.incrementAndGet();
115 cr = queue.takeLast();
116 } else {
117 cr = queue.takeFirst();
118 }
119 if (needToDrop(cr)) {
120 numGeneralCallsDropped.incrementAndGet();
121 cr.drop();
122 } else {
123 return cr;
124 }
125 }
126 }
127
128 @Override
129 public CallRunner poll() {
130 CallRunner cr;
131 boolean switched = false;
132 while(true) {
133 if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
134
135 if (!switched) {
136 switched = true;
137 numLifoModeSwitches.incrementAndGet();
138 }
139 cr = queue.pollLast();
140 } else {
141 switched = false;
142 cr = queue.pollFirst();
143 }
144 if (cr == null) {
145 return cr;
146 }
147 if (needToDrop(cr)) {
148 numGeneralCallsDropped.incrementAndGet();
149 cr.drop();
150 } else {
151 return cr;
152 }
153 }
154 }
155
156
157
158
159
160
161 private boolean needToDrop(CallRunner callRunner) {
162 long now = System.currentTimeMillis();
163 long callDelay = now - callRunner.getCall().timestamp;
164
165 long localMinDelay = this.minDelay;
166
167
168
169 if (now > intervalTime &&
170 !resetDelay.get() &&
171 !resetDelay.getAndSet(true)) {
172 intervalTime = now + codelInterval;
173
174 isOverloaded.set(localMinDelay > codelTargetDelay);
175 }
176
177
178
179 if (resetDelay.get() && resetDelay.getAndSet(false)) {
180 minDelay = callDelay;
181
182 return false;
183 } else if (callDelay < localMinDelay) {
184 minDelay = callDelay;
185 }
186
187 return isOverloaded.get() && callDelay > 2 * codelTargetDelay;
188 }
189
190
191 @Override
192 public boolean offer(CallRunner callRunner) {
193 return queue.offer(callRunner);
194 }
195
196 @Override
197 public int size() {
198 return queue.size();
199 }
200
201 @Override
202 public String toString() {
203 return queue.toString();
204 }
205
206
207
208
209 @Override
210 public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
211 throw new UnsupportedOperationException("This class doesn't support anything,"
212 + " but take() and offer() methods");
213 }
214
215
216 @Override
217 public CallRunner peek() {
218 throw new UnsupportedOperationException("This class doesn't support anything,"
219 + " but take() and offer() methods");
220 }
221
222 @Override
223 public boolean remove(Object o) {
224 throw new UnsupportedOperationException("This class doesn't support anything,"
225 + " but take() and offer() methods");
226 }
227
228 @Override
229 public boolean contains(Object o) {
230 throw new UnsupportedOperationException("This class doesn't support anything,"
231 + " but take() and offer() methods");
232 }
233
234 @Override
235 public Object[] toArray() {
236 throw new UnsupportedOperationException("This class doesn't support anything,"
237 + " but take() and offer() methods");
238 }
239
240 @Override
241 public <T> T[] toArray(T[] a) {
242 throw new UnsupportedOperationException("This class doesn't support anything,"
243 + " but take() and offer() methods");
244 }
245
246 @Override
247 public void clear() {
248 throw new UnsupportedOperationException("This class doesn't support anything,"
249 + " but take() and offer() methods");
250 }
251
252 @Override
253 public int drainTo(Collection<? super CallRunner> c) {
254 throw new UnsupportedOperationException("This class doesn't support anything,"
255 + " but take() and offer() methods");
256 }
257
258 @Override
259 public int drainTo(Collection<? super CallRunner> c, int maxElements) {
260 throw new UnsupportedOperationException("This class doesn't support anything,"
261 + " but take() and offer() methods");
262 }
263
264 @Override
265 public Iterator<CallRunner> iterator() {
266 throw new UnsupportedOperationException("This class doesn't support anything,"
267 + " but take() and offer() methods");
268 }
269
270 @Override
271 public boolean add(CallRunner callRunner) {
272 throw new UnsupportedOperationException("This class doesn't support anything,"
273 + " but take() and offer() methods");
274 }
275
276 @Override
277 public CallRunner remove() {
278 throw new UnsupportedOperationException("This class doesn't support anything,"
279 + " but take() and offer() methods");
280 }
281
282 @Override
283 public CallRunner element() {
284 throw new UnsupportedOperationException("This class doesn't support anything,"
285 + " but take() and offer() methods");
286 }
287
288 @Override
289 public boolean addAll(Collection<? extends CallRunner> c) {
290 throw new UnsupportedOperationException("This class doesn't support anything,"
291 + " but take() and offer() methods");
292 }
293
294 @Override
295 public boolean isEmpty() {
296 throw new UnsupportedOperationException("This class doesn't support anything,"
297 + " but take() and offer() methods");
298 }
299
300 @Override
301 public boolean containsAll(Collection<?> c) {
302 throw new UnsupportedOperationException("This class doesn't support anything,"
303 + " but take() and offer() methods");
304 }
305
306 @Override
307 public boolean removeAll(Collection<?> c) {
308 throw new UnsupportedOperationException("This class doesn't support anything,"
309 + " but take() and offer() methods");
310 }
311
312 @Override
313 public boolean retainAll(Collection<?> c) {
314 throw new UnsupportedOperationException("This class doesn't support anything,"
315 + " but take() and offer() methods");
316 }
317
318 @Override
319 public int remainingCapacity() {
320 throw new UnsupportedOperationException("This class doesn't support anything,"
321 + " but take() and offer() methods");
322 }
323
324 @Override
325 public void put(CallRunner callRunner) throws InterruptedException {
326 throw new UnsupportedOperationException("This class doesn't support anything,"
327 + " but take() and offer() methods");
328 }
329
330 @Override
331 public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
332 throws InterruptedException {
333 throw new UnsupportedOperationException("This class doesn't support anything,"
334 + " but take() and offer() methods");
335 }
336 }