1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertNotSame;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.CellScannable;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.CoordinatedStateException;
38 import org.apache.hadoop.hbase.CoordinatedStateManager;
39 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
40 import org.apache.hadoop.hbase.DoNotRetryIOException;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HTableDescriptor;
46 import org.apache.hadoop.hbase.MetaMockingUtil;
47 import org.apache.hadoop.hbase.RegionException;
48 import org.apache.hadoop.hbase.RegionTransition;
49 import org.apache.hadoop.hbase.Server;
50 import org.apache.hadoop.hbase.ServerLoad;
51 import org.apache.hadoop.hbase.ServerName;
52 import org.apache.hadoop.hbase.TableName;
53 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
54 import org.apache.hadoop.hbase.client.ClusterConnection;
55 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
56 import org.apache.hadoop.hbase.client.Result;
57 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
58 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
59 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
60 import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
61 import org.apache.hadoop.hbase.exceptions.DeserializationException;
62 import org.apache.hadoop.hbase.executor.EventType;
63 import org.apache.hadoop.hbase.executor.ExecutorService;
64 import org.apache.hadoop.hbase.executor.ExecutorType;
65 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
66 import org.apache.hadoop.hbase.master.RegionState.State;
67 import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
68 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
69 import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
70 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
71 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
72 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
73 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
74 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
75 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
76 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
77 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
78 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
79 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
80 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
81 import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
82 import org.apache.hadoop.hbase.testclassification.MediumTests;
83 import org.apache.hadoop.hbase.util.Bytes;
84 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
85 import org.apache.hadoop.hbase.util.Threads;
86 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
87 import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
88 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
89 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
90 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
91 import org.apache.zookeeper.KeeperException;
92 import org.apache.zookeeper.KeeperException.NodeExistsException;
93 import org.apache.zookeeper.Watcher;
94 import org.junit.After;
95 import org.junit.AfterClass;
96 import org.junit.Before;
97 import org.junit.BeforeClass;
98 import org.junit.Test;
99 import org.junit.experimental.categories.Category;
100 import org.mockito.Mockito;
101 import org.mockito.internal.util.reflection.Whitebox;
102 import org.mockito.invocation.InvocationOnMock;
103 import org.mockito.stubbing.Answer;
104
105 import com.google.protobuf.RpcController;
106 import com.google.protobuf.ServiceException;
107
108
109
110
111
112 @Category(MediumTests.class)
113 public class TestAssignmentManager {
114 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
115 private static final ServerName SERVERNAME_A =
116 ServerName.valueOf("example.org", 1234, 5678);
117 private static final ServerName SERVERNAME_AA =
118 ServerName.valueOf("example.org", 1234, 9999);
119 private static final ServerName SERVERNAME_B =
120 ServerName.valueOf("example.org", 0, 5678);
121 private static final HRegionInfo REGIONINFO =
122 new HRegionInfo(TableName.valueOf("t"),
123 HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
124 private static int assignmentCount;
125 private static boolean enabling = false;
126
127
128 private MasterServices server;
129 private ServerManager serverManager;
130 private ZooKeeperWatcher watcher;
131 private CoordinatedStateManager cp;
132 private MetaTableLocator mtl;
133 private LoadBalancer balancer;
134 private HMaster master;
135 private ClusterConnection connection;
136
137 @BeforeClass
138 public static void beforeClass() throws Exception {
139 HTU.getConfiguration().setBoolean("hbase.assignment.usezk", true);
140 HTU.startMiniZKCluster();
141 }
142
143 @AfterClass
144 public static void afterClass() throws IOException {
145 HTU.shutdownMiniZKCluster();
146 }
147
148 @Before
149 public void before() throws ZooKeeperConnectionException, IOException {
150
151
152
153
154
155
156 this.server = Mockito.mock(MasterServices.class);
157 Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("master,1,1"));
158 Mockito.when(server.getConfiguration()).thenReturn(HTU.getConfiguration());
159 this.watcher =
160 new ZooKeeperWatcher(HTU.getConfiguration(), "mockedServer", this.server, true);
161 Mockito.when(server.getZooKeeper()).thenReturn(this.watcher);
162 Mockito.doThrow(new RuntimeException("Aborted")).
163 when(server).abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
164
165 cp = new ZkCoordinatedStateManager();
166 cp.initialize(this.server);
167 cp.start();
168
169 mtl = Mockito.mock(MetaTableLocator.class);
170
171 Mockito.when(server.getCoordinatedStateManager()).thenReturn(cp);
172 Mockito.when(server.getMetaTableLocator()).thenReturn(mtl);
173
174
175 this.connection =
176 (ClusterConnection)HConnectionTestingUtility.getMockedConnection(HTU.getConfiguration());
177
178
179
180 Mockito.when(server.getConnection()).thenReturn(connection);
181 Mockito.when(connection.isManaged()).thenReturn(true);
182
183
184
185 this.serverManager = Mockito.mock(ServerManager.class);
186 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
187 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B)).thenReturn(true);
188 Mockito.when(this.serverManager.getDeadServers()).thenReturn(new DeadServer());
189 final Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
190 onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
191 onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
192 Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(
193 new ArrayList<ServerName>(onlineServers.keySet()));
194 Mockito.when(this.serverManager.getOnlineServers()).thenReturn(onlineServers);
195
196 List<ServerName> avServers = new ArrayList<ServerName>();
197 avServers.addAll(onlineServers.keySet());
198 Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(avServers);
199 Mockito.when(this.serverManager.createDestinationServersList(new ArrayList<ServerName>())).thenReturn(avServers);
200
201 Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, REGIONINFO, -1)).
202 thenReturn(true);
203 Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_B, REGIONINFO, -1)).
204 thenReturn(true);
205
206 Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_A, REGIONINFO, -1, null)).
207 thenReturn(RegionOpeningState.OPENED);
208 Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_B, REGIONINFO, -1, null)).
209 thenReturn(RegionOpeningState.OPENED);
210 this.master = Mockito.mock(HMaster.class);
211
212 Mockito.when(this.master.getServerManager()).thenReturn(serverManager);
213 }
214
215 @After public void after() throws KeeperException, IOException {
216 if (this.watcher != null) {
217
218 ZKAssign.deleteAllNodes(this.watcher);
219 this.watcher.close();
220 this.cp.stop();
221 }
222 if (this.connection != null) this.connection.close();
223 }
224
225
226
227
228
229
230
231
232
233 @Test(timeout = 60000)
234 public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
235 throws IOException, KeeperException, InterruptedException, ServiceException,
236 DeserializationException, CoordinatedStateException {
237 AssignmentManagerWithExtrasForTesting am =
238 setUpMockedAssignmentManager(this.server, this.serverManager);
239 try {
240 createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
241 startFakeFailedOverMasterAssignmentManager(am, this.watcher);
242 while (!am.processRITInvoked) Thread.sleep(1);
243
244
245
246 am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B));
247
248 Mocking.waitForRegionFailedToCloseAndSetToPendingClose(am, REGIONINFO);
249
250
251
252
253
254
255 int versionid =
256 ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
257 assertNotSame(versionid, -1);
258 Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
259
260
261
262 versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
263 assertNotSame(-1, versionid);
264
265 versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
266 SERVERNAME_B, EventType.M_ZK_REGION_OFFLINE,
267 EventType.RS_ZK_REGION_OPENING, versionid);
268 assertNotSame(-1, versionid);
269
270 versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO,
271 SERVERNAME_B, versionid);
272 assertNotSame(-1, versionid);
273 am.gate.set(false);
274
275 ZKAssign.blockUntilNoRIT(watcher);
276 } finally {
277 am.getExecutorService().shutdown();
278 am.shutdown();
279 }
280 }
281
282 @Test(timeout = 60000)
283 public void testBalanceOnMasterFailoverScenarioWithClosedNode()
284 throws IOException, KeeperException, InterruptedException, ServiceException,
285 DeserializationException, CoordinatedStateException {
286 AssignmentManagerWithExtrasForTesting am =
287 setUpMockedAssignmentManager(this.server, this.serverManager);
288 try {
289 createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
290 startFakeFailedOverMasterAssignmentManager(am, this.watcher);
291 while (!am.processRITInvoked) Thread.sleep(1);
292
293
294
295 am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B));
296
297 Mocking.waitForRegionFailedToCloseAndSetToPendingClose(am, REGIONINFO);
298
299
300
301
302
303
304 int versionid =
305 ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
306 assertNotSame(versionid, -1);
307 am.gate.set(false);
308 Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
309
310
311
312 versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
313 assertNotSame(-1, versionid);
314
315 versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
316 SERVERNAME_B, EventType.M_ZK_REGION_OFFLINE,
317 EventType.RS_ZK_REGION_OPENING, versionid);
318 assertNotSame(-1, versionid);
319
320 versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO,
321 SERVERNAME_B, versionid);
322 assertNotSame(-1, versionid);
323
324
325 ZKAssign.blockUntilNoRIT(watcher);
326 } finally {
327 am.getExecutorService().shutdown();
328 am.shutdown();
329 }
330 }
331
332 @Test(timeout = 60000)
333 public void testBalanceOnMasterFailoverScenarioWithOfflineNode()
334 throws IOException, KeeperException, InterruptedException, ServiceException,
335 DeserializationException, CoordinatedStateException {
336 AssignmentManagerWithExtrasForTesting am =
337 setUpMockedAssignmentManager(this.server, this.serverManager);
338 try {
339 createRegionPlanAndBalance(am, SERVERNAME_A, SERVERNAME_B, REGIONINFO);
340 startFakeFailedOverMasterAssignmentManager(am, this.watcher);
341 while (!am.processRITInvoked) Thread.sleep(1);
342
343
344
345 am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B));
346
347 Mocking.waitForRegionFailedToCloseAndSetToPendingClose(am, REGIONINFO);
348
349
350
351
352
353
354 int versionid =
355 ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
356 assertNotSame(versionid, -1);
357 Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
358
359 am.gate.set(false);
360
361
362 versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
363 assertNotSame(-1, versionid);
364
365 versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
366 SERVERNAME_B, EventType.M_ZK_REGION_OFFLINE,
367 EventType.RS_ZK_REGION_OPENING, versionid);
368 assertNotSame(-1, versionid);
369
370 versionid = ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO,
371 SERVERNAME_B, versionid);
372 assertNotSame(-1, versionid);
373
374 ZKAssign.blockUntilNoRIT(watcher);
375 } finally {
376 am.getExecutorService().shutdown();
377 am.shutdown();
378 }
379 }
380
381 private void createRegionPlanAndBalance(
382 final AssignmentManager am, final ServerName from,
383 final ServerName to, final HRegionInfo hri) throws RegionException {
384
385
386 am.regionOnline(hri, from);
387
388
389 am.balance(new RegionPlan(hri, from, to));
390 }
391
392
393
394
395
396
397
398
399 @Test (timeout=180000)
400 public void testBalance() throws IOException, KeeperException, DeserializationException,
401 InterruptedException, CoordinatedStateException {
402
403
404 ExecutorService executor = startupMasterExecutor("testBalanceExecutor");
405
406
407 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server
408 .getConfiguration());
409
410 AssignmentManager am = new AssignmentManager(this.server,
411 this.serverManager, balancer, executor, null, master.getTableLockManager());
412 am.failoverCleanupDone.set(true);
413 try {
414
415
416 this.watcher.registerListenerFirst(am);
417
418
419 am.regionOnline(REGIONINFO, SERVERNAME_A);
420
421 RegionPlan plan = new RegionPlan(REGIONINFO, SERVERNAME_A, SERVERNAME_B);
422 am.balance(plan);
423
424 RegionStates regionStates = am.getRegionStates();
425
426 assertTrue(regionStates.isRegionInTransition(REGIONINFO)
427 && regionStates.isRegionInState(REGIONINFO, State.FAILED_CLOSE));
428
429 regionStates.updateRegionState(REGIONINFO, State.PENDING_CLOSE);
430
431
432
433
434
435
436 int versionid =
437 ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
438 assertNotSame(versionid, -1);
439
440
441
442
443
444 Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
445
446
447 versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
448 assertNotSame(-1, versionid);
449
450 versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
451 SERVERNAME_B, EventType.M_ZK_REGION_OFFLINE,
452 EventType.RS_ZK_REGION_OPENING, versionid);
453 assertNotSame(-1, versionid);
454
455 versionid =
456 ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO, SERVERNAME_B, versionid);
457 assertNotSame(-1, versionid);
458
459 while(regionStates.isRegionInTransition(REGIONINFO)) Threads.sleep(1);
460 } finally {
461 executor.shutdown();
462 am.shutdown();
463
464 ZKAssign.deleteAllNodes(this.watcher);
465 }
466 }
467
468
469
470
471
472
473 @Test (timeout=180000)
474 public void testShutdownHandler()
475 throws KeeperException, IOException, CoordinatedStateException, ServiceException {
476
477
478 ExecutorService executor = startupMasterExecutor("testShutdownHandler");
479
480
481 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
482 this.server, this.serverManager);
483 try {
484 processServerShutdownHandler(am, false);
485 } finally {
486 executor.shutdown();
487 am.shutdown();
488
489 ZKAssign.deleteAllNodes(this.watcher);
490 }
491 }
492
493
494
495
496
497
498 @Test (timeout=180000)
499 public void testShutdownHandlerWithRestartedServer()
500 throws KeeperException, IOException, CoordinatedStateException, ServiceException {
501
502
503 ExecutorService executor = startupMasterExecutor("testShutdownHandlerWithRestartedServer");
504
505
506 AssignmentManagerWithExtrasForTesting am =
507 setUpMockedAssignmentManager(this.server, this.serverManager);
508 am.getRegionStates().regionOnline(REGIONINFO, SERVERNAME_A);
509 am.getTableStateManager().setTableState(REGIONINFO.getTable(), Table.State.ENABLED);
510 try {
511 processServerShutdownHandler(am, false, true);
512 } finally {
513 executor.shutdown();
514 am.shutdown();
515
516 ZKAssign.deleteAllNodes(this.watcher);
517 }
518 }
519
520
521
522
523
524
525
526
527
528
529 @Test (timeout=180000)
530 public void testSSHWhenDisableTableInProgress() throws KeeperException, IOException,
531 CoordinatedStateException, ServiceException {
532 testCaseWithPartiallyDisabledState(Table.State.DISABLING);
533 testCaseWithPartiallyDisabledState(Table.State.DISABLED);
534 }
535
536
537
538
539
540
541
542
543
544 @Test (timeout=180000)
545 public void testSSHWhenSplitRegionInProgress() throws KeeperException, IOException, Exception {
546
547 testCaseWithSplitRegionPartial(true);
548
549 testCaseWithSplitRegionPartial(false);
550 }
551
552 private void testCaseWithSplitRegionPartial(boolean regionSplitDone) throws KeeperException,
553 IOException, InterruptedException,
554 CoordinatedStateException, ServiceException {
555
556
557 ExecutorService executor = startupMasterExecutor("testSSHWhenSplitRegionInProgress");
558
559 ZKAssign.deleteAllNodes(this.watcher);
560
561
562 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
563 this.server, this.serverManager);
564
565 am.regionOnline(REGIONINFO, SERVERNAME_A);
566
567 am.getRegionStates().updateRegionState(
568 REGIONINFO, State.SPLITTING, SERVERNAME_A);
569 am.getTableStateManager().setTableState(REGIONINFO.getTable(),
570 Table.State.ENABLED);
571 RegionTransition data = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING,
572 REGIONINFO.getRegionName(), SERVERNAME_A);
573 String node = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName());
574
575 ZKUtil.createAndWatch(this.watcher, node, data.toByteArray());
576
577 try {
578 processServerShutdownHandler(am, regionSplitDone);
579
580
581
582 if (regionSplitDone) {
583 assertFalse("Region state of region in SPLITTING should be removed from rit.",
584 am.getRegionStates().isRegionsInTransition());
585 } else {
586 while (!am.assignInvoked) {
587 Thread.sleep(1);
588 }
589 assertTrue("Assign should be invoked.", am.assignInvoked);
590 }
591 } finally {
592 REGIONINFO.setOffline(false);
593 REGIONINFO.setSplit(false);
594 executor.shutdown();
595 am.shutdown();
596
597 ZKAssign.deleteAllNodes(this.watcher);
598 }
599 }
600
601 private void testCaseWithPartiallyDisabledState(Table.State state) throws KeeperException,
602 IOException, CoordinatedStateException, ServiceException {
603
604
605 ExecutorService executor = startupMasterExecutor("testSSHWhenDisableTableInProgress");
606 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
607 ZKAssign.deleteAllNodes(this.watcher);
608
609
610 AssignmentManager am = new AssignmentManager(this.server,
611 this.serverManager, balancer, executor, null, master.getTableLockManager());
612
613 am.regionOnline(REGIONINFO, SERVERNAME_A);
614
615 am.getRegionStates().updateRegionState(REGIONINFO, State.PENDING_CLOSE);
616 if (state == Table.State.DISABLING) {
617 am.getTableStateManager().setTableState(REGIONINFO.getTable(),
618 Table.State.DISABLING);
619 } else {
620 am.getTableStateManager().setTableState(REGIONINFO.getTable(),
621 Table.State.DISABLED);
622 }
623 RegionTransition data = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_CLOSING,
624 REGIONINFO.getRegionName(), SERVERNAME_A);
625
626
627
628 String node = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName());
629
630 ZKUtil.createAndWatch(this.watcher, node, data.toByteArray());
631
632 try {
633 processServerShutdownHandler(am, false);
634
635
636 assertTrue("The znode should be deleted.", ZKUtil.checkExists(this.watcher, node) == -1);
637
638
639
640 if (state == Table.State.DISABLED) {
641 assertFalse("Region state of region in pending close should be removed from rit.",
642 am.getRegionStates().isRegionsInTransition());
643 }
644 } finally {
645 am.setEnabledTable(REGIONINFO.getTable());
646 executor.shutdown();
647 am.shutdown();
648
649 ZKAssign.deleteAllNodes(this.watcher);
650 }
651 }
652
653 private void processServerShutdownHandler(AssignmentManager am, boolean splitRegion)
654 throws IOException, ServiceException {
655 processServerShutdownHandler(am, splitRegion, false);
656 }
657
658 private void processServerShutdownHandler(
659 AssignmentManager am, boolean splitRegion, boolean deadserverRestarted)
660 throws IOException, ServiceException {
661
662
663 this.watcher.registerListenerFirst(am);
664
665
666
667 ClientProtos.ClientService.BlockingInterface implementation =
668 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
669
670
671 Result r;
672 if (splitRegion) {
673 r = MetaMockingUtil.getMetaTableRowResultAsSplitRegion(REGIONINFO, SERVERNAME_A);
674 } else {
675 r = MetaMockingUtil.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
676 }
677
678 final ScanResponse.Builder builder = ScanResponse.newBuilder();
679 builder.setMoreResults(true);
680 builder.addCellsPerResult(r.size());
681 final List<CellScannable> cellScannables = new ArrayList<CellScannable>(1);
682 cellScannables.add(r);
683 Mockito.when(implementation.scan(
684 (RpcController)Mockito.any(), (ScanRequest)Mockito.any())).
685 thenAnswer(new Answer<ScanResponse>() {
686 @Override
687 public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
688 PayloadCarryingRpcController controller = (PayloadCarryingRpcController) invocation
689 .getArguments()[0];
690 if (controller != null) {
691 controller.setCellScanner(CellUtil.createCellScanner(cellScannables));
692 }
693 return builder.build();
694 }
695 });
696
697
698 ClusterConnection connection =
699 HConnectionTestingUtility.getMockedConnectionAndDecorate(HTU.getConfiguration(),
700 null, implementation, SERVERNAME_B, REGIONINFO);
701
702
703
704
705
706 Mockito.when(connection.isManaged()).thenReturn(true);
707 try {
708
709
710 Mockito.when(this.server.getConnection()).thenReturn(connection);
711
712
713
714 DeadServer deadServers = new DeadServer();
715 deadServers.add(SERVERNAME_A);
716 Mockito.when(this.serverManager.isServerReachable(SERVERNAME_B)).thenReturn(true);
717 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(false);
718 final Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
719 onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
720 if (deadserverRestarted) {
721
722
723 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_AA)).thenReturn(true);
724 Mockito.when(this.serverManager.isServerReachable(SERVERNAME_AA)).thenReturn(true);
725 Mockito.when(
726 this.serverManager.isServerWithSameHostnamePortOnline(SERVERNAME_A)).thenReturn(true);
727 onlineServers.put(SERVERNAME_AA, ServerLoad.EMPTY_SERVERLOAD);
728 }
729 Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(
730 new ArrayList<ServerName>(onlineServers.keySet()));
731 Mockito.when(this.serverManager.getOnlineServers()).thenReturn(onlineServers);
732 List<ServerName> avServers = new ArrayList<ServerName>();
733 avServers.addAll(onlineServers.keySet());
734 Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(avServers);
735
736 MasterFileSystem fs = Mockito.mock(MasterFileSystem.class);
737 Mockito.doNothing().when(fs).setLogRecoveryMode();
738 Mockito.when(fs.getLogRecoveryMode()).thenReturn(RecoveryMode.LOG_REPLAY);
739 MasterServices services = Mockito.mock(MasterServices.class);
740 Mockito.when(services.getAssignmentManager()).thenReturn(am);
741 Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
742 Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
743 Mockito.when(services.getMasterFileSystem()).thenReturn(fs);
744 Mockito.when(services.getConnection()).thenReturn(connection);
745 Configuration conf = server.getConfiguration();
746 Mockito.when(services.getConfiguration()).thenReturn(conf);
747 ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
748 services, deadServers, SERVERNAME_A, false);
749 am.failoverCleanupDone.set(true);
750 handler.process();
751
752 } finally {
753 if (connection != null) connection.close();
754 }
755 }
756
757
758
759
760
761
762
763 private ExecutorService startupMasterExecutor(final String name) {
764
765 ExecutorService executor = new ExecutorService(name);
766 executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3);
767 executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3);
768 executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3);
769 executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3);
770 return executor;
771 }
772
773 @Test (timeout=180000)
774 public void testUnassignWithSplitAtSameTime() throws KeeperException,
775 IOException, CoordinatedStateException {
776
777 final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
778
779
780
781 Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, hri, -1)).thenReturn(true);
782
783 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server
784 .getConfiguration());
785
786 AssignmentManager am = new AssignmentManager(this.server,
787 this.serverManager, balancer, null, null, master.getTableLockManager());
788 try {
789
790 unassign(am, SERVERNAME_A, hri);
791
792 ZKAssign.deleteClosingNode(this.watcher, hri, SERVERNAME_A);
793
794
795
796 int version = createNodeSplitting(this.watcher, hri, SERVERNAME_A);
797
798
799
800
801 unassign(am, SERVERNAME_A, hri);
802
803 ZKAssign.transitionNode(this.watcher, hri, SERVERNAME_A,
804 EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
805 assertFalse(am.getRegionStates().isRegionInTransition(hri));
806 } finally {
807 am.shutdown();
808 }
809 }
810
811
812
813
814
815
816
817 @Test(timeout = 60000)
818 public void testProcessDeadServersAndRegionsInTransitionShouldNotFailWithNPE()
819 throws IOException, KeeperException, CoordinatedStateException,
820 InterruptedException, ServiceException {
821 final RecoverableZooKeeper recoverableZk = Mockito
822 .mock(RecoverableZooKeeper.class);
823 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
824 this.server, this.serverManager);
825 Watcher zkw = new ZooKeeperWatcher(HBaseConfiguration.create(), "unittest",
826 null) {
827 @Override
828 public RecoverableZooKeeper getRecoverableZooKeeper() {
829 return recoverableZk;
830 }
831 };
832 ((ZooKeeperWatcher) zkw).registerListener(am);
833 Mockito.doThrow(new InterruptedException()).when(recoverableZk)
834 .getChildren("/hbase/region-in-transition", null);
835 am.setWatcher((ZooKeeperWatcher) zkw);
836 try {
837 am.processDeadServersAndRegionsInTransition(null);
838 fail("Expected to abort");
839 } catch (NullPointerException e) {
840 fail("Should not throw NPE");
841 } catch (RuntimeException e) {
842 assertEquals("Aborted", e.getLocalizedMessage());
843 } finally {
844 am.shutdown();
845 }
846 }
847
848
849
850
851 @Test(timeout = 60000)
852 public void testRegionPlanIsUpdatedWhenRegionFailsToOpen() throws IOException, KeeperException,
853 ServiceException, InterruptedException, CoordinatedStateException {
854 this.server.getConfiguration().setClass(
855 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MockedLoadBalancer.class,
856 LoadBalancer.class);
857 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
858 this.server, this.serverManager);
859 try {
860
861
862
863 AtomicBoolean gate = new AtomicBoolean(false);
864 if (balancer instanceof MockedLoadBalancer) {
865 ((MockedLoadBalancer) balancer).setGateVariable(gate);
866 }
867 ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_A);
868 int v = ZKAssign.getVersion(this.watcher, REGIONINFO);
869 ZKAssign.transitionNode(this.watcher, REGIONINFO, SERVERNAME_A,
870 EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_FAILED_OPEN, v);
871 String path = ZKAssign.getNodeName(this.watcher, REGIONINFO
872 .getEncodedName());
873 am.getRegionStates().updateRegionState(
874 REGIONINFO, State.OPENING, SERVERNAME_A);
875
876
877 am.regionPlans.put(REGIONINFO.getEncodedName(), new RegionPlan(
878 REGIONINFO, null, SERVERNAME_A));
879 RegionPlan regionPlan = am.regionPlans.get(REGIONINFO.getEncodedName());
880 List<ServerName> serverList = new ArrayList<ServerName>(2);
881 serverList.add(SERVERNAME_B);
882 Mockito.when(
883 this.serverManager.createDestinationServersList(Arrays.asList(SERVERNAME_A)))
884 .thenReturn(serverList);
885 am.nodeDataChanged(path);
886
887
888 while (!gate.get()) {
889 Thread.sleep(10);
890 }
891
892
893
894 RegionPlan newRegionPlan = am.regionPlans
895 .get(REGIONINFO.getEncodedName());
896 while (newRegionPlan == null) {
897 Thread.sleep(10);
898 newRegionPlan = am.regionPlans.get(REGIONINFO.getEncodedName());
899 }
900
901
902
903 assertNotSame("Same region plan should not come", regionPlan,
904 newRegionPlan);
905 assertTrue("Destination servers should be different.", !(regionPlan
906 .getDestination().equals(newRegionPlan.getDestination())));
907
908 Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName());
909 } finally {
910 this.server.getConfiguration().setClass(
911 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, StochasticLoadBalancer.class,
912 LoadBalancer.class);
913 am.getExecutorService().shutdown();
914 am.shutdown();
915 }
916 }
917
918
919
920
921
922
923 public static class MockedLoadBalancer extends StochasticLoadBalancer {
924 private AtomicBoolean gate;
925
926 public void setGateVariable(AtomicBoolean gate) {
927 this.gate = gate;
928 }
929
930 @Override
931 public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
932 ServerName randomServerName = super.randomAssignment(regionInfo, servers);
933 this.gate.set(true);
934 return randomServerName;
935 }
936
937 @Override
938 public Map<ServerName, List<HRegionInfo>> retainAssignment(
939 Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
940 this.gate.set(true);
941 return super.retainAssignment(regions, servers);
942 }
943 }
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958 @Test(timeout = 60000)
959 public void testAssignmentOfRegionInSSHAndInFailedOpenState() throws IOException,
960 KeeperException, ServiceException, CoordinatedStateException, InterruptedException {
961 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
962 this.server, this.serverManager);
963 ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_B);
964 int v = ZKAssign.getVersion(this.watcher, REGIONINFO);
965 ZKAssign.transitionNode(this.watcher, REGIONINFO, SERVERNAME_B,
966 EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_FAILED_OPEN, v);
967 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B)).thenReturn(true);
968 Mockito.when(this.serverManager.isServerReachable(SERVERNAME_B)).thenReturn(true);
969 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(false);
970 DeadServer deadServers = new DeadServer();
971 deadServers.add(SERVERNAME_A);
972 Mockito.when(this.serverManager.getDeadServers()).thenReturn(deadServers);
973 final Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
974 onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
975 Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(
976 new ArrayList<ServerName>(onlineServers.keySet()));
977 Mockito.when(this.serverManager.getOnlineServers()).thenReturn(onlineServers);
978 am.gate.set(false);
979
980 am.joinCluster();
981 while (!am.gate.get()) {
982 Thread.sleep(10);
983 }
984 assertTrue(am.getRegionStates().getRegionState(REGIONINFO).getState()
985 == RegionState.State.PENDING_OPEN);
986 am.shutdown();
987 }
988
989 @Test(timeout = 600000)
990 public void testAssignmentOfRegionRITWithOffline() throws IOException,
991 KeeperException, ServiceException, CoordinatedStateException, InterruptedException {
992 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
993 this.server, this.serverManager);
994 ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_B);
995 am.gate.set(false);
996
997 am.joinCluster();
998 while (!am.gate.get()) {
999 Thread.sleep(10);
1000 }
1001 assertTrue(am.getRegionStates().getRegionState(REGIONINFO).getState()
1002 == RegionState.State.PENDING_OPEN);
1003 am.shutdown();
1004 }
1005
1006
1007
1008
1009
1010 @Test(timeout = 60000)
1011 public void testRegionInOpeningStateOnDeadRSWhileMasterFailover() throws IOException,
1012 KeeperException, ServiceException, CoordinatedStateException, InterruptedException {
1013 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
1014 this.server, this.serverManager);
1015 ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_A);
1016 int version = ZKAssign.getVersion(this.watcher, REGIONINFO);
1017 ZKAssign.transitionNode(this.watcher, REGIONINFO, SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
1018 EventType.RS_ZK_REGION_OPENING, version);
1019 RegionTransition rt = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_OPENING,
1020 REGIONINFO.getRegionName(), SERVERNAME_A, HConstants.EMPTY_BYTE_ARRAY);
1021 version = ZKAssign.getVersion(this.watcher, REGIONINFO);
1022 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(false);
1023 am.getRegionStates().logSplit(SERVERNAME_A);
1024 am.getRegionStates().createRegionState(REGIONINFO);
1025 am.gate.set(false);
1026
1027 BaseCoordinatedStateManager cp = new ZkCoordinatedStateManager();
1028 cp.initialize(server);
1029 cp.start();
1030
1031 OpenRegionCoordination orc = cp.getOpenRegionCoordination();
1032 ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1033 new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1034 zkOrd.setServerName(server.getServerName());
1035 zkOrd.setVersion(version);
1036
1037 assertFalse(am.processRegionsInTransition(rt, REGIONINFO, orc, zkOrd));
1038 am.getTableStateManager().setTableState(REGIONINFO.getTable(), Table.State.ENABLED);
1039 processServerShutdownHandler(am, false);
1040
1041 while (!am.gate.get()) {
1042 Thread.sleep(10);
1043 }
1044 assertTrue("The region should be assigned immediately.", null != am.regionPlans.get(REGIONINFO
1045 .getEncodedName()));
1046 am.shutdown();
1047 }
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057 @Test(timeout = 60000)
1058 public void testDisablingTableRegionsAssignmentDuringCleanClusterStartup()
1059 throws KeeperException, IOException, Exception {
1060 this.server.getConfiguration().setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
1061 MockedLoadBalancer.class, LoadBalancer.class);
1062 Mockito.when(this.serverManager.getOnlineServers()).thenReturn(
1063 new HashMap<ServerName, ServerLoad>(0));
1064 List<ServerName> destServers = new ArrayList<ServerName>(1);
1065 destServers.add(SERVERNAME_A);
1066 Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers);
1067
1068 HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
1069
1070 CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
1071 HTU.getConfiguration());
1072 MasterServices server = new HMaster(HTU.getConfiguration(), csm);
1073 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
1074 this.serverManager);
1075
1076 Whitebox.setInternalState(server, "metaTableLocator", Mockito.mock(MetaTableLocator.class));
1077
1078
1079
1080 Whitebox.setInternalState(server, "clusterConnection", am.getConnection());
1081
1082 AtomicBoolean gate = new AtomicBoolean(false);
1083 if (balancer instanceof MockedLoadBalancer) {
1084 ((MockedLoadBalancer) balancer).setGateVariable(gate);
1085 }
1086 try{
1087
1088 am.getTableStateManager().setTableState(REGIONINFO.getTable(),
1089 Table.State.DISABLING);
1090 am.joinCluster();
1091
1092 assertFalse(
1093 "Assign should not be invoked for disabling table regions during clean cluster startup.",
1094 gate.get());
1095
1096 assertTrue("Table should be disabled.",
1097 am.getTableStateManager().isTableState(REGIONINFO.getTable(),
1098 Table.State.DISABLED));
1099 } finally {
1100 this.server.getConfiguration().setClass(
1101 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, StochasticLoadBalancer.class,
1102 LoadBalancer.class);
1103 am.getTableStateManager().setTableState(REGIONINFO.getTable(),
1104 Table.State.ENABLED);
1105 am.shutdown();
1106 }
1107 }
1108
1109
1110
1111
1112
1113
1114
1115
1116 @Test (timeout=180000)
1117 public void testMasterRestartWhenTableInEnabling() throws KeeperException, IOException, Exception {
1118 enabling = true;
1119 List<ServerName> destServers = new ArrayList<ServerName>(1);
1120 destServers.add(SERVERNAME_A);
1121 Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers);
1122 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
1123 HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
1124 CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
1125 HTU.getConfiguration());
1126 MasterServices server = new HMaster(HTU.getConfiguration(), csm);
1127
1128 Whitebox.setInternalState(server, "serverManager", this.serverManager);
1129 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
1130 this.serverManager);
1131
1132 Whitebox.setInternalState(server, "metaTableLocator", Mockito.mock(MetaTableLocator.class));
1133
1134
1135
1136 Whitebox.setInternalState(server, "clusterConnection", am.getConnection());
1137
1138 try {
1139
1140 am.getTableStateManager().setTableState(REGIONINFO.getTable(),
1141 Table.State.ENABLING);
1142 new EnableTableHandler((Server)server, REGIONINFO.getTable(),
1143 am, new NullTableLockManager(), true).prepare()
1144 .process();
1145 assertEquals("Number of assignments should be 1.", 1, assignmentCount);
1146 assertTrue("Table should be enabled.",
1147 am.getTableStateManager().isTableState(REGIONINFO.getTable(),
1148 Table.State.ENABLED));
1149 } finally {
1150 enabling = false;
1151 assignmentCount = 0;
1152 am.getTableStateManager().setTableState(REGIONINFO.getTable(),
1153 Table.State.ENABLED);
1154 am.shutdown();
1155 ZKAssign.deleteAllNodes(this.watcher);
1156 }
1157 }
1158
1159
1160
1161
1162
1163
1164
1165
1166 @Test (timeout=180000)
1167 public void testMasterRestartShouldRemoveStaleZnodesOfUnknownTableAsForMeta()
1168 throws Exception {
1169 List<ServerName> destServers = new ArrayList<ServerName>(1);
1170 destServers.add(SERVERNAME_A);
1171 Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers);
1172 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
1173 HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
1174 CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
1175 HTU.getConfiguration());
1176 MasterServices server = new HMaster(HTU.getConfiguration(), csm);
1177 Whitebox.setInternalState(server, "serverManager", this.serverManager);
1178 AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
1179 this.serverManager);
1180
1181 Whitebox.setInternalState(server, "metaTableLocator", Mockito.mock(MetaTableLocator.class));
1182
1183
1184
1185 Whitebox.setInternalState(server, "clusterConnection", am.getConnection());
1186
1187 try {
1188 TableName tableName = TableName.valueOf("dummyTable");
1189
1190 am.getTableStateManager().setTableState(tableName,
1191 Table.State.ENABLING);
1192 am.joinCluster();
1193 assertFalse("Table should not be present in zookeeper.",
1194 am.getTableStateManager().isTablePresent(tableName));
1195 } finally {
1196 am.shutdown();
1197 }
1198 }
1199
1200
1201
1202
1203
1204 @Test (timeout=180000)
1205 public void testSSHTimesOutOpeningRegionTransition()
1206 throws KeeperException, IOException, CoordinatedStateException, ServiceException {
1207
1208 AssignmentManagerWithExtrasForTesting am =
1209 setUpMockedAssignmentManager(this.server, this.serverManager);
1210
1211 RegionState state = new RegionState(REGIONINFO,
1212 State.OPENING, System.currentTimeMillis(), SERVERNAME_A);
1213 am.getRegionStates().regionOnline(REGIONINFO, SERVERNAME_B);
1214 am.getRegionStates().regionsInTransition.put(REGIONINFO.getEncodedName(), state);
1215
1216 am.regionPlans.put(REGIONINFO.getEncodedName(),
1217 new RegionPlan(REGIONINFO, SERVERNAME_B, SERVERNAME_A));
1218 am.getTableStateManager().setTableState(REGIONINFO.getTable(),
1219 Table.State.ENABLED);
1220
1221 try {
1222 am.assignInvoked = false;
1223 processServerShutdownHandler(am, false);
1224 assertTrue(am.assignInvoked);
1225 } finally {
1226 am.getRegionStates().regionsInTransition.remove(REGIONINFO.getEncodedName());
1227 am.regionPlans.remove(REGIONINFO.getEncodedName());
1228 am.shutdown();
1229 }
1230 }
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241 @Test (timeout=180000)
1242 public void testClosingFailureDuringRecovery() throws Exception {
1243
1244 AssignmentManagerWithExtrasForTesting am =
1245 setUpMockedAssignmentManager(this.server, this.serverManager);
1246 ZKAssign.createNodeClosing(this.watcher, REGIONINFO, SERVERNAME_A);
1247 try {
1248 am.getRegionStates().createRegionState(REGIONINFO);
1249
1250 assertFalse( am.getRegionStates().isRegionsInTransition() );
1251
1252 am.processRegionInTransition(REGIONINFO.getEncodedName(), REGIONINFO);
1253
1254 assertTrue( am.getRegionStates().isRegionsInTransition() );
1255 } finally {
1256 am.shutdown();
1257 }
1258 }
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276 private static int createNodeSplitting(final ZooKeeperWatcher zkw,
1277 final HRegionInfo region, final ServerName serverName)
1278 throws KeeperException, IOException {
1279 RegionTransition rt =
1280 RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING,
1281 region.getRegionName(), serverName);
1282
1283 String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
1284 if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
1285 throw new IOException("Failed create of ephemeral " + node);
1286 }
1287
1288
1289 return transitionNodeSplitting(zkw, region, serverName, -1);
1290 }
1291
1292
1293
1294 private static int transitionNodeSplitting(final ZooKeeperWatcher zkw,
1295 final HRegionInfo parent,
1296 final ServerName serverName, final int version)
1297 throws KeeperException, IOException {
1298 return ZKAssign.transitionNode(zkw, parent, serverName,
1299 EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
1300 }
1301
1302 private void unassign(final AssignmentManager am, final ServerName sn,
1303 final HRegionInfo hri) throws RegionException {
1304
1305 am.regionOnline(hri, sn);
1306
1307 am.unassign(hri);
1308 }
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319 private AssignmentManagerWithExtrasForTesting setUpMockedAssignmentManager(
1320 final MasterServices server, final ServerManager manager)
1321 throws IOException, KeeperException, ServiceException, CoordinatedStateException {
1322
1323
1324
1325
1326
1327 ClientProtos.ClientService.BlockingInterface ri =
1328 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
1329
1330 Result r = MetaMockingUtil.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
1331 final ScanResponse.Builder builder = ScanResponse.newBuilder();
1332 builder.setMoreResults(true);
1333 builder.addCellsPerResult(r.size());
1334 final List<CellScannable> rows = new ArrayList<CellScannable>(1);
1335 rows.add(r);
1336 Answer<ScanResponse> ans = new Answer<ClientProtos.ScanResponse>() {
1337 @Override
1338 public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
1339 PayloadCarryingRpcController controller = (PayloadCarryingRpcController) invocation
1340 .getArguments()[0];
1341 if (controller != null) {
1342 controller.setCellScanner(CellUtil.createCellScanner(rows));
1343 }
1344 return builder.build();
1345 }
1346 };
1347 if (enabling) {
1348 Mockito.when(ri.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any()))
1349 .thenAnswer(ans).thenAnswer(ans).thenAnswer(ans).thenAnswer(ans).thenAnswer(ans)
1350 .thenReturn(ScanResponse.newBuilder().setMoreResults(false).build());
1351 } else {
1352 Mockito.when(ri.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any())).thenAnswer(
1353 ans);
1354 }
1355
1356 GetResponse.Builder getBuilder = GetResponse.newBuilder();
1357 getBuilder.setResult(ProtobufUtil.toResult(r));
1358 Mockito.when(ri.get((RpcController)Mockito.any(), (GetRequest) Mockito.any())).
1359 thenReturn(getBuilder.build());
1360
1361 ClusterConnection connection = (ClusterConnection)HConnectionTestingUtility.
1362 getMockedConnectionAndDecorate(HTU.getConfiguration(), null,
1363 ri, SERVERNAME_B, REGIONINFO);
1364 Mockito.when(this.server.getConnection()).thenReturn(connection);
1365
1366
1367
1368
1369
1370
1371 Mockito.when(connection.isManaged()).thenReturn(true);
1372
1373
1374 ExecutorService executor = startupMasterExecutor("mockedAMExecutor");
1375 this.balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
1376 AssignmentManagerWithExtrasForTesting am = new AssignmentManagerWithExtrasForTesting(
1377 server, connection, manager, this.balancer, executor, new NullTableLockManager());
1378 return am;
1379 }
1380
1381
1382
1383
1384 class AssignmentManagerWithExtrasForTesting extends AssignmentManager {
1385
1386 private final ExecutorService es;
1387 boolean processRITInvoked = false;
1388 boolean assignInvoked = false;
1389 AtomicBoolean gate = new AtomicBoolean(true);
1390 private ClusterConnection connection;
1391
1392 public AssignmentManagerWithExtrasForTesting(
1393 final MasterServices master, ClusterConnection connection, final ServerManager serverManager,
1394 final LoadBalancer balancer,
1395 final ExecutorService service, final TableLockManager tableLockManager)
1396 throws KeeperException, IOException, CoordinatedStateException {
1397 super(master, serverManager, balancer, service, null, tableLockManager);
1398 this.es = service;
1399 this.connection = connection;
1400 }
1401
1402 @Override
1403 boolean processRegionInTransition(String encodedRegionName,
1404 HRegionInfo regionInfo) throws KeeperException, IOException {
1405 this.processRITInvoked = true;
1406 return super.processRegionInTransition(encodedRegionName, regionInfo);
1407 }
1408
1409 @Override
1410 public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan) {
1411 if (enabling) {
1412 assignmentCount++;
1413 this.regionOnline(region, SERVERNAME_A);
1414 } else {
1415 super.assign(region, setOfflineInZK, forceNewPlan);
1416 this.gate.set(true);
1417 }
1418 }
1419 @Override
1420 public void assign(RegionState state,
1421 boolean setOfflineInZK, final boolean forceNewPlan) {
1422 if (enabling) {
1423 assignmentCount++;
1424 } else {
1425 super.assign(state, setOfflineInZK, forceNewPlan);
1426 this.gate.set(true);
1427 }
1428 }
1429
1430 @Override
1431 boolean assign(ServerName destination, List<HRegionInfo> regions)
1432 throws InterruptedException {
1433 if (enabling) {
1434 for (HRegionInfo region : regions) {
1435 assignmentCount++;
1436 this.regionOnline(region, SERVERNAME_A);
1437 }
1438 return true;
1439 }
1440 return super.assign(destination, regions);
1441 }
1442
1443 @Override
1444 public void assign(Map<HRegionInfo, ServerName> regionServerMap)
1445 throws IOException, InterruptedException {
1446 assignInvoked = (regionServerMap != null && regionServerMap.size() > 0);
1447 super.assign(regionServerMap);
1448 this.gate.set(true);
1449 }
1450
1451 @Override
1452 public void assign(List<HRegionInfo> regions)
1453 throws IOException, InterruptedException {
1454 assignInvoked = (regions != null && regions.size() > 0);
1455 super.assign(regions);
1456 this.gate.set(true);
1457 }
1458
1459
1460 void setWatcher(ZooKeeperWatcher watcher) {
1461 this.watcher = watcher;
1462 }
1463
1464
1465
1466
1467 ExecutorService getExecutorService() {
1468 return this.es;
1469 }
1470
1471
1472
1473
1474 ClusterConnection getConnection() {
1475 return this.connection;
1476 }
1477
1478 @Override
1479 public void shutdown() {
1480 super.shutdown();
1481 if (this.connection != null)
1482 try {
1483 this.connection.close();
1484 } catch (IOException e) {
1485 fail("Failed to close connection");
1486 }
1487 }
1488 }
1489
1490
1491
1492
1493
1494
1495
1496 private void startFakeFailedOverMasterAssignmentManager(final AssignmentManager am,
1497 final ZooKeeperWatcher watcher) {
1498
1499
1500 watcher.registerListenerFirst(am);
1501 Thread t = new Thread("RunAmJoinCluster") {
1502 @Override
1503 public void run() {
1504
1505
1506
1507
1508
1509 am.getRegionStates().regionsInTransition.clear();
1510 am.regionPlans.clear();
1511 try {
1512 am.joinCluster();
1513 } catch (IOException e) {
1514 throw new RuntimeException(e);
1515 } catch (KeeperException e) {
1516 throw new RuntimeException(e);
1517 } catch (InterruptedException e) {
1518 throw new RuntimeException(e);
1519 } catch (CoordinatedStateException e) {
1520 throw new RuntimeException(e);
1521 }
1522 }
1523 };
1524 t.start();
1525 while (!t.isAlive()) Threads.sleep(1);
1526 }
1527
1528 @Test (timeout=180000)
1529 public void testForceAssignMergingRegion() throws Exception {
1530
1531 final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
1532
1533 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
1534 server.getConfiguration());
1535
1536 AssignmentManager am = new AssignmentManager(this.server,
1537 this.serverManager, balancer, null, null, master.getTableLockManager());
1538 RegionStates regionStates = am.getRegionStates();
1539 try {
1540
1541 regionStates.updateRegionState(hri, RegionState.State.MERGING);
1542
1543 am.assign(hri, true, true);
1544 assertEquals("The region should be still in merging state",
1545 RegionState.State.MERGING, regionStates.getRegionState(hri).getState());
1546 } finally {
1547 am.shutdown();
1548 }
1549 }
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561 @Test (timeout=180000)
1562 public void testAssignmentEventIgnoredIfNotExpected() throws KeeperException, IOException,
1563 CoordinatedStateException {
1564
1565 final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
1566 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
1567 server.getConfiguration());
1568 final AtomicBoolean zkEventProcessed = new AtomicBoolean(false);
1569
1570 AssignmentManager am = new AssignmentManager(this.server,
1571 this.serverManager, balancer, null, null, master.getTableLockManager()) {
1572
1573 @Override
1574 void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
1575 OpenRegionCoordination.OpenRegionDetails ord) {
1576 super.handleRegion(rt, coordination, ord);
1577 if (rt != null && Bytes.equals(hri.getRegionName(),
1578 rt.getRegionName()) && rt.getEventType() == EventType.RS_ZK_REGION_OPENING) {
1579 zkEventProcessed.set(true);
1580 }
1581 }
1582 };
1583 try {
1584
1585 am.getRegionStates().regionOffline(hri);
1586 zkEventProcessed.set(false);
1587 this.watcher.registerListenerFirst(am);
1588 assertFalse("The region should not be in transition",
1589 am.getRegionStates().isRegionInTransition(hri));
1590 ZKAssign.createNodeOffline(this.watcher, hri, SERVERNAME_A);
1591
1592 ZKAssign.transitionNodeOpening(this.watcher, hri, SERVERNAME_A);
1593 long startTime = EnvironmentEdgeManager.currentTime();
1594 while (!zkEventProcessed.get()) {
1595 assertTrue("Timed out in waiting for ZK event to be processed",
1596 EnvironmentEdgeManager.currentTime() - startTime < 30000);
1597 Threads.sleepWithoutInterrupt(100);
1598 }
1599 assertFalse(am.getRegionStates().isRegionInTransition(hri));
1600 } finally {
1601 am.shutdown();
1602 }
1603 }
1604
1605
1606
1607
1608
1609
1610 @Test (timeout=180000)
1611 public void testBalanceRegionOfDeletedTable() throws Exception {
1612 AssignmentManager am = new AssignmentManager(this.server, this.serverManager,
1613 balancer, null, null, master.getTableLockManager());
1614 RegionStates regionStates = am.getRegionStates();
1615 HRegionInfo hri = REGIONINFO;
1616 regionStates.createRegionState(hri);
1617 assertFalse(regionStates.isRegionInTransition(hri));
1618 RegionPlan plan = new RegionPlan(hri, SERVERNAME_A, SERVERNAME_B);
1619
1620 regionStates.tableDeleted(hri.getTable());
1621 am.balance(plan);
1622 assertFalse("The region should not in transition",
1623 regionStates.isRegionInTransition(hri));
1624 am.shutdown();
1625 }
1626
1627
1628
1629
1630
1631 @SuppressWarnings("unchecked")
1632 @Test (timeout=180000)
1633 public void testOpenCloseRegionRPCIntendedForPreviousServer() throws Exception {
1634 Mockito.when(this.serverManager.sendRegionOpen(Mockito.eq(SERVERNAME_B), Mockito.eq(REGIONINFO),
1635 Mockito.anyInt(), (List<ServerName>)Mockito.any()))
1636 .thenThrow(new DoNotRetryIOException());
1637 this.server.getConfiguration().setInt("hbase.assignment.maximum.attempts", 100);
1638
1639 HRegionInfo hri = REGIONINFO;
1640 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
1641 server.getConfiguration());
1642
1643 AssignmentManager am = new AssignmentManager(this.server,
1644 this.serverManager, balancer, null, null, master.getTableLockManager());
1645 RegionStates regionStates = am.getRegionStates();
1646 try {
1647 am.regionPlans.put(REGIONINFO.getEncodedName(),
1648 new RegionPlan(REGIONINFO, null, SERVERNAME_B));
1649
1650
1651 am.assign(hri, true, false);
1652 } finally {
1653 assertEquals(SERVERNAME_A, regionStates.getRegionState(REGIONINFO).getServerName());
1654 am.shutdown();
1655 }
1656 }
1657
1658
1659
1660
1661 @Test (timeout=180000)
1662 public void testCloseRegionOnAbortingRS() throws Exception {
1663 this.server.getConfiguration().setInt("hbase.assignment.maximum.attempts", 2);
1664
1665 HRegionInfo hri = REGIONINFO;
1666 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
1667 server.getConfiguration());
1668
1669 AssignmentManager am = new AssignmentManager(this.server,
1670 this.serverManager, balancer, null, null, master.getTableLockManager());
1671 RegionStates regionStates = am.getRegionStates();
1672
1673 regionStates.createRegionState(hri, State.OPEN, SERVERNAME_B, SERVERNAME_B);
1674
1675
1676 Mockito.when(this.serverManager.sendRegionClose(Mockito.eq(SERVERNAME_B), Mockito.eq(REGIONINFO),
1677 Mockito.anyInt(), (ServerName)Mockito.any(), Mockito.anyBoolean()))
1678 .thenThrow(new RegionServerAbortedException(""));
1679
1680
1681 am.unassign(hri);
1682
1683
1684 assertEquals(State.FAILED_CLOSE, regionStates.getRegionState(REGIONINFO).getState());
1685 assertEquals(SERVERNAME_B, regionStates.getRegionState(REGIONINFO).getServerName());
1686
1687 am.shutdown();
1688 }
1689
1690
1691
1692
1693 @Test (timeout=180000)
1694 public void testCloseRegionOnServerNotOnline() throws Exception {
1695 this.server.getConfiguration().setInt("hbase.assignment.maximum.attempts", 2);
1696
1697 HRegionInfo hri = REGIONINFO;
1698 LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
1699 server.getConfiguration());
1700
1701 AssignmentManager am = new AssignmentManager(this.server,
1702 this.serverManager, balancer, null, null, master.getTableLockManager()) {
1703 @Override
1704 protected boolean wasRegionOnDeadServerByMeta(HRegionInfo region, ServerName sn) {
1705 return true;
1706 };
1707 };
1708 RegionStates regionStates = am.getRegionStates();
1709
1710 regionStates.createRegionState(hri, State.OPEN, SERVERNAME_B, SERVERNAME_B);
1711
1712
1713 Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B))
1714 .thenReturn(false);
1715
1716
1717 am.unassign(hri);
1718
1719
1720 assertEquals(State.OFFLINE, regionStates.getRegionState(REGIONINFO).getState());
1721
1722
1723 am.regionPlans.put(REGIONINFO.getEncodedName(),
1724 new RegionPlan(REGIONINFO, null, SERVERNAME_A));
1725 am.assign(hri, true, false);
1726
1727
1728 assertEquals(State.OFFLINE, regionStates.getRegionState(REGIONINFO).getState());
1729
1730 am.shutdown();
1731 }
1732 }