1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Random;
27 import java.util.concurrent.atomic.AtomicBoolean;
28
29 import org.apache.hadoop.hbase.testclassification.SmallTests;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.junit.experimental.categories.Category;
33
34 @Category(SmallTests.class)
35 public class TestBoundedConcurrentLinkedQueue {
36 private final static int CAPACITY = 16;
37
38 private BoundedConcurrentLinkedQueue<Long> queue;
39
40 @Before
41 public void setUp() throws Exception {
42 this.queue = new BoundedConcurrentLinkedQueue<Long>(CAPACITY);
43 }
44
45 @Test
46 public void testOfferAndPoll() throws Exception {
47
48 for (long i = 1; i <= CAPACITY; ++i) {
49 assertTrue(queue.offer(i));
50 assertEquals(i, queue.size());
51 assertEquals(CAPACITY - i, queue.remainingCapacity());
52 }
53 assertFalse(queue.offer(0L));
54
55
56 for (int i = 1; i <= CAPACITY; ++i) {
57 long l = queue.poll();
58 assertEquals(i, l);
59 assertEquals(CAPACITY - i, queue.size());
60 assertEquals(i, queue.remainingCapacity());
61 }
62 assertEquals(null, queue.poll());
63 }
64
65 @Test
66 public void testDrain() throws Exception {
67
68 for (long i = 1; i <= CAPACITY; ++i) {
69 assertTrue(queue.offer(i));
70 assertEquals(i, queue.size());
71 assertEquals(CAPACITY - i, queue.remainingCapacity());
72 }
73 assertFalse(queue.offer(0L));
74
75
76 List<Long> list = new ArrayList<Long>();
77 queue.drainTo(list);
78 assertEquals(null, queue.poll());
79 assertEquals(0, queue.size());
80 assertEquals(CAPACITY, queue.remainingCapacity());
81 }
82
83 @Test
84 public void testClear() {
85
86 for (long i = 1; i <= CAPACITY; ++i) {
87 assertTrue(queue.offer(i));
88 assertEquals(i, queue.size());
89 assertEquals(CAPACITY - i, queue.remainingCapacity());
90 }
91 assertFalse(queue.offer(0L));
92
93 queue.clear();
94 assertEquals(null, queue.poll());
95 assertEquals(0, queue.size());
96 assertEquals(CAPACITY, queue.remainingCapacity());
97 }
98
99 @Test
100 public void testMultiThread() throws InterruptedException {
101 int offerThreadCount = 10;
102 int pollThreadCount = 5;
103 int duration = 5000;
104 final AtomicBoolean stop = new AtomicBoolean(false);
105 Thread[] offerThreads = new Thread[offerThreadCount];
106 for (int i = 0; i < offerThreadCount; i++) {
107 offerThreads[i] = new Thread("offer-thread-" + i) {
108
109 @Override
110 public void run() {
111 Random rand = new Random();
112 while (!stop.get()) {
113 queue.offer(rand.nextLong());
114 try {
115 Thread.sleep(1);
116 } catch (InterruptedException e) {
117 }
118 }
119 }
120
121 };
122 }
123 Thread[] pollThreads = new Thread[pollThreadCount];
124 for (int i = 0; i < pollThreadCount; i++) {
125 pollThreads[i] = new Thread("poll-thread-" + i) {
126
127 @Override
128 public void run() {
129 while (!stop.get()) {
130 queue.poll();
131 try {
132 Thread.sleep(1);
133 } catch (InterruptedException e) {
134 }
135 }
136 }
137
138 };
139 }
140 for (Thread t : offerThreads) {
141 t.start();
142 }
143 for (Thread t : pollThreads) {
144 t.start();
145 }
146 long startTime = System.currentTimeMillis();
147 while (System.currentTimeMillis() - startTime < duration) {
148 assertTrue(queue.size() <= CAPACITY);
149 Thread.yield();
150 }
151 stop.set(true);
152 for (Thread t : offerThreads) {
153 t.join();
154 }
155 for (Thread t : pollThreads) {
156 t.join();
157 }
158 assertTrue(queue.size() <= CAPACITY);
159 }
160 }