/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.fs;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * An encapsulation for the FileSystem object that HBase uses to access
 * data. This class allows the flexibility of using separate filesystem
 * objects for reading and writing hfiles and wals.
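 * <p>
 * A minimal usage sketch (illustrative only; error handling omitted):
 * <pre>
 *   Configuration conf = HBaseConfiguration.create();
 *   // hfile-block checksums are verified by HBase itself
 *   HFileSystem hfs = (HFileSystem) HFileSystem.get(conf);
 *   FileSystem readFs = hfs.getNoChecksumFs(); // reads skip fs-level checksum checks
 *   FileSystem writeFs = hfs.getBackingFs();   // writes use the backing filesystem
 * </pre>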
 */
public class HFileSystem extends FilterFileSystem {
  public static final Log LOG = LogFactory.getLog(HFileSystem.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  private final FileSystem noChecksumFs; // read hfile data from storage
  private final boolean useHBaseChecksum;

  /**
   * Create a FileSystem object for HBase regionservers.
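   * <p>
   * A sketch (illustrative, not from the original source): a caller that wants
   * the filesystem itself to keep verifying checksums would pass {@code false}:
   * <pre>
   *   FileSystem fs = new HFileSystem(conf, false); // fs-level checksums stay on
   * </pre>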
   * @param conf The configuration to be used for the filesystem
   * @param useHBaseChecksum if true, use checksum verification in HBase;
   *        otherwise delegate checksum verification to the FileSystem.
   */
  public HFileSystem(Configuration conf, boolean useHBaseChecksum)
    throws IOException {

    // Create the default filesystem with checksum verification switched on.
    // By default, any operation to this FilterFileSystem occurs on
    // the underlying filesystem that has checksums switched on.
    this.fs = FileSystem.get(conf);
    this.useHBaseChecksum = useHBaseChecksum;

    fs.initialize(getDefaultUri(conf), conf);

    // Disable checksum verification for the local filesystem, see HBASE-11218.
    if (fs instanceof LocalFileSystem) {
      fs.setWriteChecksum(false);
      fs.setVerifyChecksum(false);
    }

    addLocationsOrderInterceptor(conf);

    // If HBase checksum verification is switched on, then create a new
    // filesystem object that has checksum verification turned off.
    // We will avoid verifying checksums in the fs client, and instead do it
    // inside of HBase.
    // If this is the local filesystem, hadoop has a bug where seeks
    // do not go to the correct location if setVerifyChecksum(false) is called.
    // This manifests itself in that incorrect data is read and HFileBlocks
    // won't be able to read their header magic numbers. See HBASE-5885.
    if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
      conf = new Configuration(conf);
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
      this.noChecksumFs = newInstanceFileSystem(conf);
      this.noChecksumFs.setVerifyChecksum(false);
    } else {
      this.noChecksumFs = fs;
    }
  }

  /**
   * Wrap a FileSystem object within an HFileSystem. The noChecksumFs and
   * the write fs are both set to the same specified fs.
   * Do not verify hbase-checksums while reading data from the filesystem.
   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
   */
  public HFileSystem(FileSystem fs) {
    this.fs = fs;
    this.noChecksumFs = fs;
    this.useHBaseChecksum = false;
  }

  /**
   * Returns the filesystem that is specially set up for
   * doing reads from storage. This object avoids doing
   * checksum verifications for reads.
   * @return The FileSystem object that can be used to read data
   *         from files.
   */
  public FileSystem getNoChecksumFs() {
    return noChecksumFs;
  }

  /**
   * Returns the underlying filesystem.
   * @return The underlying FileSystem for this FilterFileSystem object.
   */
  public FileSystem getBackingFs() throws IOException {
    return fs;
  }

  /**
   * Are we verifying checksums in HBase?
   * @return True if HBase is configured to verify checksums,
   *         otherwise false.
   */
  public boolean useHBaseChecksum() {
    return useHBaseChecksum;
  }

  /**
   * Close this filesystem object.
   */
  @Override
  public void close() throws IOException {
    super.close();
    if (this.noChecksumFs != fs) {
      this.noChecksumFs.close();
    }
  }

  /**
   * Returns a brand new instance of the FileSystem. It does not use
   * the FileSystem.Cache. In newer versions of HDFS, we can directly
   * invoke FileSystem.newInstance(Configuration).
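   * <p>
   * On Hadoop 2 the equivalent call (a sketch, not used by this method) is:
   * <pre>
   *   FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf);
   * </pre>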
   *
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf)
    throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz != null) {
      // This will be true for Hadoop 1.0, or 0.20.
      fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
    } else {
      // For Hadoop 2.0, we have to go through FileSystem for the filesystem
      // implementation to be loaded by the service loader in case it has not
      // been loaded yet.
      Configuration clone = new Configuration(conf);
      clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
      fs = FileSystem.get(uri, clone);
    }
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }

    return fs;
  }

  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient
   * linked to this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
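   * <p>
   * The interceptor is installed by default; a sketch of opting out via the
   * configuration flag this method reads:
   * <pre>
   *   conf.setBoolean("hbase.filesystem.reorder.blocks", false);
   * </pre>
   *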
   * @return true if the interceptor was added, false otherwise.
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) { // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
      LOG.debug("The file system is not a DistributedFileSystem. " +
          "Skipping block location reordering");
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
          "block reordering interceptor. Continuing, but this is unexpected."
      );
      return false;
    }

    try {
      // The DFSClient#namenode field is final, so clear the FINAL modifier
      // before swapping in the reordering proxy.
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
      Field modifiersField = Field.class.getDeclaredField("modifiers");
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
            " reordering interceptor. Continuing, but this is unexpected."
        );
        return false;
      }

      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations so we can do block" +
          " reordering using class " + lrb.getClass());
    } catch (NoSuchFieldException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    } catch (IllegalAccessException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

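  /**
   * Wraps the given namenode protocol in a dynamic proxy that runs the result
   * of getBlockLocations through the supplied ReorderBlocks before returning
   * it, and that stops the underlying RPC proxy when close() is invoked.
   */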
  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
      final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance(
        cp.getClass().getClassLoader(),
        new Class[]{ClientProtocol.class, Closeable.class},
        new InvocationHandler() {
          @Override
          public Object invoke(Object proxy, Method method,
                               Object[] args) throws Throwable {
            try {
              if ((args == null || args.length == 0)
                  && "close".equals(method.getName())) {
                RPC.stopProxy(cp);
                return null;
              } else {
                Object res = method.invoke(cp, args);
                if (res != null && args != null && args.length == 3
                    && "getBlockLocations".equals(method.getName())
                    && res instanceof LocatedBlocks
                    && args[0] instanceof String) {
                  // Reorder the block locations before handing them back to the DFSClient.
                  lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
                }
                return res;
              }
            } catch (InvocationTargetException ite) {
              // We will get here for all exceptions, checked or not, thrown
              // by any layer, including functional exceptions.
              Throwable cause = ite.getCause();
              if (cause == null) {
                throw new RuntimeException(
                    "Proxy invocation failed and getCause is null", ite);
              }
              if (cause instanceof UndeclaredThrowableException) {
                Throwable causeCause = cause.getCause();
                if (causeCause == null) {
                  throw new RuntimeException("UndeclaredThrowableException had null cause!");
                }
                cause = causeCause;
              }
              throw cause;
            }
          }
        });
  }

  /**
   * Interface to implement to add a specific reordering logic in hdfs.
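   * <p>
   * A hypothetical implementation (a sketch only; {@code PreferredHostReorder} is
   * not part of HBase) that moves a given host's replica to the front:
   * <pre>
   *   class PreferredHostReorder implements ReorderBlocks {
   *     private final String preferred = "datanode-1.example.org"; // assumed host
   *     public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) {
   *       for (LocatedBlock lb : lbs.getLocatedBlocks()) {
   *         DatanodeInfo[] dnis = lb.getLocations();
   *         for (int i = 1; i < dnis.length; i++) {
   *           if (preferred.equals(dnis[i].getHostName())) {
   *             DatanodeInfo first = dnis[0];
   *             dnis[0] = dnis[i];   // swap the preferred replica to the front
   *             dnis[i] = first;
   *           }
   *         }
   *       }
   *     }
   *   }
   * </pre>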
   */
  interface ReorderBlocks {
    /**
     * @param conf - the conf to use
     * @param lbs - the LocatedBlocks to reorder
     * @param src - the file name currently read
     * @throws IOException - if something went wrong
     */
    void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }

  /**
   * We put at the lowest priority the WAL file blocks that are on the same datanode
   * as the original regionserver which created these files, because we fear that
   * the datanode is actually dead; if we used it, the read would time out.
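   * <p>
   * For example (hostnames illustrative): if the WAL was written by a
   * regionserver on {@code rs1} and a block's locations are
   * {@code [rs1, dn2, dn3]}, they are reordered to {@code [dn2, dn3, rs1]},
   * so the possibly-dead datanode is tried last.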
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
        throws IOException {

      ServerName sn = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, src);
      if (sn == null) {
        // It's not a WAL.
        return;
      }

      // OK, so it's a WAL.
      String hostName = sn.getHostname();
      if (LOG.isTraceEnabled()) {
        LOG.trace(src +
            " is a WAL file, so reordering blocks, last hostname will be: " + hostName);
      }

      // Check all the blocks.
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = lb.getLocations();
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // Advance the other locations by one and put this one in last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get().
   * This returns a filesystem object that avoids checksum
   * verification in the filesystem for hfileblock-reads.
   * For these blocks, checksum verification is done by HBase.
   */
  public static FileSystem get(Configuration conf) throws IOException {
    return new HFileSystem(conf, true);
  }

  /**
   * Wrap a LocalFileSystem within an HFileSystem.
   */
  public static FileSystem getLocalFs(Configuration conf) throws IOException {
    return new HFileSystem(FileSystem.getLocal(conf));
  }

  /**
   * The org.apache.hadoop.fs.FilterFileSystem does not yet support
   * createNonRecursive. This is a Hadoop bug; when it is fixed in Hadoop,
   * this definition will go away.
   */
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return fs.createNonRecursive(f, overwrite, bufferSize, replication,
        blockSize, progress);
  }
}