1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Comparator;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.NavigableMap;
34 import java.util.Random;
35 import java.util.Set;
36 import java.util.TreeMap;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.CopyOnWriteArrayList;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import java.util.concurrent.locks.Lock;
45 import java.util.concurrent.locks.ReentrantLock;
46
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.fs.FileSystem;
51 import org.apache.hadoop.fs.Path;
52 import org.apache.hadoop.hbase.CoordinatedStateException;
53 import org.apache.hadoop.hbase.HBaseIOException;
54 import org.apache.hadoop.hbase.HConstants;
55 import org.apache.hadoop.hbase.HRegionInfo;
56 import org.apache.hadoop.hbase.HRegionLocation;
57 import org.apache.hadoop.hbase.HTableDescriptor;
58 import org.apache.hadoop.hbase.MetaTableAccessor;
59 import org.apache.hadoop.hbase.NotServingRegionException;
60 import org.apache.hadoop.hbase.RegionLocations;
61 import org.apache.hadoop.hbase.RegionTransition;
62 import org.apache.hadoop.hbase.ServerName;
63 import org.apache.hadoop.hbase.TableName;
64 import org.apache.hadoop.hbase.TableNotFoundException;
65 import org.apache.hadoop.hbase.TableStateManager;
66 import org.apache.hadoop.hbase.classification.InterfaceAudience;
67 import org.apache.hadoop.hbase.client.Admin;
68 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
69 import org.apache.hadoop.hbase.client.Result;
70 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
71 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
72 import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
73 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
74 import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
75 import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
76 import org.apache.hadoop.hbase.exceptions.DeserializationException;
77 import org.apache.hadoop.hbase.executor.EventHandler;
78 import org.apache.hadoop.hbase.executor.EventType;
79 import org.apache.hadoop.hbase.executor.ExecutorService;
80 import org.apache.hadoop.hbase.ipc.FailedServerException;
81 import org.apache.hadoop.hbase.ipc.RpcClient;
82 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
83 import org.apache.hadoop.hbase.master.RegionState.State;
84 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
85 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
86 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
87 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
88 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
89 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
90 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
91 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
92 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
93 import org.apache.hadoop.hbase.quotas.RegionStateListener;
94 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
95 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
96 import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
97 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
98 import org.apache.hadoop.hbase.util.ConfigUtil;
99 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100 import org.apache.hadoop.hbase.util.FSUtils;
101 import org.apache.hadoop.hbase.util.KeyLocker;
102 import org.apache.hadoop.hbase.util.Pair;
103 import org.apache.hadoop.hbase.util.PairOfSameType;
104 import org.apache.hadoop.hbase.util.Threads;
105 import org.apache.hadoop.hbase.util.Triple;
106 import org.apache.hadoop.hbase.util.VersionInfo;
107 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
108 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
109 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
110 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
111 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
112 import org.apache.hadoop.ipc.RemoteException;
113 import org.apache.hadoop.util.StringUtils;
114 import org.apache.zookeeper.AsyncCallback;
115 import org.apache.zookeeper.KeeperException;
116 import org.apache.zookeeper.KeeperException.NoNodeException;
117 import org.apache.zookeeper.KeeperException.NodeExistsException;
118 import org.apache.zookeeper.data.Stat;
119
120 import com.google.common.annotations.VisibleForTesting;
121 import com.google.common.collect.LinkedHashMultimap;
122
123
124
125
126
127
128
129
130 @InterfaceAudience.Private
131 public class AssignmentManager extends ZooKeeperListener {
  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);

  /** Sentinel server name under which HBCK submits region transitions. */
  public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
      -1, -1L);

  /** Config key: wait time when the RS reports the region already in transition. */
  static final String ALREADY_IN_TRANSITION_WAITTIME
    = "hbase.assignment.already.intransition.waittime";
  static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute

  protected final MasterServices server;

  private ServerManager serverManager;

  // True only when the configured master load balancer is FavoredNodeLoadBalancer.
  private boolean shouldAssignRegionsWithFavoredNodes;

  private LoadBalancer balancer;

  private final MetricsAssignmentManager metricsAssignmentManager;

  private final TableLockManager tableLockManager;

  private AtomicInteger numRegionsOpened = new AtomicInteger(0);

  // Per-encoded-region-name locks used to serialize transitions on the same region.
  final private KeyLocker<String> locker = new KeyLocker<String>();

  // Replica regions that should be closed rather than re-assigned
  // (see processRegionsInTransition / processDeadServersAndRegionsInTransition).
  Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());

  // Regions that must be reopened, keyed by encoded region name.
  // Synchronized map, assigned in the constructor.
  private final Map <String, HRegionInfo> regionsToReopen;

  // Maximum number of attempts to open a region before marking it FAILED_OPEN
  // (see the RS_ZK_REGION_FAILED_OPEN handling in handleRegion). Always >= 1.
  private final int maximumAttempts;

  // In-flight merges: encoded name -> the pair of regions being merged.
  private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
    = new HashMap<String, PairOfSameType<HRegionInfo>>();

  // In-flight splits: parent region -> the pair of daughter regions.
  private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
    = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();

  // Pause (ms) between retries of hbase:meta assignment.
  private final long sleepTimeBeforeRetryingMetaAssignment;

  // Assignment plans keyed by encoded region name.
  // Guarded by synchronizing on the map itself (see addPlan/addPlans).
  final NavigableMap<String, RegionPlan> regionPlans =
    new TreeMap<String, RegionPlan>();

  private final TableStateManager tableStateManager;

  private final ExecutorService executorService;

  // Test-only tracker of regions whose ClosedRegionHandler has run;
  // null unless initializeHandlerTrackers() was called.
  private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;

  // Test-only tracker of regions whose OpenedRegionHandler has run;
  // null unless initializeHandlerTrackers() was called.
  private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;

  // Bounded cached pool for assignment work (sized by hbase.assignment.threads.max).
  private java.util.concurrent.ExecutorService threadPoolExecutorService;

  // Bounded cached pool that processes ZK region-transition events.
  private final java.util.concurrent.ExecutorService zkEventWorkers;

  // Event types still processed in handleRegion even if the source RS is offline.
  private List<EventType> ignoreStatesRSOffline = Arrays.asList(
    EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);

  private final RegionStates regionStates;

  // Thresholds controlling when bulk assignment is used, and the per-region
  // open-time guess used to size bulk-assign timeouts.
  private final int bulkAssignThresholdRegions;
  private final int bulkAssignThresholdServers;
  private final int bulkPerRegionOpenTimeGuesstimate;

  // Whether a bulk assign should block until every region is assigned.
  private final boolean bulkAssignWaitTillAllAssigned;

  // Flipped to true once post-startup failover cleanup has completed
  // (see failoverCleanupDone()).
  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);

  // Consecutive failed-open counts per encoded region name, used to cap retries
  // at maximumAttempts.
  private final ConcurrentHashMap<String, AtomicInteger>
    failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();

  // True when region assignment is coordinated through ZooKeeper znodes
  // (derived from configuration in the constructor).
  private final boolean useZKForAssignment;

  // Store used to persist region state changes; started during
  // processDeadServersAndRegionsInTransition.
  private final RegionStateStore regionStateStore;

  /** Test hook: when true, split-transition handling is skipped. Public and
   * mutable on purpose so tests can toggle it; hence the findbugs suppression. */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_SPLIT_HANDLING = false;

  // Assignment listeners; copy-on-write so notification can iterate without locks.
  private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();

  private RegionStateListener regionStateListener;

  /** Whether a given server hosts a region: yes, no, or could not be determined. */
  public enum ServerHostRegion {
    NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
  }

  // Guards the check-and-move of system regions.
  private final Object checkIfShouldMoveSystemRegionLock = new Object();
270
271
272
273
274
275
276
277
278
279
280
281
282
283 public AssignmentManager(MasterServices server, ServerManager serverManager,
284 final LoadBalancer balancer,
285 final ExecutorService service, MetricsMaster metricsMaster,
286 final TableLockManager tableLockManager) throws KeeperException,
287 IOException, CoordinatedStateException {
288 super(server.getZooKeeper());
289 this.server = server;
290 this.serverManager = serverManager;
291 this.executorService = service;
292 this.regionStateStore = new RegionStateStore(server);
293 this.regionsToReopen = Collections.synchronizedMap
294 (new HashMap<String, HRegionInfo> ());
295 Configuration conf = server.getConfiguration();
296
297 this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
298 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
299 FavoredNodeLoadBalancer.class);
300 try {
301 if (server.getCoordinatedStateManager() != null) {
302 this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
303 } else {
304 this.tableStateManager = null;
305 }
306 } catch (InterruptedException e) {
307 throw new InterruptedIOException();
308 }
309
310 this.maximumAttempts = Math.max(1,
311 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
312 this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
313 "hbase.meta.assignment.retry.sleeptime", 1000l);
314 this.balancer = balancer;
315 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
316 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
317 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
318 this.regionStates = new RegionStates(
319 server, tableStateManager, serverManager, regionStateStore);
320
321 this.bulkAssignWaitTillAllAssigned =
322 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
323 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
324 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
325 this.bulkPerRegionOpenTimeGuesstimate =
326 conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
327
328 int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
329 ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
330 zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
331 TimeUnit.SECONDS, threadFactory);
332 this.tableLockManager = tableLockManager;
333
334 this.metricsAssignmentManager = new MetricsAssignmentManager();
335 useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
336 }
337
338 MetricsAssignmentManager getAssignmentManagerMetrics() {
339 return this.metricsAssignmentManager;
340 }
341
342
343
344
345
346 public void registerListener(final AssignmentListener listener) {
347 this.listeners.add(listener);
348 }
349
350
351
352
353
354 public boolean unregisterListener(final AssignmentListener listener) {
355 return this.listeners.remove(listener);
356 }
357
358
359
360
361 public TableStateManager getTableStateManager() {
362
363
364 return this.tableStateManager;
365 }
366
367
368
369
370
371
372
373 public RegionStates getRegionStates() {
374 return regionStates;
375 }
376
377
378
379
380 @VisibleForTesting
381 RegionStateStore getRegionStateStore() {
382 return regionStateStore;
383 }
384
385 public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
386 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
387 }
388
389
390
391
392
393
394 public void addPlan(String encodedName, RegionPlan plan) {
395 synchronized (regionPlans) {
396 regionPlans.put(encodedName, plan);
397 }
398 }
399
400
401
402
403 public void addPlans(Map<String, RegionPlan> plans) {
404 synchronized (regionPlans) {
405 regionPlans.putAll(plans);
406 }
407 }
408
409
410
411
412
413
414
415
416 public void setRegionsToReopen(List <HRegionInfo> regions) {
417 for(HRegionInfo hri : regions) {
418 regionsToReopen.put(hri.getEncodedName(), hri);
419 }
420 }
421
422
423
424
425
426
427
428
429 public Pair<Integer, Integer> getReopenStatus(TableName tableName)
430 throws IOException {
431 List<HRegionInfo> hris;
432 if (TableName.META_TABLE_NAME.equals(tableName)) {
433 hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
434 } else {
435 hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
436 server.getConnection(), tableName, true);
437 }
438
439 Integer pending = 0;
440 for (HRegionInfo hri : hris) {
441 String name = hri.getEncodedName();
442
443 if (regionsToReopen.containsKey(name)
444 || regionStates.isRegionInTransition(name)) {
445 pending++;
446 }
447 }
448 return new Pair<Integer, Integer>(pending, hris.size());
449 }
450
451
452
453
454
455
456 public boolean isFailoverCleanupDone() {
457 return failoverCleanupDone.get();
458 }
459
460
461
462
463
464 public Lock acquireRegionLock(final String encodedName) {
465 return locker.acquireLock(encodedName);
466 }
467
468
469
470
471
  /**
   * Marks failover cleanup as complete and releases dead-server processing that
   * was queued waiting on it.
   */
  void failoverCleanupDone() {
    failoverCleanupDone.set(true);
    // Only after the flag is set is it safe to process servers that died while
    // cleanup was still in progress.
    serverManager.processQueuedDeadServers();
  }
476
477
478
479
480
481
482
483
484
  /**
   * Called at master startup: rebuilds region assignment state, decides whether
   * this is a failover or a clean startup, runs the matching recovery, and
   * resumes any interrupted table disable/enable operations.
   */
  void joinCluster() throws IOException,
      KeeperException, InterruptedException, CoordinatedStateException {
    long startTime = System.currentTimeMillis();
    // Rebuild the in-memory picture of user regions; the returned set holds the
    // servers found dead during the rebuild.
    Set<ServerName> deadServers = rebuildUserRegions();

    // Detect failover vs. clean startup and run the corresponding recovery.
    boolean failover = processDeadServersAndRegionsInTransition(deadServers);

    if (!useZKForAssignment) {
      // Assignment is not ZK-coordinated, so any leftover assignment znodes
      // are stale; remove the whole subtree.
      ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
    }
    // Resume table DISABLING/ENABLING work interrupted by the master restart.
    recoverTableInDisablingState();
    recoverTableInEnablingState();
    LOG.info("Joined the cluster in " + (System.currentTimeMillis()
      - startTime) + "ms, failover=" + failover);
  }
515
516
517
518
519
520
521
522
523
524
525
526
  /**
   * Decides whether this startup is a master failover or a clean cluster start,
   * then recovers accordingly: on failover, processes dead servers and regions
   * in transition; on clean startup, wipes stale assignment state and assigns
   * all user regions from scratch.
   *
   * @param deadServers servers found dead while rebuilding region state
   * @return true if this was treated as a failover
   */
  boolean processDeadServersAndRegionsInTransition(
      final Set<ServerName> deadServers) throws KeeperException,
        IOException, InterruptedException, CoordinatedStateException {
    List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
      watcher.assignmentZNode);

    if (useZKForAssignment && nodes == null) {
      // ZK is required for assignment but could not be read: abort the master.
      String errorMessage = "Failed to get the children from ZK";
      server.abort(errorMessage, new IOException(errorMessage));
      // Report failover so the clean-startup assignment path below never runs.
      return true;
    }

    // Signal 1: any known dead servers means failover.
    boolean failover = !serverManager.getDeadServers().isEmpty();
    if (failover) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
      }
    } else {
      // Signal 2: a non-meta user region already deployed on a live server
      // means a previous master had assigned regions -> failover.
      Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
      for (Map.Entry<HRegionInfo, ServerName> en:
          regionStates.getRegionAssignments().entrySet()) {
        HRegionInfo hri = en.getKey();
        if (!hri.isMetaTable()
            && onlineServers.contains(en.getValue())) {
          LOG.debug("Found " + hri + " out on cluster");
          failover = true;
          break;
        }
      }
      if (!failover && nodes != null) {
        // Signal 3: a non-meta region with an assignment znode in ZK -> failover.
        for (String encodedName: nodes) {
          RegionState regionState = regionStates.getRegionState(encodedName);
          if (regionState != null && !regionState.getRegion().isMetaRegion()) {
            LOG.debug("Found " + regionState + " in RITs");
            failover = true;
            break;
          }
        }
      }
    }
    if (!failover && !useZKForAssignment) {
      // Signal 4 (ZK-less assignment only): a non-meta region in transition on
      // a live server -> failover.
      Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
      if (!regionsInTransition.isEmpty()) {
        Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
        for (RegionState regionState: regionsInTransition.values()) {
          ServerName serverName = regionState.getServerName();
          if (!regionState.getRegion().isMetaRegion()
              && serverName != null && onlineServers.contains(serverName)) {
            LOG.debug("Found " + regionState + " in RITs");
            failover = true;
            break;
          }
        }
      }
    }
    if (!failover) {
      // Signal 5: a queued dead server that still has a WAL directory (or a
      // WAL-splitting directory) has logs to split -> failover. Otherwise the
      // queued dead servers are leftovers and can be dropped.
      Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
      if (!queuedDeadServers.isEmpty()) {
        Configuration conf = server.getConfiguration();
        Path walRootDir = FSUtils.getWALRootDir(conf);
        FileSystem walFs = FSUtils.getWALFileSystem(conf);
        for (ServerName serverName: queuedDeadServers) {
          // Check both the live WAL dir and the "-splitting" dir for leftovers.
          Path walDir = new Path(walRootDir,
            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
          Path splitDir = walDir.suffix(DefaultWALProvider.SPLITTING_EXT);
          if (walFs.exists(walDir) || walFs.exists(splitDir)) {
            LOG.debug("Found queued dead server " + serverName);
            failover = true;
            break;
          }
        }
        if (!failover) {
          // Not a failover after all; drop the stale queued dead servers.
          LOG.info("AM figured that it's not a failover and cleaned up "
            + queuedDeadServers.size() + " queued dead servers");
          serverManager.removeRequeuedDeadServers();
        }
      }
    }

    Set<TableName> disabledOrDisablingOrEnabling = null;
    Map<HRegionInfo, ServerName> allRegions = null;

    if (!failover) {
      // Clean startup: close all user regions in memory, excluding tables that
      // are disabled/disabling/enabling (they get special handling).
      disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
        ZooKeeperProtos.Table.State.ENABLING);

      allRegions = regionStates.closeAllUserRegions(
        disabledOrDisablingOrEnabling);
    }

    // Start persisting region state changes from here on.
    regionStateStore.start();

    if (failover) {
      LOG.info("Found regions out on cluster or in RIT; presuming failover");
      // Recover regions that were on dead servers or stuck in transition.
      processDeadServersAndRecoverLostRegions(deadServers);
    }

    if (!failover && useZKForAssignment) {
      // Clean startup with ZK assignment: clear stale znodes and re-register
      // the watch for new assignment events.
      ZKAssign.deleteAllNodes(watcher);
      ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
        this.watcher.assignmentZNode);
    }

    // Unblock anything waiting on failover cleanup (queued dead servers etc.).
    failoverCleanupDone();
    if (!failover) {
      LOG.info("Clean cluster startup. Assigning user regions");
      assignAllUserRegions(allRegions);
    }

    // Regardless of the path taken, retire any replica regions flagged for close.
    for (HRegionInfo h : replicasToClose) {
      unassign(h);
    }
    replicasToClose.clear();
    return failover;
  }
668
669
670
671
672
673
674
675
676
677
678
679
  /**
   * Processes the region's transition znode, then blocks until the region
   * leaves the in-transition state (or waiting becomes pointless because the
   * master is stopping or the hosting server died).
   *
   * @param hri the region to process
   * @return true if a transition was processed (and waited on); false if the
   *   region had nothing in transition
   */
  boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
      throws InterruptedException, KeeperException, IOException {
    String encodedRegionName = hri.getEncodedName();
    if (!processRegionInTransition(encodedRegionName, hri)) {
      return false; // nothing in transition for this region
    }
    LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
    while (!this.server.isStopped() &&
        this.regionStates.isRegionInTransition(encodedRegionName)) {
      RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
      if (state == null || !serverManager.isServerOnline(state.getServerName())) {
        // The transition is no longer tracked, or the server it names has gone
        // offline (dead-server handling will take over) -- stop waiting.
        break;
      }
      // Re-check every 100ms, or sooner if region states are updated.
      this.regionStates.waitForUpdate(100);
    }
    return true;
  }
700
701
702
703
704
705
706
707
708
709
  /**
   * Reads and processes the transition znode for the given region (used during
   * master failover recovery).
   *
   * @param encodedRegionName encoded name of the region in transition
   * @param regionInfo region info, or null to resolve it from region states
   * @return true if the znode was processed; false if it was missing,
   *   unparsable, or referenced an unknown region
   */
  boolean processRegionInTransition(final String encodedRegionName,
      final HRegionInfo regionInfo) throws KeeperException, IOException {
    // Serialize with any other transition of the same region.
    Lock lock = locker.acquireLock(encodedRegionName);
    try {
      Stat stat = new Stat();
      byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
      if (data == null) return false; // znode vanished; nothing in transition
      RegionTransition rt;
      try {
        rt = RegionTransition.parseFrom(data);
      } catch (DeserializationException e) {
        LOG.warn("Failed parse znode data", e);
        return false;
      }
      HRegionInfo hri = regionInfo;
      if (hri == null) {
        // Caller did not supply the region; resolve it from region states.
        // Merge events may legitimately reference a region that does not exist
        // yet (the merged region), so only bail for non-merge event types.
        hri = regionStates.getRegionInfo(rt.getRegionName());
        EventType et = rt.getEventType();
        if (hri == null && et != EventType.RS_ZK_REGION_MERGING
            && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
          LOG.warn("Couldn't find the region in recovering " + rt);
          return false;
        }
      }

      // Build the ZK-based open-region coordination details carrying the znode
      // version (for compare-and-set style znode updates) and this master's
      // server name.
      BaseCoordinatedStateManager cp =
        (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
      OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();

      ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
        new ZkOpenRegionCoordination.ZkOpenRegionDetails();
      zkOrd.setVersion(stat.getVersion());
      zkOrd.setServerName(cp.getServer().getServerName());

      return processRegionsInTransition(
        rt, hri, openRegionCoordination, zkOrd);
    } finally {
      lock.unlock();
    }
  }
761
762
763
764
765
766
767
768
769
  /**
   * Processes one region-in-transition record found during failover recovery,
   * dispatching on the transition's event type.
   *
   * @param rt the parsed transition record
   * @param regionInfo the region the transition refers to
   * @param coordination open-region coordination implementation
   * @param ord per-transition coordination details (znode version etc.)
   * @return true if the transition was processed; false if the named server was
   *   dead and the region was forced offline instead
   */
  boolean processRegionsInTransition(
      final RegionTransition rt, final HRegionInfo regionInfo,
      OpenRegionCoordination coordination,
      final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
    EventType et = rt.getEventType();
    // Identity of the region and the server named by the transition record.
    final ServerName sn = rt.getServerName();
    final byte[] regionName = rt.getRegionName();
    final String encodedName = HRegionInfo.encodeRegionName(regionName);
    final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
    LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);

    if (regionStates.isRegionInTransition(encodedName)
        && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
      LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
        + et + ", does nothing since the region is already in transition "
        + regionStates.getRegionTransitionState(encodedName));
      // Already being handled elsewhere; nothing more to do here.
      return true;
    }
    if (!serverManager.isServerOnline(sn)) {
      // The server named in the transition is dead: force the region offline
      // and let dead-server processing recover it.
      LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
        " was on deadserver; forcing offline");
      if (regionStates.isRegionOnline(regionInfo)) {
        // Drop the stale online state and notify listeners of the close.
        regionStates.regionOffline(regionInfo);
        sendRegionClosedNotification(regionInfo);
      }
      // Mark OFFLINE, remembering which (dead) server it was last on.
      regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);

      if (regionInfo.isMetaRegion()) {
        // Record meta's last location (the dead server, still marked OPEN) in
        // ZK. NOTE(review): relies on MetaTableLocator.setMetaLocation
        // semantics for meta recovery -- confirm against that class.
        MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
      } else {
        // Remember the last server so shutdown handling can re-assign the
        // region, and make sure the dead server is actually expired.
        regionStates.setLastRegionServerOfRegion(sn, encodedName);
        if (!serverManager.isServerDead(sn)) {
          serverManager.expireServer(sn);
        }
      }
      return false;
    }
    switch (et) {
      case M_ZK_REGION_CLOSING:
        // A master-initiated close was in flight. Resume the unassign on the
        // recovery executor; once the region is offline, re-assign it.
        final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
        this.executorService.submit(
          new EventHandler(server, EventType.M_MASTER_RECOVERY) {
            @Override
            public void process() throws IOException {
              ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
              try {
                // Use the znode version captured at read time for CAS updates.
                final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
                  .getVersion();
                unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
                if (regionStates.isRegionOffline(regionInfo)) {
                  assign(regionInfo, true);
                }
              } finally {
                lock.unlock();
              }
            }
          });
        break;

      case RS_ZK_REGION_CLOSED:
      case RS_ZK_REGION_FAILED_OPEN:
        // Region ended up closed (or never opened): mark CLOSED, then either
        // re-assign it, or -- for a replica flagged for retirement -- offline it.
        regionStates.setRegionStateTOCLOSED(regionInfo, sn);
        if (!replicasToClose.contains(regionInfo)) {
          invokeAssign(regionInfo);
        } else {
          offlineDisabledRegion(regionInfo);
        }
        break;

      case M_ZK_REGION_OFFLINE:
        // Master had offlined the region: record OFFLINE, plan it back onto the
        // server from the transition record, and assign on the recovery executor.
        regionStates.updateRegionState(rt, State.OFFLINE);
        final RegionState rsOffline = regionStates.getRegionState(regionInfo);
        this.executorService.submit(
          new EventHandler(server, EventType.M_MASTER_RECOVERY) {
            @Override
            public void process() throws IOException {
              ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
              try {
                RegionPlan plan = new RegionPlan(regionInfo, null, sn);
                addPlan(encodedName, plan);
                assign(rsOffline, true, false);
              } finally {
                lock.unlock();
              }
            }
          });
        break;

      case RS_ZK_REGION_OPENING:
        regionStates.updateRegionState(rt, State.OPENING);
        break;

      case RS_ZK_REGION_OPENED:
        // Region reported open: record OPEN and run the opened handler inline
        // to complete the transition.
        regionStates.updateRegionState(rt, State.OPEN);
        new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
        break;
      case RS_ZK_REQUEST_REGION_SPLIT:
      case RS_ZK_REGION_SPLITTING:
      case RS_ZK_REGION_SPLIT:
        // Register the parent as online on sn so split bookkeeping starts from
        // a consistent state, then hand off; clean up the znode on failure.
        regionStates.regionOnline(regionInfo, sn);
        regionStates.updateRegionState(rt, State.SPLITTING);
        if (!handleRegionSplitting(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteSplittingNode(encodedName, sn);
        }
        break;
      case RS_ZK_REQUEST_REGION_MERGE:
      case RS_ZK_REGION_MERGING:
      case RS_ZK_REGION_MERGED:
        // Merge transitions; clean up the merging znode on failure.
        if (!handleRegionMerging(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteMergingNode(encodedName, sn);
        }
        break;
      default:
        throw new IllegalStateException("Received region in state:" + et + " is not valid.");
    }
    LOG.info("Processed region " + prettyPrintedRegionName + " in state "
      + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
      + "server: " + sn);
    return true;
  }
916
917
918
919
920
921 public void removeClosedRegion(HRegionInfo hri) {
922 if (regionsToReopen.remove(hri.getEncodedName()) != null) {
923 LOG.debug("Removed region from reopening regions because it was closed");
924 }
925 }
926
927
928
929
930
931
932
933
934
935
936
937
  /**
   * Handles a single region-transition event, dispatching on its event type.
   * Validates that the reported state matches what this master expects before
   * applying the transition; mismatched or stale events are logged and dropped.
   *
   * @param rt the transition to handle (null is tolerated and ignored)
   * @param coordination open-region coordination implementation
   * @param ord per-transition coordination details
   */
  void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
      OpenRegionCoordination.OpenRegionDetails ord) {
    if (rt == null) {
      LOG.warn("Unexpected NULL input for RegionTransition rt");
      return;
    }
    final ServerName sn = rt.getServerName();
    // Transitions submitted by HBCK use a sentinel server name; special path.
    if (sn.equals(HBCK_CODE_SERVERNAME)) {
      handleHBCK(rt);
      return;
    }
    final long createTime = rt.getCreateTime();
    final byte[] regionName = rt.getRegionName();
    String encodedName = HRegionInfo.encodeRegionName(regionName);
    String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
    // Ignore events from offline servers, except the types in
    // ignoreStatesRSOffline (FAILED_OPEN/CLOSED) which must still be handled.
    if (!serverManager.isServerOnline(sn)
        && !ignoreStatesRSOffline.contains(rt.getEventType())) {
      LOG.warn("Attempted to handle region transition for server but " +
        "it is not online: " + prettyPrintedRegionName + ", " + rt);
      return;
    }

    RegionState regionState =
      regionStates.getRegionState(encodedName);
    long startTime = System.currentTimeMillis();
    if (LOG.isDebugEnabled()) {
      // Flag events older than 15s; they may be stale replays.
      boolean lateEvent = createTime < (startTime - 15000);
      LOG.debug("Handling " + rt.getEventType() +
        ", server=" + sn + ", region=" +
        (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
        (lateEvent ? ", which is more than 15 seconds late" : "") +
        ", current_state=" + regionState);
    }

    // M_ZK_REGION_OFFLINE is produced by the master itself; nothing to handle.
    if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
      return;
    }

    // Serialize with any other transition of the same region.
    Lock lock = locker.acquireLock(encodedName);
    try {
      // The state may have changed while we waited for the lock; re-read it and
      // warn if it moved, then proceed with the fresh value.
      RegionState latestState =
        regionStates.getRegionState(encodedName);
      if ((regionState == null && latestState != null)
          || (regionState != null && latestState == null)
          || (regionState != null && latestState != null
            && latestState.getState() != regionState.getState())) {
        LOG.warn("Region state changed from " + regionState + " to "
          + latestState + ", while acquiring lock");
      }
      long waitedTime = System.currentTimeMillis() - startTime;
      if (waitedTime > 5000) {
        LOG.warn("Took " + waitedTime + "ms to acquire the lock");
      }
      regionState = latestState;
      switch (rt.getEventType()) {
      case RS_ZK_REQUEST_REGION_SPLIT:
      case RS_ZK_REGION_SPLITTING:
      case RS_ZK_REGION_SPLIT:
        // Split transitions; clean up the splitting znode if handling fails.
        if (!handleRegionSplitting(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteSplittingNode(encodedName, sn);
        }
        break;

      case RS_ZK_REQUEST_REGION_MERGE:
      case RS_ZK_REGION_MERGING:
      case RS_ZK_REGION_MERGED:
        // Merge transitions; clean up the merging znode if handling fails.
        if (!handleRegionMerging(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteMergingNode(encodedName, sn);
        }
        break;

      case M_ZK_REGION_CLOSING:
        // Only valid if this master believes the region is closing on sn.
        if (regionState == null
            || !regionState.isPendingCloseOrClosingOnServer(sn)) {
          LOG.warn("Received CLOSING for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
            + regionStates.getRegionState(encodedName));
          return;
        }
        // Transition to CLOSING.
        regionStates.updateRegionState(rt, State.CLOSING);
        break;

      case RS_ZK_REGION_CLOSED:
        // Only valid if this master believes the region is closing on sn.
        if (regionState == null
            || !regionState.isPendingCloseOrClosingOnServer(sn)) {
          LOG.warn("Received CLOSED for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
            + regionStates.getRegionState(encodedName));
          return;
        }
        // Run the closed handler inline to complete the close (and record it
        // in the test-only tracker, if enabled).
        new ClosedRegionHandler(server, this, regionState.getRegion()).process();
        updateClosedRegionHandlerTracker(regionState.getRegion());
        break;

      case RS_ZK_REGION_FAILED_OPEN:
        if (regionState == null
            || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
          LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
            + regionStates.getRegionState(encodedName));
          return;
        }
        // Count consecutive failed opens for this region.
        AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
        if (failedOpenCount == null) {
          failedOpenCount = new AtomicInteger();
          // No put-if-absent needed: we hold the per-region lock, so no other
          // thread can be racing on this entry.
          failedOpenTracker.put(encodedName, failedOpenCount);
        }
        if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
          // Give up: mark FAILED_OPEN and reset the counter so a later manual
          // assign starts fresh.
          regionStates.updateRegionState(rt, State.FAILED_OPEN);
          failedOpenTracker.remove(encodedName);
        } else {
          // Retry: mark CLOSED, compute a new plan (passing the failed server
          // sn -- NOTE(review): confirm getRegionPlan(..., sn, true) excludes
          // it), and let the closed handler drive the re-assign.
          regionState = regionStates.setRegionStateTOCLOSED(rt.getRegionName(), sn);
          if (regionState != null) {
            getRegionPlan(regionState.getRegion(), sn, true);
            new ClosedRegionHandler(server, this, regionState.getRegion()).process();
          }
        }
        break;

      case RS_ZK_REGION_OPENING:
        // Only valid if this master believes the region is opening on sn.
        if (regionState == null
            || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
          LOG.warn("Received OPENING for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
            + regionStates.getRegionState(encodedName));
          return;
        }
        // Transition to OPENING.
        regionStates.updateRegionState(rt, State.OPENING);
        break;

      case RS_ZK_REGION_OPENED:
        // Only valid if this master believes the region is opening on sn.
        if (regionState == null
            || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
          LOG.warn("Received OPENED for " + prettyPrintedRegionName
            + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
            + regionStates.getRegionState(encodedName));
          // Unexpected OPENED: if we already consider it opened on sn, just
          // finish the open; otherwise ask sn to close the stray region.
          if (regionState != null) {
            if(regionState.isOpened() && regionState.getServerName().equals(sn)) {
              failedOpenTracker.remove(encodedName);
              new OpenedRegionHandler(
                server, this, regionState.getRegion(), coordination, ord).process();
              updateOpenedRegionHandlerTracker(regionState.getRegion());
            } else {
              // Close it without touching ZK state (last arg forces server sn).
              unassign(regionState.getRegion(), null, -1, null, false, sn);
            }
          }
          return;
        }
        // Normal path: transition to OPEN, reset failure tracking, and run the
        // opened handler to complete the open.
        regionState =
          regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
        if (regionState != null) {
          failedOpenTracker.remove(encodedName);
          new OpenedRegionHandler(
            server, this, regionState.getRegion(), coordination, ord).process();
          updateOpenedRegionHandlerTracker(regionState.getRegion());
        }
        break;

      default:
        throw new IllegalStateException("Received event is not valid.");
      }
    } finally {
      lock.unlock();
    }
  }
1137
1138
1139 boolean wasClosedHandlerCalled(HRegionInfo hri) {
1140 AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1141
1142
1143
1144 return b == null ? false : b.compareAndSet(true, false);
1145 }
1146
1147
1148 boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1149 AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1150
1151
1152
1153 return b == null ? false : b.compareAndSet(true, false);
1154 }
1155
1156
1157 void initializeHandlerTrackers() {
1158 closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1159 openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1160 }
1161
1162 void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1163 if (closedRegionHandlerCalled != null) {
1164 closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1165 }
1166 }
1167
1168 void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1169 if (openedRegionHandlerCalled != null) {
1170 openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1171 }
1172 }
1173
1174
1175
1176
1177
1178
1179 void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1180 if (!shouldAssignRegionsWithFavoredNodes) return;
1181
1182
1183 Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1184 new HashMap<HRegionInfo, List<ServerName>>();
1185 for (HRegionInfo region : regions) {
1186 regionToFavoredNodes.put(region,
1187 ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1188 }
1189 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1190 this.server.getConnection());
1191 }
1192
1193
1194
1195
1196
1197
1198
1199 @SuppressWarnings("deprecation")
1200 private void handleHBCK(RegionTransition rt) {
1201 String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1202 LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1203 ", server=" + rt.getServerName() + ", region=" +
1204 HRegionInfo.prettyPrint(encodedName));
1205 RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1206 switch (rt.getEventType()) {
1207 case M_ZK_REGION_OFFLINE:
1208 HRegionInfo regionInfo;
1209 if (regionState != null) {
1210 regionInfo = regionState.getRegion();
1211 } else {
1212 try {
1213 byte [] name = rt.getRegionName();
1214 Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1215 this.server.getConnection(), name);
1216 regionInfo = p.getFirst();
1217 } catch (IOException e) {
1218 LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1219 return;
1220 }
1221 }
1222 LOG.info("HBCK repair is triggering assignment of region=" +
1223 regionInfo.getRegionNameAsString());
1224
1225 assign(regionInfo, false);
1226 break;
1227
1228 default:
1229 LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1230 break;
1231 }
1232
1233 }
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
  /**
   * ZooKeeper watcher callback: a new znode was created; hands it off to
   * {@link #handleAssignmentEvent(String)}, which only acts on znodes under
   * the assignment node.
   */
  @Override
  public void nodeCreated(String path) {
    handleAssignmentEvent(path);
  }
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
  /**
   * ZooKeeper watcher callback: an existing znode's data changed (a region
   * transition progressed); hands it off to
   * {@link #handleAssignmentEvent(String)}.
   */
  @Override
  public void nodeDataChanged(String path) {
    handleAssignmentEvent(path);
  }
1270
1271
1272
1273
1274
  // Regions whose ZK event is currently being processed by a worker thread.
  // Guarded by synchronizing on itself; see zkEventWorkersSubmit().
  private final Set<String> regionsInProgress = new HashSet<String>();
1276
1277
  // ZK events that arrived for a region while another event for the same
  // region was in flight; replayed one at a time (insertion order) by
  // zkEventWorkersSubmit(). Guarded by synchronizing on itself.
  private final LinkedHashMultimap <String, RegionRunnable>
    zkEventWorkerWaitingList = LinkedHashMultimap.create();
1280
1281
1282
1283
  /**
   * A Runnable associated with a single region so that ZK events for the
   * same region can be serialized by {@link #zkEventWorkersSubmit}.
   */
  private interface RegionRunnable extends Runnable{
    /**
     * @return the name of the region this task operates on; tasks sharing a
     *   region name are executed one at a time, in submission order.
     */
    String getRegionName();
  }
1290
1291
1292
1293
1294
  /**
   * Submits a region-scoped task to the ZK event worker pool, guaranteeing
   * that at most one task per region runs at a time. A task arriving while
   * another task for the same region is in flight is parked in
   * {@code zkEventWorkerWaitingList} and resubmitted (in arrival order) when
   * the in-flight task completes.
   *
   * Lock order is always regionsInProgress then zkEventWorkerWaitingList,
   * both here and in the completion callback below.
   */
  protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {

    synchronized (regionsInProgress) {
      // A task for this region is already running: queue this one and return.
      if (regionsInProgress.contains(regRunnable.getRegionName())) {
        synchronized (zkEventWorkerWaitingList){
          zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
        }
        return;
      }

      // Nothing in flight for this region; claim it and submit now.
      regionsInProgress.add(regRunnable.getRegionName());
      zkEventWorkers.submit(new Runnable() {
        @Override
        public void run() {
          try {
            regRunnable.run();
          } finally {
            // Release the region and, if another event queued up for it while
            // we ran, resubmit the oldest one (recursing re-claims the region).
            synchronized (regionsInProgress) {
              regionsInProgress.remove(regRunnable.getRegionName());
              synchronized (zkEventWorkerWaitingList) {
                java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
                    regRunnable.getRegionName());
                if (!waiting.isEmpty()) {
                  // LinkedHashMultimap preserves insertion order; take the head.
                  RegionRunnable toSubmit = waiting.iterator().next();
                  zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
                  zkEventWorkersSubmit(toSubmit);
                }
              }
            }
          }
        }
      });
    }
  }
1335
1336 @Override
1337 public void nodeDeleted(final String path) {
1338 if (path.startsWith(watcher.assignmentZNode)) {
1339 final String regionName = ZKAssign.getRegionName(watcher, path);
1340 zkEventWorkersSubmit(new RegionRunnable() {
1341 @Override
1342 public String getRegionName() {
1343 return regionName;
1344 }
1345
1346 @Override
1347 public void run() {
1348 Lock lock = locker.acquireLock(regionName);
1349 try {
1350 RegionState rs = regionStates.getRegionTransitionState(regionName);
1351 if (rs == null) {
1352 rs = regionStates.getRegionState(regionName);
1353 if (rs == null || !rs.isMergingNew()) {
1354
1355 return;
1356 }
1357 }
1358
1359 HRegionInfo regionInfo = rs.getRegion();
1360 String regionNameStr = regionInfo.getRegionNameAsString();
1361 LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1362
1363 boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1364 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1365
1366 ServerName serverName = rs.getServerName();
1367 if (serverManager.isServerOnline(serverName)) {
1368 if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1369 synchronized (regionStates) {
1370 regionOnline(regionInfo, serverName);
1371 if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1372
1373
1374 HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1375 HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1376 if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1377 LOG.warn("Split daughter region not in transition " + hri_a);
1378 }
1379 if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1380 LOG.warn("Split daughter region not in transition" + hri_b);
1381 }
1382 regionOffline(hri_a);
1383 regionOffline(hri_b);
1384 splitRegions.remove(regionInfo);
1385 }
1386 if (disabled) {
1387
1388 LOG.info("Opened " + regionNameStr
1389 + "but this table is disabled, triggering close of region");
1390 unassign(regionInfo);
1391 }
1392 }
1393 } else if (rs.isMergingNew()) {
1394 synchronized (regionStates) {
1395 String p = regionInfo.getEncodedName();
1396 PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1397 if (regions != null) {
1398 onlineMergingRegion(disabled, regions.getFirst(), serverName);
1399 onlineMergingRegion(disabled, regions.getSecond(), serverName);
1400 }
1401 }
1402 }
1403 }
1404 } finally {
1405 lock.unlock();
1406 }
1407 }
1408
1409 private void onlineMergingRegion(boolean disabled,
1410 final HRegionInfo hri, final ServerName serverName) {
1411 RegionState regionState = regionStates.getRegionState(hri);
1412 if (regionState != null && regionState.isMerging()
1413 && regionState.isOnServer(serverName)) {
1414 regionOnline(regionState.getRegion(), serverName);
1415 if (disabled) {
1416 unassign(hri);
1417 }
1418 }
1419 }
1420 });
1421 }
1422 }
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
  /**
   * ZooKeeper watcher callback for a change in the assignment node's child
   * list. Re-lists the children (which also re-sets the child watch) and
   * fetches-and-watches the data of any child region not already known to be
   * in transition, so its transition event gets picked up.
   */
  @Override
  public void nodeChildrenChanged(String path) {
    if (path.equals(watcher.assignmentZNode)) {
      zkEventWorkers.submit(new Runnable() {
        @Override
        public void run() {
          try {
            // List children and re-establish the watch on the parent node.
            List<String> children =
              ZKUtil.listChildrenAndWatchForNewChildren(
                watcher, watcher.assignmentZNode);
            if (children != null) {
              Stat stat = new Stat();
              for (String child : children) {
                // Regions already in transition have a data watch set; only
                // set a watch (by reading data) for the ones that don't.
                if (!regionStates.isRegionInTransition(child)) {
                  ZKAssign.getDataAndWatch(watcher, child, stat);
                }
              }
            }
          } catch (KeeperException e) {
            server.abort("Unexpected ZK exception reading unassigned children", e);
          }
        }
      });
    }
  }
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
  /**
   * Marks the region as online on the given server, with no known open
   * sequence number. Delegates to {@link #regionOnline(HRegionInfo,
   * ServerName, long)} with {@code HConstants.NO_SEQNUM}.
   */
  void regionOnline(HRegionInfo regionInfo, ServerName sn) {
    regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
  }
1478
  /**
   * Marks the region online in in-memory state, clears any outstanding
   * region plan for it, and notifies the balancer and registered listeners.
   *
   * @param regionInfo  region that opened
   * @param sn          server the region opened on
   * @param openSeqNum  sequence id the region opened at
   */
  void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
    numRegionsOpened.incrementAndGet();
    regionStates.regionOnline(regionInfo, sn, openSeqNum);

    // The region reached a server; any pending plan is now stale.
    clearRegionPlan(regionInfo);
    balancer.regionOnline(regionInfo, sn);

    // Tell listeners that a region was successfully opened.
    sendRegionOpenedNotification(regionInfo, sn);
  }
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499 private void handleAssignmentEvent(final String path) {
1500 if (path.startsWith(watcher.assignmentZNode)) {
1501 final String regionName = ZKAssign.getRegionName(watcher, path);
1502
1503 zkEventWorkersSubmit(new RegionRunnable() {
1504 @Override
1505 public String getRegionName() {
1506 return regionName;
1507 }
1508
1509 @Override
1510 public void run() {
1511 try {
1512 Stat stat = new Stat();
1513 byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1514 if (data == null) return;
1515
1516 RegionTransition rt = RegionTransition.parseFrom(data);
1517
1518
1519
1520 BaseCoordinatedStateManager csm =
1521 (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1522 OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1523
1524 ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1525 new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1526 zkOrd.setVersion(stat.getVersion());
1527 zkOrd.setServerName(csm.getServer().getServerName());
1528
1529 handleRegion(rt, openRegionCoordination, zkOrd);
1530 } catch (KeeperException e) {
1531 server.abort("Unexpected ZK exception reading unassigned node data", e);
1532 } catch (DeserializationException e) {
1533 server.abort("Unexpected exception deserializing node data", e);
1534 }
1535 }
1536 });
1537 }
1538 }
1539
1540
1541
1542
1543
1544
1545
1546
  /**
   * Marks the region offline with no destination state (the region is not
   * being moved elsewhere).
   */
  public void regionOffline(final HRegionInfo regionInfo) {
    regionOffline(regionInfo, null);
  }
1550
1551 public void offlineDisabledRegion(HRegionInfo regionInfo) {
1552 if (useZKForAssignment) {
1553
1554 LOG.debug("Table being disabled so deleting ZK node and removing from " +
1555 "regions in transition, skipping assignment of region " +
1556 regionInfo.getRegionNameAsString());
1557 String encodedName = regionInfo.getEncodedName();
1558 deleteNodeInStates(encodedName, "closed", null,
1559 EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1560 }
1561 replicasToClose.remove(regionInfo);
1562 regionOffline(regionInfo);
1563 }
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
  /**
   * Assigns the specified region without forcing a new plan. Delegates to
   * {@link #assign(HRegionInfo, boolean, boolean)} with
   * {@code forceNewPlan=false}.
   */
  public void assign(HRegionInfo region, boolean setOfflineInZK) {
    assign(region, setOfflineInZK, false);
  }
1588
1589
1590
1591
1592 @VisibleForTesting
1593 public void assign(HRegionInfo region,
1594 boolean setOfflineInZK, boolean forceNewPlan) {
1595 if (isDisabledorDisablingRegionInRIT(region)) {
1596 return;
1597 }
1598 String encodedName = region.getEncodedName();
1599 Lock lock = locker.acquireLock(encodedName);
1600 try {
1601 RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1602 if (state != null) {
1603 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1604 LOG.info("Skip assigning " + region.getRegionNameAsString()
1605 + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1606 + " is dead but not processed yet");
1607 return;
1608 }
1609 assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1610 }
1611 } finally {
1612 lock.unlock();
1613 }
1614 }
1615
1616
1617
1618
1619
1620
1621
  /**
   * Bulk-assigns the given regions to one destination server: forces each
   * region OFFLINE (asynchronously creating its ZK offline node when ZK-based
   * assignment is on), then sends a single bulk OPEN RPC to the destination.
   * Regions that cannot be prepared, or that the server fails to open, are
   * re-queued for individual assignment. Finally waits (best effort) for the
   * user regions to leave transition.
   *
   * @param destination server to open the regions on
   * @param regions regions to assign
   * @return true when the bulk path completed (individual retries may still
   *   be pending); false if the master is stopping or the destination could
   *   not be reached
   * @throws InterruptedException if interrupted while sleeping/waiting
   */
  boolean assign(final ServerName destination, final List<HRegionInfo> regions)
      throws InterruptedException {
    long startTime = EnvironmentEdgeManager.currentTime();
    try {
      int regionCount = regions.size();
      if (regionCount == 0) {
        return true;
      }
      LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
      Set<String> encodedNames = new HashSet<String>(regionCount);
      for (HRegionInfo region : regions) {
        encodedNames.add(region.getEncodedName());
      }

      List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
      Map<String, Lock> locks = locker.acquireLocks(encodedNames);
      try {
        // counter/offlineNodesVersions are filled by the async ZK callback.
        AtomicInteger counter = new AtomicInteger(0);
        Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
        OfflineCallback cb = new OfflineCallback(
          watcher, destination, counter, offlineNodesVersions);
        Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
        List<RegionState> states = new ArrayList<RegionState>(regions.size());
        for (HRegionInfo region : regions) {
          String encodedName = region.getEncodedName();
          if (!isDisabledorDisablingRegionInRIT(region)) {
            RegionState state = forceRegionStateToOffline(region, false);
            boolean onDeadServer = false;
            if (state != null) {
              if (regionStates.wasRegionOnDeadServer(encodedName)) {
                LOG.info("Skip assigning " + region.getRegionNameAsString()
                  + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
                  + " is dead but not processed yet")
;
                onDeadServer = true;
              } else if (!useZKForAssignment
                  || asyncSetOfflineInZooKeeper(state, cb, destination)) {
                RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
                plans.put(encodedName, plan);
                states.add(state);
                continue;
              }
            }
            // Couldn't force offline / set the znode; retry individually
            // later, unless the host is dead-but-unprocessed (SSH handles it).
            if (!onDeadServer) {
              LOG.info("failed to force region state to offline or "
                + "failed to set it offline in ZK, will reassign later: " + region);
              failedToOpenRegions.add(region);
            }
          }

          // Region excluded from the bulk assign; release its lock now.
          Lock lock = locks.remove(encodedName);
          lock.unlock();
        }

        if (useZKForAssignment) {
          // Wait until every async offline-znode callback has fired.
          int total = states.size();
          for (int oldCounter = 0; !server.isStopped();) {
            int count = counter.get();
            if (oldCounter != count) {
              LOG.debug(destination.toString() + " unassigned znodes=" + count +
                " of total=" + total + "; oldCounter=" + oldCounter);
              oldCounter = count;
            }
            if (count >= total) break;
            Thread.sleep(5);
          }
        }

        if (server.isStopped()) {
          return false;
        }

        // Record the plans so open events can be matched to this assignment.
        this.addPlans(plans);

        List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
          new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
        for (RegionState state: states) {
          HRegionInfo region = state.getRegion();
          String encodedRegionName = region.getEncodedName();
          Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
          if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
            // The async offline-znode creation failed for this region.
            LOG.warn("failed to offline in zookeeper: " + region);
            failedToOpenRegions.add(region);
            Lock lock = locks.remove(encodedRegionName);
            lock.unlock();
          } else {
            regionStates.updateRegionState(
              region, State.PENDING_OPEN, destination);
            List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
            if (this.shouldAssignRegionsWithFavoredNodes) {
              favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
            }
            regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
              region, nodeVersion, favoredNodes));
          }
        }

        // Send the bulk OPEN RPC, retrying up to maximumAttempts.
        try {
          // Upper bound on how long we wait out a destination that is still
          // starting up (ServerNotRunningYetException below).
          long maxWaitTime = System.currentTimeMillis() +
            this.server.getConfiguration().
              getLong("hbase.regionserver.rpc.startup.waittime", 60000);
          for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
            try {
              // Every region may already have been diverted to failedToOpenRegions.
              if (regionOpenInfos.isEmpty()) {
                break;
              }
              List<RegionOpeningState> regionOpeningStateList = serverManager
                .sendRegionOpen(destination, regionOpenInfos);
              if (regionOpeningStateList == null) {
                // Failed to get an RPC connection to the server.
                return false;
              }
              for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
                RegionOpeningState openingState = regionOpeningStateList.get(k);
                if (openingState != RegionOpeningState.OPENED) {
                  HRegionInfo region = regionOpenInfos.get(k).getFirst();
                  if (openingState == RegionOpeningState.ALREADY_OPENED) {
                    processAlreadyOpenedRegion(region, destination);
                  } else if (openingState == RegionOpeningState.FAILED_OPENING) {
                    // Server rejected the open; retry this region individually.
                    failedToOpenRegions.add(region);
                  } else {
                    LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
                      + openingState + " in assigning region " + region);
                  }
                }
              }
              break;
            } catch (IOException e) {
              if (e instanceof RemoteException) {
                e = ((RemoteException)e).unwrapRemoteException();
              }
              if (e instanceof RegionServerStoppedException) {
                LOG.warn("The region server was shut down, ", e);
                // No point retrying against a stopped server.
                return false;
              } else if (e instanceof ServerNotRunningYetException) {
                long now = System.currentTimeMillis();
                if (now < maxWaitTime) {
                  LOG.debug("Server is not yet up; waiting up to " +
                    (maxWaitTime - now) + "ms", e);
                  Thread.sleep(100);
                  i--; // don't burn an attempt while the server starts up
                  continue;
                }
              } else if (e instanceof java.net.SocketTimeoutException
                  && this.serverManager.isServerOnline(destination)) {
                // RPC timed out but the server is alive: the opens may in
                // fact be in progress, so retry without burning an attempt.
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Bulk assigner openRegion() to " + destination
                    + " has timed out, but the regions might"
                    + " already be opened on it.", e);
                }
                // wait a bit and reset the retry count
                Thread.sleep(100);
                i--;
                continue;
              }
              throw e;
            }
          }
        } catch (IOException e) {
          // Can't talk to the destination at all; give up the bulk path.
          LOG.info("Unable to communicate with " + destination
            + " in order to assign regions, ", e);
          return false;
        }
      } finally {
        for (Lock lock : locks.values()) {
          lock.unlock();
        }
      }

      if (!failedToOpenRegions.isEmpty()) {
        // Regions diverted above get individual assignment attempts.
        for (HRegionInfo region : failedToOpenRegions) {
          if (!regionStates.isRegionOnline(region)) {
            invokeAssign(region);
          }
        }
      }

      // Best-effort wait for the non-system regions to leave transition.
      ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
      for (HRegionInfo region: regions) {
        if (!region.getTable().isSystemTable()) {
          userRegionSet.add(region);
        }
      }
      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
          System.currentTimeMillis())) {
        LOG.debug("some user regions are still in transition: " + userRegionSet);
      }
      LOG.debug("Bulk assigning done for " + destination);
      return true;
    } finally {
      metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
    }
  }
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
  /**
   * Sends a CLOSE RPC for the region to the server hosting it, retrying up
   * to {@code maximumAttempts} times. On interrupt or when the attempts are
   * exhausted, the region state is set to FAILED_CLOSE (when a state was
   * supplied). Returns early (offlining the region) when the host is dead or
   * reports NotServingRegionException.
   *
   * @param region region to close
   * @param state the region's in-transition state, or null when unknown;
   *   when non-null it supplies the server to contact and enables in-memory
   *   state updates
   * @param versionOfClosingNode ZK version of the CLOSING znode to pass to
   *   the region server
   * @param dest intended destination of a subsequent assign; may be null
   * @param transitionInZK whether the region server should transition the
   *   region's znode while closing
   * @param src server to contact when no state is supplied
   */
  private void unassign(final HRegionInfo region,
      final RegionState state, final int versionOfClosingNode,
      final ServerName dest, final boolean transitionInZK,
      final ServerName src) {
    ServerName server = src;
    if (state != null) {
      server = state.getServerName();
    }
    long maxWaitTime = -1;
    for (int i = 1; i <= this.maximumAttempts; i++) {
      if (this.server.isStopped() || this.server.isAborted()) {
        LOG.debug("Server stopped/aborted; skipping unassign of " + region);
        return;
      }
      // Nothing to close if the hosting server is already dead.
      if (!serverManager.isServerOnline(server)) {
        LOG.debug("Offline " + region.getRegionNameAsString()
          + ", no need to unassign since it's on a dead server: " + server);
        if (transitionInZK) {
          // Remove the transition znode so the region isn't stuck in RIT.
          deleteClosingOrClosedNode(region, server);
        }
        if (state != null) {
          regionOffline(region);
        }
        return;
      }
      try {
        // Send the CLOSE RPC; true means the server accepted the close.
        if (serverManager.sendRegionClose(server, region,
          versionOfClosingNode, dest, transitionInZK)) {
          LOG.debug("Sent CLOSE to " + server + " for region " +
            region.getRegionNameAsString());
          if (useZKForAssignment && !transitionInZK && state != null) {
            // Recurses with identical arguments — presumably to keep
            // re-issuing the close until the region is observed closed
            // when the server itself does not transition the znode.
            unassign(region, state, versionOfClosingNode,
              dest, transitionInZK, src);
          }
          return;
        }

        // Close RPC returned false; fall through and retry.
        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
          region.getRegionNameAsString());
      } catch (Throwable t) {
        long sleepTime = 0;
        Configuration conf = this.server.getConfiguration();
        if (t instanceof RemoteException) {
          t = ((RemoteException)t).unwrapRemoteException();
        }
        boolean logRetries = true;
        if (t instanceof RegionServerAbortedException
            || t instanceof RegionServerStoppedException
            || t instanceof ServerNotRunningYetException) {
          // Server is stopping/starting; wait out the failed-server expiry
          // window before the next attempt.
          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);

        } else if (t instanceof NotServingRegionException) {
          // The region is no longer on that server; treat as closed.
          LOG.debug("Offline " + region.getRegionNameAsString()
            + ", it's not any more on " + server, t);
          if (transitionInZK) {
            deleteClosingOrClosedNode(region, server);
          }
          if (state != null) {
            regionOffline(region);
          }
          return;
        } else if ((t instanceof FailedServerException) || (state != null &&
            t instanceof RegionAlreadyInTransitionException)) {
          if(t instanceof FailedServerException) {
            sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
              RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
          } else {
            // The server is already processing this region; just refresh the
            // RIT timestamp and wait, bounded by ALREADY_IN_TRANSITION_WAITTIME.
            LOG.debug("update " + state + " the timestamp.");
            state.updateTimestampToNow();
            if (maxWaitTime < 0) {
              maxWaitTime =
                EnvironmentEdgeManager.currentTime()
                  + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
                    DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
            }
            long now = EnvironmentEdgeManager.currentTime();
            if (now < maxWaitTime) {
              LOG.debug("Region is already in transition; "
                + "waiting up to " + (maxWaitTime - now) + "ms", t);
              sleepTime = 100;
              i--; // this attempt doesn't count against maximumAttempts
              logRetries = false;
            }
          }
        }

        try {
          if (sleepTime > 0) {
            Thread.sleep(sleepTime);
          }
        } catch (InterruptedException ie) {
          LOG.warn("Failed to unassign "
            + region.getRegionNameAsString() + " since interrupted", ie);
          Thread.currentThread().interrupt();
          if (state != null) {
            regionStates.updateRegionState(region, State.FAILED_CLOSE);
          }
          return;
        }

        if (logRetries) {
          LOG.info("Server " + server + " returned " + t + " for "
            + region.getRegionNameAsString() + ", try=" + i
            + " of " + this.maximumAttempts, t);
          // Loop continues for the next attempt.
        }
      }
    }
    // Attempts exhausted.
    if (state != null) {
      regionStates.updateRegionState(region, State.FAILED_CLOSE);
    }
  }
1965
1966
1967
1968
  /**
   * Forces the region toward OFFLINE so it can be (re)assigned.
   *
   * NOTE: the switch below uses deliberate fall-throughs. An in-transition
   * state with {@code forceNewPlan} falls into the unassign of
   * FAILED_CLOSE/FAILED_OPEN; a successfully-unassigned region then falls
   * into the OFFLINE dead-server check; OFFLINE falls into CLOSED.
   *
   * @param region region to prepare for assignment
   * @param forceNewPlan when true, even in-transition regions are unassigned
   *   first so a fresh plan can be used
   * @return the region state to assign with, or null if assignment should be
   *   skipped (already in transition, couldn't close, or host is a dead but
   *   not-yet-processed server)
   */
  private RegionState forceRegionStateToOffline(
      final HRegionInfo region, final boolean forceNewPlan) {
    RegionState state = regionStates.getRegionState(region);
    if (state == null) {
      LOG.warn("Assigning but not in region states: " + region);
      state = regionStates.createRegionState(region);
    }

    ServerName sn = state.getServerName();
    if (forceNewPlan && LOG.isDebugEnabled()) {
      LOG.debug("Force region state offline " + state);
    }

    switch (state.getState()) {
    case OPEN:
    case OPENING:
    case PENDING_OPEN:
    case CLOSING:
    case PENDING_CLOSE:
      if (!forceNewPlan) {
        LOG.debug("Skip assigning " +
          region + ", it is already " + state);
        return null;
      }
      // forceNewPlan: fall through so the region gets unassigned first.
    case FAILED_CLOSE:
    case FAILED_OPEN:
      unassign(region, state, -1, null, false, null);
      state = regionStates.getRegionState(region);
      if (state.isFailedClose()) {
        // Couldn't close the region, so it can't be re-assigned safely.
        LOG.info("Skip assigning " +
          region + ", we couldn't close it: " + state);
        return null;
      }
      // fall through to the dead-server check below.
    case OFFLINE:
      // If the region's last host is dead but server shutdown handling has
      // not run yet, leave the region for SSH to reassign (ZK mode only).
      if (useZKForAssignment
          && regionStates.isServerDeadAndNotProcessed(sn)
          && wasRegionOnDeadServerByMeta(region, sn)) {
        if (!regionStates.isRegionInTransition(region)) {
          LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
          regionStates.updateRegionState(region, State.OFFLINE);
        }
        LOG.info("Skip assigning " + region.getRegionNameAsString()
          + ", it is on a dead but not processed yet server: " + sn);
        return null;
      }
      // fall through: OFFLINE/CLOSED regions are assignable.
    case CLOSED:
      break;
    default:
      LOG.error("Trying to assign region " + region
        + ", which is " + state);
      return null;
    }
    return state;
  }
2033
2034 @SuppressWarnings("deprecation")
2035 protected boolean wasRegionOnDeadServerByMeta(
2036 final HRegionInfo region, final ServerName sn) {
2037 try {
2038 if (region.isMetaRegion()) {
2039 ServerName server = this.server.getMetaTableLocator().
2040 getMetaRegionLocation(this.server.getZooKeeper());
2041 return regionStates.isServerDeadAndNotProcessed(server);
2042 }
2043 while (!server.isStopped()) {
2044 try {
2045 this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2046 Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2047 region.getRegionName());
2048 if (r == null || r.isEmpty()) return false;
2049 ServerName server = HRegionInfo.getServerName(r);
2050 return regionStates.isServerDeadAndNotProcessed(server);
2051 } catch (IOException ioe) {
2052 LOG.info("Received exception accessing hbase:meta during force assign "
2053 + region.getRegionNameAsString() + ", retrying", ioe);
2054 }
2055 }
2056 } catch (InterruptedException e) {
2057 Thread.currentThread().interrupt();
2058 LOG.info("Interrupted accessing hbase:meta", e);
2059 }
2060
2061 return regionStates.isServerDeadAndNotProcessed(sn);
2062 }
2063
2064
2065
2066
2067
2068
2069
2070 public void assign(RegionState state,
2071 boolean setOfflineInZK, final boolean forceNewPlan) {
2072 long startTime = EnvironmentEdgeManager.currentTime();
2073 try {
2074 Configuration conf = server.getConfiguration();
2075 RegionState currentState = state;
2076 int versionOfOfflineNode = -1;
2077 RegionPlan plan = null;
2078 long maxWaitTime = -1;
2079 HRegionInfo region = state.getRegion();
2080 RegionOpeningState regionOpenState;
2081 Throwable previousException = null;
2082 for (int i = 1; i <= maximumAttempts; i++) {
2083 if (server.isStopped() || server.isAborted()) {
2084 LOG.info("Skip assigning " + region.getRegionNameAsString()
2085 + ", the server is stopped/aborted");
2086 return;
2087 }
2088
2089 if (plan == null) {
2090 try {
2091 plan = getRegionPlan(region, forceNewPlan);
2092 } catch (HBaseIOException e) {
2093 LOG.warn("Failed to get region plan", e);
2094 }
2095 }
2096
2097 if (plan == null) {
2098 LOG.warn("Unable to determine a plan to assign " + region);
2099
2100
2101 if (region.isMetaRegion()) {
2102 if (i == maximumAttempts) {
2103 i = 0;
2104
2105 LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2106 " after maximumAttempts (" + this.maximumAttempts +
2107 "). Reset attempts count and continue retrying.");
2108 }
2109 waitForRetryingMetaAssignment();
2110 continue;
2111 }
2112
2113 regionStates.updateRegionState(region, State.FAILED_OPEN);
2114 return;
2115 }
2116 if (setOfflineInZK && versionOfOfflineNode == -1) {
2117 LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2118
2119
2120 versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2121 if (versionOfOfflineNode != -1) {
2122 if (isDisabledorDisablingRegionInRIT(region)) {
2123 return;
2124 }
2125
2126
2127
2128
2129
2130
2131 TableName tableName = region.getTable();
2132 if (!tableStateManager.isTableState(tableName,
2133 ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2134 LOG.debug("Setting table " + tableName + " to ENABLED state.");
2135 setEnabledTable(tableName);
2136 }
2137 }
2138 }
2139 if (setOfflineInZK && versionOfOfflineNode == -1) {
2140 LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2141
2142
2143
2144
2145 if (!server.isAborted()) {
2146 continue;
2147 }
2148 }
2149 LOG.info("Assigning " + region.getRegionNameAsString() +
2150 " to " + plan.getDestination());
2151
2152 currentState = regionStates.updateRegionState(region,
2153 State.PENDING_OPEN, plan.getDestination());
2154
2155 boolean needNewPlan;
2156 final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2157 " to " + plan.getDestination();
2158 try {
2159 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2160 if (this.shouldAssignRegionsWithFavoredNodes) {
2161 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2162 }
2163 regionOpenState = serverManager.sendRegionOpen(
2164 plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2165
2166 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2167
2168 needNewPlan = true;
2169 LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2170 " trying to assign elsewhere instead; " +
2171 "try=" + i + " of " + this.maximumAttempts);
2172 } else {
2173
2174 if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2175 processAlreadyOpenedRegion(region, plan.getDestination());
2176 }
2177 return;
2178 }
2179
2180 } catch (Throwable t) {
2181 if (t instanceof RemoteException) {
2182 t = ((RemoteException) t).unwrapRemoteException();
2183 }
2184 previousException = t;
2185
2186
2187
2188
2189 boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2190 t instanceof ServerNotRunningYetException);
2191
2192
2193
2194
2195
2196
2197 boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2198 && this.serverManager.isServerOnline(plan.getDestination()));
2199
2200
2201 if (hold) {
2202 LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2203 "try=" + i + " of " + this.maximumAttempts, t);
2204
2205 if (maxWaitTime < 0) {
2206 if (t instanceof RegionAlreadyInTransitionException) {
2207 maxWaitTime = EnvironmentEdgeManager.currentTime()
2208 + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2209 DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2210 } else {
2211 maxWaitTime = EnvironmentEdgeManager.currentTime()
2212 + this.server.getConfiguration().getLong(
2213 "hbase.regionserver.rpc.startup.waittime", 60000);
2214 }
2215 }
2216 try {
2217 needNewPlan = false;
2218 long now = EnvironmentEdgeManager.currentTime();
2219 if (now < maxWaitTime) {
2220 LOG.debug("Server is not yet up or region is already in transition; "
2221 + "waiting up to " + (maxWaitTime - now) + "ms", t);
2222 Thread.sleep(100);
2223 i--;
2224 } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2225 LOG.debug("Server is not up for a while; try a new one", t);
2226 needNewPlan = true;
2227 }
2228 } catch (InterruptedException ie) {
2229 LOG.warn("Failed to assign "
2230 + region.getRegionNameAsString() + " since interrupted", ie);
2231 regionStates.updateRegionState(region, State.FAILED_OPEN);
2232 Thread.currentThread().interrupt();
2233 return;
2234 }
2235 } else if (retry) {
2236 needNewPlan = false;
2237 i--;
2238 LOG.warn(assignMsg + ", trying to assign to the same region server due ", t);
2239 } else {
2240 needNewPlan = true;
2241 LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2242 " try=" + i + " of " + this.maximumAttempts, t);
2243 }
2244 }
2245
2246 if (i == this.maximumAttempts) {
2247
2248 if (region.isMetaRegion()) {
2249 i = 0;
2250 LOG.warn(assignMsg +
2251 ", trying to assign a hbase:meta region reached to maximumAttempts (" +
2252 this.maximumAttempts + "). Reset attempt counts and continue retrying.");
2253 waitForRetryingMetaAssignment();
2254 }
2255 else {
2256
2257
2258 continue;
2259 }
2260 }
2261
2262
2263
2264
2265 if (needNewPlan) {
2266
2267
2268
2269
2270 RegionPlan newPlan = null;
2271 try {
2272 newPlan = getRegionPlan(region, true);
2273 } catch (HBaseIOException e) {
2274 LOG.warn("Failed to get region plan", e);
2275 }
2276 if (newPlan == null) {
2277 regionStates.updateRegionState(region, State.FAILED_OPEN);
2278 LOG.warn("Unable to find a viable location to assign region " +
2279 region.getRegionNameAsString());
2280 return;
2281 }
2282
2283 if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2284
2285
2286
2287 LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2288 + newPlan.getDestination() + " server.");
2289 currentState = regionStates.updateRegionState(region, State.OFFLINE);
2290 versionOfOfflineNode = -1;
2291 if (useZKForAssignment) {
2292 setOfflineInZK = true;
2293 }
2294 plan = newPlan;
2295 } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2296 previousException instanceof FailedServerException) {
2297 try {
2298 LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2299 " to the same failed server.");
2300 Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2301 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2302 } catch (InterruptedException ie) {
2303 LOG.warn("Failed to assign "
2304 + region.getRegionNameAsString() + " since interrupted", ie);
2305 regionStates.updateRegionState(region, State.FAILED_OPEN);
2306 Thread.currentThread().interrupt();
2307 return;
2308 }
2309 }
2310 }
2311 }
2312
2313 regionStates.updateRegionState(region, State.FAILED_OPEN);
2314 } finally {
2315 metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2316 }
2317 }
2318
  /**
   * Reconcile master state with a region that a regionserver reports as
   * already opened on {@code sn}: clean up any stale assignment znode (when
   * ZK-based assignment is in use) and mark the region online in
   * {@link #regionStates}.
   * @param region the region reported as already opened
   * @param sn the server hosting the region
   */
  private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
    // Remove region from in-memory transition and unassigned node from ZK
    // While trying to enable the table the regions of the table were
    // already enabled.
    LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
      + " to " + sn);
    String encodedName = region.getEncodedName();

    //If use ZkForAssignment, region already Opened event should not be handled,
    //leave it to zk event. See HBase-14407.
    if (useZKForAssignment) {
      String node = ZKAssign.getNodeName(watcher, encodedName);
      Stat stat = new Stat();
      try {
        byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
        if (existingBytes != null) {
          RegionTransition rt= RegionTransition.parseFrom(existingBytes);
          EventType et = rt.getEventType();
          if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
            // The znode already reflects OPENED; leave it for the regular
            // OPENED-event handling and do nothing here.
            LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
              + " and node in " + et + " state");
            return;
          }
        }
      } catch (KeeperException ke) {
        LOG.warn("Unexpected ZK exception getData " + node
          + " node for the region " + encodedName, ke);
      } catch (DeserializationException e) {
        LOG.warn("Get RegionTransition from zk deserialization failed! ", e);
      }
      // Drop the master-created OFFLINE node, if one is still present.
      deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
    }

    regionStates.regionOnline(region, sn);
  }
2354
2355 private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2356 if (this.tableStateManager.isTableState(region.getTable(),
2357 ZooKeeperProtos.Table.State.DISABLED,
2358 ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2359 LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2360 + " skipping assign of " + region.getRegionNameAsString());
2361 offlineDisabledRegion(region);
2362 return true;
2363 }
2364 return false;
2365 }
2366
2367
2368
2369
2370
2371
2372
2373
  /**
   * Set the region's assignment znode to OFFLINE prior to assigning it.
   * The region must currently be CLOSED or OFFLINE; any other state aborts
   * the master, since transitioning it would corrupt assignment bookkeeping.
   * @param state current state of the region being assigned
   * @param destination the server the region is planned for (stored in znode)
   * @return the version of the OFFLINE znode, or -1 on any failure
   */
  private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
    if (!state.isClosed() && !state.isOffline()) {
      String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
      this.server.abort(msg, new IllegalStateException(msg));
      return -1;
    }
    regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
    int versionOfOfflineNode;
    try {
      // get the version after setting the znode to OFFLINE
      versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
        state.getRegion(), destination);
      if (versionOfOfflineNode == -1) {
        LOG.warn("Attempted to create/force node into OFFLINE state before "
          + "completing assignment but failed to do so for " + state);
        return -1;
      }
    } catch (KeeperException e) {
      // ZK trouble here is unrecoverable for assignment; abort the master.
      server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
      return -1;
    }
    return versionOfOfflineNode;
  }
2397
2398
2399
2400
2401
2402
2403
2404 public List<ServerName> getExcludedServersForSystemTable() {
2405 List<Pair<ServerName, String>> serverList = new ArrayList<>();
2406 for (ServerName s : serverManager.getOnlineServersList()) {
2407 serverList.add(new Pair<>(s, server.getRegionServerVersion(s)));
2408 }
2409 if (serverList.isEmpty()) {
2410 return new ArrayList<>();
2411 }
2412 String highestVersion = Collections.max(serverList, new Comparator<Pair<ServerName, String>>() {
2413 @Override
2414 public int compare(Pair<ServerName, String> o1, Pair<ServerName, String> o2) {
2415 return VersionInfo.compareVersion(o1.getSecond(), o2.getSecond());
2416 }
2417 }).getSecond();
2418 List<ServerName> res = new ArrayList<>();
2419 for (Pair<ServerName, String> pair : serverList) {
2420 if (!pair.getSecond().equals(highestVersion)) {
2421 res.add(pair.getFirst());
2422 }
2423 }
2424 return res;
2425 }
2426
2427
2428
2429
2430
2431
2432
  /**
   * Convenience overload of {@link #getRegionPlan(HRegionInfo, ServerName, boolean)}
   * with no server excluded.
   * @param region region to plan for
   * @param forceNewPlan ignore any cached plan and compute a fresh one
   * @return the plan, or null when no destination server is available
   * @throws HBaseIOException declared for callers; see three-arg variant
   */
  private RegionPlan getRegionPlan(final HRegionInfo region,
      final boolean forceNewPlan) throws HBaseIOException {
    return getRegionPlan(region, null, forceNewPlan);
  }
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
  /**
   * Get a plan naming the server this region should be assigned to.
   * Reuses a cached plan from {@link #regionPlans} unless {@code forceNewPlan}
   * is set or the cached destination is no longer a viable server; otherwise
   * asks the balancer for a random assignment and caches the result.
   * @param region region to plan for
   * @param serverToExclude a server to leave out of the candidates, or null
   * @param forceNewPlan ignore any cached plan
   * @return the plan to use, or null when no destination server is available
   */
  private RegionPlan getRegionPlan(final HRegionInfo region,
      final ServerName serverToExclude, final boolean forceNewPlan) {
    // Pickup existing plan or make a new one
    final String encodedName = region.getEncodedName();
    List<ServerName> exclude = new ArrayList<>();
    // System-table regions are kept off lower-versioned servers.
    if (region.isSystemTable()) {
      exclude.addAll(getExcludedServersForSystemTable());
    }
    if (serverToExclude !=null) {
      exclude.add(serverToExclude);
    }
    final List<ServerName> destServers =
      serverManager.createDestinationServersList(exclude);

    if (destServers.isEmpty()){
      LOG.warn("Can't move " + encodedName +
        ", there is no destination server available.");
      return null;
    }

    RegionPlan randomPlan = null;
    boolean newPlan = false;
    RegionPlan existingPlan;

    synchronized (this.regionPlans) {
      existingPlan = this.regionPlans.get(encodedName);

      if (existingPlan != null && existingPlan.getDestination() != null) {
        LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
          + " destination server is " + existingPlan.getDestination() +
            " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
      }

      // A new plan is needed when none is cached, the cached one has no (or a
      // no-longer-viable) destination, or the caller demands a fresh one.
      if (forceNewPlan
          || existingPlan == null
          || existingPlan.getDestination() == null
          || !destServers.contains(existingPlan.getDestination())) {
        newPlan = true;
      }
    }

    if (newPlan) {
      ServerName destination = null;
      try {
        destination = balancer.randomAssignment(region, destServers);
      } catch (HBaseIOException e) {
        LOG.warn(e);
      }
      if (destination == null) {
        LOG.warn("Can't find a destination for " + encodedName);
        return null;
      }
      synchronized (this.regionPlans) {
        randomPlan = new RegionPlan(region, null, destination);
        // Pre-compute favored nodes for non-meta regions when the balancer
        // supports them; failure to do so is non-fatal for the plan.
        if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
          List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
          regions.add(region);
          try {
            processFavoredNodes(regions);
          } catch (IOException ie) {
            LOG.warn("Ignoring exception in processFavoredNodes " + ie);
          }
        }
        this.regionPlans.put(encodedName, randomPlan);
      }
      LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
          + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
          + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
          + ") available servers, forceNewPlan=" + forceNewPlan);
      return randomPlan;
    }
    LOG.debug("Using pre-existing plan for " +
      region.getRegionNameAsString() + "; plan=" + existingPlan);
    return existingPlan;
  }
2522
2523
2524
2525
2526 private void waitForRetryingMetaAssignment() {
2527 try {
2528 Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2529 } catch (InterruptedException e) {
2530 LOG.error("Got exception while waiting for hbase:meta assignment");
2531 Thread.currentThread().interrupt();
2532 }
2533 }
2534
2535
2536
2537
2538
2539
2540
2541 public void checkIfShouldMoveSystemRegionAsync() {
2542 new Thread(new Runnable() {
2543 @Override
2544 public void run() {
2545 try {
2546 synchronized (checkIfShouldMoveSystemRegionLock) {
2547
2548 List<HRegionInfo> regionsShouldMove = new ArrayList<>();
2549 for (ServerName server : getExcludedServersForSystemTable()) {
2550 regionsShouldMove.addAll(getCarryingSystemTables(server));
2551 }
2552 if (!regionsShouldMove.isEmpty()) {
2553 List<RegionPlan> plans = new ArrayList<>();
2554 for (HRegionInfo regionInfo : regionsShouldMove) {
2555 RegionPlan plan = getRegionPlan(regionInfo, true);
2556 if (regionInfo.isMetaRegion()) {
2557
2558 balance(plan);
2559 } else {
2560 plans.add(plan);
2561 }
2562 }
2563 for (RegionPlan plan : plans) {
2564 balance(plan);
2565 }
2566 }
2567 }
2568 } catch (Throwable t) {
2569 LOG.error(t);
2570 }
2571 }
2572 }).start();
2573 }
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
  /**
   * Unassign the given region without forcing a re-send of the CLOSE RPC.
   * Delegates to {@link #unassign(HRegionInfo, boolean)} with force=false.
   * @param region region to unassign
   */
  public void unassign(HRegionInfo region) {
    unassign(region, false);
  }
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
  /**
   * Unassign (close) the given region, optionally sending the CLOSE RPC again
   * even when the region is already PENDING_CLOSE/CLOSING.
   * <p>
   * Takes the per-region lock, creates the ZK CLOSING node when ZK-based
   * assignment is in use, moves the state to PENDING_CLOSE, and delegates to
   * the internal unassign. On exit, if the region ended up offline and was not
   * deliberately left closed, it is reassigned.
   * @param region region to unassign
   * @param force resend CLOSE even if a close is already in flight
   * @param dest preferred destination for the follow-up assign, or null
   */
  public void unassign(HRegionInfo region, boolean force, ServerName dest) {
    // NOTE: there are multiple early returns below; 'reassign' tracks whether
    // the finally-block should re-drive assignment afterwards.
    LOG.debug("Starting unassign of " + region.getRegionNameAsString()
      + " (offlining), current state: " + regionStates.getRegionState(region));

    String encodedName = region.getEncodedName();
    // Version of the CLOSING znode we create (ZK assignment only).
    int versionOfClosingNode = -1;
    // Per-region lock: serializes state changes for this region against
    // concurrent assigns/unassigns.
    ReentrantLock lock = locker.acquireLock(encodedName);
    RegionState state = regionStates.getRegionTransitionState(encodedName);
    boolean reassign = true;
    try {
      if (state == null) {
        // Region is not in transition; look at its steady state.
        state = regionStates.getRegionState(encodedName);
        if (state != null && state.isUnassignable()) {
          // e.g. SPLIT/MERGED regions cannot be unassigned.
          LOG.info("Attempting to unassign " + state + ", ignored");
          return;
        }
        // Create the CLOSING znode before issuing the CLOSE RPC.
        try {
          if (state == null || state.getServerName() == null) {
            // We don't know where the region is; just offline it.
            LOG.warn("Attempting to unassign a region not in RegionStates "
              + region.getRegionNameAsString() + ", offlined");
            regionOffline(region);
            return;
          }
          if (useZKForAssignment) {
            versionOfClosingNode = ZKAssign.createNodeClosing(
              watcher, region, state.getServerName());
            if (versionOfClosingNode == -1) {
              LOG.info("Attempting to unassign " +
                region.getRegionNameAsString() + " but ZK closing node "
                + "can't be created.");
              reassign = false; // not unassigned at all, nothing to re-drive
              return;
            }
          }
        } catch (KeeperException e) {
          if (e instanceof NodeExistsException) {
            // A node already exists: likely a regionserver-initiated split or
            // merge beat our CLOSING node. If so, the region no longer exists
            // in its current form and the unassign should be dropped.
            NodeExistsException nee = (NodeExistsException)e;
            String path = nee.getPath();
            try {
              if (isSplitOrSplittingOrMergedOrMerging(path)) {
                LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
                  "skipping unassign because region no longer exists -- its split or merge");
                reassign = false; // no need to reassign a split/merged region
                return;
              }
            } catch (KeeperException.NoNodeException ke) {
              LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
                "; presuming split and that the region to unassign, " +
                encodedName + ", no longer exists -- confirm", ke);
              return;
            } catch (KeeperException ke) {
              LOG.error("Unexpected zk state", ke);
            } catch (DeserializationException de) {
              LOG.error("Failed parse", de);
            }
          }
          // Unexplained ZK failure: abort the master rather than guess.
          server.abort("Unexpected ZK exception creating node CLOSING", e);
          reassign = false;
          return;
        }
        state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
      } else if (state.isFailedOpen()) {
        // Region never opened; nothing to close, just offline it.
        regionOffline(region);
        return;
      } else if (force && state.isPendingCloseOrClosing()) {
        LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
          " which is already " + state.getState() +
          " but forcing to send a CLOSE RPC again ");
        if (state.isFailedClose()) {
          state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
        }
        state.updateTimestampToNow();
      } else {
        // Already in transition and not forcing: leave the in-flight op alone.
        LOG.debug("Attempting to unassign " +
          region.getRegionNameAsString() + " but it is " +
          "already in transition (" + state.getState() + ", force=" + force + ")");
        return;
      }

      unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
    } finally {
      lock.unlock();

      // If the region landed offline (and isn't a replica being closed for
      // good), re-drive an assignment so it doesn't stay down.
      if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
        assign(region, true);
      }
    }
  }
2713
  /**
   * Unassign the given region with no destination preference.
   * Delegates to {@link #unassign(HRegionInfo, boolean, ServerName)}.
   * @param region region to unassign
   * @param force resend CLOSE even if a close is already in flight
   */
  public void unassign(HRegionInfo region, boolean force){
     unassign(region, force, null);
  }
2717
2718
2719
2720
2721 public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2722 String encodedName = region.getEncodedName();
2723 deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2724 EventType.RS_ZK_REGION_CLOSED);
2725 }
2726
2727
2728
2729
2730
2731
2732
2733 private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2734 throws KeeperException, DeserializationException {
2735 boolean result = false;
2736
2737
2738 byte [] data = ZKAssign.getData(watcher, path);
2739 if (data == null) {
2740 LOG.info("Node " + path + " is gone");
2741 return false;
2742 }
2743 RegionTransition rt = RegionTransition.parseFrom(data);
2744 switch (rt.getEventType()) {
2745 case RS_ZK_REQUEST_REGION_SPLIT:
2746 case RS_ZK_REGION_SPLIT:
2747 case RS_ZK_REGION_SPLITTING:
2748 case RS_ZK_REQUEST_REGION_MERGE:
2749 case RS_ZK_REGION_MERGED:
2750 case RS_ZK_REGION_MERGING:
2751 result = true;
2752 break;
2753 default:
2754 LOG.info("Node " + path + " is in " + rt.getEventType());
2755 break;
2756 }
2757 return result;
2758 }
2759
2760
2761
2762
2763
2764
  /**
   * @return the current value of the opened-regions counter
   */
  public int getNumRegionsOpened() {
    return numRegionsOpened.get();
  }
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778 public boolean waitForAssignment(HRegionInfo regionInfo)
2779 throws InterruptedException {
2780 ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2781 regionSet.add(regionInfo);
2782 return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2783 }
2784
2785
2786
2787
2788 protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2789 final boolean waitTillAllAssigned, final int reassigningRegions,
2790 final long minEndTime) throws InterruptedException {
2791 long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2792 if (deadline < 0) {
2793 deadline = Long.MAX_VALUE;
2794 }
2795 return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2796 }
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806 protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2807 final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2808
2809 while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2810 int failedOpenCount = 0;
2811 Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2812 while (regionInfoIterator.hasNext()) {
2813 HRegionInfo hri = regionInfoIterator.next();
2814 if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2815 State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2816 regionInfoIterator.remove();
2817 } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2818 failedOpenCount++;
2819 }
2820 }
2821 if (!waitTillAllAssigned) {
2822
2823 break;
2824 }
2825 if (!regionSet.isEmpty()) {
2826 if (failedOpenCount == regionSet.size()) {
2827
2828 break;
2829 }
2830 regionStates.waitForUpdate(100);
2831 }
2832 }
2833 return regionSet.isEmpty();
2834 }
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
  /**
   * Assign the hbase:meta region (or a meta replica). Deletes the current meta
   * location first so clients don't read a stale one during the move.
   * @param hri the meta region (replica id selects which meta location)
   * @throws KeeperException on ZooKeeper errors deleting the location
   */
  public void assignMeta(HRegionInfo hri) throws KeeperException {
    this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
    assign(hri, true);
  }
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860 public void assign(Map<HRegionInfo, ServerName> regions)
2861 throws IOException, InterruptedException {
2862 if (regions == null || regions.isEmpty()) {
2863 return;
2864 }
2865 List<ServerName> servers = serverManager.createDestinationServersList();
2866 if (servers == null || servers.isEmpty()) {
2867 throw new IOException("Found no destination server to assign region(s)");
2868 }
2869
2870
2871 Map<ServerName, List<HRegionInfo>> bulkPlan =
2872 balancer.retainAssignment(regions, servers);
2873 if (bulkPlan == null) {
2874 throw new IOException("Unable to determine a plan to assign region(s)");
2875 }
2876
2877 processBogusAssignments(bulkPlan);
2878
2879 assign(regions.size(), servers.size(),
2880 "retainAssignment=true", bulkPlan);
2881 }
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891 public void assign(List<HRegionInfo> regions)
2892 throws IOException, InterruptedException {
2893 if (regions == null || regions.isEmpty()) {
2894 return;
2895 }
2896
2897 List<ServerName> servers = serverManager.createDestinationServersList();
2898 if (servers == null || servers.isEmpty()) {
2899 throw new IOException("Found no destination server to assign region(s)");
2900 }
2901
2902
2903 Map<ServerName, List<HRegionInfo>> bulkPlan
2904 = balancer.roundRobinAssignment(regions, servers);
2905 if (bulkPlan == null) {
2906 throw new IOException("Unable to determine a plan to assign region(s)");
2907 }
2908
2909 processBogusAssignments(bulkPlan);
2910
2911 processFavoredNodes(regions);
2912 assign(regions.size(), servers.size(),
2913 "round-robin=true", bulkPlan);
2914 }
2915
  /**
   * Execute a bulk assignment plan. Small plans (below the configured region
   * and server thresholds, or a single server) are driven inline with a wait;
   * larger plans go through {@link GeneralBulkAssigner}.
   * @param regions total number of regions in the plan
   * @param totalServers number of candidate servers (logging only)
   * @param message how the plan was computed (logging only)
   * @param bulkPlan server -> regions placement to execute
   */
  private void assign(int regions, int totalServers,
      String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
          throws InterruptedException, IOException {

    int servers = bulkPlan.size();
    if (servers == 1 || (regions < bulkAssignThresholdRegions
        && servers < bulkAssignThresholdServers)) {

      // Small plan: assign inline rather than spinning up the bulk machinery.
      if (LOG.isTraceEnabled()) {
        LOG.trace("Not using bulk assignment since we are assigning only " + regions +
          " region(s) to " + servers + " server(s)");
      }

      // Regions from user tables that had to be re-driven individually; we
      // wait on these below.
      ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
      for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
        if (!assign(plan.getKey(), plan.getValue())) {
          // The per-server assign failed; retry each still-offline region
          // asynchronously via invokeAssign.
          for (HRegionInfo region: plan.getValue()) {
            if (!regionStates.isRegionOnline(region)) {
              invokeAssign(region);
              if (!region.getTable().isSystemTable()) {
                userRegionSet.add(region);
              }
            }
          }
        }
      }

      // Wait for the re-driven user regions to come online (best effort).
      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
            System.currentTimeMillis())) {
        LOG.debug("some user regions are still in transition: " + userRegionSet);
      }
    } else {
      LOG.info("Bulk assigning " + regions + " region(s) across "
        + totalServers + " server(s), " + message);

      // Large plan: hand off to the bulk assigner's thread pool.
      BulkAssigner ba = new GeneralBulkAssigner(
        this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
      ba.bulkAssign();
      LOG.info("Bulk assigning done");
    }
  }
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972 private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2973 throws IOException, InterruptedException {
2974 if (allRegions == null || allRegions.isEmpty()) return;
2975
2976
2977 boolean retainAssignment = server.getConfiguration().
2978 getBoolean("hbase.master.startup.retainassign", true);
2979
2980 Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2981 if (retainAssignment) {
2982 assign(allRegions);
2983 } else {
2984 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2985 assign(regions);
2986 }
2987
2988 for (HRegionInfo hri : regionsFromMetaScan) {
2989 TableName tableName = hri.getTable();
2990 if (!tableStateManager.isTableState(tableName,
2991 ZooKeeperProtos.Table.State.ENABLED)) {
2992 setEnabledTable(tableName);
2993 }
2994 }
2995
2996 assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
2997 }
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010 public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
3011 Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
3012 List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
3013 for (HRegionInfo hri : regionsRecordedInMeta) {
3014 TableName table = hri.getTable();
3015 HTableDescriptor htd = master.getTableDescriptors().get(table);
3016
3017 int desiredRegionReplication = htd.getRegionReplication();
3018 for (int i = 0; i < desiredRegionReplication; i++) {
3019 HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
3020 if (regionsRecordedInMeta.contains(replica)) continue;
3021 regionsNotRecordedInMeta.add(replica);
3022 }
3023 }
3024 return regionsNotRecordedInMeta;
3025 }
3026
3027
3028
3029
3030
3031
3032
3033 boolean waitUntilNoRegionsInTransition(final long timeout)
3034 throws InterruptedException {
3035
3036
3037
3038
3039
3040
3041 final long endTime = System.currentTimeMillis() + timeout;
3042
3043 while (!this.server.isStopped() && regionStates.isRegionsInTransition()
3044 && endTime > System.currentTimeMillis()) {
3045 regionStates.waitForUpdate(100);
3046 }
3047
3048 return !regionStates.isRegionsInTransition();
3049 }
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059 Set<ServerName> rebuildUserRegions() throws
3060 IOException, KeeperException, CoordinatedStateException {
3061 Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
3062 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
3063
3064 Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
3065 ZooKeeperProtos.Table.State.DISABLED,
3066 ZooKeeperProtos.Table.State.DISABLING,
3067 ZooKeeperProtos.Table.State.ENABLING);
3068
3069
3070 List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
3071
3072 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
3073
3074 Set<ServerName> offlineServers = new HashSet<ServerName>();
3075
3076 for (Result result : results) {
3077 if (result == null && LOG.isDebugEnabled()){
3078 LOG.debug("null result from meta - ignoring but this is strange.");
3079 continue;
3080 }
3081
3082
3083
3084 PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
3085 if (p.getFirst() != null && p.getSecond() != null) {
3086 int numReplicas = server.getTableDescriptors().get(p.getFirst().
3087 getTable()).getRegionReplication();
3088 for (HRegionInfo merge : p) {
3089 for (int i = 1; i < numReplicas; i++) {
3090 replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
3091 }
3092 }
3093 }
3094 RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
3095 if (rl == null) continue;
3096 HRegionLocation[] locations = rl.getRegionLocations();
3097 if (locations == null) continue;
3098 for (HRegionLocation hrl : locations) {
3099 if (hrl == null) continue;
3100 HRegionInfo regionInfo = hrl.getRegionInfo();
3101 if (regionInfo == null) continue;
3102 int replicaId = regionInfo.getReplicaId();
3103 State state = RegionStateStore.getRegionState(result, replicaId);
3104
3105
3106
3107 if (replicaId == 0 && state.equals(State.SPLIT)) {
3108 for (HRegionLocation h : locations) {
3109 replicasToClose.add(h.getRegionInfo());
3110 }
3111 }
3112 ServerName lastHost = hrl.getServerName();
3113 ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3114 if (tableStateManager.isTableState(regionInfo.getTable(),
3115 ZooKeeperProtos.Table.State.DISABLED)) {
3116
3117
3118 lastHost = null;
3119 regionLocation = null;
3120 }
3121 regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3122 if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3123
3124 continue;
3125 }
3126 TableName tableName = regionInfo.getTable();
3127 if (!onlineServers.contains(regionLocation)) {
3128
3129 offlineServers.add(regionLocation);
3130 if (useZKForAssignment) {
3131 regionStates.regionOffline(regionInfo);
3132 }
3133 } else if (!disabledOrEnablingTables.contains(tableName)) {
3134
3135
3136 regionStates.regionOnline(regionInfo, regionLocation);
3137 balancer.regionOnline(regionInfo, regionLocation);
3138 } else if (useZKForAssignment) {
3139 regionStates.regionOffline(regionInfo);
3140 }
3141
3142
3143 if (!disabledOrDisablingOrEnabling.contains(tableName)
3144 && !getTableStateManager().isTableState(tableName,
3145 ZooKeeperProtos.Table.State.ENABLED)) {
3146 setEnabledTable(tableName);
3147 }
3148 }
3149 }
3150 return offlineServers;
3151 }
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161 private void recoverTableInDisablingState()
3162 throws KeeperException, IOException, CoordinatedStateException {
3163 Set<TableName> disablingTables =
3164 tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3165 if (disablingTables.size() != 0) {
3166 for (TableName tableName : disablingTables) {
3167
3168 LOG.info("The table " + tableName
3169 + " is in DISABLING state. Hence recovering by moving the table"
3170 + " to DISABLED state.");
3171 new DisableTableHandler(this.server, tableName,
3172 this, tableLockManager, true).prepare().process();
3173 }
3174 }
3175 }
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185 private void recoverTableInEnablingState()
3186 throws KeeperException, IOException, CoordinatedStateException {
3187 Set<TableName> enablingTables = tableStateManager.
3188 getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3189 if (enablingTables.size() != 0) {
3190 for (TableName tableName : enablingTables) {
3191
3192 LOG.info("The table " + tableName
3193 + " is in ENABLING state. Hence recovering by moving the table"
3194 + " to ENABLED state.");
3195
3196
3197 EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3198 this, tableLockManager, true);
3199 try {
3200 eth.prepare();
3201 } catch (TableNotFoundException e) {
3202 LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3203 continue;
3204 }
3205 eth.process();
3206 }
3207 }
3208 }
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225 private void processDeadServersAndRecoverLostRegions(
3226 Set<ServerName> deadServers) throws IOException, KeeperException {
3227 if (deadServers != null && !deadServers.isEmpty()) {
3228 for (ServerName serverName: deadServers) {
3229 if (!serverManager.isServerDead(serverName)) {
3230 serverManager.expireServer(serverName);
3231 }
3232 }
3233 }
3234
3235 List<String> nodes = useZKForAssignment ?
3236 ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3237 : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3238 if (nodes != null && !nodes.isEmpty()) {
3239 for (String encodedRegionName : nodes) {
3240 processRegionInTransition(encodedRegionName, null);
3241 }
3242 } else if (!useZKForAssignment) {
3243 processRegionInTransitionZkLess();
3244 }
3245 }
3246
  /**
   * ZK-less assignment recovery: walk the regions currently in transition and
   * re-drive each according to its state — reassign CLOSED regions, re-send
   * pending OPEN/CLOSE RPCs, and unassign FAILED_OPEN/FAILED_CLOSE regions.
   * Regions hosted on offline servers are left for server-shutdown handling.
   */
  void processRegionInTransitionZkLess() {
    // PENDING_OPEN/PENDING_CLOSE states are recorded before the RPC is sent,
    // so after a master restart we cannot tell whether the RPC ever went out;
    // re-send it to be safe.
    Map<String, RegionState> rits = regionStates.getRegionsInTransition();
    for (RegionState regionState : rits.values()) {
      LOG.info("Processing " + regionState);
      ServerName serverName = regionState.getServerName();
      // serverName can be null (e.g. FAILED_OPEN with no plan); in that case
      // fall through to the switch below.
      if (serverName != null
          && !serverManager.getOnlineServers().containsKey(serverName)) {
        LOG.info("Server " + serverName + " isn't online. SSH will handle this");
        continue;
      }
      HRegionInfo regionInfo = regionState.getRegion();
      State state = regionState.getState();

      switch (state) {
      case CLOSED:
        invokeAssign(regionInfo);
        break;
      case PENDING_OPEN:
        retrySendRegionOpen(regionState);
        break;
      case PENDING_CLOSE:
        retrySendRegionClose(regionState);
        break;
      case FAILED_CLOSE:
      case FAILED_OPEN:
        invokeUnAssign(regionInfo);
        break;
      default:
        // Nothing to do for the remaining states.
      }
    }
  }
3286
3287
3288
3289
3290
  /**
   * Re-send the OPEN RPC for a region stuck in PENDING_OPEN (the RPC may never
   * have gone out before a master failover). Runs asynchronously on the
   * executor under the per-region lock; socket timeouts and failed-server
   * errors are retried up to maximumAttempts, other failures trigger a fresh
   * assignment.
   * @param regionState the PENDING_OPEN state captured for the region
   */
  private void retrySendRegionOpen(final RegionState regionState) {
    this.executorService.submit(
      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
        @Override
        public void process() throws IOException {
          HRegionInfo hri = regionState.getRegion();
          ServerName serverName = regionState.getServerName();
          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
          try {
            for (int i = 1; i <= maximumAttempts; i++) {
              if (!serverManager.isServerOnline(serverName)
                  || server.isStopped() || server.isAborted()) {
                return; // target gone or master shutting down: nothing to do
              }
              try {
                if (!regionState.equals(regionStates.getRegionState(hri))) {
                  return; // region state moved on; our retry is stale
                }
                List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
                if (shouldAssignRegionsWithFavoredNodes) {
                  favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
                }
                RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
                  serverName, hri, -1, favoredNodes);

                if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
                  // Server refused to open the region: re-assign with a new plan.
                  LOG.debug("Got failed_opening in retry sendRegionOpen for "
                    + regionState + ", re-assign it");
                  invokeAssign(hri, true);
                }
                return; // done (either accepted or re-driven)
              } catch (Throwable t) {
                if (t instanceof RemoteException) {
                  t = ((RemoteException) t).unwrapRemoteException();
                }
                // Transient connectivity trouble: back off and retry same server.
                if (t instanceof java.net.SocketTimeoutException
                    || t instanceof FailedServerException) {
                  Threads.sleep(100);
                  continue;
                }
                // Anything else: give up on this server and re-assign.
                LOG.debug("Got exception in retry sendRegionOpen for "
                  + regionState + ", re-assign it", t);
                invokeAssign(hri);
                return;
              }
            }
          } finally {
            lock.unlock();
          }
        }
      });
  }
3347
3348
3349
3350
3351
  /**
   * Re-send the CLOSE RPC for a region stuck in PENDING_CLOSE (the RPC may
   * never have gone out before a master failover). Runs asynchronously on the
   * executor under the per-region lock; socket timeouts and failed-server
   * errors are retried up to maximumAttempts, other failures (except
   * NotServingRegion/RegionAlreadyInTransition) trigger a fresh unassign.
   * @param regionState the PENDING_CLOSE state captured for the region
   */
  private void retrySendRegionClose(final RegionState regionState) {
    this.executorService.submit(
      new EventHandler(server, EventType.M_MASTER_RECOVERY) {
        @Override
        public void process() throws IOException {
          HRegionInfo hri = regionState.getRegion();
          ServerName serverName = regionState.getServerName();
          ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
          try {
            for (int i = 1; i <= maximumAttempts; i++) {
              if (!serverManager.isServerOnline(serverName)
                  || server.isStopped() || server.isAborted()) {
                return; // target gone or master shutting down: nothing to do
              }
              try {
                if (!regionState.equals(regionStates.getRegionState(hri))) {
                  return; // region state moved on; our retry is stale
                }
                if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
                  // Server rejected the close: re-drive the unassign.
                  LOG.debug("Got false in retry sendRegionClose for "
                    + regionState + ", re-close it");
                  invokeUnAssign(hri);
                }
                return; // done (either accepted or re-driven)
              } catch (Throwable t) {
                if (t instanceof RemoteException) {
                  t = ((RemoteException) t).unwrapRemoteException();
                }
                // Transient connectivity trouble: back off and retry same server.
                if (t instanceof java.net.SocketTimeoutException
                    || t instanceof FailedServerException) {
                  Threads.sleep(100);
                  continue;
                }
                if (!(t instanceof NotServingRegionException
                    || t instanceof RegionAlreadyInTransitionException)) {
                  // NotServingRegion / RegionAlreadyInTransition mean the close
                  // is already effectively handled; anything else, re-close.
                  LOG.debug("Got exception in retry sendRegionClose for "
                    + regionState + ", re-close it", t);
                  invokeUnAssign(hri);
                }
                return;
              }
            }
          } finally {
            lock.unlock();
          }
        }
      });
  }
3405
3406
3407
3408
3409
3410
3411
3412
3413 public void updateRegionsInTransitionMetrics() {
3414 long currentTime = System.currentTimeMillis();
3415 int totalRITs = 0;
3416 int totalRITsOverThreshold = 0;
3417 long oldestRITTime = 0;
3418 int ritThreshold = this.server.getConfiguration().
3419 getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3420 for (RegionState state: regionStates.getRegionsInTransition().values()) {
3421 totalRITs++;
3422 long ritTime = currentTime - state.getStamp();
3423 if (ritTime > ritThreshold) {
3424 totalRITsOverThreshold++;
3425 }
3426 if (oldestRITTime < ritTime) {
3427 oldestRITTime = ritTime;
3428 }
3429 }
3430 if (this.metricsAssignmentManager != null) {
3431 this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3432 this.metricsAssignmentManager.updateRITCount(totalRITs);
3433 this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3434 }
3435 }
3436
3437
3438
3439
3440 void clearRegionPlan(final HRegionInfo region) {
3441 synchronized (this.regionPlans) {
3442 this.regionPlans.remove(region.getEncodedName());
3443 }
3444 }
3445
3446
3447
3448
3449
3450
3451 public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3452 throws IOException, InterruptedException {
3453 waitOnRegionToClearRegionsInTransition(hri, -1L);
3454 }
3455
3456
3457
3458
3459
3460
3461
3462
3463 public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3464 throws InterruptedException {
3465 if (!regionStates.isRegionInTransition(hri)) return true;
3466 long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3467 + timeOut;
3468
3469
3470 LOG.info("Waiting for " + hri.getEncodedName() +
3471 " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3472 while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3473 regionStates.waitForUpdate(100);
3474 if (EnvironmentEdgeManager.currentTime() > end) {
3475 LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3476 return false;
3477 }
3478 }
3479 if (this.server.isStopped()) {
3480 LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3481 return false;
3482 }
3483 return true;
3484 }
3485
  /** Asynchronously assign the region, forcing a new region plan. */
  void invokeAssign(HRegionInfo regionInfo) {
    invokeAssign(regionInfo, true);
  }
3489
  /**
   * Submit an asynchronous assign of the region to the thread pool.
   * @param newPlan whether a new region plan should be computed
   */
  void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
    threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
  }
3493
  /** Submit an asynchronous unassign of the region to the thread pool. */
  void invokeUnAssign(HRegionInfo regionInfo) {
    threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
  }
3497
  /** @return whether the given server is hosting the default hbase:meta region. */
  public ServerHostRegion isCarryingMeta(ServerName serverName) {
    return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
  }
3501
  /** @return whether the given server is hosting the meta replica with the given id. */
  public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
    return isCarryingRegion(serverName,
        RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
  }
3506
  /** @return whether the given server is hosting the given meta replica region. */
  public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
    return isCarryingRegion(serverName, metaHri);
  }
3510
3511 private List<HRegionInfo> getCarryingSystemTables(ServerName serverName) {
3512 Set<HRegionInfo> regions = this.getRegionStates().getServerRegions(serverName);
3513 if (regions == null) {
3514 return new ArrayList<>();
3515 }
3516 List<HRegionInfo> list = new ArrayList<>();
3517 for (HRegionInfo region : regions) {
3518 if (region.isSystemTable()) {
3519 list.add(region);
3520 }
3521 }
3522 return list;
3523 }
3524
3525
3526
3527
3528
3529
3530
3531
3532
3533
3534
  /**
   * Determine whether the given server hosts the given region, consulting in
   * order: the region's unassigned ZK node, the assignment manager's
   * in-memory state, and — for the default meta replica only — the meta
   * location recorded in ZK.
   * @return HOSTING_REGION / NOT_HOSTING_REGION when some source knows the
   *   region's server, UNKNOWN when none of them does
   */
  private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
    RegionTransition rt = null;
    try {
      byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
      // The node can legitimately be absent; a null read is not an error.
      rt = data == null? null: RegionTransition.parseFrom(data);
    } catch (KeeperException e) {
      server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
    } catch (DeserializationException e) {
      server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
    }

    ServerName addressFromZK = rt != null? rt.getServerName(): null;
    if (addressFromZK != null) {
      // The region is in transition in ZK; trust that server name first.
      boolean matchZK = addressFromZK.equals(serverName);
      LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
        " current=" + serverName + ", matches=" + matchZK);
      return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
    }

    // Fall back to the assignment manager's in-memory bookkeeping.
    ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
    if (addressFromAM != null) {
      boolean matchAM = addressFromAM.equals(serverName);
      LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
        " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
        " server being checked: " + serverName);
      return matchAM ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
    }

    if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
      // Last resort for the default meta replica: ask the meta locator.
      final ServerName serverNameInZK =
          server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
      LOG.debug("Based on MetaTableLocator, the META region is on server="
        + (serverNameInZK == null ? "null" : serverNameInZK)
        + " server being checked: " + serverName);
      if (serverNameInZK != null) {
        return serverNameInZK.equals(serverName) ?
            ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
      }
    }

    // No source could locate the region.
    return ServerHostRegion.UNKNOWN;
  }
3581
3582
3583
3584
3585
3586
  /**
   * Process the shutdown of the given server: drop region plans targeting it,
   * take its regions offline in the region states, and return the regions
   * that still need to be re-assigned by the shutdown handler.
   * @return regions from the dead server that require assignment
   */
  public List<HRegionInfo> processServerShutdown(final ServerName sn) {
    // Clean out any region plans whose destination was this server.
    synchronized (this.regionPlans) {
      for (Iterator <Map.Entry<String, RegionPlan>> i =
          this.regionPlans.entrySet().iterator(); i.hasNext();) {
        Map.Entry<String, RegionPlan> e = i.next();
        ServerName otherSn = e.getValue().getDestination();
        // Destination can be null — presumably a random-assign plan; verify.
        if (otherSn != null && otherSn.equals(sn)) {
          // Remove via the iterator to avoid ConcurrentModificationException.
          i.remove();
        }
      }
    }
    List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
    for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
      HRegionInfo hri = it.next();
      String encodedName = hri.getEncodedName();

      // Lock the region: we may update its state below.
      Lock lock = locker.acquireLock(encodedName);
      try {
        RegionState regionState =
          regionStates.getRegionTransitionState(encodedName);
        if (regionState == null
            || (regionState.getServerName() != null && !regionState.isOnServer(sn))
            || !(regionState.isFailedClose() || regionState.isOffline()
              || regionState.isPendingOpenOrOpening())) {
          // Not in a state this handler should touch; drop it from the result.
          LOG.info("Skip " + regionState + " since it is not opening/failed_close"
            + " on the dead server any more: " + sn);
          it.remove();
        } else {
          try {
            // Delete the region's transition znode, if it exists.
            ZKAssign.deleteNodeFailSilent(watcher, hri);
          } catch (KeeperException ke) {
            server.abort("Unexpected ZK exception deleting node " + hri, ke);
          }
          if (tableStateManager.isTableState(hri.getTable(),
              ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
            // Table is disabled/disabling: take the region offline for good,
            // don't hand it back for re-assignment.
            regionStates.regionOffline(hri);
            it.remove();
            continue;
          }
          // Mark OFFLINE so the shutdown handler can assign it again.
          regionStates.updateRegionState(hri, State.OFFLINE);
        }
      } finally {
        lock.unlock();
      }
    }
    return regions;
  }
3640
3641
3642
3643
  /**
   * Carry out a balancer-produced region plan: record the plan and unassign
   * the region so it re-opens on the plan's destination. Plans for
   * disabled/disabling tables, or for regions that are not online, are
   * ignored.
   */
  public void balance(final RegionPlan plan) {
    HRegionInfo hri = plan.getRegionInfo();
    TableName tableName = hri.getTable();
    if (tableStateManager.isTableState(tableName,
      ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
      LOG.info("Ignored moving region of disabling/disabled table "
        + tableName);
      return;
    }

    // Move the region only if it is currently assigned; hold the per-region
    // lock so the check and the unassign are atomic with other operations.
    String encodedName = hri.getEncodedName();
    ReentrantLock lock = locker.acquireLock(encodedName);
    try {
      if (!regionStates.isRegionOnline(hri)) {
        RegionState state = regionStates.getRegionState(encodedName);
        LOG.info("Ignored moving region not assigned: " + hri + ", "
          + (state == null ? "not in region states" : state));
        return;
      }
      // Record the plan so the subsequent assign targets the destination.
      synchronized (this.regionPlans) {
        this.regionPlans.put(plan.getRegionName(), plan);
      }
      unassign(hri, false, plan.getDestination());
    } finally {
      lock.unlock();
    }
  }
3673
  /** Stop this assignment manager; alias for {@link #shutdown()}. */
  public void stop() {
    shutdown();
  }
3677
3678
3679
3680
  /**
   * Shut down this assignment manager: discard queued ZK event work, stop the
   * worker pools immediately, and stop the region state store.
   */
  public void shutdown() {
    // Drop any ZK event work that is still queued but not yet started.
    synchronized (zkEventWorkerWaitingList){
      zkEventWorkerWaitingList.clear();
    }

    // shutdownNow: we are stopping; in-flight tasks need not complete.
    threadPoolExecutorService.shutdownNow();
    zkEventWorkers.shutdownNow();
    regionStateStore.stop();
  }
3692
3693 protected void setEnabledTable(TableName tableName) {
3694 try {
3695 this.tableStateManager.setTableState(tableName,
3696 ZooKeeperProtos.Table.State.ENABLED);
3697 } catch (CoordinatedStateException e) {
3698
3699 String errorMsg = "Unable to ensure that the table " + tableName
3700 + " will be" + " enabled because of a ZooKeeper issue";
3701 LOG.error(errorMsg);
3702 this.server.abort(errorMsg, e);
3703 }
3704 }
3705
3706
3707
3708
3709
3710
3711
  /**
   * Asynchronously create/set the region's ZK node to OFFLINE. The region
   * must currently be CLOSED or OFFLINE; any other state aborts the master.
   * @param cb callback invoked when the async ZK create completes
   * @param destination intended destination server, recorded in the node
   * @return true if the async create was issued, false otherwise
   */
  private boolean asyncSetOfflineInZooKeeper(final RegionState state,
      final AsyncCallback.StringCallback cb, final ServerName destination) {
    if (!state.isClosed() && !state.isOffline()) {
      this.server.abort("Unexpected state trying to OFFLINE; " + state,
        new IllegalStateException());
      return false;
    }
    // Record OFFLINE in memory before touching ZK.
    regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
    try {
      ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
        destination, cb, state);
    } catch (KeeperException e) {
      if (e instanceof NodeExistsException) {
        // Someone else already created the node; treated as a failed issue.
        LOG.warn("Node for " + state.getRegion() + " already exists");
      } else {
        server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
      }
      return false;
    }
    return true;
  }
3733
3734 private boolean deleteNodeInStates(String encodedName,
3735 String desc, ServerName sn, EventType... types) {
3736 try {
3737 for (EventType et: types) {
3738 if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3739 return true;
3740 }
3741 }
3742 LOG.info("Failed to delete the " + desc + " node for "
3743 + encodedName + ". The node type may not match");
3744 } catch (NoNodeException e) {
3745 if (LOG.isDebugEnabled()) {
3746 LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3747 }
3748 } catch (KeeperException ke) {
3749 server.abort("Unexpected ZK exception deleting " + desc
3750 + " node for the region " + encodedName, ke);
3751 }
3752 return false;
3753 }
3754
  /** Delete the region's merging-related ZK node, whichever merge state it is in. */
  private void deleteMergingNode(String encodedName, ServerName sn) {
    deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
      EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
  }
3759
  /** Delete the region's splitting-related ZK node, whichever split state it is in. */
  private void deleteSplittingNode(String encodedName, ServerName sn) {
    deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
      EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
  }
3764
  /**
   * Handle a FAILED_OPEN report for a region on the given server: count the
   * failure and either give up (FAILED_OPEN) once maximumAttempts is reached,
   * or mark the region CLOSED and queue another assign. hbase:meta is never
   * given up on — it is retried forever (with a warning).
   */
  private void onRegionFailedOpen(
      final HRegionInfo hri, final ServerName sn) {
    String encodedName = hri.getEncodedName();
    AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
    if (failedOpenCount == null) {
      failedOpenCount = new AtomicInteger();
      // NOTE(review): plain get-then-put looks race-prone; presumably callers
      // hold the per-region lock making this safe — confirm before relying on it.
      failedOpenTracker.put(encodedName, failedOpenCount);
    }
    if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
      regionStates.updateRegionState(hri, State.FAILED_OPEN);
      // Remove the tracking entry to free memory and reset the count for the
      // next time someone initiates an open.
      failedOpenTracker.remove(encodedName);
    } else {
      if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
        // Meta must be assigned; warn but keep retrying past the limit.
        LOG.warn("Failed to open the hbase:meta region " +
            hri.getRegionNameAsString() + " after" +
            failedOpenCount.get() + " retries. Continue retrying.");
      }

      // Treat the failed open as if the region had opened and then closed.
      RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
      if (regionState != null) {
        // Disabled/disabling table (or replica queued for close): keep it off.
        if (getTableStateManager().isTableState(hri.getTable(),
            ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
            replicasToClose.contains(hri)) {
          offlineDisabledRegion(hri);
          return;
        }
        // NOTE(review): the state was already set to CLOSED just above; this
        // second update looks redundant.
        regionStates.updateRegionState(hri, RegionState.State.CLOSED);
        // Related to online enable/disable of a table.
        removeClosedRegion(hri);
        // Force a new plan so the retry picks a (possibly) different server.
        getRegionPlan(hri, sn, true);
        invokeAssign(hri, false);
      }
    }
  }
3810
3811 private void onRegionOpen(
3812 final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3813 regionOnline(hri, sn, openSeqNum);
3814 if (useZKForAssignment) {
3815 try {
3816
3817 ZKAssign.deleteNodeFailSilent(watcher, hri);
3818 } catch (KeeperException ke) {
3819 server.abort("Unexpected ZK exception deleting node " + hri, ke);
3820 }
3821 }
3822
3823
3824 failedOpenTracker.remove(hri.getEncodedName());
3825 if (getTableStateManager().isTableState(hri.getTable(),
3826 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3827 invokeUnAssign(hri);
3828 }
3829 }
3830
3831 private void onRegionClosed(final HRegionInfo hri) {
3832 if (getTableStateManager().isTableState(hri.getTable(),
3833 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3834 replicasToClose.contains(hri)) {
3835 offlineDisabledRegion(hri);
3836 return;
3837 }
3838 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3839 sendRegionClosedNotification(hri);
3840
3841 removeClosedRegion(hri);
3842 invokeAssign(hri, false);
3843 }
3844
  /**
   * Handle a split transition report from a region server: parent {@code p}
   * splits into daughters {@code a} and {@code b}.
   * @return null on success, otherwise a description of why the report was
   *   rejected
   */
  private String onRegionSplit(ServerName sn, TransitionCode code,
      final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
    final RegionState rs_p = regionStates.getRegionState(p);
    RegionState rs_a = regionStates.getRegionState(a);
    RegionState rs_b = regionStates.getRegionState(b);
    // Parent must be open/splitting on the reporting server; daughters, if
    // already tracked, must be splitting-new on the same server.
    if (!(rs_p.isOpenOrSplittingOnServer(sn)
        && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
        && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
      return "Not in state good for split";
    }
    regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
    regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
    regionStates.updateRegionState(p, State.SPLITTING);

    if (code == TransitionCode.SPLIT) {
      if (TEST_SKIP_SPLIT_HANDLING) {
        // Test hook: deliberately drop the split message.
        return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
      }
      regionOffline(p, State.SPLIT);
      regionOnline(a, sn, 1);
      regionOnline(b, sn, 1);

      // The table may have been disabled while the split was in flight; if
      // so, close the daughters right away.
      if (getTableStateManager().isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        invokeUnAssign(a);
        invokeUnAssign(b);
      } else {
        // Handle the daughters' replicas off the reporting thread.
        Callable<Object> splitReplicasCallable = new Callable<Object>() {
          @Override
          public Object call() {
            doSplittingOfReplicas(p, a, b);
            return null;
          }
        };
        threadPoolExecutorService.submit(splitReplicasCallable);
      }
    } else if (code == TransitionCode.SPLIT_PONR) {
      try {
        // Point of no return: persist the split in meta.
        regionStates.splitRegion(p, a, b, sn);
      } catch (IOException ioe) {
        LOG.info("Failed to record split region " + p.getShortNameToLog());
        return "Failed to record the splitting in meta";
      }
    } else if (code == TransitionCode.SPLIT_REVERTED) {
      // The split was rolled back: the parent is open again on sn.
      regionOnline(p, sn);

      // Drop any transient state the daughters may have acquired.
      RegionState regionStateA = regionStates.getRegionState(a);
      RegionState regionStateB = regionStates.getRegionState(b);
      if (regionStateA != null) {
        regionOffline(a);
      }
      if (regionStateB != null) {
        regionOffline(b);
      }

      if (getTableStateManager().isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        invokeUnAssign(p);
      }
    }
    return null;
  }
3911
  /**
   * Handle a merge transition report from a region server: regions {@code a}
   * and {@code b} merge into {@code p}.
   * @return null on success, otherwise a description of why the report was
   *   rejected
   */
  private String onRegionMerge(ServerName sn, TransitionCode code,
      final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
    RegionState rs_p = regionStates.getRegionState(p);
    RegionState rs_a = regionStates.getRegionState(a);
    RegionState rs_b = regionStates.getRegionState(b);
    // Both source regions must be open/merging on the reporting server; the
    // merged region, if already tracked, must be open/merging-new on it.
    if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
        && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
      return "Not in state good for merge";
    }
    regionStates.updateRegionState(a, State.MERGING);
    regionStates.updateRegionState(b, State.MERGING);
    regionStates.updateRegionState(p, State.MERGING_NEW, sn);

    String encodedName = p.getEncodedName();
    if (code == TransitionCode.READY_TO_MERGE) {
      // Remember the pair so later MERGED / revert reports can be validated.
      mergingRegions.put(encodedName,
        new PairOfSameType<HRegionInfo>(a, b));
    } else if (code == TransitionCode.MERGED) {
      mergingRegions.remove(encodedName);
      regionOffline(a, State.MERGED);
      regionOffline(b, State.MERGED);
      regionOnline(p, sn, 1);

      // The table may have been disabled while the merge was in flight; if
      // so, close the merged region right away.
      if (getTableStateManager().isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        invokeUnAssign(p);
      } else {
        // Handle the merged region's replicas off the reporting thread.
        Callable<Object> mergeReplicasCallable = new Callable<Object>() {
          @Override
          public Object call() {
            doMergingOfReplicas(p, a, b);
            return null;
          }
        };
        threadPoolExecutorService.submit(mergeReplicasCallable);
      }
    } else if (code == TransitionCode.MERGE_PONR) {
      try {
        // Point of no return: persist the merge in meta.
        regionStates.mergeRegions(p, a, b, sn);
      } catch (IOException ioe) {
        LOG.info("Failed to record merged region " + p.getShortNameToLog());
        return "Failed to record the merging in meta";
      }
    }
    return null;
  }
3959
3960 private String onRegionMergeReverted(ServerName sn, TransitionCode code,
3961 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3962 RegionState rs_p = regionStates.getRegionState(p);
3963 String encodedName = p.getEncodedName();
3964 mergingRegions.remove(encodedName);
3965
3966
3967
3968 regionOnline(a, sn);
3969 regionOnline(b, sn);
3970
3971
3972 if (rs_p != null) {
3973 regionOffline(p);
3974 }
3975
3976 if (getTableStateManager().isTableState(p.getTable(),
3977 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3978 invokeUnAssign(a);
3979 invokeUnAssign(b);
3980 }
3981
3982 return null;
3983 }
3984
3985
3986
3987
3988
  /**
   * Handle a ZK region-merge transition event. Validates the reporting server
   * and the three regions' states, moves them to MERGING/MERGING_NEW, and on
   * RS_ZK_REGION_MERGED brings the merged region online, deletes the znode,
   * and unassigns the merged region if the table was disabled meanwhile.
   * @return true if the event was handled, false if it was dropped
   */
  private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
      final String prettyPrintedRegionName, final ServerName sn) {
    if (!serverManager.isServerOnline(sn)) {
      LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
      return false;
    }
    // Payload carries [merged-region, region_a, region_b].
    byte [] payloadOfMerging = rt.getPayload();
    List<HRegionInfo> mergingRegions;
    try {
      mergingRegions = HRegionInfo.parseDelimitedFrom(
        payloadOfMerging, 0, payloadOfMerging.length);
    } catch (IOException e) {
      LOG.error("Dropped merging! Failed reading " + rt.getEventType()
        + " payload for " + prettyPrintedRegionName);
      return false;
    }
    assert mergingRegions.size() == 3;
    HRegionInfo p = mergingRegions.get(0);
    HRegionInfo hri_a = mergingRegions.get(1);
    HRegionInfo hri_b = mergingRegions.get(2);

    RegionState rs_p = regionStates.getRegionState(p);
    RegionState rs_a = regionStates.getRegionState(hri_a);
    RegionState rs_b = regionStates.getRegionState(hri_b);

    // Source regions (if tracked) must be open/merging on sn; the merged
    // region (if tracked) must be open/merging-new on sn.
    if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
        && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
        && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
      LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
        + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
      return false;
    }

    EventType et = rt.getEventType();
    if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
      try {
        RegionMergeCoordination.RegionMergeDetails std =
            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                .getRegionMergeCoordination().getDefaultDetails();
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
        if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
          // Version -1 means our transition did not take; re-read the node to
          // see whether the region server moved it forward on its own.
          byte[] data = ZKAssign.getData(watcher, encodedName);
          EventType currentType = null;
          if (data != null) {
            RegionTransition newRt = RegionTransition.parseFrom(data);
            currentType = newRt.getEventType();
          }
          if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
              && currentType != EventType.RS_ZK_REGION_MERGING)) {
            LOG.warn("Failed to transition pending_merge node "
              + encodedName + " to merging, it's now " + currentType);
            return false;
          }
        }
      } catch (Exception e) {
        LOG.warn("Failed to transition pending_merge node "
          + encodedName + " to merging", e);
        return false;
      }
    }

    synchronized (regionStates) {
      regionStates.updateRegionState(hri_a, State.MERGING);
      regionStates.updateRegionState(hri_b, State.MERGING);
      regionStates.updateRegionState(p, State.MERGING_NEW, sn);

      if (et != EventType.RS_ZK_REGION_MERGED) {
        // Merge still in progress: remember the pair being merged.
        this.mergingRegions.put(encodedName,
          new PairOfSameType<HRegionInfo>(hri_a, hri_b));
      } else {
        // Merge complete: sources go offline, merged region comes online.
        this.mergingRegions.remove(encodedName);
        regionOffline(hri_a, State.MERGED);
        regionOffline(hri_b, State.MERGED);
        regionOnline(p, sn);
      }
    }

    if (et == EventType.RS_ZK_REGION_MERGED) {
      doMergingOfReplicas(p, hri_a, hri_b);
      LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
      try {
        boolean successful = false;
        while (!successful) {
          // Retry until the MERGED node is really gone; deleteNode can return
          // false if the node is transiently in another state.
          successful = ZKAssign.deleteNode(watcher, encodedName,
            EventType.RS_ZK_REGION_MERGED, sn);
        }
      } catch (KeeperException e) {
        if (e instanceof NoNodeException) {
          String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
          LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
        } else {
          server.abort("Error deleting MERGED node " + encodedName, e);
        }
      }
      LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
        + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
        + hri_b.getRegionNameAsString() + ", on " + sn);

      // The table may have been disabled while merging; close the merged region.
      if (tableStateManager.isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        unassign(p);
      }
    }
    return true;
  }
4099
4100
4101
4102
  /**
   * Handle a ZK region-split transition event. Validates the reporting server
   * and region states (re-creating an unknown parent as OPEN on the reporting
   * server), records the split in progress, and on RS_ZK_REGION_SPLIT brings
   * the daughters online, deletes the znode, and unassigns the daughters if
   * the table was disabled meanwhile.
   * @return true if the event was handled, false if it was dropped
   */
  private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
      final String prettyPrintedRegionName, final ServerName sn) {
    if (!serverManager.isServerOnline(sn)) {
      LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
      return false;
    }
    // Payload carries the two daughter regions.
    byte [] payloadOfSplitting = rt.getPayload();
    List<HRegionInfo> splittingRegions;
    try {
      splittingRegions = HRegionInfo.parseDelimitedFrom(
        payloadOfSplitting, 0, payloadOfSplitting.length);
    } catch (IOException e) {
      LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
        + " payload for " + prettyPrintedRegionName);
      return false;
    }
    assert splittingRegions.size() == 2;
    HRegionInfo hri_a = splittingRegions.get(0);
    HRegionInfo hri_b = splittingRegions.get(1);

    RegionState rs_p = regionStates.getRegionState(encodedName);
    RegionState rs_a = regionStates.getRegionState(hri_a);
    RegionState rs_b = regionStates.getRegionState(hri_b);

    // Parent (if tracked) must be open/splitting on sn; daughters (if
    // tracked) must be splitting-new on sn.
    if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
        && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
        && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
      LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
        + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
      return false;
    }

    if (rs_p == null) {
      // Parent unknown to us: recreate it as OPEN on the reporting server so
      // the split can be processed.
      rs_p = regionStates.updateRegionState(rt, State.OPEN);
      if (rs_p == null) {
        LOG.warn("Received splitting for region " + prettyPrintedRegionName
          + " from server " + sn + " but it doesn't exist anymore,"
          + " probably already processed its split");
        return false;
      }
      regionStates.regionOnline(rs_p.getRegion(), sn);
    }

    HRegionInfo p = rs_p.getRegion();
    EventType et = rt.getEventType();
    if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
      try {
        SplitTransactionDetails std =
            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                .getSplitTransactionCoordination().getDefaultDetails();
        if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
          // -1 means our transition did not take; re-read the node to see
          // whether the region server moved it forward on its own.
          byte[] data = ZKAssign.getData(watcher, encodedName);
          EventType currentType = null;
          if (data != null) {
            RegionTransition newRt = RegionTransition.parseFrom(data);
            currentType = newRt.getEventType();
          }
          if (currentType == null
              || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
            LOG.warn("Failed to transition pending_split node " + encodedName
              + " to splitting, it's now " + currentType);
            return false;
          }
        }
      } catch (Exception e) {
        LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
        return false;
      }
    }

    synchronized (regionStates) {
      splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
      regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
      regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
      regionStates.updateRegionState(rt, State.SPLITTING);

      // Test hook: deliberately stop short of completing the split handling.
      if (TEST_SKIP_SPLIT_HANDLING) {
        LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
        return true;
      }

      if (et == EventType.RS_ZK_REGION_SPLIT) {
        // Split complete: parent goes offline, daughters come online.
        regionOffline(p, State.SPLIT);
        regionOnline(hri_a, sn);
        regionOnline(hri_b, sn);
        splitRegions.remove(p);
      }
    }

    if (et == EventType.RS_ZK_REGION_SPLIT) {
      // Handle the daughters' replicas, then clean up the transition znode.
      doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
      LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
      try {
        boolean successful = false;
        while (!successful) {
          // Retry until the SPLIT node is really gone; deleteNode can return
          // false if the node is transiently in another state.
          successful = ZKAssign.deleteNode(watcher, encodedName,
            EventType.RS_ZK_REGION_SPLIT, sn);
        }
      } catch (KeeperException e) {
        if (e instanceof NoNodeException) {
          String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
          LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
        } else {
          server.abort("Error deleting SPLIT node " + encodedName, e);
        }
      }
      LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
        + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
        + hri_b.getRegionNameAsString() + ", on " + sn);

      // The table may have been disabled while splitting; close the daughters.
      if (tableStateManager.isTableState(p.getTable(),
          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
        unassign(hri_a);
        unassign(hri_b);
      }
    }
    return true;
  }
4230
4231 private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4232 final HRegionInfo hri_b) {
4233
4234
4235 List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4236 unmergedRegions.add(hri_a);
4237 unmergedRegions.add(hri_b);
4238 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4239 Collection<List<HRegionInfo>> c = map.values();
4240 for (List<HRegionInfo> l : c) {
4241 for (HRegionInfo h : l) {
4242 if (!RegionReplicaUtil.isDefaultReplica(h)) {
4243 LOG.debug("Unassigning un-merged replica " + h);
4244 unassign(h);
4245 }
4246 }
4247 }
4248 int numReplicas = 1;
4249 try {
4250 numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
4251 getRegionReplication();
4252 } catch (IOException e) {
4253 LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4254 " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4255 "will not be done");
4256 }
4257 List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4258 for (int i = 1; i < numReplicas; i++) {
4259 regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4260 }
4261 try {
4262 assign(regions);
4263 } catch (IOException ioe) {
4264 LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4265 ioe.getMessage());
4266 } catch (InterruptedException ie) {
4267 LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
4268 ie.getMessage());
4269 }
4270 }
4271
4272 private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4273 final HRegionInfo hri_b) {
4274
4275
4276
4277 int numReplicas = 1;
4278 try {
4279 numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
4280 getRegionReplication();
4281 } catch (IOException e) {
4282 LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4283 " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4284 "replicas will not be done");
4285 }
4286
4287 List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4288 parentRegion.add(parentHri);
4289 Map<ServerName, List<HRegionInfo>> currentAssign =
4290 regionStates.getRegionAssignments(parentRegion);
4291 Collection<List<HRegionInfo>> c = currentAssign.values();
4292 for (List<HRegionInfo> l : c) {
4293 for (HRegionInfo h : l) {
4294 if (!RegionReplicaUtil.isDefaultReplica(h)) {
4295 LOG.debug("Unassigning parent's replica " + h);
4296 unassign(h);
4297 }
4298 }
4299 }
4300
4301 Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4302 for (int i = 1; i < numReplicas; i++) {
4303 prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4304 prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4305 }
4306 try {
4307 assign(map);
4308 } catch (IOException e) {
4309 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4310 } catch (InterruptedException e) {
4311 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4312 }
4313 }
4314
4315 private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4316 int replicaId, Map<HRegionInfo, ServerName> map) {
4317 HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4318 HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4319 replicaId);
4320 LOG.debug("Created replica region for daughter " + daughterReplica);
4321 ServerName sn;
4322 if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4323 map.put(daughterReplica, sn);
4324 } else {
4325 List<ServerName> servers = serverManager.getOnlineServersList();
4326 sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4327 map.put(daughterReplica, sn);
4328 }
4329 }
4330
4331 public Set<HRegionInfo> getReplicasToClose() {
4332 return replicasToClose;
4333 }
4334
4335
4336
4337
4338
4339
4340 private void regionOffline(final HRegionInfo regionInfo, final State state) {
4341 regionStates.regionOffline(regionInfo, state);
4342 removeClosedRegion(regionInfo);
4343
4344 clearRegionPlan(regionInfo);
4345 balancer.regionOffline(regionInfo);
4346
4347
4348 sendRegionClosedNotification(regionInfo);
4349
4350 if (state != null && state.equals(State.SPLIT)) {
4351 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4352 c.add(regionInfo);
4353 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4354 Collection<List<HRegionInfo>> allReplicas = map.values();
4355 for (List<HRegionInfo> list : allReplicas) {
4356 replicasToClose.addAll(list);
4357 }
4358 }
4359 else if (state != null && state.equals(State.MERGED)) {
4360 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4361 c.add(regionInfo);
4362 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4363 Collection<List<HRegionInfo>> allReplicas = map.values();
4364 for (List<HRegionInfo> list : allReplicas) {
4365 replicasToClose.addAll(list);
4366 }
4367 }
4368 }
4369
4370 private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4371 final ServerName serverName) {
4372 if (!this.listeners.isEmpty()) {
4373 for (AssignmentListener listener : this.listeners) {
4374 listener.regionOpened(regionInfo, serverName);
4375 }
4376 }
4377 }
4378
4379 private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4380 if (!this.listeners.isEmpty()) {
4381 for (AssignmentListener listener : this.listeners) {
4382 listener.regionClosed(regionInfo);
4383 }
4384 }
4385 }
4386
4387
4388
4389
4390
4391
4392
4393
4394
4395
4396
4397
4398
4399
4400
4401
4402
4403
4404
4405
4406
4407
4408
4409
4410
4411
4412
4413
4414
4415
4416
4417
4418
4419
4420
4421
4422
4423
4424
4425
4426
4427
4428
4429
4430
4431 protected String onRegionTransition(final ServerName serverName,
4432 final RegionStateTransition transition) {
4433 TransitionCode code = transition.getTransitionCode();
4434 HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4435 RegionState current = regionStates.getRegionState(hri);
4436 if (LOG.isDebugEnabled()) {
4437 LOG.debug("Got transition " + code + " for "
4438 + (current != null ? current.toString() : hri.getShortNameToLog())
4439 + " from " + serverName);
4440 }
4441 String errorMsg = null;
4442 switch (code) {
4443 case OPENED:
4444 if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4445 LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4446 + serverName);
4447 break;
4448 }
4449 case FAILED_OPEN:
4450 if (current == null
4451 || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4452 errorMsg = hri.getShortNameToLog()
4453 + " is not pending open on " + serverName;
4454 } else if (code == TransitionCode.FAILED_OPEN) {
4455 onRegionFailedOpen(hri, serverName);
4456 } else {
4457 long openSeqNum = HConstants.NO_SEQNUM;
4458 if (transition.hasOpenSeqNum()) {
4459 openSeqNum = transition.getOpenSeqNum();
4460 }
4461 if (openSeqNum < 0) {
4462 errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4463 } else {
4464 onRegionOpen(hri, serverName, openSeqNum);
4465 }
4466 }
4467 break;
4468
4469 case CLOSED:
4470 if (current == null
4471 || !current.isPendingCloseOrClosingOnServer(serverName)) {
4472 errorMsg = hri.getShortNameToLog()
4473 + " is not pending close on " + serverName;
4474 } else {
4475 onRegionClosed(hri);
4476 }
4477 break;
4478
4479 case READY_TO_SPLIT:
4480 try {
4481 regionStateListener.onRegionSplit(hri);
4482 if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
4483 Admin.MasterSwitchType.SPLIT)) {
4484 errorMsg = "split switch is off!";
4485 }
4486 } catch (IOException exp) {
4487 errorMsg = StringUtils.stringifyException(exp);
4488 }
4489 break;
4490 case SPLIT_PONR:
4491 case SPLIT:
4492 case SPLIT_REVERTED:
4493 errorMsg =
4494 onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4495 HRegionInfo.convert(transition.getRegionInfo(2)));
4496 if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4497 try {
4498 regionStateListener.onRegionSplitReverted(hri);
4499 } catch (IOException exp) {
4500 LOG.warn(StringUtils.stringifyException(exp));
4501 }
4502 }
4503 break;
4504 case READY_TO_MERGE:
4505 if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
4506 Admin.MasterSwitchType.MERGE)) {
4507 errorMsg = "merge switch is off!";
4508 }
4509 break;
4510 case MERGE_PONR:
4511 case MERGED:
4512 errorMsg = onRegionMerge(serverName, code, hri,
4513 HRegionInfo.convert(transition.getRegionInfo(1)),
4514 HRegionInfo.convert(transition.getRegionInfo(2)));
4515 if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4516 try {
4517 regionStateListener.onRegionMerged(hri);
4518 } catch (IOException exp) {
4519 errorMsg = StringUtils.stringifyException(exp);
4520 }
4521 }
4522 break;
4523 case MERGE_REVERTED:
4524 errorMsg = onRegionMergeReverted(serverName, code, hri,
4525 HRegionInfo.convert(transition.getRegionInfo(1)),
4526 HRegionInfo.convert(transition.getRegionInfo(2)));
4527 break;
4528
4529 default:
4530 errorMsg = "Unexpected transition code " + code;
4531 }
4532 if (errorMsg != null) {
4533 LOG.error("Failed to transtion region from " + current + " to "
4534 + code + " by " + serverName + ": " + errorMsg);
4535 }
4536 return errorMsg;
4537 }
4538
4539 private void processBogusAssignments(Map<ServerName, List<HRegionInfo>> bulkPlan) {
4540 if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
4541
4542 for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
4543 regionStates.updateRegionState(hri, State.FAILED_OPEN);
4544 }
4545 bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
4546 }
4547 }
4548
4549
4550
4551
4552 public LoadBalancer getBalancer() {
4553 return this.balancer;
4554 }
4555
4556 public Map<ServerName, List<HRegionInfo>>
4557 getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4558 return getRegionStates().getRegionAssignments(infos);
4559 }
4560
4561 void setRegionStateListener(RegionStateListener listener) {
4562 this.regionStateListener = listener;
4563 }
4564 }