View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableMap;
27  import java.util.Random;
28  import java.util.TreeMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.HBaseIOException;
34  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35  import org.apache.hadoop.hbase.HRegionInfo;
36  import org.apache.hadoop.hbase.ServerName;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.master.RegionPlan;
39  
40  import com.google.common.collect.MinMaxPriorityQueue;
41  
42  /**
43   * Makes decisions about the placement and movement of Regions across
44   * RegionServers.
45   *
46   * <p>Cluster-wide load balancing will occur only when there are no regions in
47   * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
48   *
49   * <p>Inline region placement with {@link #immediateAssignment} can be used when
50   * the Master needs to handle closed regions that it currently does not have
51   * a destination set for.  This can happen during master failover.
52   *
53   * <p>On cluster startup, bulk assignment can be used to determine
54   * locations for all Regions in a cluster.
55   *
56   * <p>This class produces plans for the
57   * {@link org.apache.hadoop.hbase.master.AssignmentManager} to execute.
58   */
59  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
60  public class SimpleLoadBalancer extends BaseLoadBalancer {
61    private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
62    private static final Random RANDOM = new Random(System.currentTimeMillis());
63  
64    private RegionInfoComparator riComparator = new RegionInfoComparator();
65    private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
66  
67  
68    /**
69     * Stores additional per-server information about the regions added/removed
70     * during the run of the balancing algorithm.
71     *
72     * For servers that shed regions, we need to track which regions we have already
73     * shed. <b>nextRegionForUnload</b> contains the index in the list of regions on
74     * the server that is the next to be shed.
75     */
76    static class BalanceInfo {
77  
78      private final int nextRegionForUnload;
79      private int numRegionsAdded;
80  
81      public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
82        this.nextRegionForUnload = nextRegionForUnload;
83        this.numRegionsAdded = numRegionsAdded;
84      }
85  
86      int getNextRegionForUnload() {
87        return nextRegionForUnload;
88      }
89  
90      int getNumRegionsAdded() {
91        return numRegionsAdded;
92      }
93  
94      void setNumRegionsAdded(int numAdded) {
95        this.numRegionsAdded = numAdded;
96      }
97    }
98  
99    /**
100    * Generate a global load balancing plan according to the specified map of
101    * server information to the most loaded regions of each server.
102    *
103    * The load balancing invariant is that all servers are within 1 region of the
104    * average number of regions per server.  If the average is an integer number,
105    * all servers will be balanced to the average.  Otherwise, all servers will
106    * have either floor(average) or ceiling(average) regions.
107    *
108    * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
109    *   we can fetch from both ends of the queue.
110    * At the beginning, we check whether there was empty region server
111    *   just discovered by Master. If so, we alternately choose new / old
112    *   regions from head / tail of regionsToMove, respectively. This alternation
113    *   avoids clustering young regions on the newly discovered region server.
114    *   Otherwise, we choose new regions from head of regionsToMove.
115    *
116    * Another improvement from HBASE-3609 is that we assign regions from
117    *   regionsToMove to underloaded servers in round-robin fashion.
118    *   Previously one underloaded server would be filled before we move onto
119    *   the next underloaded server, leading to clustering of young regions.
120    *
121    * Finally, we randomly shuffle underloaded servers so that they receive
122    *   offloaded regions relatively evenly across calls to balanceCluster().
123    *
124    * The algorithm is currently implemented as such:
125    *
126    * <ol>
127    * <li>Determine the two valid numbers of regions each server should have,
128    *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
129    *
130    * <li>Iterate down the most loaded servers, shedding regions from each so
131    *     each server hosts exactly <b>MAX</b> regions.  Stop once you reach a
132    *     server that already has &lt;= <b>MAX</b> regions.
133    *     <p>
134    *     Order the regions to move from most recent to least.
135    *
136    * <li>Iterate down the least loaded servers, assigning regions so each server
137    *     has exactly </b>MIN</b> regions.  Stop once you reach a server that
138    *     already has &gt;= <b>MIN</b> regions.
139    *
140    *     Regions being assigned to underloaded servers are those that were shed
141    *     in the previous step.  It is possible that there were not enough
142    *     regions shed to fill each underloaded server to <b>MIN</b>.  If so we
143    *     end up with a number of regions required to do so, <b>neededRegions</b>.
144    *
145    *     It is also possible that we were able to fill each underloaded but ended
146    *     up with regions that were unassigned from overloaded servers but that
147    *     still do not have assignment.
148    *
149    *     If neither of these conditions hold (no regions needed to fill the
150    *     underloaded servers, no regions leftover from overloaded servers),
151    *     we are done and return.  Otherwise we handle these cases below.
152    *
153    * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers),
154    *     we iterate the most loaded servers again, shedding a single server from
155    *     each (this brings them from having <b>MAX</b> regions to having
156    *     <b>MIN</b> regions).
157    *
158    * <li>We now definitely have more regions that need assignment, either from
159    *     the previous step or from the original shedding from overloaded servers.
160    *     Iterate the least loaded servers filling each to <b>MIN</b>.
161    *
162    * <li>If we still have more regions that need assignment, again iterate the
163    *     least loaded servers, this time giving each one (filling them to
164    *     </b>MAX</b>) until we run out.
165    *
166    * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
167    *
168    *     In addition, any server hosting &gt;= <b>MAX</b> regions is guaranteed
169    *     to end up with <b>MAX</b> regions at the end of the balancing.  This
170    *     ensures the minimal number of regions possible are moved.
171    * </ol>
172    *
173    * TODO: We can at-most reassign the number of regions away from a particular
174    *       server to be how many they report as most loaded.
175    *       Should we just keep all assignment in memory?  Any objections?
176    *       Does this mean we need HeapSize on HMaster?  Or just careful monitor?
177    *       (current thinking is we will hold all assignments in memory)
178    *
179    * @param clusterMap Map of regionservers and their load/region information to
180    *                   a list of their most loaded regions
181    * @return a list of regions to be moved, including source and destination,
182    *         or null if cluster is already balanced
183    */
184   @Override
185   public List<RegionPlan> balanceCluster(
186       Map<ServerName, List<HRegionInfo>> clusterMap) {
187     List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
188     if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
189       return regionsToReturn;
190     }
191     if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
192       if (clusterMap.size() <= 2) {
193         return null;
194       }
195       clusterMap = new HashMap<ServerName, List<HRegionInfo>>(clusterMap);
196       clusterMap.remove(masterServerName);
197     }
198 
199     long startTime = System.currentTimeMillis();
200 
201     // construct a Cluster object with clusterMap and rest of the
202     // argument as defaults
203     Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
204     if (!this.needsBalance(c)) return null;
205 
206     ClusterLoadState cs = new ClusterLoadState(clusterMap);
207     int numServers = cs.getNumServers();
208     NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
209     int numRegions = cs.getNumRegions();
210     float average = cs.getLoadAverage();
211     int max = (int)Math.ceil(average);
212     int min = (int)average;
213 
214     // Using to check balance result.
215     StringBuilder strBalanceParam = new StringBuilder();
216     strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
217         .append(", numServers=").append(numServers).append(", max=").append(max)
218         .append(", min=").append(min);
219     LOG.debug(strBalanceParam.toString());
220 
221     // Balance the cluster
222     // TODO: Look at data block locality or a more complex load to do this
223     MinMaxPriorityQueue<RegionPlan> regionsToMove =
224       MinMaxPriorityQueue.orderedBy(rpComparator).create();
225     regionsToReturn = new ArrayList<RegionPlan>();
226 
227     // Walk down most loaded, pruning each to the max
228     int serversOverloaded = 0;
229     // flag used to fetch regions from head and tail of list, alternately
230     boolean fetchFromTail = false;
231     Map<ServerName, BalanceInfo> serverBalanceInfo =
232       new TreeMap<ServerName, BalanceInfo>();
233     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
234         serversByLoad.descendingMap().entrySet()) {
235       ServerAndLoad sal = server.getKey();
236       int load = sal.getLoad();
237       if (load <= max) {
238         serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
239         break;
240       }
241       serversOverloaded++;
242       List<HRegionInfo> regions = server.getValue();
243       int numToOffload = Math.min(load - max, regions.size());
244       // account for the out-of-band regions which were assigned to this server
245       // after some other region server crashed
246       Collections.sort(regions, riComparator);
247       int numTaken = 0;
248       for (int i = 0; i <= numToOffload; ) {
249         HRegionInfo hri = regions.get(i); // fetch from head
250         if (fetchFromTail) {
251           hri = regions.get(regions.size() - 1 - i);
252         }
253         i++;
254         // Don't rebalance special regions.
255         if (shouldBeOnMaster(hri)
256             && masterServerName.equals(sal.getServerName())) continue;
257         regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
258         numTaken++;
259         if (numTaken >= numToOffload) break;
260       }
261       serverBalanceInfo.put(sal.getServerName(),
262         new BalanceInfo(numToOffload, (-1)*numTaken));
263     }
264     int totalNumMoved = regionsToMove.size();
265 
266     // Walk down least loaded, filling each to the min
267     int neededRegions = 0; // number of regions needed to bring all up to min
268     fetchFromTail = false;
269 
270     Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
271     int maxToTake = numRegions - min;
272     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
273         serversByLoad.entrySet()) {
274       if (maxToTake == 0) break; // no more to take
275       int load = server.getKey().getLoad();
276       if (load >= min && load > 0) {
277         continue; // look for other servers which haven't reached min
278       }
279       int regionsToPut = min - load;
280       if (regionsToPut == 0)
281       {
282         regionsToPut = 1;
283       }
284       maxToTake -= regionsToPut;
285       underloadedServers.put(server.getKey().getServerName(), regionsToPut);
286     }
287     // number of servers that get new regions
288     int serversUnderloaded = underloadedServers.size();
289     int incr = 1;
290     List<ServerName> sns =
291       Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
292     Collections.shuffle(sns, RANDOM);
293     while (regionsToMove.size() > 0) {
294       int cnt = 0;
295       int i = incr > 0 ? 0 : underloadedServers.size()-1;
296       for (; i >= 0 && i < underloadedServers.size(); i += incr) {
297         if (regionsToMove.isEmpty()) break;
298         ServerName si = sns.get(i);
299         int numToTake = underloadedServers.get(si);
300         if (numToTake == 0) continue;
301 
302         addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
303 
304         underloadedServers.put(si, numToTake-1);
305         cnt++;
306         BalanceInfo bi = serverBalanceInfo.get(si);
307         if (bi == null) {
308           bi = new BalanceInfo(0, 0);
309           serverBalanceInfo.put(si, bi);
310         }
311         bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
312       }
313       if (cnt == 0) break;
314       // iterates underloadedServers in the other direction
315       incr = -incr;
316     }
317     for (Integer i : underloadedServers.values()) {
318       // If we still want to take some, increment needed
319       neededRegions += i;
320     }
321 
322     // If none needed to fill all to min and none left to drain all to max,
323     // we are done
324     if (neededRegions == 0 && regionsToMove.isEmpty()) {
325       long endTime = System.currentTimeMillis();
326       LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
327           "Moving " + totalNumMoved + " regions off of " +
328           serversOverloaded + " overloaded servers onto " +
329           serversUnderloaded + " less loaded servers");
330       return regionsToReturn;
331     }
332 
333     // Need to do a second pass.
334     // Either more regions to assign out or servers that are still underloaded
335 
336     // If we need more to fill min, grab one from each most loaded until enough
337     if (neededRegions != 0) {
338       // Walk down most loaded, grabbing one from each until we get enough
339       for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
340         serversByLoad.descendingMap().entrySet()) {
341         BalanceInfo balanceInfo =
342           serverBalanceInfo.get(server.getKey().getServerName());
343         int idx =
344           balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
345         if (idx >= server.getValue().size()) break;
346         HRegionInfo region = server.getValue().get(idx);
347         if (region.isMetaRegion()) continue; // Don't move meta regions.
348         regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
349         totalNumMoved++;
350         if (--neededRegions == 0) {
351           // No more regions needed, done shedding
352           break;
353         }
354       }
355     }
356 
357     // Now we have a set of regions that must be all assigned out
358     // Assign each underloaded up to the min, then if leftovers, assign to max
359 
360     // Walk down least loaded, assigning to each to fill up to min
361     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
362         serversByLoad.entrySet()) {
363       int regionCount = server.getKey().getLoad();
364       if (regionCount >= min) break;
365       BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
366       if(balanceInfo != null) {
367         regionCount += balanceInfo.getNumRegionsAdded();
368       }
369       if(regionCount >= min) {
370         continue;
371       }
372       int numToTake = min - regionCount;
373       int numTaken = 0;
374       while(numTaken < numToTake && 0 < regionsToMove.size()) {
375         addRegionPlan(regionsToMove, fetchFromTail,
376           server.getKey().getServerName(), regionsToReturn);
377         numTaken++;
378       }
379     }
380 
381     // If we still have regions to dish out, assign underloaded to max
382     if (0 < regionsToMove.size()) {
383       for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
384         serversByLoad.entrySet()) {
385         int regionCount = server.getKey().getLoad();
386         BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
387         if(balanceInfo != null) {
388           regionCount += balanceInfo.getNumRegionsAdded();
389         }
390         if(regionCount >= max) {
391           break;
392         }
393         addRegionPlan(regionsToMove, fetchFromTail,
394           server.getKey().getServerName(), regionsToReturn);
395         if (regionsToMove.isEmpty()) {
396           break;
397         }
398       }
399     }
400 
401     long endTime = System.currentTimeMillis();
402 
403     if (!regionsToMove.isEmpty() || neededRegions != 0) {
404       // Emit data so can diagnose how balancer went astray.
405       LOG.warn("regionsToMove=" + totalNumMoved +
406         ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
407         ", serversUnderloaded=" + serversUnderloaded);
408       StringBuilder sb = new StringBuilder();
409       for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
410         if (sb.length() > 0) sb.append(", ");
411         sb.append(e.getKey().toString());
412         sb.append(" ");
413         sb.append(e.getValue().size());
414       }
415       LOG.warn("Input " + sb.toString());
416     }
417 
418     // All done!
419     LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
420         "Moving " + totalNumMoved + " regions off of " +
421         serversOverloaded + " overloaded servers onto " +
422         serversUnderloaded + " less loaded servers");
423 
424     return regionsToReturn;
425   }
426 
427   /**
428    * Add a region from the head or tail to the List of regions to return.
429    */
430   private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
431       final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
432     RegionPlan rp = null;
433     if (!fetchFromTail) rp = regionsToMove.remove();
434     else rp = regionsToMove.removeLast();
435     rp.setDestination(sn);
436     regionsToReturn.add(rp);
437   }
438 
439   @Override
440   public List<RegionPlan> balanceCluster(TableName tableName,
441       Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
442     return balanceCluster(clusterState);
443   }
444 }