View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertArrayEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.mockito.Mockito.doAnswer;
23  import static org.mockito.Mockito.spy;
24  
25  import java.io.IOException;
26  import java.util.Collection;
27  
28  import org.apache.commons.lang.mutable.MutableBoolean;
29  import org.apache.hadoop.hbase.DroppedSnapshotException;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HTableDescriptor;
34  import org.apache.hadoop.hbase.NamespaceDescriptor;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.Connection;
37  import org.apache.hadoop.hbase.client.Get;
38  import org.apache.hadoop.hbase.client.HBaseAdmin;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.Table;
42  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
43  import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
44  import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
45  import org.apache.hadoop.hbase.testclassification.MediumTests;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  import org.apache.hadoop.hbase.wal.WAL;
49  import org.junit.After;
50  import org.junit.Before;
51  import org.junit.Test;
52  import org.junit.experimental.categories.Category;
53  import org.mockito.Matchers;
54  import org.mockito.invocation.InvocationOnMock;
55  import org.mockito.stubbing.Answer;
56  import org.mortbay.log.Log;
57  
58  /**
59   * Testcase for https://issues.apache.org/jira/browse/HBASE-13811
60   */
61  @Category({ MediumTests.class })
62  public class TestSplitWalDataLoss {
63  
64    private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
65  
66    private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
67        .build();
68  
69    private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
70  
71    private byte[] family = Bytes.toBytes("f");
72  
73    private byte[] qualifier = Bytes.toBytes("q");
74  
75    @Before
76    public void setUp() throws Exception {
77      testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
78      testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
79      testUtil.startMiniCluster(2);
80      HBaseAdmin admin = testUtil.getHBaseAdmin();
81      admin.createNamespace(namespace);
82      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
83      testUtil.waitTableAvailable(tableName);
84    }
85  
86    @After
87    public void tearDown() throws Exception {
88      testUtil.shutdownMiniCluster();
89    }
90  
91    @Test
92    public void test() throws IOException, InterruptedException {
93      final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
94      final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
95      HRegion spiedRegion = spy(region);
96      final MutableBoolean flushed = new MutableBoolean(false);
97      final MutableBoolean reported = new MutableBoolean(false);
98      doAnswer(new Answer<FlushResult>() {
99        @Override
100       public FlushResult answer(InvocationOnMock invocation) throws Throwable {
101         synchronized (flushed) {
102           flushed.setValue(true);
103           flushed.notifyAll();
104         }
105         synchronized (reported) {
106           while (!reported.booleanValue()) {
107             reported.wait();
108           }
109         }
110         rs.getWAL(region.getRegionInfo()).abortCacheFlush(
111           region.getRegionInfo().getEncodedNameAsBytes());
112         throw new DroppedSnapshotException("testcase");
113       }
114     }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
115       Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
116       Matchers.<Collection<Store>> any());
117     rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion);
118     Connection conn = testUtil.getConnection();
119 
120     try (Table table = conn.getTable(tableName)) {
121       table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
122     }
123     long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
124     Log.info("CHANGE OLDEST " + oldestSeqIdOfStore);
125     assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
126     rs.cacheFlusher.requestFlush(spiedRegion, false);
127     synchronized (flushed) {
128       while (!flushed.booleanValue()) {
129         flushed.wait();
130       }
131     }
132     try (Table table = conn.getTable(tableName)) {
133       table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
134     }
135     long now = EnvironmentEdgeManager.currentTime();
136     rs.tryRegionServerReport(now - 500, now);
137     synchronized (reported) {
138       reported.setValue(true);
139       reported.notifyAll();
140     }
141     while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
142       Thread.sleep(100);
143     }
144     try (Table table = conn.getTable(tableName)) {
145       Result result = table.get(new Get(Bytes.toBytes("row0")));
146       assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
147     }
148   }
149 }