1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.security.PrivilegedAction;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.hbase.master.HMaster;
33 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
35 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
36 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
37 import org.apache.hadoop.hbase.regionserver.HRegion;
38 import org.apache.hadoop.hbase.regionserver.HRegionServer;
39 import org.apache.hadoop.hbase.regionserver.Region;
40 import org.apache.hadoop.hbase.security.User;
41 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
42 import org.apache.hadoop.hbase.util.JVMClusterUtil;
43 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
44 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
45 import org.apache.hadoop.hbase.util.Threads;
46
47
48
49
50
51
52
53 @InterfaceAudience.Public
54 @InterfaceStability.Evolving
55 public class MiniHBaseCluster extends HBaseCluster {
56 static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
57 public LocalHBaseCluster hbaseCluster;
58 private static int index;
59
60
61
62
63
64
65
/**
 * Starts a cluster with one master and the requested number of region servers.
 *
 * @param conf configuration to use for the cluster
 * @param numRegionServers number of region servers to start initially
 * @throws IOException propagated from the delegated constructor
 * @throws InterruptedException propagated from the delegated constructor
 */
public MiniHBaseCluster(
    Configuration conf,
    int numRegionServers) throws IOException, InterruptedException {
  this(conf, 1, numRegionServers);
}
70
71
72
73
74
75
76
77
/**
 * Starts a cluster with the requested number of masters and region servers. Both
 * implementation classes are passed on as {@code null}.
 *
 * @param conf configuration to use for the cluster
 * @param numMasters number of masters to start initially
 * @param numRegionServers number of region servers to start initially
 * @throws IOException propagated from the delegated constructor
 * @throws InterruptedException propagated from the delegated constructor
 */
public MiniHBaseCluster(
    Configuration conf,
    int numMasters,
    int numRegionServers) throws IOException, InterruptedException {
  this(conf, numMasters, numRegionServers, null, null);
}
83
84 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
85 Class<? extends HMaster> masterClass,
86 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
87 throws IOException, InterruptedException {
88 super(conf);
89 conf.set(HConstants.MASTER_PORT, "0");
90
91
92 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
93
94 init(numMasters, numRegionServers, masterClass, regionserverClass);
95 this.initialClusterStatus = getClusterStatus();
96 }
97
98 public Configuration getConfiguration() {
99 return this.conf;
100 }
101
102
103
104
105
106
107
108 public static class MiniHBaseClusterRegionServer extends HRegionServer {
109 private Thread shutdownThread = null;
110 private User user = null;
111 public static boolean TEST_SKIP_CLOSE = false;
112
113 public MiniHBaseClusterRegionServer(Configuration conf, CoordinatedStateManager cp)
114 throws IOException, InterruptedException {
115 super(conf, cp);
116 this.user = User.getCurrent();
117 }
118
119
120
121
122
123
124
125
126
127 @Override
128 protected void handleReportForDutyResponse(
129 final RegionServerStartupResponse c) throws IOException {
130 super.handleReportForDutyResponse(c);
131
132 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
133 }
134
135 @Override
136 public void run() {
137 try {
138 this.user.runAs(new PrivilegedAction<Object>(){
139 public Object run() {
140 runRegionServer();
141 return null;
142 }
143 });
144 } catch (Throwable t) {
145 LOG.error("Exception in run", t);
146 } finally {
147
148 if (this.shutdownThread != null) {
149 this.shutdownThread.start();
150 Threads.shutdown(this.shutdownThread, 30000);
151 }
152 }
153 }
154
155 private void runRegionServer() {
156 super.run();
157 }
158
159 @Override
160 public void kill() {
161 super.kill();
162 }
163
164 public void abort(final String reason, final Throwable cause) {
165 this.user.runAs(new PrivilegedAction<Object>() {
166 public Object run() {
167 abortRegionServer(reason, cause);
168 return null;
169 }
170 });
171 }
172
173 private void abortRegionServer(String reason, Throwable cause) {
174 super.abort(reason, cause);
175 }
176 }
177
178
179
180
181
182 static class SingleFileSystemShutdownThread extends Thread {
183 private final FileSystem fs;
184 SingleFileSystemShutdownThread(final FileSystem fs) {
185 super("Shutdown of " + fs);
186 this.fs = fs;
187 }
188 @Override
189 public void run() {
190 try {
191 LOG.info("Hook closing fs=" + this.fs);
192 this.fs.close();
193 } catch (NullPointerException npe) {
194 LOG.debug("Need to fix these: " + npe.toString());
195 } catch (IOException e) {
196 LOG.warn("Running hook", e);
197 }
198 }
199 }
200
201 private void init(final int nMasterNodes, final int nRegionNodes,
202 Class<? extends HMaster> masterClass,
203 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
204 throws IOException, InterruptedException {
205 try {
206 if (masterClass == null){
207 masterClass = HMaster.class;
208 }
209 if (regionserverClass == null){
210 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
211 }
212
213
214 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
215 masterClass, regionserverClass);
216
217
218 for (int i=0; i<nRegionNodes; i++) {
219 Configuration rsConf = HBaseConfiguration.create(conf);
220 User user = HBaseTestingUtility.getDifferentUser(rsConf,
221 ".hfs."+index++);
222 hbaseCluster.addRegionServer(rsConf, i, user);
223 }
224
225 hbaseCluster.startup();
226 } catch (IOException e) {
227 shutdown();
228 throw e;
229 } catch (Throwable t) {
230 LOG.error("Error starting cluster", t);
231 shutdown();
232 throw new IOException("Shutting down", t);
233 }
234 }
235
236 @Override
237 public void startRegionServer(String hostname, int port) throws IOException {
238 this.startRegionServer();
239 }
240
241 @Override
242 public void killRegionServer(ServerName serverName) throws IOException {
243 HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
244 if (server instanceof MiniHBaseClusterRegionServer) {
245 LOG.info("Killing " + server.toString());
246 ((MiniHBaseClusterRegionServer) server).kill();
247 } else {
248 abortRegionServer(getRegionServerIndex(serverName));
249 }
250 }
251
252 @Override
253 public void stopRegionServer(ServerName serverName) throws IOException {
254 stopRegionServer(getRegionServerIndex(serverName));
255 }
256
257 @Override
258 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
259
260 waitOnRegionServer(getRegionServerIndex(serverName));
261 }
262
263 @Override
264 public void startMaster(String hostname, int port) throws IOException {
265 this.startMaster();
266 }
267
268 @Override
269 public void killMaster(ServerName serverName) throws IOException {
270 abortMaster(getMasterIndex(serverName));
271 }
272
273 @Override
274 public void stopMaster(ServerName serverName) throws IOException {
275 stopMaster(getMasterIndex(serverName));
276 }
277
278 @Override
279 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
280
281 waitOnMaster(getMasterIndex(serverName));
282 }
283
284
285
286
287
288
289
290 public JVMClusterUtil.RegionServerThread startRegionServer()
291 throws IOException {
292 final Configuration newConf = HBaseConfiguration.create(conf);
293 User rsUser =
294 HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
295 JVMClusterUtil.RegionServerThread t = null;
296 try {
297 t = hbaseCluster.addRegionServer(
298 newConf, hbaseCluster.getRegionServers().size(), rsUser);
299 t.start();
300 t.waitForServerOnline();
301 } catch (InterruptedException ie) {
302 throw new IOException("Interrupted adding regionserver to cluster", ie);
303 }
304 return t;
305 }
306
307
308
309
310
311 public String abortRegionServer(int serverNumber) {
312 HRegionServer server = getRegionServer(serverNumber);
313 LOG.info("Aborting " + server.toString());
314 server.abort("Aborting for tests", new Exception("Trace info"));
315 return server.toString();
316 }
317
318
319
320
321
322
323
324 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
325 return stopRegionServer(serverNumber, true);
326 }
327
328
329
330
331
332
333
334
335
336
337
338 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
339 final boolean shutdownFS) {
340 JVMClusterUtil.RegionServerThread server =
341 hbaseCluster.getRegionServers().get(serverNumber);
342 LOG.info("Stopping " + server.toString());
343 server.getRegionServer().stop("Stopping rs " + serverNumber);
344 return server;
345 }
346
347
348
349
350
351
352
353 public String waitOnRegionServer(final int serverNumber) {
354 return this.hbaseCluster.waitOnRegionServer(serverNumber);
355 }
356
357
358
359
360
361
362
363
364 public JVMClusterUtil.MasterThread startMaster() throws IOException {
365 Configuration c = HBaseConfiguration.create(conf);
366 User user =
367 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
368
369 JVMClusterUtil.MasterThread t = null;
370 try {
371 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
372 t.start();
373 } catch (InterruptedException ie) {
374 throw new IOException("Interrupted adding master to cluster", ie);
375 }
376 return t;
377 }
378
379
380
381
382
383 public MasterService.BlockingInterface getMasterAdminService() {
384 return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
385 }
386
387
388
389
390
391 public HMaster getMaster() {
392 return this.hbaseCluster.getActiveMaster();
393 }
394
395
396
397
398
399 public MasterThread getMasterThread() {
400 for (MasterThread mt: hbaseCluster.getLiveMasters()) {
401 if (mt.getMaster().isActiveMaster()) {
402 return mt;
403 }
404 }
405 return null;
406 }
407
408
409
410
411
412 public HMaster getMaster(final int serverNumber) {
413 return this.hbaseCluster.getMaster(serverNumber);
414 }
415
416
417
418
419
420 public String abortMaster(int serverNumber) {
421 HMaster server = getMaster(serverNumber);
422 LOG.info("Aborting " + server.toString());
423 server.abort("Aborting for tests", new Exception("Trace info"));
424 return server.toString();
425 }
426
427
428
429
430
431
432
433 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
434 return stopMaster(serverNumber, true);
435 }
436
437
438
439
440
441
442
443
444
445
446
447 public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
448 final boolean shutdownFS) {
449 JVMClusterUtil.MasterThread server =
450 hbaseCluster.getMasters().get(serverNumber);
451 LOG.info("Stopping " + server.toString());
452 server.getMaster().stop("Stopping master " + serverNumber);
453 return server;
454 }
455
456
457
458
459
460
461
462 public String waitOnMaster(final int serverNumber) {
463 return this.hbaseCluster.waitOnMaster(serverNumber);
464 }
465
466
467
468
469
470
471
472
473
474 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
475 List<JVMClusterUtil.MasterThread> mts;
476 long start = System.currentTimeMillis();
477 while (!(mts = getMasterThreads()).isEmpty()
478 && (System.currentTimeMillis() - start) < timeout) {
479 for (JVMClusterUtil.MasterThread mt : mts) {
480 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
481 return true;
482 }
483 }
484
485 Threads.sleep(100);
486 }
487 return false;
488 }
489
490
491
492
493 public List<JVMClusterUtil.MasterThread> getMasterThreads() {
494 return this.hbaseCluster.getMasters();
495 }
496
497
498
499
500 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
501 return this.hbaseCluster.getLiveMasters();
502 }
503
504
505
506
507 public void join() {
508 this.hbaseCluster.join();
509 }
510
511
512
513
514
515 @SuppressWarnings("deprecation")
516 public void shutdown() throws IOException {
517 if (this.hbaseCluster != null) {
518 this.hbaseCluster.shutdown();
519 }
520 }
521
522 @Override
523 public void close() throws IOException {
524 }
525
526 @Override
527 public ClusterStatus getClusterStatus() throws IOException {
528 HMaster master = getMaster();
529 return master == null ? null : master.getClusterStatus();
530 }
531
532
533
534
535
536 public void flushcache() throws IOException {
537 for (JVMClusterUtil.RegionServerThread t:
538 this.hbaseCluster.getRegionServers()) {
539 for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
540 r.flush(true);
541 }
542 }
543 }
544
545
546
547
548
549 public void flushcache(TableName tableName) throws IOException {
550 for (JVMClusterUtil.RegionServerThread t:
551 this.hbaseCluster.getRegionServers()) {
552 for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
553 if(r.getTableDesc().getTableName().equals(tableName)) {
554 r.flush(true);
555 }
556 }
557 }
558 }
559
560
561
562
563
564 public void compact(boolean major) throws IOException {
565 for (JVMClusterUtil.RegionServerThread t:
566 this.hbaseCluster.getRegionServers()) {
567 for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
568 r.compact(major);
569 }
570 }
571 }
572
573
574
575
576
577 public void compact(TableName tableName, boolean major) throws IOException {
578 for (JVMClusterUtil.RegionServerThread t:
579 this.hbaseCluster.getRegionServers()) {
580 for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
581 if(r.getTableDesc().getTableName().equals(tableName)) {
582 r.compact(major);
583 }
584 }
585 }
586 }
587
588
589
590
591 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
592 return this.hbaseCluster.getRegionServers();
593 }
594
595
596
597
598 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
599 return this.hbaseCluster.getLiveRegionServers();
600 }
601
602
603
604
605
606
607 public HRegionServer getRegionServer(int serverNumber) {
608 return hbaseCluster.getRegionServer(serverNumber);
609 }
610
611 public List<HRegion> getRegions(byte[] tableName) {
612 return getRegions(TableName.valueOf(tableName));
613 }
614
615 public List<HRegion> getRegions(TableName tableName) {
616 List<HRegion> ret = new ArrayList<HRegion>();
617 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
618 HRegionServer hrs = rst.getRegionServer();
619 for (Region region : hrs.getOnlineRegionsLocalContext()) {
620 if (region.getTableDesc().getTableName().equals(tableName)) {
621 ret.add((HRegion)region);
622 }
623 }
624 }
625 return ret;
626 }
627
628
629
630
631
632 public int getServerWithMeta() {
633 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
634 }
635
636
637
638
639
640
641
642 public int getServerWith(byte[] regionName) {
643 int index = -1;
644 int count = 0;
645 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
646 HRegionServer hrs = rst.getRegionServer();
647 Region region = hrs.getOnlineRegion(regionName);
648 if (region != null) {
649 index = count;
650 break;
651 }
652 count++;
653 }
654 return index;
655 }
656
657 @Override
658 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
659 throws IOException {
660
661
662
663
664 HMaster master = getMaster();
665 Region region = master.getOnlineRegion(regionName);
666 if (region != null) {
667 return master.getServerName();
668 }
669 int index = getServerWith(regionName);
670 if (index < 0) {
671 return null;
672 }
673 return getRegionServer(index).getServerName();
674 }
675
676
677
678
679
680
681
682 public long countServedRegions() {
683 long count = 0;
684 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
685 count += rst.getRegionServer().getNumberOfOnlineRegions();
686 }
687 for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
688 count += mt.getMaster().getNumberOfOnlineRegions();
689 }
690 return count;
691 }
692
693
694
695
696
697 public void killAll() {
698 for (RegionServerThread rst : getRegionServerThreads()) {
699 rst.getRegionServer().abort("killAll");
700 }
701 for (MasterThread masterThread : getMasterThreads()) {
702 masterThread.getMaster().abort("killAll", new Throwable());
703 }
704 }
705
706 @Override
707 public void waitUntilShutDown() {
708 this.hbaseCluster.join();
709 }
710
711 public List<HRegion> findRegionsForTable(TableName tableName) {
712 ArrayList<HRegion> ret = new ArrayList<HRegion>();
713 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
714 HRegionServer hrs = rst.getRegionServer();
715 for (Region region : hrs.getOnlineRegions(tableName)) {
716 if (region.getTableDesc().getTableName().equals(tableName)) {
717 ret.add((HRegion)region);
718 }
719 }
720 }
721 return ret;
722 }
723
724
725 protected int getRegionServerIndex(ServerName serverName) {
726
727 List<RegionServerThread> servers = getRegionServerThreads();
728 for (int i=0; i < servers.size(); i++) {
729 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
730 return i;
731 }
732 }
733 return -1;
734 }
735
736 protected int getMasterIndex(ServerName serverName) {
737 List<MasterThread> masters = getMasterThreads();
738 for (int i = 0; i < masters.size(); i++) {
739 if (masters.get(i).getMaster().getServerName().equals(serverName)) {
740 return i;
741 }
742 }
743 return -1;
744 }
745
746 @Override
747 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
748 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
749 }
750
751 @Override
752 public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
753 throws IOException {
754 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
755 }
756 }