1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mob.compactions;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.Date;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Future;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.fs.FileStatus;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.hbase.Cell;
44 import org.apache.hadoop.hbase.HColumnDescriptor;
45 import org.apache.hadoop.hbase.HConstants;
46 import org.apache.hadoop.hbase.KeyValue;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.Tag;
49 import org.apache.hadoop.hbase.TagType;
50 import org.apache.hadoop.hbase.client.Connection;
51 import org.apache.hadoop.hbase.client.ConnectionFactory;
52 import org.apache.hadoop.hbase.client.HTable;
53 import org.apache.hadoop.hbase.client.Scan;
54 import org.apache.hadoop.hbase.client.Table;
55 import org.apache.hadoop.hbase.io.HFileLink;
56 import org.apache.hadoop.hbase.io.crypto.Encryption;
57 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
58 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
59 import org.apache.hadoop.hbase.mob.MobConstants;
60 import org.apache.hadoop.hbase.mob.MobFileName;
61 import org.apache.hadoop.hbase.mob.MobUtils;
62 import org.apache.hadoop.hbase.mob.compactions.MobCompactionRequest.CompactionType;
63 import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartition;
64 import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
65 import org.apache.hadoop.hbase.regionserver.BloomType;
66 import org.apache.hadoop.hbase.regionserver.HStore;
67 import org.apache.hadoop.hbase.regionserver.ScanInfo;
68 import org.apache.hadoop.hbase.regionserver.ScanType;
69 import org.apache.hadoop.hbase.regionserver.ScannerContext;
70 import org.apache.hadoop.hbase.regionserver.StoreFile;
71 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
72 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
73 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
74 import org.apache.hadoop.hbase.regionserver.StoreScanner;
75 import org.apache.hadoop.hbase.util.Bytes;
76 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77 import org.apache.hadoop.hbase.util.Pair;
78
79
80
81
82 @InterfaceAudience.Private
83 public class PartitionedMobCompactor extends MobCompactor {
84
85 private static final Log LOG = LogFactory.getLog(PartitionedMobCompactor.class);
86 protected long mergeableSize;
87 protected int delFileMaxCount;
88
89 protected int compactionBatchSize;
90 protected int compactionKVMax;
91
92 private Path tempPath;
93 private Path bulkloadPath;
94 private CacheConfig compactionCacheConfig;
95 private final List<Tag> refCellTags;
96 private Encryption.Context cryptoContext = Encryption.Context.NONE;
97
98 public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
99 HColumnDescriptor column, ExecutorService pool) throws IOException {
100 super(conf, fs, tableName, column, pool);
101 mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD,
102 MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD);
103 delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
104 MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
105
106 compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
107 MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
108 tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
109 bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
110 tableName.getNamespaceAsString(), tableName.getQualifierAsString())));
111 compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
112 HConstants.COMPACTION_KV_MAX_DEFAULT);
113 Configuration copyOfConf = new Configuration(conf);
114 copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
115 compactionCacheConfig = new CacheConfig(copyOfConf);
116 refCellTags = new ArrayList<>(2);
117 refCellTags.add(MobConstants.MOB_REF_TAG);
118 Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
119 refCellTags.add(tableNameTag);
120 cryptoContext = MobUtils.createEncryptionContext(copyOfConf, column);
121 }
122
123 @Override
124 public List<Path> compact(List<FileStatus> files, boolean allFiles) throws IOException {
125 if (files == null || files.isEmpty()) {
126 LOG.info("No candidate mob files");
127 return null;
128 }
129 LOG.info("is allFiles: " + allFiles);
130
131 PartitionedMobCompactionRequest request = select(files, allFiles);
132
133 return performCompaction(request);
134 }
135
136
137
138
139
140
141
142
143
144 protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
145 boolean allFiles) throws IOException {
146 Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
147 Map<CompactionPartitionId, CompactionPartition> filesToCompact =
148 new HashMap<CompactionPartitionId, CompactionPartition>();
149 final CompactionPartitionId id = new CompactionPartitionId();
150 int selectedFileCount = 0;
151 int irrelevantFileCount = 0;
152 for (FileStatus file : candidates) {
153 if (!file.isFile()) {
154 irrelevantFileCount++;
155 continue;
156 }
157
158 FileStatus linkedFile = file;
159 if (HFileLink.isHFileLink(file.getPath())) {
160 HFileLink link = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
161 linkedFile = getLinkedFileStatus(link);
162 if (linkedFile == null) {
163
164 irrelevantFileCount++;
165 continue;
166 }
167 }
168 if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
169 allDelFiles.add(file);
170 } else if (allFiles || (linkedFile.getLen() < mergeableSize)) {
171
172
173 String fileName = linkedFile.getPath().getName();
174 id.setStartKey(MobFileName.getStartKeyFromName(fileName));
175 id.setDate(MobFileName.getDateFromName(fileName));
176 CompactionPartition compactionPartition = filesToCompact.get(id);
177 if (compactionPartition == null) {
178 CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate());
179 compactionPartition = new CompactionPartition(newId);
180
181 compactionPartition.addFile(file);
182 filesToCompact.put(newId, compactionPartition);
183 } else {
184 compactionPartition.addFile(file);
185 }
186 selectedFileCount++;
187 }
188 }
189
190
191
192
193
194
195 if (!allFiles && (allDelFiles.size() > 0)) {
196 Iterator<Map.Entry<CompactionPartitionId, CompactionPartition>> it =
197 filesToCompact.entrySet().iterator();
198
199 while(it.hasNext()) {
200 Map.Entry<CompactionPartitionId, CompactionPartition> entry = it.next();
201 if (entry.getValue().getFileCount() == 1) {
202 it.remove();
203 --selectedFileCount;
204 }
205 }
206 }
207
208 PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest(
209 filesToCompact.values(), allDelFiles);
210 if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
211
212 request.setCompactionType(CompactionType.ALL_FILES);
213 }
214 LOG.info("The compaction type is " + request.getCompactionType() + ", the request has "
215 + allDelFiles.size() + " del files, " + selectedFileCount + " selected files, and "
216 + irrelevantFileCount + " irrelevant files");
217 return request;
218 }
219
220
221
222
223
224
225
226
227
228
229
230
231 protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
232 throws IOException {
233
234 List<Path> delFilePaths = new ArrayList<Path>();
235 for (FileStatus delFile : request.delFiles) {
236 delFilePaths.add(delFile.getPath());
237 }
238 List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
239 List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
240 List<Path> paths = null;
241 try {
242 for (Path newDelPath : newDelPaths) {
243 StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
244
245
246 sf.createReader();
247 newDelFiles.add(sf);
248 }
249 LOG.info("After merging, there are " + newDelFiles.size() + " del files");
250
251 paths = compactMobFiles(request, newDelFiles);
252 LOG.info("After compaction, there are " + paths.size() + " mob files");
253 } finally {
254 closeStoreFileReaders(newDelFiles);
255 }
256
257 if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
258 LOG.info(
259 "After a mob compaction with all files selected, archiving the del files " + newDelPaths);
260 try {
261 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
262 } catch (IOException e) {
263 LOG.error("Failed to archive the del files " + newDelPaths, e);
264 }
265 }
266 return paths;
267 }
268
269
270
271
272
273
274
275
276 protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
277 final List<StoreFile> delFiles) throws IOException {
278 Collection<CompactionPartition> partitions = request.compactionPartitions;
279 if (partitions == null || partitions.isEmpty()) {
280 LOG.info("No partitions of mob files");
281 return Collections.emptyList();
282 }
283 List<Path> paths = new ArrayList<Path>();
284 Connection c = ConnectionFactory.createConnection(conf);
285 final Table table = c.getTable(tableName);
286 try {
287 Map<CompactionPartitionId, Future<List<Path>>> results =
288 new HashMap<CompactionPartitionId, Future<List<Path>>>();
289
290 for (final CompactionPartition partition : partitions) {
291 results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
292 @Override
293 public List<Path> call() throws Exception {
294 LOG.info("Compacting mob files for partition " + partition.getPartitionId());
295 return compactMobFilePartition(request, partition, delFiles, table);
296 }
297 }));
298 }
299
300 List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>();
301 for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
302 try {
303 paths.addAll(result.getValue().get());
304 } catch (Exception e) {
305
306 LOG.error("Failed to compact the partition " + result.getKey(), e);
307 failedPartitions.add(result.getKey());
308 }
309 }
310 if (!failedPartitions.isEmpty()) {
311
312 throw new IOException("Failed to compact the partitions " + failedPartitions);
313 }
314 } finally {
315 try {
316 table.close();
317 } catch (IOException e) {
318 LOG.error("Failed to close the HTable", e);
319 }
320 }
321 return paths;
322 }
323
324
325
326
327
328
329
330
331
332
333 private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
334 CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
335 if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(),
336 partition.getPartitionId().getDate())) {
337
338
339 return Collections.emptyList();
340 }
341 List<Path> newFiles = new ArrayList<Path>();
342 List<FileStatus> files = partition.listFiles();
343 int offset = 0;
344 Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
345 Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
346 while (offset < files.size()) {
347 int batch = compactionBatchSize;
348 if (files.size() - offset < compactionBatchSize) {
349 batch = files.size() - offset;
350 }
351 if (batch == 1 && delFiles.isEmpty()) {
352
353
354 newFiles.add(files.get(offset).getPath());
355 offset++;
356 continue;
357 }
358
359 fs.delete(bulkloadPathOfPartition, true);
360
361 List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
362 for (int i = offset; i < batch + offset; i++) {
363 StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
364 BloomType.NONE);
365 filesToCompact.add(sf);
366 }
367 filesToCompact.addAll(delFiles);
368
369 compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
370 bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
371
372 offset += batch;
373 }
374 LOG.info("Compaction is finished. The number of mob files is changed from " + files.size()
375 + " to " + newFiles.size());
376 return newFiles;
377 }
378
379
380
381
382
383 private void closeStoreFileReaders(List<StoreFile> storeFiles) {
384 for (StoreFile storeFile : storeFiles) {
385 try {
386 storeFile.closeReader(true);
387 } catch (IOException e) {
388 LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
389 }
390 }
391 }
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407 private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
408 CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
409 Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
410 throws IOException {
411
412 StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
413
414 List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
415
416 Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
417
418 Writer writer = null;
419 Writer refFileWriter = null;
420 Path filePath = null;
421 long mobCells = 0;
422 boolean cleanupTmpMobFile = false;
423 boolean cleanupBulkloadDirOfPartition = false;
424 boolean cleanupCommittedMobFile = false;
425 boolean closeReaders= true;
426
427 try {
428 try {
429 writer = MobUtils
430 .createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath,
431 Long.MAX_VALUE, column.getCompactionCompressionType(),
432 partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
433 cleanupTmpMobFile = true;
434 filePath = writer.getPath();
435 byte[] fileName = Bytes.toBytes(filePath.getName());
436
437 refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
438 fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext);
439 cleanupBulkloadDirOfPartition = true;
440 List<Cell> cells = new ArrayList<>();
441 boolean hasMore;
442 ScannerContext scannerContext =
443 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
444 do {
445 hasMore = scanner.next(cells, scannerContext);
446 for (Cell cell : cells) {
447
448 writer.append(cell);
449
450 Cell reference = MobUtils.createMobRefCell(cell, fileName, this.refCellTags);
451 refFileWriter.append(reference);
452 mobCells++;
453 }
454 cells.clear();
455 } while (hasMore);
456 } finally {
457
458 scanner.close();
459
460 if (cleanupTmpMobFile) {
461
462 closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
463 }
464
465 if (cleanupBulkloadDirOfPartition) {
466
467 closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
468 }
469 }
470
471 if (mobCells > 0) {
472
473 MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
474 cleanupTmpMobFile = false;
475 cleanupCommittedMobFile = true;
476
477 bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
478 cleanupCommittedMobFile = false;
479 newFiles.add(new Path(mobFamilyDir, filePath.getName()));
480 }
481
482
483 try {
484 closeStoreFileReaders(mobFilesToCompact);
485 closeReaders = false;
486 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
487 } catch (IOException e) {
488 LOG.error("Failed to archive the files " + mobFilesToCompact, e);
489 }
490 } finally {
491 if (closeReaders) {
492 closeStoreFileReaders(mobFilesToCompact);
493 }
494
495 if (cleanupTmpMobFile) {
496 deletePath(filePath);
497 }
498
499 if (cleanupBulkloadDirOfPartition) {
500
501 deletePath(bulkloadPathOfPartition);
502 }
503
504 if (cleanupCommittedMobFile) {
505 deletePath(new Path(mobFamilyDir, filePath.getName()));
506 }
507 }
508 }
509
510
511
512
513
514
515
516
517
518 protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
519 List<Path> delFilePaths) throws IOException {
520 if (delFilePaths.size() <= delFileMaxCount) {
521 return delFilePaths;
522 }
523
524 int offset = 0;
525 List<Path> paths = new ArrayList<Path>();
526 while (offset < delFilePaths.size()) {
527
528 int batch = compactionBatchSize;
529 if (delFilePaths.size() - offset < compactionBatchSize) {
530 batch = delFilePaths.size() - offset;
531 }
532 List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
533 if (batch == 1) {
534
535 paths.add(delFilePaths.get(offset));
536 offset++;
537 continue;
538 }
539 for (int i = offset; i < batch + offset; i++) {
540 batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
541 BloomType.NONE));
542 }
543
544 paths.add(compactDelFilesInBatch(request, batchedDelFiles));
545
546 offset += batch;
547 }
548 return compactDelFiles(request, paths);
549 }
550
551
552
553
554
555
556
557
558 private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
559 List<StoreFile> delFiles) throws IOException {
560
561 StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
562 Writer writer = null;
563 Path filePath = null;
564 try {
565 writer = MobUtils.createDelFileWriter(conf, fs, column,
566 MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
567 column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
568 cryptoContext);
569 filePath = writer.getPath();
570 List<Cell> cells = new ArrayList<Cell>();
571 boolean hasMore = false;
572 ScannerContext scannerContext =
573 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
574 do {
575 hasMore = scanner.next(cells, scannerContext);
576 for (Cell cell : cells) {
577 writer.append(cell);
578 }
579 cells.clear();
580 } while (hasMore);
581 } finally {
582 scanner.close();
583 if (writer != null) {
584 try {
585 writer.close();
586 } catch (IOException e) {
587 LOG.error("Failed to close the writer of the file " + filePath, e);
588 }
589 }
590 }
591
592 Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
593
594 try {
595 MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
596 } catch (IOException e) {
597 LOG.error("Failed to archive the old del files " + delFiles, e);
598 }
599 return path;
600 }
601
602
603
604
605
606
607
608
609 private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
610 throws IOException {
611 List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false,
612 null, HConstants.LATEST_TIMESTAMP);
613 Scan scan = new Scan();
614 scan.setMaxVersions(column.getMaxVersions());
615 long ttl = HStore.determineTTLFromFamily(column);
616 ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
617 StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
618 HConstants.LATEST_TIMESTAMP);
619 return scanner;
620 }
621
622
623
624
625
626
627
628
629 private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
630 throws IOException {
631
632 try {
633 LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
634 bulkload.doBulkLoad(bulkloadDirectory, (HTable)table);
635 } catch (Exception e) {
636 throw new IOException(e);
637 }
638 }
639
640
641
642
643
644
645
646
647 private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
648 throws IOException {
649 if (writer != null) {
650 writer.appendMetadata(maxSeqId, false, mobCellsCount);
651 try {
652 writer.close();
653 } catch (IOException e) {
654 LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
655 }
656 }
657 }
658
659
660
661
662
663
664
665
666 private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
667 throws IOException {
668 if (writer != null) {
669 writer.appendMetadata(maxSeqId, false);
670 writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
671 writer.appendFileInfo(StoreFile.SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
672 try {
673 writer.close();
674 } catch (IOException e) {
675 LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
676 }
677 }
678 }
679
680
681
682
683
684
685
686 private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
687 long maxSeqId = 0;
688 long maxKeyCount = 0;
689 for (StoreFile sf : storeFiles) {
690
691 maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
692 byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
693 if (count != null) {
694 maxKeyCount += Bytes.toLong(count);
695 }
696 }
697 return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
698 }
699
700
701
702
703
704 private void deletePath(Path path) {
705 try {
706 if (path != null) {
707 fs.delete(path, true);
708 }
709 } catch (IOException e) {
710 LOG.error("Failed to delete the file " + path, e);
711 }
712 }
713
714 private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
715 Path[] locations = link.getLocations();
716 for (Path location : locations) {
717 FileStatus file = getFileStatus(location);
718 if (file != null) {
719 return file;
720 }
721 }
722 return null;
723 }
724
725 private FileStatus getFileStatus(Path path) throws IOException {
726 try {
727 if (path != null) {
728 FileStatus file = fs.getFileStatus(path);
729 return file;
730 }
731 } catch (FileNotFoundException e) {
732 LOG.warn("The file " + path + " can not be found", e);
733 }
734 return null;
735 }
736 }