1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
39
40 @Category(LargeTests.class)
41 public class TestProcedureReplayOrder {
42 private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
43
44 private static final Procedure NULL_PROC = null;
45
46 private ProcedureExecutor<Void> procExecutor;
47 private TestProcedureEnv procEnv;
48 private ProcedureStore procStore;
49
50 private HBaseCommonTestingUtility htu;
51 private FileSystem fs;
52 private Path testDir;
53 private Path logDir;
54
55 @Before
56 public void setUp() throws IOException {
57 htu = new HBaseCommonTestingUtility();
58 htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10);
59
60 testDir = htu.getDataTestDir();
61 fs = testDir.getFileSystem(htu.getConfiguration());
62 assertTrue(testDir.depth() > 1);
63
64 logDir = new Path(testDir, "proc-logs");
65 procEnv = new TestProcedureEnv();
66 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
67 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
68 procStore.start(24);
69 procExecutor.start(1);
70 }
71
72 @After
73 public void tearDown() throws IOException {
74 procExecutor.stop();
75 procStore.stop(false);
76 fs.delete(logDir, true);
77 }
78
79 @Test(timeout=90000)
80 public void testSingleStepReplyOrder() throws Exception {
81
82 procEnv.setAcquireLock(false);
83
84
85 submitProcedures(16, 25, TestSingleStepProcedure.class);
86
87
88 ProcedureTestingUtility.restart(procExecutor, new Runnable() {
89 @Override
90 public void run() {
91 procEnv.setAcquireLock(true);
92 }
93 });
94
95
96
97 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
98 procEnv.assertSortedExecList();
99
100
101 }
102
103 @Ignore
104 @Test(timeout=90000)
105 public void testMultiStepReplyOrder() throws Exception {
106
107 procEnv.setAcquireLock(false);
108
109
110 submitProcedures(16, 10, TestTwoStepProcedure.class);
111
112
113 ProcedureTestingUtility.restart(procExecutor, new Runnable() {
114 @Override
115 public void run() {
116 procEnv.setAcquireLock(true);
117 }
118 });
119
120 fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER");
121 }
122
123 private void submitProcedures(final int nthreads, final int nprocPerThread,
124 final Class<?> procClazz) throws Exception {
125 Thread[] submitThreads = new Thread[nthreads];
126 for (int i = 0; i < submitThreads.length; ++i) {
127 submitThreads[i] = new Thread() {
128 @Override
129 public void run() {
130 for (int i = 0; i < nprocPerThread; ++i) {
131 try {
132 procExecutor.submitProcedure((Procedure)procClazz.newInstance());
133 } catch (InstantiationException|IllegalAccessException e) {
134 LOG.error("unable to instantiate the procedure", e);
135 fail("failure during the proc.newInstance(): " + e.getMessage());
136 }
137 }
138 }
139 };
140 }
141
142 for (int i = 0; i < submitThreads.length; ++i) {
143 submitThreads[i].start();
144 }
145
146 for (int i = 0; i < submitThreads.length; ++i) {
147 submitThreads[i].join();
148 }
149 }
150
151 private static class TestProcedureEnv {
152 private ArrayList<Long> execList = new ArrayList<Long>();
153 private boolean acquireLock = true;
154
155 public void setAcquireLock(boolean acquireLock) {
156 this.acquireLock = acquireLock;
157 }
158
159 public boolean canAcquireLock() {
160 return acquireLock;
161 }
162
163 public void addToExecList(final Procedure proc) {
164 execList.add(proc.getProcId());
165 }
166
167 public ArrayList<Long> getExecList() {
168 return execList;
169 }
170
171 public void assertSortedExecList() {
172 LOG.debug("EXEC LIST: " + execList);
173 for (int i = 1; i < execList.size(); ++i) {
174 assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i),
175 execList.get(i-1) < execList.get(i));
176 }
177 }
178 }
179
180 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcedureEnv> {
181 public TestSingleStepProcedure() { }
182
183 @Override
184 protected Procedure[] execute(TestProcedureEnv env) {
185 LOG.debug("execute procedure " + this);
186 env.addToExecList(this);
187 return null;
188 }
189
190 protected boolean acquireLock(final TestProcedureEnv env) {
191 return env.canAcquireLock();
192 }
193
194 @Override
195 protected void rollback(TestProcedureEnv env) { }
196
197 @Override
198 protected boolean abort(TestProcedureEnv env) { return true; }
199 }
200
201 public static class TestTwoStepProcedure extends SequentialProcedure<TestProcedureEnv> {
202 public TestTwoStepProcedure() { }
203
204 @Override
205 protected Procedure[] execute(TestProcedureEnv env) {
206 LOG.debug("execute procedure " + this);
207 env.addToExecList(this);
208 return new Procedure[] { new TestSingleStepProcedure() };
209 }
210
211 protected boolean acquireLock(final TestProcedureEnv env) {
212 return true;
213 }
214
215 @Override
216 protected void rollback(TestProcedureEnv env) { }
217
218 @Override
219 protected boolean abort(TestProcedureEnv env) { return true; }
220 }
221 }