1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.ConcurrentLinkedDeque;
24
25 import org.apache.hadoop.hbase.testclassification.SmallTests;
26 import org.junit.After;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.junit.experimental.categories.Category;
30
31 @Category({ SmallTests.class })
32 public class TestBoundedByteBufferPool {
33 final int maxByteBufferSizeToCache = 10;
34 final int initialByteBufferSize = 1;
35 final int maxToCache = 10;
36 BoundedByteBufferPool reservoir;
37
38 @Before
39 public void before() {
40 this.reservoir =
41 new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
42 }
43
44 @After
45 public void after() {
46 this.reservoir = null;
47 }
48
49 @Test
50 public void testEquivalence() {
51 ByteBuffer bb = ByteBuffer.allocate(1);
52 this.reservoir.putBuffer(bb);
53 this.reservoir.putBuffer(bb);
54 this.reservoir.putBuffer(bb);
55 assertEquals(3, this.reservoir.buffers.size());
56 }
57
58 @Test
59 public void testGetPut() {
60 ByteBuffer bb = this.reservoir.getBuffer();
61 assertEquals(initialByteBufferSize, bb.capacity());
62 assertEquals(0, this.reservoir.buffers.size());
63 this.reservoir.putBuffer(bb);
64 assertEquals(1, this.reservoir.buffers.size());
65
66 this.reservoir.getBuffer();
67 assertEquals(0, this.reservoir.buffers.size());
68
69
70
71 final int newCapacity = 2;
72 this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
73 assertEquals(1, reservoir.buffers.size());
74 this.reservoir.getBuffer();
75 assertEquals(0, this.reservoir.buffers.size());
76 bb = this.reservoir.getBuffer();
77 assertEquals(newCapacity, bb.capacity());
78
79 assertEquals(0, this.reservoir.buffers.size());
80 this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
81 assertEquals(0, this.reservoir.buffers.size());
82
83 for (int i = 0; i < maxToCache; i++) {
84 this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
85 }
86 assertEquals(maxToCache, this.reservoir.buffers.size());
87 }
88
89 @Test
90 public void testBufferSizeGrowWithMultiThread() throws Exception {
91 final ConcurrentLinkedDeque<ByteBuffer> bufferQueue = new ConcurrentLinkedDeque<ByteBuffer>();
92 int takeBufferThreadsCount = 30;
93 int putBufferThreadsCount = 1;
94 Thread takeBufferThreads[] = new Thread[takeBufferThreadsCount];
95 for (int i = 0; i < takeBufferThreadsCount; i++) {
96 takeBufferThreads[i] = new Thread(new Runnable() {
97 @Override
98 public void run() {
99 while (true) {
100 ByteBuffer buffer = reservoir.getBuffer();
101 try {
102 Thread.sleep(5);
103 } catch (InterruptedException e) {
104 break;
105 }
106 bufferQueue.offer(buffer);
107 if (Thread.currentThread().isInterrupted()) break;
108 }
109 }
110 });
111 }
112
113 Thread putBufferThread[] = new Thread[putBufferThreadsCount];
114 for (int i = 0; i < putBufferThreadsCount; i++) {
115 putBufferThread[i] = new Thread(new Runnable() {
116 @Override
117 public void run() {
118 while (true) {
119 ByteBuffer buffer = bufferQueue.poll();
120 if (buffer != null) {
121 reservoir.putBuffer(buffer);
122 }
123 if (Thread.currentThread().isInterrupted()) break;
124 }
125 }
126 });
127 }
128
129 for (int i = 0; i < takeBufferThreadsCount; i++) {
130 takeBufferThreads[i].start();
131 }
132 for (int i = 0; i < putBufferThreadsCount; i++) {
133 putBufferThread[i].start();
134 }
135 Thread.sleep(2 * 1000);
136 for (int i = 0; i < takeBufferThreadsCount; i++) {
137 takeBufferThreads[i].interrupt();
138 takeBufferThreads[i].join();
139 }
140 for (int i = 0; i < putBufferThreadsCount; i++) {
141 putBufferThread[i].interrupt();
142 putBufferThread[i].join();
143 }
144
145
146 assertEquals(initialByteBufferSize, this.reservoir.runningAverage);
147 }
148 }