View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional information regarding
4    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7    * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8    * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9    * for the specific language governing permissions and limitations under the License.
10   */
11  package org.apache.hadoop.hbase.replication.regionserver;
12  
13  import java.io.File;
14  import java.io.IOException;
15  import java.net.URL;
16  import java.util.HashMap;
17  import java.util.Map;
18  
19  import org.apache.commons.logging.Log;
20  import org.apache.commons.logging.LogFactory;
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.FileUtil;
23  import org.apache.hadoop.fs.Path;
24  import org.apache.hadoop.hbase.HBaseConfiguration;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  
28  /**
29   * This will load all the xml configuration files for the source cluster replication ID from
30   * user configured replication configuration directory.
31   */
32  @InterfaceAudience.Private
33  public class DefaultSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
34    private static final Log LOG = LogFactory.getLog(DefaultSourceFSConfigurationProvider.class);
35    // Map containing all the source clusters configurations against their replication cluster id
36    private Map<String, Configuration> sourceClustersConfs = new HashMap<>();
37    private static final String XML = ".xml";
38  
39    @Override
40    public Configuration getConf(Configuration sinkConf, String replicationClusterId)
41        throws IOException {
42      if (sourceClustersConfs.get(replicationClusterId) == null) {
43        synchronized (this.sourceClustersConfs) {
44          if (sourceClustersConfs.get(replicationClusterId) == null) {
45            LOG.info("Loading source cluster FS client conf for cluster " + replicationClusterId);
46            // Load only user provided client configurations.
47            Configuration sourceClusterConf = new Configuration(false);
48  
49            String replicationConfDir = sinkConf.get(HConstants.REPLICATION_CONF_DIR);
50            if (replicationConfDir == null) {
51              LOG.debug(HConstants.REPLICATION_CONF_DIR + " is not configured.");
52              URL resource = HBaseConfiguration.class.getClassLoader().getResource("hbase-site.xml");
53              if (resource != null) {
54                String path = resource.getPath();
55                replicationConfDir = path.substring(0, path.lastIndexOf("/"));
56              } else {
57                replicationConfDir = System.getenv("HBASE_CONF_DIR");
58              }
59            }
60  
61            LOG.info("Loading source cluster " + replicationClusterId
62                + " file system configurations from xml files under directory " + replicationConfDir);
63            File confDir = new File(replicationConfDir, replicationClusterId);
64            String[] listofConfFiles = FileUtil.list(confDir);
65            for (String confFile : listofConfFiles) {
66              if (new File(confDir, confFile).isFile() && confFile.endsWith(XML)) {
67                // Add all the user provided client conf files
68                sourceClusterConf.addResource(new Path(confDir.getPath(), confFile));
69              }
70            }
71            this.sourceClustersConfs.put(replicationClusterId, sourceClusterConf);
72          }
73        }
74      }
75      return this.sourceClustersConfs.get(replicationClusterId);
76    }
77  
78  }