1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.util.Comparator;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.PriorityBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.locks.Condition;
26 import java.util.concurrent.locks.Lock;
27 import java.util.concurrent.locks.ReentrantLock;
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * A {@link PriorityBlockingQueue} whose blocking consumers fall back to a second queue when
 * this one is empty.
 *
 * <p>{@link #take()} and {@link #poll(long, TimeUnit)} first try this queue and, if it yields
 * nothing, try the queue returned by {@link #getStealFromQueue()}. Both queues share a single
 * lock and condition: an {@code offer} on either queue signals the condition, so a consumer
 * blocked in one of the two methods above is woken regardless of which queue received the
 * element.
 *
 * <p>Only the two blocking retrieval methods steal. Other inherited operations that are not
 * overridden here (for example the non-blocking {@code poll()}) act on this queue alone.
 *
 * @param <T> the type of elements held in both queues
 */
public class StealJobQueue<T> extends PriorityBlockingQueue<T> {

  private static final long serialVersionUID = -6334572230936888291L;

  /** Initial capacity given to both queues by {@link #StealJobQueue(Comparator)}. */
  private static final int DEFAULT_INITIAL_CAPACITY = 11;

  /** Secondary queue that blocking consumers of this queue drain when this queue is empty. */
  private final BlockingQueue<T> stealFromQueue;

  /** Guards the "check both queues, then wait" sequence against concurrent offers. */
  private final Lock lock = new ReentrantLock();
  /** Signalled whenever an element is offered to either queue. */
  private final transient Condition notEmpty = lock.newCondition();

  /**
   * Creates both queues with an initial capacity of {@value #DEFAULT_INITIAL_CAPACITY} and
   * orders both of them with the given comparator.
   *
   * @param comparator ordering applied to this queue and to the steal-from queue
   */
  public StealJobQueue(Comparator<? super T> comparator) {
    // Delegate so the comparator is also handed to the superclass; relying on the implicit
    // super() would leave this queue on natural ordering and ignore the argument.
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_INITIAL_CAPACITY, comparator);
  }

  /**
   * Creates both queues with explicit initial capacities, ordered by the given comparator.
   *
   * @param initCapacity initial capacity of this queue
   * @param stealFromQueueInitCapacity initial capacity of the steal-from queue
   * @param comparator ordering applied to this queue and to the steal-from queue
   */
  public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity,
      Comparator<? super T> comparator) {
    super(initCapacity, comparator);
    this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) {

      private static final long serialVersionUID = -6805567216580184701L;

      @Override
      public boolean offer(T t) {
        // Uses the enclosing queue's lock and condition so that consumers blocked in the
        // enclosing take()/poll(timeout) see elements added to this secondary queue.
        lock.lock();
        try {
          // The waiter cannot resume until unlock() below, by which time the element is in.
          notEmpty.signal();
          return super.offer(t);
        } finally {
          lock.unlock();
        }
      }
    };
  }

  /**
   * Returns the queue that this queue steals from when it is empty.
   *
   * @return the steal-from queue; elements offered to it wake consumers blocked on this queue
   */
  public BlockingQueue<T> getStealFromQueue() {
    return stealFromQueue;
  }

  /**
   * Inserts the element into this queue and signals one consumer waiting in {@link #take()}
   * or {@link #poll(long, TimeUnit)}.
   *
   * @param t the element to add
   * @return the result of {@link PriorityBlockingQueue#offer(Object)}
   */
  @Override
  public boolean offer(T t) {
    lock.lock();
    try {
      // Signalling before the insert is safe: the woken thread must reacquire the lock,
      // which is only released after super.offer() has completed.
      notEmpty.signal();
      return super.offer(t);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, or, if this queue is empty, the head of the
   * steal-from queue, waiting until an element is offered to either queue if both are empty.
   *
   * @return the removed element, never {@code null}
   * @throws InterruptedException if interrupted while acquiring the lock or while waiting
   */
  @Override
  public T take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      while (true) {
        // Own queue has priority; only steal when it has nothing to give.
        T retVal = this.poll();
        if (retVal == null) {
          retVal = stealFromQueue.poll();
        }
        if (retVal == null) {
          notEmpty.await();
        } else {
          return retVal;
        }
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, or, if this queue is empty, the head of the
   * steal-from queue, waiting up to the given time for an element to be offered to either
   * queue if both are empty.
   *
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit the time unit of {@code timeout}
   * @return the removed element, or {@code null} if the wait time elapsed with both queues empty
   * @throws InterruptedException if interrupted while acquiring the lock or while waiting
   */
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    try {
      while (true) {
        // Own queue has priority; only steal when it has nothing to give.
        T retVal = this.poll();
        if (retVal == null) {
          retVal = stealFromQueue.poll();
        }
        if (retVal == null) {
          if (nanos <= 0) {
            return null;
          }
          // awaitNanos returns the remaining wait time, so repeated wake-ups never extend
          // the overall timeout.
          nanos = notEmpty.awaitNanos(nanos);
        } else {
          return retVal;
        }
      }
    } finally {
      lock.unlock();
    }
  }
}
150