1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableMap;
27 import java.util.Random;
28 import java.util.TreeMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.HBaseIOException;
34 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.ServerName;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.master.RegionPlan;
39
40 import com.google.common.collect.MinMaxPriorityQueue;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
60 public class SimpleLoadBalancer extends BaseLoadBalancer {
61 private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
62 private static final Random RANDOM = new Random(System.currentTimeMillis());
63
64 private RegionInfoComparator riComparator = new RegionInfoComparator();
65 private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
66
67
68
69
70
71
72
73
74
75
76 static class BalanceInfo {
77
78 private final int nextRegionForUnload;
79 private int numRegionsAdded;
80
81 public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
82 this.nextRegionForUnload = nextRegionForUnload;
83 this.numRegionsAdded = numRegionsAdded;
84 }
85
86 int getNextRegionForUnload() {
87 return nextRegionForUnload;
88 }
89
90 int getNumRegionsAdded() {
91 return numRegionsAdded;
92 }
93
94 void setNumRegionsAdded(int numAdded) {
95 this.numRegionsAdded = numAdded;
96 }
97 }
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184 @Override
185 public List<RegionPlan> balanceCluster(
186 Map<ServerName, List<HRegionInfo>> clusterMap) {
187 List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
188 if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
189 return regionsToReturn;
190 }
191 if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
192 if (clusterMap.size() <= 2) {
193 return null;
194 }
195 clusterMap = new HashMap<ServerName, List<HRegionInfo>>(clusterMap);
196 clusterMap.remove(masterServerName);
197 }
198
199 long startTime = System.currentTimeMillis();
200
201
202
203 Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
204 if (!this.needsBalance(c)) return null;
205
206 ClusterLoadState cs = new ClusterLoadState(clusterMap);
207 int numServers = cs.getNumServers();
208 NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
209 int numRegions = cs.getNumRegions();
210 float average = cs.getLoadAverage();
211 int max = (int)Math.ceil(average);
212 int min = (int)average;
213
214
215 StringBuilder strBalanceParam = new StringBuilder();
216 strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
217 .append(", numServers=").append(numServers).append(", max=").append(max)
218 .append(", min=").append(min);
219 LOG.debug(strBalanceParam.toString());
220
221
222
223 MinMaxPriorityQueue<RegionPlan> regionsToMove =
224 MinMaxPriorityQueue.orderedBy(rpComparator).create();
225 regionsToReturn = new ArrayList<RegionPlan>();
226
227
228 int serversOverloaded = 0;
229
230 boolean fetchFromTail = false;
231 Map<ServerName, BalanceInfo> serverBalanceInfo =
232 new TreeMap<ServerName, BalanceInfo>();
233 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
234 serversByLoad.descendingMap().entrySet()) {
235 ServerAndLoad sal = server.getKey();
236 int load = sal.getLoad();
237 if (load <= max) {
238 serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
239 break;
240 }
241 serversOverloaded++;
242 List<HRegionInfo> regions = server.getValue();
243 int numToOffload = Math.min(load - max, regions.size());
244
245
246 Collections.sort(regions, riComparator);
247 int numTaken = 0;
248 for (int i = 0; i <= numToOffload; ) {
249 HRegionInfo hri = regions.get(i);
250 if (fetchFromTail) {
251 hri = regions.get(regions.size() - 1 - i);
252 }
253 i++;
254
255 if (shouldBeOnMaster(hri)
256 && masterServerName.equals(sal.getServerName())) continue;
257 regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
258 numTaken++;
259 if (numTaken >= numToOffload) break;
260 }
261 serverBalanceInfo.put(sal.getServerName(),
262 new BalanceInfo(numToOffload, (-1)*numTaken));
263 }
264 int totalNumMoved = regionsToMove.size();
265
266
267 int neededRegions = 0;
268 fetchFromTail = false;
269
270 Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
271 int maxToTake = numRegions - min;
272 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
273 serversByLoad.entrySet()) {
274 if (maxToTake == 0) break;
275 int load = server.getKey().getLoad();
276 if (load >= min && load > 0) {
277 continue;
278 }
279 int regionsToPut = min - load;
280 if (regionsToPut == 0)
281 {
282 regionsToPut = 1;
283 }
284 maxToTake -= regionsToPut;
285 underloadedServers.put(server.getKey().getServerName(), regionsToPut);
286 }
287
288 int serversUnderloaded = underloadedServers.size();
289 int incr = 1;
290 List<ServerName> sns =
291 Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
292 Collections.shuffle(sns, RANDOM);
293 while (regionsToMove.size() > 0) {
294 int cnt = 0;
295 int i = incr > 0 ? 0 : underloadedServers.size()-1;
296 for (; i >= 0 && i < underloadedServers.size(); i += incr) {
297 if (regionsToMove.isEmpty()) break;
298 ServerName si = sns.get(i);
299 int numToTake = underloadedServers.get(si);
300 if (numToTake == 0) continue;
301
302 addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
303
304 underloadedServers.put(si, numToTake-1);
305 cnt++;
306 BalanceInfo bi = serverBalanceInfo.get(si);
307 if (bi == null) {
308 bi = new BalanceInfo(0, 0);
309 serverBalanceInfo.put(si, bi);
310 }
311 bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
312 }
313 if (cnt == 0) break;
314
315 incr = -incr;
316 }
317 for (Integer i : underloadedServers.values()) {
318
319 neededRegions += i;
320 }
321
322
323
324 if (neededRegions == 0 && regionsToMove.isEmpty()) {
325 long endTime = System.currentTimeMillis();
326 LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
327 "Moving " + totalNumMoved + " regions off of " +
328 serversOverloaded + " overloaded servers onto " +
329 serversUnderloaded + " less loaded servers");
330 return regionsToReturn;
331 }
332
333
334
335
336
337 if (neededRegions != 0) {
338
339 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
340 serversByLoad.descendingMap().entrySet()) {
341 BalanceInfo balanceInfo =
342 serverBalanceInfo.get(server.getKey().getServerName());
343 int idx =
344 balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
345 if (idx >= server.getValue().size()) break;
346 HRegionInfo region = server.getValue().get(idx);
347 if (region.isMetaRegion()) continue;
348 regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
349 totalNumMoved++;
350 if (--neededRegions == 0) {
351
352 break;
353 }
354 }
355 }
356
357
358
359
360
361 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
362 serversByLoad.entrySet()) {
363 int regionCount = server.getKey().getLoad();
364 if (regionCount >= min) break;
365 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
366 if(balanceInfo != null) {
367 regionCount += balanceInfo.getNumRegionsAdded();
368 }
369 if(regionCount >= min) {
370 continue;
371 }
372 int numToTake = min - regionCount;
373 int numTaken = 0;
374 while(numTaken < numToTake && 0 < regionsToMove.size()) {
375 addRegionPlan(regionsToMove, fetchFromTail,
376 server.getKey().getServerName(), regionsToReturn);
377 numTaken++;
378 }
379 }
380
381
382 if (0 < regionsToMove.size()) {
383 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
384 serversByLoad.entrySet()) {
385 int regionCount = server.getKey().getLoad();
386 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
387 if(balanceInfo != null) {
388 regionCount += balanceInfo.getNumRegionsAdded();
389 }
390 if(regionCount >= max) {
391 break;
392 }
393 addRegionPlan(regionsToMove, fetchFromTail,
394 server.getKey().getServerName(), regionsToReturn);
395 if (regionsToMove.isEmpty()) {
396 break;
397 }
398 }
399 }
400
401 long endTime = System.currentTimeMillis();
402
403 if (!regionsToMove.isEmpty() || neededRegions != 0) {
404
405 LOG.warn("regionsToMove=" + totalNumMoved +
406 ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
407 ", serversUnderloaded=" + serversUnderloaded);
408 StringBuilder sb = new StringBuilder();
409 for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
410 if (sb.length() > 0) sb.append(", ");
411 sb.append(e.getKey().toString());
412 sb.append(" ");
413 sb.append(e.getValue().size());
414 }
415 LOG.warn("Input " + sb.toString());
416 }
417
418
419 LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
420 "Moving " + totalNumMoved + " regions off of " +
421 serversOverloaded + " overloaded servers onto " +
422 serversUnderloaded + " less loaded servers");
423
424 return regionsToReturn;
425 }
426
427
428
429
430 private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
431 final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
432 RegionPlan rp = null;
433 if (!fetchFromTail) rp = regionsToMove.remove();
434 else rp = regionsToMove.removeLast();
435 rp.setDestination(sn);
436 regionsToReturn.add(rp);
437 }
438
439 @Override
440 public List<RegionPlan> balanceCluster(TableName tableName,
441 Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
442 return balanceCluster(clusterState);
443 }
444 }