1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.rsgroup;
21
22 import com.google.common.collect.Lists;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import com.google.common.net.HostAndPort;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Iterator;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentMap;
39
40 import org.apache.commons.lang.StringUtils;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.hbase.HRegionInfo;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.NamespaceDescriptor;
46 import org.apache.hadoop.hbase.ServerName;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.classification.InterfaceAudience;
49 import org.apache.hadoop.hbase.constraint.ConstraintException;
50 import org.apache.hadoop.hbase.master.AssignmentManager;
51 import org.apache.hadoop.hbase.master.LoadBalancer;
52 import org.apache.hadoop.hbase.master.MasterServices;
53 import org.apache.hadoop.hbase.master.RegionPlan;
54 import org.apache.hadoop.hbase.master.RegionState;
55 import org.apache.hadoop.hbase.master.ServerManager;
56 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
57
58
59
60
61 @InterfaceAudience.Private
62 public class RSGroupAdminServer extends RSGroupAdmin {
63 private static final Log LOG = LogFactory.getLog(RSGroupAdminServer.class);
64
65 private MasterServices master;
66
67
68 private ConcurrentMap<HostAndPort,String> serversInTransition =
69 new ConcurrentHashMap<HostAndPort, String>();
70 private RSGroupInfoManager RSGroupInfoManager;
71
72 public RSGroupAdminServer(MasterServices master,
73 RSGroupInfoManager RSGroupInfoManager) throws IOException {
74 this.master = master;
75 this.RSGroupInfoManager = RSGroupInfoManager;
76 }
77
78 @Override
79 public RSGroupInfo getRSGroupInfo(String groupName) throws IOException {
80 return getRSGroupInfoManager().getRSGroup(groupName);
81 }
82
83
84 @Override
85 public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
86 String groupName = getRSGroupInfoManager().getRSGroupOfTable(tableName);
87 if (groupName == null) {
88 return null;
89 }
90 return getRSGroupInfoManager().getRSGroup(groupName);
91 }
92
93 @Override
94 public void moveServers(Set<HostAndPort> servers, String targetGroupName)
95 throws IOException {
96 if (servers == null) {
97 throw new ConstraintException(
98 "The list of servers cannot be null.");
99 }
100 if (StringUtils.isEmpty(targetGroupName)) {
101 throw new ConstraintException("The target group cannot be null.");
102 }
103 if (servers.size() < 1) {
104 return;
105 }
106
107 RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
108 if (targetGrp == null) {
109 throw new ConstraintException("Group does not exist: "+targetGroupName);
110 }
111
112 RSGroupInfoManager manager = getRSGroupInfoManager();
113 synchronized (manager) {
114 if (master.getMasterCoprocessorHost() != null) {
115 master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName);
116 }
117 HostAndPort firstServer = servers.iterator().next();
118
119
120 RSGroupInfo srcGrp = manager.getRSGroupOfServer(firstServer);
121
122
123
124 if (srcGrp == null) {
125 throw new ConstraintException(
126 "Server "+firstServer+" does not have a group.");
127 }
128 if (RSGroupInfo.DEFAULT_GROUP.equals(srcGrp.getName())) {
129 Set<HostAndPort> onlineServers = new HashSet<HostAndPort>();
130 for(ServerName server: master.getServerManager().getOnlineServers().keySet()) {
131 onlineServers.add(server.getHostPort());
132 }
133 for(HostAndPort el: servers) {
134 if(!onlineServers.contains(el)) {
135 throw new ConstraintException(
136 "Server "+el+" is not an online server in default group.");
137 }
138 }
139 }
140
141 if(srcGrp.getServers().size() <= servers.size() &&
142 srcGrp.getTables().size() > 0) {
143 throw new ConstraintException("Cannot leave a group "+srcGrp.getName()+
144 " that contains tables " +"without servers.");
145 }
146
147 String sourceGroupName = getRSGroupInfoManager()
148 .getRSGroupOfServer(srcGrp.getServers().iterator().next()).getName();
149 if(getRSGroupInfo(targetGroupName) == null) {
150 throw new ConstraintException("Target group does not exist: "+targetGroupName);
151 }
152
153 for(HostAndPort server: servers) {
154 if (serversInTransition.containsKey(server)) {
155 throw new ConstraintException(
156 "Server list contains a server that is already being moved: "+server);
157 }
158 String tmpGroup = getRSGroupInfoManager().getRSGroupOfServer(server).getName();
159 if (sourceGroupName != null && !tmpGroup.equals(sourceGroupName)) {
160 throw new ConstraintException(
161 "Move server request should only come from one source group. "+
162 "Expecting only "+sourceGroupName+" but contains "+tmpGroup);
163 }
164 }
165
166 if(sourceGroupName.equals(targetGroupName)) {
167 throw new ConstraintException(
168 "Target group is the same as source group: "+targetGroupName);
169 }
170
171 try {
172
173 for (HostAndPort server : servers) {
174 serversInTransition.put(server, targetGroupName);
175 }
176
177 getRSGroupInfoManager().moveServers(servers, sourceGroupName, targetGroupName);
178 boolean found;
179 List<HostAndPort> tmpServers = Lists.newArrayList(servers);
180 do {
181 found = false;
182 for (Iterator<HostAndPort> iter = tmpServers.iterator();
183 iter.hasNext(); ) {
184 HostAndPort rs = iter.next();
185
186 List<HRegionInfo> regions = new LinkedList<HRegionInfo>();
187 for (Map.Entry<HRegionInfo, ServerName> el :
188 master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) {
189 if (el.getValue().getHostPort().equals(rs)) {
190 regions.add(el.getKey());
191 }
192 }
193 for (RegionState state :
194 master.getAssignmentManager().getRegionStates().getRegionsInTransition().values()) {
195 if (state.getServerName().getHostPort().equals(rs)) {
196 regions.add(state.getRegion());
197 }
198 }
199
200
201 LOG.info("Unassigning " + regions.size() +
202 " regions from server " + rs + " for move to " + targetGroupName);
203 if (regions.size() > 0) {
204
205 for (HRegionInfo region : regions) {
206
207
208 if (!targetGrp.containsTable(region.getTable())) {
209 master.getAssignmentManager().unassign(region);
210 found = true;
211 }
212 }
213 }
214 if (!found) {
215 iter.remove();
216 }
217 }
218 try {
219 Thread.sleep(1000);
220 } catch (InterruptedException e) {
221 LOG.warn("Sleep interrupted", e);
222 Thread.currentThread().interrupt();
223 }
224 } while (found);
225 } finally {
226
227 for (HostAndPort server : servers) {
228 serversInTransition.remove(server);
229 }
230 }
231 if (master.getMasterCoprocessorHost() != null) {
232 master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName);
233 }
234 LOG.info("Move server done: "+sourceGroupName+"->"+targetGroupName);
235 }
236 }
237
238 @Override
239 public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
240 if (tables == null) {
241 throw new ConstraintException(
242 "The list of servers cannot be null.");
243 }
244 if(tables.size() < 1) {
245 LOG.debug("moveTables() passed an empty set. Ignoring.");
246 return;
247 }
248 RSGroupInfoManager manager = getRSGroupInfoManager();
249 synchronized (manager) {
250 if (master.getMasterCoprocessorHost() != null) {
251 master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup);
252 }
253
254 if(targetGroup != null) {
255 RSGroupInfo destGroup = manager.getRSGroup(targetGroup);
256 if(destGroup == null) {
257 throw new ConstraintException("Target group does not exist: "+targetGroup);
258 }
259 if(destGroup.getServers().size() < 1) {
260 throw new ConstraintException("Target group must have at least one server.");
261 }
262 }
263
264 for(TableName table : tables) {
265 String srcGroup = manager.getRSGroupOfTable(table);
266 if(srcGroup != null && srcGroup.equals(targetGroup)) {
267 throw new ConstraintException(
268 "Source group is the same as target group for table "+table+" :"+srcGroup);
269 }
270 }
271 manager.moveTables(tables, targetGroup);
272 if (master.getMasterCoprocessorHost() != null) {
273 master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup);
274 }
275 }
276 for(TableName table: tables) {
277 TableLock lock = master.getTableLockManager().writeLock(table, "Group: table move");
278 try {
279 lock.acquire();
280 for (HRegionInfo region :
281 master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
282 master.getAssignmentManager().unassign(region);
283 }
284 } finally {
285 lock.release();
286 }
287 }
288 }
289
290 @Override
291 public void addRSGroup(String name) throws IOException {
292 if (master.getMasterCoprocessorHost() != null) {
293 master.getMasterCoprocessorHost().preAddRSGroup(name);
294 }
295 getRSGroupInfoManager().addRSGroup(new RSGroupInfo(name));
296 if (master.getMasterCoprocessorHost() != null) {
297 master.getMasterCoprocessorHost().postAddRSGroup(name);
298 }
299 }
300
301 @Override
302 public void removeRSGroup(String name) throws IOException {
303 RSGroupInfoManager manager = getRSGroupInfoManager();
304 synchronized (manager) {
305 if (master.getMasterCoprocessorHost() != null) {
306 master.getMasterCoprocessorHost().preRemoveRSGroup(name);
307 }
308 RSGroupInfo RSGroupInfo = getRSGroupInfoManager().getRSGroup(name);
309 if(RSGroupInfo == null) {
310 throw new ConstraintException("Group "+name+" does not exist");
311 }
312 int tableCount = RSGroupInfo.getTables().size();
313 if (tableCount > 0) {
314 throw new ConstraintException("Group "+name+" must have no associated tables: "+tableCount);
315 }
316 int serverCount = RSGroupInfo.getServers().size();
317 if(serverCount > 0) {
318 throw new ConstraintException(
319 "Group "+name+" must have no associated servers: "+serverCount);
320 }
321 for(NamespaceDescriptor ns: master.listNamespaceDescriptors()) {
322 String nsGroup = ns.getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP);
323 if(nsGroup != null && nsGroup.equals(name)) {
324 throw new ConstraintException("Group "+name+" is referenced by namespace: "+ns.getName());
325 }
326 }
327 manager.removeRSGroup(name);
328 if (master.getMasterCoprocessorHost() != null) {
329 master.getMasterCoprocessorHost().postRemoveRSGroup(name);
330 }
331 }
332 }
333
334 @Override
335 public boolean balanceRSGroup(String groupName) throws IOException {
336 ServerManager serverManager = master.getServerManager();
337 AssignmentManager assignmentManager = master.getAssignmentManager();
338 LoadBalancer balancer = master.getLoadBalancer();
339
340 boolean balancerRan;
341 synchronized (balancer) {
342 if (master.getMasterCoprocessorHost() != null) {
343 master.getMasterCoprocessorHost().preBalanceRSGroup(groupName);
344 }
345 if (getRSGroupInfo(groupName) == null) {
346 throw new ConstraintException("Group does not exist: "+groupName);
347 }
348
349 Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
350 if (groupRIT.size() > 0) {
351 LOG.debug("Not running balancer because " +
352 groupRIT.size() +
353 " region(s) in transition: " +
354 StringUtils.abbreviate(
355 master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(),
356 256));
357 return false;
358 }
359 if (serverManager.areDeadServersInProgress()) {
360 LOG.debug("Not running balancer because processing dead regionserver(s): " +
361 serverManager.getDeadServers());
362 return false;
363 }
364
365
366 List<RegionPlan> plans = new ArrayList<RegionPlan>();
367 for(Map.Entry<TableName, Map<ServerName, List<HRegionInfo>>> tableMap:
368 getRSGroupAssignmentsByTable(groupName).entrySet()) {
369 LOG.info("Creating partial plan for table "+tableMap.getKey()+": "+tableMap.getValue());
370 List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue());
371 LOG.info("Partial plan for table "+tableMap.getKey()+": "+partialPlans);
372 if (partialPlans != null) {
373 plans.addAll(partialPlans);
374 }
375 }
376 long startTime = System.currentTimeMillis();
377 balancerRan = plans != null;
378 if (plans != null && !plans.isEmpty()) {
379 LOG.info("Group balance "+groupName+" starting with plan count: "+plans.size());
380 for (RegionPlan plan: plans) {
381 LOG.info("balance " + plan);
382 assignmentManager.balance(plan);
383 }
384 LOG.info("Group balance "+groupName+" completed after "+
385 (System.currentTimeMillis()-startTime)+" seconds");
386 }
387 if (master.getMasterCoprocessorHost() != null) {
388 master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan);
389 }
390 }
391 return balancerRan;
392 }
393
394 @Override
395 public List<RSGroupInfo> listRSGroups() throws IOException {
396 return getRSGroupInfoManager().listRSGroups();
397 }
398
399 @Override
400 public RSGroupInfo getRSGroupOfServer(HostAndPort hostPort) throws IOException {
401 return getRSGroupInfoManager().getRSGroupOfServer(hostPort);
402 }
403
404 @InterfaceAudience.Private
405 public RSGroupInfoManager getRSGroupInfoManager() throws IOException {
406 return RSGroupInfoManager;
407 }
408
409 private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
410 throws IOException {
411 Map<String, RegionState> rit = Maps.newTreeMap();
412 AssignmentManager am = master.getAssignmentManager();
413 RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName);
414 for(TableName tableName : RSGroupInfo.getTables()) {
415 for(HRegionInfo regionInfo: am.getRegionStates().getRegionsOfTable(tableName)) {
416 RegionState state =
417 master.getAssignmentManager().getRegionStates().getRegionTransitionState(regionInfo);
418 if(state != null) {
419 rit.put(regionInfo.getEncodedName(), state);
420 }
421 }
422 }
423 return rit;
424 }
425
426 private Map<TableName, Map<ServerName, List<HRegionInfo>>>
427 getRSGroupAssignmentsByTable(String groupName) throws IOException {
428 Map<TableName, Map<ServerName, List<HRegionInfo>>> result = Maps.newHashMap();
429 RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName);
430 Map<TableName, Map<ServerName, List<HRegionInfo>>> assignments = Maps.newHashMap();
431 for(Map.Entry<HRegionInfo, ServerName> entry:
432 master.getAssignmentManager().getRegionStates().getRegionAssignments().entrySet()) {
433 TableName currTable = entry.getKey().getTable();
434 ServerName currServer = entry.getValue();
435 HRegionInfo currRegion = entry.getKey();
436 if(RSGroupInfo.getTables().contains(currTable)) {
437 if(!assignments.containsKey(entry.getKey().getTable())) {
438 assignments.put(currTable, new HashMap<ServerName, List<HRegionInfo>>());
439 }
440 if(!assignments.get(currTable).containsKey(currServer)) {
441 assignments.get(currTable).put(currServer, new ArrayList<HRegionInfo>());
442 }
443 assignments.get(currTable).get(currServer).add(currRegion);
444 }
445 }
446
447 Map<ServerName, List<HRegionInfo>> serverMap = Maps.newHashMap();
448 for(ServerName serverName: master.getServerManager().getOnlineServers().keySet()) {
449 if(RSGroupInfo.getServers().contains(serverName.getHostPort())) {
450 serverMap.put(serverName, Collections.EMPTY_LIST);
451 }
452 }
453
454
455 for(TableName tableName : RSGroupInfo.getTables()) {
456 if(assignments.containsKey(tableName)) {
457 result.put(tableName, new HashMap<ServerName, List<HRegionInfo>>());
458 result.get(tableName).putAll(serverMap);
459 result.get(tableName).putAll(assignments.get(tableName));
460 LOG.debug("Adding assignments for "+tableName+": "+assignments.get(tableName));
461 }
462 }
463
464 return result;
465 }
466
467 public void prepareRSGroupForTable(HTableDescriptor desc) throws IOException {
468 String groupName =
469 master.getNamespaceDescriptor(desc.getTableName().getNamespaceAsString())
470 .getConfigurationValue(RSGroupInfo.NAMESPACEDESC_PROP_GROUP);
471 if (groupName == null) {
472 groupName = RSGroupInfo.DEFAULT_GROUP;
473 }
474 RSGroupInfo RSGroupInfo = getRSGroupInfo(groupName);
475 if (RSGroupInfo == null) {
476 throw new ConstraintException("RSGroup " + groupName + " does not exist.");
477 }
478 if (!RSGroupInfo.containsTable(desc.getTableName())) {
479 LOG.debug("Pre-moving table " + desc.getTableName() + " to rsgroup " + groupName);
480 moveTables(Sets.newHashSet(desc.getTableName()), groupName);
481 }
482 }
483
484 public void cleanupRSGroupForTable(TableName tableName) throws IOException {
485 try {
486 RSGroupInfo group = getRSGroupInfoOfTable(tableName);
487 if (group != null) {
488 LOG.debug("Removing deleted table from table rsgroup " + group.getName());
489 moveTables(Sets.newHashSet(tableName), null);
490 }
491 } catch (ConstraintException ex) {
492 LOG.debug("Failed to perform rsgroup information cleanup for table: " + tableName, ex);
493 } catch (IOException ex) {
494 LOG.debug("Failed to perform rsgroup information cleanup for table: " + tableName, ex);
495 }
496 }
497
498 @Override
499 public void close() throws IOException {
500 }
501 }