View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.Comparator;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.PriorityBlockingQueue;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.locks.Condition;
26  import java.util.concurrent.locks.Lock;
27  import java.util.concurrent.locks.ReentrantLock;
28  
29  /**
30   * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor.
31   * This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the
32   * steal-from ThreadPoolExecutor. The behavior of this queue is the same as a normal
33   * PriorityBlockingQueue except the take/poll(long,TimeUnit) methods would also check whether there
34   * are jobs in the steal-from queue if this q ueue is empty.
35   *
36   * Note the workers in ThreadPoolExecutor must be pre-started so that they can steal job from the
37   * other queue, otherwise the worker will only be started after there are jobs submitted to main
38   * queue.
39   */
40  public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
41  
42    private static final long serialVersionUID = -6334572230936888291L;
43  
44    private BlockingQueue<T> stealFromQueue;
45  
46    private final Lock lock = new ReentrantLock();
47    private final transient Condition notEmpty = lock.newCondition();
48  
49    public StealJobQueue(Comparator<? super T> comparator) {
50      this.stealFromQueue = new PriorityBlockingQueue<T>(11, comparator) {
51  
52        private static final long serialVersionUID = -7070010365201826904L;
53  
54        @Override
55        public boolean offer(T t) {
56          lock.lock();
57          try {
58            notEmpty.signal();
59            return super.offer(t);
60          } finally {
61            lock.unlock();
62          }
63        }
64      };
65    }
66  
67    public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity,
68        Comparator<? super T> comparator) {
69      super(initCapacity, comparator);
70      this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) {
71  
72        private static final long serialVersionUID = -6805567216580184701L;
73  
74        @Override
75        public boolean offer(T t) {
76          lock.lock();
77          try {
78            notEmpty.signal();
79            return super.offer(t);
80          } finally {
81            lock.unlock();
82          }
83        }
84      };
85    }
86  
87    /**
88     * Get a queue whose job might be stolen by the consumer of this original queue
89     * @return the queue whose job could be stolen
90     */
91    public BlockingQueue<T> getStealFromQueue() {
92      return stealFromQueue;
93    }
94  
95    @Override
96    public boolean offer(T t) {
97      lock.lock();
98      try {
99        notEmpty.signal();
100       return super.offer(t);
101     } finally {
102       lock.unlock();
103     }
104   }
105 
106 
107   @Override
108   public T take() throws InterruptedException {
109     lock.lockInterruptibly();
110     try {
111       while (true) {
112         T retVal = this.poll();
113         if (retVal == null) {
114           retVal = stealFromQueue.poll();
115         }
116         if (retVal == null) {
117           notEmpty.await();
118         } else {
119           return retVal;
120         }
121       }
122     } finally {
123       lock.unlock();
124     }
125   }
126 
127   @Override
128   public T poll(long timeout, TimeUnit unit) throws InterruptedException {
129     long nanos = unit.toNanos(timeout);
130     lock.lockInterruptibly();
131     try {
132       while (true) {
133         T retVal = this.poll();
134         if (retVal == null) {
135           retVal = stealFromQueue.poll();
136         }
137         if (retVal == null) {
138           if (nanos <= 0)
139             return null;
140           nanos = notEmpty.awaitNanos(nanos);
141         } else {
142           return retVal;
143         }
144       }
145     } finally {
146       lock.unlock();
147     }
148   }
149 }
150