/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;

/**
 * This class is responsible for managing all the replication sources. There
 * are two kinds of sources:
 * <ul>
 *   <li>Normal sources, which are persistent and have one instance per peer cluster.</li>
 *   <li>Old sources, which are recovered from a dead region server; their only
 *   goal is to finish replicating the WAL queue that server left behind.</li>
 * </ul>
 * When a region server dies, this class is notified through the
 * {@link ReplicationListener} interface and tries to claim the dead server's
 * queues so that local recovered sources can finish replicating them.
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this region server's WALs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead region servers
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;
  // All the WALs we are currently tracking, keyed by peer id
  private final Map<String, SortedSet<String>> walsById;
  // WALs of the recovered queues we are currently tracking, keyed by queue id
  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest WAL we saw, for enqueueing into new sources
  private Path latestPath;
  // Path to the WAL directory
  private final Path logDir;
  // Path to the WAL archive
  private final Path oldLogDir;
  // The number of ms to wait before transferring a dead server's queues
  private final long sleepBeforeFailover;
  // Executor used to run the queue-transfer workers
  private final ThreadPoolExecutor executor;

  private final Random rand;

  /**
   * Creates a replication manager and sets the watch on all the other region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker the tracker used to watch replication state changes
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all WAL directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of the local cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList is thread-safe.
    // Generally, reading is more frequent than modifying.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new HashMap<String, SortedSet<String>>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    this.replicationTracker.registerListener(this);
    this.replicationPeers.getAllPeerIds();
    // It's preferable to fail over one region server at a time, but with good zookeeper
    // servers more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // Use a short 100ms keep-alive since this could be done inline with a RS startup;
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
  }

  /**
   * Provide the id of the peer and the name of a log and this method will record,
   * for this region server, the current position in that log. It will also clean
   * older logs from the queue unless asked to retain them.
   * @param log path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true then the log is retained in ZK
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }

  /**
   * Cleans a log file and all older files from ZK. Called when we are sure that a
   * log file is closed and has no more entries.
   * @param key name of the log
   * @param id id of the peer cluster
   * @param queueRecovered whether this is a recovered queue
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
    if (queueRecovered) {
      SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
      if (wals != null && !wals.first().equals(key)) {
        cleanOldLogs(wals, key, id);
      }
    } else {
      synchronized (this.walsById) {
        SortedSet<String> wals = walsById.get(id);
        if (!wals.first().equals(key)) {
          cleanOldLogs(wals, key, id);
        }
      }
    }
  }

  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
    SortedSet<String> walSet = wals.headSet(key);
    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
    for (String wal : walSet) {
      this.replicationQueues.removeLog(id, wal);
    }
    walSet.clear();
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server WAL queues
   */
  protected void init() throws IOException, ReplicationException {
    boolean replicationForBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    for (String id : this.replicationPeers.getPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // where a peer was added before replication for bulk loaded data was enabled.
        this.replicationQueues.addPeerToHFileRefs(id);
      }
    }
    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
    if (currentReplicators == null || currentReplicators.size() == 0) {
      return;
    }
    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

    // Look if there's anything to process after a restart
    for (String rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * Add a new normal source to this region server
   * @param id the id of the peer cluster
   * @return the source that was created
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig
        = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    synchronized (this.walsById) {
      this.sources.add(src);
      this.walsById.put(id, new TreeSet<String>());
      // Add the latest WAL to that source's queue
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.walsById.get(id).add(name);
        try {
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          server.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    src.startup();
    return src;
  }

  /**
   * Delete a complete queue of WALs associated with a peer cluster
   * @param peerId id of the peer cluster queue of WALs to delete
   * @param closeConnection whether to close the connection to the peer cluster
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.peerRemoved(peerId);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    if (this.sources.size() == 0) {
      this.replicationQueues.removeAllQueues();
    }
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get the WALs of the normal sources on this region server
   * @return an unmodifiable map of queue id to its sorted set of WAL names
   */
  protected Map<String, SortedSet<String>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get the WALs of the recovered sources on this region server
   * @return an unmodifiable map of queue id to its sorted set of WAL names
   */
  protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this region server
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Get a list of all the recovered sources of this region server
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

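  /**
   * Get the ids of all the queues this region server currently owns.
   * Exposed for testing only.
   */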
  @VisibleForTesting
  List<String> getAllQueues() {
    return replicationQueues.getAllQueues();
  }

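  /**
   * Bookkeeping done before a WAL roll: registers the new WAL with every
   * source's queue in ZK and starts tracking it in walsById.
   * @param newLog the path of the WAL that is about to be rolled to
   */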
  void preLogRoll(Path newLog) throws IOException {
    synchronized (this.walsById) {
      String name = newLog.getName();
      for (ReplicationSourceInterface source : this.sources) {
        try {
          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue with id="
              + source.getPeerClusterZnode() + ", filename=" + name, e);
        }
      }
      for (SortedSet<String> wals : this.walsById.values()) {
        if (this.sources.isEmpty()) {
          // If there are no slaves, there is no need to keep the old WALs since
          // we only consider the last one when a new slave comes in
          wals.clear();
        }
        wals.add(name);
      }
    }

    this.latestPath = newLog;
  }

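  /**
   * Bookkeeping done after a WAL roll: hands the new WAL to every normal
   * source so it gets replicated.
   * @param newLog the path of the WAL that was just rolled to
   */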
  void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  /**
   * Factory method to create a replication source
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to which the source will report
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param server the server object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of the local cluster
   * @param peerConfig the configuration of the peer cluster
   * @param replicationPeer the object that hosts the peer's state
   * @return the created source
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Server server, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
      throws IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
        ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to the HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
      if (rsServerHost != null) {
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if (newReplicationEndPoint != null) {
          // Override the endpoint with the one returned by the coprocessor hook
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      LOG.warn("Passed replication endpoint implementation throws errors", e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
      clusterId, replicationEndpoint, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));

    return src;
  }

  /**
   * Transfer all the queues of the specified region server to this one.
   * First it tries to grab a lock and if that works it will move the
   * znodes and finally will begin the replication.
   * @param rsZnode znode of the dead region server
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified old source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
  }

  /**
   * Close the given peer's replication streams on this region server. This method
   * first terminates and removes all the recovered sources for the given peer, then
   * the normal source, and finally deletes the related queue data in ZK.
   * @param id the id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // Synchronized on oldsources to avoid adding a recovered source for the
    // to-be-removed peer; see NodeFailoverWorker.run
    synchronized (oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (id.equals(src.getPeerClusterId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        closeRecoveredQueue(src);
      }
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove.add(src);
      }
    }
    if (srcToRemove.size() == 0) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    for (ReplicationSourceInterface toRemove : srcToRemove) {
      toRemove.terminate(terminateMessage);
      this.sources.remove(toRemove);
    }
    deleteSource(id, true);
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.peerAdded(id);
        if (added) {
          addSource(id);
        }
      } catch (Exception e) {
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode znode of the dead region server whose queues should be claimed
     */
    public NodeFailoverWorker(String rsZnode) {
      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
    }

    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-" + rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      if (this.rq.isThisOurZnode(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      SortedMap<String, SortedSet<String>> newQueues = this.rq.claimQueues(rsZnode);
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the dead region server didn't have any
        // outstanding WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        SortedSet<String> walsSet = entry.getValue();
        try {
          // The queue id of a recovered queue has dead server names appended,
          // so extract the actual peer id from it
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            LOG.warn("Received exception while getting replication peer config, skipping replay",
                ex);
          }
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + rsZnode);
            replicationQueues.removeQueue(peerId);
            continue;
          }

          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                server, peerId, this.clusterId, peerConfig, peer);
          // Synchronized on oldsources to avoid adding a recovered source for a
          // to-be-removed peer; see removePeer
          synchronized (oldsources) {
            if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              closeRecoveredQueue(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
          walsByIdRecoveredQueues.put(peerId, walsSet);
        } catch (IOException e) {
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Get the directory where WALs are archived
   * @return the directory where WALs are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where WALs are stored by their RSs
   * @return the directory where WALs are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuffer stats = new StringBuffer();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()
          + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

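  /**
   * Add hfile references to every normal source of this region server so that
   * they get replicated to the peer clusters.
   * @param tableName the table the hfiles belong to
   * @param family the column family the hfiles belong to
   * @param pairs list of pairs of hfile paths
   * @throws ReplicationException if the references cannot be recorded
   */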
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    for (ReplicationSourceInterface source : this.sources) {
      source.addHFileRefs(tableName, family, pairs);
    }
  }

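  /**
   * Remove hfile references from the queue of the given peer once they have
   * been replicated.
   * @param peerId the id of the peer cluster
   * @param files names of the hfiles to remove
   */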
  public void cleanUpHFileRefs(String peerId, List<String> files) {
    this.replicationQueues.removeHFileRefs(peerId, files);
  }
}