1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.io.FileNotFoundException;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.ClusterStatus;
36 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.HTableDescriptor;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.classification.InterfaceAudience;
42 import org.apache.hadoop.hbase.master.AssignmentManager;
43 import org.apache.hadoop.hbase.master.MasterServices;
44 import org.apache.hadoop.hbase.master.RegionStates;
45 import org.apache.hadoop.hbase.regionserver.HRegion;
46 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47
48 import com.google.common.cache.CacheBuilder;
49 import com.google.common.cache.CacheLoader;
50 import com.google.common.cache.LoadingCache;
51 import com.google.common.collect.Lists;
52 import com.google.common.util.concurrent.Futures;
53 import com.google.common.util.concurrent.ListenableFuture;
54 import com.google.common.util.concurrent.ListeningExecutorService;
55 import com.google.common.util.concurrent.MoreExecutors;
56 import com.google.common.util.concurrent.ThreadFactoryBuilder;
57
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 class RegionLocationFinder {
66 private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
67 private static final long CACHE_TIME = 240 * 60 * 1000;
68 private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
69 private Configuration conf;
70 private volatile ClusterStatus status;
71 private MasterServices services;
72 private final ListeningExecutorService executor;
73
74 private long lastFullRefresh = EnvironmentEdgeManager.currentTime();
75
76 private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
77 new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
78
79 public ListenableFuture<HDFSBlocksDistribution> reload(final HRegionInfo hri,
80 HDFSBlocksDistribution oldValue) throws Exception {
81 return executor.submit(new Callable<HDFSBlocksDistribution>() {
82 @Override
83 public HDFSBlocksDistribution call() throws Exception {
84 return internalGetTopBlockLocation(hri);
85 }
86 });
87 }
88
89 @Override
90 public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
91 return internalGetTopBlockLocation(key);
92 }
93 };
94
95
96 private LoadingCache<HRegionInfo, HDFSBlocksDistribution> cache = null;
97
98 RegionLocationFinder() {
99 this.cache = createCache();
100 executor = MoreExecutors.listeningDecorator(
101 Executors.newScheduledThreadPool(
102 5,
103 new ThreadFactoryBuilder().
104 setDaemon(true)
105 .setNameFormat("region-location-%d")
106 .build()));
107 }
108
109
110
111
112
113
114 private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache() {
115 return CacheBuilder.newBuilder()
116 .expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
117 .build(loader);
118 }
119
120 public Configuration getConf() {
121 return conf;
122 }
123
124 public void setConf(Configuration conf) {
125 this.conf = conf;
126 }
127
128 public void setServices(MasterServices services) {
129 this.services = services;
130 }
131
132 public void setClusterStatus(ClusterStatus status) {
133 long currentTime = EnvironmentEdgeManager.currentTime();
134 this.status = status;
135 if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
136
137 lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
138 }
139
140 }
141
142
143
144
145
146
147 private boolean scheduleFullRefresh() {
148
149 if (services == null) {
150 return false;
151 }
152 AssignmentManager am = services.getAssignmentManager();
153
154 if (am == null) {
155 return false;
156 }
157 RegionStates regionStates = am.getRegionStates();
158 if (regionStates == null) {
159 return false;
160 }
161
162 Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
163 boolean includesUserTables = false;
164 for (final HRegionInfo hri : regions) {
165 cache.refresh(hri);
166 includesUserTables = includesUserTables || !hri.isSystemTable();
167 }
168 return includesUserTables;
169 }
170
171 protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
172 List<String> topHosts = getBlockDistribution(region).getTopHosts();
173 return mapHostNameToServerName(topHosts);
174 }
175
176
177
178
179
180 protected List<ServerName> getTopBlockLocations(HRegionInfo region, String currentHost) {
181 HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
182 List<String> topHosts = new ArrayList<String>();
183 for (String host : blocksDistribution.getTopHosts()) {
184 if (host.equals(currentHost)) {
185 break;
186 }
187 topHosts.add(host);
188 }
189 return mapHostNameToServerName(topHosts);
190 }
191
192
193
194
195
196
197
198
199
200
201 protected HDFSBlocksDistribution internalGetTopBlockLocation(HRegionInfo region) {
202 try {
203 HTableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
204 if (tableDescriptor != null) {
205 HDFSBlocksDistribution blocksDistribution =
206 HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
207 return blocksDistribution;
208 }
209 } catch (IOException ioe) {
210 LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
211 + region.getEncodedName(), ioe);
212 }
213
214 return EMPTY_BLOCK_DISTRIBUTION;
215 }
216
217
218
219
220
221
222
223
224 protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException {
225 HTableDescriptor tableDescriptor = null;
226 try {
227 if (this.services != null && this.services.getTableDescriptors() != null) {
228 tableDescriptor = this.services.getTableDescriptors().get(tableName);
229 }
230 } catch (FileNotFoundException fnfe) {
231 LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = "
232 + tableName, fnfe);
233 }
234
235 return tableDescriptor;
236 }
237
238
239
240
241
242
243
244
245 protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
246 if (hosts == null || status == null) {
247 if (hosts == null) {
248 LOG.warn("RegionLocationFinder top hosts is null");
249 }
250 return Lists.newArrayList();
251 }
252
253 List<ServerName> topServerNames = new ArrayList<ServerName>();
254 Collection<ServerName> regionServers = status.getServers();
255
256
257 HashMap<String, List<ServerName>> hostToServerName = new HashMap<String, List<ServerName>>();
258 for (ServerName sn : regionServers) {
259 String host = sn.getHostname();
260 if (!hostToServerName.containsKey(host)) {
261 hostToServerName.put(host, new ArrayList<ServerName>());
262 }
263 hostToServerName.get(host).add(sn);
264 }
265
266 for (String host : hosts) {
267 if (!hostToServerName.containsKey(host)) {
268 continue;
269 }
270 for (ServerName sn : hostToServerName.get(host)) {
271
272
273 if (sn != null) {
274 topServerNames.add(sn);
275 }
276 }
277 }
278 return topServerNames;
279 }
280
281 public HDFSBlocksDistribution getBlockDistribution(HRegionInfo hri) {
282 HDFSBlocksDistribution blockDistbn = null;
283 try {
284 if (cache.asMap().containsKey(hri)) {
285 blockDistbn = cache.get(hri);
286 return blockDistbn;
287 } else {
288 LOG.debug("HDFSBlocksDistribution not found in cache for region "
289 + hri.getRegionNameAsString());
290 blockDistbn = internalGetTopBlockLocation(hri);
291 cache.put(hri, blockDistbn);
292 return blockDistbn;
293 }
294 } catch (ExecutionException e) {
295 LOG.warn("Error while fetching cache entry ", e);
296 blockDistbn = internalGetTopBlockLocation(hri);
297 cache.put(hri, blockDistbn);
298 return blockDistbn;
299 }
300 }
301
302 private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
303 HRegionInfo hri) {
304 try {
305 return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
306 } catch (Exception e) {
307 return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
308 }
309 }
310
311 public void refreshAndWait(Collection<HRegionInfo> hris) {
312 ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
313 new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(hris.size());
314 for (HRegionInfo hregionInfo : hris) {
315 regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
316 }
317 int index = 0;
318 for (HRegionInfo hregionInfo : hris) {
319 ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
320 .get(index);
321 try {
322 cache.put(hregionInfo, future.get());
323 } catch (InterruptedException ite) {
324 Thread.currentThread().interrupt();
325 } catch (ExecutionException ee) {
326 LOG.debug(
327 "ExecutionException during HDFSBlocksDistribution computation. for region = "
328 + hregionInfo.getEncodedName(), ee);
329 }
330 index++;
331 }
332 }
333
334
335 LoadingCache<HRegionInfo, HDFSBlocksDistribution> getCache() {
336 return cache;
337 }
338 }