1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertArrayEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.mockito.Mockito.doAnswer;
23 import static org.mockito.Mockito.spy;
24
25 import java.io.IOException;
26 import java.util.Collection;
27
28 import org.apache.commons.lang.mutable.MutableBoolean;
29 import org.apache.hadoop.hbase.DroppedSnapshotException;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HColumnDescriptor;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HTableDescriptor;
34 import org.apache.hadoop.hbase.NamespaceDescriptor;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.Connection;
37 import org.apache.hadoop.hbase.client.Get;
38 import org.apache.hadoop.hbase.client.HBaseAdmin;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.Table;
42 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
43 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
44 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
45 import org.apache.hadoop.hbase.testclassification.MediumTests;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48 import org.apache.hadoop.hbase.wal.WAL;
49 import org.junit.After;
50 import org.junit.Before;
51 import org.junit.Test;
52 import org.junit.experimental.categories.Category;
53 import org.mockito.Matchers;
54 import org.mockito.invocation.InvocationOnMock;
55 import org.mockito.stubbing.Answer;
56 import org.mortbay.log.Log;
57
58
59
60
61 @Category({ MediumTests.class })
62 public class TestSplitWalDataLoss {
63
64 private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
65
66 private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
67 .build();
68
69 private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
70
71 private byte[] family = Bytes.toBytes("f");
72
73 private byte[] qualifier = Bytes.toBytes("q");
74
75 @Before
76 public void setUp() throws Exception {
77 testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
78 testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
79 testUtil.startMiniCluster(2);
80 HBaseAdmin admin = testUtil.getHBaseAdmin();
81 admin.createNamespace(namespace);
82 admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
83 testUtil.waitTableAvailable(tableName);
84 }
85
86 @After
87 public void tearDown() throws Exception {
88 testUtil.shutdownMiniCluster();
89 }
90
91 @Test
92 public void test() throws IOException, InterruptedException {
93 final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
94 final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
95 HRegion spiedRegion = spy(region);
96 final MutableBoolean flushed = new MutableBoolean(false);
97 final MutableBoolean reported = new MutableBoolean(false);
98 doAnswer(new Answer<FlushResult>() {
99 @Override
100 public FlushResult answer(InvocationOnMock invocation) throws Throwable {
101 synchronized (flushed) {
102 flushed.setValue(true);
103 flushed.notifyAll();
104 }
105 synchronized (reported) {
106 while (!reported.booleanValue()) {
107 reported.wait();
108 }
109 }
110 rs.getWAL(region.getRegionInfo()).abortCacheFlush(
111 region.getRegionInfo().getEncodedNameAsBytes());
112 throw new DroppedSnapshotException("testcase");
113 }
114 }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
115 Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
116 Matchers.<Collection<Store>> any());
117 rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion);
118 Connection conn = testUtil.getConnection();
119
120 try (Table table = conn.getTable(tableName)) {
121 table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
122 }
123 long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
124 Log.info("CHANGE OLDEST " + oldestSeqIdOfStore);
125 assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
126 rs.cacheFlusher.requestFlush(spiedRegion, false);
127 synchronized (flushed) {
128 while (!flushed.booleanValue()) {
129 flushed.wait();
130 }
131 }
132 try (Table table = conn.getTable(tableName)) {
133 table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
134 }
135 long now = EnvironmentEdgeManager.currentTime();
136 rs.tryRegionServerReport(now - 500, now);
137 synchronized (reported) {
138 reported.setValue(true);
139 reported.notifyAll();
140 }
141 while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
142 Thread.sleep(100);
143 }
144 try (Table table = conn.getTable(tableName)) {
145 Result result = table.get(new Get(Bytes.toBytes("row0")));
146 assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
147 }
148 }
149 }