1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.security.PrivilegedExceptionAction;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.client.Admin;
33 import org.apache.hadoop.hbase.client.HBaseAdmin;
34 import org.apache.hadoop.hbase.regionserver.HRegionServer;
35 import org.apache.hadoop.hbase.security.User;
36 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
37 import org.apache.hadoop.hbase.util.Threads;
38
39 import java.util.concurrent.CopyOnWriteArrayList;
40 import org.apache.hadoop.hbase.master.HMaster;
41 import org.apache.hadoop.hbase.util.JVMClusterUtil;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 @InterfaceAudience.Public
60 @InterfaceStability.Evolving
61 public class LocalHBaseCluster {
62 static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
63 private final List<JVMClusterUtil.MasterThread> masterThreads =
64 new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
65 private final List<JVMClusterUtil.RegionServerThread> regionThreads =
66 new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
67 private final static int DEFAULT_NO = 1;
68
69 public static final String LOCAL = "local";
70
71 public static final String LOCAL_COLON = LOCAL + ":";
72 private final Configuration conf;
73 private final Class<? extends HMaster> masterClass;
74 private final Class<? extends HRegionServer> regionServerClass;
75
76
77
78
79
80
81 public LocalHBaseCluster(final Configuration conf)
82 throws IOException {
83 this(conf, DEFAULT_NO);
84 }
85
86
87
88
89
90
91
92
93 public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
94 throws IOException {
95 this(conf, 1, noRegionServers, getMasterImplementation(conf),
96 getRegionServerImplementation(conf));
97 }
98
99
100
101
102
103
104
105
106
107 public LocalHBaseCluster(final Configuration conf, final int noMasters,
108 final int noRegionServers)
109 throws IOException {
110 this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
111 getRegionServerImplementation(conf));
112 }
113
114 @SuppressWarnings("unchecked")
115 private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
116 return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
117 HRegionServer.class);
118 }
119
120 @SuppressWarnings("unchecked")
121 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
122 return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
123 HMaster.class);
124 }
125
126
127
128
129
130
131
132
133
134
135
136 @SuppressWarnings("unchecked")
137 public LocalHBaseCluster(final Configuration conf, final int noMasters,
138 final int noRegionServers, final Class<? extends HMaster> masterClass,
139 final Class<? extends HRegionServer> regionServerClass)
140 throws IOException {
141 this.conf = conf;
142
143
144
145 conf.set(HConstants.MASTER_PORT, "0");
146 conf.set(HConstants.REGIONSERVER_PORT, "0");
147 conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
148
149 this.masterClass = (Class<? extends HMaster>)
150 conf.getClass(HConstants.MASTER_IMPL, masterClass);
151
152 for (int i = 0; i < noMasters; i++) {
153 addMaster(new Configuration(conf), i);
154 }
155
156 this.regionServerClass =
157 (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
158 regionServerClass);
159
160 for (int i = 0; i < noRegionServers; i++) {
161 addRegionServer(new Configuration(conf), i);
162 }
163 }
164
165 public JVMClusterUtil.RegionServerThread addRegionServer()
166 throws IOException {
167 return addRegionServer(new Configuration(conf), this.regionThreads.size());
168 }
169
170 @SuppressWarnings("unchecked")
171 public JVMClusterUtil.RegionServerThread addRegionServer(
172 Configuration config, final int index)
173 throws IOException {
174
175
176
177
178
179
180
181 CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
182
183 JVMClusterUtil.RegionServerThread rst =
184 JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf
185 .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
186
187 this.regionThreads.add(rst);
188 return rst;
189 }
190
191 public JVMClusterUtil.RegionServerThread addRegionServer(
192 final Configuration config, final int index, User user)
193 throws IOException, InterruptedException {
194 return user.runAs(
195 new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
196 public JVMClusterUtil.RegionServerThread run() throws Exception {
197 return addRegionServer(config, index);
198 }
199 });
200 }
201
202 public JVMClusterUtil.MasterThread addMaster() throws IOException {
203 return addMaster(new Configuration(conf), this.masterThreads.size());
204 }
205
206 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
207 throws IOException {
208
209
210
211
212
213
214
215 CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
216
217 JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
218 (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
219 this.masterThreads.add(mt);
220 return mt;
221 }
222
223 public JVMClusterUtil.MasterThread addMaster(
224 final Configuration c, final int index, User user)
225 throws IOException, InterruptedException {
226 return user.runAs(
227 new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
228 public JVMClusterUtil.MasterThread run() throws Exception {
229 return addMaster(c, index);
230 }
231 });
232 }
233
234
235
236
237
238 public HRegionServer getRegionServer(int serverNumber) {
239 return regionThreads.get(serverNumber).getRegionServer();
240 }
241
242
243
244
245 public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
246 return Collections.unmodifiableList(this.regionThreads);
247 }
248
249
250
251
252
253
254 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
255 List<JVMClusterUtil.RegionServerThread> liveServers =
256 new ArrayList<JVMClusterUtil.RegionServerThread>();
257 List<RegionServerThread> list = getRegionServers();
258 for (JVMClusterUtil.RegionServerThread rst: list) {
259 if (rst.isAlive()) liveServers.add(rst);
260 else LOG.info("Not alive " + rst.getName());
261 }
262 return liveServers;
263 }
264
265
266
267
268 public Configuration getConfiguration() {
269 return this.conf;
270 }
271
272
273
274
275
276
277
278 public String waitOnRegionServer(int serverNumber) {
279 JVMClusterUtil.RegionServerThread regionServerThread =
280 this.regionThreads.remove(serverNumber);
281 while (regionServerThread.isAlive()) {
282 try {
283 LOG.info("Waiting on " +
284 regionServerThread.getRegionServer().toString());
285 regionServerThread.join();
286 } catch (InterruptedException e) {
287 e.printStackTrace();
288 }
289 }
290 return regionServerThread.getName();
291 }
292
293
294
295
296
297
298
299 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
300 while (rst.isAlive()) {
301 try {
302 LOG.info("Waiting on " +
303 rst.getRegionServer().toString());
304 rst.join();
305 } catch (InterruptedException e) {
306 e.printStackTrace();
307 }
308 }
309 for (int i=0;i<regionThreads.size();i++) {
310 if (regionThreads.get(i) == rst) {
311 regionThreads.remove(i);
312 break;
313 }
314 }
315 return rst.getName();
316 }
317
318
319
320
321
322 public HMaster getMaster(int serverNumber) {
323 return masterThreads.get(serverNumber).getMaster();
324 }
325
326
327
328
329
330
331 public HMaster getActiveMaster() {
332 for (JVMClusterUtil.MasterThread mt : masterThreads) {
333 if (mt.getMaster().isActiveMaster()) {
334
335
336 if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
337 return mt.getMaster();
338 }
339 }
340 }
341 return null;
342 }
343
344
345
346
347 public List<JVMClusterUtil.MasterThread> getMasters() {
348 return Collections.unmodifiableList(this.masterThreads);
349 }
350
351
352
353
354
355
356 public List<JVMClusterUtil.MasterThread> getLiveMasters() {
357 List<JVMClusterUtil.MasterThread> liveServers =
358 new ArrayList<JVMClusterUtil.MasterThread>();
359 List<JVMClusterUtil.MasterThread> list = getMasters();
360 for (JVMClusterUtil.MasterThread mt: list) {
361 if (mt.isAlive()) {
362 liveServers.add(mt);
363 }
364 }
365 return liveServers;
366 }
367
368
369
370
371
372
373
374 public String waitOnMaster(int serverNumber) {
375 JVMClusterUtil.MasterThread masterThread = this.masterThreads.remove(serverNumber);
376 while (masterThread.isAlive()) {
377 try {
378 LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString());
379 masterThread.join();
380 } catch (InterruptedException e) {
381 e.printStackTrace();
382 }
383 }
384 return masterThread.getName();
385 }
386
387
388
389
390
391
392
393 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
394 while (masterThread.isAlive()) {
395 try {
396 LOG.info("Waiting on " +
397 masterThread.getMaster().getServerName().toString());
398 masterThread.join();
399 } catch (InterruptedException e) {
400 e.printStackTrace();
401 }
402 }
403 for (int i=0;i<masterThreads.size();i++) {
404 if (masterThreads.get(i) == masterThread) {
405 masterThreads.remove(i);
406 break;
407 }
408 }
409 return masterThread.getName();
410 }
411
412
413
414
415
416 public void join() {
417 if (this.regionThreads != null) {
418 for(Thread t: this.regionThreads) {
419 if (t.isAlive()) {
420 try {
421 Threads.threadDumpingIsAlive(t);
422 } catch (InterruptedException e) {
423 LOG.debug("Interrupted", e);
424 }
425 }
426 }
427 }
428 if (this.masterThreads != null) {
429 for (Thread t : this.masterThreads) {
430 if (t.isAlive()) {
431 try {
432 Threads.threadDumpingIsAlive(t);
433 } catch (InterruptedException e) {
434 LOG.debug("Interrupted", e);
435 }
436 }
437 }
438 }
439 }
440
441
442
443
444 public void startup() throws IOException {
445 JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
446 }
447
448
449
450
451 public void shutdown() {
452 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
453 }
454
455
456
457
458
459 public static boolean isLocal(final Configuration c) {
460 boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
461 return(mode == HConstants.CLUSTER_IS_LOCAL);
462 }
463
464
465
466
467
468
469 public static void main(String[] args) throws IOException {
470 Configuration conf = HBaseConfiguration.create();
471 LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
472 cluster.startup();
473 Admin admin = new HBaseAdmin(conf);
474 try {
475 HTableDescriptor htd =
476 new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
477 admin.createTable(htd);
478 } finally {
479 admin.close();
480 }
481 cluster.shutdown();
482 }
483 }