1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.rsgroup;
22
23 import com.google.common.collect.Lists;
24 import com.google.common.collect.Maps;
25
26 import com.google.common.collect.Sets;
27 import com.google.common.net.HostAndPort;
28 import com.google.protobuf.ServiceException;
29
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.Comparator;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.NavigableSet;
40 import java.util.Set;
41 import java.util.TreeSet;
42 import java.util.concurrent.atomic.AtomicBoolean;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.hbase.Cell;
48 import org.apache.hadoop.hbase.CellUtil;
49 import org.apache.hadoop.hbase.Coprocessor;
50 import org.apache.hadoop.hbase.DoNotRetryIOException;
51 import org.apache.hadoop.hbase.HColumnDescriptor;
52 import org.apache.hadoop.hbase.HConstants;
53 import org.apache.hadoop.hbase.HRegionInfo;
54 import org.apache.hadoop.hbase.HTableDescriptor;
55 import org.apache.hadoop.hbase.MetaTableAccessor;
56 import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
57 import org.apache.hadoop.hbase.NamespaceDescriptor;
58 import org.apache.hadoop.hbase.ServerName;
59 import org.apache.hadoop.hbase.TableName;
60 import org.apache.hadoop.hbase.TableStateManager;
61 import org.apache.hadoop.hbase.client.ClusterConnection;
62 import org.apache.hadoop.hbase.client.Delete;
63 import org.apache.hadoop.hbase.client.Get;
64 import org.apache.hadoop.hbase.client.Mutation;
65 import org.apache.hadoop.hbase.client.Put;
66 import org.apache.hadoop.hbase.client.Result;
67 import org.apache.hadoop.hbase.client.Table;
68 import org.apache.hadoop.hbase.constraint.ConstraintException;
69 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
70 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
71 import org.apache.hadoop.hbase.master.MasterServices;
72 import org.apache.hadoop.hbase.master.ServerListener;
73 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
74 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
75 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
76 import org.apache.hadoop.hbase.protobuf.RequestConverter;
77 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
78 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
79 import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
80 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
81 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
82 import org.apache.hadoop.hbase.security.access.AccessControlLists;
83 import org.apache.hadoop.hbase.util.Bytes;
84 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
85 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
86 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
87 import org.apache.zookeeper.KeeperException;
88
89
90
91
92
93
94
95 public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener {
96 private static final Log LOG = LogFactory.getLog(RSGroupInfoManagerImpl.class);
97
98
99 private final static HTableDescriptor RSGROUP_TABLE_DESC;
100 static {
101 RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME_BYTES);
102 RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
103 RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
104 try {
105 RSGROUP_TABLE_DESC.addCoprocessor(
106 MultiRowMutationEndpoint.class.getName(),
107 null, Coprocessor.PRIORITY_SYSTEM, null);
108 } catch (IOException ex) {
109 throw new RuntimeException(ex);
110 }
111 }
112
113 private volatile Map<String, RSGroupInfo> rsGroupMap;
114 private volatile Map<TableName, String> tableMap;
115 private MasterServices master;
116 private Table rsGroupTable;
117 private ClusterConnection conn;
118 private ZooKeeperWatcher watcher;
119 private RSGroupStartupWorker rsGroupStartupWorker;
120
121 private volatile Set<String> prevRSGroups;
122 private RSGroupSerDe rsGroupSerDe;
123 private DefaultServerUpdater defaultServerUpdater;
124
125
126 public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
127 this.rsGroupMap = Collections.EMPTY_MAP;
128 this.tableMap = Collections.EMPTY_MAP;
129 rsGroupSerDe = new RSGroupSerDe();
130 this.master = master;
131 this.watcher = master.getZooKeeper();
132 this.conn = master.getConnection();
133 rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn);
134 prevRSGroups = new HashSet<String>();
135 refresh();
136 rsGroupStartupWorker.start();
137 defaultServerUpdater = new DefaultServerUpdater(this);
138 master.getServerManager().registerListener(this);
139 defaultServerUpdater.start();
140 }
141
142
143
144
145
146
147 @Override
148 public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
149 checkGroupName(rsGroupInfo.getName());
150 if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
151 rsGroupInfo.getName().equals(rsGroupInfo.DEFAULT_GROUP)) {
152 throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
153 }
154 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
155 newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
156 flushConfig(newGroupMap);
157 }
158
159 @Override
160 public synchronized boolean moveServers(Set<HostAndPort> hostPorts, String srcGroup,
161 String dstGroup) throws IOException {
162 if (!rsGroupMap.containsKey(srcGroup)) {
163 throw new DoNotRetryIOException("Group "+srcGroup+" does not exist");
164 }
165 if (!rsGroupMap.containsKey(dstGroup)) {
166 throw new DoNotRetryIOException("Group "+dstGroup+" does not exist");
167 }
168
169 RSGroupInfo src = new RSGroupInfo(getRSGroup(srcGroup));
170 RSGroupInfo dst = new RSGroupInfo(getRSGroup(dstGroup));
171 boolean foundOne = false;
172 for(HostAndPort el: hostPorts) {
173 foundOne = src.removeServer(el) || foundOne;
174 dst.addServer(el);
175 }
176
177 Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
178 newGroupMap.put(src.getName(), src);
179 newGroupMap.put(dst.getName(), dst);
180
181 flushConfig(newGroupMap);
182 return foundOne;
183 }
184
185
186
187
188
189
190
191 @Override
192 public RSGroupInfo getRSGroupOfServer(HostAndPort hostPort) throws IOException {
193 for (RSGroupInfo info : rsGroupMap.values()) {
194 if (info.containsServer(hostPort)){
195 return info;
196 }
197 }
198 return null;
199 }
200
201
202
203
204
205
206
207
208 @Override
209 public RSGroupInfo getRSGroup(String groupName) throws IOException {
210 RSGroupInfo RSGroupInfo = rsGroupMap.get(groupName);
211 return RSGroupInfo;
212 }
213
214
215
216 @Override
217 public String getRSGroupOfTable(TableName tableName) throws IOException {
218 return tableMap.get(tableName);
219 }
220
221 @Override
222 public synchronized void moveTables(
223 Set<TableName> tableNames, String groupName) throws IOException {
224 if (groupName != null && !rsGroupMap.containsKey(groupName)) {
225 throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group");
226 }
227
228 Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
229 for(TableName tableName: tableNames) {
230 if (tableMap.containsKey(tableName)) {
231 RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
232 src.removeTable(tableName);
233 newGroupMap.put(src.getName(), src);
234 }
235 if(groupName != null) {
236 RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
237 dst.addTable(tableName);
238 newGroupMap.put(dst.getName(), dst);
239 }
240 }
241
242 flushConfig(newGroupMap);
243 }
244
245
246
247
248
249
250
251
252 @Override
253 public synchronized void removeRSGroup(String groupName) throws IOException {
254 if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
255 throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group");
256 }
257 Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
258 newGroupMap.remove(groupName);
259 flushConfig(newGroupMap);
260 }
261
262 @Override
263 public List<RSGroupInfo> listRSGroups() throws IOException {
264 List<RSGroupInfo> list = Lists.newLinkedList(rsGroupMap.values());
265 return list;
266 }
267
268 @Override
269 public boolean isOnline() {
270 return rsGroupStartupWorker.isOnline();
271 }
272
273 @Override
274 public synchronized void refresh() throws IOException {
275 refresh(false);
276 }
277
278 private synchronized void refresh(boolean forceOnline) throws IOException {
279 List<RSGroupInfo> groupList = new LinkedList<RSGroupInfo>();
280
281
282
283 if (forceOnline || isOnline()) {
284 LOG.debug("Refreshing in Online mode.");
285 if (rsGroupTable == null) {
286 rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
287 }
288 groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable));
289 } else {
290 LOG.debug("Refershing in Offline mode.");
291 String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode);
292 groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath));
293 }
294
295
296 NavigableSet<TableName> orphanTables = new TreeSet<TableName>();
297 for(String entry: master.getTableDescriptors().getAll().keySet()) {
298 orphanTables.add(TableName.valueOf(entry));
299 }
300
301 List<TableName> specialTables;
302 if(!master.isInitialized()) {
303 specialTables = new ArrayList<TableName>();
304 specialTables.add(AccessControlLists.ACL_TABLE_NAME);
305 specialTables.add(TableName.META_TABLE_NAME);
306 specialTables.add(TableName.NAMESPACE_TABLE_NAME);
307 specialTables.add(RSGROUP_TABLE_NAME);
308 } else {
309 specialTables =
310 master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
311 }
312
313 for(TableName table : specialTables) {
314 orphanTables.add(table);
315 }
316 for(RSGroupInfo group: groupList) {
317 if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
318 orphanTables.removeAll(group.getTables());
319 }
320 }
321
322
323
324
325 groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP,
326 Sets.newHashSet(getDefaultServers()),
327 orphanTables));
328
329
330
331 HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
332 HashMap<TableName, String> newTableMap = Maps.newHashMap();
333 for (RSGroupInfo group : groupList) {
334 newGroupMap.put(group.getName(), group);
335 for(TableName table: group.getTables()) {
336 newTableMap.put(table, group.getName());
337 }
338 }
339 rsGroupMap = Collections.unmodifiableMap(newGroupMap);
340 tableMap = Collections.unmodifiableMap(newTableMap);
341
342 prevRSGroups.clear();
343 prevRSGroups.addAll(rsGroupMap.keySet());
344 }
345
346 private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> newGroupMap)
347 throws IOException {
348 Map<TableName,String> newTableMap = Maps.newHashMap();
349 List<Mutation> mutations = Lists.newArrayList();
350
351
352 for(String groupName : prevRSGroups) {
353 if(!newGroupMap.containsKey(groupName)) {
354 Delete d = new Delete(Bytes.toBytes(groupName));
355 mutations.add(d);
356 }
357 }
358
359
360 for(RSGroupInfo RSGroupInfo : newGroupMap.values()) {
361 RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
362 Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
363 p.addColumn(META_FAMILY_BYTES,
364 META_QUALIFIER_BYTES,
365 proto.toByteArray());
366 mutations.add(p);
367 for(TableName entry: RSGroupInfo.getTables()) {
368 newTableMap.put(entry, RSGroupInfo.getName());
369 }
370 }
371
372 if(mutations.size() > 0) {
373 multiMutate(mutations);
374 }
375 return newTableMap;
376 }
377
378 private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
379 Map<TableName, String> newTableMap;
380
381
382
383 if (!isOnline()) {
384 Map<String, RSGroupInfo> m = Maps.newHashMap(rsGroupMap);
385 RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
386 RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
387 if (!m.equals(newGroupMap) ||
388 !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
389 throw new IOException("Only default servers can be updated during offline mode");
390 }
391 newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
392 rsGroupMap = newGroupMap;
393 return;
394 }
395
396 newTableMap = flushConfigTable(newGroupMap);
397
398
399
400 rsGroupMap = Collections.unmodifiableMap(newGroupMap);
401 tableMap = Collections.unmodifiableMap(newTableMap);
402
403
404 try {
405 String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode);
406 ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufUtil.PB_MAGIC);
407
408 List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<ZKUtil.ZKUtilOp>(newGroupMap.size());
409 for(String groupName : prevRSGroups) {
410 if(!newGroupMap.containsKey(groupName)) {
411 String znode = ZKUtil.joinZNode(groupBasePath, groupName);
412 zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
413 }
414 }
415
416
417 for(RSGroupInfo RSGroupInfo : newGroupMap.values()) {
418 String znode = ZKUtil.joinZNode(groupBasePath, RSGroupInfo.getName());
419 RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
420 LOG.debug("Updating znode: "+znode);
421 ZKUtil.createAndFailSilent(watcher, znode);
422 zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
423 zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
424 ProtobufUtil.prependPBMagic(proto.toByteArray())));
425 }
426 LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
427
428 ZKUtil.multiOrSequential(watcher, zkOps, false);
429 } catch (KeeperException e) {
430 LOG.error("Failed to write to rsGroupZNode", e);
431 master.abort("Failed to write to rsGroupZNode", e);
432 throw new IOException("Failed to write to rsGroupZNode",e);
433 }
434
435 prevRSGroups.clear();
436 prevRSGroups.addAll(newGroupMap.keySet());
437 }
438
439 private List<ServerName> getOnlineRS() throws IOException {
440 if (master != null) {
441 return master.getServerManager().getOnlineServersList();
442 }
443 try {
444 LOG.debug("Reading online RS from zookeeper");
445 List<ServerName> servers = new LinkedList<ServerName>();
446 for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode)) {
447 servers.add(ServerName.parseServerName(el));
448 }
449 return servers;
450 } catch (KeeperException e) {
451 throw new IOException("Failed to retrieve server list from zookeeper", e);
452 }
453 }
454
455 private List<HostAndPort> getDefaultServers() throws IOException {
456 List<HostAndPort> defaultServers = new LinkedList<HostAndPort>();
457 for(ServerName server : getOnlineRS()) {
458 HostAndPort hostPort = HostAndPort.fromParts(server.getHostname(), server.getPort());
459 boolean found = false;
460 for(RSGroupInfo RSGroupInfo : rsGroupMap.values()) {
461 if(!RSGroupInfo.DEFAULT_GROUP.equals(RSGroupInfo.getName()) &&
462 RSGroupInfo.containsServer(hostPort)) {
463 found = true;
464 break;
465 }
466 }
467 if(!found) {
468 defaultServers.add(hostPort);
469 }
470 }
471 return defaultServers;
472 }
473
474 private synchronized void updateDefaultServers(
475 Set<HostAndPort> hostPort) throws IOException {
476 RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
477 RSGroupInfo newInfo = new RSGroupInfo(info.getName(), hostPort, info.getTables());
478 HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
479 newGroupMap.put(newInfo.getName(), newInfo);
480 flushConfig(newGroupMap);
481 }
482
483 @Override
484 public void serverAdded(ServerName serverName) {
485 defaultServerUpdater.serverChanged();
486 }
487
488 @Override
489 public void serverRemoved(ServerName serverName) {
490 defaultServerUpdater.serverChanged();
491 }
492
493 private static class DefaultServerUpdater extends Thread {
494 private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
495 private RSGroupInfoManagerImpl mgr;
496 private boolean hasChanged = false;
497
498 public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
499 this.mgr = mgr;
500 }
501
502 @Override
503 public void run() {
504 List<HostAndPort> prevDefaultServers = new LinkedList<HostAndPort>();
505 while(!mgr.master.isAborted() || !mgr.master.isStopped()) {
506 try {
507 LOG.info("Updating default servers.");
508 List<HostAndPort> servers = mgr.getDefaultServers();
509 Collections.sort(servers, new Comparator<HostAndPort>() {
510 @Override
511 public int compare(HostAndPort o1, HostAndPort o2) {
512 int diff = o1.getHostText().compareTo(o2.getHostText());
513 if (diff != 0) {
514 return diff;
515 }
516 return o1.getPort() - o2.getPort();
517 }
518 });
519 if(!servers.equals(prevDefaultServers)) {
520 mgr.updateDefaultServers(Sets.<HostAndPort>newHashSet(servers));
521 prevDefaultServers = servers;
522 LOG.info("Updated with servers: "+servers.size());
523 }
524 try {
525 synchronized (this) {
526 if(!hasChanged) {
527 wait();
528 }
529 hasChanged = false;
530 }
531 } catch (InterruptedException e) {
532 }
533 } catch (IOException e) {
534 LOG.warn("Failed to update default servers", e);
535 }
536 }
537 }
538
539 public void serverChanged() {
540 synchronized (this) {
541 hasChanged = true;
542 this.notify();
543 }
544 }
545 }
546
547
548 private static class RSGroupStartupWorker extends Thread {
549 private static final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
550
551 private Configuration conf;
552 private volatile boolean isOnline = false;
553 private MasterServices masterServices;
554 private RSGroupInfoManagerImpl groupInfoManager;
555 private ClusterConnection conn;
556
557 public RSGroupStartupWorker(RSGroupInfoManagerImpl groupInfoManager,
558 MasterServices masterServices,
559 ClusterConnection conn) {
560 this.conf = masterServices.getConfiguration();
561 this.masterServices = masterServices;
562 this.groupInfoManager = groupInfoManager;
563 this.conn = conn;
564 setName(RSGroupStartupWorker.class.getName()+"-"+masterServices.getServerName());
565 setDaemon(true);
566 }
567
568 @Override
569 public void run() {
570 if(waitForGroupTableOnline()) {
571 LOG.info("GroupBasedLoadBalancer is now online");
572 }
573 }
574
575 public boolean waitForGroupTableOnline() {
576 final List<HRegionInfo> foundRegions = new LinkedList<HRegionInfo>();
577 final List<HRegionInfo> assignedRegions = new LinkedList<HRegionInfo>();
578 final AtomicBoolean found = new AtomicBoolean(false);
579 final TableStateManager tsm = masterServices.getTableStateManager();
580 boolean createSent = false;
581 while (!found.get() && isMasterRunning()) {
582 foundRegions.clear();
583 assignedRegions.clear();
584 found.set(true);
585 try {
586 final Table nsTable = conn.getTable(TableName.NAMESPACE_TABLE_NAME);
587 final Table groupTable = conn.getTable(RSGROUP_TABLE_NAME);
588 boolean rootMetaFound =
589 masterServices.getMetaTableLocator().verifyMetaRegionLocation(
590 conn,
591 masterServices.getZooKeeper(),
592 1);
593 final AtomicBoolean nsFound = new AtomicBoolean(false);
594 if (rootMetaFound) {
595
596 MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
597 @Override
598 public boolean visitInternal(Result row) throws IOException {
599
600 HRegionInfo info = MetaTableAccessor.getHRegionInfo(row);
601 if (info != null) {
602 Cell serverCell =
603 row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
604 HConstants.SERVER_QUALIFIER);
605 if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
606 ServerName sn =
607 ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
608 if (sn == null) {
609 found.set(false);
610 } else if (tsm.isTableState(RSGROUP_TABLE_NAME, ZooKeeperProtos.Table.State.ENABLED)) {
611 try {
612 ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
613 ClientProtos.GetRequest request =
614 RequestConverter.buildGetRequest(info.getRegionName(),
615 new Get(ROW_KEY));
616 rs.get(null, request);
617 assignedRegions.add(info);
618 } catch(Exception ex) {
619 LOG.debug("Caught exception while verifying group region", ex);
620 }
621 }
622 foundRegions.add(info);
623 }
624 if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
625 Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
626 HConstants.SERVER_QUALIFIER);
627 ServerName sn = null;
628 if(cell != null) {
629 sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
630 }
631 if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
632 ZooKeeperProtos.Table.State.ENABLED)) {
633 try {
634 ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
635 ClientProtos.GetRequest request =
636 RequestConverter.buildGetRequest(info.getRegionName(),
637 new Get(ROW_KEY));
638 rs.get(null, request);
639 nsFound.set(true);
640 } catch(Exception ex) {
641 LOG.debug("Caught exception while verifying group region", ex);
642 }
643 }
644 }
645 }
646 return true;
647 }
648 };
649 MetaTableAccessor.fullScan(conn, visitor);
650
651 if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
652 groupInfoManager.createGroupTable(masterServices);
653 createSent = true;
654 }
655 LOG.info("Group table: " + RSGROUP_TABLE_NAME + " isOnline: " + found.get()
656 + ", regionCount: " + foundRegions.size() + ", assignCount: "
657 + assignedRegions.size() + ", rootMetaFound: "+rootMetaFound);
658 found.set(found.get() && assignedRegions.size() == foundRegions.size()
659 && foundRegions.size() > 0);
660 } else {
661 LOG.info("Waiting for catalog tables to come online");
662 found.set(false);
663 }
664 if (found.get()) {
665 LOG.debug("With group table online, refreshing cached information.");
666 groupInfoManager.refresh(true);
667 isOnline = true;
668
669 groupInfoManager.flushConfig(groupInfoManager.rsGroupMap);
670 }
671 } catch(Exception e) {
672 found.set(false);
673 LOG.warn("Failed to perform check", e);
674 }
675 try {
676 Thread.sleep(100);
677 } catch (InterruptedException e) {
678 LOG.info("Sleep interrupted", e);
679 }
680 }
681 return found.get();
682 }
683
684 public boolean isOnline() {
685 return isOnline;
686 }
687
688 private boolean isMasterRunning() {
689 return !masterServices.isAborted() && !masterServices.isStopped();
690 }
691 }
692
693 private void createGroupTable(MasterServices masterServices) throws IOException {
694 HRegionInfo[] newRegions =
695 ModifyRegionUtils.createHRegionInfos(RSGROUP_TABLE_DESC, null);
696 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
697 masterServices.getMasterProcedureExecutor().submitProcedure(
698 new CreateTableProcedure(
699 masterServices.getMasterProcedureExecutor().getEnvironment(),
700 RSGROUP_TABLE_DESC,
701 newRegions,
702 latch));
703 latch.await();
704
705 int tries = 600;
706 while(masterServices.getAssignmentManager().getRegionStates()
707 .getRegionServerOfRegion(newRegions[0]) == null && tries > 0) {
708 try {
709 Thread.sleep(100);
710 } catch (InterruptedException e) {
711 throw new IOException("Wait interrupted", e);
712 }
713 tries--;
714 }
715 if(tries <= 0) {
716 throw new IOException("Failed to create group table.");
717 }
718 }
719
720 private void multiMutate(List<Mutation> mutations)
721 throws IOException {
722 CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
723 MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
724 = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
725 for (Mutation mutation : mutations) {
726 if (mutation instanceof Put) {
727 mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(
728 ClientProtos.MutationProto.MutationType.PUT, mutation));
729 } else if (mutation instanceof Delete) {
730 mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(
731 ClientProtos.MutationProto.MutationType.DELETE, mutation));
732 } else {
733 throw new DoNotRetryIOException("multiMutate doesn't support "
734 + mutation.getClass().getName());
735 }
736 }
737
738 MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
739 MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
740 try {
741 service.mutateRows(null, mmrBuilder.build());
742 } catch (ServiceException ex) {
743 ProtobufUtil.toIOException(ex);
744 }
745 }
746
747 private void checkGroupName(String groupName) throws ConstraintException {
748 if(!groupName.matches("[a-zA-Z0-9_]+")) {
749 throw new ConstraintException("Group name should only contain alphanumeric characters");
750 }
751 }
752 }