1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25 import static org.mockito.Matchers.any;
26 import static org.mockito.Matchers.eq;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.when;
30
31 import java.io.FilterInputStream;
32 import java.io.IOException;
33 import java.lang.reflect.Field;
34 import java.security.PrivilegedExceptionAction;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Set;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.FSDataInputStream;
48 import org.apache.hadoop.fs.FileStatus;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.fs.PathFilter;
52 import org.apache.hadoop.hbase.Cell;
53 import org.apache.hadoop.hbase.HBaseConfiguration;
54 import org.apache.hadoop.hbase.HBaseTestingUtility;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionInfo;
58 import org.apache.hadoop.hbase.HTableDescriptor;
59 import org.apache.hadoop.hbase.KeyValue;
60 import org.apache.hadoop.hbase.MasterNotRunningException;
61 import org.apache.hadoop.hbase.testclassification.MediumTests;
62 import org.apache.hadoop.hbase.MiniHBaseCluster;
63 import org.apache.hadoop.hbase.ServerName;
64 import org.apache.hadoop.hbase.TableName;
65 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
66 import org.apache.hadoop.hbase.client.Delete;
67 import org.apache.hadoop.hbase.client.Get;
68 import org.apache.hadoop.hbase.client.HTable;
69 import org.apache.hadoop.hbase.client.Put;
70 import org.apache.hadoop.hbase.client.Result;
71 import org.apache.hadoop.hbase.client.ResultScanner;
72 import org.apache.hadoop.hbase.client.Scan;
73 import org.apache.hadoop.hbase.client.Table;
74 import org.apache.hadoop.hbase.fs.HFileSystem;
75 import org.apache.hadoop.hbase.master.HMaster;
76 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
77 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
78 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
79 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
80 import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
81 import org.apache.hadoop.hbase.regionserver.FlushRequester;
82 import org.apache.hadoop.hbase.regionserver.HRegion;
83 import org.apache.hadoop.hbase.regionserver.HRegionServer;
84 import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
85 import org.apache.hadoop.hbase.regionserver.Region;
86 import org.apache.hadoop.hbase.regionserver.RegionScanner;
87 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
88 import org.apache.hadoop.hbase.regionserver.Store;
89 import org.apache.hadoop.hbase.security.User;
90 import org.apache.hadoop.hbase.util.Bytes;
91 import org.apache.hadoop.hbase.util.EnvironmentEdge;
92 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
93 import org.apache.hadoop.hbase.util.FSUtils;
94 import org.apache.hadoop.hbase.util.HFileTestUtil;
95 import org.apache.hadoop.hbase.util.Pair;
96 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
97 import org.apache.hadoop.hbase.wal.WAL;
98 import org.apache.hadoop.hbase.wal.WALFactory;
99 import org.apache.hadoop.hbase.wal.WALKey;
100 import org.apache.hadoop.hbase.wal.WALSplitter;
101 import org.apache.hadoop.hdfs.DFSInputStream;
102 import org.junit.After;
103 import org.junit.AfterClass;
104 import org.junit.Before;
105 import org.junit.BeforeClass;
106 import org.junit.Rule;
107 import org.junit.Test;
108 import org.junit.experimental.categories.Category;
109 import org.junit.rules.TestName;
110 import org.mockito.Mockito;
111 import org.mockito.invocation.InvocationOnMock;
112 import org.mockito.stubbing.Answer;
113
114
115
116
117 @Category(MediumTests.class)
118 public class TestWALReplay {
119 public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
120 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
121 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
122 private Path hbaseRootDir = null;
123 private Path hbaseWALRootDir = null;
124 private String logName;
125 private Path oldLogDir;
126 private Path logDir;
127 private FileSystem fs;
128 private Configuration conf;
129 private RecoveryMode mode;
130 private WALFactory wals;
131
132 @Rule
133 public final TestName currentTest = new TestName();
134
135
136 @BeforeClass
137 public static void setUpBeforeClass() throws Exception {
138 Configuration conf = TEST_UTIL.getConfiguration();
139 conf.setBoolean("dfs.support.append", true);
140
141 conf.setInt("dfs.client.block.recovery.retries", 2);
142 TEST_UTIL.startMiniCluster(3);
143 Path hbaseRootDir =
144 TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
145 Path hbaseWALRootDir =
146 TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaselog"));
147 LOG.info(HConstants.HBASE_DIR + "=" + hbaseRootDir);
148 LOG.info(HFileSystem.HBASE_WAL_DIR + "=" + hbaseWALRootDir);
149 FSUtils.setRootDir(conf, hbaseRootDir);
150 FSUtils.setWALRootDir(conf, hbaseWALRootDir);
151 }
152
153 @AfterClass
154 public static void tearDownAfterClass() throws Exception {
155 TEST_UTIL.shutdownMiniCluster();
156 }
157
158 @Before
159 public void setUp() throws Exception {
160 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
161 this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
162 this.hbaseRootDir = FSUtils.getRootDir(this.conf);
163 this.hbaseWALRootDir = FSUtils.getWALRootDir(this.conf);
164 this.oldLogDir = new Path(this.hbaseWALRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
165 this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual");
166 this.logDir = new Path(this.hbaseWALRootDir, logName);
167 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
168 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
169 }
170 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
171 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
172 }
173 this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
174 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
175 this.wals = new WALFactory(conf, null, currentTest.getMethodName());
176 }
177
178 @After
179 public void tearDown() throws Exception {
180 this.wals.close();
181 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
182 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
183 }
184
185
186
187
188 private void deleteDir(final Path p) throws IOException {
189 if (this.fs.exists(p)) {
190 if (!this.fs.delete(p, true)) {
191 throw new IOException("Failed remove of " + p);
192 }
193 }
194 }
195
196
197
198
199
200 @Test
201 public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
202 final TableName tableName =
203 TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
204 byte[] family1 = Bytes.toBytes("cf1");
205 byte[] family2 = Bytes.toBytes("cf2");
206 byte[] qualifier = Bytes.toBytes("q");
207 byte[] value = Bytes.toBytes("testV");
208 byte[][] familys = { family1, family2 };
209 TEST_UTIL.createTable(tableName, familys);
210 Table htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
211 Put put = new Put(Bytes.toBytes("r1"));
212 put.add(family1, qualifier, value);
213 htable.put(put);
214 ResultScanner resultScanner = htable.getScanner(new Scan());
215 int count = 0;
216 while (resultScanner.next() != null) {
217 count++;
218 }
219 resultScanner.close();
220 assertEquals(1, count);
221
222 MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
223 List<HRegion> regions = hbaseCluster.getRegions(tableName);
224 assertEquals(1, regions.size());
225
226
227 Region destRegion = regions.get(0);
228 int originServerNum = hbaseCluster
229 .getServerWith(destRegion.getRegionInfo().getRegionName());
230 assertTrue("Please start more than 1 regionserver", hbaseCluster
231 .getRegionServerThreads().size() > 1);
232 int destServerNum = 0;
233 while (destServerNum == originServerNum) {
234 destServerNum++;
235 }
236 HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
237 HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
238
239 moveRegionAndWait(destRegion, destServer);
240
241
242 Delete del = new Delete(Bytes.toBytes("r1"));
243 htable.delete(del);
244 resultScanner = htable.getScanner(new Scan());
245 count = 0;
246 while (resultScanner.next() != null) {
247 count++;
248 }
249 resultScanner.close();
250 assertEquals(0, count);
251
252
253 Region region = destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
254 region.flush(true);
255
256 for (Store store : region.getStores()) {
257 store.triggerMajorCompaction();
258 }
259 region.compact(true);
260
261
262 moveRegionAndWait(destRegion, originServer);
263
264 originServer.abort("testing");
265
266
267 Result result = htable.get(new Get(Bytes.toBytes("r1")));
268 if (result != null) {
269 assertTrue("Row is deleted, but we get" + result.toString(),
270 (result == null) || result.isEmpty());
271 }
272 resultScanner.close();
273 }
274
275 private void moveRegionAndWait(Region destRegion, HRegionServer destServer)
276 throws InterruptedException, MasterNotRunningException,
277 ZooKeeperConnectionException, IOException {
278 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
279 TEST_UTIL.getHBaseAdmin().move(
280 destRegion.getRegionInfo().getEncodedNameAsBytes(),
281 Bytes.toBytes(destServer.getServerName().getServerName()));
282 while (true) {
283 ServerName serverName = master.getAssignmentManager()
284 .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
285 if (serverName != null && serverName.equals(destServer.getServerName())) {
286 TEST_UTIL.assertRegionOnServer(
287 destRegion.getRegionInfo(), serverName, 200);
288 break;
289 }
290 Thread.sleep(10);
291 }
292 }
293
294
295
296
297
298
299 @Test
300 public void test2727() throws Exception {
301
302
303 final TableName tableName =
304 TableName.valueOf("test2727");
305 HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
306 Path basedir = FSUtils.getTableDir(hbaseWALRootDir, tableName);
307 deleteDir(basedir);
308
309 HTableDescriptor htd = createBasic3FamilyHTD(tableName);
310 HRegion region2 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
311 HRegion.closeHRegion(region2);
312 final byte [] rowName = tableName.getName();
313
314 WAL wal1 = createWAL(this.conf);
315
316 final int countPerFamily = 1000;
317 final AtomicLong sequenceId = new AtomicLong(1);
318 for (HColumnDescriptor hcd: htd.getFamilies()) {
319 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
320 wal1, htd, sequenceId);
321 }
322 wal1.shutdown();
323 runWALSplit(this.conf);
324
325 WAL wal2 = createWAL(this.conf);
326
327 for (HColumnDescriptor hcd: htd.getFamilies()) {
328 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
329 ee, wal2, htd, sequenceId);
330 }
331 wal2.shutdown();
332 runWALSplit(this.conf);
333
334 WAL wal3 = createWAL(this.conf);
335 try {
336 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal3);
337 long seqid = region.getOpenSeqNum();
338
339
340 assertTrue(seqid > sequenceId.get());
341 assertEquals(seqid - 1, sequenceId.get());
342 LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
343 + sequenceId.get());
344
345
346 region.close();
347 } finally {
348 wal3.close();
349 }
350 }
351
352
353
354
355
356
357
358
359
360
361 @Test
362 public void testRegionMadeOfBulkLoadedFilesOnly()
363 throws IOException, SecurityException, IllegalArgumentException,
364 NoSuchFieldException, IllegalAccessException, InterruptedException {
365 final TableName tableName =
366 TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
367 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
368 final Path basedir = new Path(this.hbaseWALRootDir, tableName.getNameAsString());
369 deleteDir(basedir);
370 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
371 HRegion region2 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
372 HRegion.closeHRegion(region2);
373 WAL wal = createWAL(this.conf);
374 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
375
376 byte [] family = htd.getFamilies().iterator().next().getName();
377 Path f = new Path(basedir, "hfile");
378 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
379 Bytes.toBytes("z"), 10);
380 List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
381 hfs.add(Pair.newPair(family, f.toString()));
382 region.bulkLoadHFiles(hfs, true, null);
383
384
385 byte [] row = tableName.getName();
386 region.put((new Put(row)).add(family, family, family));
387 wal.sync();
388 final int rowsInsertedCount = 11;
389
390 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
391
392
393 final Configuration newConf = HBaseConfiguration.create(this.conf);
394 User user = HBaseTestingUtility.getDifferentUser(newConf,
395 tableName.getNameAsString());
396 user.runAs(new PrivilegedExceptionAction() {
397 @Override
398 public Object run() throws Exception {
399 runWALSplit(newConf);
400 WAL wal2 = createWAL(newConf);
401
402 HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
403 hbaseWALRootDir, hri, htd, wal2);
404 long seqid2 = region2.getOpenSeqNum();
405 assertTrue(seqid2 > -1);
406 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
407
408
409 region2.close();
410 wal2.close();
411 return null;
412 }
413 });
414 }
415
416
417
418
419
420
421
422
423
424
425
426
427 @Test
428 public void testCompactedBulkLoadedFiles()
429 throws IOException, SecurityException, IllegalArgumentException,
430 NoSuchFieldException, IllegalAccessException, InterruptedException {
431 final TableName tableName =
432 TableName.valueOf("testCompactedBulkLoadedFiles");
433 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
434 final Path basedir = new Path(this.hbaseWALRootDir, tableName.getNameAsString());
435 deleteDir(basedir);
436 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
437 HRegion region2 = HRegion.createHRegion(hri,
438 hbaseWALRootDir, this.conf, htd);
439 HRegion.closeHRegion(region2);
440 WAL wal = createWAL(this.conf);
441 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
442
443
444 byte [] row = tableName.getName();
445 byte [] family = htd.getFamilies().iterator().next().getName();
446 region.put((new Put(row)).add(family, family, family));
447 wal.sync();
448
449 List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
450 for (int i = 0; i < 3; i++) {
451 Path f = new Path(basedir, "hfile"+i);
452 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
453 Bytes.toBytes(i + "50"), 10);
454 hfs.add(Pair.newPair(family, f.toString()));
455 }
456 region.bulkLoadHFiles(hfs, true, null);
457 final int rowsInsertedCount = 31;
458 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
459
460
461 region.compact(true);
462 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
463
464
465 final Configuration newConf = HBaseConfiguration.create(this.conf);
466 User user = HBaseTestingUtility.getDifferentUser(newConf,
467 tableName.getNameAsString());
468 user.runAs(new PrivilegedExceptionAction() {
469 @Override
470 public Object run() throws Exception {
471 runWALSplit(newConf);
472 WAL wal2 = createWAL(newConf);
473
474 HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
475 hbaseWALRootDir, hri, htd, wal2);
476 long seqid2 = region2.getOpenSeqNum();
477 assertTrue(seqid2 > -1);
478 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
479
480
481 region2.close();
482 wal2.close();
483 return null;
484 }
485 });
486 }
487
488
489
490
491
492
493
494
495
496
497
498 @Test
499 public void testReplayEditsWrittenViaHRegion()
500 throws IOException, SecurityException, IllegalArgumentException,
501 NoSuchFieldException, IllegalAccessException, InterruptedException {
502 final TableName tableName =
503 TableName.valueOf("testReplayEditsWrittenViaHRegion");
504 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
505 final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
506 deleteDir(basedir);
507 final byte[] rowName = tableName.getName();
508 final int countPerFamily = 10;
509 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
510 HRegion region3 = HRegion.createHRegion(hri,
511 hbaseWALRootDir, this.conf, htd);
512 HRegion.closeHRegion(region3);
513
514
515
516 WAL wal = createWAL(this.conf);
517 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
518 long seqid = region.getOpenSeqNum();
519 boolean first = true;
520 for (HColumnDescriptor hcd: htd.getFamilies()) {
521 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
522 if (first) {
523
524 region.flush(true);
525 first = false;
526 }
527 }
528
529 final Get g = new Get(rowName);
530 Result result = region.get(g);
531 assertEquals(countPerFamily * htd.getFamilies().size(),
532 result.size());
533
534
535
536 region.close(true);
537 wal.shutdown();
538 runWALSplit(this.conf);
539 WAL wal2 = createWAL(this.conf);
540 HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseWALRootDir, hri, htd, wal2);
541 long seqid2 = region2.getOpenSeqNum();
542 assertTrue(seqid + result.size() < seqid2);
543 final Result result1b = region2.get(g);
544 assertEquals(result.size(), result1b.size());
545
546
547
548
549 for (HColumnDescriptor hcd: htd.getFamilies()) {
550 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
551 }
552
553 final Result result2 = region2.get(g);
554 assertEquals(2 * result.size(), result2.size());
555 wal2.sync();
556 final Configuration newConf = HBaseConfiguration.create(this.conf);
557 User user = HBaseTestingUtility.getDifferentUser(newConf,
558 tableName.getNameAsString());
559 user.runAs(new PrivilegedExceptionAction() {
560 @Override
561 public Object run() throws Exception {
562 runWALSplit(newConf);
563 FileSystem newFS = FileSystem.get(newConf);
564
565 WAL wal3 = createWAL(newConf);
566 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
567 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
568 @Override
569 protected boolean restoreEdit(Store s, Cell cell) {
570 boolean b = super.restoreEdit(s, cell);
571 countOfRestoredEdits.incrementAndGet();
572 return b;
573 }
574 };
575 long seqid3 = region3.initialize();
576 Result result3 = region3.get(g);
577
578 assertEquals(result2.size(), result3.size());
579 assertEquals(htd.getFamilies().size() * countPerFamily,
580 countOfRestoredEdits.get());
581
582
583 region3.close();
584 wal3.close();
585 return null;
586 }
587 });
588 }
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608 @Test
609 public void testReplayEditsAfterPartialFlush()
610 throws IOException, SecurityException, IllegalArgumentException,
611 NoSuchFieldException, IllegalAccessException, InterruptedException {
612 final TableName tableName =
613 TableName.valueOf("testReplayEditsWrittenViaHRegion");
614 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
615 final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
616 deleteDir(basedir);
617 final byte[] rowName = tableName.getName();
618 final int countPerFamily = 10;
619 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
620 HRegion region3 = HRegion.createHRegion(hri,
621 hbaseWALRootDir, this.conf, htd);
622 HRegion.closeHRegion(region3);
623
624
625
626 WAL wal = createWAL(this.conf);
627 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
628 long seqid = region.getOpenSeqNum();
629 for (HColumnDescriptor hcd: htd.getFamilies()) {
630 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
631 }
632
633
634 final Get g = new Get(rowName);
635 Result result = region.get(g);
636 assertEquals(countPerFamily * htd.getFamilies().size(),
637 result.size());
638
639
640 region.flush(true);
641 region.close(true);
642 wal.shutdown();
643
644
645
646
647
648 int cf_count = 0;
649 for (HColumnDescriptor hcd: htd.getFamilies()) {
650 cf_count++;
651 if (cf_count == 2) {
652 region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
653 }
654 }
655
656
657
658 runWALSplit(this.conf);
659 WAL wal2 = createWAL(this.conf);
660 HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal2);
661 long seqid2 = region2.getOpenSeqNum();
662 assertTrue(seqid + result.size() < seqid2);
663
664 final Result result1b = region2.get(g);
665 assertEquals(result.size(), result1b.size());
666 }
667
668
669
670
671 public static class CustomStoreFlusher extends DefaultStoreFlusher {
672
673 static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
674
675 public CustomStoreFlusher(Configuration conf, Store store) {
676 super(conf, store);
677 }
678 @Override
679 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
680 MonitoredTask status) throws IOException {
681 if (throwExceptionWhenFlushing.get()) {
682 throw new IOException("Simulated exception by tests");
683 }
684 return super.flushSnapshot(snapshot, cacheFlushId, status);
685 }
686
687 };
688
689
690
691
692
693
694
695 @Test
696 public void testReplayEditsAfterAbortingFlush() throws IOException {
697 final TableName tableName =
698 TableName.valueOf("testReplayEditsAfterAbortingFlush");
699 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
700 final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
701 deleteDir(basedir);
702 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
703 HRegion region3 = HRegion.createHRegion(hri, hbaseWALRootDir, this.conf, htd);
704 region3.close();
705 region3.getWAL().close();
706
707
708
709 WAL wal = createWAL(this.conf);
710 RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
711 Mockito.doReturn(false).when(rsServices).isAborted();
712 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
713 Configuration customConf = new Configuration(this.conf);
714 customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
715 CustomStoreFlusher.class.getName());
716 HRegion region =
717 HRegion.openHRegion(this.hbaseWALRootDir, hri, htd, wal, customConf, rsServices, null);
718 int writtenRowCount = 10;
719 List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
720 htd.getFamilies());
721 for (int i = 0; i < writtenRowCount; i++) {
722 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
723 put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
724 Bytes.toBytes("val"));
725 region.put(put);
726 }
727
728
729 RegionScanner scanner = region.getScanner(new Scan());
730 assertEquals(writtenRowCount, getScannedCount(scanner));
731
732
733 CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
734 try {
735 region.flush(true);
736 fail("Injected exception hasn't been thrown");
737 } catch (Throwable t) {
738 LOG.info("Expected simulated exception when flushing region,"
739 + t.getMessage());
740
741 Mockito.doReturn(true).when(rsServices).isAborted();
742 region.setClosing(false);
743
744 }
745
746 int moreRow = 10;
747 for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
748 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
749 put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
750 Bytes.toBytes("val"));
751 region.put(put);
752 }
753 writtenRowCount += moreRow;
754
755 CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
756 try {
757 region.flush(true);
758 } catch (IOException t) {
759 LOG.info("Expected exception when flushing region because server is stopped,"
760 + t.getMessage());
761 }
762
763 region.close(true);
764 wal.shutdown();
765
766
767 runWALSplit(this.conf);
768 WAL wal2 = createWAL(this.conf);
769 Mockito.doReturn(false).when(rsServices).isAborted();
770 HRegion region2 =
771 HRegion.openHRegion(this.hbaseWALRootDir, hri, htd, wal2, this.conf, rsServices, null);
772 scanner = region2.getScanner(new Scan());
773 assertEquals(writtenRowCount, getScannedCount(scanner));
774 }
775
776 private int getScannedCount(RegionScanner scanner) throws IOException {
777 int scannedCount = 0;
778 List<Cell> results = new ArrayList<Cell>();
779 while (true) {
780 boolean existMore = scanner.next(results);
781 if (!results.isEmpty())
782 scannedCount++;
783 if (!existMore)
784 break;
785 results.clear();
786 }
787 return scannedCount;
788 }
789
790
791
792
793
794
795 @Test
796 public void testReplayEditsWrittenIntoWAL() throws Exception {
797 final TableName tableName =
798 TableName.valueOf("testReplayEditsWrittenIntoWAL");
799 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
800 final Path basedir = FSUtils.getTableDir(hbaseWALRootDir, tableName);
801 deleteDir(basedir);
802
803 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
804 HRegion region2 = HRegion.createHRegion(hri,
805 hbaseWALRootDir, this.conf, htd);
806 HRegion.closeHRegion(region2);
807 final WAL wal = createWAL(this.conf);
808 final byte[] rowName = tableName.getName();
809 final byte[] regionName = hri.getEncodedNameAsBytes();
810 final AtomicLong sequenceId = new AtomicLong(1);
811
812
813 final int countPerFamily = 1000;
814 Set<byte[]> familyNames = new HashSet<byte[]>();
815 for (HColumnDescriptor hcd: htd.getFamilies()) {
816 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
817 ee, wal, htd, sequenceId);
818 familyNames.add(hcd.getName());
819 }
820
821
822 wal.startCacheFlush(regionName, familyNames);
823 wal.completeCacheFlush(regionName);
824
825
826 WALEdit edit = new WALEdit();
827 long now = ee.currentTime();
828 edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
829 now, rowName));
830 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
831 true, null);
832
833
834 edit = new WALEdit();
835 now = ee.currentTime();
836 edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
837 KeyValue.Type.DeleteFamily));
838 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
839 true, null);
840
841
842 wal.sync();
843
844
845 final Configuration newConf = HBaseConfiguration.create(this.conf);
846 User user = HBaseTestingUtility.getDifferentUser(newConf,
847 ".replay.wal.secondtime");
848 user.runAs(new PrivilegedExceptionAction<Void>() {
849 @Override
850 public Void run() throws Exception {
851 runWALSplit(newConf);
852 FileSystem newFS = FileSystem.get(newConf);
853
854 newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
855
856 WAL newWal = createWAL(newConf);
857 final AtomicInteger flushcount = new AtomicInteger(0);
858 try {
859 final HRegion region =
860 new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
861 @Override
862 protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
863 final Collection<Store> storesToFlush, MonitoredTask status,
864 boolean writeFlushWalMarker)
865 throws IOException {
866 LOG.info("InternalFlushCache Invoked");
867 FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
868 Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
869 flushcount.incrementAndGet();
870 return fs;
871 };
872 };
873 long seqid = region.initialize();
874
875 assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
876 assertTrue(seqid - 1 == sequenceId.get());
877
878 Get get = new Get(rowName);
879 Result result = region.get(get);
880
881 assertEquals(countPerFamily * (htd.getFamilies().size() - 1),
882 result.size());
883 region.close();
884 } finally {
885 newWal.close();
886 }
887 return null;
888 }
889 });
890 }
891
892 @Test
893
894 public void testSequentialEditLogSeqNum() throws IOException {
895 final TableName tableName = TableName.valueOf(currentTest.getMethodName());
896 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
897 final Path basedir =
898 FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
899 deleteDir(basedir);
900 final byte[] rowName = tableName.getName();
901 final int countPerFamily = 10;
902 final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
903
904
905 MockWAL wal = createMockWAL();
906
907 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
908 for (HColumnDescriptor hcd : htd.getFamilies()) {
909 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
910 }
911
912
913
914 region.flush(true);
915 for (HColumnDescriptor hcd : htd.getFamilies()) {
916 addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
917 }
918 long lastestSeqNumber = region.getSequenceId().get();
919
920 wal.doCompleteCacheFlush = true;
921
922
923 wal.completeCacheFlush(hri.getEncodedNameAsBytes());
924 wal.shutdown();
925 FileStatus[] listStatus = wal.getFiles();
926 assertNotNull(listStatus);
927 assertTrue(listStatus.length > 0);
928 WALSplitter.splitLogFile(hbaseWALRootDir, listStatus[0],
929 this.fs, this.conf, null, null, null, mode, wals);
930 FileStatus[] listStatus1 = this.fs.listStatus(
931 new Path(FSUtils.getTableDir(hbaseWALRootDir, tableName), new Path(hri.getEncodedName(),
932 "recovered.edits")), new PathFilter() {
933 @Override
934 public boolean accept(Path p) {
935 if (WALSplitter.isSequenceIdFile(p)) {
936 return false;
937 }
938 return true;
939 }
940 });
941 int editCount = 0;
942 for (FileStatus fileStatus : listStatus1) {
943 editCount = Integer.parseInt(fileStatus.getPath().getName());
944 }
945
946 assertEquals(
947 "The sequence number of the recoverd.edits and the current edit seq should be same",
948 lastestSeqNumber, editCount);
949 }
950
951
952
953
954 @Test
955 public void testDatalossWhenInputError() throws IOException, InstantiationException,
956 IllegalAccessException {
957 final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
958 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
959 final Path basedir = FSUtils.getTableDir(this.hbaseWALRootDir, tableName);
960 deleteDir(basedir);
961 final byte[] rowName = tableName.getName();
962 final int countPerFamily = 10;
963 final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
964 HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseWALRootDir, this.hbaseWALRootDir, this.conf, htd);
965 Path regionDir = region1.getRegionFileSystem().getRegionDir();
966 HBaseTestingUtility.closeRegionAndWAL(region1);
967
968 WAL wal = createWAL(this.conf);
969 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseWALRootDir, hri, htd, wal);
970 for (HColumnDescriptor hcd : htd.getFamilies()) {
971 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
972 }
973
974 final Get g = new Get(rowName);
975 Result result = region.get(g);
976 assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
977
978
979 region.close(true);
980 wal.shutdown();
981
982 runWALSplit(this.conf);
983
984
985 Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
986 FSDataInputStream stream = fs.open(editFile);
987 stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
988 Class<? extends DefaultWALProvider.Reader> logReaderClass =
989 conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
990 DefaultWALProvider.Reader.class);
991 DefaultWALProvider.Reader reader = logReaderClass.newInstance();
992 reader.init(this.fs, editFile, conf, stream);
993 final long headerLength = stream.getPos();
994 reader.close();
995 FileSystem spyFs = spy(this.fs);
996 doAnswer(new Answer<FSDataInputStream>() {
997
998 @Override
999 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
1000 FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
1001 Field field = FilterInputStream.class.getDeclaredField("in");
1002 field.setAccessible(true);
1003 final DFSInputStream in = (DFSInputStream) field.get(stream);
1004 DFSInputStream spyIn = spy(in);
1005 doAnswer(new Answer<Integer>() {
1006
1007 private long pos;
1008
1009 @Override
1010 public Integer answer(InvocationOnMock invocation) throws Throwable {
1011 if (pos >= headerLength) {
1012 throw new IOException("read over limit");
1013 }
1014 int b = (Integer) invocation.callRealMethod();
1015 if (b > 0) {
1016 pos += b;
1017 }
1018 return b;
1019 }
1020 }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
1021 doAnswer(new Answer<Void>() {
1022
1023 @Override
1024 public Void answer(InvocationOnMock invocation) throws Throwable {
1025 invocation.callRealMethod();
1026 in.close();
1027 return null;
1028 }
1029 }).when(spyIn).close();
1030 field.set(stream, spyIn);
1031 return stream;
1032 }
1033 }).when(spyFs).open(eq(editFile));
1034
1035 WAL wal2 = createWAL(this.conf);
1036 HRegion region2;
1037 try {
1038
1039 region2 = HRegion.openHRegion(conf, spyFs, hbaseWALRootDir, hri, htd, wal2);
1040 assertEquals(result.size(), region2.get(g).size());
1041 } catch (IOException e) {
1042 assertEquals("read over limit", e.getMessage());
1043 }
1044 region2 = HRegion.openHRegion(conf, fs, hbaseWALRootDir, hri, htd, wal2);
1045 assertEquals(result.size(), region2.get(g).size());
1046 }
1047
1048 static class MockWAL extends FSHLog {
1049 boolean doCompleteCacheFlush = false;
1050
1051 public MockWAL(FileSystem fs, Path walRootDir, String logName, Configuration conf)
1052 throws IOException {
1053 super(fs, walRootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1054 }
1055
1056 @Override
1057 public void completeCacheFlush(byte[] encodedRegionName) {
1058 if (!doCompleteCacheFlush) {
1059 return;
1060 }
1061 super.completeCacheFlush(encodedRegionName);
1062 }
1063 }
1064
1065 private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1066 HTableDescriptor htd = new HTableDescriptor(tableName);
1067 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1068 htd.addFamily(a);
1069 return htd;
1070 }
1071
1072 private MockWAL createMockWAL() throws IOException {
1073 MockWAL wal = new MockWAL(fs, hbaseWALRootDir, logName, conf);
1074
1075
1076 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1077 return wal;
1078 }
1079
1080
1081
1082 class TestFlusher implements FlushRequester {
1083 private HRegion r;
1084
1085 @Override
1086 public void requestFlush(Region region, boolean force) {
1087 try {
1088 r.flush(force);
1089 } catch (IOException e) {
1090 throw new RuntimeException("Exception flushing", e);
1091 }
1092 }
1093
1094 @Override
1095 public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) {
1096
1097
1098 }
1099
1100 @Override
1101 public void registerFlushRequestListener(FlushRequestListener listener) {
1102
1103 }
1104
1105 @Override
1106 public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
1107 return false;
1108 }
1109
1110 @Override
1111 public void setGlobalMemstoreLimit(long globalMemStoreSize) {
1112
1113 }
1114 }
1115
1116 private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
1117 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1118 final HTableDescriptor htd, final AtomicLong sequenceId)
1119 throws IOException {
1120 String familyStr = Bytes.toString(family);
1121 for (int j = 0; j < count; j++) {
1122 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
1123 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
1124 WALEdit edit = new WALEdit();
1125 edit.add(new KeyValue(rowName, family, qualifierBytes,
1126 ee.currentTime(), columnBytes));
1127 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime()),
1128 edit, sequenceId, true, null);
1129 }
1130 wal.sync();
1131 }
1132
1133 static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
1134 final int count, EnvironmentEdge ee, final Region r,
1135 final String qualifierPrefix)
1136 throws IOException {
1137 List<Put> puts = new ArrayList<Put>();
1138 for (int j = 0; j < count; j++) {
1139 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1140 Put p = new Put(rowName);
1141 p.add(family, qualifier, ee.currentTime(), rowName);
1142 r.put(p);
1143 puts.add(p);
1144 }
1145 return puts;
1146 }
1147
1148
1149
1150
1151
1152
1153 private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1154 return new HRegionInfo(tableName, null, null, false);
1155 }
1156
1157
1158
1159
1160
1161
1162
1163 private Path runWALSplit(final Configuration c) throws IOException {
1164 List<Path> splits = WALSplitter.split(
1165 hbaseWALRootDir, logDir, oldLogDir, FSUtils.getWALFileSystem(c), c, wals);
1166
1167 assertEquals("splits=" + splits, 1, splits.size());
1168
1169 assertTrue(fs.exists(splits.get(0)));
1170 LOG.info("Split file=" + splits.get(0));
1171 return splits.get(0);
1172 }
1173
1174
1175
1176
1177
1178
1179 private WAL createWAL(final Configuration c) throws IOException {
1180 FSHLog wal = new FSHLog(FSUtils.getWALFileSystem(c), hbaseWALRootDir, logName, c);
1181
1182
1183 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1184 return wal;
1185 }
1186
1187 private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1188 HTableDescriptor htd = new HTableDescriptor(tableName);
1189 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1190 htd.addFamily(a);
1191 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
1192 htd.addFamily(b);
1193 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
1194 htd.addFamily(c);
1195 return htd;
1196 }
1197 }