1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
20 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.fail;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.List;
29 import java.util.Random;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.concurrent.atomic.AtomicLong;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.CellUtil;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HTableDescriptor;
46 import org.apache.hadoop.hbase.MultithreadedTestUtil;
47 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
48 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
49 import org.apache.hadoop.hbase.TableName;
50 import org.apache.hadoop.hbase.client.Append;
51 import org.apache.hadoop.hbase.client.Delete;
52 import org.apache.hadoop.hbase.client.Durability;
53 import org.apache.hadoop.hbase.client.Get;
54 import org.apache.hadoop.hbase.client.Increment;
55 import org.apache.hadoop.hbase.client.Mutation;
56 import org.apache.hadoop.hbase.client.Put;
57 import org.apache.hadoop.hbase.client.Result;
58 import org.apache.hadoop.hbase.client.RowMutations;
59 import org.apache.hadoop.hbase.client.Scan;
60 import org.apache.hadoop.hbase.filter.BinaryComparator;
61 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
62 import org.apache.hadoop.hbase.io.HeapSize;
63 import org.apache.hadoop.hbase.testclassification.MediumTests;
64 import org.apache.hadoop.hbase.util.Bytes;
65 import org.apache.hadoop.hbase.wal.WAL;
66 import org.junit.After;
67 import org.junit.Before;
68 import org.junit.Rule;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71 import org.junit.rules.TestName;
72
73
74
75
76
77 @Category(MediumTests.class)
78 public class TestAtomicOperation {
79 static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
80 @Rule public TestName name = new TestName();
81
82 HRegion region = null;
83 private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
84
85
86 static byte[] tableName;
87 static final byte[] qual1 = Bytes.toBytes("qual1");
88 static final byte[] qual2 = Bytes.toBytes("qual2");
89 static final byte[] qual3 = Bytes.toBytes("qual3");
90 static final byte[] value1 = Bytes.toBytes("value1");
91 static final byte[] value2 = Bytes.toBytes("value2");
92 static final byte [] row = Bytes.toBytes("rowA");
93 static final byte [] row2 = Bytes.toBytes("rowB");
94
95 @Before
96 public void setup() {
97 tableName = Bytes.toBytes(name.getMethodName());
98 }
99
100 @After
101 public void teardown() throws IOException {
102 if (region != null) {
103 ((HRegion)region).close();
104 region = null;
105 }
106 }
107
108
109
110
111
112
113
114
115
116
117 @Test
118 public void testAppend() throws IOException {
119 initHRegion(tableName, name.getMethodName(), fam1);
120 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
121 " The Universe, and Everything";
122 String v2 = " is... 42.";
123 Append a = new Append(row);
124 a.setReturnResults(false);
125 a.add(fam1, qual1, Bytes.toBytes(v1));
126 a.add(fam1, qual2, Bytes.toBytes(v2));
127 assertNull(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE));
128 a = new Append(row);
129 a.add(fam1, qual1, Bytes.toBytes(v2));
130 a.add(fam1, qual2, Bytes.toBytes(v1));
131 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
132 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
133 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
134 }
135
136
137
138
139 @Test
140 public void testIncrementMultiThreads() throws IOException {
141 LOG.info("Starting test testIncrementMultiThreads");
142
143 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
144
145
146
147 int numThreads = 100;
148 int incrementsPerThread = 1000;
149 Incrementer[] all = new Incrementer[numThreads];
150 int expectedTotal = 0;
151
152
153 for (int i = 0; i < numThreads; i++) {
154 all[i] = new Incrementer(region, i, i, incrementsPerThread);
155 expectedTotal += (i * incrementsPerThread);
156 }
157
158
159 for (int i = 0; i < numThreads; i++) {
160 all[i].start();
161 }
162
163
164 for (int i = 0; i < numThreads; i++) {
165 try {
166 all[i].join();
167 } catch (InterruptedException e) {
168 }
169 }
170 assertICV(row, fam1, qual1, expectedTotal);
171 assertICV(row, fam1, qual2, expectedTotal*2);
172 assertICV(row, fam2, qual3, expectedTotal*3);
173 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
174 }
175
176
177 private void assertICV(byte [] row,
178 byte [] familiy,
179 byte[] qualifier,
180 long amount) throws IOException {
181
182 Get get = new Get(row);
183 get.addColumn(familiy, qualifier);
184 Result result = region.get(get);
185 assertEquals(1, result.size());
186
187 Cell kv = result.rawCells()[0];
188 long r = Bytes.toLong(CellUtil.cloneValue(kv));
189 assertEquals(amount, r);
190 }
191
192 private void initHRegion (byte [] tableName, String callingMethod,
193 byte[] ... families)
194 throws IOException {
195 initHRegion(tableName, callingMethod, null, families);
196 }
197
198 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
199 byte[] ... families)
200 throws IOException {
201 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
202 int i=0;
203 for(byte [] family : families) {
204 HColumnDescriptor hcd = new HColumnDescriptor(family);
205 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
206 htd.addFamily(hcd);
207 }
208 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
209 region = TEST_UTIL.createLocalHRegion(info, htd);
210 }
211
212
213
214
215
216 public static class Incrementer extends Thread {
217
218 private final HRegion region;
219 private final int numIncrements;
220 private final int amount;
221
222
223 public Incrementer(HRegion region, int threadNumber, int amount, int numIncrements) {
224 super("Incrementer." + threadNumber);
225 this.region = region;
226 this.numIncrements = numIncrements;
227 this.amount = amount;
228 setDaemon(true);
229 }
230
231 @Override
232 public void run() {
233 for (int i=0; i<numIncrements; i++) {
234 try {
235 Increment inc = new Increment(row);
236 inc.addColumn(fam1, qual1, amount);
237 inc.addColumn(fam1, qual2, amount*2);
238 inc.addColumn(fam2, qual3, amount*3);
239 inc.setDurability(Durability.ASYNC_WAL);
240 Result result = region.increment(inc);
241 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
242 Bytes.toLong(result.getValue(fam1, qual2)));
243 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3;
244 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
245 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
246 fam1Increment, fam2Increment);
247 } catch (IOException e) {
248 e.printStackTrace();
249 }
250 }
251 }
252 }
253
254 @Test
255 public void testAppendMultiThreads() throws IOException {
256 LOG.info("Starting test testAppendMultiThreads");
257
258 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
259
260 int numThreads = 100;
261 int opsPerThread = 100;
262 AtomicOperation[] all = new AtomicOperation[numThreads];
263 final byte[] val = new byte[]{1};
264
265 AtomicInteger failures = new AtomicInteger(0);
266
267 for (int i = 0; i < numThreads; i++) {
268 all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
269 @Override
270 public void run() {
271 for (int i=0; i<numOps; i++) {
272 try {
273 Append a = new Append(row);
274 a.add(fam1, qual1, val);
275 a.add(fam1, qual2, val);
276 a.add(fam2, qual3, val);
277 a.setDurability(Durability.ASYNC_WAL);
278 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
279
280 Get g = new Get(row);
281 Result result = region.get(g);
282 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
283 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
284 } catch (IOException e) {
285 e.printStackTrace();
286 failures.incrementAndGet();
287 fail();
288 }
289 }
290 }
291 };
292 }
293
294
295 for (int i = 0; i < numThreads; i++) {
296 all[i].start();
297 }
298
299
300 for (int i = 0; i < numThreads; i++) {
301 try {
302 all[i].join();
303 } catch (InterruptedException e) {
304 }
305 }
306 assertEquals(0, failures.get());
307 Get g = new Get(row);
308 Result result = region.get(g);
309 assertEquals(result.getValue(fam1, qual1).length, 10000);
310 assertEquals(result.getValue(fam1, qual2).length, 10000);
311 assertEquals(result.getValue(fam2, qual3).length, 10000);
312 }
313
314
315
316 @Test
317 public void testRowMutationMultiThreads() throws IOException {
318
319 LOG.info("Starting test testRowMutationMultiThreads");
320 initHRegion(tableName, name.getMethodName(), fam1);
321
322
323
324 int numThreads = 10;
325 int opsPerThread = 500;
326 AtomicOperation[] all = new AtomicOperation[numThreads];
327
328 AtomicLong timeStamps = new AtomicLong(0);
329 AtomicInteger failures = new AtomicInteger(0);
330
331 for (int i = 0; i < numThreads; i++) {
332 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
333 @Override
334 public void run() {
335 boolean op = true;
336 for (int i=0; i<numOps; i++) {
337 try {
338
339 if (i%10==0) {
340 synchronized(region) {
341 LOG.debug("flushing");
342 region.flush(true);
343 if (i%100==0) {
344 region.compact(false);
345 }
346 }
347 }
348 long ts = timeStamps.incrementAndGet();
349 RowMutations rm = new RowMutations(row);
350 if (op) {
351 Put p = new Put(row, ts);
352 p.add(fam1, qual1, value1);
353 p.setDurability(Durability.ASYNC_WAL);
354 rm.add(p);
355 Delete d = new Delete(row);
356 d.deleteColumns(fam1, qual2, ts);
357 d.setDurability(Durability.ASYNC_WAL);
358 rm.add(d);
359 } else {
360 Delete d = new Delete(row);
361 d.deleteColumns(fam1, qual1, ts);
362 d.setDurability(Durability.ASYNC_WAL);
363 rm.add(d);
364 Put p = new Put(row, ts);
365 p.add(fam1, qual2, value2);
366 p.setDurability(Durability.ASYNC_WAL);
367 rm.add(p);
368 }
369 region.mutateRow(rm);
370 op ^= true;
371
372 Get g = new Get(row);
373 Result r = region.get(g);
374 if (r.size() != 1) {
375 LOG.debug(r);
376 failures.incrementAndGet();
377 fail();
378 }
379 } catch (IOException e) {
380 e.printStackTrace();
381 failures.incrementAndGet();
382 fail();
383 }
384 }
385 }
386 };
387 }
388
389
390 for (int i = 0; i < numThreads; i++) {
391 all[i].start();
392 }
393
394
395 for (int i = 0; i < numThreads; i++) {
396 try {
397 all[i].join();
398 } catch (InterruptedException e) {
399 }
400 }
401 assertEquals(0, failures.get());
402 }
403
404
405
406
407
408 @Test
409 public void testMultiRowMutationMultiThreads() throws IOException {
410
411 LOG.info("Starting test testMultiRowMutationMultiThreads");
412 initHRegion(tableName, name.getMethodName(), fam1);
413
414
415
416 int numThreads = 10;
417 int opsPerThread = 500;
418 AtomicOperation[] all = new AtomicOperation[numThreads];
419
420 AtomicLong timeStamps = new AtomicLong(0);
421 AtomicInteger failures = new AtomicInteger(0);
422 final List<byte[]> rowsToLock = Arrays.asList(row, row2);
423
424 for (int i = 0; i < numThreads; i++) {
425 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
426 @Override
427 public void run() {
428 boolean op = true;
429 for (int i=0; i<numOps; i++) {
430 try {
431
432 if (i%10==0) {
433 synchronized(region) {
434 LOG.debug("flushing");
435 region.flush(true);
436 if (i%100==0) {
437 region.compact(false);
438 }
439 }
440 }
441 long ts = timeStamps.incrementAndGet();
442 List<Mutation> mrm = new ArrayList<Mutation>();
443 if (op) {
444 Put p = new Put(row2, ts);
445 p.add(fam1, qual1, value1);
446 p.setDurability(Durability.ASYNC_WAL);
447 mrm.add(p);
448 Delete d = new Delete(row);
449 d.deleteColumns(fam1, qual1, ts);
450 d.setDurability(Durability.ASYNC_WAL);
451 mrm.add(d);
452 } else {
453 Delete d = new Delete(row2);
454 d.deleteColumns(fam1, qual1, ts);
455 d.setDurability(Durability.ASYNC_WAL);
456 mrm.add(d);
457 Put p = new Put(row, ts);
458 p.setDurability(Durability.ASYNC_WAL);
459 p.add(fam1, qual1, value2);
460 mrm.add(p);
461 }
462 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
463 op ^= true;
464
465 Scan s = new Scan(row);
466 RegionScanner rs = region.getScanner(s);
467 List<Cell> r = new ArrayList<Cell>();
468 while (rs.next(r))
469 ;
470 rs.close();
471 if (r.size() != 1) {
472 LOG.debug(r);
473 failures.incrementAndGet();
474 fail();
475 }
476 } catch (IOException e) {
477 e.printStackTrace();
478 failures.incrementAndGet();
479 fail();
480 }
481 }
482 }
483 };
484 }
485
486
487 for (int i = 0; i < numThreads; i++) {
488 all[i].start();
489 }
490
491
492 for (int i = 0; i < numThreads; i++) {
493 try {
494 all[i].join();
495 } catch (InterruptedException e) {
496 }
497 }
498 assertEquals(0, failures.get());
499 }
500
501 public static class AtomicOperation extends Thread {
502 protected final HRegion region;
503 protected final int numOps;
504 protected final AtomicLong timeStamps;
505 protected final AtomicInteger failures;
506 protected final Random r = new Random();
507
508 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
509 AtomicInteger failures) {
510 this.region = region;
511 this.numOps = numOps;
512 this.timeStamps = timeStamps;
513 this.failures = failures;
514 }
515 }
516
517 private static CountDownLatch latch = new CountDownLatch(1);
518 private enum TestStep {
519 INIT,
520 PUT_STARTED,
521 PUT_COMPLETED,
522 CHECKANDPUT_STARTED,
523 CHECKANDPUT_COMPLETED
524
525 }
526 private static volatile TestStep testStep = TestStep.INIT;
527 private final String family = "f1";
528
529
530
531
532
533
534
535 @Test
536 public void testPutAndCheckAndPutInParallel() throws Exception {
537
538 final String tableName = "testPutAndCheckAndPut";
539 Configuration conf = TEST_UTIL.getConfiguration();
540 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
541 final MockHRegion region = (MockHRegion) TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
542 null, null, tableName, conf, false, Durability.SYNC_WAL, null, Bytes.toBytes(family));
543
544 Put[] puts = new Put[1];
545 Put put = new Put(Bytes.toBytes("r1"));
546 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
547 puts[0] = put;
548
549 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
550 MultithreadedTestUtil.TestContext ctx =
551 new MultithreadedTestUtil.TestContext(conf);
552 ctx.addThread(new PutThread(ctx, region));
553 ctx.addThread(new CheckAndPutThread(ctx, region));
554 ctx.startThreads();
555 while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
556 Thread.sleep(100);
557 }
558 ctx.stop();
559 Scan s = new Scan();
560 RegionScanner scanner = region.getScanner(s);
561 List<Cell> results = new ArrayList<Cell>();
562 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
563 scanner.next(results, scannerContext);
564 for (Cell keyValue : results) {
565 assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
566 }
567
568 }
569
570 private class PutThread extends TestThread {
571 private HRegion region;
572 PutThread(TestContext ctx, HRegion region) {
573 super(ctx);
574 this.region = region;
575 }
576
577 public void doWork() throws Exception {
578 Put[] puts = new Put[1];
579 Put put = new Put(Bytes.toBytes("r1"));
580 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
581 puts[0] = put;
582 testStep = TestStep.PUT_STARTED;
583 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
584 }
585 }
586
587 private class CheckAndPutThread extends TestThread {
588 private HRegion region;
589 CheckAndPutThread(TestContext ctx, HRegion region) {
590 super(ctx);
591 this.region = region;
592 }
593
594 public void doWork() throws Exception {
595 Put[] puts = new Put[1];
596 Put put = new Put(Bytes.toBytes("r1"));
597 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
598 puts[0] = put;
599 while (testStep != TestStep.PUT_COMPLETED) {
600 Thread.sleep(100);
601 }
602 testStep = TestStep.CHECKANDPUT_STARTED;
603 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
604 CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
605 testStep = TestStep.CHECKANDPUT_COMPLETED;
606 }
607 }
608
609 public static class MockHRegion extends HRegion {
610
611 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
612 final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
613 super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
614 }
615
616 @Override
617 public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException {
618 if (testStep == TestStep.CHECKANDPUT_STARTED) {
619 latch.countDown();
620 }
621 return new WrappedRowLock(super.getRowLockInternal(row, waitForLock));
622 }
623
624 public class WrappedRowLock extends RowLockImpl {
625
626 private WrappedRowLock(RowLock rowLock) {
627 setContext(((RowLockImpl)rowLock).getContext());
628 }
629
630 @Override
631 public void release() {
632 if (testStep == TestStep.INIT) {
633 super.release();
634 return;
635 }
636
637 if (testStep == TestStep.PUT_STARTED) {
638 try {
639 testStep = TestStep.PUT_COMPLETED;
640 super.release();
641
642
643
644
645
646
647
648
649
650
651 latch.await();
652 Thread.sleep(1000);
653 } catch (InterruptedException e) {
654 Thread.currentThread().interrupt();
655 }
656 }
657 else if (testStep == TestStep.CHECKANDPUT_STARTED) {
658 super.release();
659 }
660 }
661 }
662 }
663 }