View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.rsgroup;
22  
23  import com.google.common.collect.Lists;
24  import com.google.common.collect.Maps;
25  
26  import com.google.common.collect.Sets;
27  import com.google.common.net.HostAndPort;
28  import com.google.protobuf.ServiceException;
29  
30  import java.io.IOException;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.NavigableSet;
40  import java.util.Set;
41  import java.util.TreeSet;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.Cell;
48  import org.apache.hadoop.hbase.CellUtil;
49  import org.apache.hadoop.hbase.Coprocessor;
50  import org.apache.hadoop.hbase.DoNotRetryIOException;
51  import org.apache.hadoop.hbase.HColumnDescriptor;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.MetaTableAccessor;
56  import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
57  import org.apache.hadoop.hbase.NamespaceDescriptor;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableStateManager;
61  import org.apache.hadoop.hbase.client.ClusterConnection;
62  import org.apache.hadoop.hbase.client.Delete;
63  import org.apache.hadoop.hbase.client.Get;
64  import org.apache.hadoop.hbase.client.Mutation;
65  import org.apache.hadoop.hbase.client.Put;
66  import org.apache.hadoop.hbase.client.Result;
67  import org.apache.hadoop.hbase.client.Table;
68  import org.apache.hadoop.hbase.constraint.ConstraintException;
69  import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
70  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
71  import org.apache.hadoop.hbase.master.MasterServices;
72  import org.apache.hadoop.hbase.master.ServerListener;
73  import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
74  import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
75  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
76  import org.apache.hadoop.hbase.protobuf.RequestConverter;
77  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
78  import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
79  import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
80  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
81  import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
82  import org.apache.hadoop.hbase.security.access.AccessControlLists;
83  import org.apache.hadoop.hbase.util.Bytes;
84  import org.apache.hadoop.hbase.util.ModifyRegionUtils;
85  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
86  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
87  import org.apache.zookeeper.KeeperException;
88  
89  /**
90   * This is an implementation of {@link RSGroupInfoManager} that makes
91   * use of an HBase table as the persistence store for the group information.
92   * It also makes use of ZooKeeper to store group information needed
93   * for bootstrapping during offline mode.
94   */
95  public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener {
96    private static final Log LOG = LogFactory.getLog(RSGroupInfoManagerImpl.class);
97  
98    /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
99    private final static HTableDescriptor RSGROUP_TABLE_DESC;
100   static {
101     RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME_BYTES);
102     RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
103     RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
104     try {
105       RSGROUP_TABLE_DESC.addCoprocessor(
106         MultiRowMutationEndpoint.class.getName(),
107           null, Coprocessor.PRIORITY_SYSTEM, null);
108     } catch (IOException ex) {
109       throw new RuntimeException(ex);
110     }
111   }
112 
113   private volatile Map<String, RSGroupInfo> rsGroupMap;
114   private volatile Map<TableName, String> tableMap;
115   private MasterServices master;
116   private Table rsGroupTable;
117   private ClusterConnection conn;
118   private ZooKeeperWatcher watcher;
119   private RSGroupStartupWorker rsGroupStartupWorker;
120   // contains list of groups that were last flushed to persistent store
121   private volatile Set<String> prevRSGroups;
122   private RSGroupSerDe rsGroupSerDe;
123   private DefaultServerUpdater defaultServerUpdater;
124 
125 
126   public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
127     this.rsGroupMap = Collections.EMPTY_MAP;
128     this.tableMap = Collections.EMPTY_MAP;
129     rsGroupSerDe = new RSGroupSerDe();
130     this.master = master;
131     this.watcher = master.getZooKeeper();
132     this.conn = master.getConnection();
133     rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn);
134     prevRSGroups = new HashSet<String>();
135     refresh();
136     rsGroupStartupWorker.start();
137     defaultServerUpdater = new DefaultServerUpdater(this);
138     master.getServerManager().registerListener(this);
139     defaultServerUpdater.start();
140   }
141 
142   /**
143    * Adds the group.
144    *
145    * @param rsGroupInfo the group name
146    */
147   @Override
148   public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
149     checkGroupName(rsGroupInfo.getName());
150     if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
151         rsGroupInfo.getName().equals(rsGroupInfo.DEFAULT_GROUP)) {
152       throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
153     }
154     Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
155     newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
156     flushConfig(newGroupMap);
157   }
158 
159   @Override
160   public synchronized boolean moveServers(Set<HostAndPort> hostPorts, String srcGroup,
161                                           String dstGroup) throws IOException {
162     if (!rsGroupMap.containsKey(srcGroup)) {
163       throw new DoNotRetryIOException("Group "+srcGroup+" does not exist");
164     }
165     if (!rsGroupMap.containsKey(dstGroup)) {
166       throw new DoNotRetryIOException("Group "+dstGroup+" does not exist");
167     }
168 
169     RSGroupInfo src = new RSGroupInfo(getRSGroup(srcGroup));
170     RSGroupInfo dst = new RSGroupInfo(getRSGroup(dstGroup));
171     boolean foundOne = false;
172     for(HostAndPort el: hostPorts) {
173       foundOne = src.removeServer(el) || foundOne;
174       dst.addServer(el);
175     }
176 
177     Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
178     newGroupMap.put(src.getName(), src);
179     newGroupMap.put(dst.getName(), dst);
180 
181     flushConfig(newGroupMap);
182     return foundOne;
183   }
184 
185   /**
186    * Gets the group info of server.
187    *
188    * @param hostPort the server
189    * @return An instance of GroupInfo.
190    */
191   @Override
192   public RSGroupInfo getRSGroupOfServer(HostAndPort hostPort) throws IOException {
193     for (RSGroupInfo info : rsGroupMap.values()) {
194       if (info.containsServer(hostPort)){
195         return info;
196       }
197     }
198     return null;
199   }
200 
201   /**
202    * Gets the group information.
203    *
204    * @param groupName
205    *          the group name
206    * @return An instance of GroupInfo
207    */
208   @Override
209   public RSGroupInfo getRSGroup(String groupName) throws IOException {
210     RSGroupInfo RSGroupInfo = rsGroupMap.get(groupName);
211     return RSGroupInfo;
212   }
213 
214 
215 
216   @Override
217   public String getRSGroupOfTable(TableName tableName) throws IOException {
218     return tableMap.get(tableName);
219   }
220 
221   @Override
222   public synchronized void moveTables(
223       Set<TableName> tableNames, String groupName) throws IOException {
224     if (groupName != null && !rsGroupMap.containsKey(groupName)) {
225       throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group");
226     }
227 
228     Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
229     for(TableName tableName: tableNames) {
230       if (tableMap.containsKey(tableName)) {
231         RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
232         src.removeTable(tableName);
233         newGroupMap.put(src.getName(), src);
234       }
235       if(groupName != null) {
236         RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
237         dst.addTable(tableName);
238         newGroupMap.put(dst.getName(), dst);
239       }
240     }
241 
242     flushConfig(newGroupMap);
243   }
244 
245 
246   /**
247    * Delete a region server group.
248    *
249    * @param groupName the group name
250    * @throws java.io.IOException Signals that an I/O exception has occurred.
251    */
252   @Override
253   public synchronized void removeRSGroup(String groupName) throws IOException {
254     if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
255       throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group");
256     }
257     Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
258     newGroupMap.remove(groupName);
259     flushConfig(newGroupMap);
260   }
261 
262   @Override
263   public List<RSGroupInfo> listRSGroups() throws IOException {
264     List<RSGroupInfo> list = Lists.newLinkedList(rsGroupMap.values());
265     return list;
266   }
267 
268   @Override
269   public boolean isOnline() {
270     return rsGroupStartupWorker.isOnline();
271   }
272 
273   @Override
274   public synchronized void refresh() throws IOException {
275     refresh(false);
276   }
277 
278   private synchronized void refresh(boolean forceOnline) throws IOException {
279     List<RSGroupInfo> groupList = new LinkedList<RSGroupInfo>();
280 
281     // overwrite anything read from zk, group table is source of truth
282     // if online read from GROUP table
283     if (forceOnline || isOnline()) {
284       LOG.debug("Refreshing in Online mode.");
285       if (rsGroupTable == null) {
286         rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
287       }
288       groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable));
289     } else {
290       LOG.debug("Refershing in Offline mode.");
291       String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode);
292       groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath));
293     }
294 
295     // refresh default group, prune
296     NavigableSet<TableName> orphanTables = new TreeSet<TableName>();
297     for(String entry: master.getTableDescriptors().getAll().keySet()) {
298       orphanTables.add(TableName.valueOf(entry));
299     }
300 
301     List<TableName> specialTables;
302     if(!master.isInitialized()) {
303       specialTables = new ArrayList<TableName>();
304       specialTables.add(AccessControlLists.ACL_TABLE_NAME);
305       specialTables.add(TableName.META_TABLE_NAME);
306       specialTables.add(TableName.NAMESPACE_TABLE_NAME);
307       specialTables.add(RSGROUP_TABLE_NAME);
308     } else {
309       specialTables =
310           master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
311     }
312 
313     for(TableName table : specialTables) {
314       orphanTables.add(table);
315     }
316     for(RSGroupInfo group: groupList) {
317       if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
318         orphanTables.removeAll(group.getTables());
319       }
320     }
321 
322     // This is added to the last of the list
323     // so it overwrites the default group loaded
324     // from region group table or zk
325     groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP,
326         Sets.newHashSet(getDefaultServers()),
327         orphanTables));
328 
329 
330     // populate the data
331     HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
332     HashMap<TableName, String> newTableMap = Maps.newHashMap();
333     for (RSGroupInfo group : groupList) {
334       newGroupMap.put(group.getName(), group);
335       for(TableName table: group.getTables()) {
336         newTableMap.put(table, group.getName());
337       }
338     }
339     rsGroupMap = Collections.unmodifiableMap(newGroupMap);
340     tableMap = Collections.unmodifiableMap(newTableMap);
341 
342     prevRSGroups.clear();
343     prevRSGroups.addAll(rsGroupMap.keySet());
344   }
345 
346   private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> newGroupMap)
347       throws IOException {
348     Map<TableName,String> newTableMap = Maps.newHashMap();
349     List<Mutation> mutations = Lists.newArrayList();
350 
351     // populate deletes
352     for(String groupName : prevRSGroups) {
353       if(!newGroupMap.containsKey(groupName)) {
354         Delete d = new Delete(Bytes.toBytes(groupName));
355         mutations.add(d);
356       }
357     }
358 
359     // populate puts
360     for(RSGroupInfo RSGroupInfo : newGroupMap.values()) {
361       RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
362       Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
363       p.addColumn(META_FAMILY_BYTES,
364           META_QUALIFIER_BYTES,
365           proto.toByteArray());
366       mutations.add(p);
367       for(TableName entry: RSGroupInfo.getTables()) {
368         newTableMap.put(entry, RSGroupInfo.getName());
369       }
370     }
371 
372     if(mutations.size() > 0) {
373       multiMutate(mutations);
374     }
375     return newTableMap;
376   }
377 
378   private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
379     Map<TableName, String> newTableMap;
380 
381     // For offline mode persistence is still unavailable
382     // We're refreshing in-memory state but only for default servers
383     if (!isOnline()) {
384       Map<String, RSGroupInfo> m = Maps.newHashMap(rsGroupMap);
385       RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
386       RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
387       if (!m.equals(newGroupMap) ||
388           !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
389         throw new IOException("Only default servers can be updated during offline mode");
390       }
391       newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
392       rsGroupMap = newGroupMap;
393       return;
394     }
395 
396     newTableMap = flushConfigTable(newGroupMap);
397 
398     // make changes visible since it has been
399     // persisted in the source of truth
400     rsGroupMap = Collections.unmodifiableMap(newGroupMap);
401     tableMap = Collections.unmodifiableMap(newTableMap);
402 
403 
404     try {
405       String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode);
406       ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufUtil.PB_MAGIC);
407 
408       List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<ZKUtil.ZKUtilOp>(newGroupMap.size());
409       for(String groupName : prevRSGroups) {
410         if(!newGroupMap.containsKey(groupName)) {
411           String znode = ZKUtil.joinZNode(groupBasePath, groupName);
412           zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
413         }
414       }
415 
416 
417       for(RSGroupInfo RSGroupInfo : newGroupMap.values()) {
418         String znode = ZKUtil.joinZNode(groupBasePath, RSGroupInfo.getName());
419         RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
420         LOG.debug("Updating znode: "+znode);
421         ZKUtil.createAndFailSilent(watcher, znode);
422         zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
423         zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
424             ProtobufUtil.prependPBMagic(proto.toByteArray())));
425       }
426       LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
427 
428       ZKUtil.multiOrSequential(watcher, zkOps, false);
429     } catch (KeeperException e) {
430       LOG.error("Failed to write to rsGroupZNode", e);
431       master.abort("Failed to write to rsGroupZNode", e);
432       throw new IOException("Failed to write to rsGroupZNode",e);
433     }
434 
435     prevRSGroups.clear();
436     prevRSGroups.addAll(newGroupMap.keySet());
437   }
438 
439   private List<ServerName> getOnlineRS() throws IOException {
440     if (master != null) {
441       return master.getServerManager().getOnlineServersList();
442     }
443     try {
444       LOG.debug("Reading online RS from zookeeper");
445       List<ServerName> servers = new LinkedList<ServerName>();
446       for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode)) {
447         servers.add(ServerName.parseServerName(el));
448       }
449       return servers;
450     } catch (KeeperException e) {
451       throw new IOException("Failed to retrieve server list from zookeeper", e);
452     }
453   }
454 
455   private List<HostAndPort> getDefaultServers() throws IOException {
456     List<HostAndPort> defaultServers = new LinkedList<HostAndPort>();
457     for(ServerName server : getOnlineRS()) {
458       HostAndPort hostPort = HostAndPort.fromParts(server.getHostname(), server.getPort());
459       boolean found = false;
460       for(RSGroupInfo RSGroupInfo : rsGroupMap.values()) {
461         if(!RSGroupInfo.DEFAULT_GROUP.equals(RSGroupInfo.getName()) &&
462             RSGroupInfo.containsServer(hostPort)) {
463           found = true;
464           break;
465         }
466       }
467       if(!found) {
468         defaultServers.add(hostPort);
469       }
470     }
471     return defaultServers;
472   }
473 
474   private synchronized void updateDefaultServers(
475       Set<HostAndPort> hostPort) throws IOException {
476     RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
477     RSGroupInfo newInfo = new RSGroupInfo(info.getName(), hostPort, info.getTables());
478     HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
479     newGroupMap.put(newInfo.getName(), newInfo);
480     flushConfig(newGroupMap);
481   }
482 
483   @Override
484   public void serverAdded(ServerName serverName) {
485     defaultServerUpdater.serverChanged();
486   }
487 
488   @Override
489   public void serverRemoved(ServerName serverName) {
490     defaultServerUpdater.serverChanged();
491   }
492 
493   private static class DefaultServerUpdater extends Thread {
494     private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
495     private RSGroupInfoManagerImpl mgr;
496     private boolean hasChanged = false;
497 
498     public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
499       this.mgr = mgr;
500     }
501 
502     @Override
503     public void run() {
504       List<HostAndPort> prevDefaultServers = new LinkedList<HostAndPort>();
505       while(!mgr.master.isAborted() || !mgr.master.isStopped()) {
506         try {
507           LOG.info("Updating default servers.");
508           List<HostAndPort> servers = mgr.getDefaultServers();
509           Collections.sort(servers, new Comparator<HostAndPort>() {
510             @Override
511             public int compare(HostAndPort o1, HostAndPort o2) {
512               int diff = o1.getHostText().compareTo(o2.getHostText());
513               if (diff != 0) {
514                 return diff;
515               }
516               return o1.getPort() - o2.getPort();
517             }
518           });
519           if(!servers.equals(prevDefaultServers)) {
520             mgr.updateDefaultServers(Sets.<HostAndPort>newHashSet(servers));
521             prevDefaultServers = servers;
522             LOG.info("Updated with servers: "+servers.size());
523           }
524           try {
525             synchronized (this) {
526               if(!hasChanged) {
527                 wait();
528               }
529               hasChanged = false;
530             }
531           } catch (InterruptedException e) {
532           }
533         } catch (IOException e) {
534           LOG.warn("Failed to update default servers", e);
535         }
536       }
537     }
538 
539     public void serverChanged() {
540       synchronized (this) {
541         hasChanged = true;
542         this.notify();
543       }
544     }
545   }
546 
547 
548   private static class RSGroupStartupWorker extends Thread {
549     private static final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
550 
551     private Configuration conf;
552     private volatile boolean isOnline = false;
553     private MasterServices masterServices;
554     private RSGroupInfoManagerImpl groupInfoManager;
555     private ClusterConnection conn;
556 
557     public RSGroupStartupWorker(RSGroupInfoManagerImpl groupInfoManager,
558                                 MasterServices masterServices,
559                                 ClusterConnection conn) {
560       this.conf = masterServices.getConfiguration();
561       this.masterServices = masterServices;
562       this.groupInfoManager = groupInfoManager;
563       this.conn = conn;
564       setName(RSGroupStartupWorker.class.getName()+"-"+masterServices.getServerName());
565       setDaemon(true);
566     }
567 
568     @Override
569     public void run() {
570       if(waitForGroupTableOnline()) {
571         LOG.info("GroupBasedLoadBalancer is now online");
572       }
573     }
574 
575     public boolean waitForGroupTableOnline() {
576       final List<HRegionInfo> foundRegions = new LinkedList<HRegionInfo>();
577       final List<HRegionInfo> assignedRegions = new LinkedList<HRegionInfo>();
578       final AtomicBoolean found = new AtomicBoolean(false);
579       final TableStateManager tsm = masterServices.getTableStateManager();
580       boolean createSent = false;
581       while (!found.get() && isMasterRunning()) {
582         foundRegions.clear();
583         assignedRegions.clear();
584         found.set(true);
585         try {
586           final Table nsTable = conn.getTable(TableName.NAMESPACE_TABLE_NAME);
587           final Table groupTable = conn.getTable(RSGROUP_TABLE_NAME);
588           boolean rootMetaFound =
589               masterServices.getMetaTableLocator().verifyMetaRegionLocation(
590                   conn,
591                   masterServices.getZooKeeper(),
592                   1);
593           final AtomicBoolean nsFound = new AtomicBoolean(false);
594           if (rootMetaFound) {
595 
596             MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
597               @Override
598               public boolean visitInternal(Result row) throws IOException {
599 
600                 HRegionInfo info = MetaTableAccessor.getHRegionInfo(row);
601                 if (info != null) {
602                   Cell serverCell =
603                       row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
604                           HConstants.SERVER_QUALIFIER);
605                   if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
606                     ServerName sn =
607                         ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
608                     if (sn == null) {
609                       found.set(false);
610                     } else if (tsm.isTableState(RSGROUP_TABLE_NAME, ZooKeeperProtos.Table.State.ENABLED)) {
611                       try {
612                         ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
613                         ClientProtos.GetRequest request =
614                             RequestConverter.buildGetRequest(info.getRegionName(),
615                                 new Get(ROW_KEY));
616                         rs.get(null, request);
617                         assignedRegions.add(info);
618                       } catch(Exception ex) {
619                         LOG.debug("Caught exception while verifying group region", ex);
620                       }
621                     }
622                     foundRegions.add(info);
623                   }
624                   if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
625                     Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
626                         HConstants.SERVER_QUALIFIER);
627                     ServerName sn = null;
628                     if(cell != null) {
629                       sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
630                     }
631                     if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
632                       ZooKeeperProtos.Table.State.ENABLED)) {
633                       try {
634                         ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
635                         ClientProtos.GetRequest request =
636                             RequestConverter.buildGetRequest(info.getRegionName(),
637                                 new Get(ROW_KEY));
638                         rs.get(null, request);
639                         nsFound.set(true);
640                       } catch(Exception ex) {
641                         LOG.debug("Caught exception while verifying group region", ex);
642                       }
643                     }
644                   }
645                 }
646                 return true;
647               }
648             };
649             MetaTableAccessor.fullScan(conn, visitor);
650             // if no regions in meta then we have to create the table
651             if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
652               groupInfoManager.createGroupTable(masterServices);
653               createSent = true;
654             }
655             LOG.info("Group table: " + RSGROUP_TABLE_NAME + " isOnline: " + found.get()
656                 + ", regionCount: " + foundRegions.size() + ", assignCount: "
657                 + assignedRegions.size() + ", rootMetaFound: "+rootMetaFound);
658             found.set(found.get() && assignedRegions.size() == foundRegions.size()
659                 && foundRegions.size() > 0);
660           } else {
661             LOG.info("Waiting for catalog tables to come online");
662             found.set(false);
663           }
664           if (found.get()) {
665             LOG.debug("With group table online, refreshing cached information.");
666             groupInfoManager.refresh(true);
667             isOnline = true;
668             //flush any inconsistencies between ZK and HTable
669             groupInfoManager.flushConfig(groupInfoManager.rsGroupMap);
670           }
671         } catch(Exception e) {
672           found.set(false);
673           LOG.warn("Failed to perform check", e);
674         }
675         try {
676           Thread.sleep(100);
677         } catch (InterruptedException e) {
678           LOG.info("Sleep interrupted", e);
679         }
680       }
681       return found.get();
682     }
683 
684     public boolean isOnline() {
685       return isOnline;
686     }
687 
688     private boolean isMasterRunning() {
689       return !masterServices.isAborted() && !masterServices.isStopped();
690     }
691   }
692 
693   private void createGroupTable(MasterServices masterServices) throws IOException {
694     HRegionInfo[] newRegions =
695         ModifyRegionUtils.createHRegionInfos(RSGROUP_TABLE_DESC, null);
696     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
697     masterServices.getMasterProcedureExecutor().submitProcedure(
698         new CreateTableProcedure(
699             masterServices.getMasterProcedureExecutor().getEnvironment(),
700             RSGROUP_TABLE_DESC,
701             newRegions,
702             latch));
703     latch.await();
704     // wait for region to be online
705     int tries = 600;
706     while(masterServices.getAssignmentManager().getRegionStates()
707         .getRegionServerOfRegion(newRegions[0]) == null && tries > 0) {
708       try {
709         Thread.sleep(100);
710       } catch (InterruptedException e) {
711         throw new IOException("Wait interrupted", e);
712       }
713       tries--;
714     }
715     if(tries <= 0) {
716       throw new IOException("Failed to create group table.");
717     }
718   }
719 
720   private void multiMutate(List<Mutation> mutations)
721       throws IOException {
722     CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
723     MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
724       = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
725     for (Mutation mutation : mutations) {
726       if (mutation instanceof Put) {
727         mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(
728           ClientProtos.MutationProto.MutationType.PUT, mutation));
729       } else if (mutation instanceof Delete) {
730         mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(
731           ClientProtos.MutationProto.MutationType.DELETE, mutation));
732       } else {
733         throw new DoNotRetryIOException("multiMutate doesn't support "
734           + mutation.getClass().getName());
735       }
736     }
737 
738     MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
739       MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
740     try {
741       service.mutateRows(null, mmrBuilder.build());
742     } catch (ServiceException ex) {
743       ProtobufUtil.toIOException(ex);
744     }
745   }
746 
747   private void checkGroupName(String groupName) throws ConstraintException {
748     if(!groupName.matches("[a-zA-Z0-9_]+")) {
749       throw new ConstraintException("Group name should only contain alphanumeric characters");
750     }
751   }
752 }