1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io;
20
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collection;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.FileNotFoundException;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.fs.FSDataInputStream;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.FileStatus;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.fs.PositionedReadable;
37 import org.apache.hadoop.fs.Seekable;
38 import org.apache.hadoop.hbase.util.FSUtils;
39 import org.apache.hadoop.ipc.RemoteException;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 @InterfaceAudience.Private
92 public class FileLink {
93 private static final Log LOG = LogFactory.getLog(FileLink.class);
94
95
96 public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
97
98
99
100
101
102 private static class FileLinkInputStream extends InputStream
103 implements Seekable, PositionedReadable {
104 private FSDataInputStream in = null;
105 private Path currentPath = null;
106 private long pos = 0;
107
108 private final FileLink fileLink;
109 private final int bufferSize;
110 private final FileSystem fs;
111
112 public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
113 throws IOException {
114 this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
115 }
116
117 public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
118 throws IOException {
119 this.bufferSize = bufferSize;
120 this.fileLink = fileLink;
121 this.fs = fs;
122
123 this.in = tryOpen();
124 }
125
126 @Override
127 public int read() throws IOException {
128 int res;
129 try {
130 res = in.read();
131 } catch (FileNotFoundException e) {
132 res = tryOpen().read();
133 } catch (NullPointerException e) {
134 res = tryOpen().read();
135 } catch (AssertionError e) {
136 res = tryOpen().read();
137 }
138 if (res > 0) pos += 1;
139 return res;
140 }
141
142 @Override
143 public int read(byte[] b) throws IOException {
144 return read(b, 0, b.length);
145 }
146
147 @Override
148 public int read(byte[] b, int off, int len) throws IOException {
149 int n;
150 try {
151 n = in.read(b, off, len);
152 } catch (FileNotFoundException e) {
153 n = tryOpen().read(b, off, len);
154 } catch (NullPointerException e) {
155 n = tryOpen().read(b, off, len);
156 } catch (AssertionError e) {
157 n = tryOpen().read(b, off, len);
158 }
159 if (n > 0) pos += n;
160 assert(in.getPos() == pos);
161 return n;
162 }
163
164 @Override
165 public int read(long position, byte[] buffer, int offset, int length) throws IOException {
166 int n;
167 try {
168 n = in.read(position, buffer, offset, length);
169 } catch (FileNotFoundException e) {
170 n = tryOpen().read(position, buffer, offset, length);
171 } catch (NullPointerException e) {
172 n = tryOpen().read(position, buffer, offset, length);
173 } catch (AssertionError e) {
174 n = tryOpen().read(position, buffer, offset, length);
175 }
176 return n;
177 }
178
179 @Override
180 public void readFully(long position, byte[] buffer) throws IOException {
181 readFully(position, buffer, 0, buffer.length);
182 }
183
184 @Override
185 public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
186 try {
187 in.readFully(position, buffer, offset, length);
188 } catch (FileNotFoundException e) {
189 tryOpen().readFully(position, buffer, offset, length);
190 } catch (NullPointerException e) {
191 tryOpen().readFully(position, buffer, offset, length);
192 } catch (AssertionError e) {
193 tryOpen().readFully(position, buffer, offset, length);
194 }
195 }
196
197 @Override
198 public long skip(long n) throws IOException {
199 long skipped;
200
201 try {
202 skipped = in.skip(n);
203 } catch (FileNotFoundException e) {
204 skipped = tryOpen().skip(n);
205 } catch (NullPointerException e) {
206 skipped = tryOpen().skip(n);
207 } catch (AssertionError e) {
208 skipped = tryOpen().skip(n);
209 }
210
211 if (skipped > 0) pos += skipped;
212 return skipped;
213 }
214
215 @Override
216 public int available() throws IOException {
217 try {
218 return in.available();
219 } catch (FileNotFoundException e) {
220 return tryOpen().available();
221 } catch (NullPointerException e) {
222 return tryOpen().available();
223 } catch (AssertionError e) {
224 return tryOpen().available();
225 }
226 }
227
228 @Override
229 public void seek(long pos) throws IOException {
230 try {
231 in.seek(pos);
232 } catch (FileNotFoundException e) {
233 tryOpen().seek(pos);
234 } catch (NullPointerException e) {
235 tryOpen().seek(pos);
236 } catch (AssertionError e) {
237 tryOpen().seek(pos);
238 }
239 this.pos = pos;
240 }
241
242 @Override
243 public long getPos() throws IOException {
244 return pos;
245 }
246
247 @Override
248 public boolean seekToNewSource(long targetPos) throws IOException {
249 boolean res;
250 try {
251 res = in.seekToNewSource(targetPos);
252 } catch (FileNotFoundException e) {
253 res = tryOpen().seekToNewSource(targetPos);
254 } catch (NullPointerException e) {
255 res = tryOpen().seekToNewSource(targetPos);
256 } catch (AssertionError e) {
257 res = tryOpen().seekToNewSource(targetPos);
258 }
259 if (res) pos = targetPos;
260 return res;
261 }
262
263 @Override
264 public void close() throws IOException {
265 in.close();
266 }
267
268 @Override
269 public synchronized void mark(int readlimit) {
270 }
271
272 @Override
273 public synchronized void reset() throws IOException {
274 throw new IOException("mark/reset not supported");
275 }
276
277 @Override
278 public boolean markSupported() {
279 return false;
280 }
281
282
283
284
285
286
287
288 private FSDataInputStream tryOpen() throws IOException {
289 for (Path path: fileLink.getLocations()) {
290 if (path.equals(currentPath)) continue;
291 try {
292 in = fs.open(path, bufferSize);
293 if (pos != 0) in.seek(pos);
294 assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
295 if (LOG.isTraceEnabled()) {
296 if (currentPath == null) {
297 LOG.debug("link open path=" + path);
298 } else {
299 LOG.trace("link switch from path=" + currentPath + " to path=" + path);
300 }
301 }
302 currentPath = path;
303 return(in);
304 } catch (FileNotFoundException e) {
305
306 } catch (RemoteException re) {
307 IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
308 if (!(ioe instanceof FileNotFoundException)) throw re;
309 }
310 }
311 throw new FileNotFoundException("Unable to open link: " + fileLink);
312 }
313 }
314
315 private Path[] locations = null;
316
317 protected FileLink() {
318 this.locations = null;
319 }
320
321
322
323
324
325 public FileLink(Path originPath, Path... alternativePaths) {
326 setLocations(originPath, alternativePaths);
327 }
328
329
330
331
332 public FileLink(final Collection<Path> locations) {
333 this.locations = locations.toArray(new Path[locations.size()]);
334 }
335
336
337
338
339 public Path[] getLocations() {
340 return locations;
341 }
342
343 @Override
344 public String toString() {
345 StringBuilder str = new StringBuilder(getClass().getName());
346 str.append(" locations=[");
347 for (int i = 0; i < locations.length; ++i) {
348 if (i > 0) str.append(", ");
349 str.append(locations[i].toString());
350 }
351 str.append("]");
352 return str.toString();
353 }
354
355
356
357
358 public boolean exists(final FileSystem fs) throws IOException {
359 for (int i = 0; i < locations.length; ++i) {
360 if (fs.exists(locations[i])) {
361 return true;
362 }
363 }
364 return false;
365 }
366
367
368
369
370 public Path getAvailablePath(FileSystem fs) throws IOException {
371 for (int i = 0; i < locations.length; ++i) {
372 if (fs.exists(locations[i])) {
373 return locations[i];
374 }
375 }
376 throw new FileNotFoundException("Unable to open link: " + this);
377 }
378
379
380
381
382
383
384
385
386 public FileStatus getFileStatus(FileSystem fs) throws IOException {
387 for (int i = 0; i < locations.length; ++i) {
388 try {
389 return fs.getFileStatus(locations[i]);
390 } catch (FileNotFoundException e) {
391
392 }
393 }
394 throw new FileNotFoundException("Unable to open link: " + this);
395 }
396
397
398
399
400
401
402
403
404
405
406
407 public FSDataInputStream open(final FileSystem fs) throws IOException {
408 return new FSDataInputStream(new FileLinkInputStream(fs, this));
409 }
410
411
412
413
414
415
416
417
418
419
420
421
422 public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
423 return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
424 }
425
426
427
428
429
430 protected void setLocations(Path originPath, Path... alternativePaths) {
431 assert this.locations == null : "Link locations already set";
432
433 List<Path> paths = new ArrayList<Path>(alternativePaths.length +1);
434 if (originPath != null) {
435 paths.add(originPath);
436 }
437
438 for (int i = 0; i < alternativePaths.length; i++) {
439 if (alternativePaths[i] != null) {
440 paths.add(alternativePaths[i]);
441 }
442 }
443 this.locations = paths.toArray(new Path[0]);
444 }
445
446
447
448
449
450
451
452
453
454
455
456 public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
457 return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
458 }
459
460
461
462
463
464
465
466 public static String getBackReferenceFileName(final Path dirPath) {
467 return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
468 }
469
470
471
472
473
474
475
476 public static boolean isBackReferencesDir(final Path dirPath) {
477 if (dirPath == null) return false;
478 return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
479 }
480
481 @Override
482 public boolean equals(Object obj) {
483 if (obj == null) {
484 return false;
485 }
486
487
488
489 if (this.getClass().equals(obj.getClass())) {
490 return Arrays.equals(this.locations, ((FileLink) obj).locations);
491 }
492
493 return false;
494 }
495
496 @Override
497 public int hashCode() {
498 return Arrays.hashCode(locations);
499 }
500 }
501