View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.IOException;
22  import java.net.ConnectException;
23  import java.net.SocketTimeoutException;
24  import java.util.List;
25  
26  import org.apache.commons.lang.StringUtils;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.HBaseConfiguration;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.TableNotFoundException;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.client.HConnection;
36  import org.apache.hadoop.hbase.client.HConnectionManager;
37  import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
38  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
39  import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
40  import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
41  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
42  import org.apache.hadoop.hbase.util.FSUtils;
43  import org.apache.hadoop.hbase.wal.WAL.Entry;
44  import org.apache.hadoop.ipc.RemoteException;
45  
46  /**
47   * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} 
48   * implementation for replicating to another HBase cluster.
49   * For the slave cluster it selects a random number of peers
50   * using a replication ratio. For example, if replication ration = 0.1
51   * and slave cluster has 100 region servers, 10 will be selected.
52   * <p/>
53   * A stream is considered down when we cannot contact a region server on the
54   * peer cluster for more than 55 seconds by default.
55   */
56  @InterfaceAudience.Private
57  public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
58  
59    private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
60    private HConnection conn;
61  
62    private Configuration conf;
63  
64    // How long should we sleep for each retry
65    private long sleepForRetries;
66  
67    // Maximum number of retries before taking bold actions
68    private int maxRetriesMultiplier;
69    // Socket timeouts require even bolder actions since we don't want to DDOS
70    private int socketTimeoutMultiplier;
71    //Metrics for this source
72    private MetricsSource metrics;
73    // Handles connecting to peer region servers
74    private ReplicationSinkManager replicationSinkMgr;
75    private boolean peersSelected = false;
76    private String replicationClusterId = "";
77    private Path baseNamespaceDir;
78    private Path hfileArchiveDir;
79    private boolean replicationBulkLoadDataEnabled;
80  
81    @Override
82    public void init(Context context) throws IOException {
83      super.init(context);
84      this.conf = HBaseConfiguration.create(ctx.getConfiguration());
85      decorateConf();
86      this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
87      this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
88          maxRetriesMultiplier);
89      // TODO: This connection is replication specific or we should make it particular to
90      // replication and make replication specific settings such as compression or codec to use
91      // passing Cells.
92      this.conn = HConnectionManager.createConnection(this.conf);
93      this.sleepForRetries =
94          this.conf.getLong("replication.source.sleepforretries", 1000);
95      this.metrics = context.getMetrics();
96      // ReplicationQueueInfo parses the peerId out of the znode for us
97      this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
98      this.replicationBulkLoadDataEnabled =
99          conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
100           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
101     if (this.replicationBulkLoadDataEnabled) {
102       replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
103     }
104     // Construct base namespace directory and hfile archive directory path
105     Path rootDir = FSUtils.getRootDir(conf);
106     Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
107     baseNamespaceDir = new Path(rootDir, baseNSDir);
108     hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
109   }
110 
111   private void decorateConf() {
112     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
113     if (StringUtils.isNotEmpty(replicationCodec)) {
114       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
115     }
116   }
117 
118   private void connectToPeers() {
119     getRegionServers();
120 
121     int sleepMultiplier = 1;
122 
123     // Connect to peer cluster first, unless we have to stop
124     while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
125       replicationSinkMgr.chooseSinks();
126       if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
127         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
128           sleepMultiplier++;
129         }
130       }
131     }
132   }
133 
134   /**
135    * Do the sleeping logic
136    * @param msg Why we sleep
137    * @param sleepMultiplier by how many times the default sleeping time is augmented
138    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
139    */
140   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
141     try {
142       if (LOG.isTraceEnabled()) {
143         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
144       }
145       Thread.sleep(this.sleepForRetries * sleepMultiplier);
146     } catch (InterruptedException e) {
147       LOG.debug("Interrupted while sleeping between retries");
148     }
149     return sleepMultiplier < maxRetriesMultiplier;
150   }
151 
152   /**
153    * Do the shipping logic
154    */
155   @Override
156   public boolean replicate(ReplicateContext replicateContext) {
157     List<Entry> entries = replicateContext.getEntries();
158     int sleepMultiplier = 1;
159     while (this.isRunning()) {
160       if (!peersSelected) {
161         connectToPeers();
162         peersSelected = true;
163       }
164 
165       if (!isPeerEnabled()) {
166         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
167           sleepMultiplier++;
168         }
169         continue;
170       }
171       SinkPeer sinkPeer = null;
172       try {
173         sinkPeer = replicationSinkMgr.getReplicationSink();
174         BlockingInterface rrs = sinkPeer.getRegionServer();
175         if (LOG.isTraceEnabled()) {
176           LOG.trace("Replicating " + entries.size() +
177               " entries of total size " + replicateContext.getSize());
178         }
179         ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
180           replicationClusterId, baseNamespaceDir, hfileArchiveDir);
181 
182         // update metrics
183         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
184         replicationSinkMgr.reportSinkSuccess(sinkPeer);
185         return true;
186 
187       } catch (IOException ioe) {
188         // Didn't ship anything, but must still age the last time we did
189         this.metrics.refreshAgeOfLastShippedOp();
190         if (ioe instanceof RemoteException) {
191           ioe = ((RemoteException) ioe).unwrapRemoteException();
192           LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
193           if (ioe instanceof TableNotFoundException) {
194             if (sleepForRetries("A table is missing in the peer cluster. "
195                 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
196               sleepMultiplier++;
197             }
198           }
199         } else {
200           if (ioe instanceof SocketTimeoutException) {
201             // This exception means we waited for more than 60s and nothing
202             // happened, the cluster is alive and calling it right away
203             // even for a test just makes things worse.
204             sleepForRetries("Encountered a SocketTimeoutException. Since the " +
205               "call to the remote cluster timed out, which is usually " +
206               "caused by a machine failure or a massive slowdown",
207               this.socketTimeoutMultiplier);
208           } else if (ioe instanceof ConnectException) {
209             LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
210             replicationSinkMgr.chooseSinks();
211           } else {
212             LOG.warn("Can't replicate because of a local or network error: ", ioe);
213           }
214         }
215 
216         if (sinkPeer != null) {
217           replicationSinkMgr.reportBadSink(sinkPeer);
218         }
219         if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
220           sleepMultiplier++;
221         }
222       }
223     }
224     return false; // in case we exited before replicating
225   }
226 
227   protected boolean isPeerEnabled() {
228     return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
229   }
230 
231   @Override
232   protected void doStop() {
233     disconnect(); //don't call super.doStop()
234     if (this.conn != null) {
235       try {
236         this.conn.close();
237         this.conn = null;
238       } catch (IOException e) {
239         LOG.warn("Failed to close the connection");
240       }
241     }
242     notifyStopped();
243   }
244 }