View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master.balancer;
21  
22  import com.google.common.collect.ArrayListMultimap;
23  import com.google.common.collect.Lists;
24  import com.google.common.net.HostAndPort;
25  import org.apache.commons.lang.StringUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.HTableDescriptor;
32  import org.apache.hadoop.hbase.ServerName;
33  import org.apache.hadoop.hbase.TableDescriptors;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
36  import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
37  import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
38  import org.apache.hadoop.hbase.master.AssignmentManager;
39  import org.apache.hadoop.hbase.master.HMaster;
40  import org.apache.hadoop.hbase.master.MasterServices;
41  import org.apache.hadoop.hbase.master.RegionPlan;
42  import org.apache.hadoop.hbase.testclassification.SmallTests;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.junit.BeforeClass;
45  import org.junit.Test;
46  import org.junit.experimental.categories.Category;
47  import org.mockito.Mockito;
48  import org.mockito.invocation.InvocationOnMock;
49  import org.mockito.stubbing.Answer;
50  
51  import java.io.FileNotFoundException;
52  import java.io.IOException;
53  import java.security.SecureRandom;
54  import java.util.ArrayList;
55  import java.util.Collections;
56  import java.util.HashMap;
57  import java.util.List;
58  import java.util.Map;
59  import java.util.Set;
60  import java.util.TreeMap;
61  import java.util.TreeSet;
62  
63  import static org.junit.Assert.assertEquals;
64  import static org.junit.Assert.assertFalse;
65  import static org.junit.Assert.assertTrue;
66  
67  //TODO use stochastic based load balancer instead
68  @Category(SmallTests.class)
69  public class TestRSGroupBasedLoadBalancer {
70  
71    private static final Log LOG = LogFactory.getLog(TestRSGroupBasedLoadBalancer.class);
72    private static RSGroupBasedLoadBalancer loadBalancer;
73    private static SecureRandom rand;
74  
75    static String[]  groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3",
76        "dg4" };
77    static TableName[] tables =
78        new TableName[] { TableName.valueOf("dt1"),
79            TableName.valueOf("dt2"),
80            TableName.valueOf("dt3"),
81            TableName.valueOf("dt4")};
82    static List<ServerName> servers;
83    static Map<String, RSGroupInfo> groupMap;
84    static Map<TableName, String> tableMap;
85    static List<HTableDescriptor> tableDescs;
86    int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 };
87    static int regionId = 0;
88  
89    @BeforeClass
90    public static void beforeAllTests() throws Exception {
91      rand = new SecureRandom();
92      servers = generateServers(7);
93      groupMap = constructGroupInfo(servers, groups);
94      tableMap = new HashMap<TableName, String>();
95      tableDescs = constructTableDesc();
96      Configuration conf = HBaseConfiguration.create();
97      conf.set("hbase.regions.slop", "0");
98      conf.set("hbase.group.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName());
99      loadBalancer = new RSGroupBasedLoadBalancer(getMockedGroupInfoManager());
100     loadBalancer.setMasterServices(getMockedMaster());
101     loadBalancer.setConf(conf);
102     loadBalancer.initialize();
103   }
104 
105   /**
106    * Test the load balancing algorithm.
107    *
108    * Invariant is that all servers of the group should be hosting either floor(average) or
109    * ceiling(average)
110    *
111    * @throws Exception
112    */
113   @Test
114   public void testBalanceCluster() throws Exception {
115     Map<ServerName, List<HRegionInfo>> servers = mockClusterServers();
116     ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers);
117     LOG.info("Mock Cluster :  " + printStats(list));
118     List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
119     ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile(
120         list, plans);
121     LOG.info("Mock Balance : " + printStats(balancedCluster));
122     assertClusterAsBalanced(balancedCluster);
123   }
124 
125   /**
126    * Invariant is that all servers of a group have load between floor(avg) and
127    * ceiling(avg) number of regions.
128    */
129   private void assertClusterAsBalanced(
130       ArrayListMultimap<String, ServerAndLoad> groupLoadMap) {
131     for (String gName : groupLoadMap.keySet()) {
132       List<ServerAndLoad> groupLoad = groupLoadMap.get(gName);
133       int numServers = groupLoad.size();
134       int numRegions = 0;
135       int maxRegions = 0;
136       int minRegions = Integer.MAX_VALUE;
137       for (ServerAndLoad server : groupLoad) {
138         int nr = server.getLoad();
139         if (nr > maxRegions) {
140           maxRegions = nr;
141         }
142         if (nr < minRegions) {
143           minRegions = nr;
144         }
145         numRegions += nr;
146       }
147       if (maxRegions - minRegions < 2) {
148         // less than 2 between max and min, can't balance
149         return;
150       }
151       int min = numRegions / numServers;
152       int max = numRegions % numServers == 0 ? min : min + 1;
153 
154       for (ServerAndLoad server : groupLoad) {
155         assertTrue(server.getLoad() <= max);
156         assertTrue(server.getLoad() >= min);
157       }
158     }
159   }
160 
161   /**
162    * All regions have an assignment.
163    *
164    * @param regions
165    * @param servers
166    * @param assignments
167    * @throws java.io.IOException
168    * @throws java.io.FileNotFoundException
169    */
170   private void assertImmediateAssignment(List<HRegionInfo> regions,
171                                          List<ServerName> servers,
172                                          Map<HRegionInfo, ServerName> assignments)
173       throws IOException {
174     for (HRegionInfo region : regions) {
175       assertTrue(assignments.containsKey(region));
176       ServerName server = assignments.get(region);
177       TableName tableName = region.getTable();
178 
179       String groupName =
180           getMockedGroupInfoManager().getRSGroupOfTable(tableName);
181       assertTrue(StringUtils.isNotEmpty(groupName));
182       RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName);
183       assertTrue("Region is not correctly assigned to group servers.",
184           gInfo.containsServer(server.getHostPort()));
185     }
186   }
187 
188   /**
189    * Tests the bulk assignment used during cluster startup.
190    *
191    * Round-robin. Should yield a balanced cluster so same invariant as the
192    * load balancer holds, all servers holding either floor(avg) or
193    * ceiling(avg).
194    *
195    * @throws Exception
196    */
197   @Test
198   public void testBulkAssignment() throws Exception {
199     List<HRegionInfo> regions = randomRegions(25);
200     Map<ServerName, List<HRegionInfo>> assignments = loadBalancer
201         .roundRobinAssignment(regions, servers);
202     //test empty region/servers scenario
203     //this should not throw an NPE
204     loadBalancer.roundRobinAssignment(regions,
205         Collections.EMPTY_LIST);
206     //test regular scenario
207     assertTrue(assignments.keySet().size() == servers.size());
208     for (ServerName sn : assignments.keySet()) {
209       List<HRegionInfo> regionAssigned = assignments.get(sn);
210       for (HRegionInfo region : regionAssigned) {
211         TableName tableName = region.getTable();
212         String groupName =
213             getMockedGroupInfoManager().getRSGroupOfTable(tableName);
214         assertTrue(StringUtils.isNotEmpty(groupName));
215         RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(
216             groupName);
217         assertTrue(
218             "Region is not correctly assigned to group servers.",
219             gInfo.containsServer(sn.getHostPort()));
220       }
221     }
222     ArrayListMultimap<String, ServerAndLoad> loadMap = convertToGroupBasedMap(assignments);
223     assertClusterAsBalanced(loadMap);
224   }
225 
226   /**
227    * Test the cluster startup bulk assignment which attempts to retain
228    * assignment info.
229    *
230    * @throws Exception
231    */
232   @Test
233   public void testRetainAssignment() throws Exception {
234     // Test simple case where all same servers are there
235     Map<ServerName, List<HRegionInfo>> currentAssignments = mockClusterServers();
236     Map<HRegionInfo, ServerName> inputForTest = new HashMap<HRegionInfo, ServerName>();
237     for (ServerName sn : currentAssignments.keySet()) {
238       for (HRegionInfo region : currentAssignments.get(sn)) {
239         inputForTest.put(region, sn);
240       }
241     }
242     //verify region->null server assignment is handled
243     inputForTest.put(randomRegions(1).get(0), null);
244     Map<ServerName, List<HRegionInfo>> newAssignment = loadBalancer
245         .retainAssignment(inputForTest, servers);
246     assertRetainedAssignment(inputForTest, servers, newAssignment);
247   }
248 
249   /**
250    * Asserts a valid retained assignment plan.
251    * <p>
252    * Must meet the following conditions:
253    * <ul>
254    * <li>Every input region has an assignment, and to an online server
255    * <li>If a region had an existing assignment to a server with the same
256    * address a a currently online server, it will be assigned to it
257    * </ul>
258    *
259    * @param existing
260    * @param assignment
261    * @throws java.io.IOException
262    * @throws java.io.FileNotFoundException
263    */
264   private void assertRetainedAssignment(
265       Map<HRegionInfo, ServerName> existing, List<ServerName> servers,
266       Map<ServerName, List<HRegionInfo>> assignment)
267       throws FileNotFoundException, IOException {
268     // Verify condition 1, every region assigned, and to online server
269     Set<ServerName> onlineServerSet = new TreeSet<ServerName>(servers);
270     Set<HRegionInfo> assignedRegions = new TreeSet<HRegionInfo>();
271     for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) {
272       assertTrue(
273           "Region assigned to server that was not listed as online",
274           onlineServerSet.contains(a.getKey()));
275       for (HRegionInfo r : a.getValue())
276         assignedRegions.add(r);
277     }
278     assertEquals(existing.size(), assignedRegions.size());
279 
280     // Verify condition 2, every region must be assigned to correct server.
281     Set<String> onlineHostNames = new TreeSet<String>();
282     for (ServerName s : servers) {
283       onlineHostNames.add(s.getHostname());
284     }
285 
286     for (Map.Entry<ServerName, List<HRegionInfo>> a : assignment.entrySet()) {
287       ServerName currentServer = a.getKey();
288       for (HRegionInfo r : a.getValue()) {
289         ServerName oldAssignedServer = existing.get(r);
290         TableName tableName = r.getTable();
291         String groupName =
292             getMockedGroupInfoManager().getRSGroupOfTable(tableName);
293         assertTrue(StringUtils.isNotEmpty(groupName));
294         RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(
295             groupName);
296         assertTrue(
297             "Region is not correctly assigned to group servers.",
298             gInfo.containsServer(currentServer.getHostPort()));
299         if (oldAssignedServer != null
300             && onlineHostNames.contains(oldAssignedServer
301             .getHostname())) {
302           // this region was previously assigned somewhere, and that
303           // host is still around, then the host must have been is a
304           // different group.
305           if (!oldAssignedServer.getHostPort().equals(currentServer.getHostPort())) {
306             assertFalse(gInfo.containsServer(oldAssignedServer.getHostPort()));
307           }
308         }
309       }
310     }
311   }
312 
313   private String printStats(
314       ArrayListMultimap<String, ServerAndLoad> groupBasedLoad) {
315     StringBuffer sb = new StringBuffer();
316     sb.append("\n");
317     for (String groupName : groupBasedLoad.keySet()) {
318       sb.append("Stats for group: " + groupName);
319       sb.append("\n");
320       sb.append(groupMap.get(groupName).getServers());
321       sb.append("\n");
322       List<ServerAndLoad> groupLoad = groupBasedLoad.get(groupName);
323       int numServers = groupLoad.size();
324       int totalRegions = 0;
325       sb.append("Per Server Load: \n");
326       for (ServerAndLoad sLoad : groupLoad) {
327         sb.append("Server :" + sLoad.getServerName() + " Load : "
328             + sLoad.getLoad() + "\n");
329         totalRegions += sLoad.getLoad();
330       }
331       sb.append(" Group Statistics : \n");
332       float average = (float) totalRegions / numServers;
333       int max = (int) Math.ceil(average);
334       int min = (int) Math.floor(average);
335       sb.append("[srvr=" + numServers + " rgns=" + totalRegions + " avg="
336           + average + " max=" + max + " min=" + min + "]");
337       sb.append("\n");
338       sb.append("===============================");
339       sb.append("\n");
340     }
341     return sb.toString();
342   }
343 
344   private ArrayListMultimap<String, ServerAndLoad> convertToGroupBasedMap(
345       final Map<ServerName, List<HRegionInfo>> serversMap) throws IOException {
346     ArrayListMultimap<String, ServerAndLoad> loadMap = ArrayListMultimap
347         .create();
348     for (RSGroupInfo gInfo : getMockedGroupInfoManager().listRSGroups()) {
349       Set<HostAndPort> groupServers = gInfo.getServers();
350       for (HostAndPort hostPort : groupServers) {
351         ServerName actual = null;
352         for(ServerName entry: servers) {
353           if(entry.getHostPort().equals(hostPort)) {
354             actual = entry;
355             break;
356           }
357         }
358         List<HRegionInfo> regions = serversMap.get(actual);
359         assertTrue("No load for " + actual, regions != null);
360         loadMap.put(gInfo.getName(),
361             new ServerAndLoad(actual, regions.size()));
362       }
363     }
364     return loadMap;
365   }
366 
367   private ArrayListMultimap<String, ServerAndLoad> reconcile(
368       ArrayListMultimap<String, ServerAndLoad> previousLoad,
369       List<RegionPlan> plans) {
370     ArrayListMultimap<String, ServerAndLoad> result = ArrayListMultimap
371         .create();
372     result.putAll(previousLoad);
373     if (plans != null) {
374       for (RegionPlan plan : plans) {
375         ServerName source = plan.getSource();
376         updateLoad(result, source, -1);
377         ServerName destination = plan.getDestination();
378         updateLoad(result, destination, +1);
379       }
380     }
381     return result;
382   }
383 
384   private void updateLoad(
385       ArrayListMultimap<String, ServerAndLoad> previousLoad,
386       final ServerName sn, final int diff) {
387     for (String groupName : previousLoad.keySet()) {
388       ServerAndLoad newSAL = null;
389       ServerAndLoad oldSAL = null;
390       for (ServerAndLoad sal : previousLoad.get(groupName)) {
391         if (ServerName.isSameHostnameAndPort(sn, sal.getServerName())) {
392           oldSAL = sal;
393           newSAL = new ServerAndLoad(sn, sal.getLoad() + diff);
394           break;
395         }
396       }
397       if (newSAL != null) {
398         previousLoad.remove(groupName, oldSAL);
399         previousLoad.put(groupName, newSAL);
400         break;
401       }
402     }
403   }
404 
405   private Map<ServerName, List<HRegionInfo>> mockClusterServers() throws IOException {
406     assertTrue(servers.size() == regionAssignment.length);
407     Map<ServerName, List<HRegionInfo>> assignment = new TreeMap<ServerName, List<HRegionInfo>>();
408     for (int i = 0; i < servers.size(); i++) {
409       int numRegions = regionAssignment[i];
410       List<HRegionInfo> regions = assignedRegions(numRegions, servers.get(i));
411       assignment.put(servers.get(i), regions);
412     }
413     return assignment;
414   }
415 
416   /**
417    * Generate a list of regions evenly distributed between the tables.
418    *
419    * @param numRegions The number of regions to be generated.
420    * @return List of HRegionInfo.
421    */
422   private List<HRegionInfo> randomRegions(int numRegions) {
423     List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
424     byte[] start = new byte[16];
425     byte[] end = new byte[16];
426     rand.nextBytes(start);
427     rand.nextBytes(end);
428     int regionIdx = rand.nextInt(tables.length);
429     for (int i = 0; i < numRegions; i++) {
430       Bytes.putInt(start, 0, numRegions << 1);
431       Bytes.putInt(end, 0, (numRegions << 1) + 1);
432       int tableIndex = (i + regionIdx) % tables.length;
433       HRegionInfo hri = new HRegionInfo(
434           tables[tableIndex], start, end, false, regionId++);
435       regions.add(hri);
436     }
437     return regions;
438   }
439 
440   /**
441    * Generate assigned regions to a given server using group information.
442    *
443    * @param numRegions the num regions to generate
444    * @param sn the servername
445    * @return the list of regions
446    * @throws java.io.IOException Signals that an I/O exception has occurred.
447    */
448   private List<HRegionInfo> assignedRegions(int numRegions, ServerName sn) throws IOException {
449     List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
450     byte[] start = new byte[16];
451     byte[] end = new byte[16];
452     Bytes.putInt(start, 0, numRegions << 1);
453     Bytes.putInt(end, 0, (numRegions << 1) + 1);
454     for (int i = 0; i < numRegions; i++) {
455       TableName tableName = getTableName(sn);
456       HRegionInfo hri = new HRegionInfo(
457           tableName, start, end, false,
458           regionId++);
459       regions.add(hri);
460     }
461     return regions;
462   }
463 
464   private static List<ServerName> generateServers(int numServers) {
465     List<ServerName> servers = new ArrayList<ServerName>(numServers);
466     for (int i = 0; i < numServers; i++) {
467       String host = "server" + rand.nextInt(100000);
468       int port = rand.nextInt(60000);
469       servers.add(ServerName.valueOf(host, port, -1));
470     }
471     return servers;
472   }
473 
474   /**
475    * Construct group info, with each group having at least one server.
476    *
477    * @param servers the servers
478    * @param groups the groups
479    * @return the map
480    */
481   private static Map<String, RSGroupInfo> constructGroupInfo(
482       List<ServerName> servers, String[] groups) {
483     assertTrue(servers != null);
484     assertTrue(servers.size() >= groups.length);
485     int index = 0;
486     Map<String, RSGroupInfo> groupMap = new HashMap<String, RSGroupInfo>();
487     for (String grpName : groups) {
488       RSGroupInfo RSGroupInfo = new RSGroupInfo(grpName);
489       RSGroupInfo.addServer(servers.get(index).getHostPort());
490       groupMap.put(grpName, RSGroupInfo);
491       index++;
492     }
493     while (index < servers.size()) {
494       int grpIndex = rand.nextInt(groups.length);
495       groupMap.get(groups[grpIndex]).addServer(
496           servers.get(index).getHostPort());
497       index++;
498     }
499     return groupMap;
500   }
501 
502   /**
503    * Construct table descriptors evenly distributed between the groups.
504    *
505    * @return the list
506    */
507   private static List<HTableDescriptor> constructTableDesc() {
508     List<HTableDescriptor> tds = Lists.newArrayList();
509     int index = rand.nextInt(groups.length);
510     for (int i = 0; i < tables.length; i++) {
511       HTableDescriptor htd = new HTableDescriptor(tables[i]);
512       int grpIndex = (i + index) % groups.length ;
513       String groupName = groups[grpIndex];
514       tableMap.put(tables[i], groupName);
515       tds.add(htd);
516     }
517     return tds;
518   }
519 
520   private static MasterServices getMockedMaster() throws IOException {
521     TableDescriptors tds = Mockito.mock(TableDescriptors.class);
522     Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0));
523     Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1));
524     Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2));
525     Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3));
526     MasterServices services = Mockito.mock(HMaster.class);
527     Mockito.when(services.getTableDescriptors()).thenReturn(tds);
528     AssignmentManager am = Mockito.mock(AssignmentManager.class);
529     Mockito.when(services.getAssignmentManager()).thenReturn(am);
530     return services;
531   }
532 
533   private static RSGroupInfoManager getMockedGroupInfoManager() throws IOException {
534     RSGroupInfoManager gm = Mockito.mock(RSGroupInfoManager.class);
535     Mockito.when(gm.getRSGroup(groups[0])).thenReturn(
536         groupMap.get(groups[0]));
537     Mockito.when(gm.getRSGroup(groups[1])).thenReturn(
538         groupMap.get(groups[1]));
539     Mockito.when(gm.getRSGroup(groups[2])).thenReturn(
540         groupMap.get(groups[2]));
541     Mockito.when(gm.getRSGroup(groups[3])).thenReturn(
542         groupMap.get(groups[3]));
543     Mockito.when(gm.listRSGroups()).thenReturn(
544         Lists.newLinkedList(groupMap.values()));
545     Mockito.when(gm.isOnline()).thenReturn(true);
546     Mockito.when(gm.getRSGroupOfTable(Mockito.any(TableName.class)))
547         .thenAnswer(new Answer<String>() {
548           @Override
549           public String answer(InvocationOnMock invocation) throws Throwable {
550             return tableMap.get(invocation.getArguments()[0]);
551           }
552         });
553     return gm;
554   }
555 
556   private TableName getTableName(ServerName sn) throws IOException {
557     TableName tableName = null;
558     RSGroupInfoManager gm = getMockedGroupInfoManager();
559     RSGroupInfo groupOfServer = null;
560     for(RSGroupInfo gInfo : gm.listRSGroups()){
561       if(gInfo.containsServer(sn.getHostPort())){
562         groupOfServer = gInfo;
563         break;
564       }
565     }
566 
567     for(HTableDescriptor desc : tableDescs){
568       if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){
569         tableName = desc.getTableName();
570       }
571     }
572     return tableName;
573   }
574 }