View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayDeque;
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.Deque;
26  import java.util.HashMap;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.Random;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.ClusterStatus;
38  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.RegionLoad;
43  import org.apache.hadoop.hbase.ServerLoad;
44  import org.apache.hadoop.hbase.ServerName;
45  import org.apache.hadoop.hbase.TableName;
46  import org.apache.hadoop.hbase.master.MasterServices;
47  import org.apache.hadoop.hbase.master.RegionPlan;
48  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
49  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
50  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
51  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
52  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
53  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56  
57  import com.google.common.base.Optional;
58  
59  /**
60   * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
61   * randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the
62   * new cluster state becomes the plan. It includes costs functions to compute the cost of:</p>
63   * <ul>
64   * <li>Region Load</li>
65   * <li>Table Load</li>
66   * <li>Data Locality</li>
67   * <li>Memstore Sizes</li>
68   * <li>Storefile Sizes</li>
69   * </ul>
70   *
71   *
72   * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost
73   * best solution, and 1 is the highest possible cost and the worst solution.  The computed costs are
74   * scaled by their respective multipliers:</p>
75   *
76   * <ul>
77   *   <li>hbase.master.balancer.stochastic.regionLoadCost</li>
78   *   <li>hbase.master.balancer.stochastic.moveCost</li>
79   *   <li>hbase.master.balancer.stochastic.tableLoadCost</li>
80   *   <li>hbase.master.balancer.stochastic.localityCost</li>
81   *   <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
82   *   <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
83   * </ul>
84   *
85   * <p>In addition to the above configurations, the balancer can be tuned by the following
86   * configuration values:</p>
87   * <ul>
88   *   <li>hbase.master.balancer.stochastic.maxMoveRegions which
89   *   controls what the max number of regions that can be moved in a single invocation of this
90   *   balancer.</li>
91   *   <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
92   *   regions is multiplied to try and get the number of times the balancer will
93   *   mutate all servers.</li>
94   *   <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that
95   *   the balancer will try and mutate all the servers. The balancer will use the minimum of this
96   *   value and the above computation.</li>
97   * </ul>
98   *
99   * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false
100  * so that the balancer gets the full picture of all loads on the cluster.</p>
101  */
102 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
103 public class StochasticLoadBalancer extends BaseLoadBalancer {
104 
105   protected static final String STEPS_PER_REGION_KEY =
106       "hbase.master.balancer.stochastic.stepsPerRegion";
107   protected static final String MAX_STEPS_KEY =
108       "hbase.master.balancer.stochastic.maxSteps";
109   protected static final String MAX_RUNNING_TIME_KEY =
110       "hbase.master.balancer.stochastic.maxRunningTime";
111   protected static final String KEEP_REGION_LOADS =
112       "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
113   private static final String TABLE_FUNCTION_SEP = "_";
114 
115   private static final Random RANDOM = new Random(System.currentTimeMillis());
116   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
117 
118   Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
119 
120   // values are defaults
121   private int maxSteps = 1000000;
122   private int stepsPerRegion = 800;
123   private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
124   private int numRegionLoadsToRemember = 15;
125 
126   private CandidateGenerator[] candidateGenerators;
127   private CostFromRegionLoadFunction[] regionLoadFunctions;
128   private CostFunction[] costFunctions;
129 
130   // to save and report costs to JMX
131   private Double curOverallCost = 0d;
132   private Double[] tempFunctionCosts;
133   private Double[] curFunctionCosts;
134 
135   // Keep locality based picker and cost function to alert them
136   // when new services are offered
137   private LocalityBasedCandidateGenerator localityCandidateGenerator;
138   private ServerLocalityCostFunction localityCost;
139   private RackLocalityCostFunction rackLocalityCost;
140   private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
141   private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
142   private boolean isByTable = false;
143   private TableName tableName = null;
144 
145   /**
146    * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
147    * default MetricsBalancer
148    */
149   public StochasticLoadBalancer() {
150     super(new MetricsStochasticBalancer());
151   }
152 
153   @Override
154   public void onConfigurationChange(Configuration conf) {
155     setConf(conf);
156   }
157 
158   @Override
159   public synchronized void setConf(Configuration conf) {
160     super.setConf(conf);
161     LOG.info("loading config");
162 
163     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
164 
165     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
166     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
167 
168     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
169     isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
170 
171     if (localityCandidateGenerator == null) {
172       localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
173     }
174     localityCost = new ServerLocalityCostFunction(conf, services);
175     rackLocalityCost = new RackLocalityCostFunction(conf, services);
176 
177     if (candidateGenerators == null) {
178       candidateGenerators = new CandidateGenerator[] {
179           new RandomCandidateGenerator(),
180           new LoadCandidateGenerator(),
181           localityCandidateGenerator,
182           new RegionReplicaRackCandidateGenerator(),
183       };
184     }
185 
186     regionLoadFunctions = new CostFromRegionLoadFunction[] {
187       new ReadRequestCostFunction(conf),
188       new WriteRequestCostFunction(conf),
189       new MemstoreSizeCostFunction(conf),
190       new StoreFileCostFunction(conf)
191     };
192 
193     regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
194     regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
195 
196     costFunctions = new CostFunction[]{
197       new RegionCountSkewCostFunction(conf),
198       new PrimaryRegionCountSkewCostFunction(conf),
199       new MoveCostFunction(conf),
200       localityCost,
201       rackLocalityCost,
202       new TableSkewCostFunction(conf),
203       regionReplicaHostCostFunction,
204       regionReplicaRackCostFunction,
205       regionLoadFunctions[0],
206       regionLoadFunctions[1],
207       regionLoadFunctions[2],
208       regionLoadFunctions[3],
209     };
210 
211     curFunctionCosts= new Double[costFunctions.length];
212     tempFunctionCosts= new Double[costFunctions.length];
213 
214   }
215 
216   @Override
217   protected void setSlop(Configuration conf) {
218     this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
219   }
220 
221   @Override
222   public synchronized void setClusterStatus(ClusterStatus st) {
223     super.setClusterStatus(st);
224     updateRegionLoad();
225     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
226       cost.setClusterStatus(st);
227     }
228 
229     // update metrics size
230     try {
231       // by-table or ensemble mode
232       int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
233       int functionsCount = getCostFunctionNames().length;
234 
235       updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
236     } catch (Exception e) {
237       LOG.error("failed to get the size of all tables, exception = " + e.getMessage());
238     }
239   }
240 
241   /**
242    * Update the number of metrics that are reported to JMX
243    */
244   public void updateMetricsSize(int size) {
245     if (metricsBalancer instanceof MetricsStochasticBalancer) {
246         ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
247     }
248   }
249 
250   @Override
251   public synchronized void setMasterServices(MasterServices masterServices) {
252     super.setMasterServices(masterServices);
253     this.localityCost.setServices(masterServices);
254     this.rackLocalityCost.setServices(masterServices);
255     this.localityCandidateGenerator.setServices(masterServices);
256   }
257 
258   @Override
259   protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
260     regionReplicaHostCostFunction.init(c);
261     if (regionReplicaHostCostFunction.cost() > 0) return true;
262     regionReplicaRackCostFunction.init(c);
263     if (regionReplicaRackCostFunction.cost() > 0) return true;
264     return false;
265   }
266 
267   @Override
268   public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
269     List<HRegionInfo>> clusterState) {
270     this.tableName = tableName;
271     return balanceCluster(clusterState);
272   }
273 
274   /**
275    * Given the cluster state this will try and approach an optimal balance. This
276    * should always approach the optimal state given enough steps.
277    */
278   @Override
279   public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
280     List<HRegionInfo>> clusterState) {
281     List<RegionPlan> plans = balanceMasterRegions(clusterState);
282     if (plans != null || clusterState == null || clusterState.size() <= 1) {
283       return plans;
284     }
285 
286     if (masterServerName != null && clusterState.containsKey(masterServerName)) {
287       if (clusterState.size() <= 2) {
288         return null;
289       }
290       clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
291       clusterState.remove(masterServerName);
292     }
293 
294     // On clusters with lots of HFileLinks or lots of reference files,
295     // instantiating the storefile infos can be quite expensive.
296     // Allow turning this feature off if the locality cost is not going to
297     // be used in any computations.
298     RegionLocationFinder finder = null;
299     if (this.localityCost != null && this.localityCost.getMultiplier() > 0
300         || this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0) {
301       finder = this.regionFinder;
302     }
303 
304     //The clusterState that is given to this method contains the state
305     //of all the regions in the table(s) (that's true today)
306     // Keep track of servers to iterate through them.
307     Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
308 
309     if (!needsBalance(cluster)) {
310       return null;
311     }
312 
313     long startTime = EnvironmentEdgeManager.currentTime();
314 
315     initCosts(cluster);
316 
317     double currentCost = computeCost(cluster, Double.MAX_VALUE);
318     curOverallCost = currentCost;
319     for (int i = 0; i < this.curFunctionCosts.length; i++) {
320       curFunctionCosts[i] = tempFunctionCosts[i];
321     }
322 
323     double initCost = currentCost;
324     double newCost = currentCost;
325 
326     long computedMaxSteps = Math.min(this.maxSteps,
327         ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
328     // Perform a stochastic walk to see if we can get a good fit.
329     long step;
330 
331     for (step = 0; step < computedMaxSteps; step++) {
332       int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
333       CandidateGenerator p = candidateGenerators[generatorIdx];
334       Cluster.Action action = p.generate(cluster);
335 
336       if (action.type == Type.NULL) {
337         continue;
338       }
339 
340       cluster.doAction(action);
341       updateCostsWithAction(cluster, action);
342 
343       newCost = computeCost(cluster, currentCost);
344 
345       // Should this be kept?
346       if (newCost < currentCost) {
347         currentCost = newCost;
348 
349         // save for JMX
350         curOverallCost = currentCost;
351         for (int i = 0; i < this.curFunctionCosts.length; i++) {
352           curFunctionCosts[i] = tempFunctionCosts[i];
353         }
354       } else {
355         // Put things back the way they were before.
356         // TODO: undo by remembering old values
357         Action undoAction = action.undoAction();
358         cluster.doAction(undoAction);
359         updateCostsWithAction(cluster, undoAction);
360       }
361 
362       if (EnvironmentEdgeManager.currentTime() - startTime >
363           maxRunningTime) {
364         break;
365       }
366     }
367     long endTime = EnvironmentEdgeManager.currentTime();
368 
369     metricsBalancer.balanceCluster(endTime - startTime);
370 
371     // update costs metrics
372     updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
373     if (initCost > currentCost) {
374       plans = createRegionPlans(cluster);
375       if (LOG.isDebugEnabled()) {
376         LOG.debug("Finished computing new load balance plan.  Computation took "
377             + (endTime - startTime) + "ms to try " + step
378             + " different iterations.  Found a solution that moves "
379             + plans.size() + " regions; Going from a computed cost of "
380             + initCost + " to a new cost of " + currentCost);
381       }
382 
383       return plans;
384     }
385     if (LOG.isDebugEnabled()) {
386       LOG.debug("Could not find a better load balance plan.  Tried "
387           + step + " different configurations in " + (endTime - startTime)
388           + "ms, and did not find anything with a computed cost less than " + initCost);
389     }
390     return null;
391   }
392 
393   /**
394    * update costs to JMX
395    */
396   private void updateStochasticCosts(TableName tableName, Double overall, Double[] subCosts) {
397     if (tableName == null) return;
398 
399     // check if the metricsBalancer is MetricsStochasticBalancer before casting
400     if (metricsBalancer instanceof MetricsStochasticBalancer) {
401       MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
402       // overall cost
403       balancer.updateStochasticCost(tableName.getNameAsString(),
404         "Overall", "Overall cost", overall);
405 
406       // each cost function
407       for (int i = 0; i < costFunctions.length; i++) {
408         CostFunction costFunction = costFunctions[i];
409         String costFunctionName = costFunction.getClass().getSimpleName();
410         Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
411         // TODO: cost function may need a specific description
412         balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
413           "The percent of " + costFunctionName, costPercent);
414       }
415     }
416   }
417 
418 
419   /**
420    * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
421    * state.
422    *
423    * @param cluster The state of the cluster
424    * @return List of RegionPlan's that represent the moves needed to get to desired final state.
425    */
426   private List<RegionPlan> createRegionPlans(Cluster cluster) {
427     List<RegionPlan> plans = new LinkedList<RegionPlan>();
428     for (int regionIndex = 0;
429          regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
430       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
431       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
432 
433       if (initialServerIndex != newServerIndex) {
434         HRegionInfo region = cluster.regions[regionIndex];
435         ServerName initialServer = cluster.servers[initialServerIndex];
436         ServerName newServer = cluster.servers[newServerIndex];
437 
438         if (LOG.isTraceEnabled()) {
439           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
440               + initialServer.getHostname() + " to " + newServer.getHostname());
441         }
442         RegionPlan rp = new RegionPlan(region, initialServer, newServer);
443         plans.add(rp);
444       }
445     }
446     return plans;
447   }
448 
449   /**
450    * Store the current region loads.
451    */
452   private synchronized void updateRegionLoad() {
453     // We create a new hashmap so that regions that are no longer there are removed.
454     // However we temporarily need the old loads so we can use them to keep the rolling average.
455     Map<String, Deque<RegionLoad>> oldLoads = loads;
456     loads = new HashMap<String, Deque<RegionLoad>>();
457 
458     for (ServerName sn : clusterStatus.getServers()) {
459       ServerLoad sl = clusterStatus.getLoad(sn);
460       if (sl == null) {
461         continue;
462       }
463       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
464         Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
465         if (rLoads == null) {
466           // There was nothing there
467           rLoads = new ArrayDeque<RegionLoad>();
468         } else if (rLoads.size() >= numRegionLoadsToRemember) {
469           rLoads.remove();
470         }
471         rLoads.add(entry.getValue());
472         loads.put(Bytes.toString(entry.getKey()), rLoads);
473 
474       }
475     }
476 
477     for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
478       cost.setLoads(loads);
479     }
480   }
481 
482   protected void initCosts(Cluster cluster) {
483     for (CostFunction c:costFunctions) {
484       c.init(cluster);
485     }
486   }
487 
488   protected void updateCostsWithAction(Cluster cluster, Action action) {
489     for (CostFunction c : costFunctions) {
490       c.postAction(action);
491     }
492   }
493 
494   /**
495    * Get the names of the cost functions
496    */
497   public String[] getCostFunctionNames() {
498     if (costFunctions == null) return null;
499     String[] ret = new String[costFunctions.length];
500     for (int i = 0; i < costFunctions.length; i++) {
501       CostFunction c = costFunctions[i];
502       ret[i] = c.getClass().getSimpleName();
503     }
504 
505     return ret;
506   }
507 
508   /**
509    * This is the main cost function.  It will compute a cost associated with a proposed cluster
510    * state.  All different costs will be combined with their multipliers to produce a double cost.
511    *
512    * @param cluster The state of the cluster
513    * @param previousCost the previous cost. This is used as an early out.
514    * @return a double of a cost associated with the proposed cluster state.  This cost is an
515    *         aggregate of all individual cost functions.
516    */
517   protected double computeCost(Cluster cluster, double previousCost) {
518     double total = 0;
519 
520     for (int i = 0; i < costFunctions.length; i++) {
521       CostFunction c = costFunctions[i];
522       this.tempFunctionCosts[i] = 0.0;
523 
524       if (c.getMultiplier() <= 0) {
525         continue;
526       }
527 
528       Float multiplier = c.getMultiplier();
529       Double cost = c.cost();
530 
531       this.tempFunctionCosts[i] = multiplier*cost;
532       total += this.tempFunctionCosts[i];
533 
534       if (total > previousCost) {
535         break;
536       }
537     }
538 
539     return total;
540   }
541 
542   /** Generates a candidate action to be applied to the cluster for cost function search */
543   abstract static class CandidateGenerator {
544     abstract Cluster.Action generate(Cluster cluster);
545 
546     /**
547      * From a list of regions pick a random one. Null can be returned which
548      * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
549      * rather than swap.
550      *
551      * @param cluster        The state of the cluster
552      * @param server         index of the server
553      * @param chanceOfNoSwap Chance that this will decide to try a move rather
554      *                       than a swap.
555      * @return a random {@link HRegionInfo} or null if an asymmetrical move is
556      *         suggested.
557      */
558     protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
559       // Check to see if this is just a move.
560       if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
561         // signal a move only.
562         return -1;
563       }
564       int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
565       return cluster.regionsPerServer[server][rand];
566 
567     }
568     protected int pickRandomServer(Cluster cluster) {
569       if (cluster.numServers < 1) {
570         return -1;
571       }
572 
573       return RANDOM.nextInt(cluster.numServers);
574     }
575 
576     protected int pickRandomRack(Cluster cluster) {
577       if (cluster.numRacks < 1) {
578         return -1;
579       }
580 
581       return RANDOM.nextInt(cluster.numRacks);
582     }
583 
584     protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
585       if (cluster.numServers < 2) {
586         return -1;
587       }
588       while (true) {
589         int otherServerIndex = pickRandomServer(cluster);
590         if (otherServerIndex != serverIndex) {
591           return otherServerIndex;
592         }
593       }
594     }
595 
596     protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
597       if (cluster.numRacks < 2) {
598         return -1;
599       }
600       while (true) {
601         int otherRackIndex = pickRandomRack(cluster);
602         if (otherRackIndex != rackIndex) {
603           return otherRackIndex;
604         }
605       }
606     }
607 
608     protected Cluster.Action pickRandomRegions(Cluster cluster,
609                                                        int thisServer,
610                                                        int otherServer) {
611       if (thisServer < 0 || otherServer < 0) {
612         return Cluster.NullAction;
613       }
614 
615       // Decide who is most likely to need another region
616       int thisRegionCount = cluster.getNumRegions(thisServer);
617       int otherRegionCount = cluster.getNumRegions(otherServer);
618 
619       // Assign the chance based upon the above
620       double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
621       double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
622 
623       int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
624       int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
625 
626       return getAction(thisServer, thisRegion, otherServer, otherRegion);
627     }
628 
629     protected Cluster.Action getAction(int fromServer, int fromRegion,
630         int toServer, int toRegion) {
631       if (fromServer < 0 || toServer < 0) {
632         return Cluster.NullAction;
633       }
634       if (fromRegion > 0 && toRegion > 0) {
635         return new Cluster.SwapRegionsAction(fromServer, fromRegion,
636           toServer, toRegion);
637       } else if (fromRegion > 0) {
638         return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
639       } else if (toRegion > 0) {
640         return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
641       } else {
642         return Cluster.NullAction;
643       }
644     }
645 
646     /**
647      * Returns a random iteration order of indexes of an array with size length
648      */
649     protected List<Integer> getRandomIterationOrder(int length) {
650       ArrayList<Integer> order = new ArrayList<>(length);
651       for (int i = 0; i < length; i++) {
652         order.add(i);
653       }
654       Collections.shuffle(order);
655       return order;
656     }
657   }
658 
659   static class RandomCandidateGenerator extends CandidateGenerator {
660 
661     @Override
662     Cluster.Action generate(Cluster cluster) {
663 
664       int thisServer = pickRandomServer(cluster);
665 
666       // Pick the other server
667       int otherServer = pickOtherRandomServer(cluster, thisServer);
668 
669       return pickRandomRegions(cluster, thisServer, otherServer);
670     }
671   }
672 
673   static class LoadCandidateGenerator extends CandidateGenerator {
674 
675     @Override
676     Cluster.Action generate(Cluster cluster) {
677       cluster.sortServersByRegionCount();
678       int thisServer = pickMostLoadedServer(cluster, -1);
679       int otherServer = pickLeastLoadedServer(cluster, thisServer);
680 
681       return pickRandomRegions(cluster, thisServer, otherServer);
682     }
683 
684     private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
685       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
686 
687       int index = 0;
688       while (servers[index] == null || servers[index] == thisServer) {
689         index++;
690         if (index == servers.length) {
691           return -1;
692         }
693       }
694       return servers[index];
695     }
696 
697     private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
698       Integer[] servers = cluster.serverIndicesSortedByRegionCount;
699 
700       int index = servers.length - 1;
701       while (servers[index] == null || servers[index] == thisServer) {
702         index--;
703         if (index < 0) {
704           return -1;
705         }
706       }
707       return servers[index];
708     }
709   }
710 
711   static class LocalityBasedCandidateGenerator extends CandidateGenerator {
712 
713     private MasterServices masterServices;
714 
715     LocalityBasedCandidateGenerator(MasterServices masterServices) {
716       this.masterServices = masterServices;
717     }
718 
719     @Override
720     Cluster.Action generate(Cluster cluster) {
721       if (this.masterServices == null) {
722         int thisServer = pickRandomServer(cluster);
723         // Pick the other server
724         int otherServer = pickOtherRandomServer(cluster, thisServer);
725         return pickRandomRegions(cluster, thisServer, otherServer);
726       }
727 
728       // Randomly iterate through regions until you find one that is not on ideal host
729       for (int region : getRandomIterationOrder(cluster.numRegions)) {
730         int currentServer = cluster.regionIndexToServerIndex[region];
731         if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
732           Optional<Action> potential = tryMoveOrSwap(
733               cluster,
734               currentServer,
735               region,
736               cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
737           );
738           if (potential.isPresent()) {
739             return potential.get();
740           }
741         }
742       }
743       return Cluster.NullAction;
744     }
745 
746     /**
747      * Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
748      * Returns empty optional if no move can be found
749      */
750     private Optional<Action> tryMoveOrSwap(Cluster cluster,
751                                            int fromServer,
752                                            int fromRegion,
753                                            int toServer) {
754       // Try move first. We know apriori fromRegion has the highest locality on toServer
755       if (cluster.serverHasTooFewRegions(toServer)) {
756         return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
757       }
758 
759       // Compare locality gain/loss from swapping fromRegion with regions on toServer
760       double fromRegionLocalityDelta =
761           getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
762       for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
763         int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
764         double toRegionLocalityDelta =
765             getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
766         // If locality would remain neutral or improve, attempt the swap
767         if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
768           return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
769         }
770       }
771 
772       return Optional.absent();
773     }
774 
775     private double getWeightedLocality(Cluster cluster, int region, int server) {
776       return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
777     }
778 
779     void setServices(MasterServices services) {
780       this.masterServices = services;
781     }
782   }
783 
784   /**
785    * Generates candidates which moves the replicas out of the region server for
786    * co-hosted region replicas
787    */
788   static class RegionReplicaCandidateGenerator extends CandidateGenerator {
789 
790     RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
791 
792     /**
793      * Randomly select one regionIndex out of all region replicas co-hosted in the same group
794      * (a group is a server, host or rack)
795      * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer,
796      * primariesOfRegionsPerHost or primariesOfRegionsPerRack
797      * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack
798      * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
799      * @return a regionIndex for the selected primary or -1 if there is no co-locating
800      */
801     int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
802         , int[] regionIndexToPrimaryIndex) {
803       int currentPrimary = -1;
804       int currentPrimaryIndex = -1;
805       int selectedPrimaryIndex = -1;
806       double currentLargestRandom = -1;
807       // primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
808       // ids for the regions hosted in server, a consecutive repetition means that replicas
809       // are co-hosted
810       for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
811         int primary = j < primariesOfRegionsPerGroup.length
812             ? primariesOfRegionsPerGroup[j] : -1;
813         if (primary != currentPrimary) { // check for whether we see a new primary
814           int numReplicas = j - currentPrimaryIndex;
815           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
816             // decide to select this primary region id or not
817             double currentRandom = RANDOM.nextDouble();
818             // we don't know how many region replicas are co-hosted, we will randomly select one
819             // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
820             if (currentRandom > currentLargestRandom) {
821               selectedPrimaryIndex = currentPrimary;
822               currentLargestRandom = currentRandom;
823             }
824           }
825           currentPrimary = primary;
826           currentPrimaryIndex = j;
827         }
828       }
829 
830       // we have found the primary id for the region to move. Now find the actual regionIndex
831       // with the given primary, prefer to move the secondary region.
832       for (int j = 0; j < regionsPerGroup.length; j++) {
833         int regionIndex = regionsPerGroup[j];
834         if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
835           // always move the secondary, not the primary
836           if (selectedPrimaryIndex != regionIndex) {
837             return regionIndex;
838           }
839         }
840       }
841       return -1;
842     }
843 
844     @Override
845     Cluster.Action generate(Cluster cluster) {
846       int serverIndex = pickRandomServer(cluster);
847       if (cluster.numServers <= 1 || serverIndex == -1) {
848         return Cluster.NullAction;
849       }
850 
851       int regionIndex = selectCoHostedRegionPerGroup(
852         cluster.primariesOfRegionsPerServer[serverIndex],
853         cluster.regionsPerServer[serverIndex],
854         cluster.regionIndexToPrimaryIndex);
855 
856       // if there are no pairs of region replicas co-hosted, default to random generator
857       if (regionIndex == -1) {
858         // default to randompicker
859         return randomGenerator.generate(cluster);
860       }
861 
862       int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
863       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
864       return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
865     }
866   }
867 
868   /**
869    * Generates candidates which moves the replicas out of the rack for
870    * co-hosted region replicas in the same rack
871    */
872   static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
873     @Override
874     Cluster.Action generate(Cluster cluster) {
875       int rackIndex = pickRandomRack(cluster);
876       if (cluster.numRacks <= 1 || rackIndex == -1) {
877         return super.generate(cluster);
878       }
879 
880       int regionIndex = selectCoHostedRegionPerGroup(
881         cluster.primariesOfRegionsPerRack[rackIndex],
882         cluster.regionsPerRack[rackIndex],
883         cluster.regionIndexToPrimaryIndex);
884 
885       // if there are no pairs of region replicas co-hosted, default to random generator
886       if (regionIndex == -1) {
887         // default to randompicker
888         return randomGenerator.generate(cluster);
889       }
890 
891       int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
892       int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
893 
894       int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
895       int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
896       int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
897       return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
898     }
899   }
900 
901   /**
902    * Base class of StochasticLoadBalancer's Cost Functions.
903    */
904   abstract static class CostFunction {
905 
906     private float multiplier = 0;
907 
908     protected Cluster cluster;
909 
910     CostFunction(Configuration c) {
911 
912     }
913 
914     float getMultiplier() {
915       return multiplier;
916     }
917 
918     void setMultiplier(float m) {
919       this.multiplier = m;
920     }
921 
922     /** Called once per LB invocation to give the cost function
923      * to initialize it's state, and perform any costly calculation.
924      */
925     void init(Cluster cluster) {
926       this.cluster = cluster;
927     }
928 
929     /** Called once per cluster Action to give the cost function
930      * an opportunity to update it's state. postAction() is always
931      * called at least once before cost() is called with the cluster
932      * that this action is performed on. */
933     void postAction(Action action) {
934       switch (action.type) {
935       case NULL: break;
936       case ASSIGN_REGION:
937         AssignRegionAction ar = (AssignRegionAction) action;
938         regionMoved(ar.region, -1, ar.server);
939         break;
940       case MOVE_REGION:
941         MoveRegionAction mra = (MoveRegionAction) action;
942         regionMoved(mra.region, mra.fromServer, mra.toServer);
943         break;
944       case SWAP_REGIONS:
945         SwapRegionsAction a = (SwapRegionsAction) action;
946         regionMoved(a.fromRegion, a.fromServer, a.toServer);
947         regionMoved(a.toRegion, a.toServer, a.fromServer);
948         break;
949       default:
950         throw new RuntimeException("Uknown action:" + action.type);
951       }
952     }
953 
954     protected void regionMoved(int region, int oldServer, int newServer) {
955     }
956 
957     abstract double cost();
958 
959     /**
960      * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
961      * assumes that this is a zero sum set of costs.  It assumes that the worst case
962      * possible is all of the elements in one region server and the rest having 0.
963      *
964      * @param stats the costs
965      * @return a scaled set of costs.
966      */
967     protected double costFromArray(double[] stats) {
968       double totalCost = 0;
969       double total = getSum(stats);
970 
971       double count = stats.length;
972       double mean = total/count;
973 
974       // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
975       // a zero sum cost for this to make sense.
976       double max = ((count - 1) * mean) + (total - mean);
977 
978       // It's possible that there aren't enough regions to go around
979       double min;
980       if (count > total) {
981         min = ((count - total) * mean) + ((1 - mean) * total);
982       } else {
983         // Some will have 1 more than everything else.
984         int numHigh = (int) (total - (Math.floor(mean) * count));
985         int numLow = (int) (count - numHigh);
986 
987         min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
988 
989       }
990       min = Math.max(0, min);
991       for (int i=0; i<stats.length; i++) {
992         double n = stats[i];
993         double diff = Math.abs(mean - n);
994         totalCost += diff;
995       }
996 
997       double scaled =  scale(min, max, totalCost);
998       return scaled;
999     }
1000 
1001     private double getSum(double[] stats) {
1002       double total = 0;
1003       for(double s:stats) {
1004         total += s;
1005       }
1006       return total;
1007     }
1008 
1009     /**
1010      * Scale the value between 0 and 1.
1011      *
1012      * @param min   Min value
1013      * @param max   The Max value
1014      * @param value The value to be scaled.
1015      * @return The scaled value.
1016      */
1017     protected double scale(double min, double max, double value) {
1018       if (max <= min || value <= min) {
1019         return 0;
1020       }
1021       if ((max - min) == 0) return 0;
1022 
1023       return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
1024     }
1025   }
1026 
1027   /**
1028    * Given the starting state of the regions and a potential ending state
1029    * compute cost based upon the number of regions that have moved.
1030    */
1031   static class MoveCostFunction extends CostFunction {
1032     private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
1033     private static final String MAX_MOVES_PERCENT_KEY =
1034         "hbase.master.balancer.stochastic.maxMovePercent";
1035     private static final float DEFAULT_MOVE_COST = 100;
1036     private static final int DEFAULT_MAX_MOVES = 600;
1037     private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
1038 
1039     private final float maxMovesPercent;
1040 
1041     MoveCostFunction(Configuration conf) {
1042       super(conf);
1043 
1044       // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
1045       // that large benefits are need to overcome the cost of a move.
1046       this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
1047       // What percent of the number of regions a single run of the balancer can move.
1048       maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
1049     }
1050 
1051     @Override
1052     double cost() {
1053       // Try and size the max number of Moves, but always be prepared to move some.
1054       int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
1055           DEFAULT_MAX_MOVES);
1056 
1057       double moveCost = cluster.numMovedRegions;
1058 
1059       // Don't let this single balance move more than the max moves.
1060       // This allows better scaling to accurately represent the actual cost of a move.
1061       if (moveCost > maxMoves) {
1062         return 1000000;   // return a number much greater than any of the other cost
1063       }
1064 
1065       return scale(0, cluster.numRegions, moveCost);
1066     }
1067   }
1068 
1069   /**
1070    * Compute the cost of a potential cluster state from skew in number of
1071    * regions on a cluster.
1072    */
1073   static class RegionCountSkewCostFunction extends CostFunction {
1074     private static final String REGION_COUNT_SKEW_COST_KEY =
1075         "hbase.master.balancer.stochastic.regionCountCost";
1076     private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
1077 
1078     private double[] stats = null;
1079 
1080     RegionCountSkewCostFunction(Configuration conf) {
1081       super(conf);
1082       // Load multiplier should be the greatest as it is the most general way to balance data.
1083       this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
1084     }
1085 
1086     @Override
1087     double cost() {
1088       if (stats == null || stats.length != cluster.numServers) {
1089         stats = new double[cluster.numServers];
1090       }
1091 
1092       for (int i =0; i < cluster.numServers; i++) {
1093         stats[i] = cluster.regionsPerServer[i].length;
1094       }
1095 
1096       return costFromArray(stats);
1097     }
1098   }
1099 
1100   /**
1101    * Compute the cost of a potential cluster state from skew in number of
1102    * primary regions on a cluster.
1103    */
1104   static class PrimaryRegionCountSkewCostFunction extends CostFunction {
1105     private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
1106         "hbase.master.balancer.stochastic.primaryRegionCountCost";
1107     private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
1108 
1109     private double[] stats = null;
1110 
1111     PrimaryRegionCountSkewCostFunction(Configuration conf) {
1112       super(conf);
1113       // Load multiplier should be the greatest as primary regions serve majority of reads/writes.
1114       this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
1115         DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
1116     }
1117 
1118     @Override
1119     double cost() {
1120       if (!cluster.hasRegionReplicas) {
1121         return 0;
1122       }
1123       if (stats == null || stats.length != cluster.numServers) {
1124         stats = new double[cluster.numServers];
1125       }
1126 
1127       for (int i =0; i < cluster.numServers; i++) {
1128         stats[i] = 0;
1129         for (int regionIdx : cluster.regionsPerServer[i]) {
1130           if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
1131             stats[i] ++;
1132           }
1133         }
1134       }
1135 
1136       return costFromArray(stats);
1137     }
1138   }
1139 
1140   /**
1141    * Compute the cost of a potential cluster configuration based upon how evenly
1142    * distributed tables are.
1143    */
1144   static class TableSkewCostFunction extends CostFunction {
1145 
1146     private static final String TABLE_SKEW_COST_KEY =
1147         "hbase.master.balancer.stochastic.tableSkewCost";
1148     private static final float DEFAULT_TABLE_SKEW_COST = 35;
1149 
1150     TableSkewCostFunction(Configuration conf) {
1151       super(conf);
1152       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1153     }
1154 
1155     @Override
1156     double cost() {
1157       double max = cluster.numRegions;
1158       double min = ((double) cluster.numRegions) / cluster.numServers;
1159       double value = 0;
1160 
1161       for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1162         value += cluster.numMaxRegionsPerTable[i];
1163       }
1164 
1165       return scale(min, max, value);
1166     }
1167   }
1168 
1169   /**
1170    * Compute a cost of a potential cluster configuration based upon where
1171    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
1172    */
1173   static abstract class LocalityBasedCostFunction extends CostFunction {
1174 
1175     private final LocalityType type;
1176 
1177     private double bestLocality; // best case locality across cluster weighted by local data size
1178     private double locality; // current locality across cluster weighted by local data size
1179 
1180     private MasterServices services;
1181 
1182     LocalityBasedCostFunction(Configuration conf,
1183                               MasterServices srv,
1184                               LocalityType type,
1185                               String localityCostKey,
1186                               float defaultLocalityCost) {
1187       super(conf);
1188       this.type = type;
1189       this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
1190       this.services = srv;
1191       this.locality = 0.0;
1192       this.bestLocality = 0.0;
1193     }
1194 
1195     /**
1196      * Maps region to the current entity (server or rack) on which it is stored
1197      */
1198     abstract int regionIndexToEntityIndex(int region);
1199 
1200     public void setServices(MasterServices srvc) {
1201       this.services = srvc;
1202     }
1203 
1204     @Override
1205     void init(Cluster cluster) {
1206       super.init(cluster);
1207       locality = 0.0;
1208       bestLocality = 0.0;
1209 
1210       // If no master, no computation will work, so assume 0 cost
1211       if (this.services == null) {
1212         return;
1213       }
1214 
1215       for (int region = 0; region < cluster.numRegions; region++) {
1216         locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
1217         bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
1218       }
1219 
1220       // We normalize locality to be a score between 0 and 1.0 representing how good it
1221       // is compared to how good it could be. If bestLocality is 0, assume locality is 100
1222       // (and the cost is 0)
1223       locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
1224     }
1225 
1226     @Override
1227     protected void regionMoved(int region, int oldServer, int newServer) {
1228       int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
1229       int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
1230       if (this.services == null) {
1231         return;
1232       }
1233       double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
1234       double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
1235       locality += normalizedDelta;
1236     }
1237 
1238     @Override
1239     double cost() {
1240       return 1 - locality;
1241     }
1242 
1243     private int getMostLocalEntityForRegion(int region) {
1244       return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
1245     }
1246 
1247     private double getWeightedLocality(int region, int entity) {
1248       return cluster.getOrComputeWeightedLocality(region, entity, type);
1249     }
1250 
1251   }
1252 
1253   static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
1254 
1255     private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1256     private static final float DEFAULT_LOCALITY_COST = 25;
1257 
1258     ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
1259       super(
1260           conf,
1261           srv,
1262           LocalityType.SERVER,
1263           LOCALITY_COST_KEY,
1264           DEFAULT_LOCALITY_COST
1265       );
1266     }
1267 
1268     @Override
1269     int regionIndexToEntityIndex(int region) {
1270       return cluster.regionIndexToServerIndex[region];
1271     }
1272   }
1273 
1274   static class RackLocalityCostFunction extends LocalityBasedCostFunction {
1275 
1276     private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
1277     private static final float DEFAULT_RACK_LOCALITY_COST = 15;
1278 
1279     public RackLocalityCostFunction(Configuration conf, MasterServices services) {
1280       super(
1281           conf,
1282           services,
1283           LocalityType.RACK,
1284           RACK_LOCALITY_COST_KEY,
1285           DEFAULT_RACK_LOCALITY_COST
1286       );
1287     }
1288 
1289     @Override
1290     int regionIndexToEntityIndex(int region) {
1291       return cluster.getRackForRegion(region);
1292     }
1293   }
1294 
1295   /**
1296    * Base class the allows writing costs functions from rolling average of some
1297    * number from RegionLoad.
1298    */
1299   abstract static class CostFromRegionLoadFunction extends CostFunction {
1300 
1301     private ClusterStatus clusterStatus = null;
1302     private Map<String, Deque<RegionLoad>> loads = null;
1303     private double[] stats = null;
1304     CostFromRegionLoadFunction(Configuration conf) {
1305       super(conf);
1306     }
1307 
1308     void setClusterStatus(ClusterStatus status) {
1309       this.clusterStatus = status;
1310     }
1311 
1312     void setLoads(Map<String, Deque<RegionLoad>> l) {
1313       this.loads = l;
1314     }
1315 
1316     @Override
1317     double cost() {
1318       if (clusterStatus == null || loads == null) {
1319         return 0;
1320       }
1321 
1322       if (stats == null || stats.length != cluster.numServers) {
1323         stats = new double[cluster.numServers];
1324       }
1325 
1326       for (int i =0; i < stats.length; i++) {
1327         //Cost this server has from RegionLoad
1328         long cost = 0;
1329 
1330         // for every region on this server get the rl
1331         for(int regionIndex:cluster.regionsPerServer[i]) {
1332           Collection<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
1333 
1334           // Now if we found a region load get the type of cost that was requested.
1335           if (regionLoadList != null) {
1336             cost += getRegionLoadCost(regionLoadList);
1337           }
1338         }
1339 
1340         // Add the total cost to the stats.
1341         stats[i] = cost;
1342       }
1343 
1344       // Now return the scaled cost from data held in the stats object.
1345       return costFromArray(stats);
1346     }
1347 
1348     protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1349       double cost = 0;
1350 
1351       for (RegionLoad rl : regionLoadList) {
1352         double toAdd = getCostFromRl(rl);
1353 
1354         if (cost == 0) {
1355           cost = toAdd;
1356         } else {
1357           cost = (.5 * cost) + (.5 * toAdd);
1358         }
1359       }
1360 
1361       return cost;
1362     }
1363 
1364     protected abstract double getCostFromRl(RegionLoad rl);
1365   }
1366 
1367   /**
1368    * Compute the cost of total number of read requests  The more unbalanced the higher the
1369    * computed cost will be.  This uses a rolling average of regionload.
1370    */
1371 
1372   static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1373 
1374     private static final String READ_REQUEST_COST_KEY =
1375         "hbase.master.balancer.stochastic.readRequestCost";
1376     private static final float DEFAULT_READ_REQUEST_COST = 5;
1377 
1378     ReadRequestCostFunction(Configuration conf) {
1379       super(conf);
1380       this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1381     }
1382 
1383 
1384     @Override
1385     protected double getCostFromRl(RegionLoad rl) {
1386       return rl.getReadRequestsCount();
1387     }
1388   }
1389 
1390   /**
1391    * Compute the cost of total number of write requests.  The more unbalanced the higher the
1392    * computed cost will be.  This uses a rolling average of regionload.
1393    */
1394   static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1395 
1396     private static final String WRITE_REQUEST_COST_KEY =
1397         "hbase.master.balancer.stochastic.writeRequestCost";
1398     private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1399 
1400     WriteRequestCostFunction(Configuration conf) {
1401       super(conf);
1402       this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1403     }
1404 
1405     @Override
1406     protected double getCostFromRl(RegionLoad rl) {
1407       return rl.getWriteRequestsCount();
1408     }
1409   }
1410 
1411   /**
1412    * A cost function for region replicas. We give a very high cost to hosting
1413    * replicas of the same region in the same host. We do not prevent the case
1414    * though, since if numReplicas > numRegionServers, we still want to keep the
1415    * replica open.
1416    */
1417   static class RegionReplicaHostCostFunction extends CostFunction {
1418     private static final String REGION_REPLICA_HOST_COST_KEY =
1419         "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1420     private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1421 
1422     long maxCost = 0;
1423     long[] costsPerGroup; // group is either server, host or rack
1424     int[][] primariesOfRegionsPerGroup;
1425 
1426     public RegionReplicaHostCostFunction(Configuration conf) {
1427       super(conf);
1428       this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1429         DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1430     }
1431 
1432     @Override
1433     void init(Cluster cluster) {
1434       super.init(cluster);
1435       // max cost is the case where every region replica is hosted together regardless of host
1436       maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1437       costsPerGroup = new long[cluster.numHosts];
1438       primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
1439           ? cluster.primariesOfRegionsPerHost
1440           : cluster.primariesOfRegionsPerServer;
1441       for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1442         costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1443       }
1444     }
1445 
1446     long getMaxCost(Cluster cluster) {
1447       if (!cluster.hasRegionReplicas) {
1448         return 0; // short circuit
1449       }
1450       // max cost is the case where every region replica is hosted together regardless of host
1451       int[] primariesOfRegions = new int[cluster.numRegions];
1452       System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1453           cluster.regions.length);
1454 
1455       Arrays.sort(primariesOfRegions);
1456 
1457       // compute numReplicas from the sorted array
1458       return costPerGroup(primariesOfRegions);
1459     }
1460 
1461     @Override
1462     double cost() {
1463       if (maxCost <= 0) {
1464         return 0;
1465       }
1466 
1467       long totalCost = 0;
1468       for (int i = 0 ; i < costsPerGroup.length; i++) {
1469         totalCost += costsPerGroup[i];
1470       }
1471       return scale(0, maxCost, totalCost);
1472     }
1473 
1474     /**
1475      * For each primary region, it computes the total number of replicas in the array (numReplicas)
1476      * and returns a sum of numReplicas-1 squared. For example, if the server hosts
1477      * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
1478      * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
1479      * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
1480      * @return a sum of numReplicas-1 squared for each primary region in the group.
1481      */
1482     protected long costPerGroup(int[] primariesOfRegions) {
1483       long cost = 0;
1484       int currentPrimary = -1;
1485       int currentPrimaryIndex = -1;
1486       // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
1487       // sharing the same primary will have consecutive numbers in the array.
1488       for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1489         int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1490         if (primary != currentPrimary) { // we see a new primary
1491           int numReplicas = j - currentPrimaryIndex;
1492           // square the cost
1493           if (numReplicas > 1) { // means consecutive primaries, indicating co-location
1494             cost += (numReplicas - 1) * (numReplicas - 1);
1495           }
1496           currentPrimary = primary;
1497           currentPrimaryIndex = j;
1498         }
1499       }
1500 
1501       return cost;
1502     }
1503 
1504     @Override
1505     protected void regionMoved(int region, int oldServer, int newServer) {
1506       if (maxCost <= 0) {
1507         return; // no need to compute
1508       }
1509       if (cluster.multiServersPerHost) {
1510         int oldHost = cluster.serverIndexToHostIndex[oldServer];
1511         int newHost = cluster.serverIndexToHostIndex[newServer];
1512         if (newHost != oldHost) {
1513           costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1514           costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1515         }
1516       } else {
1517         costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1518         costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1519       }
1520     }
1521   }
1522 
1523   /**
1524    * A cost function for region replicas for the rack distribution. We give a relatively high
1525    * cost to hosting replicas of the same region in the same rack. We do not prevent the case
1526    * though.
1527    */
1528   static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1529     private static final String REGION_REPLICA_RACK_COST_KEY =
1530         "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1531     private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1532 
1533     public RegionReplicaRackCostFunction(Configuration conf) {
1534       super(conf);
1535       this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
1536         DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1537     }
1538 
1539     @Override
1540     void init(Cluster cluster) {
1541       this.cluster = cluster;
1542       if (cluster.numRacks <= 1) {
1543         maxCost = 0;
1544         return; // disabled for 1 rack
1545       }
1546       // max cost is the case where every region replica is hosted together regardless of rack
1547       maxCost = getMaxCost(cluster);
1548       costsPerGroup = new long[cluster.numRacks];
1549       for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1550         costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1551       }
1552     }
1553 
1554     @Override
1555     protected void regionMoved(int region, int oldServer, int newServer) {
1556       if (maxCost <= 0) {
1557         return; // no need to compute
1558       }
1559       int oldRack = cluster.serverIndexToRackIndex[oldServer];
1560       int newRack = cluster.serverIndexToRackIndex[newServer];
1561       if (newRack != oldRack) {
1562         costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1563         costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1564       }
1565     }
1566   }
1567 
1568   /**
1569    * Compute the cost of total memstore size.  The more unbalanced the higher the
1570    * computed cost will be.  This uses a rolling average of regionload.
1571    */
1572   static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1573 
1574     private static final String MEMSTORE_SIZE_COST_KEY =
1575         "hbase.master.balancer.stochastic.memstoreSizeCost";
1576     private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1577 
1578     MemstoreSizeCostFunction(Configuration conf) {
1579       super(conf);
1580       this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1581     }
1582 
1583     @Override
1584     protected double getCostFromRl(RegionLoad rl) {
1585       return rl.getMemStoreSizeMB();
1586     }
1587   }
1588   /**
1589    * Compute the cost of total open storefiles size.  The more unbalanced the higher the
1590    * computed cost will be.  This uses a rolling average of regionload.
1591    */
1592   static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1593 
1594     private static final String STOREFILE_SIZE_COST_KEY =
1595         "hbase.master.balancer.stochastic.storefileSizeCost";
1596     private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1597 
1598     StoreFileCostFunction(Configuration conf) {
1599       super(conf);
1600       this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1601     }
1602 
1603     @Override
1604     protected double getCostFromRl(RegionLoad rl) {
1605       return rl.getStorefileSizeMB();
1606     }
1607   }
1608 
1609   /**
1610    * A helper function to compose the attribute name from tablename and costfunction name
1611    */
1612   public static String composeAttributeName(String tableName, String costFunctionName) {
1613     return tableName + TABLE_FUNCTION_SEP + costFunctionName;
1614   }
1615 }