1
2
3
4
5
6
7
8
9
10
11 package org.apache.hadoop.hbase.replication.regionserver;
12
13 import java.io.File;
14 import java.io.IOException;
15 import java.net.URL;
16 import java.util.HashMap;
17 import java.util.Map;
18
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.FileUtil;
23 import org.apache.hadoop.fs.Path;
24 import org.apache.hadoop.hbase.HBaseConfiguration;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27
28
29
30
31
32 @InterfaceAudience.Private
33 public class DefaultSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
34 private static final Log LOG = LogFactory.getLog(DefaultSourceFSConfigurationProvider.class);
35
36 private Map<String, Configuration> sourceClustersConfs = new HashMap<>();
37 private static final String XML = ".xml";
38
39 @Override
40 public Configuration getConf(Configuration sinkConf, String replicationClusterId)
41 throws IOException {
42 if (sourceClustersConfs.get(replicationClusterId) == null) {
43 synchronized (this.sourceClustersConfs) {
44 if (sourceClustersConfs.get(replicationClusterId) == null) {
45 LOG.info("Loading source cluster FS client conf for cluster " + replicationClusterId);
46
47 Configuration sourceClusterConf = new Configuration(false);
48
49 String replicationConfDir = sinkConf.get(HConstants.REPLICATION_CONF_DIR);
50 if (replicationConfDir == null) {
51 LOG.debug(HConstants.REPLICATION_CONF_DIR + " is not configured.");
52 URL resource = HBaseConfiguration.class.getClassLoader().getResource("hbase-site.xml");
53 if (resource != null) {
54 String path = resource.getPath();
55 replicationConfDir = path.substring(0, path.lastIndexOf("/"));
56 } else {
57 replicationConfDir = System.getenv("HBASE_CONF_DIR");
58 }
59 }
60
61 LOG.info("Loading source cluster " + replicationClusterId
62 + " file system configurations from xml files under directory " + replicationConfDir);
63 File confDir = new File(replicationConfDir, replicationClusterId);
64 String[] listofConfFiles = FileUtil.list(confDir);
65 for (String confFile : listofConfFiles) {
66 if (new File(confDir, confFile).isFile() && confFile.endsWith(XML)) {
67
68 sourceClusterConf.addResource(new Path(confDir.getPath(), confFile));
69 }
70 }
71 this.sourceClustersConfs.put(replicationClusterId, sourceClusterConf);
72 }
73 }
74 }
75 return this.sourceClustersConfs.get(replicationClusterId);
76 }
77
78 }