View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentMap;
25  import java.util.concurrent.ConcurrentNavigableMap;
26  import java.util.concurrent.CopyOnWriteArraySet;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HRegionLocation;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.KeyValue.KVComparator;
35  import org.apache.hadoop.hbase.RegionLocations;
36  import org.apache.hadoop.hbase.ServerName;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  /**
42   * A cache implementation for region locations from meta.
43   */
44  @InterfaceAudience.Private
45  public class MetaCache {
46  
47    private static final Log LOG = LogFactory.getLog(MetaCache.class);
48  
49    /**
50     * Map of table to table {@link HRegionLocation}s.
51     */
52    private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], RegionLocations>>
53    cachedRegionLocations =
54    new CopyOnWriteArrayMap<>();
55  
56    // The presence of a server in the map implies it's likely that there is an
57    // entry in cachedRegionLocations that map to this server; but the absence
58    // of a server in this map guarentees that there is no entry in cache that
59    // maps to the absent server.
60    // The access to this attribute must be protected by a lock on cachedRegionLocations
61    private final Set<ServerName> cachedServers = new CopyOnWriteArraySet<>();
62  
63    /**
64     * Search the cache for a location that fits our table and row key.
65     * Return null if no suitable region is located.
66     *
67     * @return Null or region location found in cache.
68     */
69    public RegionLocations getCachedLocation(final TableName tableName, final byte [] row) {
70      ConcurrentNavigableMap<byte[], RegionLocations> tableLocations =
71        getTableLocations(tableName);
72  
73      Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
74      if (e == null) {
75        return null;
76      }
77      RegionLocations possibleRegion = e.getValue();
78  
79      // make sure that the end key is greater than the row we're looking
80      // for, otherwise the row actually belongs in the next region, not
81      // this one. the exception case is when the endkey is
82      // HConstants.EMPTY_END_ROW, signifying that the region we're
83      // checking is actually the last region in the table.
84      byte[] endKey = possibleRegion.getRegionLocation().getRegionInfo().getEndKey();
85      if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
86          getRowComparator(tableName).compareRows(
87              endKey, 0, endKey.length, row, 0, row.length) > 0) {
88        return possibleRegion;
89      }
90  
91      // Passed all the way through, so we got nothing - complete cache miss
92      return null;
93    }
94  
95    private KVComparator getRowComparator(TableName tableName) {
96      return TableName.META_TABLE_NAME.equals(tableName) ? KeyValue.META_COMPARATOR
97          : KeyValue.COMPARATOR;
98    }
99    /**
100    * Put a newly discovered HRegionLocation into the cache.
101    * @param tableName The table name.
102    * @param source the source of the new location
103    * @param location the new location
104    */
105   public void cacheLocation(final TableName tableName, final ServerName source,
106       final HRegionLocation location) {
107     assert source != null;
108     byte [] startKey = location.getRegionInfo().getStartKey();
109     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
110     RegionLocations locations = new RegionLocations(new HRegionLocation[] {location}) ;
111     RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
112     boolean isNewCacheEntry = (oldLocations == null);
113     if (isNewCacheEntry) {
114       if (LOG.isTraceEnabled()) {
115         LOG.trace("Cached location: " + location);
116       }
117       addToCachedServers(locations);
118       return;
119     }
120 
121     // If the server in cache sends us a redirect, assume it's always valid.
122     HRegionLocation oldLocation = oldLocations.getRegionLocation(
123       location.getRegionInfo().getReplicaId());
124     boolean force = oldLocation != null && oldLocation.getServerName() != null
125         && oldLocation.getServerName().equals(source);
126 
127     // For redirect if the number is equal to previous
128     // record, the most common case is that first the region was closed with seqNum, and then
129     // opened with the same seqNum; hence we will ignore the redirect.
130     // There are so many corner cases with various combinations of opens and closes that
131     // an additional counter on top of seqNum would be necessary to handle them all.
132     RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
133     if (oldLocations != updatedLocations) {
134       boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
135       if (replaced && LOG.isTraceEnabled()) {
136         LOG.trace("Changed cached location to: " + location);
137       }
138       addToCachedServers(updatedLocations);
139     }
140   }
141 
142   /**
143    * Put a newly discovered HRegionLocation into the cache.
144    * @param tableName The table name.
145    * @param locations the new locations
146    */
147   public void cacheLocation(final TableName tableName, final RegionLocations locations) {
148     byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
149     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
150     RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
151     boolean isNewCacheEntry = (oldLocation == null);
152     if (isNewCacheEntry) {
153       if (LOG.isTraceEnabled()) {
154         LOG.trace("Cached location: " + locations);
155       }
156       addToCachedServers(locations);
157       return;
158     }
159 
160     // merge old and new locations and add it to the cache
161     // Meta record might be stale - some (probably the same) server has closed the region
162     // with later seqNum and told us about the new location.
163     RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
164     boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
165     if (replaced && LOG.isTraceEnabled()) {
166       LOG.trace("Merged cached locations: " + mergedLocation);
167     }
168     addToCachedServers(locations);
169   }
170 
171   private void addToCachedServers(RegionLocations locations) {
172     for (HRegionLocation loc : locations.getRegionLocations()) {
173       if (loc != null) {
174         cachedServers.add(loc.getServerName());
175       }
176     }
177   }
178 
179   /**
180    * @param tableName
181    * @return Map of cached locations for passed <code>tableName</code>
182    */
183   private ConcurrentNavigableMap<byte[], RegionLocations>
184     getTableLocations(final TableName tableName) {
185     // find the map of cached locations for this table
186     ConcurrentNavigableMap<byte[], RegionLocations> result;
187     result = this.cachedRegionLocations.get(tableName);
188     // if tableLocations for this table isn't built yet, make one
189     if (result == null) {
190       result = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR);
191       ConcurrentNavigableMap<byte[], RegionLocations> old =
192           this.cachedRegionLocations.putIfAbsent(tableName, result);
193       if (old != null) {
194         return old;
195       }
196     }
197     return result;
198   }
199 
200   /**
201    * Check the region cache to see whether a region is cached yet or not.
202    * @param tableName tableName
203    * @param row row
204    * @return Region cached or not.
205    */
206   public boolean isRegionCached(TableName tableName, final byte[] row) {
207     RegionLocations location = getCachedLocation(tableName, row);
208     return location != null;
209   }
210 
211   /**
212    * Return the number of cached region for a table. It will only be called
213    * from a unit test.
214    */
215   public int getNumberOfCachedRegionLocations(final TableName tableName) {
216     Map<byte[], RegionLocations> tableLocs = this.cachedRegionLocations.get(tableName);
217     if (tableLocs == null) {
218       return 0;
219     }
220     int numRegions = 0;
221     for (RegionLocations tableLoc : tableLocs.values()) {
222       numRegions += tableLoc.numNonNullElements();
223     }
224     return numRegions;
225   }
226 
227   /**
228    * Delete all cached entries.
229    */
230   public void clearCache() {
231     this.cachedRegionLocations.clear();
232     this.cachedServers.clear();
233   }
234 
235   /**
236    * Delete all cached entries of a server.
237    */
238   public void clearCache(final ServerName serverName) {
239     if (!this.cachedServers.contains(serverName)) {
240       return;
241     }
242 
243     boolean deletedSomething = false;
244     synchronized (this.cachedServers) {
245       // We block here, because if there is an error on a server, it's likely that multiple
246       //  threads will get the error  simultaneously. If there are hundreds of thousand of
247       //  region location to check, it's better to do this only once. A better pattern would
248       //  be to check if the server is dead when we get the region location.
249       if (!this.cachedServers.contains(serverName)) {
250         return;
251       }
252       for (ConcurrentMap<byte[], RegionLocations> tableLocations : cachedRegionLocations.values()){
253         for (Entry<byte[], RegionLocations> e : tableLocations.entrySet()) {
254           RegionLocations regionLocations = e.getValue();
255           if (regionLocations != null) {
256             RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
257             if (updatedLocations != regionLocations) {
258               if (updatedLocations.isEmpty()) {
259                 deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
260               } else {
261                 deletedSomething |= tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
262               }
263             }
264           }
265         }
266       }
267       this.cachedServers.remove(serverName);
268     }
269     if (deletedSomething && LOG.isTraceEnabled()) {
270       LOG.trace("Removed all cached region locations that map to " + serverName);
271     }
272   }
273 
274   /**
275    * Delete all cached entries of a table.
276    */
277   public void clearCache(final TableName tableName) {
278     if (LOG.isTraceEnabled()) {
279       LOG.trace("Removed all cached region locations for table " + tableName);
280     }
281     this.cachedRegionLocations.remove(tableName);
282   }
283 
284   /**
285    * Delete a cached location, no matter what it is. Called when we were told to not use cache.
286    * @param tableName tableName
287    * @param row
288    */
289   public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
290     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
291 
292     boolean removed = false;
293     RegionLocations regionLocations = getCachedLocation(tableName, row);
294     if (regionLocations != null) {
295       HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
296       RegionLocations updatedLocations = regionLocations.remove(replicaId);
297       if (updatedLocations != regionLocations) {
298         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
299         if (updatedLocations.isEmpty()) {
300           removed = tableLocations.remove(startKey, regionLocations);
301         } else {
302           removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
303         }
304       }
305 
306       if (removed && LOG.isTraceEnabled() && toBeRemoved != null) {
307         LOG.trace("Removed " + toBeRemoved + " from cache");
308       }
309     }
310   }
311 
312   /**
313    * Delete a cached location, no matter what it is. Called when we were told to not use cache.
314    * @param tableName tableName
315    * @param row
316    */
317   public void clearCache(final TableName tableName, final byte [] row) {
318     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
319 
320     RegionLocations regionLocations = getCachedLocation(tableName, row);
321     if (regionLocations != null) {
322       byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
323       boolean removed = tableLocations.remove(startKey, regionLocations);
324       if (removed && LOG.isTraceEnabled()) {
325         LOG.trace("Removed " + regionLocations + " from cache");
326       }
327     }
328   }
329 
330   /**
331    * Delete a cached location for a table, row and server
332    */
333   public void clearCache(final TableName tableName, final byte [] row, ServerName serverName) {
334     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
335 
336     RegionLocations regionLocations = getCachedLocation(tableName, row);
337     if (regionLocations != null) {
338       RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
339       if (updatedLocations != regionLocations) {
340         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
341         boolean removed = false;
342         if (updatedLocations.isEmpty()) {
343           removed = tableLocations.remove(startKey, regionLocations);
344         } else {
345           removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
346         }
347         if (removed && LOG.isTraceEnabled()) {
348           LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row)
349             + " mapping to server: " + serverName + " from cache");
350         }
351       }
352     }
353   }
354 
355   /**
356    * Deletes the cached location of the region if necessary, based on some error from source.
357    * @param hri The region in question.
358    */
359   public void clearCache(HRegionInfo hri) {
360     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable());
361     RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
362     if (regionLocations != null) {
363       HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
364       if (oldLocation == null) return;
365       RegionLocations updatedLocations = regionLocations.remove(oldLocation);
366       boolean removed = false;
367       if (updatedLocations != regionLocations) {
368         if (updatedLocations.isEmpty()) {
369           removed = tableLocations.remove(hri.getStartKey(), regionLocations);
370         } else {
371           removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
372         }
373         if (removed && LOG.isTraceEnabled()) {
374           LOG.trace("Removed " + oldLocation + " from cache");
375         }
376       }
377     }
378   }
379 
380   public void clearCache(final HRegionLocation location) {
381     if (location == null) {
382       return;
383     }
384     TableName tableName = location.getRegionInfo().getTable();
385     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
386     RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey());
387     if (regionLocations != null) {
388       RegionLocations updatedLocations = regionLocations.remove(location);
389       boolean removed = false;
390       if (updatedLocations != regionLocations) {
391         if (updatedLocations.isEmpty()) {
392           removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations);
393         } else {
394           removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations);
395         }
396         if (removed && LOG.isTraceEnabled()) {
397           LOG.trace("Removed " + location + " from cache");
398         }
399       }
400     }
401   }
402 }