1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.procedure;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.CountDownLatch;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HRegionInfo;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.MiniHBaseCluster;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.master.HMaster;
36 import org.apache.hadoop.hbase.procedure2.Procedure;
37 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
38 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
39 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
40 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
41 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
42 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
43 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
44 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
45 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
46 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
47 import org.apache.hadoop.hbase.testclassification.LargeTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.FSUtils;
50 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
51 import org.apache.hadoop.hbase.util.Threads;
52 import org.apache.hadoop.hdfs.MiniDFSCluster;
53 import org.apache.hadoop.hdfs.server.datanode.DataNode;
54
55 import org.junit.After;
56 import org.junit.Before;
57 import org.junit.Ignore;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60 import org.mockito.Mockito;
61
62 import static org.junit.Assert.assertEquals;
63 import static org.junit.Assert.assertFalse;
64 import static org.junit.Assert.assertTrue;
65 import static org.junit.Assert.fail;
66
67 @Category(LargeTests.class)
68 public class TestWALProcedureStoreOnHDFS {
69 private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class);
70
71 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
72
73 private WALProcedureStore store;
74
75 private static void setupConf(Configuration conf) {
76 conf.setInt("dfs.replication", 3);
77 conf.setInt("dfs.namenode.replication.min", 3);
78
79
80 conf.setInt("hbase.procedure.store.wal.wait.before.roll", 1000);
81 conf.setInt("hbase.procedure.store.wal.max.roll.retries", 5);
82 conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 5);
83 }
84
85 @Before
86 public void setup() throws Exception {
87 setupConf(UTIL.getConfiguration());
88 MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
89
90 Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
91 store = ProcedureTestingUtility.createWalStore(
92 UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
93 store.registerListener(new ProcedureStore.ProcedureStoreListener() {
94 @Override
95 public void postSync() {}
96
97 @Override
98 public void abortProcess() {
99 LOG.fatal("Abort the Procedure Store");
100 store.stop(true);
101 }
102 });
103 store.start(8);
104 store.recoverLease();
105 }
106
107 @After
108 public void tearDown() throws Exception {
109 store.stop(false);
110 UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);
111
112 try {
113 UTIL.shutdownMiniCluster();
114 } catch (Exception e) {
115 LOG.warn("failure shutting down cluster", e);
116 }
117 }
118
119 @Test(timeout=60000, expected=RuntimeException.class)
120 public void testWalAbortOnLowReplication() throws Exception {
121 assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
122
123 LOG.info("Stop DataNode");
124 UTIL.getDFSCluster().stopDataNode(0);
125 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
126
127 store.insert(new TestProcedure(1, -1), null);
128 for (long i = 2; store.isRunning(); ++i) {
129 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
130 store.insert(new TestProcedure(i, -1), null);
131 Thread.sleep(100);
132 }
133 assertFalse(store.isRunning());
134 fail("The store.insert() should throw an exeption");
135 }
136
137 @Test(timeout=60000)
138 public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
139 assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
140
141 store.registerListener(new ProcedureStore.ProcedureStoreListener() {
142 @Override
143 public void postSync() {
144 Threads.sleepWithoutInterrupt(2000);
145 }
146
147 @Override
148 public void abortProcess() {}
149 });
150
151 final AtomicInteger reCount = new AtomicInteger(0);
152 Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
153 for (int i = 0; i < thread.length; ++i) {
154 final long procId = i + 1;
155 thread[i] = new Thread() {
156 public void run() {
157 try {
158 LOG.debug("[S] INSERT " + procId);
159 store.insert(new TestProcedure(procId, -1), null);
160 LOG.debug("[E] INSERT " + procId);
161 } catch (RuntimeException e) {
162 reCount.incrementAndGet();
163 LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
164 }
165 }
166 };
167 thread[i].start();
168 }
169
170 Thread.sleep(1000);
171 LOG.info("Stop DataNode");
172 UTIL.getDFSCluster().stopDataNode(0);
173 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
174
175 for (int i = 0; i < thread.length; ++i) {
176 thread[i].join();
177 }
178
179 assertFalse(store.isRunning());
180 assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
181 reCount.get() < thread.length);
182 }
183
184 @Test(timeout=60000)
185 @Ignore
186 public void testWalRollOnLowReplication() throws Exception {
187 int dnCount = 0;
188 store.insert(new TestProcedure(1, -1), null);
189 UTIL.getDFSCluster().restartDataNode(dnCount);
190 for (long i = 2; i < 100; ++i) {
191 store.insert(new TestProcedure(i, -1), null);
192 waitForNumReplicas(3);
193 Thread.sleep(100);
194 if ((i % 30) == 0) {
195 LOG.info("Restart Data Node");
196 UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
197 }
198 }
199 assertTrue(store.isRunning());
200 }
201
202 public void waitForNumReplicas(int numReplicas) throws Exception {
203 while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
204 Thread.sleep(100);
205 }
206
207 for (int i = 0; i < numReplicas; ++i) {
208 for (DataNode dn: UTIL.getDFSCluster().getDataNodes()) {
209 while (!dn.isDatanodeFullyStarted()) {
210 Thread.sleep(100);
211 }
212 }
213 }
214 }
215 }