1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import junit.framework.TestCase;
21 import org.apache.hadoop.hbase.testclassification.SmallTests;
22 import org.junit.experimental.categories.Category;
23
24 import java.util.Random;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicLong;
27
28
29
30
31
32 @Category(SmallTests.class)
33 public class TestMultiVersionConsistencyControl extends TestCase {
34 static class Writer implements Runnable {
35 final AtomicBoolean finished;
36 final MultiVersionConsistencyControl mvcc;
37 final AtomicBoolean status;
38
39 Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) {
40 this.finished = finished;
41 this.mvcc = mvcc;
42 this.status = status;
43 }
44
45 private Random rnd = new Random();
46 public boolean failed = false;
47
48 public void run() {
49 AtomicLong startPoint = new AtomicLong();
50 while (!finished.get()) {
51 MultiVersionConsistencyControl.WriteEntry e =
52 mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
53
54
55 int sleepTime = rnd.nextInt(500);
56
57
58 try {
59 if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
60 } catch (InterruptedException e1) {
61 }
62 try {
63 mvcc.completeMemstoreInsert(e);
64 } catch (RuntimeException ex) {
65
66 System.out.println(ex.toString());
67 ex.printStackTrace();
68 status.set(false);
69 return;
70
71 }
72 }
73 }
74 }
75
76 public void testParallelism() throws Exception {
77 final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
78
79 final AtomicBoolean finished = new AtomicBoolean(false);
80
81
82 final AtomicBoolean readerFailed = new AtomicBoolean(false);
83 final AtomicLong failedAt = new AtomicLong();
84 Runnable reader = new Runnable() {
85 public void run() {
86 long prev = mvcc.memstoreReadPoint();
87 while (!finished.get()) {
88 long newPrev = mvcc.memstoreReadPoint();
89 if (newPrev < prev) {
90
91 System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
92 readerFailed.set(true);
93
94 failedAt.set(newPrev);
95 return;
96 }
97 }
98 }
99 };
100
101
102 int n = 20;
103 Thread[] writers = new Thread[n];
104 AtomicBoolean[] statuses = new AtomicBoolean[n];
105 Thread readThread = new Thread(reader);
106
107 for (int i = 0; i < n; ++i) {
108 statuses[i] = new AtomicBoolean(true);
109 writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
110 writers[i].start();
111 }
112 readThread.start();
113
114 try {
115 Thread.sleep(10 * 1000);
116 } catch (InterruptedException ex) {
117 }
118
119 finished.set(true);
120
121 readThread.join();
122 for (int i = 0; i < n; ++i) {
123 writers[i].join();
124 }
125
126
127 assertFalse(readerFailed.get());
128 for (int i = 0; i < n; ++i) {
129 assertTrue(statuses[i].get());
130 }
131 }
132 }