/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
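
/**
 * Test that FileLink switches between its alternate locations when the
 * current location is moved or deleted.
 */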
@Category(MediumTests.class)
public class TestFileLink {

  @Test
  public void testEquals() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink(), new FileLink());
    assertEquals(new FileLink(p1), new FileLink(p1));
    assertEquals(new FileLink(p1, p2), new FileLink(p1, p2));
    assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3));

    assertNotEquals(new FileLink(p1), new FileLink(p3));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p1));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1));
  }

  @Test
  public void testHashCode() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink().hashCode(), new FileLink().hashCode());
    assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode());
    assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode());
    assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode());

    assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode());
  }
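
  /**
   * Test, on HDFS, that the FileLink stays readable across a rename of the
   * referenced file.
   */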
  @Test
  public void testHDFSLinkReadDuringRename() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * DistributedFileSystem stub whose open() always fails with a
   * FileNotFoundException wrapped in a RemoteException, the way the HDFS
   * client reports a missing file.
   */
  private static class MyDistributedFileSystem extends DistributedFileSystem {
    MyDistributedFileSystem() {
    }

    @Override
    public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
      throw new RemoteException(FileNotFoundException.class.getName(), "");
    }

    @Override
    public Configuration getConf() {
      // the stub is never initialized, so hand out a fresh Configuration
      return new Configuration();
    }
  }

  @Test(expected = FileNotFoundException.class)
  public void testLinkReadWithMissingFile() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    FileSystem fs = new MyDistributedFileSystem();

    Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
    Path archivedPath = new Path(testUtil.getDefaultRootDirPath(), "archived.file");

    List<Path> files = new ArrayList<Path>();
    files.add(originalPath);
    files.add(archivedPath);

    // every location fails to open, so open() must surface FileNotFoundException
    FileLink link = new FileLink(files);
    link.open(fs);
  }
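
  /**
   * Test, on a local file system, that the FileLink stays readable across a
   * rename of the referenced file.
   */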
  @Test
  public void testLocalLinkReadDuringRename() throws IOException {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    FileSystem fs = testUtil.getTestFileSystem();
    assertEquals("file", fs.getUri().getScheme());
    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
  }
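
  /**
   * Write a file, open it through a FileLink whose alternate location is
   * 'archived.file', rename the file while reading, and verify that the full
   * content can still be read through the link.
   */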
  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
    Path originalPath = new Path(rootDir, "test.file");
    Path archivedPath = new Path(rootDir, "archived.file");

    writeSomeData(fs, originalPath, 256 << 20, (byte)2);

    List<Path> files = new ArrayList<Path>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] data = new byte[8192];
      long size = 0;

      // Read the first chunk from the original location
      int n = in.read(data);
      dataVerify(data, n, (byte)2);
      size += n;

      if (FSUtils.WINDOWS) {
        // Windows does not allow renaming a file that is held open for reading
        in.close();
      }

      // Move the original file to the archived location
      assertFalse(fs.exists(archivedPath));
      fs.rename(originalPath, archivedPath);
      assertFalse(fs.exists(originalPath));
      assertTrue(fs.exists(archivedPath));

      if (FSUtils.WINDOWS) {
        // Reopen the link and re-read the first chunk without counting it again
        in = link.open(fs);
        in.read(data);
      }

      // Keep reading to the end: the link should now resolve to archivedPath
      while ((n = in.read(data)) > 0) {
        dataVerify(data, n, (byte)2);
        size += n;
      }

      assertEquals(256 << 20, size);
    } finally {
      in.close();
      if (fs.exists(originalPath)) fs.delete(originalPath, true);
      if (fs.exists(archivedPath)) fs.delete(archivedPath, true);
    }
  }
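
  /**
   * Test that the link is still readable while the currently referenced file
   * gets deleted.
   *
   * NOTE: this scenario is only meaningful on HDFS. On a local file system a
   * deleted file is merely unlinked and stays readable through the already
   * open stream, while on HDFS a read past the cached block locations needs a
   * namenode lookup by path, which fails with FileNotFoundException once the
   * file is deleted and forces the FileLink to switch to the next location.
   */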
  @Test
  public void testHDFSLinkReadDuringDelete() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      List<Path> files = new ArrayList<Path>();
      for (int i = 0; i < 3; i++) {
        Path path = new Path(String.format("test-data-%d", i));
        writeSomeData(fs, path, 1 << 20, (byte)i);
        files.add(path);
      }

      FileLink link = new FileLink(files);
      FSDataInputStream in = link.open(fs);
      try {
        byte[] data = new byte[8192];
        int n;

        // Read from the first location, then delete it and drain what is left
        n = in.read(data);
        dataVerify(data, n, (byte)0);
        fs.delete(files.get(0), true);
        skipBuffer(in, (byte)0);

        // The link should have switched to the second location
        n = in.read(data);
        dataVerify(data, n, (byte)1);
        fs.delete(files.get(1), true);
        skipBuffer(in, (byte)1);

        // ...and then to the third one
        n = in.read(data);
        dataVerify(data, n, (byte)2);
        fs.delete(files.get(2), true);
        skipBuffer(in, (byte)2);

        // No location is left: the read must hit EOF or FileNotFoundException
        try {
          n = in.read(data);
          assertTrue(n <= 0);
        } catch (FileNotFoundException e) {
          // expected once every location has been deleted
        }
      } finally {
        in.close();
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }
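
  /**
   * Write 'size' bytes with value 'v' to a new file at 'path'.
   */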
  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    for (int i = 0; i < data.length; i++) {
      data[i] = v;
    }

    FSDataOutputStream stream = fs.create(path);
    try {
      long written = 0;
      while (written < size) {
        stream.write(data, 0, data.length);
        written += data.length;
      }
    } finally {
      stream.close();
    }
  }
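
  /**
   * Verify that the first 'n' bytes of 'data' all have value 'v'.
   */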
  private static void dataVerify(byte[] data, int n, byte v) {
    for (int i = 0; i < n; ++i) {
      assertEquals(v, data[i]);
    }
  }

  /**
   * Read and discard the remainder of the current file, checking that every
   * byte still has value 'v'. A short read, EOF, or a read failure on a
   * deleted file simply ends the skip.
   */
  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
    byte[] data = new byte[8192];
    try {
      int n;
      while ((n = in.read(data)) == data.length) {
        for (int i = 0; i < data.length; ++i) {
          if (data[i] != v) {
            throw new Exception("File changed");
          }
        }
      }
    } catch (Exception e) {
      // stop skipping: the current file is exhausted or no longer readable
    }
  }
}