1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.rsgroup;
22
23 import com.google.common.collect.ArrayListMultimap;
24 import com.google.common.collect.LinkedListMultimap;
25 import com.google.common.collect.ListMultimap;
26 import com.google.common.collect.Lists;
27 import com.google.common.collect.Maps;
28 import com.google.common.net.HostAndPort;
29
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.TreeMap;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.hbase.ClusterStatus;
47 import org.apache.hadoop.hbase.HBaseIOException;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.classification.InterfaceAudience;
52 import org.apache.hadoop.hbase.constraint.ConstraintException;
53 import org.apache.hadoop.hbase.master.LoadBalancer;
54 import org.apache.hadoop.hbase.master.MasterServices;
55 import org.apache.hadoop.hbase.master.RegionPlan;
56 import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
57 import org.apache.hadoop.util.ReflectionUtils;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 @InterfaceAudience.Private
74 public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
75
76 public static final String HBASE_GROUP_LOADBALANCER_CLASS = "hbase.group.grouploadbalancer.class";
77
78 private static final Log LOG = LogFactory.getLog(RSGroupBasedLoadBalancer.class);
79
80 private Configuration config;
81 private ClusterStatus clusterStatus;
82 private MasterServices masterServices;
83 private RSGroupInfoManager RSGroupInfoManager;
84 private LoadBalancer internalBalancer;
85
86
87 @InterfaceAudience.Private
88 public RSGroupBasedLoadBalancer() {
89 }
90
91
92 @InterfaceAudience.Private
93 public RSGroupBasedLoadBalancer(RSGroupInfoManager RSGroupInfoManager) {
94 this.RSGroupInfoManager = RSGroupInfoManager;
95 }
96
97 @Override
98 public Configuration getConf() {
99 return config;
100 }
101
102 @Override
103 public void setConf(Configuration conf) {
104 this.config = conf;
105 }
106
107 @Override
108 public void setClusterStatus(ClusterStatus st) {
109 this.clusterStatus = st;
110 }
111
112 @Override
113 public void setMasterServices(MasterServices masterServices) {
114 this.masterServices = masterServices;
115 }
116
117 @Override
118 public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<HRegionInfo>>
119 clusterState) throws HBaseIOException {
120 return balanceCluster(clusterState);
121 }
122
123 @Override
124 public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
125 throws HBaseIOException {
126 if (!isOnline()) {
127 throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
128 " is not online, unable to perform balance");
129 }
130
131 Map<ServerName,List<HRegionInfo>> correctedState = correctAssignments(clusterState);
132 List<RegionPlan> regionPlans = new ArrayList<RegionPlan>();
133
134 List<HRegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
135 for (HRegionInfo regionInfo : misplacedRegions) {
136 regionPlans.add(new RegionPlan(regionInfo, null, null));
137 }
138 try {
139 for (RSGroupInfo info : RSGroupInfoManager.listRSGroups()) {
140 Map<ServerName, List<HRegionInfo>> groupClusterState =
141 new HashMap<ServerName, List<HRegionInfo>>();
142 for (HostAndPort sName : info.getServers()) {
143 for(ServerName curr: clusterState.keySet()) {
144 if(curr.getHostPort().equals(sName)) {
145 groupClusterState.put(curr, correctedState.get(curr));
146 }
147 }
148 }
149 List<RegionPlan> groupPlans = this.internalBalancer
150 .balanceCluster(groupClusterState);
151 if (groupPlans != null) {
152 regionPlans.addAll(groupPlans);
153 }
154 }
155 } catch (IOException exp) {
156 LOG.warn("Exception while balancing cluster.", exp);
157 regionPlans.clear();
158 }
159 return regionPlans;
160 }
161
162 @Override
163 public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
164 List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
165 Map<ServerName, List<HRegionInfo>> assignments = Maps.newHashMap();
166 ListMultimap<String,HRegionInfo> regionMap = ArrayListMultimap.create();
167 ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
168 generateGroupMaps(regions, servers, regionMap, serverMap);
169 for(String groupKey : regionMap.keySet()) {
170 if (regionMap.get(groupKey).size() > 0) {
171 Map<ServerName, List<HRegionInfo>> result =
172 this.internalBalancer.roundRobinAssignment(
173 regionMap.get(groupKey),
174 serverMap.get(groupKey));
175 if(result != null) {
176 assignments.putAll(result);
177 }
178 }
179 }
180 return assignments;
181 }
182
183 @Override
184 public Map<ServerName, List<HRegionInfo>> retainAssignment(
185 Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
186 try {
187 Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
188 ListMultimap<String, HRegionInfo> groupToRegion = ArrayListMultimap.create();
189 Set<HRegionInfo> misplacedRegions = getMisplacedRegions(regions);
190 for (HRegionInfo region : regions.keySet()) {
191 if (!misplacedRegions.contains(region)) {
192 String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
193 groupToRegion.put(groupName, region);
194 }
195 }
196
197
198 for (String key : groupToRegion.keySet()) {
199 Map<HRegionInfo, ServerName> currentAssignmentMap = new TreeMap<HRegionInfo, ServerName>();
200 List<HRegionInfo> regionList = groupToRegion.get(key);
201 RSGroupInfo info = RSGroupInfoManager.getRSGroup(key);
202 List<ServerName> candidateList = filterOfflineServers(info, servers);
203 for (HRegionInfo region : regionList) {
204 currentAssignmentMap.put(region, regions.get(region));
205 }
206 if(candidateList.size() > 0) {
207 assignments.putAll(this.internalBalancer.retainAssignment(
208 currentAssignmentMap, candidateList));
209 }
210 }
211
212 for (HRegionInfo region : misplacedRegions) {
213 String groupName = RSGroupInfoManager.getRSGroupOfTable(
214 region.getTable());
215 RSGroupInfo info = RSGroupInfoManager.getRSGroup(groupName);
216 List<ServerName> candidateList = filterOfflineServers(info, servers);
217 ServerName server = this.internalBalancer.randomAssignment(region,
218 candidateList);
219 if (server != null) {
220 if (!assignments.containsKey(server)) {
221 assignments.put(server, new ArrayList<HRegionInfo>());
222 }
223 assignments.get(server).add(region);
224 } else {
225
226 if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
227 assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<HRegionInfo>());
228 }
229 assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
230 }
231 }
232 return assignments;
233 } catch (IOException e) {
234 throw new HBaseIOException("Failed to do online retain assignment", e);
235 }
236 }
237
238 @Override
239 public ServerName randomAssignment(HRegionInfo region,
240 List<ServerName> servers) throws HBaseIOException {
241 ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
242 ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
243 generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
244 List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
245 return this.internalBalancer.randomAssignment(region, filteredServers);
246 }
247
248 @Override
249 public Map<HRegionInfo, ServerName> immediateAssignment(
250 List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
251 Map<HRegionInfo,ServerName> assignments = Maps.newHashMap();
252 ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
253 ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
254 generateGroupMaps(regions, servers, regionMap, serverMap);
255 for(String groupKey : regionMap.keySet()) {
256 if (regionMap.get(groupKey).size() > 0) {
257 assignments.putAll(
258 this.internalBalancer.immediateAssignment(
259 regionMap.get(groupKey),
260 serverMap.get(groupKey)));
261 }
262 }
263 return assignments;
264 }
265
266 private void generateGroupMaps(
267 List<HRegionInfo> regions,
268 List<ServerName> servers,
269 ListMultimap<String, HRegionInfo> regionMap,
270 ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
271 try {
272 for (HRegionInfo region : regions) {
273 String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
274 if(groupName == null) {
275 LOG.warn("Group for table "+region.getTable()+" is null");
276 }
277 regionMap.put(groupName, region);
278 }
279 for (String groupKey : regionMap.keySet()) {
280 RSGroupInfo info = RSGroupInfoManager.getRSGroup(groupKey);
281 serverMap.putAll(groupKey, filterOfflineServers(info, servers));
282 if(serverMap.get(groupKey).size() < 1) {
283 serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
284 }
285 }
286 } catch(IOException e) {
287 throw new HBaseIOException("Failed to generate group maps", e);
288 }
289 }
290
291 private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
292 List<ServerName> onlineServers) {
293 if (RSGroupInfo != null) {
294 return filterServers(RSGroupInfo.getServers(), onlineServers);
295 } else {
296 LOG.debug("Group Information found to be null. Some regions might be unassigned.");
297 return Collections.EMPTY_LIST;
298 }
299 }
300
301
302
303
304
305
306
307
308
309
310 private List<ServerName> filterServers(Collection<HostAndPort> servers,
311 Collection<ServerName> onlineServers) {
312 ArrayList<ServerName> finalList = new ArrayList<ServerName>();
313 for (HostAndPort server : servers) {
314 for(ServerName curr: onlineServers) {
315 if(curr.getHostPort().equals(server)) {
316 finalList.add(curr);
317 }
318 }
319 }
320 return finalList;
321 }
322
323 private ListMultimap<String, HRegionInfo> groupRegions(
324 List<HRegionInfo> regionList) throws IOException {
325 ListMultimap<String, HRegionInfo> regionGroup = ArrayListMultimap
326 .create();
327 for (HRegionInfo region : regionList) {
328 String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
329 regionGroup.put(groupName, region);
330 }
331 return regionGroup;
332 }
333
334 private Set<HRegionInfo> getMisplacedRegions(
335 Map<HRegionInfo, ServerName> regions) throws IOException {
336 Set<HRegionInfo> misplacedRegions = new HashSet<HRegionInfo>();
337 for (HRegionInfo region : regions.keySet()) {
338 ServerName assignedServer = regions.get(region);
339 RSGroupInfo info =
340 RSGroupInfoManager.getRSGroup(RSGroupInfoManager.getRSGroupOfTable(region.getTable()));
341 if (assignedServer != null &&
342 (info == null || !info.containsServer(assignedServer.getHostPort()))) {
343 LOG.debug("Found misplaced region: " + region.getRegionNameAsString() +
344 " on server: " + assignedServer +
345 " found in group: " +
346 RSGroupInfoManager.getRSGroupOfServer(assignedServer.getHostPort()) +
347 " outside of group: " + info.getName());
348 misplacedRegions.add(region);
349 }
350 }
351 return misplacedRegions;
352 }
353
354 private Map<ServerName, List<HRegionInfo>> correctAssignments(
355 Map<ServerName, List<HRegionInfo>> existingAssignments){
356 Map<ServerName, List<HRegionInfo>> correctAssignments =
357 new TreeMap<ServerName, List<HRegionInfo>>();
358 List<HRegionInfo> misplacedRegions = new LinkedList<HRegionInfo>();
359 correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<HRegionInfo>());
360 for (ServerName sName : existingAssignments.keySet()) {
361 correctAssignments.put(sName, new LinkedList<HRegionInfo>());
362 List<HRegionInfo> regions = existingAssignments.get(sName);
363 for (HRegionInfo region : regions) {
364 RSGroupInfo info = null;
365 try {
366 info = RSGroupInfoManager.getRSGroup(
367 RSGroupInfoManager.getRSGroupOfTable(region.getTable()));
368 }catch(IOException exp){
369 LOG.debug("Group information null for region of table " + region.getTable(),
370 exp);
371 }
372 if ((info == null) || (!info.containsServer(sName.getHostPort()))) {
373 correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
374 } else {
375 correctAssignments.get(sName).add(region);
376 }
377 }
378 }
379
380
381
382 for(HRegionInfo info: misplacedRegions) {
383 this.masterServices.getAssignmentManager().unassign(info);
384 }
385 return correctAssignments;
386 }
387
388 @Override
389 public void initialize() throws HBaseIOException {
390 try {
391 if (RSGroupInfoManager == null) {
392 List<RSGroupAdminEndpoint> cps =
393 masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
394 if (cps.size() != 1) {
395 String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
396 LOG.error(msg);
397 throw new HBaseIOException(msg);
398 }
399 RSGroupInfoManager = cps.get(0).getGroupInfoManager();
400 }
401 } catch (IOException e) {
402 throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
403 }
404
405
406 Class<? extends LoadBalancer> balancerKlass = config.getClass(
407 HBASE_GROUP_LOADBALANCER_CLASS,
408 StochasticLoadBalancer.class, LoadBalancer.class);
409 internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
410 internalBalancer.setClusterStatus(clusterStatus);
411 internalBalancer.setMasterServices(masterServices);
412 internalBalancer.setConf(config);
413 internalBalancer.initialize();
414 }
415
416 public boolean isOnline() {
417 return RSGroupInfoManager != null && RSGroupInfoManager.isOnline();
418 }
419
420 @Override
421 public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
422 }
423
424 @Override
425 public void regionOffline(HRegionInfo regionInfo) {
426 }
427
428 @Override
429 public void onConfigurationChange(Configuration conf) {
430
431 }
432
433 @Override
434 public void stop(String why) {
435 }
436
437 @Override
438 public boolean isStopped() {
439 return false;
440 }
441 }