View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ClusterStatus;
36  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
37  import org.apache.hadoop.hbase.HRegionInfo;
38  import org.apache.hadoop.hbase.HTableDescriptor;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.master.AssignmentManager;
43  import org.apache.hadoop.hbase.master.MasterServices;
44  import org.apache.hadoop.hbase.master.RegionStates;
45  import org.apache.hadoop.hbase.regionserver.HRegion;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  
48  import com.google.common.cache.CacheBuilder;
49  import com.google.common.cache.CacheLoader;
50  import com.google.common.cache.LoadingCache;
51  import com.google.common.collect.Lists;
52  import com.google.common.util.concurrent.Futures;
53  import com.google.common.util.concurrent.ListenableFuture;
54  import com.google.common.util.concurrent.ListeningExecutorService;
55  import com.google.common.util.concurrent.MoreExecutors;
56  import com.google.common.util.concurrent.ThreadFactoryBuilder;
57  
58  /**
59   * This will find where data for a region is located in HDFS. It ranks
60   * {@link ServerName}'s by the size of the store files they are holding for a
61   * given region.
62   *
63   */
64  @InterfaceAudience.Private
65  class RegionLocationFinder {
66    private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
67    private static final long CACHE_TIME = 240 * 60 * 1000;
68    private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
69    private Configuration conf;
70    private volatile ClusterStatus status;
71    private MasterServices services;
72    private final ListeningExecutorService executor;
73    // Do not scheduleFullRefresh at master startup
74    private long lastFullRefresh = EnvironmentEdgeManager.currentTime();
75  
76    private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
77        new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
78  
79          public ListenableFuture<HDFSBlocksDistribution> reload(final HRegionInfo hri,
80                 HDFSBlocksDistribution oldValue) throws Exception {
81            return executor.submit(new Callable<HDFSBlocksDistribution>() {
82              @Override
83              public HDFSBlocksDistribution call() throws Exception {
84                return internalGetTopBlockLocation(hri);
85              }
86            });
87          }
88  
89          @Override
90          public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
91            return internalGetTopBlockLocation(key);
92          }
93        };
94  
95    // The cache for where regions are located.
96    private LoadingCache<HRegionInfo, HDFSBlocksDistribution> cache = null;
97  
98    RegionLocationFinder() {
99      this.cache = createCache();
100     executor = MoreExecutors.listeningDecorator(
101         Executors.newScheduledThreadPool(
102             5,
103             new ThreadFactoryBuilder().
104                 setDaemon(true)
105                 .setNameFormat("region-location-%d")
106                 .build()));
107   }
108 
109   /**
110    * Create a cache for region to list of servers
111    * @param time time to cache the locations
112    * @return A new Cache.
113    */
114   private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache() {
115     return CacheBuilder.newBuilder()
116         .expireAfterWrite(CACHE_TIME, TimeUnit.MILLISECONDS)
117         .build(loader);
118   }
119 
120   public Configuration getConf() {
121     return conf;
122   }
123 
124   public void setConf(Configuration conf) {
125     this.conf = conf;
126   }
127 
128   public void setServices(MasterServices services) {
129     this.services = services;
130   }
131 
132   public void setClusterStatus(ClusterStatus status) {
133     long currentTime = EnvironmentEdgeManager.currentTime();
134     this.status = status;
135     if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
136       // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
137       lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
138     }
139 
140   }
141 
142   /**
143    * Refresh all the region locations.
144    *
145    * @return true if user created regions got refreshed.
146    */
147   private boolean scheduleFullRefresh() {
148     // Protect from anything being null while starting up.
149     if (services == null) {
150       return false;
151     }
152     AssignmentManager am = services.getAssignmentManager();
153 
154     if (am == null) {
155       return false;
156     }
157     RegionStates regionStates = am.getRegionStates();
158     if (regionStates == null) {
159       return false;
160     }
161 
162     Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet();
163     boolean includesUserTables = false;
164     for (final HRegionInfo hri : regions) {
165       cache.refresh(hri);
166       includesUserTables = includesUserTables || !hri.isSystemTable();
167     }
168     return includesUserTables;
169   }
170 
171   protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
172     List<String> topHosts = getBlockDistribution(region).getTopHosts();
173     return mapHostNameToServerName(topHosts);
174   }
175 
176   /**
177    * Returns an ordered list of hosts which have better locality for this region
178    * than the current host.
179    */
180   protected List<ServerName> getTopBlockLocations(HRegionInfo region, String currentHost) {
181     HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
182     List<String> topHosts = new ArrayList<String>();
183     for (String host : blocksDistribution.getTopHosts()) {
184       if (host.equals(currentHost)) {
185         break;
186       }
187       topHosts.add(host);
188     }
189     return mapHostNameToServerName(topHosts);
190   }
191 
192   /**
193    * Returns an ordered list of hosts that are hosting the blocks for this
194    * region. The weight of each host is the sum of the block lengths of all
195    * files on that host, so the first host in the list is the server which holds
196    * the most bytes of the given region's HFiles.
197    *
198    * @param region region
199    * @return ordered list of hosts holding blocks of the specified region
200    */
201   protected HDFSBlocksDistribution internalGetTopBlockLocation(HRegionInfo region) {
202     try {
203       HTableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
204       if (tableDescriptor != null) {
205         HDFSBlocksDistribution blocksDistribution =
206             HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
207         return blocksDistribution;
208       }
209     } catch (IOException ioe) {
210       LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
211           + region.getEncodedName(), ioe);
212     }
213 
214     return EMPTY_BLOCK_DISTRIBUTION;
215   }
216 
217   /**
218    * return HTableDescriptor for a given tableName
219    *
220    * @param tableName the table name
221    * @return HTableDescriptor
222    * @throws IOException
223    */
224   protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException {
225     HTableDescriptor tableDescriptor = null;
226     try {
227       if (this.services != null && this.services.getTableDescriptors() != null) {
228         tableDescriptor = this.services.getTableDescriptors().get(tableName);
229       }
230     } catch (FileNotFoundException fnfe) {
231       LOG.debug("FileNotFoundException during getTableDescriptors." + " Current table name = "
232           + tableName, fnfe);
233     }
234 
235     return tableDescriptor;
236   }
237 
238   /**
239    * Map hostname to ServerName, The output ServerName list will have the same
240    * order as input hosts.
241    *
242    * @param hosts the list of hosts
243    * @return ServerName list
244    */
245   protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
246     if (hosts == null || status == null) {
247       if (hosts == null) {
248         LOG.warn("RegionLocationFinder top hosts is null");
249       }
250       return Lists.newArrayList();
251     }
252 
253     List<ServerName> topServerNames = new ArrayList<ServerName>();
254     Collection<ServerName> regionServers = status.getServers();
255 
256     // create a mapping from hostname to ServerName for fast lookup
257     HashMap<String, List<ServerName>> hostToServerName = new HashMap<String, List<ServerName>>();
258     for (ServerName sn : regionServers) {
259       String host = sn.getHostname();
260       if (!hostToServerName.containsKey(host)) {
261         hostToServerName.put(host, new ArrayList<ServerName>());
262       }
263       hostToServerName.get(host).add(sn);
264     }
265 
266     for (String host : hosts) {
267       if (!hostToServerName.containsKey(host)) {
268         continue;
269       }
270       for (ServerName sn : hostToServerName.get(host)) {
271         // it is possible that HDFS is up ( thus host is valid ),
272         // but RS is down ( thus sn is null )
273         if (sn != null) {
274           topServerNames.add(sn);
275         }
276       }
277     }
278     return topServerNames;
279   }
280 
281   public HDFSBlocksDistribution getBlockDistribution(HRegionInfo hri) {
282     HDFSBlocksDistribution blockDistbn = null;
283     try {
284       if (cache.asMap().containsKey(hri)) {
285         blockDistbn = cache.get(hri);
286         return blockDistbn;
287       } else {
288         LOG.debug("HDFSBlocksDistribution not found in cache for region "
289             + hri.getRegionNameAsString());
290         blockDistbn = internalGetTopBlockLocation(hri);
291         cache.put(hri, blockDistbn);
292         return blockDistbn;
293       }
294     } catch (ExecutionException e) {
295       LOG.warn("Error while fetching cache entry ", e);
296       blockDistbn = internalGetTopBlockLocation(hri);
297       cache.put(hri, blockDistbn);
298       return blockDistbn;
299     }
300   }
301 
302   private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
303       HRegionInfo hri) {
304     try {
305       return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
306     } catch (Exception e) {
307       return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
308     }
309   }
310 
311   public void refreshAndWait(Collection<HRegionInfo> hris) {
312     ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
313         new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(hris.size());
314     for (HRegionInfo hregionInfo : hris) {
315       regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
316     }
317     int index = 0;
318     for (HRegionInfo hregionInfo : hris) {
319       ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
320           .get(index);
321       try {
322         cache.put(hregionInfo, future.get());
323       } catch (InterruptedException ite) {
324         Thread.currentThread().interrupt();
325       } catch (ExecutionException ee) {
326         LOG.debug(
327             "ExecutionException during HDFSBlocksDistribution computation. for region = "
328                 + hregionInfo.getEncodedName(), ee);
329       }
330       index++;
331     }
332   }
333 
334   // For test
335   LoadingCache<HRegionInfo, HDFSBlocksDistribution> getCache() {
336     return cache;
337   }
338 }