/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;

/**
 * A {@link HBaseReplicationEndpoint} implementation for replicating WAL entries to another
 * HBase cluster. Sink region servers in the peer cluster are picked through the
 * {@link ReplicationSinkManager}; batches of entries are shipped via
 * {@link ReplicationProtbufUtil#replicateWALEntry}, with retries and backoff when a sink fails.
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {

  private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
  private HConnection conn;

  private Configuration conf;

  // How long should we sleep for each retry
  private long sleepForRetries;

  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Metrics for this replication source
  private MetricsSource metrics;
  // Handles picking and reporting on sink region servers in the peer cluster
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;

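  /**
   * Initialize the endpoint: read the retry/backoff settings, open the connection used to
   * reach sink region servers, and set up the bulk load replication paths when it is enabled.
   */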
  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // Connection used to reach the sink region servers in the peer cluster
    this.conn = HConnectionManager.createConnection(this.conf);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.metrics = context.getMetrics();

    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Directories used to resolve store file references when replicating bulk loaded data
    Path rootDir = FSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
  }

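  /**
   * Copy the replication codec setting, if one is configured, into the RPC codec key so that
   * RPCs made through this endpoint's connection use it.
   */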
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

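  /**
   * Refresh the list of region servers in the peer cluster and block until at least one
   * replication sink is available or this endpoint is stopped.
   */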
  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if sleepMultiplier is &lt; maxRetriesMultiplier
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    List<Entry> entries = replicateContext.getEntries();
    int sleepMultiplier = 1;
    while (this.isRunning()) {
      if (!peersSelected) {
        connectToPeers();
        peersSelected = true;
      }

      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      SinkPeer sinkPeer = null;
      try {
        sinkPeer = replicationSinkMgr.getReplicationSink();
        BlockingInterface rrs = sinkPeer.getRegionServer();
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicating " + entries.size() +
              " entries of total size " + replicateContext.getSize());
        }
        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
          replicationClusterId, baseNamespaceDir, hfileArchiveDir);

        // Update metrics with the write time of the last entry we shipped
        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
        replicationSinkMgr.reportSinkSuccess(sinkPeer);
        return true;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp();
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
          if (ioe instanceof TableNotFoundException) {
            if (sleepForRetries("A table is missing in the peer cluster. "
                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
              sleepMultiplier++;
            }
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // The call to the remote cluster timed out; retrying right away would only make
            // things worse, so back off using the (larger) socketTimeoutMultiplier.
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
              "call to the remote cluster timed out, which is usually " +
              "caused by a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException) {
            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("Can't replicate because of a local or network error: ", ioe);
          }
        }

        if (sinkPeer != null) {
          replicationSinkMgr.reportBadSink(sinkPeer);
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // we only get here if the endpoint stopped before anything was replicated
  }

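  /**
   * Check whether the replication peer is currently enabled.
   * @return true if the peer's state is {@link PeerState#ENABLED}
   */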
  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
  }

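  /**
   * Disconnect from the peer cluster and close the shared connection before signalling that
   * this endpoint has stopped.
   */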
  @Override
  protected void doStop() {
    disconnect();
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("Failed to close the connection", e);
      }
    }
    notifyStopped();
  }
}