1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.ThreadLocalRandom;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.hbase.CategoryBasedTimeout;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.client.Durability;
38 import org.apache.hadoop.hbase.client.Increment;
39 import org.apache.hadoop.hbase.client.Scan;
40 import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
41 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
42 import org.apache.hadoop.hbase.testclassification.MediumTests;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.wal.WAL;
45 import org.junit.After;
46 import org.junit.Rule;
47 import org.junit.Test;
48 import org.junit.experimental.categories.Category;
49 import org.junit.rules.TestName;
50 import org.junit.rules.TestRule;
51
52
53
54
55
56
57
58
59
60
61
62 @Category(MediumTests.class)
63 public class TestRegionIncrement {
64 private static final Log LOG = LogFactory.getLog(TestRegionIncrement.class);
65 @Rule public TestName name = new TestName();
66 @Rule public final TestRule timeout =
67 CategoryBasedTimeout.builder().withTimeout(this.getClass()).
68 withLookingForStuckThread(true).build();
69 private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
70 private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment");
71 private static final int THREAD_COUNT = 10;
72 private static final int INCREMENT_COUNT = 10000;
73 public static Collection<Object []> data() {
74 return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE});
75 }
76
77 @After
78 public void tearDown() throws Exception {
79 TEST_UTIL.cleanupTestDir();
80 }
81
82 private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
83 WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
84 TEST_UTIL.getDataTestDir().toString(), conf);
85 return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
86 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
87 false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
88 }
89
90 private void closeRegion(final HRegion region) throws IOException {
91 region.close();
92 region.getWAL().close();
93 }
94
95
96
97
98 private static class SingleCellIncrementer extends Thread {
99 private final int count;
100 private final HRegion region;
101 private final Increment increment;
102
103 SingleCellIncrementer(final int i, final int count, final HRegion region,
104 final Increment increment) {
105 super("" + i);
106 setDaemon(true);
107 this.count = count;
108 this.region = region;
109 this.increment = increment;
110 }
111
112 @Override
113 public void run() {
114 for (int i = 0; i < this.count; i++) {
115 try {
116 this.region.increment(this.increment);
117
118 } catch (IOException e) {
119 throw new RuntimeException(e);
120 }
121 }
122 }
123 }
124
125
126
127
128 private static class CrossRowCellIncrementer extends Thread {
129 private final int count;
130 private final HRegion region;
131 private final Increment [] increments;
132
133 CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
134 super("" + i);
135 setDaemon(true);
136 this.count = count;
137 this.region = region;
138 this.increments = new Increment[range];
139 for (int ii = 0; ii < range; ii++) {
140 this.increments[ii] = new Increment(Bytes.toBytes(i));
141 this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
142 }
143 }
144
145 @Override
146 public void run() {
147 for (int i = 0; i < this.count; i++) {
148 try {
149 int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
150 this.region.increment(this.increments[index]);
151
152 } catch (IOException e) {
153 throw new RuntimeException(e);
154 }
155 }
156 }
157 }
158
159
160
161
162
163
164 @Test
165 public void testUnContendedSingleCellIncrement()
166 throws IOException, InterruptedException {
167 final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
168 TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
169 long startTime = System.currentTimeMillis();
170 try {
171 SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT];
172 for (int i = 0; i < threads.length; i++) {
173 byte [] rowBytes = Bytes.toBytes(i);
174 Increment increment = new Increment(rowBytes);
175 increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
176 threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
177 }
178 for (int i = 0; i < threads.length; i++) {
179 threads[i].start();
180 }
181 for (int i = 0; i < threads.length; i++) {
182 threads[i].join();
183 }
184 RegionScanner regionScanner = region.getScanner(new Scan());
185 List<Cell> cells = new ArrayList<Cell>(THREAD_COUNT);
186 while(regionScanner.next(cells)) continue;
187 assertEquals(THREAD_COUNT, cells.size());
188 long total = 0;
189 for (Cell cell: cells) total += Bytes.toLong(cell.getValue());
190 assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
191 } finally {
192 closeRegion(region);
193 LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
194 }
195 }
196
197
198
199
200
201
202
203 @Test
204 public void testContendedAcrossCellsIncrement()
205 throws IOException, InterruptedException {
206 final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
207 TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
208 long startTime = System.currentTimeMillis();
209 try {
210 CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT];
211 for (int i = 0; i < threads.length; i++) {
212 threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
213 }
214 for (int i = 0; i < threads.length; i++) {
215 threads[i].start();
216 }
217 for (int i = 0; i < threads.length; i++) {
218 threads[i].join();
219 }
220 RegionScanner regionScanner = region.getScanner(new Scan());
221 List<Cell> cells = new ArrayList<Cell>(100);
222 while(regionScanner.next(cells)) continue;
223 assertEquals(THREAD_COUNT, cells.size());
224 long total = 0;
225 for (Cell cell: cells) total += Bytes.toLong(cell.getValue());
226 assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
227 } finally {
228 closeRegion(region);
229 LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
230 }
231 }
232 }