1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.NavigableMap;
26 import java.util.TreeMap;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.Server;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.master.AssignmentManager;
34 import org.apache.hadoop.hbase.master.HMaster;
35 import org.apache.hadoop.hbase.master.MasterServices;
36 import org.apache.hadoop.hbase.master.ServerManager;
37 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
39 import org.apache.zookeeper.KeeperException;
40
41
42
43
44
45
46
47
48
49
50
51 @InterfaceAudience.Private
52 public class RegionServerTracker extends ZooKeeperListener {
53 private static final Log LOG = LogFactory.getLog(RegionServerTracker.class);
54 private NavigableMap<ServerName, RegionServerInfo> regionServers =
55 new TreeMap<ServerName, RegionServerInfo>();
56 private ServerManager serverManager;
57 private MasterServices server;
58
59 public RegionServerTracker(ZooKeeperWatcher watcher,
60 MasterServices server, ServerManager serverManager) {
61 super(watcher);
62 this.server = server;
63 this.serverManager = serverManager;
64 }
65
66
67
68
69
70
71
72
73
74 public void start() throws KeeperException, IOException {
75 watcher.registerListener(this);
76 List<String> servers =
77 ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
78 refresh(servers);
79 }
80
81 private void refresh(final List<String> servers) throws IOException {
82 synchronized(this.regionServers) {
83 this.regionServers.clear();
84 for (String n: servers) {
85 ServerName sn = ServerName.parseServerName(ZKUtil.getNodeName(n));
86 if (regionServers.get(sn) == null) {
87 RegionServerInfo.Builder rsInfoBuilder = RegionServerInfo.newBuilder();
88 try {
89 String nodePath = ZKUtil.joinZNode(watcher.rsZNode, n);
90 byte[] data = ZKUtil.getData(watcher, nodePath);
91 if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) {
92 int magicLen = ProtobufUtil.lengthOfPBMagic();
93 ProtobufUtil.mergeFrom(rsInfoBuilder, data, magicLen, data.length - magicLen);
94 }
95 if (LOG.isDebugEnabled()) {
96 LOG.debug("Added tracking of RS " + nodePath);
97 }
98 } catch (KeeperException e) {
99 LOG.warn("Get Rs info port from ephemeral node", e);
100 } catch (IOException e) {
101 LOG.warn("Illegal data from ephemeral node", e);
102 } catch (InterruptedException e) {
103 throw new InterruptedIOException();
104 }
105 this.regionServers.put(sn, rsInfoBuilder.build());
106 }
107 }
108 }
109
110 if (server.isInitialized()) {
111 server.checkIfShouldMoveSystemRegionAsync();
112 }
113 }
114
115 private void remove(final ServerName sn) {
116 synchronized(this.regionServers) {
117 this.regionServers.remove(sn);
118 }
119 }
120
121 @Override
122 public void nodeDeleted(String path) {
123 if (path.startsWith(watcher.rsZNode)) {
124 String serverName = ZKUtil.getNodeName(path);
125 LOG.info("RegionServer ephemeral node deleted, processing expiration [" +
126 serverName + "]");
127 ServerName sn = ServerName.parseServerName(serverName);
128 if (!serverManager.isServerOnline(sn)) {
129 LOG.warn(serverName.toString() + " is not online or isn't known to the master."+
130 "The latter could be caused by a DNS misconfiguration.");
131 return;
132 }
133 remove(sn);
134 this.serverManager.expireServer(sn);
135 }
136 }
137
138 @Override
139 public void nodeChildrenChanged(String path) {
140 if (path.equals(watcher.rsZNode)
141 && !server.isAborted() && !server.isStopped()) {
142 try {
143 List<String> servers =
144 ZKUtil.listChildrenAndWatchThem(watcher, watcher.rsZNode);
145 refresh(servers);
146 } catch (IOException e) {
147 server.abort("Unexpected zk exception getting RS nodes", e);
148 } catch (KeeperException e) {
149 server.abort("Unexpected zk exception getting RS nodes", e);
150 }
151 }
152 }
153
154 public RegionServerInfo getRegionServerInfo(final ServerName sn) {
155 return regionServers.get(sn);
156 }
157
158
159
160
161
162 public List<ServerName> getOnlineServers() {
163 synchronized (this.regionServers) {
164 return new ArrayList<ServerName>(this.regionServers.keySet());
165 }
166 }
167 }