1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
37
38
39
40
41
42
43
44
45 @InterfaceAudience.Private
46 public class HBaseClusterManager extends Configured implements ClusterManager {
47 private static final String SIGKILL = "SIGKILL";
48 private static final String SIGSTOP = "SIGSTOP";
49 private static final String SIGCONT = "SIGCONT";
50
51 protected static final Log LOG = LogFactory.getLog(HBaseClusterManager.class);
52 private String sshUserName;
53 private String sshOptions;
54
55
56
57
58
59 private static final String DEFAULT_TUNNEL_CMD = "/usr/bin/ssh %1$s %2$s%3$s%4$s \"%5$s\"";
60 private String tunnelCmd;
61
62 private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
63 private static final int DEFAULT_RETRY_ATTEMPTS = 5;
64
65 private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
66 private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
67
68 protected RetryCounterFactory retryCounterFactory;
69
70 @Override
71 public void setConf(Configuration conf) {
72 super.setConf(conf);
73 if (conf == null) {
74
75 return;
76 }
77 sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
78 String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
79 sshOptions = System.getenv("HBASE_SSH_OPTS");
80 if (!extraSshOptions.isEmpty()) {
81 sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
82 }
83 sshOptions = (sshOptions == null) ? "" : sshOptions;
84 tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
85
86 if ((sshUserName != null && sshUserName.length() > 0) ||
87 (sshOptions != null && sshOptions.length() > 0)) {
88 LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
89 }
90
91 this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
92 .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
93 .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
94 }
95
96
97
98
99 protected class RemoteShell extends Shell.ShellCommandExecutor {
100 private String hostname;
101
102 public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
103 long timeout) {
104 super(execString, dir, env, timeout);
105 this.hostname = hostname;
106 }
107
108 public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
109 super(execString, dir, env);
110 this.hostname = hostname;
111 }
112
113 public RemoteShell(String hostname, String[] execString, File dir) {
114 super(execString, dir);
115 this.hostname = hostname;
116 }
117
118 public RemoteShell(String hostname, String[] execString) {
119 super(execString);
120 this.hostname = hostname;
121 }
122
123 @Override
124 public String[] getExecString() {
125 String at = sshUserName.isEmpty() ? "" : "@";
126 String remoteCmd = StringUtils.join(super.getExecString(), " ");
127 String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd);
128 LOG.info("Executing full command [" + cmd + "]");
129 return new String[] { "/usr/bin/env", "bash", "-c", cmd };
130 }
131
132 @Override
133 public void execute() throws IOException {
134 super.execute();
135 }
136 }
137
138
139
140
141
142
143 static abstract class CommandProvider {
144
145 enum Operation {
146 START, STOP, RESTART
147 }
148
149 public abstract String getCommand(ServiceType service, Operation op);
150
151 public String isRunningCommand(ServiceType service) {
152 return findPidCommand(service);
153 }
154
155 protected String findPidCommand(ServiceType service) {
156 return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
157 service);
158 }
159
160 public String signalCommand(ServiceType service, String signal) {
161 return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
162 }
163 }
164
165
166
167
168 static class HBaseShellCommandProvider extends CommandProvider {
169 private final String hbaseHome;
170 private final String confDir;
171
172 HBaseShellCommandProvider(Configuration conf) {
173 hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
174 System.getenv("HBASE_HOME"));
175 String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
176 System.getenv("HBASE_CONF_DIR"));
177 if (tmp != null) {
178 confDir = String.format("--config %s", tmp);
179 } else {
180 confDir = "";
181 }
182 }
183
184 @Override
185 public String getCommand(ServiceType service, Operation op) {
186 return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
187 op.toString().toLowerCase(), service);
188 }
189 }
190
/**
 * Creates a manager without a configuration; the SSH settings and the retry policy are set up
 * by {@link #setConf(Configuration)}.
 */
public HBaseClusterManager() {
  super();
}
193
194 protected CommandProvider getCommandProvider(ServiceType service) {
195
196
197 return new HBaseShellCommandProvider(getConf());
198 }
199
200
201
202
203
204
205 private Pair<Integer, String> exec(String hostname, String... cmd) throws IOException {
206 LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
207
208 RemoteShell shell = new RemoteShell(hostname, cmd);
209 try {
210 shell.execute();
211 } catch (Shell.ExitCodeException ex) {
212
213 String output = shell.getOutput();
214
215 throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
216 + ", stdout: " + output);
217 }
218
219 LOG.info("Executed remote command, exit code:" + shell.getExitCode()
220 + " , output:" + shell.getOutput());
221
222 return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput());
223 }
224
225 private Pair<Integer, String> execWithRetries(String hostname, String... cmd)
226 throws IOException {
227 RetryCounter retryCounter = retryCounterFactory.create();
228 while (true) {
229 try {
230 return exec(hostname, cmd);
231 } catch (IOException e) {
232 retryOrThrow(retryCounter, e, hostname, cmd);
233 }
234 try {
235 retryCounter.sleepUntilNextRetry();
236 } catch (InterruptedException ex) {
237
238 LOG.warn("Sleep Interrupted:" + ex);
239 }
240 }
241 }
242
243 private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
244 String hostname, String[] cmd) throws E {
245 if (retryCounter.shouldRetry()) {
246 LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
247 + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
248 + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
249 return;
250 }
251 throw ex;
252 }
253
254 private void exec(String hostname, ServiceType service, Operation op) throws IOException {
255 execWithRetries(hostname, getCommandProvider(service).getCommand(service, op));
256 }
257
258 @Override
259 public void start(ServiceType service, String hostname, int port) throws IOException {
260 exec(hostname, service, Operation.START);
261 }
262
263 @Override
264 public void stop(ServiceType service, String hostname, int port) throws IOException {
265 exec(hostname, service, Operation.STOP);
266 }
267
268 @Override
269 public void restart(ServiceType service, String hostname, int port) throws IOException {
270 exec(hostname, service, Operation.RESTART);
271 }
272
273 public void signal(ServiceType service, String signal, String hostname) throws IOException {
274 execWithRetries(hostname, getCommandProvider(service).signalCommand(service, signal));
275 }
276
277 @Override
278 public String isRunning(ServiceType service, String hostname, int port) throws IOException {
279 String ret = execWithRetries(hostname, getCommandProvider(service).isRunningCommand(service))
280 .getSecond();
281 return ret;
282 }
283
284 @Override
285 public void kill(ServiceType service, String hostname, int port) throws IOException {
286 signal(service, SIGKILL, hostname);
287 }
288
289 @Override
290 public void suspend(ServiceType service, String hostname, int port) throws IOException {
291 signal(service, SIGSTOP, hostname);
292 }
293
294 @Override
295 public void resume(ServiceType service, String hostname, int port) throws IOException {
296 signal(service, SIGCONT, hostname);
297 }
298 }