1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.IOException;
25 import java.util.Random;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.HColumnDescriptor;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.testclassification.MediumTests;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.client.Get;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.BeforeClass;
47 import org.junit.Rule;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
50 import org.junit.rules.TestName;
51
52
53
54
55
56
57 @Category(MediumTests.class)
58 public class TestParallelPut {
59 static final Log LOG = LogFactory.getLog(TestParallelPut.class);
60 @Rule public TestName name = new TestName();
61
62 private HRegion region = null;
63 private static HBaseTestingUtility HBTU = new HBaseTestingUtility();
64 private static final int THREADS100 = 100;
65
66
67 static byte[] tableName;
68 static final byte[] qual1 = Bytes.toBytes("qual1");
69 static final byte[] qual2 = Bytes.toBytes("qual2");
70 static final byte[] qual3 = Bytes.toBytes("qual3");
71 static final byte[] value1 = Bytes.toBytes("value1");
72 static final byte[] value2 = Bytes.toBytes("value2");
73 static final byte [] row = Bytes.toBytes("rowA");
74 static final byte [] row2 = Bytes.toBytes("rowB");
75
76 @BeforeClass
77 public static void beforeClass() {
78
79 HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100);
80 }
81
82
83
84
85
86 @Before
87 public void setUp() throws Exception {
88 tableName = Bytes.toBytes(name.getMethodName());
89 }
90
91 @After
92 public void tearDown() throws Exception {
93 EnvironmentEdgeManagerTestHelper.reset();
94 if (region != null) region.close(true);
95 }
96
97 public String getName() {
98 return name.getMethodName();
99 }
100
101
102
103
104
105
106
107
108
109 @Test
110 public void testPut() throws IOException {
111 LOG.info("Starting testPut");
112 this.region = initHRegion(tableName, getName(), fam1);
113
114 long value = 1L;
115
116 Put put = new Put(row);
117 put.add(fam1, qual1, Bytes.toBytes(value));
118 region.put(put);
119
120 assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value));
121 }
122
123
124
125
126 @Test
127 public void testParallelPuts() throws IOException {
128
129 LOG.info("Starting testParallelPuts");
130
131 this.region = initHRegion(tableName, getName(), fam1);
132 int numOps = 1000;
133
134
135 Putter[] all = new Putter[THREADS100];
136
137
138 for (int i = 0; i < THREADS100; i++) {
139 all[i] = new Putter(region, i, numOps);
140 }
141
142
143 for (int i = 0; i < THREADS100; i++) {
144 all[i].start();
145 }
146
147
148 for (int i = 0; i < THREADS100; i++) {
149 try {
150 all[i].join();
151 } catch (InterruptedException e) {
152 LOG.warn("testParallelPuts encountered InterruptedException." +
153 " Ignoring....", e);
154 }
155 }
156 LOG.info("testParallelPuts successfully verified " +
157 (numOps * THREADS100) + " put operations.");
158 }
159
160
161 private static void assertGet(final HRegion region, byte [] row, byte [] familiy,
162 byte[] qualifier, byte[] value) throws IOException {
163
164 Get get = new Get(row);
165 get.addColumn(familiy, qualifier);
166 Result result = region.get(get);
167 assertEquals(1, result.size());
168
169 Cell kv = result.rawCells()[0];
170 byte[] r = CellUtil.cloneValue(kv);
171 assertTrue(Bytes.compareTo(r, value) == 0);
172 }
173
174 private HRegion initHRegion(byte [] tableName, String callingMethod,
175 byte[] ... families)
176 throws IOException {
177 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
178 for(byte [] family : families) {
179 htd.addFamily(new HColumnDescriptor(family));
180 }
181 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
182 return HBTU.createLocalHRegion(info, htd);
183 }
184
185
186
187
188 public static class Putter extends Thread {
189
190 private final HRegion region;
191 private final int threadNumber;
192 private final int numOps;
193 private final Random rand = new Random();
194 byte [] rowkey = null;
195
196 public Putter(HRegion region, int threadNumber, int numOps) {
197 this.region = region;
198 this.threadNumber = threadNumber;
199 this.numOps = numOps;
200 this.rowkey = Bytes.toBytes((long)threadNumber);
201 setDaemon(true);
202 }
203
204 @Override
205 public void run() {
206 byte[] value = new byte[100];
207 Put[] in = new Put[1];
208
209
210 for (int i=0; i<numOps; i++) {
211
212 rand.nextBytes(value);
213
214
215
216 Put put = new Put(rowkey);
217 put.add(fam1, qual1, value);
218 in[0] = put;
219 try {
220 OperationStatus[] ret = region.batchMutate(in);
221 assertEquals(1, ret.length);
222 assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
223 assertGet(this.region, rowkey, fam1, qual1, value);
224 } catch (IOException e) {
225 assertTrue("Thread id " + threadNumber + " operation " + i + " failed.",
226 false);
227 }
228 }
229 }
230 }
231 }