1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.*;
22
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.commons.logging.impl.Log4JLogger;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.Waiter.Predicate;
39 import org.apache.hadoop.hbase.client.Admin;
40 import org.apache.hadoop.hbase.client.Connection;
41 import org.apache.hadoop.hbase.client.ConnectionFactory;
42 import org.apache.hadoop.hbase.client.Consistency;
43 import org.apache.hadoop.hbase.client.Get;
44 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
45 import org.apache.hadoop.hbase.client.Table;
46 import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
47 import org.apache.hadoop.hbase.testclassification.LargeTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
50 import org.apache.hadoop.hbase.util.Threads;
51 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
52 import org.apache.log4j.Level;
53 import org.junit.After;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.experimental.categories.Category;
58 import org.junit.rules.TestName;
59 import org.junit.runner.RunWith;
60 import org.junit.runners.Parameterized;
61 import org.junit.runners.Parameterized.Parameters;
62
63
64
65
66 @RunWith(Parameterized.class)
67 @Category(LargeTests.class)
68 public class TestRegionReplicaFailover {
69
70 private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
71
72 static {
73 ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
74 }
75
76 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
77
78 private static final int NB_SERVERS = 3;
79
80 protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
81 HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
82 protected final byte[] fam = HBaseTestingUtility.fam1;
83 protected final byte[] qual1 = Bytes.toBytes("qual1");
84 protected final byte[] value1 = Bytes.toBytes("value1");
85 protected final byte[] row = Bytes.toBytes("rowA");
86 protected final byte[] row2 = Bytes.toBytes("rowB");
87
88 @Rule public TestName name = new TestName();
89
90 private HTableDescriptor htd;
91
92
93
94
95 @Parameters
96 public static Collection<Object[]> getParameters() {
97 Object[][] params =
98 new Boolean[][] { {true}, {false} };
99 return Arrays.asList(params);
100 }
101
102 @Parameterized.Parameter(0)
103 public boolean distributedLogReplay;
104
105 @Before
106 public void before() throws Exception {
107 Configuration conf = HTU.getConfiguration();
108 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
109 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
110 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
111 conf.setInt("replication.stats.thread.period.seconds", 5);
112 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
113 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
114 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 60);
115
116 HTU.startMiniCluster(NB_SERVERS);
117 htd = HTU.createTableDescriptor(
118 name.getMethodName().substring(0, name.getMethodName().length()-3));
119 htd.setRegionReplication(3);
120 HTU.getHBaseAdmin().createTable(htd);
121 }
122
123 @After
124 public void after() throws Exception {
125 HTU.deleteTableIfAny(htd.getTableName());
126 HTU.shutdownMiniCluster();
127 }
128
129
130
131
132
133 @Test(timeout = 60000)
134 public void testSecondaryRegionWithEmptyRegion() throws IOException {
135
136
137 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
138 Table table = connection.getTable(htd.getTableName())) {
139
140 Get get = new Get(row);
141 get.setConsistency(Consistency.TIMELINE);
142 get.setReplicaId(1);
143 table.get(get);
144 }
145 }
146
147
148
149
150
151
152 @Test(timeout = 60000)
153 public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
154
155
156 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
157 Table table = connection.getTable(htd.getTableName())) {
158
159 HTU.loadNumericRows(table, fam, 0, 1000);
160
161 HTU.getHBaseAdmin().disableTable(htd.getTableName());
162 HTU.getHBaseAdmin().enableTable(htd.getTableName());
163
164 HTU.verifyNumericRows(table, fam, 0, 1000, 1);
165 }
166 }
167
168
169
170
171 @Test (timeout = 120000)
172 public void testPrimaryRegionKill() throws Exception {
173 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
174 Table table = connection.getTable(htd.getTableName())) {
175
176 HTU.loadNumericRows(table, fam, 0, 1000);
177
178
179 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
180 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
181
182
183
184
185 boolean aborted = false;
186 for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
187 for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
188 if (r.getRegionInfo().getReplicaId() == 0) {
189 LOG.info("Aborting region server hosting primary region replica");
190 rs.getRegionServer().abort("for test");
191 aborted = true;
192 }
193 }
194 }
195 assertTrue(aborted);
196
197
198 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
199 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
200 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
201 }
202
203
204 HTU.getMiniHBaseCluster().startRegionServer();
205 }
206
207
208
209 private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
210 final int endRow, final int replicaId, final long timeout) throws Exception {
211 try {
212 HTU.waitFor(timeout, new Predicate<Exception>() {
213 @Override
214 public boolean evaluate() throws Exception {
215 try {
216 HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
217 return true;
218 } catch (AssertionError ae) {
219 return false;
220 }
221 }
222 });
223 } catch (Throwable t) {
224
225 HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
226 }
227 }
228
229
230
231
232
233 @Test (timeout = 120000)
234 public void testSecondaryRegionKill() throws Exception {
235 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
236 Table table = connection.getTable(htd.getTableName())) {
237 HTU.loadNumericRows(table, fam, 0, 1000);
238
239
240 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
241 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
242
243
244
245
246 boolean aborted = false;
247 for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
248 for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
249 if (r.getRegionInfo().getReplicaId() == 1) {
250 LOG.info("Aborting region server hosting secondary region replica");
251 rs.getRegionServer().abort("for test");
252 aborted = true;
253 }
254 }
255 }
256 assertTrue(aborted);
257
258 Threads.sleep(5000);
259
260 HTU.verifyNumericRows(table, fam, 0, 1000, 1);
261 HTU.verifyNumericRows(table, fam, 0, 1000, 2);
262 }
263
264
265 HTU.getMiniHBaseCluster().startRegionServer();
266 }
267
268
269
270
271
272
273 @Test (timeout = 120000)
274 public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
275 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
276 Table table = connection.getTable(htd.getTableName());
277 Admin admin = connection.getAdmin()) {
278
279 HTU.loadNumericRows(table, fam, 0, 1000);
280 admin.flush(table.getName());
281 HTU.loadNumericRows(table, fam, 1000, 2000);
282
283 final AtomicReference<Throwable> ex = new AtomicReference<Throwable>(null);
284 final AtomicBoolean done = new AtomicBoolean(false);
285 final AtomicInteger key = new AtomicInteger(2000);
286
287 Thread loader = new Thread() {
288 @Override
289 public void run() {
290 while (!done.get()) {
291 try {
292 HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
293 key.addAndGet(1000);
294 } catch (Throwable e) {
295 ex.compareAndSet(null, e);
296 }
297 }
298 }
299 };
300 loader.start();
301
302 Thread aborter = new Thread() {
303 @Override
304 public void run() {
305 try {
306 boolean aborted = false;
307 for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
308 for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
309 if (r.getRegionInfo().getReplicaId() == 1) {
310 LOG.info("Aborting region server hosting secondary region replica");
311 rs.getRegionServer().abort("for test");
312 aborted = true;
313 }
314 }
315 }
316 assertTrue(aborted);
317 } catch (Throwable e) {
318 ex.compareAndSet(null, e);
319 }
320 };
321 };
322
323 aborter.start();
324 aborter.join();
325 done.set(true);
326 loader.join();
327
328 assertNull(ex.get());
329
330 assertTrue(key.get() > 1000);
331 LOG.info("Loaded up to key :" + key.get());
332 verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
333 verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
334 verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
335 }
336
337
338 HTU.getMiniHBaseCluster().startRegionServer();
339 }
340
341
342
343
344
345 @Test (timeout = 120000)
346 public void testLotsOfRegionReplicas() throws IOException {
347 int numRegions = NB_SERVERS * 20;
348 int regionReplication = 10;
349 String tableName = htd.getTableName().getNameAsString() + "2";
350 htd = HTU.createTableDescriptor(tableName);
351 htd.setRegionReplication(regionReplication);
352
353
354 byte[] startKey = Bytes.toBytes("aaa");
355 byte[] endKey = Bytes.toBytes("zzz");
356 byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
357 HTU.getHBaseAdmin().createTable(htd, startKey, endKey, numRegions);
358
359 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
360 Table table = connection.getTable(htd.getTableName())) {
361
362 for (int i = 1; i < splits.length; i++) {
363 for (int j = 0; j < regionReplication; j++) {
364 Get get = new Get(splits[i]);
365 get.setConsistency(Consistency.TIMELINE);
366 get.setReplicaId(j);
367 table.get(get);
368 }
369 }
370 }
371
372 HTU.deleteTableIfAny(TableName.valueOf(tableName));
373 }
374 }