1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Comparator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.UUID;
29  import java.util.concurrent.PriorityBlockingQueue;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
47  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
48  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
49  import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
50  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
51  import org.apache.hadoop.hbase.replication.ReplicationException;
52  import org.apache.hadoop.hbase.replication.ReplicationPeers;
53  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
54  import org.apache.hadoop.hbase.replication.ReplicationQueues;
55  import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
56  import org.apache.hadoop.hbase.replication.WALEntryFilter;
57  import org.apache.hadoop.hbase.util.Bytes;
58  import org.apache.hadoop.hbase.util.CancelableProgressable;
59  import org.apache.hadoop.hbase.util.FSUtils;
60  import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
61  import org.apache.hadoop.hbase.util.Pair;
62  import org.apache.hadoop.hbase.util.Threads;
63  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
64  import org.apache.hadoop.hbase.wal.WAL;
65  import org.apache.hadoop.hbase.wal.WALKey;
66  
67  import com.google.common.collect.Lists;
68  import com.google.common.util.concurrent.ListenableFuture;
69  import com.google.common.util.concurrent.Service;
70  
71  /**
72   * Class that handles the source of a replication stream.
73   * Currently does not handle more than one slave cluster.
74   * For each slave cluster it selects a random number of peers
75   * using a replication ratio. For example, if the replication ratio is 0.1
76   * and the slave cluster has 100 region servers, 10 will be selected.
77   * <p/>
78   * A stream is considered down when we cannot contact a region server on the
79   * peer cluster for more than 55 seconds by default.
80   * <p/>
81   *
82   */
83  @InterfaceAudience.Private
84  public class ReplicationSource extends Thread
85      implements ReplicationSourceInterface {
86  
87    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
88    // Queue of logs to process
89    private PriorityBlockingQueue<Path> queue;
90    private ReplicationQueues replicationQueues;
91    private ReplicationPeers replicationPeers;
92  
93    private Configuration conf;
94    private ReplicationQueueInfo replicationQueueInfo;
95    // id of the peer cluster this source replicates to
96    private String peerId;
97    // The manager of all sources to which we ping back our progress
98    private ReplicationSourceManager manager;
99    // Should we stop everything?
100   private Stoppable stopper;
101   // How long should we sleep for each retry
102   private long sleepForRetries;
103   // Max size in bytes of entriesArray
104   private long replicationQueueSizeCapacity;
105   // Max number of entries in entriesArray
106   private int replicationQueueNbCapacity;
107   // Our reader for the current log. open/close handled by repLogReader
108   private WAL.Reader reader;
109   // Last position in the log that we sent to ZooKeeper
110   private long lastLoggedPosition = -1;
111   // Path of the current log
112   private volatile Path currentPath;
113   private FileSystem fs;
114   // id of this cluster
115   private UUID clusterId;
116   // id of the other cluster
117   private UUID peerClusterId;
118   // total number of edits we replicated
119   private long totalReplicatedEdits = 0;
120   // total number of operations (Put/Delete) we replicated
121   private long totalReplicatedOperations = 0;
122   // The znode we currently play with
123   private String peerClusterZnode;
124   // Maximum number of retries before taking bold actions
125   private int maxRetriesMultiplier;
126   // Current number of operations (Put/Delete) that we need to replicate
127   private int currentNbOperations = 0;
128   // Current size of data we need to replicate
129   private int currentSize = 0;
130   //Current number of hfiles that we need to replicate
131   private long currentNbHFiles = 0;
132   // Indicates if this particular source is running
133   private volatile boolean running = true;
134   // Metrics for this source
135   private MetricsSource metrics;
136   // Handle on the log reader helper
137   private ReplicationWALReaderManager repLogReader;
138   //WARN threshold for the number of queued logs, defaults to 2
139   private int logQueueWarnThreshold;
140   // ReplicationEndpoint which will handle the actual replication
141   private ReplicationEndpoint replicationEndpoint;
142   // A filter (or a chain of filters) for the WAL entries.
143   private WALEntryFilter walEntryFilter;
144   // throttler
145   private ReplicationThrottler throttler;
146 
147   /**
148    * Instantiation method used by region servers
149    *
150    * @param conf configuration to use
151    * @param fs file system to use
152    * @param manager replication manager to ping to
153    * @param stopper     the stoppable to check whether the region server is shutting down
154    * @param peerClusterZnode the name of our znode
155    * @param clusterId unique UUID for the cluster
156    * @param replicationEndpoint the replication endpoint implementation
157    * @param metrics metrics for replication source
158    * @throws IOException
159    */
160   @Override
161   public void init(final Configuration conf, final FileSystem fs,
162       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
163       final ReplicationPeers replicationPeers, final Stoppable stopper,
164       final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
165       final MetricsSource metrics)
166           throws IOException {
167     this.stopper = stopper;
168     this.conf = HBaseConfiguration.create(conf);
169     decorateConf();
170     this.replicationQueueSizeCapacity =
171         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
172     this.replicationQueueNbCapacity =
173         this.conf.getInt("replication.source.nb.capacity", 25000);
174     this.sleepForRetries =
175         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
176     this.maxRetriesMultiplier =
177         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
178     this.queue =
179         new PriorityBlockingQueue<Path>(
180             this.conf.getInt("hbase.regionserver.maxlogs", 32),
181             new LogsComparator());
182     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
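        // the throttler presumably budgets bytes per 100 ms cycle, hence the per-second
        // bandwidth is divided by 10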
183     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
184     this.replicationQueues = replicationQueues;
185     this.replicationPeers = replicationPeers;
186     this.manager = manager;
187     this.fs = fs;
188     this.metrics = metrics;
189     this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
190     this.clusterId = clusterId;
191 
192     this.peerClusterZnode = peerClusterZnode;
193     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
194     // ReplicationQueueInfo parses the peerId out of the znode for us
195     this.peerId = this.replicationQueueInfo.getPeerId();
196     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
197     this.replicationEndpoint = replicationEndpoint;
198   }
199 
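      /**
       * If a codec is configured via HConstants.REPLICATION_CODEC_CONF_KEY, install it as the RPC
       * codec in this source's configuration so WAL cells are encoded consistently for the peer.
       */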
200   private void decorateConf() {
201     String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
202     if (StringUtils.isNotEmpty(replicationCodec)) {
203       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
204     }
205   }
206 
207   @Override
208   public void enqueueLog(Path log) {
209     this.queue.put(log);
210     int queueSize = queue.size();
211     this.metrics.setSizeOfLogQueue(queueSize);
212     // This will log a warning for each new log that gets created above the warn threshold
213     if (queueSize > this.logQueueWarnThreshold) {
214       LOG.warn("Queue size: " + queueSize +
215         " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
216     }
217   }
218 
219   @Override
220   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
221       throws ReplicationException {
222     String peerId = peerClusterZnode;
223     if (peerId.contains("-")) {
224       // peerClusterZnode will be in the form peerId + "-" + rsZNode.
225       // A peerId will not have "-" in its name, see HBASE-11394
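          // e.g. a recovered queue znode named "2-host1.example.com,16020,1472121234567"
          // (hypothetical name) would yield peerId "2"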
226       peerId = peerClusterZnode.split("-")[0];
227     }
228     Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
229     if (tableCFMap != null) {
230       List<String> tableCfs = tableCFMap.get(tableName);
231       if (tableCFMap.containsKey(tableName)
232           && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
233         this.replicationQueues.addHFileRefs(peerId, pairs);
234         metrics.incrSizeOfHFileRefsQueue(pairs.size());
235       } else {
236         LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
237             + Bytes.toString(family) + " to peer id " + peerId);
238       }
239     } else {
240       // the user has not defined any table CFs for this peer, which means
241       // all the data should be replicated
242       this.replicationQueues.addHFileRefs(peerId, pairs);
243       metrics.incrSizeOfHFileRefsQueue(pairs.size());
244     }
245   }
246 
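      /**
       * Clean up when this source exits: clear the source metrics and stop the replication
       * endpoint if it is still starting or running.
       */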
247   private void uninitialize() {
248     LOG.debug("Source exiting " + this.peerId);
249     metrics.clear();
250     if (replicationEndpoint.state() == Service.State.STARTING
251         || replicationEndpoint.state() == Service.State.RUNNING) {
252       replicationEndpoint.stopAndWait();
253     }
254   }
255 
256   @Override
257   public void run() {
258     // We were stopped while looping to connect to sinks, just abort
259     if (!this.isActive()) {
260       uninitialize();
261       return;
262     }
263 
264     try {
265       // start the endpoint, connect to the cluster
266       Service.State state = replicationEndpoint.start().get();
267       if (state != Service.State.RUNNING) {
268         LOG.warn("ReplicationEndpoint was not started. Exiting");
269         uninitialize();
270         return;
271       }
272     } catch (Exception ex) {
273       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
274       throw new RuntimeException(ex);
275     }
276 
277     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
278     ArrayList<WALEntryFilter> filters = Lists.newArrayList(
279       (WALEntryFilter)new SystemTableWALEntryFilter());
280     WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
281     if (filterFromEndpoint != null) {
282       filters.add(filterFromEndpoint);
283     }
284     this.walEntryFilter = new ChainWALEntryFilter(filters);
285 
286     int sleepMultiplier = 1;
287     // delay this until we are in an asynchronous thread
288     while (this.isActive() && this.peerClusterId == null) {
289       this.peerClusterId = replicationEndpoint.getPeerUUID();
290       if (this.isActive() && this.peerClusterId == null) {
291         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
292           sleepMultiplier++;
293         }
294       }
295     }
296     // We were stopped while looping to contact peer's zk ensemble, just abort
297     if (!this.isActive()) {
298       uninitialize();
299       return;
300     }
301 
302     // resetting to 1 to reuse later
303     sleepMultiplier = 1;
304 
305     // In rare cases, the ZooKeeper setting may be messed up. That leads to an incorrect
306     // peerClusterId value, which is the same as the source clusterId
307     if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
308       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
309           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
310           + replicationEndpoint.getClass().getName(), null, false);
311     }
312     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
313 
314     // If this is recovered, the queue is already full and the first log
315     // normally has a position (unless the RS failed between 2 logs)
316     if (this.replicationQueueInfo.isQueueRecovered()) {
317       try {
318         this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
319           this.queue.peek().getName()));
320         if (LOG.isTraceEnabled()) {
321           LOG.trace("Recovered queue started with log " + this.queue.peek() +
322               " at position " + this.repLogReader.getPosition());
323         }
324       } catch (ReplicationException e) {
325         this.terminate("Couldn't get the position of this recovered queue " +
326             this.peerClusterZnode, e);
327       }
328     }
329     // Loop until we close down
330     while (isActive()) {
331       // Sleep until replication is enabled again
332       if (!isPeerEnabled()) {
333         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
334           sleepMultiplier++;
335         }
336         continue;
337       }
338       Path oldPath = getCurrentPath(); //note that in the current scenario,
339                                        //oldPath will be null when a log roll
340                                        //happens.
341       // Get a new path
342       boolean hasCurrentPath = getNextPath();
343       if (getCurrentPath() != null && oldPath == null) {
344         sleepMultiplier = 1; //reset the sleepMultiplier on a path change
345       }
346       if (!hasCurrentPath) {
347         if (sleepForRetries("No log to process", sleepMultiplier)) {
348           sleepMultiplier++;
349         }
350         continue;
351       }
352       boolean currentWALisBeingWrittenTo = false;
353       //For WAL files we own (rather than recovered), take a snapshot of whether the
354       //current WAL file (this.currentPath) is in use (for writing) NOW!
355       //Since the new WAL paths are enqueued only after the prev WAL file
356       //is 'closed', presence of an element in the queue means that
357       //the previous WAL file was closed, else the file is in use (currentPath)
358       //We take the snapshot now so that we are protected against races
359       //where a new file gets enqueued while the current file is being processed
360       //(and where we just finished reading the current file).
361       if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
362         currentWALisBeingWrittenTo = true;
363       }
364       // Open a reader on it
365       if (!openReader(sleepMultiplier)) {
366         // Reset the sleep multiplier, else it'd be reused for the next file
367         sleepMultiplier = 1;
368         continue;
369       }
370 
371       // If we got a null reader but didn't continue, then sleep and continue
372       if (this.reader == null) {
373         if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
374           sleepMultiplier++;
375         }
376         continue;
377       }
378 
379       boolean gotIOE = false;
380       currentNbOperations = 0;
381       currentNbHFiles = 0;
382       List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
383       currentSize = 0;
384       try {
385         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
386           continue;
387         }
388       } catch (IOException ioe) {
389         LOG.warn(this.peerClusterZnode + " Got: ", ioe);
390         gotIOE = true;
391         if (ioe.getCause() instanceof EOFException) {
392 
393           boolean considerDumping = false;
394           if (this.replicationQueueInfo.isQueueRecovered()) {
395             try {
396               FileStatus stat = this.fs.getFileStatus(this.currentPath);
397               if (stat.getLen() == 0) {
398                 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
399               }
400               considerDumping = true;
401             } catch (IOException e) {
402               LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
403             }
404           }
405 
406           if (considerDumping &&
407               sleepMultiplier == this.maxRetriesMultiplier &&
408               processEndOfFile()) {
409             continue;
410           }
411         }
412       } finally {
413         try {
414           this.reader = null;
415           this.repLogReader.closeReader();
416         } catch (IOException e) {
417           gotIOE = true;
418           LOG.warn("Unable to finalize the tailing of a file", e);
419         }
420       }
421 
422       // If we didn't get anything to replicate, or if we hit an IOE,
423       // wait a bit and retry.
424       // But if we need to stop, don't bother sleeping
425       if (this.isActive() && (gotIOE || entries.isEmpty())) {
426         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
427           this.manager.logPositionAndCleanOldLogs(this.currentPath,
428               this.peerClusterZnode, this.repLogReader.getPosition(),
429               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
430           this.lastLoggedPosition = this.repLogReader.getPosition();
431         }
432         // Reset the sleep multiplier if nothing has actually gone wrong
433         if (!gotIOE) {
434           sleepMultiplier = 1;
435           // if there was nothing to ship and it's not an error
436           // set "ageOfLastShippedOp" to <now> to indicate that we're current
437           this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
438         }
439         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
440           sleepMultiplier++;
441         }
442         continue;
443       }
444       sleepMultiplier = 1;
445       shipEdits(currentWALisBeingWrittenTo, entries);
446     }
447     uninitialize();
448   }
449 
450   /**
451    * Read all the entries from the current log file and retain those
452    * that need to be replicated; otherwise, process the end of the current file.
453    * @param currentWALisBeingWrittenTo is the current WAL being written to
454    * @param entries resulting entries to be replicated
455    * @return true if we got nothing and went to the next file, false if we got
456    * entries
457    * @throws IOException
458    */
459   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
460       List<WAL.Entry> entries) throws IOException {
461     long seenEntries = 0;
462     if (LOG.isTraceEnabled()) {
463       LOG.trace("Seeking in " + this.currentPath + " at position "
464           + this.repLogReader.getPosition());
465     }
466     this.repLogReader.seek();
467     long positionBeforeRead = this.repLogReader.getPosition();
468     WAL.Entry entry =
469         this.repLogReader.readNextAndSetPosition();
470     while (entry != null) {
471       this.metrics.incrLogEditsRead();
472       seenEntries++;
473 
474       // don't replicate if the log entries have already been consumed by the cluster
475       if (replicationEndpoint.canReplicateToSameCluster()
476           || !entry.getKey().getClusterIds().contains(peerClusterId)) {
477         // Remove all KVs that should not be replicated
478         entry = walEntryFilter.filter(entry);
479         WALEdit edit = null;
480         WALKey logKey = null;
481         if (entry != null) {
482           edit = entry.getEdit();
483           logKey = entry.getKey();
484         }
485 
486         if (edit != null && edit.size() != 0) {
487           //Mark that the current cluster has the change
488           logKey.addClusterId(clusterId);
489           currentNbOperations += countDistinctRowKeys(edit);
490           entries.add(entry);
491           currentSize += entry.getEdit().heapSize();
492           currentSize += calculateTotalSizeOfStoreFiles(edit);
493         } else {
494           this.metrics.incrLogEditsFiltered();
495         }
496       }
497       // Stop if too many entries or too big
498       if (currentSize >= this.replicationQueueSizeCapacity ||
499           entries.size() >= this.replicationQueueNbCapacity) {
500         break;
501       }
502       try {
503         entry = this.repLogReader.readNextAndSetPosition();
504       } catch (IOException ie) {
505         LOG.debug("Break on IOE: " + ie.getMessage());
506         break;
507       }
508     }
509     metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
510     if (currentWALisBeingWrittenTo) {
511       return false;
512     }
513     // If we didn't get anything and the queue has an object, it means we
514     // hit the end of the file for sure
515     return seenEntries == 0 && processEndOfFile();
516   }
517 
518   /**
519    * Calculate the total size of the store files referenced by bulk load cells in the edit
520    * @param edit edit to scan for bulk load store file sizes
521    * @return the total size of the store files
522    */
523   private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
524     List<Cell> cells = edit.getCells();
525     int totalStoreFilesSize = 0;
526 
527     int totalCells = edit.size();
528     for (int i = 0; i < totalCells; i++) {
529       if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
530         try {
531           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
532           List<StoreDescriptor> stores = bld.getStoresList();
533           int totalStores = stores.size();
534           for (int j = 0; j < totalStores; j++) {
535             totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
536           }
537         } catch (IOException e) {
538           LOG.error("Failed to deserialize bulk load entry from wal edit. "
539               + "Size of HFiles part of cell will not be considered in replication "
540               + "request size calculation.", e);
541         }
542       }
543     }
544     return totalStoreFilesSize;
545   }
546 
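      /**
       * After an edit has been shipped, remove the HFile references carried in its bulk load
       * cells from the replication queue and decrement the HFile refs queue metric.
       */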
547   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
548     String peerId = peerClusterZnode;
549     if (peerId.contains("-")) {
550       // peerClusterZnode will be in the form peerId + "-" + rsZNode.
551       // A peerId will not have "-" in its name, see HBASE-11394
552       peerId = peerClusterZnode.split("-")[0];
553     }
554     List<Cell> cells = edit.getCells();
555     int totalCells = cells.size();
556     for (int i = 0; i < totalCells; i++) {
557       Cell cell = cells.get(i);
558       if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
559         BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
560         List<StoreDescriptor> stores = bld.getStoresList();
561         int totalStores = stores.size();
562         for (int j = 0; j < totalStores; j++) {
563           List<String> storeFileList = stores.get(j).getStoreFileList();
564           manager.cleanUpHFileRefs(peerId, storeFileList);
565           metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
566         }
567       }
568     }
569   }
570 
571   /**
572    * Poll for the next path
573    * @return true if a path was obtained, false if not
574    */
575   protected boolean getNextPath() {
576     try {
577       if (this.currentPath == null) {
578         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
579         this.metrics.setSizeOfLogQueue(queue.size());
580         if (this.currentPath != null) {
581           this.manager.cleanOldLogs(this.currentPath.getName(),
582               this.peerId,
583               this.replicationQueueInfo.isQueueRecovered());
584           if (LOG.isTraceEnabled()) {
585             LOG.trace("New log: " + this.currentPath);
586           }
587         }
588       }
589     } catch (InterruptedException e) {
590       LOG.warn("Interrupted while reading edits", e);
591     }
592     return this.currentPath != null;
593   }
594 
595   /**
596    * Open a reader on the current path
597    *
598    * @param sleepMultiplier by how many times the default sleeping time is augmented
599    * @return true if we should continue with that file, false if we are done with it
600    */
601   protected boolean openReader(int sleepMultiplier) {
602     try {
603       try {
604         if (LOG.isTraceEnabled()) {
605           LOG.trace("Opening log " + this.currentPath);
606         }
607         this.reader = repLogReader.openReader(this.currentPath);
608       } catch (FileNotFoundException fnfe) {
609         if (this.replicationQueueInfo.isQueueRecovered()) {
610           // We didn't find the log in the archive directory, check whether it still
611           // exists in the dead RS folder (there could be a chain of failures
612           // to look at)
613           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
614           LOG.info("NB dead servers : " + deadRegionServers.size());
615           final Path walDir = FSUtils.getWALRootDir(this.conf);
616           for (String curDeadServerName : deadRegionServers) {
617             final Path deadRsDirectory = new Path(walDir,
618                 DefaultWALProvider.getWALDirectoryName(curDeadServerName));
619             Path[] locs = new Path[] {
620                 new Path(deadRsDirectory, currentPath.getName()),
621                 new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
622                                           currentPath.getName()),
623             };
624             for (Path possibleLogLocation : locs) {
625               LOG.info("Possible location " + possibleLogLocation.toUri().toString());
626               if (this.manager.getFs().exists(possibleLogLocation)) {
627                 // We found the right new location
628                 LOG.info("Log " + this.currentPath + " still exists at " +
629                     possibleLogLocation);
630                 // Breaking here will make us sleep since reader is null
631                 // TODO why don't we need to set currentPath and call openReader here?
632                 return true;
633               }
634             }
635           }
636           // In the case of disaster recovery, the HMaster may have been shut down or crashed before
637           // WALs were moved from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists
638           if (stopper instanceof ReplicationSyncUp.DummyServer) {
639             // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
640             //      area rather than to the wal area for a particular region server.
641             FileStatus[] rss = fs.listStatus(manager.getLogDir());
642             for (FileStatus rs : rss) {
643               Path p = rs.getPath();
644               FileStatus[] logs = fs.listStatus(p);
645               for (FileStatus log : logs) {
646                 p = new Path(p, log.getPath().getName());
647                 if (p.getName().equals(currentPath.getName())) {
648                   currentPath = p;
649                   LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
650                   // Open the log at the new location
651                   this.openReader(sleepMultiplier);
652                   return true;
653                 }
654               }
655             }
656           }
657 
658           // TODO What happens if the log was missing from every single location?
659           // Although we need to check a couple of times as the log could have
660           // been moved by the master between the checks
661           // It can also happen if a recovered queue wasn't properly cleaned,
662           // such that the znode pointing to a log exists but the log was
663           // deleted a long time ago.
664           // For the moment, we'll throw the IO and processEndOfFile
665           throw new IOException("File from recovered queue is " +
666               "nowhere to be found", fnfe);
667         } else {
668           // If the log was archived, continue reading from there
669           Path archivedLogLocation =
670               new Path(manager.getOldLogDir(), currentPath.getName());
671           if (this.manager.getFs().exists(archivedLogLocation)) {
672             currentPath = archivedLogLocation;
673             LOG.info("Log " + this.currentPath + " was moved to " +
674                 archivedLogLocation);
675             // Open the log at the new location
676             this.openReader(sleepMultiplier);
677 
678           }
679           // TODO What happens if the log is missing in both places?
680         }
681       }
682     } catch (LeaseNotRecoveredException lnre) {
683       // HBASE-15019 the WAL was not closed due to some hiccup.
684       LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
685       recoverLease(conf, currentPath);
686       this.reader = null;
687     } catch (IOException ioe) {
688       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
689       LOG.warn(this.peerClusterZnode + " Got: ", ioe);
690       this.reader = null;
691       if (ioe.getCause() instanceof NullPointerException) {
692         // Workaround for race condition in HDFS-4380
693         // which throws a NPE if we open a file before any data node has the most recent block
694         // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
695         LOG.warn("Got NPE opening reader, will retry.");
696       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
697         // TODO Need a better way to determine if a file is really gone but
698         // TODO without scanning all logs dir
699         LOG.warn("Waited too long for this file, considering dumping");
700         return !processEndOfFile();
701       }
702     }
703     return true;
704   }
705 
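      /**
       * Attempt to recover the lease on the given WAL so it can be opened for reading
       * (see HBASE-15019); progress callbacks keep going only while this source is active.
       */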
706   private void recoverLease(final Configuration conf, final Path path) {
707     try {
708       final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
709       FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
710       fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
711         @Override
712         public boolean progress() {
713           LOG.debug("recover WAL lease: " + path);
714           return isActive();
715         }
716       });
717     } catch (IOException e) {
718       LOG.warn("unable to recover lease for WAL: " + path, e);
719     }
720   }
721 
722   /*
723    * Checks whether the current log file is empty and the queue is not a recovered one. This
724    * handles the scenario where, in an idle cluster, there is no entry in the current log and we
725    * keep trying to read the log file and getting EOFException. In the case of a recovered queue
726    * the last log file may be empty, and we don't want to retry that.
727    */
728   private boolean isCurrentLogEmpty() {
729     return (this.repLogReader.getPosition() == 0 &&
730         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
731   }
732 
733   /**
734    * Do the sleeping logic
735    * @param msg Why we sleep
736    * @param sleepMultiplier by how many times the default sleeping time is augmented
737    * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
738    */
739   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
740     try {
741       if (LOG.isTraceEnabled()) {
742         LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
743       }
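          // back off linearly: sleep the base interval multiplied by the current retry multiplier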
744       Thread.sleep(this.sleepForRetries * sleepMultiplier);
745     } catch (InterruptedException e) {
746       LOG.debug("Interrupted while sleeping between retries");
747       Thread.currentThread().interrupt();
748     }
749     return sleepMultiplier < maxRetriesMultiplier;
750   }
751 
752   /**
753    * Count the number of different row keys in the given edit because of
754    * mini-batching. We assume that there's at least one Cell in the WALEdit.
755    * @param edit edit to count row keys from
756    * @return number of different row keys
757    */
758   private int countDistinctRowKeys(WALEdit edit) {
759     List<Cell> cells = edit.getCells();
760     int distinctRowKeys = 1;
761     int totalHFileEntries = 0;
762     Cell lastCell = cells.get(0);
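        // The count assumes cells for the same row are adjacent in the WALEdit, so counting row
        // transitions between consecutive cells gives the number of distinct rows.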
763     int totalCells = edit.size();
764     for (int i = 0; i < totalCells; i++) {
765       // Count HFiles to be replicated
766       if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
767         try {
768           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
769           List<StoreDescriptor> stores = bld.getStoresList();
770           int totalStores = stores.size();
771           for (int j = 0; j < totalStores; j++) {
772             totalHFileEntries += stores.get(j).getStoreFileList().size();
773           }
774         } catch (IOException e) {
775           LOG.error("Failed to deserialize bulk load entry from wal edit. "
776               + "Then its hfiles count will not be added into metric.");
777         }
778       }
779       if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
780         distinctRowKeys++;
781       }
782       lastCell = cells.get(i);
783     }
784     currentNbHFiles += totalHFileEntries;
785     return distinctRowKeys + totalHFileEntries;
786   }
787 
788   /**
789    * Do the shipping logic
790    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
791    * written to when this method was called
792    */
793   protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
794     int sleepMultiplier = 0;
795     if (entries.isEmpty()) {
796       LOG.warn("Was given 0 edits to ship");
797       return;
798     }
799     while (this.isActive()) {
800       try {
801         if (this.throttler.isEnabled()) {
802           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
803           if (sleepTicks > 0) {
804             try {
805               if (LOG.isTraceEnabled()) {
806                 LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
807               }
808               Thread.sleep(sleepTicks);
809             } catch (InterruptedException e) {
810               LOG.debug("Interrupted while sleeping for throttling control");
811               Thread.currentThread().interrupt();
812               // the current thread might have been interrupted to terminate;
813               // go directly back to while() to confirm this
814               continue;
815             }
816             // reset throttler's cycle start tick when sleep for throttling occurs
817             this.throttler.resetStartTick();
818           }
819         }
820         // create replicateContext here, so the entries can be GC'd upon return from this call stack
821         ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
822         replicateContext.setEntries(entries).setSize(currentSize);
823 
824         long startTimeNs = System.nanoTime();
825         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
826         boolean replicated = replicationEndpoint.replicate(replicateContext);
827         long endTimeNs = System.nanoTime();
828 
829         if (!replicated) {
830           continue;
831         } else {
832           sleepMultiplier = Math.max(sleepMultiplier-1, 0);
833         }
834 
835         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
836           //Clean up hfile references
837           int size = entries.size();
838           for (int i = 0; i < size; i++) {
839             cleanUpHFileRefs(entries.get(i).getEdit());
840           }
841           //Log and clean up WAL logs
842           this.manager.logPositionAndCleanOldLogs(this.currentPath,
843               this.peerClusterZnode, this.repLogReader.getPosition(),
844               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
845           this.lastLoggedPosition = this.repLogReader.getPosition();
846         }
847         if (this.throttler.isEnabled()) {
848           this.throttler.addPushSize(currentSize);
849         }
850         this.totalReplicatedEdits += entries.size();
851         this.totalReplicatedOperations += currentNbOperations;
852         this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024, currentNbHFiles);
853         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
854         if (LOG.isTraceEnabled()) {
855           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
856               + this.totalReplicatedOperations + " operations in " +
857               ((endTimeNs - startTimeNs)/1000000) + " ms");
858         }
859         break;
860       } catch (Exception ex) {
861         LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
862             org.apache.hadoop.util.StringUtils.stringifyException(ex));
863         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
864           sleepMultiplier++;
865         }
866       }
867     }
868   }
869 
870   /**
871    * check whether the peer is enabled or not
872    *
873    * @return true if the peer is enabled, otherwise false
874    */
875   protected boolean isPeerEnabled() {
876     return this.replicationPeers.getStatusOfPeer(this.peerId);
877   }
878 
879   /**
880    * If the queue isn't empty, switch to the next one
881    * Else if this is a recovered queue, it means we're done!
882    * Else we'll just continue to try reading the log file
883    * @return true if we're done with the current file, false if we should
884    * continue trying to read from it
885    */
886   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
887       justification="Yeah, this is how it works")
888   protected boolean processEndOfFile() {
889     // We presume this means the file we're reading is closed.
890     if (this.queue.size() != 0) {
891       // -1 means the wal wasn't closed cleanly.
892       final long trailerSize = this.repLogReader.currentTrailerSize();
893       final long currentPosition = this.repLogReader.getPosition();
894       FileStatus stat = null;
895       try {
896         stat = fs.getFileStatus(this.currentPath);
897       } catch (IOException exception) {
898         LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
899             + ", stats: " + getStats());
900         metrics.incrUnknownFileLengthForClosedWAL();
901       }
902       if (stat != null) {
903         if (trailerSize < 0) {
904           if (currentPosition < stat.getLen()) {
905             final long skippedBytes = stat.getLen() - currentPosition;
906             LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
907             metrics.incrUncleanlyClosedWALs();
908             metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
909           }
910         } else if (currentPosition + trailerSize < stat.getLen()){
911           LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
912             ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
913           repLogReader.setPosition(0);
914           metrics.incrRestartedWALReading();
915           metrics.incrRepeatedFileBytes(currentPosition);
916           return false;
917         }
918       }
919       if (LOG.isTraceEnabled()) {
920         LOG.trace("Reached the end of a log, stats: " + getStats()
921           + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
922       }
923       this.currentPath = null;
924       this.repLogReader.finishCurrentFile();
925       this.reader = null;
926       metrics.incrCompletedWAL();
927       return true;
928     } else if (this.replicationQueueInfo.isQueueRecovered()) {
929       this.manager.closeRecoveredQueue(this);
930       LOG.info("Finished recovering the queue with the following stats " + getStats());
931       metrics.incrCompletedRecoveryQueue();
932       this.running = false;
933       return true;
934     }
935     return false;
936   }
937 
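      // Run this source as a daemon thread; uncaught exceptions are logged together with the
      // WAL path currently being replicated.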
938   @Override
939   public void startup() {
940     String n = Thread.currentThread().getName();
941     Thread.UncaughtExceptionHandler handler =
942         new Thread.UncaughtExceptionHandler() {
943           @Override
944           public void uncaughtException(final Thread t, final Throwable e) {
945             LOG.error("Unexpected exception in ReplicationSource," +
946               " currentPath=" + currentPath, e);
947           }
948         };
949     Threads.setDaemonThreadRunning(
950         this, n + ".replicationSource," +
951         this.peerClusterZnode, handler);
952   }
953 
954   @Override
955   public void terminate(String reason) {
956     terminate(reason, null);
957   }
958 
959   @Override
960   public void terminate(String reason, Exception cause) {
961     terminate(reason, cause, true);
962   }
963 
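      /**
       * Stop this source: mark it as not running, interrupt the worker thread, stop the
       * replication endpoint, and optionally wait for both to shut down.
       */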
964   public void terminate(String reason, Exception cause, boolean join) {
965     if (cause == null) {
966       LOG.info("Closing source "
967           + this.peerClusterZnode + " because: " + reason);
968 
969     } else {
970       LOG.error("Closing source " + this.peerClusterZnode
971           + " because an error occurred: " + reason, cause);
972     }
973     this.running = false;
974     this.interrupt();
975     ListenableFuture<Service.State> future = null;
976     if (this.replicationEndpoint != null) {
977       future = this.replicationEndpoint.stop();
978     }
979     if (join) {
980       Threads.shutdown(this, this.sleepForRetries);
981       if (future != null) {
982         try {
983           future.get();
984         } catch (Exception e) {
985           LOG.warn("Got exception:" + e);
986         }
987       }
988     }
989   }
990 
991   @Override
992   public String getPeerClusterZnode() {
993     return this.peerClusterZnode;
994   }
995 
996   @Override
997   public String getPeerClusterId() {
998     return this.peerId;
999   }
1000 
1001   @Override
1002   public Path getCurrentPath() {
1003     return this.currentPath;
1004   }
1005 
1006   private boolean isActive() {
1007     return !this.stopper.isStopped() && this.running && !isInterrupted();
1008   }
1009 
1010   /**
1011    * Comparator used to compare logs together based on their start time
1012    */
1013   public static class LogsComparator implements Comparator<Path> {
1014 
1015     @Override
1016     public int compare(Path o1, Path o2) {
1017       return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
1018     }
1019 
1020     /**
1021      * Split a path to get the start time
1022      * For example: 10.20.20.171%3A60020.1277499063250
1023      * @param p path to split
1024      * @return start time
1025      */
1026     private static long getTS(Path p) {
1027       int tsIndex = p.getName().lastIndexOf('.') + 1;
1028       return Long.parseLong(p.getName().substring(tsIndex));
1029     }
1030   }
1031 
1032   @Override
1033   public String getStats() {
1034     long position = this.repLogReader.getPosition();
1035     return "Total replicated edits: " + totalReplicatedEdits +
1036       ", currently replicating from: " + this.currentPath +
1037       " at position: " + position;
1038   }
1039 
1040   /**
1041    * Get Replication Source Metrics
1042    * @return sourceMetrics
1043    */
1044   public MetricsSource getSourceMetrics() {
1045     return this.metrics;
1046   }
1047 }