View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  
18  package org.apache.hadoop.hbase.util;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Random;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  
29  import org.apache.hadoop.hbase.testclassification.SmallTests;
30  import org.junit.Before;
31  import org.junit.Test;
32  import org.junit.experimental.categories.Category;
33  
34  @Category(SmallTests.class)
35  public class TestBoundedConcurrentLinkedQueue {
36    private final static int CAPACITY = 16;
37  
38    private BoundedConcurrentLinkedQueue<Long> queue;
39  
40    @Before
41    public void setUp() throws Exception {
42      this.queue = new BoundedConcurrentLinkedQueue<Long>(CAPACITY);
43    }
44  
45    @Test
46    public void testOfferAndPoll() throws Exception {
47      // Offer
48      for (long i = 1; i <= CAPACITY; ++i) {
49        assertTrue(queue.offer(i));
50        assertEquals(i, queue.size());
51        assertEquals(CAPACITY - i, queue.remainingCapacity());
52      }
53      assertFalse(queue.offer(0L));
54  
55      // Poll
56      for (int i = 1; i <= CAPACITY; ++i) {
57        long l = queue.poll();
58        assertEquals(i, l);
59        assertEquals(CAPACITY - i, queue.size());
60        assertEquals(i, queue.remainingCapacity());
61      }
62      assertEquals(null, queue.poll());
63    }
64  
65    @Test
66    public void testDrain() throws Exception {
67      // Offer
68      for (long i = 1; i <= CAPACITY; ++i) {
69        assertTrue(queue.offer(i));
70        assertEquals(i, queue.size());
71        assertEquals(CAPACITY - i, queue.remainingCapacity());
72      }
73      assertFalse(queue.offer(0L));
74  
75      // Drain
76      List<Long> list = new ArrayList<Long>();
77      queue.drainTo(list);
78      assertEquals(null, queue.poll());
79      assertEquals(0, queue.size());
80      assertEquals(CAPACITY, queue.remainingCapacity());
81    }
82  
83    @Test
84    public void testClear() {
85      // Offer
86      for (long i = 1; i <= CAPACITY; ++i) {
87        assertTrue(queue.offer(i));
88        assertEquals(i, queue.size());
89        assertEquals(CAPACITY - i, queue.remainingCapacity());
90      }
91      assertFalse(queue.offer(0L));
92  
93      queue.clear();
94      assertEquals(null, queue.poll());
95      assertEquals(0, queue.size());
96      assertEquals(CAPACITY, queue.remainingCapacity());
97    }
98  
99    @Test
100   public void testMultiThread() throws InterruptedException {
101     int offerThreadCount = 10;
102     int pollThreadCount = 5;
103     int duration = 5000; // ms
104     final AtomicBoolean stop = new AtomicBoolean(false);
105     Thread[] offerThreads = new Thread[offerThreadCount];
106     for (int i = 0; i < offerThreadCount; i++) {
107       offerThreads[i] = new Thread("offer-thread-" + i) {
108 
109         @Override
110         public void run() {
111           Random rand = new Random();
112           while (!stop.get()) {
113             queue.offer(rand.nextLong());
114             try {
115               Thread.sleep(1);
116             } catch (InterruptedException e) {
117             }
118           }
119         }
120 
121       };
122     }
123     Thread[] pollThreads = new Thread[pollThreadCount];
124     for (int i = 0; i < pollThreadCount; i++) {
125       pollThreads[i] = new Thread("poll-thread-" + i) {
126 
127         @Override
128         public void run() {
129           while (!stop.get()) {
130             queue.poll();
131             try {
132               Thread.sleep(1);
133             } catch (InterruptedException e) {
134             }
135           }
136         }
137 
138       };
139     }
140     for (Thread t : offerThreads) {
141       t.start();
142     }
143     for (Thread t : pollThreads) {
144       t.start();
145     }
146     long startTime = System.currentTimeMillis();
147     while (System.currentTimeMillis() - startTime < duration) {
148       assertTrue(queue.size() <= CAPACITY);
149       Thread.yield();
150     }
151     stop.set(true);
152     for (Thread t : offerThreads) {
153       t.join();
154     }
155     for (Thread t : pollThreads) {
156       t.join();
157     }
158     assertTrue(queue.size() <= CAPACITY);
159   }
160 }