View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Comparator;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.TreeSet;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.ClusterManager.ServiceType;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.client.Admin;
31  import org.apache.hadoop.hbase.client.ClusterConnection;
32  import org.apache.hadoop.hbase.client.Connection;
33  import org.apache.hadoop.hbase.client.ConnectionFactory;
34  import org.apache.hadoop.hbase.client.RegionLocator;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.Threads;
42  
43  /**
44   * Manages the interactions with an already deployed distributed cluster (as opposed to
45   * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
46   */
47  @InterfaceAudience.Private
48  public class DistributedHBaseCluster extends HBaseCluster {
49    private Admin admin;
50    private final Connection connection;
51  
52    private ClusterManager clusterManager;
53  
54    public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
55        throws IOException {
56      super(conf);
57      this.clusterManager = clusterManager;
58      this.connection = ConnectionFactory.createConnection(conf);
59      this.admin = this.connection.getAdmin();
60      this.initialClusterStatus = getClusterStatus();
61    }
62  
63    public void setClusterManager(ClusterManager clusterManager) {
64      this.clusterManager = clusterManager;
65    }
66  
67    public ClusterManager getClusterManager() {
68      return clusterManager;
69    }
70  
71    /**
72     * Returns a ClusterStatus for this HBase cluster
73     * @throws IOException
74     */
75    @Override
76    public ClusterStatus getClusterStatus() throws IOException {
77      return admin.getClusterStatus();
78    }
79  
80    @Override
81    public ClusterStatus getInitialClusterStatus() throws IOException {
82      return initialClusterStatus;
83    }
84  
85    @Override
86    public void close() throws IOException {
87      if (this.admin != null) {
88        admin.close();
89      }
90      if (this.connection != null && !this.connection.isClosed()) {
91        this.connection.close();
92      }
93    }
94  
95    @Override
96    public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
97    throws IOException {
98      return ((ClusterConnection)this.connection).getAdmin(serverName);
99    }
100 
101   @Override
102   public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
103   throws IOException {
104     return ((ClusterConnection)this.connection).getClient(serverName);
105   }
106 
107   @Override
108   public void startRegionServer(String hostname, int port) throws IOException {
109     LOG.info("Starting RS on: " + hostname);
110     clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
111   }
112 
113   @Override
114   public void killRegionServer(ServerName serverName) throws IOException {
115     LOG.info("Aborting RS: " + serverName.getServerName());
116     clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
117             serverName.getHostname(),
118             serverName.getPort());
119   }
120 
121   @Override
122   public void stopRegionServer(ServerName serverName) throws IOException {
123     LOG.info("Stopping RS: " + serverName.getServerName());
124     clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
125             serverName.getHostname(),
126             serverName.getPort());
127   }
128 
129   @Override
130   public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
131     waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
132   }
133 
134   private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
135     throws IOException {
136     LOG.info("Waiting service:" + service + " to stop: " + serverName.getServerName());
137     long start = System.currentTimeMillis();
138 
139     String ret = "";
140     while ((System.currentTimeMillis() - start) < timeout) {
141       ret = clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort());
142       if (!(ret.length() > 0)) {
143         return;
144       }
145       Threads.sleep(1000);
146     }
147     throw new IOException("timeout waiting for service to stop:" + serverName + " " + ret);
148   }
149 
150   @Override
151   public MasterService.BlockingInterface getMasterAdminService()
152   throws IOException {
153     return ((ClusterConnection)this.connection).getMaster();
154   }
155 
156   @Override
157   public void startMaster(String hostname, int port) throws IOException {
158     LOG.info("Starting Master on: " + hostname + ":" + port);
159     clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
160   }
161 
162   @Override
163   public void killMaster(ServerName serverName) throws IOException {
164     LOG.info("Aborting Master: " + serverName.getServerName());
165     clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
166   }
167 
168   @Override
169   public void stopMaster(ServerName serverName) throws IOException {
170     LOG.info("Stopping Master: " + serverName.getServerName());
171     clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
172   }
173 
174   @Override
175   public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
176     waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
177   }
178 
179   @Override
180   public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
181     long start = System.currentTimeMillis();
182     while (System.currentTimeMillis() - start < timeout) {
183       try {
184         getMasterAdminService();
185         return true;
186       } catch (MasterNotRunningException m) {
187         LOG.warn("Master not started yet " + m);
188       } catch (ZooKeeperConnectionException e) {
189         LOG.warn("Failed to connect to ZK " + e);
190       }
191       Threads.sleep(1000);
192     }
193     return false;
194   }
195 
196   @Override
197   public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
198     HRegionLocation regionLoc = null;
199     try (RegionLocator locator = connection.getRegionLocator(tn)) {
200       regionLoc = locator.getRegionLocation(regionName);
201     }
202     if (regionLoc == null) {
203       LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) +
204         ", start key [" + Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
205       return null;
206     }
207 
208     AdminProtos.AdminService.BlockingInterface client =
209         ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName());
210     ServerInfo info = ProtobufUtil.getServerInfo(null, client);
211     return ProtobufUtil.toServerName(info.getServerName());
212   }
213 
214   @Override
215   public void waitUntilShutDown() {
216     // Simply wait for a few seconds for now (after issuing serverManager.kill
217     throw new RuntimeException("Not implemented yet");
218   }
219 
220   @Override
221   public void shutdown() throws IOException {
222     // not sure we want this
223     throw new RuntimeException("Not implemented yet");
224   }
225 
226   @Override
227   public boolean isDistributedCluster() {
228     return true;
229   }
230 
231   @Override
232   public boolean restoreClusterStatus(ClusterStatus initial) throws IOException {
233     ClusterStatus current = getClusterStatus();
234 
235     LOG.info("Restoring cluster - started");
236 
237     // do a best effort restore
238     boolean success = true;
239     success = restoreMasters(initial, current) & success;
240     success = restoreRegionServers(initial, current) & success;
241     success = restoreAdmin() & success;
242 
243     LOG.info("Restoring cluster - done");
244     return success;
245   }
246 
247   protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
248     List<IOException> deferred = new ArrayList<IOException>();
249     //check whether current master has changed
250     final ServerName initMaster = initial.getMaster();
251     if (!ServerName.isSameHostnameAndPort(initMaster, current.getMaster())) {
252       LOG.info("Restoring cluster - Initial active master : "
253               + initMaster.getHostAndPort()
254               + " has changed to : "
255               + current.getMaster().getHostAndPort());
256       // If initial master is stopped, start it, before restoring the state.
257       // It will come up as a backup master, if there is already an active master.
258       try {
259         if (!(clusterManager.isRunning(ServiceType.HBASE_MASTER,
260                 initMaster.getHostname(), initMaster.getPort()).length() > 0)) {
261           LOG.info("Restoring cluster - starting initial active master at:"
262                   + initMaster.getHostAndPort());
263           startMaster(initMaster.getHostname(), initMaster.getPort());
264         }
265 
266         // master has changed, we would like to undo this.
267         // 1. Kill the current backups
268         // 2. Stop current master
269         // 3. Start backup masters
270         for (ServerName currentBackup : current.getBackupMasters()) {
271           if (!ServerName.isSameHostnameAndPort(currentBackup, initMaster)) {
272             LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
273             stopMaster(currentBackup);
274           }
275         }
276         LOG.info("Restoring cluster - stopping active master: " + current.getMaster());
277         stopMaster(current.getMaster());
278         waitForActiveAndReadyMaster(); // wait so that active master takes over
279       } catch (IOException ex) {
280         // if we fail to start the initial active master, we do not want to continue stopping
281         // backup masters. Just keep what we have now
282         deferred.add(ex);
283       }
284 
285       //start backup masters
286       for (ServerName backup : initial.getBackupMasters()) {
287         try {
288           //these are not started in backup mode, but we should already have an active master
289           if (!(clusterManager.isRunning(ServiceType.HBASE_MASTER,
290                   backup.getHostname(),
291                   backup.getPort()).length() > 0)) {
292             LOG.info("Restoring cluster - starting initial backup master: "
293                     + backup.getHostAndPort());
294             startMaster(backup.getHostname(), backup.getPort());
295           }
296         } catch (IOException ex) {
297           deferred.add(ex);
298         }
299       }
300     } else {
301       //current master has not changed, match up backup masters
302       Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
303       Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
304       toStart.addAll(initial.getBackupMasters());
305       toKill.addAll(current.getBackupMasters());
306 
307       for (ServerName server : current.getBackupMasters()) {
308         toStart.remove(server);
309       }
310       for (ServerName server: initial.getBackupMasters()) {
311         toKill.remove(server);
312       }
313 
314       for (ServerName sn:toStart) {
315         try {
316           if(!(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())
317               .length() > 0)) {
318             LOG.info("Restoring cluster - starting initial backup master: " + sn.getHostAndPort());
319             startMaster(sn.getHostname(), sn.getPort());
320           }
321         } catch (IOException ex) {
322           deferred.add(ex);
323         }
324       }
325 
326       for (ServerName sn:toKill) {
327         try {
328           if(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())
329               .length() > 0) {
330             LOG.info("Restoring cluster - stopping backup master: " + sn.getHostAndPort());
331             stopMaster(sn);
332           }
333         } catch (IOException ex) {
334           deferred.add(ex);
335         }
336       }
337     }
338     if (!deferred.isEmpty()) {
339       LOG.warn("Restoring cluster - restoring region servers reported "
340               + deferred.size() + " errors:");
341       for (int i=0; i<deferred.size() && i < 3; i++) {
342         LOG.warn(deferred.get(i));
343       }
344     }
345 
346     return deferred.isEmpty();
347   }
348 
349 
350   private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
351     @Override
352     public int compare(ServerName o1, ServerName o2) {
353       int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
354       if (compare != 0) return compare;
355       compare = o1.getPort() - o2.getPort();
356       if (compare != 0) return compare;
357       return 0;
358     }
359   }
360 
361   protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) {
362     Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
363     Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
364     toStart.addAll(initial.getServers());
365     toKill.addAll(current.getServers());
366 
367     for (ServerName server : current.getServers()) {
368       toStart.remove(server);
369     }
370     for (ServerName server: initial.getServers()) {
371       toKill.remove(server);
372     }
373 
374     List<IOException> deferred = new ArrayList<IOException>();
375 
376     for(ServerName sn:toStart) {
377       try {
378         if (!(clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
379                 sn.getHostname(),
380                 sn.getPort()).length() > 0)) {
381           LOG.info("Restoring cluster - starting initial region server: " + sn.getHostAndPort());
382           startRegionServer(sn.getHostname(), sn.getPort());
383         }
384       } catch (IOException ex) {
385         deferred.add(ex);
386       }
387     }
388 
389     for(ServerName sn:toKill) {
390       try {
391         if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
392                 sn.getHostname(),
393                 sn.getPort()).length() > 0) {
394           LOG.info("Restoring cluster - stopping initial region server: " + sn.getHostAndPort());
395           stopRegionServer(sn);
396         }
397       } catch (IOException ex) {
398         deferred.add(ex);
399       }
400     }
401     if (!deferred.isEmpty()) {
402       LOG.warn("Restoring cluster - restoring region servers reported "
403               + deferred.size() + " errors:");
404       for (int i=0; i<deferred.size() && i < 3; i++) {
405         LOG.warn(deferred.get(i));
406       }
407     }
408 
409     return deferred.isEmpty();
410   }
411 
412   protected boolean restoreAdmin() throws IOException {
413     // While restoring above, if the HBase Master which was initially the Active one, was down
414     // and the restore put the cluster back to Initial configuration, HAdmin instance will need
415     // to refresh its connections (otherwise it will return incorrect information) or we can
416     // point it to new instance.
417     try {
418       admin.close();
419     } catch (IOException ioe) {
420       LOG.warn("While closing the old connection", ioe);
421     }
422     this.admin = this.connection.getAdmin();
423     LOG.info("Added new HBaseAdmin");
424     return true;
425   }
426 }