View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io;
20  
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.Collection;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.FileNotFoundException;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.fs.PositionedReadable;
37  import org.apache.hadoop.fs.Seekable;
38  import org.apache.hadoop.hbase.util.FSUtils;
39  import org.apache.hadoop.ipc.RemoteException;
40  
41  /**
42   * The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
43   *
44   * <p><b>The Problem:</b>
45   * <ul>
46   *  <li>
47   *    HDFS doesn't have support for hardlinks, and this make impossible to referencing
48   *    the same data blocks using different names.
49   *  </li>
50   *  <li>
51   *    HBase store files in one location (e.g. table/region/family/) and when the file is not
52   *    needed anymore (e.g. compaction, region deletion, ...) moves it to an archive directory.
53   *  </li>
54   * </ul>
55   * If we want to create a reference to a file, we need to remember that it can be in its
56   * original location or in the archive folder.
57   * The FileLink class tries to abstract this concept and given a set of locations
58   * it is able to switch between them making this operation transparent for the user.
59   * {@link HFileLink} is a more concrete implementation of the {@code FileLink}.
60   *
61   * <p><b>Back-references:</b>
62   * To help the {@link org.apache.hadoop.hbase.master.cleaner.CleanerChore} to keep track of
63   * the links to a particular file, during the {@code FileLink} creation, a new file is placed
64   * inside a back-reference directory. There's one back-reference directory for each file that
65   * has links, and in the directory there's one file per link.
66   *
67   * <p>HFileLink Example
68   * <ul>
69   *  <li>
70   *      /hbase/table/region-x/cf/file-k
71   *      (Original File)
72   *  </li>
73   *  <li>
74   *      /hbase/table-cloned/region-y/cf/file-k.region-x.table
75   *     (HFileLink to the original file)
76   *  </li>
77   *  <li>
78   *      /hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table
79   *      (HFileLink to the original file)
80   *  </li>
81   *  <li>
82   *      /hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned
83   *      (Back-reference to the link in table-cloned)
84   *  </li>
85   *  <li>
86   *      /hbase/.archive/table/region-x/.links-file-k/region-z.table-2nd-cloned
87   *      (Back-reference to the link in table-2nd-cloned)
88   *  </li>
89   * </ul>
90   */
91  @InterfaceAudience.Private
92  public class FileLink {
93    private static final Log LOG = LogFactory.getLog(FileLink.class);
94  
95    /** Define the Back-reference directory name prefix: .links-<hfile>/ */
96    public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
97  
98    /**
99     * FileLink InputStream that handles the switch between the original path
100    * and the alternative locations, when the file is moved.
101    */
102   private static class FileLinkInputStream extends InputStream
103       implements Seekable, PositionedReadable {
104     private FSDataInputStream in = null;
105     private Path currentPath = null;
106     private long pos = 0;
107 
108     private final FileLink fileLink;
109     private final int bufferSize;
110     private final FileSystem fs;
111 
112     public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
113         throws IOException {
114       this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
115     }
116 
117     public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
118         throws IOException {
119       this.bufferSize = bufferSize;
120       this.fileLink = fileLink;
121       this.fs = fs;
122 
123       this.in = tryOpen();
124     }
125 
126     @Override
127     public int read() throws IOException {
128       int res;
129       try {
130         res = in.read();
131       } catch (FileNotFoundException e) {
132         res = tryOpen().read();
133       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
134         res = tryOpen().read();
135       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
136         res = tryOpen().read();
137       }
138       if (res > 0) pos += 1;
139       return res;
140     }
141 
142     @Override
143     public int read(byte[] b) throws IOException {
144        return read(b, 0, b.length);
145     }
146 
147     @Override
148     public int read(byte[] b, int off, int len) throws IOException {
149       int n;
150       try {
151         n = in.read(b, off, len);
152       } catch (FileNotFoundException e) {
153         n = tryOpen().read(b, off, len);
154       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
155         n = tryOpen().read(b, off, len);
156       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
157         n = tryOpen().read(b, off, len);
158       }
159       if (n > 0) pos += n;
160       assert(in.getPos() == pos);
161       return n;
162     }
163 
164     @Override
165     public int read(long position, byte[] buffer, int offset, int length) throws IOException {
166       int n;
167       try {
168         n = in.read(position, buffer, offset, length);
169       } catch (FileNotFoundException e) {
170         n = tryOpen().read(position, buffer, offset, length);
171       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
172         n = tryOpen().read(position, buffer, offset, length);
173       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
174         n = tryOpen().read(position, buffer, offset, length);
175       }
176       return n;
177     }
178 
179     @Override
180     public void readFully(long position, byte[] buffer) throws IOException {
181       readFully(position, buffer, 0, buffer.length);
182     }
183 
184     @Override
185     public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
186       try {
187         in.readFully(position, buffer, offset, length);
188       } catch (FileNotFoundException e) {
189         tryOpen().readFully(position, buffer, offset, length);
190       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
191         tryOpen().readFully(position, buffer, offset, length);
192       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
193         tryOpen().readFully(position, buffer, offset, length);
194       }
195     }
196 
197     @Override
198     public long skip(long n) throws IOException {
199       long skipped;
200 
201       try {
202         skipped = in.skip(n);
203       } catch (FileNotFoundException e) {
204         skipped = tryOpen().skip(n);
205       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
206         skipped = tryOpen().skip(n);
207       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
208         skipped = tryOpen().skip(n);
209       }
210 
211       if (skipped > 0) pos += skipped;
212       return skipped;
213     }
214 
215     @Override
216     public int available() throws IOException {
217       try {
218         return in.available();
219       } catch (FileNotFoundException e) {
220         return tryOpen().available();
221       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
222         return tryOpen().available();
223       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
224         return tryOpen().available();
225       }
226     }
227 
228     @Override
229     public void seek(long pos) throws IOException {
230       try {
231         in.seek(pos);
232       } catch (FileNotFoundException e) {
233         tryOpen().seek(pos);
234       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
235         tryOpen().seek(pos);
236       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
237         tryOpen().seek(pos);
238       }
239       this.pos = pos;
240     }
241 
242     @Override
243     public long getPos() throws IOException {
244       return pos;
245     }
246 
247     @Override
248     public boolean seekToNewSource(long targetPos) throws IOException {
249       boolean res;
250       try {
251         res = in.seekToNewSource(targetPos);
252       } catch (FileNotFoundException e) {
253         res = tryOpen().seekToNewSource(targetPos);
254       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
255         res = tryOpen().seekToNewSource(targetPos);
256       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
257         res = tryOpen().seekToNewSource(targetPos);
258       }
259       if (res) pos = targetPos;
260       return res;
261     }
262 
263     @Override
264     public void close() throws IOException {
265       in.close();
266     }
267 
268     @Override
269     public synchronized void mark(int readlimit) {
270     }
271 
272     @Override
273     public synchronized void reset() throws IOException {
274       throw new IOException("mark/reset not supported");
275     }
276 
277     @Override
278     public boolean markSupported() {
279       return false;
280     }
281 
282     /**
283      * Try to open the file from one of the available locations.
284      *
285      * @return FSDataInputStream stream of the opened file link
286      * @throws IOException on unexpected error, or file not found.
287      */
288     private FSDataInputStream tryOpen() throws IOException {
289       for (Path path: fileLink.getLocations()) {
290         if (path.equals(currentPath)) continue;
291         try {
292           in = fs.open(path, bufferSize);
293           if (pos != 0) in.seek(pos);
294           assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
295           if (LOG.isTraceEnabled()) {
296             if (currentPath == null) {
297               LOG.debug("link open path=" + path);
298             } else {
299               LOG.trace("link switch from path=" + currentPath + " to path=" + path);
300             }
301           }
302           currentPath = path;
303           return(in);
304         } catch (FileNotFoundException e) {
305           // Try another file location
306         } catch (RemoteException re) {
307           IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
308           if (!(ioe instanceof FileNotFoundException)) throw re;
309         }
310       }
311       throw new FileNotFoundException("Unable to open link: " + fileLink);
312     }
313   }
314 
315   private Path[] locations = null;
316 
317   protected FileLink() {
318     this.locations = null;
319   }
320 
321   /**
322    * @param originPath Original location of the file to link
323    * @param alternativePaths Alternative locations to look for the linked file
324    */
325   public FileLink(Path originPath, Path... alternativePaths) {
326     setLocations(originPath, alternativePaths);
327   }
328 
329   /**
330    * @param locations locations to look for the linked file
331    */
332   public FileLink(final Collection<Path> locations) {
333     this.locations = locations.toArray(new Path[locations.size()]);
334   }
335 
336   /**
337    * @return the locations to look for the linked file.
338    */
339   public Path[] getLocations() {
340     return locations;
341   }
342 
343   @Override
344   public String toString() {
345     StringBuilder str = new StringBuilder(getClass().getName());
346     str.append(" locations=[");
347     for (int i = 0; i < locations.length; ++i) {
348       if (i > 0) str.append(", ");
349       str.append(locations[i].toString());
350     }
351     str.append("]");
352     return str.toString();
353   }
354 
355   /**
356    * @return true if the file pointed by the link exists
357    */
358   public boolean exists(final FileSystem fs) throws IOException {
359     for (int i = 0; i < locations.length; ++i) {
360       if (fs.exists(locations[i])) {
361         return true;
362       }
363     }
364     return false;
365   }
366 
367   /**
368    * @return the path of the first available link.
369    */
370   public Path getAvailablePath(FileSystem fs) throws IOException {
371     for (int i = 0; i < locations.length; ++i) {
372       if (fs.exists(locations[i])) {
373         return locations[i];
374       }
375     }
376     throw new FileNotFoundException("Unable to open link: " + this);
377   }
378 
379   /**
380    * Get the FileStatus of the referenced file.
381    *
382    * @param fs {@link FileSystem} on which to get the file status
383    * @return InputStream for the hfile link.
384    * @throws IOException on unexpected error.
385    */
386   public FileStatus getFileStatus(FileSystem fs) throws IOException {
387     for (int i = 0; i < locations.length; ++i) {
388       try {
389         return fs.getFileStatus(locations[i]);
390       } catch (FileNotFoundException e) {
391         // Try another file location
392       }
393     }
394     throw new FileNotFoundException("Unable to open link: " + this);
395   }
396 
397   /**
398    * Open the FileLink for read.
399    * <p>
400    * It uses a wrapper of FSDataInputStream that is agnostic to the location
401    * of the file, even if the file switches between locations.
402    *
403    * @param fs {@link FileSystem} on which to open the FileLink
404    * @return InputStream for reading the file link.
405    * @throws IOException on unexpected error.
406    */
407   public FSDataInputStream open(final FileSystem fs) throws IOException {
408     return new FSDataInputStream(new FileLinkInputStream(fs, this));
409   }
410 
411   /**
412    * Open the FileLink for read.
413    * <p>
414    * It uses a wrapper of FSDataInputStream that is agnostic to the location
415    * of the file, even if the file switches between locations.
416    *
417    * @param fs {@link FileSystem} on which to open the FileLink
418    * @param bufferSize the size of the buffer to be used.
419    * @return InputStream for reading the file link.
420    * @throws IOException on unexpected error.
421    */
422   public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
423     return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
424   }
425 
426   /**
427    * NOTE: This method must be used only in the constructor!
428    * It creates a List with the specified locations for the link.
429    */
430   protected void setLocations(Path originPath, Path... alternativePaths) {
431     assert this.locations == null : "Link locations already set";
432 
433     List<Path> paths = new ArrayList<Path>(alternativePaths.length +1);
434     if (originPath != null) {
435       paths.add(originPath);
436     }
437 
438     for (int i = 0; i < alternativePaths.length; i++) {
439       if (alternativePaths[i] != null) {
440         paths.add(alternativePaths[i]);
441       }
442     }
443     this.locations = paths.toArray(new Path[0]);
444   }
445 
446   /**
447    * Get the directory to store the link back references
448    *
449    * <p>To simplify the reference count process, during the FileLink creation
450    * a back-reference is added to the back-reference directory of the specified file.
451    *
452    * @param storeDir Root directory for the link reference folder
453    * @param fileName File Name with links
454    * @return Path for the link back references.
455    */
456   public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
457     return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
458   }
459 
460   /**
461    * Get the referenced file name from the reference link directory path.
462    *
463    * @param dirPath Link references directory path
464    * @return Name of the file referenced
465    */
466   public static String getBackReferenceFileName(final Path dirPath) {
467     return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
468   }
469 
470   /**
471    * Checks if the specified directory path is a back reference links folder.
472    *
473    * @param dirPath Directory path to verify
474    * @return True if the specified directory is a link references folder
475    */
476   public static boolean isBackReferencesDir(final Path dirPath) {
477     if (dirPath == null) return false;
478     return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
479   }
480 
481   @Override
482   public boolean equals(Object obj) {
483     if (obj == null) {
484       return false;
485     }
486     // Assumes that the ordering of locations between objects are the same. This is true for the
487     // current subclasses already (HFileLink, WALLink). Otherwise, we may have to sort the locations
488     // or keep them presorted
489     if (this.getClass().equals(obj.getClass())) {
490       return Arrays.equals(this.locations, ((FileLink) obj).locations);
491     }
492 
493     return false;
494   }
495 
496   @Override
497   public int hashCode() {
498     return Arrays.hashCode(locations);
499   }
500 }
501