1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.util.Iterator;
26 import java.util.HashSet;
27 import java.util.Set;
28 import java.util.Random;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.FileStatus;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
37 import org.apache.hadoop.hbase.procedure2.Procedure;
38 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
39 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
40 import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
41 import org.apache.hadoop.hbase.testclassification.SmallTests;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.io.IOUtils;
44
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Assert;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
50
51 import static org.junit.Assert.assertEquals;
52 import static org.junit.Assert.assertFalse;
53 import static org.junit.Assert.assertTrue;
54 import static org.junit.Assert.fail;
55
56 @Category(SmallTests.class)
57 public class TestWALProcedureStore {
58 private static final Log LOG = LogFactory.getLog(TestWALProcedureStore.class);
59
60 private static final int PROCEDURE_STORE_SLOTS = 1;
61 private static final Procedure NULL_PROC = null;
62
63 private WALProcedureStore procStore;
64
65 private HBaseCommonTestingUtility htu;
66 private FileSystem fs;
67 private Path testDir;
68 private Path logDir;
69
70 @Before
71 public void setUp() throws IOException {
72 htu = new HBaseCommonTestingUtility();
73 testDir = htu.getDataTestDir();
74 fs = testDir.getFileSystem(htu.getConfiguration());
75 assertTrue(testDir.depth() > 1);
76
77 logDir = new Path(testDir, "proc-logs");
78 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
79 procStore.start(PROCEDURE_STORE_SLOTS);
80 procStore.recoverLease();
81 procStore.load();
82 }
83
84 @After
85 public void tearDown() throws IOException {
86 procStore.stop(false);
87 fs.delete(logDir, true);
88 }
89
90 private Iterator<Procedure> storeRestart() throws Exception {
91 procStore.stop(false);
92 procStore.start(PROCEDURE_STORE_SLOTS);
93 procStore.recoverLease();
94 return procStore.load();
95 }
96
97 @Test
98 public void testEmptyRoll() throws Exception {
99 for (int i = 0; i < 10; ++i) {
100 procStore.periodicRollForTesting();
101 }
102 FileStatus[] status = fs.listStatus(logDir);
103 assertEquals(1, status.length);
104 }
105
106 @Test
107 public void testEmptyLogLoad() throws Exception {
108 Iterator<Procedure> loader = storeRestart();
109 assertEquals(0, countProcedures(loader));
110 }
111
112 @Test
113 public void testLoad() throws Exception {
114 Set<Long> procIds = new HashSet<>();
115
116
117 Procedure proc1 = new TestSequentialProcedure();
118 procIds.add(proc1.getProcId());
119 procStore.insert(proc1, null);
120
121 Procedure proc2 = new TestSequentialProcedure();
122 Procedure[] child2 = new Procedure[2];
123 child2[0] = new TestSequentialProcedure();
124 child2[1] = new TestSequentialProcedure();
125
126 procIds.add(proc2.getProcId());
127 procIds.add(child2[0].getProcId());
128 procIds.add(child2[1].getProcId());
129 procStore.insert(proc2, child2);
130
131
132 verifyProcIdsOnRestart(procIds);
133
134
135 procStore.update(proc1);
136 procStore.update(child2[1]);
137 procStore.delete(child2[1].getProcId());
138 procIds.remove(child2[1].getProcId());
139
140
141 verifyProcIdsOnRestart(procIds);
142
143
144 procStore.stop(false);
145 FileStatus[] logs = fs.listStatus(logDir);
146 assertEquals(3, logs.length);
147 for (int i = 0; i < logs.length; ++i) {
148 corruptLog(logs[i], 4);
149 }
150 verifyProcIdsOnRestart(procIds);
151 }
152
153 @Test
154 public void testNoTrailerDoubleRestart() throws Exception {
155
156 Procedure proc0 = new TestSequentialProcedure();
157 procStore.insert(proc0, null);
158 Procedure proc1 = new TestSequentialProcedure();
159 procStore.insert(proc1, null);
160 Procedure proc2 = new TestSequentialProcedure();
161 procStore.insert(proc2, null);
162 procStore.rollWriterForTesting();
163
164
165 procStore.delete(proc1.getProcId());
166 procStore.rollWriterForTesting();
167
168
169 procStore.update(proc2);
170 procStore.rollWriterForTesting();
171
172
173 procStore.delete(proc2.getProcId());
174
175
176 procStore.stop(false);
177 FileStatus[] logs = fs.listStatus(logDir);
178 assertEquals(4, logs.length);
179 for (int i = 0; i < logs.length; ++i) {
180 corruptLog(logs[i], 4);
181 }
182
183
184 assertEquals(1, countProcedures(storeRestart()));
185
186
187 assertEquals(5, fs.listStatus(logDir).length);
188 assertEquals(1, countProcedures(storeRestart()));
189
190
191 procStore.delete(proc0.getProcId());
192 procStore.periodicRollForTesting();
193 assertEquals(1, fs.listStatus(logDir).length);
194 assertEquals(0, countProcedures(storeRestart()));
195 }
196
197 @Test
198 public void testCorruptedTrailer() throws Exception {
199
200 for (int i = 0; i < 100; ++i) {
201 procStore.insert(new TestSequentialProcedure(), null);
202 }
203
204
205 procStore.stop(false);
206
207
208 FileStatus[] logs = fs.listStatus(logDir);
209 assertEquals(1, logs.length);
210 corruptLog(logs[0], 4);
211
212 int count = countProcedures(storeRestart());
213 assertEquals(100, count);
214 }
215
216 @Test
217 public void testCorruptedEntries() throws Exception {
218
219 for (int i = 0; i < 100; ++i) {
220 procStore.insert(new TestSequentialProcedure(), null);
221 }
222
223
224 procStore.stop(false);
225
226
227
228 FileStatus[] logs = fs.listStatus(logDir);
229 assertEquals(1, logs.length);
230 corruptLog(logs[0], 1823);
231
232 int count = countProcedures(storeRestart());
233 assertTrue(procStore.getCorruptedLogs() != null);
234 assertEquals(1, procStore.getCorruptedLogs().size());
235 assertEquals(85, count);
236 }
237
238 @Test
239 public void testInsertUpdateDelete() throws Exception {
240 final int NTHREAD = 2;
241
242 procStore.stop(false);
243 fs.delete(logDir, true);
244
245 org.apache.hadoop.conf.Configuration conf =
246 new org.apache.hadoop.conf.Configuration(htu.getConfiguration());
247 conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
248 conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 10000);
249 conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024);
250
251 fs.mkdirs(logDir);
252 procStore = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
253 procStore.start(NTHREAD);
254 procStore.recoverLease();
255 assertEquals(0, countProcedures(procStore.load()));
256
257 final long LAST_PROC_ID = 9999;
258 final Thread[] thread = new Thread[NTHREAD];
259 final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));
260 for (int i = 0; i < thread.length; ++i) {
261 thread[i] = new Thread() {
262 @Override
263 public void run() {
264 Random rand = new Random();
265 TestProcedure proc;
266 do {
267 proc = new TestProcedure(procCounter.addAndGet(1));
268
269 procStore.insert(proc, null);
270
271 for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
272 try { Thread.sleep(0, rand.nextInt(15)); } catch (InterruptedException e) {}
273 procStore.update(proc);
274 }
275
276 procStore.delete(proc.getProcId());
277 } while (proc.getProcId() < LAST_PROC_ID);
278 }
279 };
280 thread[i].start();
281 }
282
283 for (int i = 0; i < thread.length; ++i) {
284 thread[i].join();
285 }
286
287 procStore.getStoreTracker().dump();
288 assertTrue(procCounter.get() >= LAST_PROC_ID);
289 assertTrue(procStore.getStoreTracker().isEmpty());
290 assertEquals(1, procStore.getActiveLogs().size());
291 }
292
293 @Test
294 public void testRollAndRemove() throws IOException {
295
296 Procedure proc1 = new TestSequentialProcedure();
297 procStore.insert(proc1, null);
298
299 Procedure proc2 = new TestSequentialProcedure();
300 procStore.insert(proc2, null);
301
302
303 procStore.rollWriterForTesting();
304 assertEquals(2, procStore.getActiveLogs().size());
305
306
307
308 procStore.update(proc1);
309 procStore.update(proc2);
310 assertEquals(1, procStore.getActiveLogs().size());
311
312
313 procStore.rollWriterForTesting();
314 assertEquals(2, procStore.getActiveLogs().size());
315
316
317
318 procStore.delete(proc1.getProcId());
319 procStore.delete(proc2.getProcId());
320 assertEquals(1, procStore.getActiveLogs().size());
321 }
322
323 @Test
324 public void testFileNotFoundDuringLeaseRecovery() throws IOException {
325 TestProcedure[] procs = new TestProcedure[3];
326 for (int i = 0; i < procs.length; ++i) {
327 procs[i] = new TestProcedure(i + 1, 0);
328 procStore.insert(procs[i], null);
329 }
330 procStore.rollWriterForTesting();
331 for (int i = 0; i < procs.length; ++i) {
332 procStore.update(procs[i]);
333 procStore.rollWriterForTesting();
334 }
335 procStore.stop(false);
336
337 FileStatus[] status = fs.listStatus(logDir);
338 assertEquals(procs.length + 2, status.length);
339
340
341 procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir,
342 new WALProcedureStore.LeaseRecovery() {
343 private int count = 0;
344
345 @Override
346 public void recoverFileLease(FileSystem fs, Path path) throws IOException {
347 if (++count <= 2) {
348 fs.delete(path, false);
349 LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
350 throw new FileNotFoundException("test file not found " + path);
351 }
352 LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
353 }
354 });
355
356 procStore.start(PROCEDURE_STORE_SLOTS);
357 procStore.recoverLease();
358 int countProcs = countProcedures(procStore.load());
359 assertEquals(procs.length - 1, countProcs);
360 assertTrue(procStore.getCorruptedLogs() == null);
361 }
362
363 private void corruptLog(final FileStatus logFile, final long dropBytes)
364 throws IOException {
365 assertTrue(logFile.getLen() > dropBytes);
366 LOG.debug("corrupt log " + logFile.getPath() +
367 " size=" + logFile.getLen() + " drop=" + dropBytes);
368 Path tmpPath = new Path(testDir, "corrupted.log");
369 InputStream in = fs.open(logFile.getPath());
370 OutputStream out = fs.create(tmpPath);
371 IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
372 if (!fs.rename(tmpPath, logFile.getPath())) {
373 throw new IOException("Unable to rename");
374 }
375 }
376
377 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
378 int count = 0;
379 Iterator<Procedure> loader = storeRestart();
380 while (loader.hasNext()) {
381 Procedure proc = loader.next();
382 LOG.debug("loading procId=" + proc.getProcId());
383 assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId()));
384 count++;
385 }
386 assertEquals(procIds.size(), count);
387 }
388
389 private void assertIsEmpty(Iterator<Procedure> iterator) {
390 assertEquals(0, countProcedures(iterator));
391 }
392
393 private int countProcedures(Iterator<Procedure> iterator) {
394 int count = 0;
395 while (iterator != null && iterator.hasNext()) {
396 Procedure proc = iterator.next();
397 LOG.trace("loading procId=" + proc.getProcId());
398 count++;
399 }
400 return count;
401 }
402
403 private void assertEmptyLogDir() {
404 try {
405 FileStatus[] status = fs.listStatus(logDir);
406 assertTrue("expected empty state-log dir", status == null || status.length == 0);
407 } catch (FileNotFoundException e) {
408 fail("expected the state-log dir to be present: " + logDir);
409 } catch (IOException e) {
410 fail("got en exception on state-log dir list: " + e.getMessage());
411 }
412 }
413
414 public static class TestSequentialProcedure extends SequentialProcedure<Void> {
415 private static long seqid = 0;
416
417 public TestSequentialProcedure() {
418 setProcId(++seqid);
419 }
420
421 @Override
422 protected Procedure[] execute(Void env) { return null; }
423
424 @Override
425 protected void rollback(Void env) { }
426
427 @Override
428 protected boolean abort(Void env) { return false; }
429
430 @Override
431 protected void serializeStateData(final OutputStream stream) throws IOException {
432 long procId = getProcId();
433 if (procId % 2 == 0) {
434 stream.write(Bytes.toBytes(procId));
435 }
436 }
437
438 @Override
439 protected void deserializeStateData(InputStream stream) throws IOException {
440 long procId = getProcId();
441 if (procId % 2 == 0) {
442 byte[] bProcId = new byte[8];
443 assertEquals(8, stream.read(bProcId));
444 assertEquals(procId, Bytes.toLong(bProcId));
445 } else {
446 assertEquals(0, stream.available());
447 }
448 }
449 }
450 }