/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;

/**
 * This class is responsible for managing all the replication sources. There
 * are two classes of sources:
 * <ul>
 * <li>Normal sources are persistent, one per peer cluster.</li>
 * <li>Old sources are recovered from a failed region server; their only goal
 * is to finish replicating the WAL queue that server had up in ZK.</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and
 * tries to grab a lock in order to transfer all of that server's queues to
 * local old sources.
 *
 * This class implements the ReplicationListener interface so that it can track
 * changes in replication state.
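 *
 * <p>A minimal usage sketch (illustrative only; the collaborators passed to
 * the constructor are assumed to already be set up by the caller):
 * <pre>
 * ReplicationSourceManager manager = new ReplicationSourceManager(queues,
 *     peers, tracker, conf, server, fs, logDir, oldLogDir, clusterId);
 * manager.init(); // one normal source per peer, plus recovery of dead RS queues
 * </pre>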
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;
  // All logs we are currently tracking
  private final Map<String, SortedSet<String>> walsById;
  // Logs for recovered sources we are currently tracking
  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, for newly created sources
  private Path latestPath;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final Random rand;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker the tracker of replication state changes
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of the local cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList is thread-safe; reads vastly outnumber writes here.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new HashMap<String, SortedSet<String>>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    this.replicationTracker.registerListener(this);
    this.replicationPeers.getAllPeerIds();
    // It's preferable to fail over one RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // Use a short 100ms keep-alive since this could be done inline with a RS startup;
    // even if we fail, other region servers can take care of it.
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
  }

  /**
   * Provide the id of the peer and a log key and this method will figure out which
   * wal it belongs to and will log, for this region server, the current
   * position. It will also clean old logs from the queue.
   * @param log Path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true then the log is retained in ZK
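   *
   * <p>Illustrative call (the values are made up): record that peer "1" has
   * replicated {@code walPath} up to byte offset 1048576, then prune older wals:
   * <pre>
   * manager.logPositionAndCleanOldLogs(walPath, "1", 1048576L, false, false);
   * </pre>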
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }

  /**
   * Cleans a log file and all older files from ZK. Called when we are sure that a
   * log file is closed and has no more entries.
   * @param key Path to the log
   * @param id id of the peer cluster
   * @param queueRecovered Whether this is a recovered queue
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
    if (queueRecovered) {
      SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
      if (wals != null && !wals.first().equals(key)) {
        cleanOldLogs(wals, key, id);
      }
    } else {
      synchronized (this.walsById) {
        SortedSet<String> wals = walsById.get(id);
        if (!wals.first().equals(key)) {
          cleanOldLogs(wals, key, id);
        }
      }
    }
  }

  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
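    // headSet(key) is exclusive: walSet holds the wals strictly older than 'key',
    // so the wal named 'key' itself is kept.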
    SortedSet<String> walSet = wals.headSet(key);
    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
    for (String wal : walSet) {
      this.replicationQueues.removeLog(id, wal);
    }
    walSet.clear();
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server wal queues
   */
  protected void init() throws IOException, ReplicationException {
    boolean replicationForBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    for (String id : this.replicationPeers.getPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        this.replicationQueues.addPeerToHFileRefs(id);
      }
    }
    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
    if (currentReplicators == null || currentReplicators.size() == 0) {
      return;
    }
    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

    // Look if there's anything to process after a restart
    for (String rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * Add a new normal source to this region server
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig
      = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    synchronized (this.walsById) {
      this.sources.add(src);
      this.walsById.put(id, new TreeSet<String>());
      // Add the latest wal to that source's queue
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.walsById.get(id).add(name);
        try {
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          server.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    src.startup();
    return src;
  }

  /**
   * Delete a complete queue of wals associated with a peer cluster
   * @param peerId Id of the peer cluster queue of wals to delete
   * @param closeConnection whether to disconnect from the peer cluster as well
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.peerRemoved(peerId);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    if (this.sources.size() == 0) {
      this.replicationQueues.removeAllQueues();
    }
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get a copy of the wals of the normal sources on this rs
   * @return an unmodifiable map from peer id to an ordered set of wal names
   */
  protected Map<String, SortedSet<String>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a copy of the wals of the recovered sources on this rs
   * @return an unmodifiable map from queue id to an ordered set of wal names
   */
  protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Get a list of all the old sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  @VisibleForTesting
  List<String> getAllQueues() {
    return replicationQueues.getAllQueues();
  }

  void preLogRoll(Path newLog) throws IOException {
    synchronized (this.walsById) {
      String name = newLog.getName();
      for (ReplicationSourceInterface source : this.sources) {
        try {
          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue with id="
              + source.getPeerClusterZnode() + ", filename=" + name, e);
        }
      }
      for (SortedSet<String> wals : this.walsById.values()) {
        if (this.sources.isEmpty()) {
          // If there are no sources, we don't need to keep the old wals since
          // only the latest one is handed to a newly added source
          wals.clear();
        }
        wals.add(name);
      }
    }

    this.latestPath = newLog;
  }

  void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  /**
   * Factory method to create a replication source
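   *
   * <p>The source implementation is pluggable via configuration. A sketch,
   * where {@code MyReplicationSource} is a hypothetical class and not part of
   * HBase:
   * <pre>
   * conf.set("replication.replicationsource.implementation",
   *     MyReplicationSource.class.getCanonicalName());
   * </pre>
   *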
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param server the server object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of the local cluster
   * @param peerConfig the configuration of the peer cluster
   * @param replicationPeer the peer to replicate to
   * @return the created source
   * @throws IOException
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Server server, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
          throws IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
      if (rsServerHost != null) {
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if (newReplicationEndPoint != null) {
          // Replace the configured endpoint with the one returned by the coprocessor hook
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      LOG.warn("Passed replication endpoint implementation throws errors", e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
      clusterId, replicationEndpoint, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));

    return src;
  }

  /**
   * Transfer all the queues of the specified region server to this one.
   * First it tries to grab a lock and if it works it will move the
   * znodes and finally will delete the old znodes.
   *
   * It creates one old source for any type of source of the old rs.
   * @param rsZnode znode of the dead region server
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified old source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
  }

  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (id.equals(src.getPeerClusterId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        closeRecoveredQueue(src);
      }
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove.add(src);
      }
    }
    if (srcToRemove.size() == 0) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    for (ReplicationSourceInterface toRemove : srcToRemove) {
      toRemove.terminate(terminateMessage);
      this.sources.remove(toRemove);
    }
    deleteSource(id, true);
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.peerAdded(id);
        if (added) {
          addSource(id);
        }
      } catch (Exception e) {
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode znode of the dead region server whose queues will be claimed
     */
    public NodeFailoverWorker(String rsZnode) {
      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
    }

    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-" + rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      if (this.rq.isThisOurZnode(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
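        // Sleep between 1x and 2x of sleepBeforeFailover; the random jitter
        // spreads out region servers that would otherwise race for the same queues.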
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      SortedMap<String, SortedSet<String>> newQueues = this.rq.claimQueues(rsZnode);

      // Claiming of the failed queues is now complete.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        SortedSet<String> walsSet = entry.getValue();
        try {
          // A recovered queue id carries dead RS information, so there may not be an actual
          // peer defined corresponding to peerId; extract the real peer id from it.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            LOG.warn("Received exception while getting replication peer config, skipping replay",
                ex);
          }
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + rsZnode);
            replicationQueues.removeQueue(peerId);
            continue;
          }

          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                server, peerId, this.clusterId, peerConfig, peer);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
          // see removePeer
          synchronized (oldsources) {
            if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              closeRecoveredQueue(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
          walsByIdRecoveredQueues.put(peerId, walsSet);
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

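  /**
   * Forward hfile references for bulk loaded data to every normal source on
   * this rs so that they can be replicated.
   */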
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    for (ReplicationSourceInterface source : this.sources) {
      source.addHFileRefs(tableName, family, pairs);
    }
  }

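  /**
   * Remove the given hfile references from the hfile-refs queue of the
   * specified peer.
   */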
  public void cleanUpHFileRefs(String peerId, List<String> files) {
    this.replicationQueues.removeHFileRefs(peerId, files);
  }
}