View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  
18  package org.apache.hadoop.hbase.util;
19  
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.testclassification.SmallTests;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
39  
40  @Category(SmallTests.class)
41  public class TestBoundedPriorityBlockingQueue {
42    private final static int CAPACITY = 16;
43  
44    class TestObject {
45      private final int priority;
46      private final int seqId;
47  
48      public TestObject(final int priority, final int seqId) {
49        this.priority = priority;
50        this.seqId = seqId;
51      }
52  
53      public int getSeqId() {
54        return this.seqId;
55      }
56  
57      public int getPriority() {
58        return this.priority;
59      }
60    }
61  
62    class TestObjectComparator implements Comparator<TestObject> {
63      public TestObjectComparator() {}
64  
65      @Override
66      public int compare(TestObject a, TestObject b) {
67        return a.getPriority() - b.getPriority();
68      }
69    }
70  
71    private BoundedPriorityBlockingQueue<TestObject> queue;
72  
73    @Before
74    public void setUp() throws Exception {
75      this.queue = new BoundedPriorityBlockingQueue<TestObject>(CAPACITY, new TestObjectComparator());
76    }
77  
78    @After
79    public void tearDown() throws Exception {
80    }
81  
82    @Test
83    public void tesAppend() throws Exception {
84      // Push
85      for (int i = 1; i <= CAPACITY; ++i) {
86        assertTrue(queue.offer(new TestObject(i, i)));
87        assertEquals(i, queue.size());
88        assertEquals(CAPACITY - i, queue.remainingCapacity());
89      }
90      assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
91  
92      // Pop
93      for (int i = 1; i <= CAPACITY; ++i) {
94        TestObject obj = queue.poll();
95        assertEquals(i, obj.getSeqId());
96        assertEquals(CAPACITY - i, queue.size());
97        assertEquals(i, queue.remainingCapacity());
98      }
99      assertEquals(null, queue.poll());
100   }
101 
102   @Test
103   public void tesAppendSamePriority() throws Exception {
104     // Push
105     for (int i = 1; i <= CAPACITY; ++i) {
106       assertTrue(queue.offer(new TestObject(0, i)));
107       assertEquals(i, queue.size());
108       assertEquals(CAPACITY - i, queue.remainingCapacity());
109     }
110     assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
111 
112     // Pop
113     for (int i = 1; i <= CAPACITY; ++i) {
114       TestObject obj = queue.poll();
115       assertEquals(i, obj.getSeqId());
116       assertEquals(CAPACITY - i, queue.size());
117       assertEquals(i, queue.remainingCapacity());
118     }
119     assertEquals(null, queue.poll());
120   }
121 
122   @Test
123   public void testPrepend() throws Exception {
124     // Push
125     for (int i = 1; i <= CAPACITY; ++i) {
126       assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
127       assertEquals(i, queue.size());
128       assertEquals(CAPACITY - i, queue.remainingCapacity());
129     }
130 
131     // Pop
132     for (int i = 1; i <= CAPACITY; ++i) {
133       TestObject obj = queue.poll();
134       assertEquals(CAPACITY - (i - 1), obj.getSeqId());
135       assertEquals(CAPACITY - i, queue.size());
136       assertEquals(i, queue.remainingCapacity());
137     }
138     assertEquals(null, queue.poll());
139   }
140 
141   @Test
142   public void testInsert() throws Exception {
143     // Push
144     for (int i = 1; i <= CAPACITY; i += 2) {
145       assertTrue(queue.offer(new TestObject(i, i)));
146       assertEquals((1 + i) / 2, queue.size());
147     }
148     for (int i = 2; i <= CAPACITY; i += 2) {
149       assertTrue(queue.offer(new TestObject(i, i)));
150       assertEquals(CAPACITY / 2 + (i / 2), queue.size());
151     }
152     assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
153 
154     // Pop
155     for (int i = 1; i <= CAPACITY; ++i) {
156       TestObject obj = queue.poll();
157       assertEquals(i, obj.getSeqId());
158       assertEquals(CAPACITY - i, queue.size());
159       assertEquals(i, queue.remainingCapacity());
160     }
161     assertEquals(null, queue.poll());
162   }
163 
164   @Test
165   public void testFifoSamePriority() throws Exception {
166     assertTrue(CAPACITY >= 6);
167     for (int i = 0; i < 6; ++i) {
168       assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
169     }
170 
171     for (int i = 0; i < 6; i += 2) {
172       TestObject obj = queue.poll();
173       assertEquals(10, obj.getPriority());
174       assertEquals(i, obj.getSeqId());
175     }
176 
177     for (int i = 1; i < 6; i += 2) {
178       TestObject obj = queue.poll();
179       assertEquals(20, obj.getPriority());
180       assertEquals(i, obj.getSeqId());
181     }
182     assertEquals(null, queue.poll());
183   }
184 
185   @Test
186   public void testPoll() {
187     assertNull(queue.poll());
188     PriorityQueue<TestObject> testList = new PriorityQueue<TestObject>(CAPACITY, new TestObjectComparator());
189 
190     for (int i = 0; i < CAPACITY; ++i) {
191       TestObject obj = new TestObject(i, i);
192       testList.add(obj);
193       queue.offer(obj);
194     }
195 
196     for (int i = 0; i < CAPACITY; ++i) {
197       assertEquals(testList.poll(), queue.poll());
198     }
199 
200     assertNull(null, queue.poll());
201   }
202 
203   @Test(timeout=10000)
204   public void testPollInExecutor() throws InterruptedException {
205     final TestObject testObj = new TestObject(0, 0);
206 
207     final CyclicBarrier threadsStarted = new CyclicBarrier(2);
208     ExecutorService executor = Executors.newFixedThreadPool(2);
209     executor.execute(new Runnable() {
210       public void run() {
211         try {
212           assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
213           threadsStarted.await();
214           assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
215           assertTrue(queue.isEmpty());
216         } catch (Exception e) {
217           throw new RuntimeException(e);
218         }
219       }
220     });
221 
222     executor.execute(new Runnable() {
223       public void run() {
224         try {
225             threadsStarted.await();
226             queue.offer(testObj);
227         } catch (Exception e) {
228           throw new RuntimeException(e);
229         }
230       }
231     });
232 
233     executor.shutdown();
234     assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
235   }
236 }