View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.rsgroup;
22  
23  import com.google.common.collect.ArrayListMultimap;
24  import com.google.common.collect.LinkedListMultimap;
25  import com.google.common.collect.ListMultimap;
26  import com.google.common.collect.Lists;
27  import com.google.common.collect.Maps;
28  import com.google.common.net.HostAndPort;
29  
30  import java.io.IOException;
31  import java.util.ArrayList;
32  import java.util.Collection;
33  import java.util.Collections;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Set;
40  import java.util.TreeMap;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.ClusterStatus;
47  import org.apache.hadoop.hbase.HBaseIOException;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.ServerName;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.constraint.ConstraintException;
53  import org.apache.hadoop.hbase.master.LoadBalancer;
54  import org.apache.hadoop.hbase.master.MasterServices;
55  import org.apache.hadoop.hbase.master.RegionPlan;
56  import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
57  import org.apache.hadoop.util.ReflectionUtils;
58  
/**
 * GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBASE-6721).
 * It balances regions based on a table's group membership.
 *
 * Most assignment methods contain two exclusive code paths: Online - when the group
 * table is online and Offline - when it is unavailable.
 *
 * During Offline, regions are assigned based on cached information in zookeeper.
 * If that is unavailable (i.e. bootstrap) then regions are assigned randomly.
 *
 * Once the GROUP table has been assigned, the balancer switches to Online and will then
 * start providing appropriate assignments for user tables.
 *
 */
73  @InterfaceAudience.Private
74  public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
75    /** Config for pluggable load balancers */
76    public static final String HBASE_GROUP_LOADBALANCER_CLASS = "hbase.group.grouploadbalancer.class";
77  
78    private static final Log LOG = LogFactory.getLog(RSGroupBasedLoadBalancer.class);
79  
80    private Configuration config;
81    private ClusterStatus clusterStatus;
82    private MasterServices masterServices;
83    private RSGroupInfoManager RSGroupInfoManager;
84    private LoadBalancer internalBalancer;
85  
86    //used during reflection by LoadBalancerFactory
87    @InterfaceAudience.Private
88    public RSGroupBasedLoadBalancer() {
89    }
90  
91    //This constructor should only be used for unit testing
92    @InterfaceAudience.Private
93    public RSGroupBasedLoadBalancer(RSGroupInfoManager RSGroupInfoManager) {
94      this.RSGroupInfoManager = RSGroupInfoManager;
95    }
96  
97    @Override
98    public Configuration getConf() {
99      return config;
100   }
101 
102   @Override
103   public void setConf(Configuration conf) {
104     this.config = conf;
105   }
106 
107   @Override
108   public void setClusterStatus(ClusterStatus st) {
109     this.clusterStatus = st;
110   }
111 
112   @Override
113   public void setMasterServices(MasterServices masterServices) {
114     this.masterServices = masterServices;
115   }
116 
117   @Override
118   public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<HRegionInfo>>
119       clusterState) throws HBaseIOException {
120     return balanceCluster(clusterState);
121   }
122 
123   @Override
124   public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
125       throws HBaseIOException {
126     if (!isOnline()) {
127       throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
128           " is not online, unable to perform balance");
129     }
130 
131     Map<ServerName,List<HRegionInfo>> correctedState = correctAssignments(clusterState);
132     List<RegionPlan> regionPlans = new ArrayList<RegionPlan>();
133 
134     List<HRegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
135     for (HRegionInfo regionInfo : misplacedRegions) {
136       regionPlans.add(new RegionPlan(regionInfo, null, null));
137     }
138     try {
139       for (RSGroupInfo info : RSGroupInfoManager.listRSGroups()) {
140         Map<ServerName, List<HRegionInfo>> groupClusterState =
141             new HashMap<ServerName, List<HRegionInfo>>();
142         for (HostAndPort sName : info.getServers()) {
143           for(ServerName curr: clusterState.keySet()) {
144             if(curr.getHostPort().equals(sName)) {
145               groupClusterState.put(curr, correctedState.get(curr));
146             }
147           }
148         }
149         List<RegionPlan> groupPlans = this.internalBalancer
150             .balanceCluster(groupClusterState);
151         if (groupPlans != null) {
152           regionPlans.addAll(groupPlans);
153         }
154       }
155     } catch (IOException exp) {
156       LOG.warn("Exception while balancing cluster.", exp);
157       regionPlans.clear();
158     }
159     return regionPlans;
160   }
161 
162   @Override
163   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
164       List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
165     Map<ServerName, List<HRegionInfo>> assignments = Maps.newHashMap();
166     ListMultimap<String,HRegionInfo> regionMap = ArrayListMultimap.create();
167     ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
168     generateGroupMaps(regions, servers, regionMap, serverMap);
169     for(String groupKey : regionMap.keySet()) {
170       if (regionMap.get(groupKey).size() > 0) {
171         Map<ServerName, List<HRegionInfo>> result =
172             this.internalBalancer.roundRobinAssignment(
173                 regionMap.get(groupKey),
174                 serverMap.get(groupKey));
175         if(result != null) {
176           assignments.putAll(result);
177         }
178       }
179     }
180     return assignments;
181   }
182 
183   @Override
184   public Map<ServerName, List<HRegionInfo>> retainAssignment(
185       Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
186     try {
187       Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
188       ListMultimap<String, HRegionInfo> groupToRegion = ArrayListMultimap.create();
189       Set<HRegionInfo> misplacedRegions = getMisplacedRegions(regions);
190       for (HRegionInfo region : regions.keySet()) {
191         if (!misplacedRegions.contains(region)) {
192           String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
193           groupToRegion.put(groupName, region);
194         }
195       }
196       // Now the "groupToRegion" map has only the regions which have correct
197       // assignments.
198       for (String key : groupToRegion.keySet()) {
199         Map<HRegionInfo, ServerName> currentAssignmentMap = new TreeMap<HRegionInfo, ServerName>();
200         List<HRegionInfo> regionList = groupToRegion.get(key);
201         RSGroupInfo info = RSGroupInfoManager.getRSGroup(key);
202         List<ServerName> candidateList = filterOfflineServers(info, servers);
203         for (HRegionInfo region : regionList) {
204           currentAssignmentMap.put(region, regions.get(region));
205         }
206         if(candidateList.size() > 0) {
207           assignments.putAll(this.internalBalancer.retainAssignment(
208               currentAssignmentMap, candidateList));
209         }
210       }
211 
212       for (HRegionInfo region : misplacedRegions) {
213         String groupName = RSGroupInfoManager.getRSGroupOfTable(
214             region.getTable());
215         RSGroupInfo info = RSGroupInfoManager.getRSGroup(groupName);
216         List<ServerName> candidateList = filterOfflineServers(info, servers);
217         ServerName server = this.internalBalancer.randomAssignment(region,
218             candidateList);
219         if (server != null) {
220           if (!assignments.containsKey(server)) {
221             assignments.put(server, new ArrayList<HRegionInfo>());
222           }
223           assignments.get(server).add(region);
224         } else {
225           //if not server is available assign to bogus so it ends up in RIT
226           if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
227             assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<HRegionInfo>());
228           }
229           assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
230         }
231       }
232       return assignments;
233     } catch (IOException e) {
234       throw new HBaseIOException("Failed to do online retain assignment", e);
235     }
236   }
237 
238   @Override
239   public ServerName randomAssignment(HRegionInfo region,
240       List<ServerName> servers) throws HBaseIOException {
241     ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
242     ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
243     generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
244     List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
245     return this.internalBalancer.randomAssignment(region, filteredServers);
246   }
247 
248   @Override
249   public Map<HRegionInfo, ServerName> immediateAssignment(
250       List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
251     Map<HRegionInfo,ServerName> assignments = Maps.newHashMap();
252     ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
253     ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
254     generateGroupMaps(regions, servers, regionMap, serverMap);
255     for(String groupKey : regionMap.keySet()) {
256       if (regionMap.get(groupKey).size() > 0) {
257         assignments.putAll(
258           this.internalBalancer.immediateAssignment(
259             regionMap.get(groupKey),
260             serverMap.get(groupKey)));
261       }
262     }
263     return assignments;
264   }
265 
266   private void generateGroupMaps(
267     List<HRegionInfo> regions,
268     List<ServerName> servers,
269     ListMultimap<String, HRegionInfo> regionMap,
270     ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
271     try {
272       for (HRegionInfo region : regions) {
273         String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
274         if(groupName == null) {
275           LOG.warn("Group for table "+region.getTable()+" is null");
276         }
277         regionMap.put(groupName, region);
278       }
279       for (String groupKey : regionMap.keySet()) {
280         RSGroupInfo info = RSGroupInfoManager.getRSGroup(groupKey);
281         serverMap.putAll(groupKey, filterOfflineServers(info, servers));
282         if(serverMap.get(groupKey).size() < 1) {
283           serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
284         }
285       }
286     } catch(IOException e) {
287       throw new HBaseIOException("Failed to generate group maps", e);
288     }
289   }
290 
291   private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
292                                                 List<ServerName> onlineServers) {
293     if (RSGroupInfo != null) {
294       return filterServers(RSGroupInfo.getServers(), onlineServers);
295     } else {
296       LOG.debug("Group Information found to be null. Some regions might be unassigned.");
297       return Collections.EMPTY_LIST;
298     }
299   }
300 
301   /**
302    * Filter servers based on the online servers.
303    *
304    * @param servers
305    *          the servers
306    * @param onlineServers
307    *          List of servers which are online.
308    * @return the list
309    */
310   private List<ServerName> filterServers(Collection<HostAndPort> servers,
311       Collection<ServerName> onlineServers) {
312     ArrayList<ServerName> finalList = new ArrayList<ServerName>();
313     for (HostAndPort server : servers) {
314       for(ServerName curr: onlineServers) {
315         if(curr.getHostPort().equals(server)) {
316           finalList.add(curr);
317         }
318       }
319     }
320     return finalList;
321   }
322 
323   private ListMultimap<String, HRegionInfo> groupRegions(
324       List<HRegionInfo> regionList) throws IOException {
325     ListMultimap<String, HRegionInfo> regionGroup = ArrayListMultimap
326         .create();
327     for (HRegionInfo region : regionList) {
328       String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
329       regionGroup.put(groupName, region);
330     }
331     return regionGroup;
332   }
333 
334   private Set<HRegionInfo> getMisplacedRegions(
335       Map<HRegionInfo, ServerName> regions) throws IOException {
336     Set<HRegionInfo> misplacedRegions = new HashSet<HRegionInfo>();
337     for (HRegionInfo region : regions.keySet()) {
338       ServerName assignedServer = regions.get(region);
339       RSGroupInfo info =
340           RSGroupInfoManager.getRSGroup(RSGroupInfoManager.getRSGroupOfTable(region.getTable()));
341       if (assignedServer != null &&
342           (info == null || !info.containsServer(assignedServer.getHostPort()))) {
343         LOG.debug("Found misplaced region: " + region.getRegionNameAsString() +
344             " on server: " + assignedServer +
345             " found in group: " +
346             RSGroupInfoManager.getRSGroupOfServer(assignedServer.getHostPort()) +
347             " outside of group: " + info.getName());
348         misplacedRegions.add(region);
349       }
350     }
351     return misplacedRegions;
352   }
353 
354   private Map<ServerName, List<HRegionInfo>> correctAssignments(
355        Map<ServerName, List<HRegionInfo>> existingAssignments){
356     Map<ServerName, List<HRegionInfo>> correctAssignments =
357         new TreeMap<ServerName, List<HRegionInfo>>();
358     List<HRegionInfo> misplacedRegions = new LinkedList<HRegionInfo>();
359     correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<HRegionInfo>());
360     for (ServerName sName : existingAssignments.keySet()) {
361       correctAssignments.put(sName, new LinkedList<HRegionInfo>());
362       List<HRegionInfo> regions = existingAssignments.get(sName);
363       for (HRegionInfo region : regions) {
364         RSGroupInfo info = null;
365         try {
366           info = RSGroupInfoManager.getRSGroup(
367               RSGroupInfoManager.getRSGroupOfTable(region.getTable()));
368         }catch(IOException exp){
369           LOG.debug("Group information null for region of table " + region.getTable(),
370               exp);
371         }
372         if ((info == null) || (!info.containsServer(sName.getHostPort()))) {
373           correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
374         } else {
375           correctAssignments.get(sName).add(region);
376         }
377       }
378     }
379 
380     //TODO bulk unassign?
381     //unassign misplaced regions, so that they are assigned to correct groups.
382     for(HRegionInfo info: misplacedRegions) {
383       this.masterServices.getAssignmentManager().unassign(info);
384     }
385     return correctAssignments;
386   }
387 
388   @Override
389   public void initialize() throws HBaseIOException {
390     try {
391       if (RSGroupInfoManager == null) {
392         List<RSGroupAdminEndpoint> cps =
393           masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
394         if (cps.size() != 1) {
395           String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
396           LOG.error(msg);
397           throw new HBaseIOException(msg);
398         }
399         RSGroupInfoManager = cps.get(0).getGroupInfoManager();
400       }
401     } catch (IOException e) {
402       throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
403     }
404 
405     // Create the balancer
406     Class<? extends LoadBalancer> balancerKlass = config.getClass(
407         HBASE_GROUP_LOADBALANCER_CLASS,
408         StochasticLoadBalancer.class, LoadBalancer.class);
409     internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
410     internalBalancer.setClusterStatus(clusterStatus);
411     internalBalancer.setMasterServices(masterServices);
412     internalBalancer.setConf(config);
413     internalBalancer.initialize();
414   }
415 
416   public boolean isOnline() {
417     return RSGroupInfoManager != null && RSGroupInfoManager.isOnline();
418   }
419 
420   @Override
421   public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
422   }
423 
424   @Override
425   public void regionOffline(HRegionInfo regionInfo) {
426   }
427 
428   @Override
429   public void onConfigurationChange(Configuration conf) {
430     //DO nothing for now
431   }
432 
433   @Override
434   public void stop(String why) {
435   }
436 
437   @Override
438   public boolean isStopped() {
439     return false;
440   }
441 }