1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Comparator;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.TreeSet;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.ClusterManager.ServiceType;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.client.Admin;
31 import org.apache.hadoop.hbase.client.ClusterConnection;
32 import org.apache.hadoop.hbase.client.Connection;
33 import org.apache.hadoop.hbase.client.ConnectionFactory;
34 import org.apache.hadoop.hbase.client.RegionLocator;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
37 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.util.Threads;
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class DistributedHBaseCluster extends HBaseCluster {
49 private Admin admin;
50 private final Connection connection;
51
52 private ClusterManager clusterManager;
53
/**
 * Creates a handle on a running cluster whose services are controlled through a
 * {@link ClusterManager}.
 * <p>
 * Opens a {@link Connection} from {@code conf}, obtains an {@link Admin} from it and records the
 * cluster status seen at construction time as the initial cluster status.
 *
 * @param conf configuration used to create the connection
 * @param clusterManager manager used to start, stop, kill and probe cluster services
 * @throws IOException if the connection, the admin or the initial status cannot be obtained
 */
public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
    throws IOException {
  super(conf);
  this.clusterManager = clusterManager;
  this.connection = ConnectionFactory.createConnection(conf);
  try {
    this.admin = this.connection.getAdmin();
    this.initialClusterStatus = getClusterStatus();
  } catch (IOException | RuntimeException e) {
    // Construction failed part-way: release the connection (and admin, if any) instead of
    // leaking them, then surface the original failure.
    try {
      close();
    } catch (IOException | RuntimeException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
62
63 public void setClusterManager(ClusterManager clusterManager) {
64 this.clusterManager = clusterManager;
65 }
66
67 public ClusterManager getClusterManager() {
68 return clusterManager;
69 }
70
71
72
73
74
75 @Override
76 public ClusterStatus getClusterStatus() throws IOException {
77 return admin.getClusterStatus();
78 }
79
80 @Override
81 public ClusterStatus getInitialClusterStatus() throws IOException {
82 return initialClusterStatus;
83 }
84
85 @Override
86 public void close() throws IOException {
87 if (this.admin != null) {
88 admin.close();
89 }
90 if (this.connection != null && !this.connection.isClosed()) {
91 this.connection.close();
92 }
93 }
94
95 @Override
96 public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
97 throws IOException {
98 return ((ClusterConnection)this.connection).getAdmin(serverName);
99 }
100
101 @Override
102 public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
103 throws IOException {
104 return ((ClusterConnection)this.connection).getClient(serverName);
105 }
106
107 @Override
108 public void startRegionServer(String hostname, int port) throws IOException {
109 LOG.info("Starting RS on: " + hostname);
110 clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
111 }
112
113 @Override
114 public void killRegionServer(ServerName serverName) throws IOException {
115 LOG.info("Aborting RS: " + serverName.getServerName());
116 clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
117 serverName.getHostname(),
118 serverName.getPort());
119 }
120
121 @Override
122 public void stopRegionServer(ServerName serverName) throws IOException {
123 LOG.info("Stopping RS: " + serverName.getServerName());
124 clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
125 serverName.getHostname(),
126 serverName.getPort());
127 }
128
129 @Override
130 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
131 waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
132 }
133
134 private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
135 throws IOException {
136 LOG.info("Waiting service:" + service + " to stop: " + serverName.getServerName());
137 long start = System.currentTimeMillis();
138
139 String ret = "";
140 while ((System.currentTimeMillis() - start) < timeout) {
141 ret = clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort());
142 if (!(ret.length() > 0)) {
143 return;
144 }
145 Threads.sleep(1000);
146 }
147 throw new IOException("timeout waiting for service to stop:" + serverName + " " + ret);
148 }
149
150 @Override
151 public MasterService.BlockingInterface getMasterAdminService()
152 throws IOException {
153 return ((ClusterConnection)this.connection).getMaster();
154 }
155
156 @Override
157 public void startMaster(String hostname, int port) throws IOException {
158 LOG.info("Starting Master on: " + hostname + ":" + port);
159 clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
160 }
161
162 @Override
163 public void killMaster(ServerName serverName) throws IOException {
164 LOG.info("Aborting Master: " + serverName.getServerName());
165 clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
166 }
167
168 @Override
169 public void stopMaster(ServerName serverName) throws IOException {
170 LOG.info("Stopping Master: " + serverName.getServerName());
171 clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
172 }
173
174 @Override
175 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
176 waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
177 }
178
179 @Override
180 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
181 long start = System.currentTimeMillis();
182 while (System.currentTimeMillis() - start < timeout) {
183 try {
184 getMasterAdminService();
185 return true;
186 } catch (MasterNotRunningException m) {
187 LOG.warn("Master not started yet " + m);
188 } catch (ZooKeeperConnectionException e) {
189 LOG.warn("Failed to connect to ZK " + e);
190 }
191 Threads.sleep(1000);
192 }
193 return false;
194 }
195
196 @Override
197 public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
198 HRegionLocation regionLoc = null;
199 try (RegionLocator locator = connection.getRegionLocator(tn)) {
200 regionLoc = locator.getRegionLocation(regionName);
201 }
202 if (regionLoc == null) {
203 LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) +
204 ", start key [" + Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
205 return null;
206 }
207
208 AdminProtos.AdminService.BlockingInterface client =
209 ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName());
210 ServerInfo info = ProtobufUtil.getServerInfo(null, client);
211 return ProtobufUtil.toServerName(info.getServerName());
212 }
213
214 @Override
215 public void waitUntilShutDown() {
216
217 throw new RuntimeException("Not implemented yet");
218 }
219
220 @Override
221 public void shutdown() throws IOException {
222
223 throw new RuntimeException("Not implemented yet");
224 }
225
226 @Override
227 public boolean isDistributedCluster() {
228 return true;
229 }
230
231 @Override
232 public boolean restoreClusterStatus(ClusterStatus initial) throws IOException {
233 ClusterStatus current = getClusterStatus();
234
235 LOG.info("Restoring cluster - started");
236
237
238 boolean success = true;
239 success = restoreMasters(initial, current) & success;
240 success = restoreRegionServers(initial, current) & success;
241 success = restoreAdmin() & success;
242
243 LOG.info("Restoring cluster - done");
244 return success;
245 }
246
247 protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
248 List<IOException> deferred = new ArrayList<IOException>();
249
250 final ServerName initMaster = initial.getMaster();
251 if (!ServerName.isSameHostnameAndPort(initMaster, current.getMaster())) {
252 LOG.info("Restoring cluster - Initial active master : "
253 + initMaster.getHostAndPort()
254 + " has changed to : "
255 + current.getMaster().getHostAndPort());
256
257
258 try {
259 if (!(clusterManager.isRunning(ServiceType.HBASE_MASTER,
260 initMaster.getHostname(), initMaster.getPort()).length() > 0)) {
261 LOG.info("Restoring cluster - starting initial active master at:"
262 + initMaster.getHostAndPort());
263 startMaster(initMaster.getHostname(), initMaster.getPort());
264 }
265
266
267
268
269
270 for (ServerName currentBackup : current.getBackupMasters()) {
271 if (!ServerName.isSameHostnameAndPort(currentBackup, initMaster)) {
272 LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
273 stopMaster(currentBackup);
274 }
275 }
276 LOG.info("Restoring cluster - stopping active master: " + current.getMaster());
277 stopMaster(current.getMaster());
278 waitForActiveAndReadyMaster();
279 } catch (IOException ex) {
280
281
282 deferred.add(ex);
283 }
284
285
286 for (ServerName backup : initial.getBackupMasters()) {
287 try {
288
289 if (!(clusterManager.isRunning(ServiceType.HBASE_MASTER,
290 backup.getHostname(),
291 backup.getPort()).length() > 0)) {
292 LOG.info("Restoring cluster - starting initial backup master: "
293 + backup.getHostAndPort());
294 startMaster(backup.getHostname(), backup.getPort());
295 }
296 } catch (IOException ex) {
297 deferred.add(ex);
298 }
299 }
300 } else {
301
302 Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
303 Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
304 toStart.addAll(initial.getBackupMasters());
305 toKill.addAll(current.getBackupMasters());
306
307 for (ServerName server : current.getBackupMasters()) {
308 toStart.remove(server);
309 }
310 for (ServerName server: initial.getBackupMasters()) {
311 toKill.remove(server);
312 }
313
314 for (ServerName sn:toStart) {
315 try {
316 if(!(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())
317 .length() > 0)) {
318 LOG.info("Restoring cluster - starting initial backup master: " + sn.getHostAndPort());
319 startMaster(sn.getHostname(), sn.getPort());
320 }
321 } catch (IOException ex) {
322 deferred.add(ex);
323 }
324 }
325
326 for (ServerName sn:toKill) {
327 try {
328 if(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())
329 .length() > 0) {
330 LOG.info("Restoring cluster - stopping backup master: " + sn.getHostAndPort());
331 stopMaster(sn);
332 }
333 } catch (IOException ex) {
334 deferred.add(ex);
335 }
336 }
337 }
338 if (!deferred.isEmpty()) {
339 LOG.warn("Restoring cluster - restoring region servers reported "
340 + deferred.size() + " errors:");
341 for (int i=0; i<deferred.size() && i < 3; i++) {
342 LOG.warn(deferred.get(i));
343 }
344 }
345
346 return deferred.isEmpty();
347 }
348
349
350 private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
351 @Override
352 public int compare(ServerName o1, ServerName o2) {
353 int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
354 if (compare != 0) return compare;
355 compare = o1.getPort() - o2.getPort();
356 if (compare != 0) return compare;
357 return 0;
358 }
359 }
360
361 protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) {
362 Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
363 Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
364 toStart.addAll(initial.getServers());
365 toKill.addAll(current.getServers());
366
367 for (ServerName server : current.getServers()) {
368 toStart.remove(server);
369 }
370 for (ServerName server: initial.getServers()) {
371 toKill.remove(server);
372 }
373
374 List<IOException> deferred = new ArrayList<IOException>();
375
376 for(ServerName sn:toStart) {
377 try {
378 if (!(clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
379 sn.getHostname(),
380 sn.getPort()).length() > 0)) {
381 LOG.info("Restoring cluster - starting initial region server: " + sn.getHostAndPort());
382 startRegionServer(sn.getHostname(), sn.getPort());
383 }
384 } catch (IOException ex) {
385 deferred.add(ex);
386 }
387 }
388
389 for(ServerName sn:toKill) {
390 try {
391 if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
392 sn.getHostname(),
393 sn.getPort()).length() > 0) {
394 LOG.info("Restoring cluster - stopping initial region server: " + sn.getHostAndPort());
395 stopRegionServer(sn);
396 }
397 } catch (IOException ex) {
398 deferred.add(ex);
399 }
400 }
401 if (!deferred.isEmpty()) {
402 LOG.warn("Restoring cluster - restoring region servers reported "
403 + deferred.size() + " errors:");
404 for (int i=0; i<deferred.size() && i < 3; i++) {
405 LOG.warn(deferred.get(i));
406 }
407 }
408
409 return deferred.isEmpty();
410 }
411
412 protected boolean restoreAdmin() throws IOException {
413
414
415
416
417 try {
418 admin.close();
419 } catch (IOException ioe) {
420 LOG.warn("While closing the old connection", ioe);
421 }
422 this.admin = this.connection.getAdmin();
423 LOG.info("Added new HBaseAdmin");
424 return true;
425 }
426 }