View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.util.Collection;
21  import java.util.Iterator;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.LinkedBlockingDeque;
24  import java.util.concurrent.LinkedBlockingQueue;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  
31  /**
32   * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
33   *
34   * Implementing {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}.
35   *
36   * Currently uses milliseconds internally, need to look into whether we should use
37   * nanoseconds for timeInterval and minDelay.
38   *
39   * @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a>
40   *
41   * @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp">
42   *   CoDel version for generic job queues in Wangle library</a>
43   */
44  @InterfaceAudience.Private
45  public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
46  
47    // backing queue
48    private LinkedBlockingDeque<CallRunner> queue;
49  
50    // so we can calculate actual threshold to switch to LIFO under load
51    private int maxCapacity;
52  
53    // metrics (shared across all queues)
54    private AtomicLong numGeneralCallsDropped;
55    private AtomicLong numLifoModeSwitches;
56  
57    // Both are in milliseconds
58    private volatile int codelTargetDelay;
59    private volatile int codelInterval;
60  
61    // if queue if full more than that percent, we switch to LIFO mode.
62    // Values are in the range of 0.7, 0.8 etc (0-1.0).
63    private volatile double lifoThreshold;
64  
65    // minimal delay observed during the interval
66    private volatile long minDelay;
67  
68    // the moment when current interval ends
69    private volatile long intervalTime = System.currentTimeMillis();
70  
71    // switch to ensure only one threads does interval cutoffs
72    private AtomicBoolean resetDelay = new AtomicBoolean(true);
73  
74    // if we're in this mode, "long" calls are getting dropped
75    private AtomicBoolean isOverloaded = new AtomicBoolean(false);
76  
77    public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
78        double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
79      this.maxCapacity = capacity;
80      this.queue = new LinkedBlockingDeque<>(capacity);
81      this.codelTargetDelay = targetDelay;
82      this.codelInterval = interval;
83      this.lifoThreshold = lifoThreshold;
84      this.numGeneralCallsDropped = numGeneralCallsDropped;
85      this.numLifoModeSwitches = numLifoModeSwitches;
86    }
87  
88    /**
89     * Update tunables.
90     *
91     * @param newCodelTargetDelay new CoDel target delay
92     * @param newCodelInterval new CoDel interval
93     * @param newLifoThreshold new Adaptive Lifo threshold
94     */
95    public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
96                               double newLifoThreshold) {
97      this.codelTargetDelay = newCodelTargetDelay;
98      this.codelInterval = newCodelInterval;
99      this.lifoThreshold = newLifoThreshold;
100   }
101 
102   /**
103    * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently
104    * skip all calls which it thinks should be dropped.
105    *
106    * @return the head of this queue
107    * @throws InterruptedException if interrupted while waiting
108    */
109   @Override
110   public CallRunner take() throws InterruptedException {
111     CallRunner cr;
112     while(true) {
113       if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
114         numLifoModeSwitches.incrementAndGet();
115         cr = queue.takeLast();
116       } else {
117         cr = queue.takeFirst();
118       }
119       if (needToDrop(cr)) {
120         numGeneralCallsDropped.incrementAndGet();
121         cr.drop();
122       } else {
123         return cr;
124       }
125     }
126   }
127 
128   @Override
129   public CallRunner poll() {
130     CallRunner cr;
131     boolean switched = false;
132     while(true) {
133       if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
134         // Only count once per switch.
135         if (!switched) {
136           switched = true;
137           numLifoModeSwitches.incrementAndGet();
138         }
139         cr = queue.pollLast();
140       } else {
141         switched = false;
142         cr = queue.pollFirst();
143       }
144       if (cr == null) {
145         return cr;
146       }
147       if (needToDrop(cr)) {
148         numGeneralCallsDropped.incrementAndGet();
149         cr.drop();
150       } else {
151         return cr;
152       }
153     }
154   }
155 
156   /**
157    * @param callRunner to validate
158    * @return true if this call needs to be skipped based on call timestamp
159    *   and internal queue state (deemed overloaded).
160    */
161   private boolean needToDrop(CallRunner callRunner) {
162     long now = System.currentTimeMillis();
163     long callDelay = now - callRunner.getCall().timestamp;
164 
165     long localMinDelay = this.minDelay;
166 
167     // Try and determine if we should reset
168     // the delay time and determine overload
169     if (now > intervalTime &&
170         !resetDelay.get() &&
171         !resetDelay.getAndSet(true)) {
172       intervalTime = now + codelInterval;
173 
174       isOverloaded.set(localMinDelay > codelTargetDelay);
175     }
176 
177     // If it looks like we should reset the delay
178     // time do it only once on one thread
179     if (resetDelay.get() && resetDelay.getAndSet(false)) {
180       minDelay = callDelay;
181       // we just reset the delay dunno about how this will work
182       return false;
183     } else if (callDelay < localMinDelay) {
184       minDelay = callDelay;
185     }
186 
187     return isOverloaded.get() && callDelay > 2 * codelTargetDelay;
188   }
189 
190   // Generic BlockingQueue methods we support
191   @Override
192   public boolean offer(CallRunner callRunner) {
193     return queue.offer(callRunner);
194   }
195 
196   @Override
197   public int size() {
198     return queue.size();
199   }
200 
201   @Override
202   public String toString() {
203     return queue.toString();
204   }
205 
206   // This class does NOT provide generic purpose BlockingQueue implementation,
207   // so to prevent misuse all other methods throw UnsupportedOperationException.
208 
209   @Override
210   public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
211     throw new UnsupportedOperationException("This class doesn't support anything,"
212       + " but take() and offer() methods");
213   }
214 
215 
216   @Override
217   public CallRunner peek() {
218     throw new UnsupportedOperationException("This class doesn't support anything,"
219       + " but take() and offer() methods");
220   }
221 
222   @Override
223   public boolean remove(Object o) {
224     throw new UnsupportedOperationException("This class doesn't support anything,"
225       + " but take() and offer() methods");
226   }
227 
228   @Override
229   public boolean contains(Object o) {
230     throw new UnsupportedOperationException("This class doesn't support anything,"
231       + " but take() and offer() methods");
232   }
233 
234   @Override
235   public Object[] toArray() {
236     throw new UnsupportedOperationException("This class doesn't support anything,"
237       + " but take() and offer() methods");
238   }
239 
240   @Override
241   public <T> T[] toArray(T[] a) {
242     throw new UnsupportedOperationException("This class doesn't support anything,"
243       + " but take() and offer() methods");
244   }
245 
246   @Override
247   public void clear() {
248     throw new UnsupportedOperationException("This class doesn't support anything,"
249       + " but take() and offer() methods");
250   }
251 
252   @Override
253   public int drainTo(Collection<? super CallRunner> c) {
254     throw new UnsupportedOperationException("This class doesn't support anything,"
255       + " but take() and offer() methods");
256   }
257 
258   @Override
259   public int drainTo(Collection<? super CallRunner> c, int maxElements) {
260     throw new UnsupportedOperationException("This class doesn't support anything,"
261       + " but take() and offer() methods");
262   }
263 
264   @Override
265   public Iterator<CallRunner> iterator() {
266     throw new UnsupportedOperationException("This class doesn't support anything,"
267       + " but take() and offer() methods");
268   }
269 
270   @Override
271   public boolean add(CallRunner callRunner) {
272     throw new UnsupportedOperationException("This class doesn't support anything,"
273       + " but take() and offer() methods");
274   }
275 
276   @Override
277   public CallRunner remove() {
278     throw new UnsupportedOperationException("This class doesn't support anything,"
279       + " but take() and offer() methods");
280   }
281 
282   @Override
283   public CallRunner element() {
284     throw new UnsupportedOperationException("This class doesn't support anything,"
285       + " but take() and offer() methods");
286   }
287 
288   @Override
289   public boolean addAll(Collection<? extends CallRunner> c) {
290     throw new UnsupportedOperationException("This class doesn't support anything,"
291       + " but take() and offer() methods");
292   }
293 
294   @Override
295   public boolean isEmpty() {
296     throw new UnsupportedOperationException("This class doesn't support anything,"
297       + " but take() and offer() methods");
298   }
299 
300   @Override
301   public boolean containsAll(Collection<?> c) {
302     throw new UnsupportedOperationException("This class doesn't support anything,"
303       + " but take() and offer() methods");
304   }
305 
306   @Override
307   public boolean removeAll(Collection<?> c) {
308     throw new UnsupportedOperationException("This class doesn't support anything,"
309       + " but take() and offer() methods");
310   }
311 
312   @Override
313   public boolean retainAll(Collection<?> c) {
314     throw new UnsupportedOperationException("This class doesn't support anything,"
315       + " but take() and offer() methods");
316   }
317 
318   @Override
319   public int remainingCapacity() {
320     throw new UnsupportedOperationException("This class doesn't support anything,"
321       + " but take() and offer() methods");
322   }
323 
324   @Override
325   public void put(CallRunner callRunner) throws InterruptedException {
326     throw new UnsupportedOperationException("This class doesn't support anything,"
327       + " but take() and offer() methods");
328   }
329 
330   @Override
331   public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
332       throws InterruptedException {
333     throw new UnsupportedOperationException("This class doesn't support anything,"
334       + " but take() and offer() methods");
335   }
336 }