View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotEquals;
24  import static org.junit.Assert.assertTrue;
25  
26  import java.io.FileNotFoundException;
27  import java.io.IOException;
28  import java.util.ArrayList;
29  import java.util.List;
30  
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FSDataOutputStream;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.testclassification.MediumTests;
38  import org.apache.hadoop.hbase.util.FSUtils;
39  import org.apache.hadoop.hdfs.DistributedFileSystem;
40  import org.apache.hadoop.hdfs.MiniDFSCluster;
41  import org.apache.hadoop.ipc.RemoteException;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  
45  /**
46   * Test that FileLink switches between alternate locations
47   * when the current location moves or gets deleted.
48   */
49  @Category(MediumTests.class)
50  public class TestFileLink {
51  
52    @Test
53    public void testEquals() {
54      Path p1 = new Path("/p1");
55      Path p2 = new Path("/p2");
56      Path p3 = new Path("/p3");
57  
58      assertEquals(new FileLink(), new FileLink());
59      assertEquals(new FileLink(p1), new FileLink(p1));
60      assertEquals(new FileLink(p1, p2), new FileLink(p1, p2));
61      assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3));
62  
63      assertNotEquals(new FileLink(p1), new FileLink(p3));
64      assertNotEquals(new FileLink(p1, p2), new FileLink(p1));
65      assertNotEquals(new FileLink(p1, p2), new FileLink(p2));
66      assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1)); // ordering important!
67    }
68  
69    @Test
70    public void testHashCode() {
71      Path p1 = new Path("/p1");
72      Path p2 = new Path("/p2");
73      Path p3 = new Path("/p3");
74  
75      assertEquals(new FileLink().hashCode(), new FileLink().hashCode());
76      assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode());
77      assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode());
78      assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode());
79  
80      assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode());
81      assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode());
82      assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode());
83      assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode()); // ordering
84    }
85  
86    /**
87     * Test, on HDFS, that the FileLink is still readable
88     * even when the current file gets renamed.
89     */
90    @Test
91    public void testHDFSLinkReadDuringRename() throws Exception {
92      HBaseTestingUtility testUtil = new HBaseTestingUtility();
93      Configuration conf = testUtil.getConfiguration();
94      conf.setInt("dfs.blocksize", 1024 * 1024);
95      conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
96  
97      testUtil.startMiniDFSCluster(1);
98      MiniDFSCluster cluster = testUtil.getDFSCluster();
99      FileSystem fs = cluster.getFileSystem();
100     assertEquals("hdfs", fs.getUri().getScheme());
101 
102     try {
103       testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
104     } finally {
105       testUtil.shutdownMiniCluster();
106     }
107   }
108 
109   private static class MyDistributedFileSystem extends DistributedFileSystem {
110     MyDistributedFileSystem() {
111     }
112     @Override
113     public FSDataInputStream open(Path f, final int bufferSize)
114         throws IOException {
115       throw new RemoteException(FileNotFoundException.class.getName(), "");
116     }
117     @Override
118     public Configuration getConf() {
119       return new Configuration();
120     }
121   }
122   @Test(expected = FileNotFoundException.class)
123   public void testLinkReadWithMissingFile() throws Exception {
124     HBaseTestingUtility testUtil = new HBaseTestingUtility();
125     FileSystem fs = new MyDistributedFileSystem();
126 
127     Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
128     Path archivedPath = new Path(testUtil.getDefaultRootDirPath(), "archived.file");
129 
130     List<Path> files = new ArrayList<Path>();
131     files.add(originalPath);
132     files.add(archivedPath);
133 
134     FileLink link = new FileLink(files);
135     link.open(fs);
136   }
137 
138   /**
139    * Test, on a local filesystem, that the FileLink is still readable
140    * even when the current file gets renamed.
141    */
142   @Test
143   public void testLocalLinkReadDuringRename() throws IOException {
144     HBaseTestingUtility testUtil = new HBaseTestingUtility();
145     FileSystem fs = testUtil.getTestFileSystem();
146     assertEquals("file", fs.getUri().getScheme());
147     testLinkReadDuringRename(fs, testUtil.getDataTestDir());
148   }
149 
150   /**
151    * Test that link is still readable even when the current file gets renamed.
152    */
153   private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
154     Path originalPath = new Path(rootDir, "test.file");
155     Path archivedPath = new Path(rootDir, "archived.file");
156 
157     writeSomeData(fs, originalPath, 256 << 20, (byte)2);
158 
159     List<Path> files = new ArrayList<Path>();
160     files.add(originalPath);
161     files.add(archivedPath);
162 
163     FileLink link = new FileLink(files);
164     FSDataInputStream in = link.open(fs);
165     try {
166       byte[] data = new byte[8192];
167       long size = 0;
168 
169       // Read from origin
170       int n = in.read(data);
171       dataVerify(data, n, (byte)2);
172       size += n;
173 
174       if (FSUtils.WINDOWS) {
175         in.close();
176       }
177 
178       // Move origin to archive
179       assertFalse(fs.exists(archivedPath));
180       fs.rename(originalPath, archivedPath);
181       assertFalse(fs.exists(originalPath));
182       assertTrue(fs.exists(archivedPath));
183 
184       if (FSUtils.WINDOWS) {
185         in = link.open(fs); // re-read from beginning
186         in.read(data);
187       }
188 
189       // Try to read to the end
190       while ((n = in.read(data)) > 0) {
191         dataVerify(data, n, (byte)2);
192         size += n;
193       }
194 
195       assertEquals(256 << 20, size);
196     } finally {
197       in.close();
198       if (fs.exists(originalPath)) fs.delete(originalPath, true);
199       if (fs.exists(archivedPath)) fs.delete(archivedPath, true);
200     }
201   }
202 
203   /**
204    * Test that link is still readable even when the current file gets deleted.
205    *
206    * NOTE: This test is valid only on HDFS.
207    * When a file is deleted from a local file-system, it is simply 'unlinked'.
208    * The inode, which contains the file's data, is not deleted until all
209    * processes have finished with it.
210    * In HDFS when the request exceed the cached block locations,
211    * a query to the namenode is performed, using the filename,
212    * and the deleted file doesn't exists anymore (FileNotFoundException).
213    */
214   @Test
215   public void testHDFSLinkReadDuringDelete() throws Exception {
216     HBaseTestingUtility testUtil = new HBaseTestingUtility();
217     Configuration conf = testUtil.getConfiguration();
218     conf.setInt("dfs.blocksize", 1024 * 1024);
219     conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
220 
221     testUtil.startMiniDFSCluster(1);
222     MiniDFSCluster cluster = testUtil.getDFSCluster();
223     FileSystem fs = cluster.getFileSystem();
224     assertEquals("hdfs", fs.getUri().getScheme());
225 
226     try {
227       List<Path> files = new ArrayList<Path>();
228       for (int i = 0; i < 3; i++) {
229         Path path = new Path(String.format("test-data-%d", i));
230         writeSomeData(fs, path, 1 << 20, (byte)i);
231         files.add(path);
232       }
233 
234       FileLink link = new FileLink(files);
235       FSDataInputStream in = link.open(fs);
236       try {
237         byte[] data = new byte[8192];
238         int n;
239 
240         // Switch to file 1
241         n = in.read(data);
242         dataVerify(data, n, (byte)0);
243         fs.delete(files.get(0), true);
244         skipBuffer(in, (byte)0);
245 
246         // Switch to file 2
247         n = in.read(data);
248         dataVerify(data, n, (byte)1);
249         fs.delete(files.get(1), true);
250         skipBuffer(in, (byte)1);
251 
252         // Switch to file 3
253         n = in.read(data);
254         dataVerify(data, n, (byte)2);
255         fs.delete(files.get(2), true);
256         skipBuffer(in, (byte)2);
257 
258         // No more files available
259         try {
260           n = in.read(data);
261           assert(n <= 0);
262         } catch (FileNotFoundException e) {
263           assertTrue(true);
264         }
265       } finally {
266         in.close();
267       }
268     } finally {
269       testUtil.shutdownMiniCluster();
270     }
271   }
272 
273   /**
274    * Write up to 'size' bytes with value 'v' into a new file called 'path'.
275    */
276   private void writeSomeData (FileSystem fs, Path path, long size, byte v) throws IOException {
277     byte[] data = new byte[4096];
278     for (int i = 0; i < data.length; i++) {
279       data[i] = v;
280     }
281 
282     FSDataOutputStream stream = fs.create(path);
283     try {
284       long written = 0;
285       while (written < size) {
286         stream.write(data, 0, data.length);
287         written += data.length;
288       }
289     } finally {
290       stream.close();
291     }
292   }
293 
294   /**
295    * Verify that all bytes in 'data' have 'v' as value.
296    */
297   private static void dataVerify(byte[] data, int n, byte v) {
298     for (int i = 0; i < n; ++i) {
299       assertEquals(v, data[i]);
300     }
301   }
302 
303   private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
304     byte[] data = new byte[8192];
305     try {
306       int n;
307       while ((n = in.read(data)) == data.length) {
308         for (int i = 0; i < data.length; ++i) {
309           if (data[i] != v)
310             throw new Exception("File changed");
311         }
312       }
313     } catch (Exception e) {
314     }
315   }
316 }