1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
28 import static org.junit.Assert.assertEquals;
29 import static org.junit.Assert.assertFalse;
30 import static org.junit.Assert.assertTrue;
31 import static org.junit.Assert.fail;
32
33 import java.io.IOException;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.HashSet;
37 import java.util.Iterator;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.NavigableSet;
41 import java.util.Set;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException;
47 import java.util.concurrent.atomic.AtomicLong;
48
49 import org.apache.commons.logging.Log;
50 import org.apache.commons.logging.LogFactory;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.fs.FSDataOutputStream;
53 import org.apache.hadoop.fs.FileStatus;
54 import org.apache.hadoop.fs.FileSystem;
55 import org.apache.hadoop.fs.Path;
56 import org.apache.hadoop.fs.PathFilter;
57 import org.apache.hadoop.hbase.HBaseConfiguration;
58 import org.apache.hadoop.hbase.HBaseTestingUtility;
59 import org.apache.hadoop.hbase.HColumnDescriptor;
60 import org.apache.hadoop.hbase.HConstants;
61 import org.apache.hadoop.hbase.HRegionInfo;
62 import org.apache.hadoop.hbase.HTableDescriptor;
63 import org.apache.hadoop.hbase.KeyValue;
64 import org.apache.hadoop.hbase.MiniHBaseCluster;
65 import org.apache.hadoop.hbase.NamespaceDescriptor;
66 import org.apache.hadoop.hbase.ServerName;
67 import org.apache.hadoop.hbase.SplitLogCounters;
68 import org.apache.hadoop.hbase.TableName;
69 import org.apache.hadoop.hbase.Waiter;
70 import org.apache.hadoop.hbase.client.ClusterConnection;
71 import org.apache.hadoop.hbase.client.ConnectionUtils;
72 import org.apache.hadoop.hbase.client.Delete;
73 import org.apache.hadoop.hbase.client.Get;
74 import org.apache.hadoop.hbase.client.HTable;
75 import org.apache.hadoop.hbase.client.Increment;
76 import org.apache.hadoop.hbase.client.NonceGenerator;
77 import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
78 import org.apache.hadoop.hbase.client.Put;
79 import org.apache.hadoop.hbase.client.RegionLocator;
80 import org.apache.hadoop.hbase.client.Result;
81 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
82 import org.apache.hadoop.hbase.client.Table;
83 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
84 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
85 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
86 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
87 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
88 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
89 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
90 import org.apache.hadoop.hbase.regionserver.HRegion;
91 import org.apache.hadoop.hbase.regionserver.HRegionServer;
92 import org.apache.hadoop.hbase.regionserver.Region;
93 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
94 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
95 import org.apache.hadoop.hbase.testclassification.LargeTests;
96 import org.apache.hadoop.hbase.util.Bytes;
97 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
98 import org.apache.hadoop.hbase.util.FSUtils;
99 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
100 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
101 import org.apache.hadoop.hbase.util.Threads;
102 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
103 import org.apache.hadoop.hbase.wal.WAL;
104 import org.apache.hadoop.hbase.wal.WALFactory;
105 import org.apache.hadoop.hbase.wal.WALSplitter;
106 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
107 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
108 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
109 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
110 import org.apache.hadoop.hdfs.MiniDFSCluster;
111 import org.apache.zookeeper.KeeperException;
112 import org.junit.After;
113 import org.junit.AfterClass;
114 import org.junit.Assert;
115 import org.junit.Before;
116 import org.junit.BeforeClass;
117 import org.junit.Ignore;
118 import org.junit.Test;
119 import org.junit.experimental.categories.Category;
120
121 @Category(LargeTests.class)
122 @SuppressWarnings("deprecation")
123 public class TestDistributedLogSplitting {
/** Logger for this test class (previously mis-bound to TestSplitLogManager). */
private static final Log LOG = LogFactory.getLog(TestDistributedLogSplitting.class);

static {
  // Turn off short-circuit reads for the mini clusters started by this test.
  // NOTE(review): the original rationale is not visible here; presumably short-circuit reads
  // interfere with the WAL splitting/recovery scenarios below — confirm before re-enabling.
  System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
}

/** Number of masters each per-test HBase cluster is started with. */
static final int NUM_MASTERS = 2;
/** Default number of region servers; many waits below assume exactly this many start live. */
static final int NUM_RS = 6;

/** HBase mini cluster of the current test; replaced by every startCluster call. */
MiniHBaseCluster cluster;
/** Master returned by cluster.getMaster() once an active master is ready. */
HMaster master;
/** Per-test configuration, re-created from originalConf before each test. */
Configuration conf;
/** Configuration of the utility that started the shared DFS and ZooKeeper clusters. */
static Configuration originalConf;
/** Testing utility; re-created by startCluster around the per-test configuration. */
static HBaseTestingUtility TEST_UTIL;
/** Mini DFS cluster shared by all tests in this class. */
static MiniDFSCluster dfsCluster;
/** Mini ZooKeeper cluster shared by all tests in this class. */
static MiniZooKeeperCluster zkCluster;
147
148 @BeforeClass
149 public static void setup() throws Exception {
150 TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
151 dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
152 zkCluster = TEST_UTIL.startMiniZKCluster();
153 originalConf = TEST_UTIL.getConfiguration();
154 }
155
156 @AfterClass
157 public static void tearDown() throws IOException {
158 TEST_UTIL.shutdownMiniZKCluster();
159 TEST_UTIL.shutdownMiniDFSCluster();
160 TEST_UTIL.shutdownMiniHBaseCluster();
161 }
162
163 private void startCluster(int num_rs) throws Exception {
164 SplitLogCounters.resetCounters();
165 LOG.info("Starting cluster");
166 conf.getLong("hbase.splitlog.max.resubmit", 0);
167
168 conf.setInt("zookeeper.recovery.retry", 0);
169 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
170 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0);
171 conf.setInt("hbase.regionserver.wal.max.splitters", 3);
172 conf.setInt("hbase.master.maximum.ping.server.attempts", 3);
173 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 40);
174 TEST_UTIL.shutdownMiniHBaseCluster();
175 TEST_UTIL = new HBaseTestingUtility(conf);
176 TEST_UTIL.setDFSCluster(dfsCluster);
177 TEST_UTIL.setZkCluster(zkCluster);
178 TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
179 cluster = TEST_UTIL.getHBaseCluster();
180 LOG.info("Waiting for active/ready master");
181 cluster.waitForActiveAndReadyMaster();
182 master = cluster.getMaster();
183 while (cluster.getLiveRegionServerThreads().size() < num_rs) {
184 Threads.sleep(1);
185 }
186 }
187
188 @Before
189 public void before() throws Exception {
190
191 conf = HBaseConfiguration.create(originalConf);
192 }
193
194 @After
195 public void after() throws Exception {
196 try {
197 if (TEST_UTIL.getHBaseCluster() != null) {
198 for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
199 mt.getMaster().abort("closing...", null);
200 }
201 }
202 TEST_UTIL.shutdownMiniHBaseCluster();
203 } finally {
204 TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
205 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
206 }
207 }
208
209 @Test (timeout=300000)
210 public void testRecoveredEdits() throws Exception {
211 LOG.info("testRecoveredEdits");
212 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024);
213 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
214 startCluster(NUM_RS);
215
216 final int NUM_LOG_LINES = 1000;
217 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
218
219
220 master.balanceSwitch(false);
221 FileSystem fs = master.getMasterFileSystem().getFileSystem();
222
223 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
224
225 Path rootdir = FSUtils.getRootDir(conf);
226
227 installTable(new ZooKeeperWatcher(conf, "table-creation", null),
228 "table", "family", 40);
229 TableName table = TableName.valueOf("table");
230 List<HRegionInfo> regions = null;
231 HRegionServer hrs = null;
232 for (int i = 0; i < NUM_RS; i++) {
233 boolean foundRs = false;
234 hrs = rsts.get(i).getRegionServer();
235 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
236 for (HRegionInfo region : regions) {
237 if (region.getTable().getNameAsString().equalsIgnoreCase("table")) {
238 foundRs = true;
239 break;
240 }
241 }
242 if (foundRs) break;
243 }
244 final Path logDir = new Path(rootdir, DefaultWALProvider.getWALDirectoryName(hrs
245 .getServerName().toString()));
246
247 LOG.info("#regions = " + regions.size());
248 Iterator<HRegionInfo> it = regions.iterator();
249 while (it.hasNext()) {
250 HRegionInfo region = it.next();
251 if (region.getTable().getNamespaceAsString()
252 .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
253 it.remove();
254 }
255 }
256
257 makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
258
259 slm.splitLogDistributed(logDir);
260
261 int count = 0;
262 for (HRegionInfo hri : regions) {
263
264 Path tdir = FSUtils.getTableDir(rootdir, table);
265 Path editsdir =
266 WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
267 LOG.debug("checking edits dir " + editsdir);
268 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
269 @Override
270 public boolean accept(Path p) {
271 if (WALSplitter.isSequenceIdFile(p)) {
272 return false;
273 }
274 return true;
275 }
276 });
277 assertTrue("edits dir should have more than a single file in it. instead has " + files.length,
278 files.length > 1);
279 for (int i = 0; i < files.length; i++) {
280 int c = countWAL(files[i].getPath(), fs, conf);
281 count += c;
282 }
283 LOG.info(count + " edits in " + files.length + " recovered edits files.");
284 }
285
286
287 assertFalse(fs.exists(logDir));
288
289 assertEquals(NUM_LOG_LINES, count);
290 }
291
292 @Test(timeout = 300000)
293 public void testLogReplayWithNonMetaRSDown() throws Exception {
294 LOG.info("testLogReplayWithNonMetaRSDown");
295 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024);
296 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
297 startCluster(NUM_RS);
298 final int NUM_REGIONS_TO_CREATE = 40;
299 final int NUM_LOG_LINES = 1000;
300
301
302 master.balanceSwitch(false);
303
304 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
305 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
306
307 HRegionServer hrs = findRSToKill(false, "table");
308 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
309 makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
310
311
312 this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
313 ht.close();
314 zkw.close();
315 }
316
317 private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
318 private boolean isDups = false;
319 private LinkedList<Long> nonces = new LinkedList<Long>();
320
321 public void startDups() {
322 isDups = true;
323 }
324
325 @Override
326 public long newNonce() {
327 long nonce = isDups ? nonces.removeFirst() : super.newNonce();
328 if (!isDups) {
329 nonces.add(nonce);
330 }
331 return nonce;
332 }
333 }
334
335 @Test(timeout = 300000)
336 public void testNonceRecovery() throws Exception {
337 LOG.info("testNonceRecovery");
338 final String TABLE_NAME = "table";
339 final String FAMILY_NAME = "family";
340 final int NUM_REGIONS_TO_CREATE = 40;
341
342 conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
343 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
344 startCluster(NUM_RS);
345 master.balanceSwitch(false);
346
347 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
348 HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
349 NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
350 NonceGenerator oldNg =
351 ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)ht.getConnection(), ng);
352
353 try {
354 List<Increment> reqs = new ArrayList<Increment>();
355 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
356 HRegionServer hrs = rst.getRegionServer();
357 List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
358 for (HRegionInfo hri : hris) {
359 if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) {
360 byte[] key = hri.getStartKey();
361 if (key == null || key.length == 0) {
362 key = Bytes.copy(hri.getEndKey());
363 --(key[key.length - 1]);
364 }
365 Increment incr = new Increment(key);
366 incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1);
367 ht.increment(incr);
368 reqs.add(incr);
369 }
370 }
371 }
372
373 HRegionServer hrs = findRSToKill(false, "table");
374 abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
375 ng.startDups();
376 for (Increment incr : reqs) {
377 try {
378 ht.increment(incr);
379 fail("should have thrown");
380 } catch (OperationConflictException ope) {
381 LOG.debug("Caught as expected: " + ope.getMessage());
382 }
383 }
384 } finally {
385 ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) ht.getConnection(), oldNg);
386 ht.close();
387 zkw.close();
388 }
389 }
390
391 @Test(timeout = 300000)
392 public void testLogReplayWithMetaRSDown() throws Exception {
393 LOG.info("testRecoveredEditsReplayWithMetaRSDown");
394 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
395 startCluster(NUM_RS);
396 final int NUM_REGIONS_TO_CREATE = 40;
397 final int NUM_LOG_LINES = 1000;
398
399
400 master.balanceSwitch(false);
401
402 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
403 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
404
405 HRegionServer hrs = findRSToKill(true, "table");
406 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
407 makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
408
409 this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
410 ht.close();
411 zkw.close();
412 }
413
414 private void abortRSAndVerifyRecovery(HRegionServer hrs, Table ht, final ZooKeeperWatcher zkw,
415 final int numRegions, final int numofLines) throws Exception {
416
417 abortRSAndWaitForRecovery(hrs, zkw, numRegions);
418 assertEquals(numofLines, TEST_UTIL.countRows(ht));
419 }
420
421 private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
422 final int numRegions) throws Exception {
423 final MiniHBaseCluster tmpCluster = this.cluster;
424
425
426 LOG.info("Aborting region server: " + hrs.getServerName());
427 hrs.abort("testing");
428
429
430 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
431 @Override
432 public boolean evaluate() throws Exception {
433 return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
434 }
435 });
436
437
438 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
439 @Override
440 public boolean evaluate() throws Exception {
441 return (HBaseTestingUtility.getAllOnlineRegions(tmpCluster).size()
442 >= (numRegions + 1));
443 }
444 });
445
446
447 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
448 @Override
449 public boolean evaluate() throws Exception {
450 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
451 zkw.recoveringRegionsZNode, false);
452 return (recoveringRegions != null && recoveringRegions.size() == 0);
453 }
454 });
455 }
456
457 @Test(timeout = 300000)
458 public void testMasterStartsUpWithLogSplittingWork() throws Exception {
459 LOG.info("testMasterStartsUpWithLogSplittingWork");
460 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
461 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
462 startCluster(NUM_RS);
463
464 final int NUM_REGIONS_TO_CREATE = 40;
465 final int NUM_LOG_LINES = 1000;
466
467
468 master.balanceSwitch(false);
469
470 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
471 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
472
473 HRegionServer hrs = findRSToKill(false, "table");
474 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
475 makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
476
477
478 abortMaster(cluster);
479
480
481 LOG.info("Aborting region server: " + hrs.getServerName());
482 hrs.abort("testing");
483
484
485 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
486 @Override
487 public boolean evaluate() throws Exception {
488 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
489 }
490 });
491
492 Thread.sleep(2000);
493 LOG.info("Current Open Regions:"
494 + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
495
496
497 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
498 @Override
499 public boolean evaluate() throws Exception {
500 return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
501 >= (NUM_REGIONS_TO_CREATE + 1));
502 }
503 });
504
505 LOG.info("Current Open Regions After Master Node Starts Up:"
506 + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
507
508 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
509
510 ht.close();
511 zkw.close();
512 }
513
514 @Test(timeout = 300000)
515 public void testMasterStartsUpWithLogReplayWork() throws Exception {
516 LOG.info("testMasterStartsUpWithLogReplayWork");
517 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
518 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
519 startCluster(NUM_RS);
520
521 final int NUM_REGIONS_TO_CREATE = 40;
522 final int NUM_LOG_LINES = 1000;
523
524
525 master.balanceSwitch(false);
526
527 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
528 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
529
530 HRegionServer hrs = findRSToKill(false, "table");
531 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
532 makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
533
534
535 abortMaster(cluster);
536
537
538 LOG.info("Aborting region server: " + hrs.getServerName());
539 hrs.abort("testing");
540
541
542 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
543 @Override
544 public boolean evaluate() throws Exception {
545 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
546 }
547 });
548
549 Thread.sleep(2000);
550 LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
551
552
553 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
554 @Override
555 public boolean evaluate() throws Exception {
556 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
557 zkw.recoveringRegionsZNode, false);
558 boolean done = recoveringRegions != null && recoveringRegions.size() == 0;
559 if (!done) {
560 LOG.info("Recovering regions: " + recoveringRegions);
561 }
562 return done;
563 }
564 });
565
566 LOG.info("Current Open Regions After Master Node Starts Up:"
567 + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
568
569 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
570
571 ht.close();
572 zkw.close();
573 }
574
575
576 @Test(timeout = 300000)
577 public void testLogReplayTwoSequentialRSDown() throws Exception {
578 LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
579 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
580 startCluster(NUM_RS);
581 final int NUM_REGIONS_TO_CREATE = 40;
582 final int NUM_LOG_LINES = 1000;
583
584
585 master.balanceSwitch(false);
586
587 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
588 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
589 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
590
591 List<HRegionInfo> regions = null;
592 HRegionServer hrs1 = findRSToKill(false, "table");
593 regions = ProtobufUtil.getOnlineRegions(hrs1.getRSRpcServices());
594
595 makeWAL(hrs1, regions, "table", "family", NUM_LOG_LINES, 100);
596
597
598 LOG.info("Aborting region server: " + hrs1.getServerName());
599 hrs1.abort("testing");
600
601
602 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
603 @Override
604 public boolean evaluate() throws Exception {
605 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
606 }
607 });
608
609
610 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
611 @Override
612 public boolean evaluate() throws Exception {
613 return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
614 >= (NUM_REGIONS_TO_CREATE + 1));
615 }
616 });
617
618
619 Thread.sleep(300);
620
621 rsts = cluster.getLiveRegionServerThreads();
622 HRegionServer hrs2 = rsts.get(0).getRegionServer();
623 LOG.info("Aborting one more region server: " + hrs2.getServerName());
624 hrs2.abort("testing");
625
626
627 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
628 @Override
629 public boolean evaluate() throws Exception {
630 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
631 }
632 });
633
634
635 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
636 @Override
637 public boolean evaluate() throws Exception {
638 return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
639 >= (NUM_REGIONS_TO_CREATE + 1));
640 }
641 });
642
643
644 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
645 @Override
646 public boolean evaluate() throws Exception {
647 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
648 zkw.recoveringRegionsZNode, false);
649 return (recoveringRegions != null && recoveringRegions.size() == 0);
650 }
651 });
652
653 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
654 ht.close();
655 zkw.close();
656 }
657
658 @Test(timeout = 300000)
659 public void testMarkRegionsRecoveringInZK() throws Exception {
660 LOG.info("testMarkRegionsRecoveringInZK");
661 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
662 startCluster(NUM_RS);
663 master.balanceSwitch(false);
664 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
665 final ZooKeeperWatcher zkw = master.getZooKeeper();
666 Table ht = installTable(zkw, "table", "family", 40);
667 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
668
669 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
670 HRegionInfo region = null;
671 HRegionServer hrs = null;
672 ServerName firstFailedServer = null;
673 ServerName secondFailedServer = null;
674 for (int i = 0; i < NUM_RS; i++) {
675 hrs = rsts.get(i).getRegionServer();
676 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
677 if (regions.isEmpty()) continue;
678 region = regions.get(0);
679 regionSet.add(region);
680 firstFailedServer = hrs.getServerName();
681 secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName();
682 break;
683 }
684
685 slm.markRegionsRecovering(firstFailedServer, regionSet);
686 slm.markRegionsRecovering(secondFailedServer, regionSet);
687
688 List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
689 ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
690
691 assertEquals(recoveringRegions.size(), 2);
692
693
694 final HRegionServer tmphrs = hrs;
695 TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
696 @Override
697 public boolean evaluate() throws Exception {
698 return (tmphrs.getRecoveringRegions().size() == 0);
699 }
700 });
701 ht.close();
702 }
703
704 @Test(timeout = 300000)
705 public void testReplayCmd() throws Exception {
706 LOG.info("testReplayCmd");
707 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
708 startCluster(NUM_RS);
709 final int NUM_REGIONS_TO_CREATE = 40;
710
711
712 master.balanceSwitch(false);
713
714 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
715 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
716 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
717
718 List<HRegionInfo> regions = null;
719 HRegionServer hrs = null;
720 for (int i = 0; i < NUM_RS; i++) {
721 boolean isCarryingMeta = false;
722 hrs = rsts.get(i).getRegionServer();
723 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
724 for (HRegionInfo region : regions) {
725 if (region.isMetaRegion()) {
726 isCarryingMeta = true;
727 break;
728 }
729 }
730 if (isCarryingMeta) {
731 continue;
732 }
733 if (regions.size() > 0) break;
734 }
735
736 this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
737 String originalCheckSum = TEST_UTIL.checksumRows(ht);
738
739
740 abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
741
742 assertEquals("Data should remain after reopening of regions", originalCheckSum,
743 TEST_UTIL.checksumRows(ht));
744
745 ht.close();
746 zkw.close();
747 }
748
749 @Test(timeout = 300000)
750 public void testLogReplayForDisablingTable() throws Exception {
751 LOG.info("testLogReplayForDisablingTable");
752 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
753 startCluster(NUM_RS);
754 final int NUM_REGIONS_TO_CREATE = 40;
755 final int NUM_LOG_LINES = 1000;
756
757 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
758 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
759 Table disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE);
760 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE);
761
762
763
764 master.balanceSwitch(false);
765
766 List<HRegionInfo> regions = null;
767 HRegionServer hrs = null;
768 boolean hasRegionsForBothTables = false;
769 String tableName = null;
770 for (int i = 0; i < NUM_RS; i++) {
771 tableName = null;
772 hasRegionsForBothTables = false;
773 boolean isCarryingSystem = false;
774 hrs = rsts.get(i).getRegionServer();
775 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
776 for (HRegionInfo region : regions) {
777 if (region.getTable().isSystemTable()) {
778 isCarryingSystem = true;
779 break;
780 }
781 if (tableName != null &&
782 !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) {
783
784 hasRegionsForBothTables = true;
785 break;
786 } else if (tableName == null) {
787 tableName = region.getTable().getNameAsString();
788 }
789 }
790 if (isCarryingSystem) {
791 continue;
792 }
793 if (hasRegionsForBothTables) {
794 break;
795 }
796 }
797
798
799 Assert.assertTrue(hasRegionsForBothTables);
800
801 LOG.info("#regions = " + regions.size());
802 Iterator<HRegionInfo> it = regions.iterator();
803 while (it.hasNext()) {
804 HRegionInfo region = it.next();
805 if (region.isMetaTable()) {
806 it.remove();
807 }
808 }
809 makeWAL(hrs, regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
810 makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
811
812 LOG.info("Disabling table\n");
813 TEST_UTIL.getHBaseAdmin().disableTable(TableName.valueOf("disableTable"));
814
815
816 LOG.info("Aborting region server: " + hrs.getServerName());
817 hrs.abort("testing");
818
819
820 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
821 @Override
822 public boolean evaluate() throws Exception {
823 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
824 }
825 });
826
827
828 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
829 @Override
830 public boolean evaluate() throws Exception {
831 return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
832 >= (NUM_REGIONS_TO_CREATE + 1));
833 }
834 });
835
836
837 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
838 @Override
839 public boolean evaluate() throws Exception {
840 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
841 zkw.recoveringRegionsZNode, false);
842 ServerManager serverManager = master.getServerManager();
843 return (!serverManager.areDeadServersInProgress() &&
844 recoveringRegions != null && recoveringRegions.size() == 0);
845 }
846 });
847
848 int count = 0;
849 FileSystem fs = master.getMasterFileSystem().getFileSystem();
850 Path rootdir = FSUtils.getRootDir(conf);
851 Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable"));
852 for (HRegionInfo hri : regions) {
853 Path editsdir =
854 WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
855 LOG.debug("checking edits dir " + editsdir);
856 if(!fs.exists(editsdir)) continue;
857 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
858 @Override
859 public boolean accept(Path p) {
860 if (WALSplitter.isSequenceIdFile(p)) {
861 return false;
862 }
863 return true;
864 }
865 });
866 if(files != null) {
867 for(FileStatus file : files) {
868 int c = countWAL(file.getPath(), fs, conf);
869 count += c;
870 LOG.info(c + " edits in " + file.getPath());
871 }
872 }
873 }
874
875 LOG.info("Verify edits in recovered.edits files");
876 assertEquals(NUM_LOG_LINES, count);
877 LOG.info("Verify replayed edits");
878 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
879
880
881 for (HRegionInfo hri : regions) {
882 Path editsdir =
883 WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
884 fs.delete(editsdir, true);
885 }
886 disablingHT.close();
887 ht.close();
888 zkw.close();
889 }
890
891 @Ignore ("We don't support DLR anymore") @Test(timeout = 300000)
892 public void testDisallowWritesInRecovering() throws Exception {
893 LOG.info("testDisallowWritesInRecovering");
894 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
895 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
896 conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
897 startCluster(NUM_RS);
898 final int NUM_REGIONS_TO_CREATE = 40;
899
900
901 master.balanceSwitch(false);
902
903 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
904 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
905 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
906 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
907
908 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
909 HRegionInfo region = null;
910 HRegionServer hrs = null;
911 HRegionServer dstRS = null;
912 for (int i = 0; i < NUM_RS; i++) {
913 hrs = rsts.get(i).getRegionServer();
914 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
915 if (regions.isEmpty()) continue;
916 region = regions.get(0);
917 if (region.isMetaRegion()) continue;
918 regionSet.add(region);
919 dstRS = rsts.get((i+1) % NUM_RS).getRegionServer();
920 break;
921 }
922
923 slm.markRegionsRecovering(hrs.getServerName(), regionSet);
924
925 final HRegionInfo hri = region;
926 final HRegionServer tmpRS = dstRS;
927 TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(),
928 Bytes.toBytes(dstRS.getServerName().getServerName()));
929
930 final RegionStates regionStates =
931 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
932 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
933 @Override
934 public boolean evaluate() throws Exception {
935 ServerName sn = regionStates.getRegionServerOfRegion(hri);
936 return (sn != null && sn.equals(tmpRS.getServerName()));
937 }
938 });
939
940 try {
941 byte[] key = region.getStartKey();
942 if (key == null || key.length == 0) {
943 key = new byte[] { 0, 0, 0, 0, 1 };
944 }
945 Put put = new Put(key);
946 put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
947 ht.put(put);
948 } catch (IOException ioe) {
949 Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
950 RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
951 boolean foundRegionInRecoveryException = false;
952 for (Throwable t : re.getCauses()) {
953 if (t instanceof RegionInRecoveryException) {
954 foundRegionInRecoveryException = true;
955 break;
956 }
957 }
958 Assert.assertTrue(
959 "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(),
960 foundRegionInRecoveryException);
961 }
962
963 ht.close();
964 zkw.close();
965 }
966
967
968
969
970
971
972
973
974
975
976 @Test (timeout=300000)
977 public void testWorkerAbort() throws Exception {
978 LOG.info("testWorkerAbort");
979 startCluster(3);
980 final int NUM_LOG_LINES = 10000;
981 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
982 FileSystem fs = master.getMasterFileSystem().getFileSystem();
983
984 final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
985 HRegionServer hrs = findRSToKill(false, "table");
986 Path rootdir = FSUtils.getRootDir(conf);
987 final Path logDir = new Path(rootdir,
988 DefaultWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
989
990 installTable(new ZooKeeperWatcher(conf, "table-creation", null),
991 "table", "family", 40);
992
993 makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()),
994 "table", "family", NUM_LOG_LINES, 100);
995
996 new Thread() {
997 @Override
998 public void run() {
999 waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
1000 for (RegionServerThread rst : rsts) {
1001 rst.getRegionServer().abort("testing");
1002 break;
1003 }
1004 }
1005 }.start();
1006
1007 FileStatus[] logfiles = fs.listStatus(logDir);
1008 TaskBatch batch = new TaskBatch();
1009 slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
1010
1011 long curt = System.currentTimeMillis();
1012 long waitTime = 80000;
1013 long endt = curt + waitTime;
1014 while (curt < endt) {
1015 if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
1016 tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
1017 tot_wkr_preempt_task.get()) == 0) {
1018 Thread.yield();
1019 curt = System.currentTimeMillis();
1020 } else {
1021 assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
1022 tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
1023 tot_wkr_preempt_task.get()));
1024 return;
1025 }
1026 }
1027 fail("none of the following counters went up in " + waitTime +
1028 " milliseconds - " +
1029 "tot_wkr_task_resigned, tot_wkr_task_err, " +
1030 "tot_wkr_final_transition_failed, tot_wkr_task_done, " +
1031 "tot_wkr_preempt_task");
1032 }
1033
1034 @Test (timeout=300000)
1035 public void testThreeRSAbort() throws Exception {
1036 LOG.info("testThreeRSAbort");
1037 final int NUM_REGIONS_TO_CREATE = 40;
1038 final int NUM_ROWS_PER_REGION = 100;
1039
1040 startCluster(NUM_RS);
1041
1042 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
1043 "distributed log splitting test", null);
1044
1045 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1046 populateDataInTable(NUM_ROWS_PER_REGION, "family");
1047
1048
1049 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1050 assertEquals(NUM_RS, rsts.size());
1051 rsts.get(0).getRegionServer().abort("testing");
1052 rsts.get(1).getRegionServer().abort("testing");
1053 rsts.get(2).getRegionServer().abort("testing");
1054
1055 long start = EnvironmentEdgeManager.currentTime();
1056 while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
1057 if (EnvironmentEdgeManager.currentTime() - start > 60000) {
1058 assertTrue(false);
1059 }
1060 Thread.sleep(200);
1061 }
1062
1063 start = EnvironmentEdgeManager.currentTime();
1064 while (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
1065 < (NUM_REGIONS_TO_CREATE + 1)) {
1066 if (EnvironmentEdgeManager.currentTime() - start > 60000) {
1067 assertTrue("Timedout", false);
1068 }
1069 Thread.sleep(200);
1070 }
1071
1072
1073 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
1074 @Override
1075 public boolean evaluate() throws Exception {
1076 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
1077 zkw.recoveringRegionsZNode, false);
1078 return (recoveringRegions != null && recoveringRegions.size() == 0);
1079 }
1080 });
1081
1082 assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
1083 TEST_UTIL.countRows(ht));
1084 ht.close();
1085 zkw.close();
1086 }
1087
1088
1089
/**
 * Splits a directory holding a single corrupted log file twice from a background thread.
 *
 * <p>The first splitLogDistributed call is expected to throw IOException. The second is
 * expected to block until the executor interrupts it; its IOException is accepted only if the
 * thread is interrupted. The main thread waits for tot_mgr_wait_for_zk_delete to move from 0
 * to 1 before interrupting. Assertion failures inside the runnable surface through
 * result.get().
 */
@Test(timeout=30000)
public void testDelayedDeleteOnFailure() throws Exception {
  LOG.info("testDelayedDeleteOnFailure");
  startCluster(1);
  final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
  final FileSystem fs = master.getMasterFileSystem().getFileSystem();
  final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
  fs.mkdirs(logDir);
  ExecutorService executor = null;
  try {
    // Write a "log" file whose content is garbage, not a valid WAL.
    final Path corruptedLogFile = new Path(logDir, "x");
    FSDataOutputStream out;
    out = fs.create(corruptedLogFile);
    out.write(0);
    out.write(Bytes.toBytes("corrupted bytes"));
    out.close();
    // NOTE(review): presumably makes the ZK coordination skip deleting finished task znodes,
    // which is what keeps the 2nd split call below waiting — confirm against
    // ZKSplitLogManagerCoordination.
    ZKSplitLogManagerCoordination coordination =
        (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master
            .getCoordinatedStateManager()).getSplitLogManagerCoordination();
    coordination.setIgnoreDeleteForTesting(true);
    executor = Executors.newSingleThreadExecutor();
    Runnable runnable = new Runnable() {
      @Override
      public void run() {
        try {
          // 1st call: the log is corrupted, so splitting is expected to fail with an
          // IOException (reaching the fail() at the end means it did not).
          slm.splitLogDistributed(logDir);
        } catch (IOException ioe) {
          try {
            assertTrue(fs.exists(corruptedLogFile));
            // 2nd call: expected to block until this thread is interrupted by
            // executor.shutdownNow() in the main thread; the resulting IOException is only
            // accepted if the interrupt flag is set.
            slm.splitLogDistributed(logDir);
          } catch (IOException e) {
            assertTrue(Thread.currentThread().isInterrupted());
            return;
          }
          fail("did not get the expected IOException from the 2nd call");
        }
        fail("did not get the expected IOException from the 1st call");
      }
    };
    Future<?> result = executor.submit(runnable);
    try {
      result.get(2000, TimeUnit.MILLISECONDS);
    } catch (TimeoutException te) {
      // Expected: the runnable should still be blocked in its 2nd call.
    }
    // Wait (up to 10000 ms) for the manager's wait-for-zk-delete counter to go from 0 to 1.
    waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
    // Interrupt the blocked 2nd call; null the reference so finally does not repeat it.
    executor.shutdownNow();
    executor = null;

    // Rethrows (wrapped in ExecutionException) any assertion failure from the runnable.
    result.get();
  } finally {
    if (executor != null) {
      // Only reached if the test failed before the shutdownNow() above: interrupt the
      // worker so it does not linger. Harmless if it has already terminated.
      executor.shutdownNow();
    }
    fs.delete(logDir, true);
  }
}
1156
1157 @Test(timeout = 300000)
1158 public void testMetaRecoveryInZK() throws Exception {
1159 LOG.info("testMetaRecoveryInZK");
1160 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1161 startCluster(NUM_RS);
1162
1163
1164
1165 master.balanceSwitch(false);
1166 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1167
1168
1169 HRegionServer hrs = findRSToKill(true, null);
1170 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
1171
1172 LOG.info("#regions = " + regions.size());
1173 Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
1174 tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
1175 master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
1176 Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
1177 userRegionSet.addAll(regions);
1178 master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
1179 boolean isMetaRegionInRecovery = false;
1180 List<String> recoveringRegions =
1181 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1182 for (String curEncodedRegionName : recoveringRegions) {
1183 if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1184 isMetaRegionInRecovery = true;
1185 break;
1186 }
1187 }
1188 assertTrue(isMetaRegionInRecovery);
1189
1190 master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
1191
1192 isMetaRegionInRecovery = false;
1193 recoveringRegions =
1194 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1195 for (String curEncodedRegionName : recoveringRegions) {
1196 if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1197 isMetaRegionInRecovery = true;
1198 break;
1199 }
1200 }
1201
1202 assertFalse(isMetaRegionInRecovery);
1203 zkw.close();
1204 }
1205
1206 @Test(timeout = 300000)
1207 public void testSameVersionUpdatesRecovery() throws Exception {
1208 LOG.info("testSameVersionUpdatesRecovery");
1209 conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1210 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1211 startCluster(NUM_RS);
1212 final AtomicLong sequenceId = new AtomicLong(100);
1213 final int NUM_REGIONS_TO_CREATE = 40;
1214 final int NUM_LOG_LINES = 1000;
1215
1216
1217 master.balanceSwitch(false);
1218
1219 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1220 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1221 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1222
1223 List<HRegionInfo> regions = null;
1224 HRegionServer hrs = null;
1225 for (int i = 0; i < NUM_RS; i++) {
1226 boolean isCarryingMeta = false;
1227 hrs = rsts.get(i).getRegionServer();
1228 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
1229 for (HRegionInfo region : regions) {
1230 if (region.isMetaRegion()) {
1231 isCarryingMeta = true;
1232 break;
1233 }
1234 }
1235 if (isCarryingMeta) {
1236 continue;
1237 }
1238 break;
1239 }
1240
1241 LOG.info("#regions = " + regions.size());
1242 Iterator<HRegionInfo> it = regions.iterator();
1243 while (it.hasNext()) {
1244 HRegionInfo region = it.next();
1245 if (region.isMetaTable()
1246 || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1247 it.remove();
1248 }
1249 }
1250 if (regions.size() == 0) return;
1251 HRegionInfo curRegionInfo = regions.get(0);
1252 byte[] startRow = curRegionInfo.getStartKey();
1253 if (startRow == null || startRow.length == 0) {
1254 startRow = new byte[] { 0, 0, 0, 0, 1 };
1255 }
1256 byte[] row = Bytes.incrementBytes(startRow, 1);
1257
1258 row = Arrays.copyOfRange(row, 3, 8);
1259 long value = 0;
1260 TableName tableName = TableName.valueOf("table");
1261 byte[] family = Bytes.toBytes("family");
1262 byte[] qualifier = Bytes.toBytes("c1");
1263 long timeStamp = System.currentTimeMillis();
1264 HTableDescriptor htd = new HTableDescriptor();
1265 htd.addFamily(new HColumnDescriptor(family));
1266 final WAL wal = hrs.getWAL(curRegionInfo);
1267 for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1268 WALEdit e = new WALEdit();
1269 value++;
1270 e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1271 wal.append(htd, curRegionInfo,
1272 new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
1273 e, sequenceId, true, null);
1274 }
1275 wal.sync();
1276 wal.shutdown();
1277
1278
1279 this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1280
1281
1282 LOG.info("Verification Starts...");
1283 Get g = new Get(row);
1284 Result r = ht.get(g);
1285 long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1286 assertEquals(value, theStoredVal);
1287
1288
1289 LOG.info("Verification after flush...");
1290 TEST_UTIL.getHBaseAdmin().flush(tableName);
1291 r = ht.get(g);
1292 theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1293 assertEquals(value, theStoredVal);
1294 ht.close();
1295 }
1296
1297 @Test(timeout = 300000)
1298 public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
1299 LOG.info("testSameVersionUpdatesRecoveryWithWrites");
1300 conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1301 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1302 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
1303 conf.setInt("hbase.hstore.compactionThreshold", 3);
1304 startCluster(NUM_RS);
1305 final AtomicLong sequenceId = new AtomicLong(100);
1306 final int NUM_REGIONS_TO_CREATE = 40;
1307 final int NUM_LOG_LINES = 2000;
1308
1309
1310 master.balanceSwitch(false);
1311
1312 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1313 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1314 Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1315
1316 List<HRegionInfo> regions = null;
1317 HRegionServer hrs = null;
1318 for (int i = 0; i < NUM_RS; i++) {
1319 boolean isCarryingMeta = false;
1320 hrs = rsts.get(i).getRegionServer();
1321 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
1322 for (HRegionInfo region : regions) {
1323 if (region.isMetaRegion()) {
1324 isCarryingMeta = true;
1325 break;
1326 }
1327 }
1328 if (isCarryingMeta) {
1329 continue;
1330 }
1331 break;
1332 }
1333
1334 LOG.info("#regions = " + regions.size());
1335 Iterator<HRegionInfo> it = regions.iterator();
1336 while (it.hasNext()) {
1337 HRegionInfo region = it.next();
1338 if (region.isMetaTable()
1339 || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1340 it.remove();
1341 }
1342 }
1343 if (regions.size() == 0) return;
1344 HRegionInfo curRegionInfo = regions.get(0);
1345 byte[] startRow = curRegionInfo.getStartKey();
1346 if (startRow == null || startRow.length == 0) {
1347 startRow = new byte[] { 0, 0, 0, 0, 1 };
1348 }
1349 byte[] row = Bytes.incrementBytes(startRow, 1);
1350
1351 row = Arrays.copyOfRange(row, 3, 8);
1352 long value = 0;
1353 final TableName tableName = TableName.valueOf("table");
1354 byte[] family = Bytes.toBytes("family");
1355 byte[] qualifier = Bytes.toBytes("c1");
1356 long timeStamp = System.currentTimeMillis();
1357 HTableDescriptor htd = new HTableDescriptor(tableName);
1358 htd.addFamily(new HColumnDescriptor(family));
1359 final WAL wal = hrs.getWAL(curRegionInfo);
1360 for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1361 WALEdit e = new WALEdit();
1362 value++;
1363 e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1364 wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
1365 tableName, System.currentTimeMillis()), e, sequenceId, true, null);
1366 }
1367 wal.sync();
1368 wal.shutdown();
1369
1370
1371 this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1372
1373
1374 LOG.info("Verification Starts...");
1375 Get g = new Get(row);
1376 Result r = ht.get(g);
1377 long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1378 assertEquals(value, theStoredVal);
1379
1380
1381 LOG.info("Verification after flush...");
1382 TEST_UTIL.getHBaseAdmin().flush(tableName);
1383 TEST_UTIL.getHBaseAdmin().compact(tableName);
1384
1385
1386 TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
1387 @Override
1388 public boolean evaluate() throws Exception {
1389 return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
1390 }
1391 });
1392
1393 r = ht.get(g);
1394 theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1395 assertEquals(value, theStoredVal);
1396 ht.close();
1397 }
1398
1399 @Test(timeout = 300000)
1400 public void testReadWriteSeqIdFiles() throws Exception {
1401 LOG.info("testReadWriteSeqIdFiles");
1402 startCluster(2);
1403 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1404 HTable ht = installTable(zkw, "table", "family", 10);
1405 FileSystem fs = master.getMasterFileSystem().getFileSystem();
1406 Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table"));
1407 List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
1408 long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
1409 WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
1410 assertEquals(newSeqId + 2000,
1411 WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));
1412
1413 Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0));
1414 FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
1415 @Override
1416 public boolean accept(Path p) {
1417 return WALSplitter.isSequenceIdFile(p);
1418 }
1419 });
1420
1421 assertEquals(1, files.length);
1422
1423
1424 NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0));
1425 assertEquals(0, recoveredEdits.size());
1426
1427 ht.close();
1428 }
1429
1430 HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
1431 return installTable(zkw, tname, fname, nrs, 0);
1432 }
1433
1434 HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs,
1435 int existingRegions) throws Exception {
1436
1437 TableName table = TableName.valueOf(tname);
1438 byte [] family = Bytes.toBytes(fname);
1439 LOG.info("Creating table with " + nrs + " regions");
1440 HTable ht = TEST_UTIL.createMultiRegionTable(table, family, nrs);
1441 int numRegions = -1;
1442 try (RegionLocator r = ht.getRegionLocator()) {
1443 numRegions = r.getStartKeys().length;
1444 }
1445 assertEquals(nrs, numRegions);
1446 LOG.info("Waiting for no more RIT\n");
1447 blockUntilNoRIT(zkw, master);
1448
1449
1450 LOG.debug("Disabling table\n");
1451 TEST_UTIL.getHBaseAdmin().disableTable(table);
1452 LOG.debug("Waiting for no more RIT\n");
1453 blockUntilNoRIT(zkw, master);
1454 NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
1455 LOG.debug("Verifying only catalog and namespace regions are assigned\n");
1456 if (regions.size() != 2) {
1457 for (String oregion : regions)
1458 LOG.debug("Region still online: " + oregion);
1459 }
1460 assertTrue(2 + existingRegions <= regions.size());
1461 LOG.debug("Enabling table\n");
1462 TEST_UTIL.getHBaseAdmin().enableTable(table);
1463 LOG.debug("Waiting for no more RIT\n");
1464 blockUntilNoRIT(zkw, master);
1465 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
1466 regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
1467 assertTrue(numRegions + 2 + existingRegions <= regions.size());
1468 return ht;
1469 }
1470
1471 void populateDataInTable(int nrows, String fname) throws Exception {
1472 byte [] family = Bytes.toBytes(fname);
1473
1474 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1475 assertEquals(NUM_RS, rsts.size());
1476
1477 for (RegionServerThread rst : rsts) {
1478 HRegionServer hrs = rst.getRegionServer();
1479 List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
1480 for (HRegionInfo hri : hris) {
1481 if (hri.getTable().isSystemTable()) {
1482 continue;
1483 }
1484 LOG.debug("adding data to rs = " + rst.getName() +
1485 " region = "+ hri.getRegionNameAsString());
1486 Region region = hrs.getOnlineRegion(hri.getRegionName());
1487 assertTrue(region != null);
1488 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
1489 }
1490 }
1491 }
1492
1493 public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
1494 int num_edits, int edit_size) throws IOException {
1495 makeWAL(hrs, regions, tname, fname, num_edits, edit_size, true);
1496 }
1497
1498 public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
1499 int num_edits, int edit_size, boolean cleanShutdown) throws IOException {
1500 TableName fullTName = TableName.valueOf(tname);
1501
1502 regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
1503
1504 final AtomicLong sequenceId = new AtomicLong(10);
1505
1506
1507 for(Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
1508 HRegionInfo regionInfo = iter.next();
1509 if(regionInfo.getTable().isSystemTable()) {
1510 iter.remove();
1511 }
1512 }
1513 HTableDescriptor htd = new HTableDescriptor(fullTName);
1514 byte[] family = Bytes.toBytes(fname);
1515 htd.addFamily(new HColumnDescriptor(family));
1516 byte[] value = new byte[edit_size];
1517
1518 List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
1519 for (HRegionInfo region : regions) {
1520 if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
1521 continue;
1522 }
1523 hris.add(region);
1524 }
1525 LOG.info("Creating wal edits across " + hris.size() + " regions.");
1526 for (int i = 0; i < edit_size; i++) {
1527 value[i] = (byte) ('a' + (i % 26));
1528 }
1529 int n = hris.size();
1530 int[] counts = new int[n];
1531
1532 final int syncEvery = 30 * 1024 / edit_size;
1533 if (n > 0) {
1534 for (int i = 0; i < num_edits; i += 1) {
1535 WALEdit e = new WALEdit();
1536 HRegionInfo curRegionInfo = hris.get(i % n);
1537 final WAL log = hrs.getWAL(curRegionInfo);
1538 byte[] startRow = curRegionInfo.getStartKey();
1539 if (startRow == null || startRow.length == 0) {
1540 startRow = new byte[] { 0, 0, 0, 0, 1 };
1541 }
1542 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
1543 row = Arrays.copyOfRange(row, 3, 8);
1544
1545
1546 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
1547 e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
1548 log.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
1549 System.currentTimeMillis()), e, sequenceId, true, null);
1550 if (0 == i % syncEvery) {
1551 log.sync();
1552 }
1553 counts[i % n] += 1;
1554 }
1555 }
1556
1557
1558 for (HRegionInfo info : hris) {
1559 final WAL log = hrs.getWAL(info);
1560 log.sync();
1561 }
1562 if (cleanShutdown) {
1563 for (HRegionInfo info : hris) {
1564 final WAL log = hrs.getWAL(info);
1565 log.shutdown();
1566 }
1567 }
1568 for (int i = 0; i < n; i++) {
1569 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
1570 }
1571 return;
1572 }
1573
1574 private int countWAL(Path log, FileSystem fs, Configuration conf)
1575 throws IOException {
1576 int count = 0;
1577 WAL.Reader in = WALFactory.createReader(fs, log, conf);
1578 try {
1579 WAL.Entry e;
1580 while ((e = in.next()) != null) {
1581 if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
1582 count++;
1583 }
1584 }
1585 } finally {
1586 try {
1587 in.close();
1588 } catch (IOException exception) {
1589 LOG.warn("Problem closing wal: " + exception.getMessage());
1590 LOG.debug("exception details.", exception);
1591 }
1592 }
1593 return count;
1594 }
1595
1596 private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
1597 throws KeeperException, InterruptedException {
1598 ZKAssign.blockUntilNoRIT(zkw);
1599 master.assignmentManager.waitUntilNoRegionsInTransition(60000);
1600 }
1601
1602 private void putData(Region region, byte[] startRow, int numRows, byte [] qf,
1603 byte [] ...families)
1604 throws IOException {
1605 for(int i = 0; i < numRows; i++) {
1606 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
1607 for(byte [] family : families) {
1608 put.add(family, qf, null);
1609 }
1610 region.put(put);
1611 }
1612 }
1613
1614
1615
1616
1617 private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException {
1618 byte[] k = new byte[3];
1619
1620
1621 List<Put> puts = new ArrayList<>();
1622 for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1623 for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1624 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1625 k[0] = b1;
1626 k[1] = b2;
1627 k[2] = b3;
1628 Put put = new Put(k);
1629 put.add(f, column, k);
1630 puts.add(put);
1631 }
1632 }
1633 }
1634 t.put(puts);
1635
1636 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1637 k[0] = 'a';
1638 k[1] = 'a';
1639 k[2] = b3;
1640 Delete del = new Delete(k);
1641 t.delete(del);
1642 }
1643 }
1644
1645 private void waitForCounter(AtomicLong ctr, long oldval, long newval,
1646 long timems) {
1647 long curt = System.currentTimeMillis();
1648 long endt = curt + timems;
1649 while (curt < endt) {
1650 if (ctr.get() == oldval) {
1651 Thread.yield();
1652 curt = System.currentTimeMillis();
1653 } else {
1654 assertEquals(newval, ctr.get());
1655 return;
1656 }
1657 }
1658 assertTrue(false);
1659 }
1660
1661 private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
1662 for (MasterThread mt : cluster.getLiveMasterThreads()) {
1663 if (mt.getMaster().isActiveMaster()) {
1664 mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
1665 mt.join();
1666 break;
1667 }
1668 }
1669 LOG.debug("Master is aborted");
1670 }
1671
1672
1673
1674
1675
1676
1677
1678
1679 private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
1680 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1681 int numOfRSs = rsts.size();
1682 List<HRegionInfo> regions = null;
1683 HRegionServer hrs = null;
1684
1685 for (int i = 0; i < numOfRSs; i++) {
1686 boolean isCarryingMeta = false;
1687 boolean foundTableRegion = false;
1688 hrs = rsts.get(i).getRegionServer();
1689 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
1690 for (HRegionInfo region : regions) {
1691 if (region.isMetaRegion()) {
1692 isCarryingMeta = true;
1693 }
1694 if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
1695 foundTableRegion = true;
1696 }
1697 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
1698 break;
1699 }
1700 }
1701 if (isCarryingMeta && hasMetaRegion) {
1702
1703 if (!foundTableRegion) {
1704 final HRegionServer destRS = hrs;
1705
1706 List<HRegionInfo> tableRegions =
1707 TEST_UTIL.getHBaseAdmin().getTableRegions(TableName.valueOf(tableName));
1708 final HRegionInfo hri = tableRegions.get(0);
1709 TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
1710 Bytes.toBytes(destRS.getServerName().getServerName()));
1711
1712 final RegionStates regionStates =
1713 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
1714 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
1715 @Override
1716 public boolean evaluate() throws Exception {
1717 ServerName sn = regionStates.getRegionServerOfRegion(hri);
1718 return (sn != null && sn.equals(destRS.getServerName()));
1719 }
1720 });
1721 }
1722 return hrs;
1723 } else if (hasMetaRegion || isCarryingMeta) {
1724 continue;
1725 }
1726 if (foundTableRegion) break;
1727 }
1728
1729 return hrs;
1730 }
1731
1732 }