1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.InterruptedIOException;
22 import java.io.IOException;
23 import java.lang.reflect.Constructor;
24 import java.lang.reflect.InvocationTargetException;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.CoordinatedStateManager;
32 import org.apache.hadoop.hbase.master.HMaster;
33 import org.apache.hadoop.hbase.regionserver.HRegionServer;
34
35
36
37
38 @InterfaceAudience.Private
39 public class JVMClusterUtil {
40 private static final Log LOG = LogFactory.getLog(JVMClusterUtil.class);
41
42
43
44
45 public static class RegionServerThread extends Thread {
46 private final HRegionServer regionServer;
47
48 public RegionServerThread(final HRegionServer r, final int index) {
49 super(r, "RS:" + index + ";" + r.getServerName().toShortString());
50 this.regionServer = r;
51 }
52
53
54 public HRegionServer getRegionServer() {
55 return this.regionServer;
56 }
57
58
59
60
61
62 public void waitForServerOnline() {
63
64
65
66
67 regionServer.waitForServerOnline();
68 }
69 }
70
71
72
73
74
75
76
77
78
79
80
81 public static JVMClusterUtil.RegionServerThread createRegionServerThread(
82 final Configuration c, CoordinatedStateManager cp, final Class<? extends HRegionServer> hrsc,
83 final int index)
84 throws IOException {
85 HRegionServer server;
86 try {
87
88 Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class,
89 CoordinatedStateManager.class);
90 ctor.setAccessible(true);
91 server = ctor.newInstance(c, cp);
92 } catch (InvocationTargetException ite) {
93 Throwable target = ite.getTargetException();
94 throw new RuntimeException("Failed construction of RegionServer: " +
95 hrsc.toString() + ((target.getCause() != null)?
96 target.getCause().getMessage(): ""), target);
97 } catch (Exception e) {
98 IOException ioe = new IOException();
99 ioe.initCause(e);
100 throw ioe;
101 }
102 return new JVMClusterUtil.RegionServerThread(server, index);
103 }
104
105
106
107
108
109 public static class MasterThread extends Thread {
110 private final HMaster master;
111
112 public MasterThread(final HMaster m, final int index) {
113 super(m, "M:" + index + ";" + m.getServerName().toShortString());
114 this.master = m;
115 }
116
117
118 public HMaster getMaster() {
119 return this.master;
120 }
121 }
122
123
124
125
126
127
128
129
130
131
132
133 public static JVMClusterUtil.MasterThread createMasterThread(
134 final Configuration c, CoordinatedStateManager cp, final Class<? extends HMaster> hmc,
135 final int index)
136 throws IOException {
137 HMaster server;
138 try {
139 server = hmc.getConstructor(Configuration.class, CoordinatedStateManager.class).
140 newInstance(c, cp);
141 } catch (InvocationTargetException ite) {
142 Throwable target = ite.getTargetException();
143 throw new RuntimeException("Failed construction of Master: " +
144 hmc.toString() + ((target.getCause() != null)?
145 target.getCause().getMessage(): ""), target);
146 } catch (Exception e) {
147 IOException ioe = new IOException();
148 ioe.initCause(e);
149 throw ioe;
150 }
151 return new JVMClusterUtil.MasterThread(server, index);
152 }
153
154 private static JVMClusterUtil.MasterThread findActiveMaster(
155 List<JVMClusterUtil.MasterThread> masters) {
156 for (JVMClusterUtil.MasterThread t : masters) {
157 if (t.master.isActiveMaster()) {
158 return t;
159 }
160 }
161
162 return null;
163 }
164
165
166
167
168
169
170
171
172 public static String startup(final List<JVMClusterUtil.MasterThread> masters,
173 final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
174
175 Configuration configuration = null;
176
177 if (masters == null || masters.isEmpty()) {
178 return null;
179 }
180
181 for (JVMClusterUtil.MasterThread t : masters) {
182 configuration = t.getMaster().getConfiguration();
183 t.start();
184 }
185
186
187
188
189 long startTime = System.currentTimeMillis();
190 while (findActiveMaster(masters) == null) {
191 try {
192 Thread.sleep(100);
193 } catch (InterruptedException e) {
194 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
195 }
196 int startTimeout = configuration != null ? Integer.parseInt(
197 configuration.get("hbase.master.start.timeout.localHBaseCluster", "30000")) : 30000;
198 if (System.currentTimeMillis() > startTime + startTimeout) {
199 throw new RuntimeException(String.format("Master not active after %s seconds", startTimeout));
200 }
201 }
202
203 if (regionservers != null) {
204 for (JVMClusterUtil.RegionServerThread t: regionservers) {
205 t.start();
206 }
207 }
208
209
210
211 startTime = System.currentTimeMillis();
212 final int maxwait = 200000;
213 while (true) {
214 JVMClusterUtil.MasterThread t = findActiveMaster(masters);
215 if (t != null && t.master.isInitialized()) {
216 return t.master.getServerName().toString();
217 }
218
219 if (System.currentTimeMillis() > startTime + 10000) {
220 try {
221 Thread.sleep(1000);
222 } catch (InterruptedException e) {
223 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
224 }
225 }
226 if (System.currentTimeMillis() > startTime + maxwait) {
227 String msg = "Master not initialized after " + maxwait + "ms seconds";
228 Threads.printThreadInfo(System.out,
229 "Thread dump because: " + msg);
230 throw new RuntimeException(msg);
231 }
232 try {
233 Thread.sleep(100);
234 } catch (InterruptedException e) {
235 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
236 }
237 }
238 }
239
240
241
242
243
244 public static void shutdown(final List<MasterThread> masters,
245 final List<RegionServerThread> regionservers) {
246 LOG.debug("Shutting down HBase Cluster");
247 if (masters != null) {
248
249 JVMClusterUtil.MasterThread activeMaster = null;
250 for (JVMClusterUtil.MasterThread t : masters) {
251 if (!t.master.isActiveMaster()) {
252 try {
253 t.master.stopMaster();
254 } catch (IOException e) {
255 LOG.error("Exception occurred while stopping master", e);
256 }
257 } else {
258 activeMaster = t;
259 }
260 }
261
262 if (activeMaster != null) {
263 try {
264 activeMaster.master.shutdown();
265 } catch (IOException e) {
266 LOG.error("Exception occurred in HMaster.shutdown()", e);
267 }
268 }
269
270 }
271 boolean wasInterrupted = false;
272 final long maxTime = System.currentTimeMillis() + 30 * 1000;
273 if (regionservers != null) {
274
275 for (RegionServerThread t : regionservers) {
276 t.getRegionServer().stop("Shutdown requested");
277 }
278 for (RegionServerThread t : regionservers) {
279 long now = System.currentTimeMillis();
280 if (t.isAlive() && !wasInterrupted && now < maxTime) {
281 try {
282 t.join(maxTime - now);
283 } catch (InterruptedException e) {
284 LOG.info("Got InterruptedException on shutdown - " +
285 "not waiting anymore on region server ends", e);
286 wasInterrupted = true;
287 }
288 }
289 }
290
291
292 for (int i = 0; i < 100; ++i) {
293 boolean atLeastOneLiveServer = false;
294 for (RegionServerThread t : regionservers) {
295 if (t.isAlive()) {
296 atLeastOneLiveServer = true;
297 try {
298 LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
299 t.join(1000);
300 } catch (InterruptedException e) {
301 wasInterrupted = true;
302 }
303 }
304 }
305 if (!atLeastOneLiveServer) break;
306 for (RegionServerThread t : regionservers) {
307 if (t.isAlive()) {
308 LOG.warn("RegionServerThreads taking too long to stop, interrupting");
309 t.interrupt();
310 }
311 }
312 }
313 }
314
315 if (masters != null) {
316 for (JVMClusterUtil.MasterThread t : masters) {
317 while (t.master.isAlive() && !wasInterrupted) {
318 try {
319
320
321
322 Threads.threadDumpingIsAlive(t.master.getThread());
323 } catch(InterruptedException e) {
324 LOG.info("Got InterruptedException on shutdown - " +
325 "not waiting anymore on master ends", e);
326 wasInterrupted = true;
327 }
328 }
329 }
330 }
331 LOG.info("Shutdown of " +
332 ((masters != null) ? masters.size() : "0") + " master(s) and " +
333 ((regionservers != null) ? regionservers.size() : "0") +
334 " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
335
336 if (wasInterrupted){
337 Thread.currentThread().interrupt();
338 }
339 }
340 }