
1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Collection;
23  import java.util.Collections;
24  import java.util.Comparator;
25  import java.util.Deque;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Map.Entry;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  
37  import org.apache.commons.lang.NotImplementedException;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.hbase.ClusterStatus;
42  import org.apache.hadoop.hbase.HBaseIOException;
43  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.RegionLoad;
46  import org.apache.hadoop.hbase.ServerName;
47  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
48  import org.apache.hadoop.hbase.master.LoadBalancer;
49  import org.apache.hadoop.hbase.master.MasterServices;
50  import org.apache.hadoop.hbase.master.RackManager;
51  import org.apache.hadoop.hbase.master.RegionPlan;
52  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
53  import org.apache.hadoop.util.StringUtils;
54  
55  import com.google.common.base.Joiner;
56  import com.google.common.collect.ArrayListMultimap;
57  import com.google.common.collect.Lists;
58  import com.google.common.collect.Sets;
59  
60  /**
61   * The base class for load balancers. It provides the functions used by
62   * {@link org.apache.hadoop.hbase.master.AssignmentManager} to assign regions
63   * in edge cases. It doesn't provide an implementation of the actual
64   * balancing algorithm.
65   *
66   */
67  public abstract class BaseLoadBalancer implements LoadBalancer {
68    private static final int MIN_SERVER_BALANCE = 2;
69    private volatile boolean stopped = false;
70  
71    private static final List<HRegionInfo> EMPTY_REGION_LIST = new ArrayList<HRegionInfo>(0);
72  
73    protected final RegionLocationFinder regionFinder = new RegionLocationFinder();
74  
75    private static class DefaultRackManager extends RackManager {
76      @Override
77      public String getRack(ServerName server) {
78        return UNKNOWN_RACK;
79      }
80    }
81  
82    /**
83     * The constructor that uses the basic MetricsBalancer
84     */
85    protected BaseLoadBalancer() {
86      metricsBalancer = new MetricsBalancer();
87    }
88  
89    /**
90     * This Constructor accepts an instance of MetricsBalancer,
91     * which will be used instead of creating a new one
92     */
93    protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
94      this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
95    }
96  
97    /**
98     * An efficient array based implementation similar to ClusterState for keeping
99     * the status of the cluster in terms of region assignment and distribution.
100   * LoadBalancers, such as StochasticLoadBalancer, use this Cluster object because
101   * hundreds of thousands of hashmap manipulations would be very costly, which is why this
102   * class mostly uses indexes and arrays.
103    *
104    * Cluster tracks a list of unassigned regions, region assignments, and the server
105    * topology in terms of server names, hostnames and racks.
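  *
  * A minimal usage sketch (variable names are illustrative only): a subclass builds a Cluster
  * from the current assignments and then applies {@link Action}s to explore candidate states.
  * <pre>
  *   // assignments: Map&lt;ServerName, List&lt;HRegionInfo&gt;&gt; of the current cluster state
  *   Cluster cluster = new Cluster(assignments, null, null, rackManager);
  *   cluster.doAction(new MoveRegionAction(regionIdx, fromServerIdx, toServerIdx));
  * </pre>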
106    */
107   protected static class Cluster {
108     ServerName[] servers;
109     String[] hosts; // ServerName uniquely identifies a region server. Multiple RS can run on the same host
110     String[] racks;
111     boolean multiServersPerHost = false; // whether or not any host has more than one server
112 
113     ArrayList<String> tables;
114     HRegionInfo[] regions;
115     Deque<RegionLoad>[] regionLoads;
116     private RegionLocationFinder regionFinder;
117 
118     int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
119 
120     int[]   serverIndexToHostIndex;      //serverIndex -> host index
121     int[]   serverIndexToRackIndex;      //serverIndex -> rack index
122 
123     int[][] regionsPerServer;            //serverIndex -> region list
124     int[][] regionsPerHost;              //hostIndex -> list of regions
125     int[][] regionsPerRack;              //rackIndex -> region list
126     int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index
127     int[][] primariesOfRegionsPerHost;   //hostIndex -> sorted list of regions by primary region index
128     int[][] primariesOfRegionsPerRack;   //rackIndex -> sorted list of regions by primary region index
129 
130     int[][] serversPerHost;              //hostIndex -> list of server indexes
131     int[][] serversPerRack;              //rackIndex -> list of server indexes
132     int[]   regionIndexToServerIndex;    //regionIndex -> serverIndex
133     int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
134     int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
135     int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
136     int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
137     int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
138     boolean hasRegionReplicas = false;   //whether there are regions with replicas
139 
140     Integer[] serverIndicesSortedByRegionCount;
141     Integer[] serverIndicesSortedByLocality;
142 
143     Map<String, Integer> serversToIndex;
144     Map<String, Integer> hostsToIndex;
145     Map<String, Integer> racksToIndex;
146     Map<String, Integer> tablesToIndex;
147     Map<HRegionInfo, Integer> regionsToIndex;
148     float[] localityPerServer;
149 
150     int numServers;
151     int numHosts;
152     int numRacks;
153     int numTables;
154     int numRegions;
155 
156     int numMovedRegions = 0; //num moved regions from the initial configuration
157     Map<ServerName, List<HRegionInfo>> clusterState;
158 
159     protected final RackManager rackManager;
160     // Maps region -> rackIndex -> locality of region on rack
161     private float[][] rackLocalities;
162     // Maps localityType -> region -> [server|rack]Index with highest locality
163     private int[][] regionsToMostLocalEntities;
164 
165     protected Cluster(
166         Map<ServerName, List<HRegionInfo>> clusterState,
167         Map<String, Deque<RegionLoad>> loads,
168         RegionLocationFinder regionFinder,
169         RackManager rackManager) {
170       this(null, clusterState, loads, regionFinder, rackManager);
171     }
172 
173     @SuppressWarnings("unchecked")
174     protected Cluster(
175         Collection<HRegionInfo> unassignedRegions,
176         Map<ServerName, List<HRegionInfo>> clusterState,
177         Map<String, Deque<RegionLoad>> loads,
178         RegionLocationFinder regionFinder,
179         RackManager rackManager) {
180 
181       if (unassignedRegions == null) {
182         unassignedRegions = EMPTY_REGION_LIST;
183       }
184 
185       serversToIndex = new HashMap<String, Integer>();
186       hostsToIndex = new HashMap<String, Integer>();
187       racksToIndex = new HashMap<String, Integer>();
188       tablesToIndex = new HashMap<String, Integer>();
189 
190       //TODO: We should get the list of tables from master
191       tables = new ArrayList<String>();
192       this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
193 
194       numRegions = 0;
195 
196       List<List<Integer>> serversPerHostList = new ArrayList<List<Integer>>();
197       List<List<Integer>> serversPerRackList = new ArrayList<List<Integer>>();
198       this.clusterState = clusterState;
199       this.regionFinder = regionFinder;
200 
201       // Use servername and port as there can be dead servers in this list. We want everything with
202       // a matching hostname and port to have the same index.
203       for (ServerName sn : clusterState.keySet()) {
204         if (serversToIndex.get(sn.getHostAndPort()) == null) {
205           serversToIndex.put(sn.getHostAndPort(), numServers++);
206         }
207         if (!hostsToIndex.containsKey(sn.getHostname())) {
208           hostsToIndex.put(sn.getHostname(), numHosts++);
209           serversPerHostList.add(new ArrayList<Integer>(1));
210         }
211 
212         int serverIndex = serversToIndex.get(sn.getHostAndPort());
213         int hostIndex = hostsToIndex.get(sn.getHostname());
214         serversPerHostList.get(hostIndex).add(serverIndex);
215 
216         String rack = this.rackManager.getRack(sn);
217         if (!racksToIndex.containsKey(rack)) {
218           racksToIndex.put(rack, numRacks++);
219           serversPerRackList.add(new ArrayList<Integer>());
220         }
221         int rackIndex = racksToIndex.get(rack);
222         serversPerRackList.get(rackIndex).add(serverIndex);
223       }
224 
225       // Count how many regions there are.
226       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
227         numRegions += entry.getValue().size();
228       }
229       numRegions += unassignedRegions.size();
230 
231       regionsToIndex = new HashMap<HRegionInfo, Integer>(numRegions);
232       servers = new ServerName[numServers];
233       serversPerHost = new int[numHosts][];
234       serversPerRack = new int[numRacks][];
235       regions = new HRegionInfo[numRegions];
236       regionIndexToServerIndex = new int[numRegions];
237       initialRegionIndexToServerIndex = new int[numRegions];
238       regionIndexToTableIndex = new int[numRegions];
239       regionIndexToPrimaryIndex = new int[numRegions];
240       regionLoads = new Deque[numRegions];
241 
242       regionLocations = new int[numRegions][];
243       serverIndicesSortedByRegionCount = new Integer[numServers];
244       serverIndicesSortedByLocality = new Integer[numServers];
245       localityPerServer = new float[numServers];
246 
247       serverIndexToHostIndex = new int[numServers];
248       serverIndexToRackIndex = new int[numServers];
249       regionsPerServer = new int[numServers][];
250       regionsPerHost = new int[numHosts][];
251       regionsPerRack = new int[numRacks][];
252       primariesOfRegionsPerServer = new int[numServers][];
253       primariesOfRegionsPerHost = new int[numHosts][];
254       primariesOfRegionsPerRack = new int[numRacks][];
255 
256       int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
257 
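      // First pass over the cluster state: keep the live ServerName for each hostAndPort slot
      // and size the per-server region arrays (servers sharing a hostAndPort share one slot).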
258       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
259         int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
260 
261         // keep the servername if this is the first server name for this hostname
262         // or this servername has the newest startcode.
263         if (servers[serverIndex] == null ||
264             servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
265           servers[serverIndex] = entry.getKey();
266         }
267 
268         if (regionsPerServer[serverIndex] != null) {
269           // there is another server with the same hostAndPort in ClusterState.
270           // allocate the array for the total size
271           regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
272         } else {
273           regionsPerServer[serverIndex] = new int[entry.getValue().size()];
274         }
275         primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
276         serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
277         serverIndicesSortedByLocality[serverIndex] = serverIndex;
278       }
279 
280       hosts = new String[numHosts];
281       for (Entry<String, Integer> entry : hostsToIndex.entrySet()) {
282         hosts[entry.getValue()] = entry.getKey();
283       }
284       racks = new String[numRacks];
285       for (Entry<String, Integer> entry : racksToIndex.entrySet()) {
286         racks[entry.getValue()] = entry.getKey();
287       }
288 
289       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
290         int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
291         regionPerServerIndex = 0;
292 
293         int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
294         serverIndexToHostIndex[serverIndex] = hostIndex;
295 
296         int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
297         serverIndexToRackIndex[serverIndex] = rackIndex;
298 
299         for (HRegionInfo region : entry.getValue()) {
300           registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
301           regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
302           regionIndex++;
303         }
304       }
305 
306       for (HRegionInfo region : unassignedRegions) {
307         registerRegion(region, regionIndex, -1, loads, regionFinder);
308         regionIndex++;
309       }
310 
311       for (int i = 0; i < serversPerHostList.size(); i++) {
312         serversPerHost[i] = new int[serversPerHostList.get(i).size()];
313         for (int j = 0; j < serversPerHost[i].length; j++) {
314           serversPerHost[i][j] = serversPerHostList.get(i).get(j);
315         }
316         if (serversPerHost[i].length > 1) {
317           multiServersPerHost = true;
318         }
319       }
320 
321       for (int i = 0; i < serversPerRackList.size(); i++) {
322         serversPerRack[i] = new int[serversPerRackList.get(i).size()];
323         for (int j = 0; j < serversPerRack[i].length; j++) {
324           serversPerRack[i][j] = serversPerRackList.get(i).get(j);
325         }
326       }
327 
328       numTables = tables.size();
329       numRegionsPerServerPerTable = new int[numServers][numTables];
330 
331       for (int i = 0; i < numServers; i++) {
332         for (int j = 0; j < numTables; j++) {
333           numRegionsPerServerPerTable[i][j] = 0;
334         }
335       }
336 
337       for (int i=0; i < regionIndexToServerIndex.length; i++) {
338         if (regionIndexToServerIndex[i] >= 0) {
339           numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
340         }
341       }
342 
343       numMaxRegionsPerTable = new int[numTables];
344       for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
345         for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
346           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
347             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
348           }
349         }
350       }
351 
352       for (int i = 0; i < regions.length; i ++) {
353         HRegionInfo info = regions[i];
354         if (RegionReplicaUtil.isDefaultReplica(info)) {
355           regionIndexToPrimaryIndex[i] = i;
356         } else {
357           hasRegionReplicas = true;
358           HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
359           regionIndexToPrimaryIndex[i] =
360               regionsToIndex.containsKey(primaryInfo) ?
361               regionsToIndex.get(primaryInfo):
362               -1;
363         }
364       }
365 
366       for (int i = 0; i < regionsPerServer.length; i++) {
367         primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length];
368         for (int j = 0; j < regionsPerServer[i].length; j++) {
369           int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
370           primariesOfRegionsPerServer[i][j] = primaryIndex;
371         }
372         // sort the regions by primaries.
373         Arrays.sort(primariesOfRegionsPerServer[i]);
374       }
375 
376       // compute regionsPerHost
377       if (multiServersPerHost) {
378         for (int i = 0 ; i < serversPerHost.length; i++) {
379           int numRegionsPerHost = 0;
380           for (int j = 0; j < serversPerHost[i].length; j++) {
381             numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length;
382           }
383           regionsPerHost[i] = new int[numRegionsPerHost];
384           primariesOfRegionsPerHost[i] = new int[numRegionsPerHost];
385         }
386         for (int i = 0 ; i < serversPerHost.length; i++) {
387           int numRegionPerHostIndex = 0;
388           for (int j = 0; j < serversPerHost[i].length; j++) {
389             for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) {
390               int region = regionsPerServer[serversPerHost[i][j]][k];
391               regionsPerHost[i][numRegionPerHostIndex] = region;
392               int primaryIndex = regionIndexToPrimaryIndex[region];
393               primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex;
394               numRegionPerHostIndex++;
395             }
396           }
397           // sort the regions by primaries.
398           Arrays.sort(primariesOfRegionsPerHost[i]);
399         }
400       }
401 
402       // compute regionsPerRack
403       if (numRacks > 1) {
404         for (int i = 0 ; i < serversPerRack.length; i++) {
405           int numRegionsPerRack = 0;
406           for (int j = 0; j < serversPerRack[i].length; j++) {
407             numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length;
408           }
409           regionsPerRack[i] = new int[numRegionsPerRack];
410           primariesOfRegionsPerRack[i] = new int[numRegionsPerRack];
411         }
412 
413         for (int i = 0 ; i < serversPerRack.length; i++) {
414           int numRegionPerRackIndex = 0;
415           for (int j = 0; j < serversPerRack[i].length; j++) {
416             for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) {
417               int region = regionsPerServer[serversPerRack[i][j]][k];
418               regionsPerRack[i][numRegionPerRackIndex] = region;
419               int primaryIndex = regionIndexToPrimaryIndex[region];
420               primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex;
421               numRegionPerRackIndex++;
422             }
423           }
424           // sort the regions by primaries.
425           Arrays.sort(primariesOfRegionsPerRack[i]);
426         }
427       }
428     }
429 
430     /** Helper for Cluster constructor to handle a region */
431     private void registerRegion(HRegionInfo region, int regionIndex,
432         int serverIndex, Map<String, Deque<RegionLoad>> loads,
433         RegionLocationFinder regionFinder) {
434       String tableName = region.getTable().getNameAsString();
435       if (!tablesToIndex.containsKey(tableName)) {
436         tables.add(tableName);
437         tablesToIndex.put(tableName, tablesToIndex.size());
438       }
439       int tableIndex = tablesToIndex.get(tableName);
440 
441       regionsToIndex.put(region, regionIndex);
442       regions[regionIndex] = region;
443       regionIndexToServerIndex[regionIndex] = serverIndex;
444       initialRegionIndexToServerIndex[regionIndex] = serverIndex;
445       regionIndexToTableIndex[regionIndex] = tableIndex;
446 
447       // region load
448       if (loads != null) {
449         Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString());
450         // That could have failed if the RegionLoad is using the other regionName
451         if (rl == null) {
452           // Try getting the region load using encoded name.
453           rl = loads.get(region.getEncodedName());
454         }
455         regionLoads[regionIndex] = rl;
456       }
457 
458       if (regionFinder != null) {
459         // region location
460         List<ServerName> loc = regionFinder.getTopBlockLocations(region);
461         regionLocations[regionIndex] = new int[loc.size()];
462         for (int i = 0; i < loc.size(); i++) {
463           regionLocations[regionIndex][i] = loc.get(i) == null ? -1
464               : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
465                   : serversToIndex.get(loc.get(i).getHostAndPort()));
466         }
467       }
468     }
469 
470     /**
471      * Returns true iff a given server has fewer regions than the balanced amount
472      */
473     public boolean serverHasTooFewRegions(int server) {
474       int minLoad = this.numRegions / numServers;
475       int numRegions = getNumRegions(server);
476       return numRegions < minLoad;
477     }
478 
479     /**
480      * Retrieves and lazily initializes a field storing the locality of
481      * every region/server combination
482      */
483     public float[][] getOrComputeRackLocalities() {
484       if (rackLocalities == null || regionsToMostLocalEntities == null) {
485         computeCachedLocalities();
486       }
487       return rackLocalities;
488     }
489 
490     /**
491      * Lazily initializes and retrieves a mapping of region -> [server|rack] entity for which
492      * that region has the highest locality
493      */
494     public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
495       if (rackLocalities == null || regionsToMostLocalEntities == null) {
496         computeCachedLocalities();
497       }
498       return regionsToMostLocalEntities[type.ordinal()];
499     }
500 
501     /**
502      * Looks up locality from cache of localities. Will create cache if it does
503      * not already exist.
504      */
505     public float getOrComputeLocality(int region, int entity, LocalityType type) {
506       switch (type) {
507         case SERVER:
508           return getLocalityOfRegion(region, entity);
509         case RACK:
510           return getOrComputeRackLocalities()[region][entity];
511         default:
512           throw new IllegalArgumentException("Unsupported LocalityType: " + type);
513       }
514     }
515 
516     /**
517      * Returns locality weighted by region size in MB. Will create locality cache
518      * if it does not already exist.
519      */
520     public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
521       return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
522     }
523 
524     /**
525      * Returns the size in MB from the most recent RegionLoad for region
526      */
527     public int getRegionSizeMB(int region) {
528       Deque<RegionLoad> load = regionLoads[region];
529       // This means the region has no actual data on disk
530       if (load == null) {
531         return 0;
532       }
533       return regionLoads[region].getLast().getStorefileSizeMB();
534     }
535 
536     /**
537      * Computes and caches the locality for each region/rack combination,
538      * as well as storing a mapping of region -> server and region -> rack such that the server
539      * and the rack have the highest locality for that region
540      */
541     private void computeCachedLocalities() {
542       rackLocalities = new float[numRegions][numServers];
543       regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
544 
545       // Compute localities and find most local server per region
546       for (int region = 0; region < numRegions; region++) {
547         int serverWithBestLocality = 0;
548         float bestLocalityForRegion = 0;
549         for (int server = 0; server < numServers; server++) {
550           // Aggregate per-rack locality
551           float locality = getLocalityOfRegion(region, server);
552           int rack = serverIndexToRackIndex[server];
553           int numServersInRack = serversPerRack[rack].length;
554           rackLocalities[region][rack] += locality / numServersInRack;
555 
556           if (locality > bestLocalityForRegion) {
557             serverWithBestLocality = server;
558             bestLocalityForRegion = locality;
559           }
560         }
561         regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
562 
563         // Find most local rack per region
564         int rackWithBestLocality = 0;
565         float bestRackLocalityForRegion = 0.0f;
566         for (int rack = 0; rack < numRacks; rack++) {
567           float rackLocality = rackLocalities[region][rack];
568           if (rackLocality > bestRackLocalityForRegion) {
569             bestRackLocalityForRegion = rackLocality;
570             rackWithBestLocality = rack;
571           }
572         }
573         regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
574       }
575 
576     }
577 
578     /**
579      * Maps region index to rack index
580      */
581     public int getRackForRegion(int region) {
582       return serverIndexToRackIndex[regionIndexToServerIndex[region]];
583     }
584 
585     enum LocalityType {
586       SERVER,
587       RACK
588     }
589 
590     /** An action to move or swap a region */
591     public static class Action {
592       public static enum Type {
593         ASSIGN_REGION,
594         MOVE_REGION,
595         SWAP_REGIONS,
596         NULL,
597       }
598 
599       public Type type;
600       public Action (Type type) {this.type = type;}
601       /** Returns an Action which would undo this action */
602       public Action undoAction() { return this; }
603       @Override
604       public String toString() { return type + ":";}
605     }
606 
607     public static class AssignRegionAction extends Action {
608       public int region;
609       public int server;
610       public AssignRegionAction(int region, int server) {
611         super(Type.ASSIGN_REGION);
612         this.region = region;
613         this.server = server;
614       }
615       @Override
616       public Action undoAction() {
617         // TODO implement this. This action is not being used by the StochasticLB for now
618         // in case it uses it, we should implement this function.
619         throw new NotImplementedException();
620       }
621       @Override
622       public String toString() {
623         return type + ": " + region + ":" + server;
624       }
625     }
626 
627     public static class MoveRegionAction extends Action {
628       public int region;
629       public int fromServer;
630       public int toServer;
631 
632       public MoveRegionAction(int region, int fromServer, int toServer) {
633         super(Type.MOVE_REGION);
634         this.fromServer = fromServer;
635         this.region = region;
636         this.toServer = toServer;
637       }
638       @Override
639       public Action undoAction() {
640         return new MoveRegionAction (region, toServer, fromServer);
641       }
642       @Override
643       public String toString() {
644         return type + ": " + region + ":" + fromServer + " -> " + toServer;
645       }
646     }
647 
648     public static class SwapRegionsAction extends Action {
649       public int fromServer;
650       public int fromRegion;
651       public int toServer;
652       public int toRegion;
653       public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) {
654         super(Type.SWAP_REGIONS);
655         this.fromServer = fromServer;
656         this.fromRegion = fromRegion;
657         this.toServer = toServer;
658         this.toRegion = toRegion;
659       }
660       @Override
661       public Action undoAction() {
662         return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion);
663       }
664       @Override
665       public String toString() {
666         return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer;
667       }
668     }
669 
670     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION",
671         justification="Mistake. Too disruptive to change now")
672     public static final Action NullAction = new Action(Type.NULL);
673 
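    /** Applies the given action (assign, move, or swap regions) to the in-memory cluster state. */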
674     public void doAction(Action action) {
675       switch (action.type) {
676       case NULL: break;
677       case ASSIGN_REGION:
678         AssignRegionAction ar = (AssignRegionAction) action;
679         regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
680         regionMoved(ar.region, -1, ar.server);
681         break;
682       case MOVE_REGION:
683         MoveRegionAction mra = (MoveRegionAction) action;
684         regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
685         regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
686         regionMoved(mra.region, mra.fromServer, mra.toServer);
687         break;
688       case SWAP_REGIONS:
689         SwapRegionsAction a = (SwapRegionsAction) action;
690         regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
691         regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
692         regionMoved(a.fromRegion, a.fromServer, a.toServer);
693         regionMoved(a.toRegion, a.toServer, a.fromServer);
694         break;
695       default:
696       throw new RuntimeException("Unknown action:" + action.type);
697       }
698     }
699 
700     /**
701      * Return true if the placement of region on server would lower the availability
702      * of the region in question
703      * @param regionInfo the region to be placed
704      * @param serverName the candidate server
705      * @return true if placing the region on the server would lower its availability
706      */
707     boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) {
708       if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
709         return false; // safeguard against race between cluster.servers and servers from LB method args
710       }
711       int server = serversToIndex.get(serverName.getHostAndPort());
712       int region = regionsToIndex.get(regionInfo);
713 
714       int primary = regionIndexToPrimaryIndex[region];
715 
716       // there is a subset relation for server < host < rack
717       // check server first
718 
719       if (contains(primariesOfRegionsPerServer[server], primary)) {
720         // check for whether there are other servers that we can place this region
721         for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
722           if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) {
723             return true; // meaning there is a better server
724           }
725         }
726         return false; // there is not a better server to place this
727       }
728 
729       // check host
730       if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
731         int host = serverIndexToHostIndex[server];
732         if (contains(primariesOfRegionsPerHost[host], primary)) {
733           // check for whether there are other hosts that we can place this region
734           for (int i = 0; i < primariesOfRegionsPerHost.length; i++) {
735             if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) {
736               return true; // meaning there is a better host
737             }
738           }
739           return false; // there is not a better host to place this
740         }
741       }
742 
743       // check rack
744       if (numRacks > 1) {
745         int rack = serverIndexToRackIndex[server];
746         if (contains(primariesOfRegionsPerRack[rack], primary)) {
747           // check for whether there are other racks that we can place this region
748           for (int i = 0; i < primariesOfRegionsPerRack.length; i++) {
749             if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) {
750               return true; // meaning there is a better rack
751             }
752           }
753           return false; // there is not a better rack to place this
754         }
755       }
756       return false;
757     }
758 
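    /** Assigns the given region to the given server in this Cluster's bookkeeping, if the server is known. */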
759     void doAssignRegion(HRegionInfo regionInfo, ServerName serverName) {
760       if (!serversToIndex.containsKey(serverName.getHostAndPort())) {
761         return;
762       }
763       int server = serversToIndex.get(serverName.getHostAndPort());
764       int region = regionsToIndex.get(regionInfo);
765       doAction(new AssignRegionAction(region, server));
766     }
767 
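    /**
     * Updates bookkeeping after a region moves from oldServer to newServer (oldServer may be -1
     * for a fresh assignment): per-table counts, the moved-region counter, and the region/primary
     * lists per server, host and rack. The regionsPerServer arrays themselves are maintained by
     * {@link #doAction(Action)}.
     */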
768     void regionMoved(int region, int oldServer, int newServer) {
769       regionIndexToServerIndex[region] = newServer;
770       if (initialRegionIndexToServerIndex[region] == newServer) {
771         numMovedRegions--; //region moved back to original location
772       } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
773         numMovedRegions++; //region moved from original location
774       }
775       int tableIndex = regionIndexToTableIndex[region];
776       if (oldServer >= 0) {
777         numRegionsPerServerPerTable[oldServer][tableIndex]--;
778       }
779       numRegionsPerServerPerTable[newServer][tableIndex]++;
780 
781       //check whether this caused maxRegionsPerTable in the new Server to be updated
782       if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
783         numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
784       } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
785           == numMaxRegionsPerTable[tableIndex]) {
786         //recompute maxRegionsPerTable since the previous value was coming from the old server
787         for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
788           if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
789             numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
790           }
791         }
792       }
793 
794       // update for servers
795       int primary = regionIndexToPrimaryIndex[region];
796       if (oldServer >= 0) {
797         primariesOfRegionsPerServer[oldServer] = removeRegion(
798           primariesOfRegionsPerServer[oldServer], primary);
799       }
800       primariesOfRegionsPerServer[newServer] = addRegionSorted(
801         primariesOfRegionsPerServer[newServer], primary);
802 
803       // update for hosts
804       if (multiServersPerHost) {
805         int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1;
806         int newHost = serverIndexToHostIndex[newServer];
807         if (newHost != oldHost) {
808           regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region);
809           primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary);
810           if (oldHost >= 0) {
811             regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region);
812             primariesOfRegionsPerHost[oldHost] = removeRegion(
813               primariesOfRegionsPerHost[oldHost], primary); // will still be sorted
814           }
815         }
816       }
817 
818       // update for racks
819       if (numRacks > 1) {
820         int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1;
821         int newRack = serverIndexToRackIndex[newServer];
822         if (newRack != oldRack) {
823           regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region);
824           primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary);
825           if (oldRack >= 0) {
826             regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region);
827             primariesOfRegionsPerRack[oldRack] = removeRegion(
828               primariesOfRegionsPerRack[oldRack], primary); // will still be sorted
829           }
830         }
831       }
832     }
833 
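    /** Returns a new array equal to regions with the first occurrence of regionIndex removed. */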
834     int[] removeRegion(int[] regions, int regionIndex) {
835       //TODO: this may be costly. Consider using linked lists
836       int[] newRegions = new int[regions.length - 1];
837       int i = 0;
838       for (i = 0; i < regions.length; i++) {
839         if (regions[i] == regionIndex) {
840           break;
841         }
842         newRegions[i] = regions[i];
843       }
844       System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
845       return newRegions;
846     }
847 
848     int[] addRegion(int[] regions, int regionIndex) {
849       int[] newRegions = new int[regions.length + 1];
850       System.arraycopy(regions, 0, newRegions, 0, regions.length);
851       newRegions[newRegions.length - 1] = regionIndex;
852       return newRegions;
853     }
854 
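    /**
     * Returns a new array with regionIndex inserted into the ascending-sorted array regions,
     * e.g. addRegionSorted([2, 5, 9], 6) yields [2, 5, 6, 9].
     */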
855     int[] addRegionSorted(int[] regions, int regionIndex) {
856       int[] newRegions = new int[regions.length + 1];
857       int i = 0;
858       for (i = 0; i < regions.length; i++) { // find the index to insert
859         if (regions[i] > regionIndex) {
860           break;
861         }
862       }
863       System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
864       System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half
865       newRegions[i] = regionIndex;
866 
867       return newRegions;
868     }
869 
870     int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
871       int i = 0;
872       for (i = 0; i < regions.length; i++) {
873         if (regions[i] == regionIndex) {
874           regions[i] = newRegionIndex;
875           break;
876         }
877       }
878       return regions;
879     }
880 
881     void sortServersByRegionCount() {
882       Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
883     }
884 
885     int getNumRegions(int server) {
886       return regionsPerServer[server].length;
887     }
888 
889     boolean contains(int[] arr, int val) {
890       return Arrays.binarySearch(arr, val) >= 0;
891     }
892 
893     private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
894       @Override
895       public int compare(Integer integer, Integer integer2) {
896       return Integer.compare(getNumRegions(integer), getNumRegions(integer2));
897       }
898     };
899 
900     void sortServersByLocality() {
901       Arrays.sort(serverIndicesSortedByLocality, localityComparator);
902     }
903 
904     float getLocality(int server) {
905       return localityPerServer[server];
906     }
907 
908     private Comparator<Integer> localityComparator = new Comparator<Integer>() {
909       @Override
910       public int compare(Integer integer, Integer integer2) {
911         float locality1 = getLocality(integer);
912         float locality2 = getLocality(integer2);
913         if (locality1 < locality2) {
914           return -1;
915         } else if (locality1 > locality2) {
916           return 1;
917         } else {
918           return 0;
919         }
920       }
921     };
922 
923     int getLowestLocalityRegionServer() {
924       if (regionFinder == null) {
925         return -1;
926       } else {
927         sortServersByLocality();
928         // We want to find the server with non-zero regions that has the lowest locality.
929         int i = 0;
930         int lowestLocalityServerIndex = serverIndicesSortedByLocality[i];
931         while (localityPerServer[lowestLocalityServerIndex] == 0
932             && (regionsPerServer[lowestLocalityServerIndex].length == 0)) {
933           i++;
934           lowestLocalityServerIndex = serverIndicesSortedByLocality[i];
935         }
936         if (LOG.isTraceEnabled()) {
937           LOG.trace("Lowest locality region server with non zero regions is "
938             + servers[lowestLocalityServerIndex].getHostname() + " with locality "
939             + localityPerServer[lowestLocalityServerIndex]);
940         }
941         return lowestLocalityServerIndex;
942       }
943     }
944 
945     int getLowestLocalityRegionOnServer(int serverIndex) {
946       if (regionFinder != null) {
947         float lowestLocality = 1.0f;
948         int lowestLocalityRegionIndex = -1;
949         if (regionsPerServer[serverIndex].length == 0) {
950           // No regions on that region server
951           return -1;
952         }
953         for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
954           int regionIndex = regionsPerServer[serverIndex][j];
955           HDFSBlocksDistribution distribution = regionFinder
956               .getBlockDistribution(regions[regionIndex]);
957           float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
958           // skip empty region
959           if (distribution.getUniqueBlocksTotalWeight() == 0) {
960             continue;
961           }
962           if (locality < lowestLocality) {
963             lowestLocality = locality;
964             lowestLocalityRegionIndex = j;
965           }
966         }
967         if (lowestLocalityRegionIndex == -1) {
968           return -1;
969         }
970         if (LOG.isTraceEnabled()) {
971           LOG.trace("Lowest locality region is "
972               + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
973                   .getRegionNameAsString() + " with locality " + lowestLocality
974               + " and its region server contains " + regionsPerServer[serverIndex].length
975               + " regions");
976         }
977         return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
978       } else {
979         return -1;
980       }
981     }
982 
983     float getLocalityOfRegion(int region, int server) {
984       if (regionFinder != null) {
985         HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
986         return distribution.getBlockLocalityIndex(servers[server].getHostname());
987       } else {
988         return 0f;
989       }
990     }
991 
992     /**
993      * Returns a least loaded server which has better locality for this region
994      * than the current server.
995      */
996     int getLeastLoadedTopServerForRegion(int region, int currentServer) {
997       if (regionFinder != null) {
998         List<ServerName> topLocalServers = regionFinder.getTopBlockLocations(regions[region],
999           servers[currentServer].getHostname());
1000         int leastLoadedServerIndex = -1;
1001         int load = Integer.MAX_VALUE;
1002         for (ServerName sn : topLocalServers) {
1003           int index = serversToIndex.get(sn.getHostAndPort()); // serversToIndex is keyed by hostAndPort
1004           int tempLoad = regionsPerServer[index].length;
1005           if (tempLoad <= load) {
1006             leastLoadedServerIndex = index;
1007             load = tempLoad;
1008           }
1009         }
1010         if (leastLoadedServerIndex != -1) {
1011           LOG.debug("Pick the least loaded server " + servers[leastLoadedServerIndex].getHostname()
1012             + " with better locality for region " + regions[region]);
1013         }
1014         return leastLoadedServerIndex;
1015       } else {
1016         return -1;
1017       }
1018     }
1019 
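    /**
     * Fills localityPerServer: for each server, the locality index of the aggregate HDFS block
     * distribution of all regions hosted on that server.
     */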
1020     void calculateRegionServerLocalities() {
1021       if (regionFinder == null) {
1022         LOG.warn("Region location finder is null, skipping locality calculations.");
1023         return;
1024       }
1025       for (int i = 0; i < regionsPerServer.length; i++) {
1026         HDFSBlocksDistribution distribution = new HDFSBlocksDistribution();
1027         if (regionsPerServer[i].length > 0) {
1028           for (int j = 0; j < regionsPerServer[i].length; j++) {
1029             int regionIndex = regionsPerServer[i][j];
1030             distribution.add(regionFinder.getBlockDistribution(regions[regionIndex]));
1031           }
1032         } else {
1033           LOG.debug("Server " + servers[i].getHostname() + " had 0 regions.");
1034         }
1035         localityPerServer[i] = distribution.getBlockLocalityIndex(servers[i].getHostname());
1036       }
1037     }
1038 
1039     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
1040         justification="Not important but should be fixed")
1041     @Override
1042     public String toString() {
1043       String desc = "Cluster{" +
1044           "servers=[";
1045           for(ServerName sn:servers) {
1046              desc += sn.getHostAndPort() + ", ";
1047           }
1048           desc +=
1049           "], serverIndicesSortedByRegionCount="+
1050           Arrays.toString(serverIndicesSortedByRegionCount) +
1051           ", regionsPerServer=[";
1052 
1053           for (int[]r:regionsPerServer) {
1054             desc += Arrays.toString(r);
1055           }
1056           desc += "]" +
1057           ", numMaxRegionsPerTable=" +
1058           Arrays.toString(numMaxRegionsPerTable) +
1059           ", numRegions=" +
1060           numRegions +
1061           ", numServers=" +
1062           numServers +
1063           ", numTables=" +
1064           numTables +
1065           ", numMovedRegions=" +
1066           numMovedRegions +
1067           '}';
1068       return desc;
1069     }
1070   }
1071 
1072   // slop for regions
1073   protected float slop;
1074   protected Configuration config;
1075   protected RackManager rackManager;
1076   private static final Random RANDOM = new Random(System.currentTimeMillis());
1077   private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
1078 
1079   public static final String TABLES_ON_MASTER =
1080     "hbase.balancer.tablesOnMaster";
1081 
1082   protected final Set<String> tablesOnMaster = new HashSet<String>();
1083   protected MetricsBalancer metricsBalancer = null;
1084   protected ClusterStatus clusterStatus = null;
1085   protected ServerName masterServerName;
1086   protected MasterServices services;
1087 
1088   protected static String[] getTablesOnMaster(Configuration conf) {
1089     String valueString = conf.get(TABLES_ON_MASTER);
1090     if (valueString != null) {
1091       valueString = valueString.trim();
1092     }
1093     if (valueString == null || valueString.equalsIgnoreCase("none")) {
1094       return null;
1095     }
1096     return StringUtils.getStrings(valueString);
1097   }
1098 
1099   /**
1100    * Check if configured to put any tables on the active master
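   * Tables are read from the comma-separated value of "hbase.balancer.tablesOnMaster";
   * a missing value or the value "none" means no tables are put on the master.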
1101    */
1102   public static boolean tablesOnMaster(Configuration conf) {
1103     String[] tables = getTablesOnMaster(conf);
1104     return tables != null && tables.length > 0;
1105   }
1106 
1107   @Override
1108   public void setConf(Configuration conf) {
1109     setSlop(conf);
1110     if (slop < 0) slop = 0;
1111     else if (slop > 1) slop = 1;
1112 
1113     this.config = conf;
1114     String[] tables = getTablesOnMaster(conf);
1115     if (tables != null && tables.length > 0) {
1116       Collections.addAll(tablesOnMaster, tables);
1117     }
1118     this.rackManager = new RackManager(getConf());
1119     regionFinder.setConf(conf);
1120   }
1121 
1122   protected void setSlop(Configuration conf) {
1123     this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
1124   }
1125 
1126   /**
1127    * Check if a region belongs to some small system table.
1128    * If so, the primary replica may be expected to be put on the master regionserver.
1129    */
1130   public boolean shouldBeOnMaster(HRegionInfo region) {
1131     return tablesOnMaster.contains(region.getTable().getNameAsString())
1132         && region.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
1133   }
1134 
1135   /**
1136    * Balance the regions that should be on master regionserver.
1137    */
1138   protected List<RegionPlan> balanceMasterRegions(
1139       Map<ServerName, List<HRegionInfo>> clusterMap) {
1140     if (masterServerName == null
1141         || clusterMap == null || clusterMap.size() <= 1) return null;
1142     List<RegionPlan> plans = null;
1143     List<HRegionInfo> regions = clusterMap.get(masterServerName);
1144     if (regions != null) {
1145       Iterator<ServerName> keyIt = null;
1146       for (HRegionInfo region: regions) {
1147         if (shouldBeOnMaster(region)) continue;
1148 
1149         // Find a non-master regionserver to host the region
1150         if (keyIt == null || !keyIt.hasNext()) {
1151           keyIt = clusterMap.keySet().iterator();
1152         }
1153         ServerName dest = keyIt.next();
1154         if (masterServerName.equals(dest)) {
1155           if (!keyIt.hasNext()) {
1156             keyIt = clusterMap.keySet().iterator();
1157           }
1158           dest = keyIt.next();
1159         }
1160 
1161         // Move this region away from the master regionserver
1162         RegionPlan plan = new RegionPlan(region, masterServerName, dest);
1163         if (plans == null) {
1164           plans = new ArrayList<RegionPlan>();
1165         }
1166         plans.add(plan);
1167       }
1168     }
1169     for (Map.Entry<ServerName, List<HRegionInfo>> server: clusterMap.entrySet()) {
1170       if (masterServerName.equals(server.getKey())) continue;
1171       for (HRegionInfo region: server.getValue()) {
1172         if (!shouldBeOnMaster(region)) continue;
1173 
1174         // Move this region to the master regionserver
1175         RegionPlan plan = new RegionPlan(region, server.getKey(), masterServerName);
1176         if (plans == null) {
1177           plans = new ArrayList<RegionPlan>();
1178         }
1179         plans.add(plan);
1180       }
1181     }
1182     return plans;
1183   }
1184 
1185   /**
1186    * Assign the regions that should be on master regionserver.
1187    */
1188   protected Map<ServerName, List<HRegionInfo>> assignMasterRegions(
1189       Collection<HRegionInfo> regions, List<ServerName> servers) {
1190     if (servers == null || regions == null || regions.isEmpty()) {
1191       return null;
1192     }
1193     Map<ServerName, List<HRegionInfo>> assignments
1194       = new TreeMap<ServerName, List<HRegionInfo>>();
1195     if (masterServerName != null && servers.contains(masterServerName)) {
1196       assignments.put(masterServerName, new ArrayList<HRegionInfo>());
1197       for (HRegionInfo region: regions) {
1198         if (shouldBeOnMaster(region)) {
1199           assignments.get(masterServerName).add(region);
1200         }
1201       }
1202     }
1203     return assignments;
1204   }
1205 
1206   @Override
1207   public Configuration getConf() {
1208     return this.config;
1209   }
1210 
1211   @Override
1212   public void setClusterStatus(ClusterStatus st) {
1213     this.clusterStatus = st;
1214     regionFinder.setClusterStatus(st);
1215   }
1216 
1217   @Override
1218   public void setMasterServices(MasterServices masterServices) {
1219     masterServerName = masterServices.getServerName();
1220     this.services = masterServices;
1221     this.regionFinder.setServices(masterServices);
1222   }
1223 
1224   public void setRackManager(RackManager rackManager) {
1225     this.rackManager = rackManager;
1226   }
1227 
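  /**
   * Decides whether balancing should run at all. As a worked example, with the default
   * slop of 0.2 and an average load of 10 regions per server, the cluster is considered
   * balanced as long as every server holds between floor(10 * 0.8) = 8 and
   * ceil(10 * 1.2) = 12 regions.
   */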
1228   protected boolean needsBalance(Cluster c) {
1229     ClusterLoadState cs = new ClusterLoadState(c.clusterState);
1230     if (cs.getNumServers() < MIN_SERVER_BALANCE) {
1231       if (LOG.isDebugEnabled()) {
1232         LOG.debug("Not running balancer because only " + cs.getNumServers()
1233             + " active regionserver(s)");
1234       }
1235       return false;
1236     }
1237     if(areSomeRegionReplicasColocated(c)) return true;
1238     // Check if we even need to do any load balancing
1239     // HBASE-3681 check sloppiness first
1240     float average = cs.getLoadAverage(); // for logging
1241     int floor = (int) Math.floor(average * (1 - slop));
1242     int ceiling = (int) Math.ceil(average * (1 + slop));
1243     if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
1244       NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
1245       if (LOG.isTraceEnabled()) {
1246         // If nothing to balance, then don't say anything unless trace-level logging.
1247         LOG.trace("Skipping load balancing because balanced cluster; " +
1248           "servers=" + cs.getNumServers() +
1249           " regions=" + cs.getNumRegions() + " average=" + average +
1250           " mostloaded=" + serversByLoad.lastKey().getLoad() +
1251           " leastloaded=" + serversByLoad.firstKey().getLoad());
1252       }
1253       return false;
1254     }
1255     return true;
1256   }
1257 
1258   /**
1259    * Subclasses should implement this to return true if the cluster has nodes that hosts
1260    * multiple replicas for the same region, or, if there are multiple racks and the same
1261    * rack hosts replicas of the same region
1262    * @param c Cluster information
1263    * @return whether region replicas are currently co-located
1264    */
1265   protected boolean areSomeRegionReplicasColocated(Cluster c) {
1266     return false;
1267   }
1268 
1269   /**
1270    * Generates a bulk assignment plan to be used on cluster startup using a
1271    * simple round-robin assignment.
1272    * <p>
1273    * Takes a list of all the regions and all the servers in the cluster and
1274    * returns a map of each server to the regions that it should be assigned.
1275    * <p>
1276    * Currently implemented as a round-robin assignment. Same invariant as load
1277    * balancing, all servers holding floor(avg) or ceiling(avg).
1278    *
1279    * TODO: Use block locations from HDFS to place regions with their blocks
1280    *
1281    * @param regions all regions
1282    * @param servers all servers
1283    * @return map of server to the regions it should take, or null if no
1284    *         assignment is possible (ie. no regions or no servers)
1285    */
1286   @Override
1287   public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
1288       List<ServerName> servers) {
1289     metricsBalancer.incrMiscInvocations();
1290     Map<ServerName, List<HRegionInfo>> assignments = assignMasterRegions(regions, servers);
1291     if (assignments != null && !assignments.isEmpty()) {
1292       servers = new ArrayList<ServerName>(servers);
1293       // Guarantee not to put other regions on master
1294       servers.remove(masterServerName);
1295       List<HRegionInfo> masterRegions = assignments.get(masterServerName);
1296       if (!masterRegions.isEmpty()) {
1297         regions = new ArrayList<HRegionInfo>(regions);
1298         for (HRegionInfo region: masterRegions) {
1299           regions.remove(region);
1300         }
1301       }
1302     }
1303     if (regions == null || regions.isEmpty()) {
1304       return assignments;
1305     }
1306 
1307     int numServers = servers == null ? 0 : servers.size();
1308     if (numServers == 0) {
1309       LOG.warn("Wanted to do round robin assignment but no servers to assign to");
1310       return null;
1311     }
1312 
1313     // TODO: instead of retainAssignment() and roundRobinAssignment(), we should just run the
1314     // normal LB.balancerCluster() with unassignedRegions. We only need to have a candidate
1315     // generator for AssignRegionAction. The LB will ensure the regions are mostly local
1316     // and balanced. This should also run fast with fewer number of iterations.
1317 
1318     if (numServers == 1) { // Only one server, nothing fancy we can do here
1319       ServerName server = servers.get(0);
1320       assignments.put(server, new ArrayList<HRegionInfo>(regions));
1321       return assignments;
1322     }
1323 
1324     Cluster cluster = createCluster(servers, regions, false);
1325     List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
1326 
1327     roundRobinAssignment(cluster, regions, unassignedRegions,
1328       servers, assignments);
1329 
1330     List<HRegionInfo> lastFewRegions = new ArrayList<HRegionInfo>();
1331     // assign the remaining regions by going through the list and trying the servers one by one
1332     int serverIdx = RANDOM.nextInt(numServers);
1333     for (HRegionInfo region : unassignedRegions) {
1334       boolean assigned = false;
1335       for (int j = 0; j < numServers; j++) { // try all servers one by one
1336         ServerName serverName = servers.get((j + serverIdx) % numServers);
1337         if (!cluster.wouldLowerAvailability(region, serverName)) {
1338           List<HRegionInfo> serverRegions = assignments.get(serverName);
1339           if (serverRegions == null) {
1340             serverRegions = new ArrayList<HRegionInfo>();
1341             assignments.put(serverName, serverRegions);
1342           }
1343           serverRegions.add(region);
1344           cluster.doAssignRegion(region, serverName);
1345           serverIdx = (j + serverIdx + 1) % numServers; // resume from the next server
1346           assigned = true;
1347           break;
1348         }
1349       }
1350       if (!assigned) {
1351         lastFewRegions.add(region);
1352       }
1353     }
1354     // Just sprinkle the rest of the regions on random regionservers. balanceCluster() will
1355     // make the placement optimal later. We can end up here if numReplicas > numServers.
1356     for (HRegionInfo region : lastFewRegions) {
1357       int i = RANDOM.nextInt(numServers);
1358       ServerName server = servers.get(i);
1359       List<HRegionInfo> serverRegions = assignments.get(server);
1360       if (serverRegions == null) {
1361         serverRegions = new ArrayList<HRegionInfo>();
1362         assignments.put(server, serverRegions);
1363       }
1364       serverRegions.add(region);
1365       cluster.doAssignRegion(region, server);
1366     }
1367     return assignments;
1368   }
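  // A minimal usage sketch for the bulk-assignment entry point above. The balancer instance and
  // the region/server lists are illustrative placeholders supplied by the caller (for example the
  // AssignmentManager), not members of this class:
  //
  //   Map<ServerName, List<HRegionInfo>> plan =
  //       balancer.roundRobinAssignment(newRegions, onlineServers);
  //   if (plan != null) {
  //     for (Map.Entry<ServerName, List<HRegionInfo>> entry : plan.entrySet()) {
  //       LOG.debug(entry.getKey() + " is assigned " + entry.getValue().size() + " regions");
  //     }
  //   }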
1369 
1370   protected Cluster createCluster(List<ServerName> servers,
1371       Collection<HRegionInfo> regions, boolean forceRefresh) {
1372     if (forceRefresh) {
1373       regionFinder.refreshAndWait(regions);
1374     }
1375     // Get the snapshot of the current assignments for the regions in question, and then create
1376     // a cluster out of it. Note that we might have replicas already assigned to some servers
1377     // earlier. So we want to get the snapshot to see those assignments, but this will only contain
1378     // replicas of the regions that are passed (for performance).
1379     Map<ServerName, List<HRegionInfo>> clusterState = getRegionAssignmentsByServer(regions);
1380 
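    // Servers that currently host none of the given regions still need an entry (an empty
    // region list) so that the resulting Cluster treats them as candidates for new assignments.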
1381     for (ServerName server : servers) {
1382       if (!clusterState.containsKey(server)) {
1383         clusterState.put(server, EMPTY_REGION_LIST);
1384       }
1385     }
1386     return new Cluster(regions, clusterState, null, this.regionFinder,
1387         rackManager);
1388   }
1389 
1390   /**
1391    * Generates an immediate assignment plan to be used by a new master for
1392    * regions in transition that do not have an already known destination.
1393    *
1394    * Takes a list of regions that need immediate assignment and a list of all
1395    * available servers. Returns a map of regions to the server they should be
1396    * assigned to.
1397    *
1398    * This method will return quickly and does not do any intelligent balancing.
1399    * The goal is to make a fast decision, not the best decision possible.
1400    *
1401    * Currently the assignment is random.
1402    *
1403    * @param regions regions that need immediate assignment
1404    * @param servers all available servers
1405    * @return map of each region to the server it should be assigned to
1406    */
1407   @Override
1408   public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
1409       List<ServerName> servers) {
1410     metricsBalancer.incrMiscInvocations();
1411     if (servers == null || servers.isEmpty()) {
1412       LOG.warn("Wanted to do random assignment but no servers to assign to");
1413       return null;
1414     }
1415 
1416     Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
1417     for (HRegionInfo region : regions) {
1418       assignments.put(region, randomAssignment(region, servers));
1419     }
1420     return assignments;
1421   }
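  // A minimal usage sketch, assuming the caller already holds the list of regions in transition
  // and the current online servers (both variable names below are illustrative):
  //
  //   Map<HRegionInfo, ServerName> quickPlan =
  //       balancer.immediateAssignment(regionsInTransition, onlineServers);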
1422 
1423   /**
1424    * Used to assign a single region to a random server.
1425    */
1426   @Override
1427   public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) {
1428     metricsBalancer.incrMiscInvocations();
1429     if (servers != null && servers.contains(masterServerName)) {
1430       if (shouldBeOnMaster(regionInfo)) {
1431         return masterServerName;
1432       }
1433       servers = new ArrayList<ServerName>(servers);
1434       // Guarantee not to put other regions on master
1435       servers.remove(masterServerName);
1436     }
1437 
1438     int numServers = servers == null ? 0 : servers.size();
1439     if (numServers == 0) {
1440       LOG.warn("Wanted to do retain assignment but no servers to assign to");
1441       return null;
1442     }
1443     if (numServers == 1) { // Only one server, nothing fancy we can do here
1444       return servers.get(0);
1445     }
1446 
1447     List<HRegionInfo> regions = Lists.newArrayList(regionInfo);
1448     Cluster cluster = createCluster(servers, regions, false);
1449     return randomAssignment(cluster, regionInfo, servers);
1450   }
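  // Illustrative single-region call; "hri" and "onlineServers" are placeholders for values the
  // caller would supply, and a null result means no server was available:
  //
  //   ServerName destination = balancer.randomAssignment(hri, onlineServers);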
1451 
1452   /**
1453    * Generates a bulk assignment startup plan, attempting to reuse the existing
1454    * assignment information from META, but adjusting for the specified list of
1455    * available/online servers available for assignment.
1456    * <p>
1457    * Takes a map of all regions to their existing assignment from META. Also
1458    * takes a list of online servers for regions to be assigned to. Attempts to
1459    * retain all assignment, so in some instances initial assignment will not be
1460    * completely balanced.
1461    * <p>
1462    * Any leftover regions without an existing server to be assigned to will be
1463    * assigned randomly to available servers.
1464    *
1465    * @param regions regions and existing assignment from meta
1466    * @param servers available servers
1467    * @return map of servers and regions to be assigned to them
1468    */
1469   @Override
1470   public Map<ServerName, List<HRegionInfo>> retainAssignment(Map<HRegionInfo, ServerName> regions,
1471       List<ServerName> servers) {
1472     // Update metrics
1473     metricsBalancer.incrMiscInvocations();
1474     Map<ServerName, List<HRegionInfo>> assignments
1475       = assignMasterRegions(regions.keySet(), servers);
1476     if (assignments != null && !assignments.isEmpty()) {
1477       servers = new ArrayList<ServerName>(servers);
1478       // Guarantee not to put other regions on master
1479       servers.remove(masterServerName);
1480       List<HRegionInfo> masterRegions = assignments.get(masterServerName);
1481       if (!masterRegions.isEmpty()) {
1482         regions = new HashMap<HRegionInfo, ServerName>(regions);
1483         for (HRegionInfo region: masterRegions) {
1484           regions.remove(region);
1485         }
1486       }
1487     }
1488     if (regions == null || regions.isEmpty()) {
1489       return assignments;
1490     }
1491 
1492     int numServers = servers == null ? 0 : servers.size();
1493     if (numServers == 0) {
1494       LOG.warn("Wanted to do retain assignment but no servers to assign to");
1495       return null;
1496     }
1497     if (numServers == 1) { // Only one server, nothing fancy we can do here
1498       ServerName server = servers.get(0);
1499       assignments.put(server, new ArrayList<HRegionInfo>(regions.keySet()));
1500       return assignments;
1501     }
1502 
1503     // Group all of the old assignments by their hostname.
1504     // We can't group directly by ServerName since the servers all have
1505     // new start-codes.
1506 
1507     // Group the servers by their hostname. It's possible we have multiple
1508     // servers on the same host on different ports.
1509     ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap.create();
1510     for (ServerName server : servers) {
1511       assignments.put(server, new ArrayList<HRegionInfo>());
1512       serversByHostname.put(server.getHostname(), server);
1513     }
1514 
1515     // Collection of the hostnames that used to have regions
1516     // assigned, but for which we no longer have any RS running
1517     // after the cluster restart.
1518     Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();
1519 
1520     int numRandomAssignments = 0;
1521     int numRetainedAssignments = 0;
1522 
1523     Cluster cluster = createCluster(servers, regions.keySet(), true);
1524 
1525     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
1526       HRegionInfo region = entry.getKey();
1527       ServerName oldServerName = entry.getValue();
1528       List<ServerName> localServers = new ArrayList<ServerName>();
1529       if (oldServerName != null) {
1530         localServers = serversByHostname.get(oldServerName.getHostname());
1531       }
1532       if (localServers.isEmpty()) {
1533         // No servers on the new cluster match up with this hostname,
1534         // assign randomly.
1535         ServerName randomServer = randomAssignment(cluster, region, servers);
1536         assignments.get(randomServer).add(region);
1537         numRandomAssignments++;
1538         if (oldServerName != null) oldHostsNoLongerPresent.add(oldServerName.getHostname());
1539       } else if (localServers.size() == 1) {
1540         // the usual case - one new server on same host
1541         ServerName target = localServers.get(0);
1542         assignments.get(target).add(region);
1543         cluster.doAssignRegion(region, target);
1544         numRetainedAssignments++;
1545       } else {
1546         // multiple new servers in the cluster on this same host
1547         if (localServers.contains(oldServerName)) {
1548           assignments.get(oldServerName).add(region);
1549           cluster.doAssignRegion(region, oldServerName);
1550         } else {
1551           ServerName target = null;
1552           for (ServerName tmp: localServers) {
1553             if (tmp.getPort() == oldServerName.getPort()) {
1554               target = tmp; cluster.doAssignRegion(region, target); // keep the in-memory cluster model in sync
1555               break;
1556             }
1557           }
1558           if (target == null) {
1559             target = randomAssignment(cluster, region, localServers);
1560           }
1561           assignments.get(target).add(region);
1562         }
1563         numRetainedAssignments++;
1564       }
1565     }
1566 
1567     String randomAssignMsg = "";
1568     if (numRandomAssignments > 0) {
1569       randomAssignMsg =
1570           numRandomAssignments + " regions were assigned "
1571               + "to random hosts, since the old hosts for these regions are no "
1572               + "longer present in the cluster. These hosts were:\n  "
1573               + Joiner.on("\n  ").join(oldHostsNoLongerPresent);
1574     }
1575 
1576     LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
1577         + " retained the pre-restart assignment. " + randomAssignMsg);
1578     return assignments;
1579   }
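  // A minimal usage sketch for a master restart: the old region-to-server map would typically be
  // read from META and the server list from the currently online region servers (the variable
  // names below are illustrative):
  //
  //   Map<HRegionInfo, ServerName> lastKnown = ...;   // existing assignments from META
  //   Map<ServerName, List<HRegionInfo>> plan =
  //       balancer.retainAssignment(lastKnown, onlineServers);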
1580 
1581   @Override
1582   public void initialize() throws HBaseIOException {
1583   }
1584 
1585   @Override
1586   public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1587   }
1588 
1589   @Override
1590   public void regionOffline(HRegionInfo regionInfo) {
1591   }
1592 
1593   @Override
1594   public boolean isStopped() {
1595     return stopped;
1596   }
1597 
1598   @Override
1599   public void stop(String why) {
1600     LOG.info("Load Balancer stop requested: "+why);
1601     stopped = true;
1602   }
1603 
1604   /**
1605    * Assigns a single region to a random server, preferring one that would not lower the region's availability.
1606    */
1607   private ServerName randomAssignment(Cluster cluster, HRegionInfo regionInfo,
1608       List<ServerName> servers) {
1609     int numServers = servers.size(); // servers is not null, numServers > 1
1610     ServerName sn = null;
1611     final int maxIterations = numServers * 4;
1612     int iterations = 0;
1613 
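    // Pick random servers until one is found that would not lower the region's availability
    // (e.g. by co-locating replicas); give up after 4 * numServers attempts and accept the last
    // pick so the loop always terminates.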
1614     do {
1615       int i = RANDOM.nextInt(numServers);
1616       sn = servers.get(i);
1617     } while (cluster.wouldLowerAvailability(regionInfo, sn)
1618         && iterations++ < maxIterations);
1619     cluster.doAssignRegion(regionInfo, sn);
1620     return sn;
1621   }
1622 
1623   /**
1624    * Round-robin assignment of a list of regions across a list of servers.
1625    */
1626   private void roundRobinAssignment(Cluster cluster, List<HRegionInfo> regions,
1627       List<HRegionInfo> unassignedRegions, List<ServerName> servers,
1628       Map<ServerName, List<HRegionInfo>> assignments) {
1629 
1630     int numServers = servers.size();
1631     int numRegions = regions.size();
1632     int max = (int) Math.ceil((float) numRegions / numServers);
1633     int serverIdx = 0;
1634     if (numServers > 1) {
1635       serverIdx = RANDOM.nextInt(numServers);
1636     }
1637     int regionIdx = 0;
1638 
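    // Stride through the region list: the j-th server visited (starting from a random offset)
    // takes regions regionIdx, regionIdx + numServers, regionIdx + 2 * numServers, ...
    // For example, with 10 regions and 3 servers, max = ceil(10/3) = 4 and the three servers
    // receive 4, 3 and 3 regions respectively (before any availability-driven exclusions).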
1639     for (int j = 0; j < numServers; j++) {
1640       ServerName server = servers.get((j + serverIdx) % numServers);
1641       List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
1642       for (int i = regionIdx; i < numRegions; i += numServers) {
1643         HRegionInfo region = regions.get(i % numRegions);
1644         if (cluster.wouldLowerAvailability(region, server)) {
1645           unassignedRegions.add(region);
1646         } else {
1647           serverRegions.add(region);
1648           cluster.doAssignRegion(region, server);
1649         }
1650       }
1651       assignments.put(server, serverRegions);
1652       regionIdx++;
1653     }
1654   }
1655 
1656   protected Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer(
1657     Collection<HRegionInfo> regions) {
1658     if (this.services != null && this.services.getAssignmentManager() != null) {
1659       return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
1660     } else {
1661       return new HashMap<ServerName, List<HRegionInfo>>();
1662     }
1663   }
1664 
1665   @Override
1666   public void onConfigurationChange(Configuration conf) {
1667   }
1668 }