/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;

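/**
 * Class that handles the source of a replication stream.
 * Reads entries from the WAL files of its queue, filters them, and ships the remaining
 * edits to a {@link ReplicationEndpoint} for a single peer cluster. Recovered queues
 * taken over from dead region servers are handled as well. Progress and throughput
 * are reported through {@link MetricsSource}.
 */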
@InterfaceAudience.Private
public class ReplicationSource extends Thread
    implements ReplicationSourceInterface {

  public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
  // Queue of logs to process, entries are sorted by log creation timestamp
  private PriorityBlockingQueue<Path> queue;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;

  private Configuration conf;
  private ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;
  // The manager of all sources to which we ping back our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of entries to accumulate before shipping
  private long replicationQueueSizeCapacity;
  // Max number of entries to accumulate before shipping
  private int replicationQueueNbCapacity;
  // Our reader for the current log
  private WAL.Reader reader;
  // Last position in the log that we sent to ZooKeeper
  private long lastLoggedPosition = -1;
  // Path of the current log we're replicating
  private volatile Path currentPath;
  private FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the peer cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private long totalReplicatedEdits = 0;
  // total number of operations we replicated
  private long totalReplicatedOperations = 0;
  // The znode of the queue we are currently working on
  private String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Current number of operations (Put/Delete and bulk-loaded files) that we need to replicate
  private int currentNbOperations = 0;
  // Current size of data we need to replicate, in bytes
  private int currentSize = 0;
  // Current number of hfiles that we need to replicate
  private long currentNbHFiles = 0;
  // Indicates if this particular source is running
  private volatile boolean running = true;
  // Metrics for this source
  private MetricsSource metrics;
  // Handle on the log reader helper
  private ReplicationWALReaderManager repLogReader;
  // Log queue size above which a warning is emitted
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries
  private WALEntryFilter walEntryFilter;
  // Throttler for replication bandwidth limiting
  private ReplicationThrottler throttler;

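  /**
   * Instantiation method used by region servers.
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to report progress to
   * @param replicationQueues the replication queues (backed by ZooKeeper)
   * @param replicationPeers the known replication peers
   * @param stopper the stopper object for this region server
   * @param peerClusterZnode the name of our znode
   * @param clusterId unique UUID for the cluster
   * @param replicationEndpoint the replication endpoint implementation
   * @param metrics metrics for this replication source
   * @throws IOException
   */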
  @Override
  public void init(final Configuration conf, final FileSystem fs,
      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final Stoppable stopper,
      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
      final MetricsSource metrics) throws IOException {
    this.stopper = stopper;
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.replicationQueueSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationQueueNbCapacity =
        this.conf.getInt("replication.source.nb.capacity", 25000);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queue =
        new PriorityBlockingQueue<Path>(
            this.conf.getInt("hbase.regionserver.maxlogs", 32),
            new LogsComparator());
    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    this.throttler = new ReplicationThrottler((double) bandwidth / 10.0);
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
    this.clusterId = clusterId;

    this.peerClusterZnode = peerClusterZnode;
    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);

    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
    this.replicationEndpoint = replicationEndpoint;
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path log) {
    this.queue.put(log);
    int queueSize = queue.size();
    this.metrics.setSizeOfLogQueue(queueSize);

    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("Queue size: " + queueSize +
          " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = peerClusterZnode;
    if (peerId.contains("-")) {
      // For recovered queues the znode name is of the form peerId + "-" + <dead RS info>;
      // extract the peer id so we can look up the peer's configuration.
      peerId = peerClusterZnode.split("-")[0];
    }
    Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
    if (tableCFMap != null) {
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.replicationQueues.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
            + Bytes.toString(family) + " to peer id " + peerId);
      }
    } else {
      // No per-table CF configuration for this peer means everything is replicated,
      // so track all the HFile references.
      this.replicationQueues.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }

  private void uninitialize() {
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
    if (replicationEndpoint.state() == Service.State.STARTING
        || replicationEndpoint.state() == Service.State.RUNNING) {
      replicationEndpoint.stopAndWait();
    }
  }

  @Override
  public void run() {
    // Bail out early if the source was asked to stop before the thread started.
    if (!this.isActive()) {
      uninitialize();
      return;
    }

    try {
      // Start the replication endpoint and make sure it is running before we proceed.
      Service.State state = replicationEndpoint.start().get();
      if (state != Service.State.RUNNING) {
        LOG.warn("ReplicationEndpoint was not started. Exiting");
        uninitialize();
        return;
      }
    } catch (Exception ex) {
      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
      throw new RuntimeException(ex);
    }

    // Build the chain of WAL entry filters: system table edits are never replicated,
    // plus any filter supplied by the endpoint.
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
        (WALEntryFilter) new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    this.walEntryFilter = new ChainWALEntryFilter(filters);

    int sleepMultiplier = 1;
    // Look up the peer cluster id, retrying until it is available or we are stopped.
    while (this.isActive() && this.peerClusterId == null) {
      this.peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isActive() && this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    // We were asked to stop while waiting for the peer's UUID, just abort.
    if (!this.isActive()) {
      uninitialize();
      return;
    }

    // Reset to 1 to reuse in the loops below.
    sleepMultiplier = 1;

    // Sanity check: a peer whose UUID equals our own cluster id would make us replicate
    // to ourselves, which most endpoints do not allow.
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);

    // If this is a recovered queue, it is already populated and the first log normally
    // has a stored position to resume from.
    if (this.replicationQueueInfo.isQueueRecovered()) {
      try {
        this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
            this.queue.peek().getName()));
        if (LOG.isTraceEnabled()) {
          LOG.trace("Recovered queue started with log " + this.queue.peek() +
              " at position " + this.repLogReader.getPosition());
        }
      } catch (ReplicationException e) {
        this.terminate("Couldn't get the position of this recovered queue " +
            this.peerClusterZnode, e);
      }
    }

    while (isActive()) {
      // Sleep until replication is enabled again.
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      // Remember the path we had before possibly picking up a new one.
      Path oldPath = getCurrentPath();
      // Get a new path if needed.
      boolean hasCurrentPath = getNextPath();
      if (getCurrentPath() != null && oldPath == null) {
        // Reset the retry sleep when we move from having no file to having one.
        sleepMultiplier = 1;
      }
      if (!hasCurrentPath) {
        if (sleepForRetries("No log to process", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      boolean currentWALisBeingWrittenTo = false;
      // For WAL files we own (rather than recovered ones), note whether the current WAL may
      // still be written to. New WAL paths are only enqueued after the previous file is
      // closed, so an empty queue on a non-recovered source means the current file could
      // still be in use by the region server. We take this snapshot now so that we are
      // protected against races where a new file gets enqueued while the current one is
      // being processed.
      if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
        currentWALisBeingWrittenTo = true;
      }
      // Open a reader on the current file.
      if (!openReader(sleepMultiplier)) {
        // We gave up on the current file (see openReader); reset the sleep and move on.
        sleepMultiplier = 1;
        continue;
      }

      // openReader can return true with a null reader (e.g. lease recovery in progress);
      // sleep and retry.
      if (this.reader == null) {
        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      boolean gotIOE = false;
      currentNbOperations = 0;
      currentNbHFiles = 0;
      List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
      currentSize = 0;
      try {
        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
          continue;
        }
      } catch (IOException ioe) {
        LOG.warn(this.peerClusterZnode + " Got: ", ioe);
        gotIOE = true;
        if (ioe.getCause() instanceof EOFException) {
          // A recovered file that hits EOF may simply be empty or truncated;
          // after enough retries, consider dumping it and moving on.
          boolean considerDumping = false;
          if (this.replicationQueueInfo.isQueueRecovered()) {
            try {
              FileStatus stat = this.fs.getFileStatus(this.currentPath);
              if (stat.getLen() == 0) {
                LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
              }
              considerDumping = true;
            } catch (IOException e) {
              LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
            }
          }

          if (considerDumping &&
              sleepMultiplier == this.maxRetriesMultiplier &&
              processEndOfFile()) {
            continue;
          }
        }
      } finally {
        try {
          this.reader = null;
          this.repLogReader.closeReader();
        } catch (IOException e) {
          gotIOE = true;
          LOG.warn("Unable to finalize the tailing of a file", e);
        }
      }

      // If we didn't get anything to replicate, or if we hit an IOE, record our position,
      // wait a bit and retry. But if we need to stop, don't bother sleeping.
      if (this.isActive() && (gotIOE || entries.isEmpty())) {
        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }

        if (!gotIOE) {
          sleepMultiplier = 1;
          // If there was nothing to ship and it's not an error, set "ageOfLastShippedOp"
          // to <now> to indicate that we're current.
          this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
        }
        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      shipEdits(currentWALisBeingWrittenTo, entries);
    }
    uninitialize();
  }

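  /**
   * Read the entries from the current log file that need to be replicated,
   * accumulating them in <code>entries</code> until the size or count capacity is
   * reached or the end of the file is hit.
   *
   * @param currentWALisBeingWrittenTo whether the current WAL may still be written to
   * @param entries the buffer in which to accumulate entries to replicate
   * @return true if we're done with the current file and moved on to the next one,
   *         false if there are entries to ship or the file should be retried
   * @throws IOException
   */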
  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
      List<WAL.Entry> entries) throws IOException {
    long seenEntries = 0;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Seeking in " + this.currentPath + " at position "
          + this.repLogReader.getPosition());
    }
    this.repLogReader.seek();
    long positionBeforeRead = this.repLogReader.getPosition();
    WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
    while (entry != null) {
      this.metrics.incrLogEditsRead();
      seenEntries++;

      // Don't replicate entries that the peer cluster has already seen (its cluster id is
      // in the entry), unless the endpoint allows replicating back to the same cluster.
      if (replicationEndpoint.canReplicateToSameCluster()
          || !entry.getKey().getClusterIds().contains(peerClusterId)) {
        // Remove all cells that should not be replicated.
        entry = walEntryFilter.filter(entry);
        WALEdit edit = null;
        WALKey logKey = null;
        if (entry != null) {
          edit = entry.getEdit();
          logKey = entry.getKey();
        }

        if (edit != null && edit.size() != 0) {
          // Mark that the current cluster has the change.
          logKey.addClusterId(clusterId);
          currentNbOperations += countDistinctRowKeys(edit);
          entries.add(entry);
          currentSize += entry.getEdit().heapSize();
          currentSize += calculateTotalSizeOfStoreFiles(edit);
        } else {
          this.metrics.incrLogEditsFiltered();
        }
      }
      // Stop if too many entries or too big.
      if (currentSize >= this.replicationQueueSizeCapacity ||
          entries.size() >= this.replicationQueueNbCapacity) {
        break;
      }
      try {
        entry = this.repLogReader.readNextAndSetPosition();
      } catch (IOException ie) {
        LOG.debug("Break on IOE: " + ie.getMessage());
        break;
      }
    }
    metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
    if (currentWALisBeingWrittenTo) {
      return false;
    }
    // If we didn't get anything and the queue has an object, it means we
    // hit the end of the file for sure.
    return seenEntries == 0 && processEndOfFile();
  }

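  /**
   * Calculate the total size of all the store files referenced by the bulk load
   * descriptors contained in this WAL edit.
   *
   * @param edit edit to scan for bulk load events
   * @return total size of the referenced store files, in bytes
   */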
  private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int totalStoreFilesSize = 0;

    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Size of HFiles part of cell will not be considered in replication "
              + "request size calculation.", e);
        }
      }
    }
    return totalStoreFilesSize;
  }

  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
    String peerId = peerClusterZnode;
    if (peerId.contains("-")) {
      // For recovered queues the znode name is of the form peerId + "-" + <dead RS info>;
      // use only the peer id part to clean up the HFile references.
      peerId = peerClusterZnode.split("-")[0];
    }
    List<Cell> cells = edit.getCells();
    int totalCells = cells.size();
    for (int i = 0; i < totalCells; i++) {
      Cell cell = cells.get(i);
      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          List<String> storeFileList = stores.get(j).getStoreFileList();
          manager.cleanUpHFileRefs(peerId, storeFileList);
          metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
        }
      }
    }
  }

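  /**
   * Poll for the next log path to process, if we don't already have one.
   *
   * @return true if there is a current path to read from, false otherwise
   */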
  protected boolean getNextPath() {
    try {
      if (this.currentPath == null) {
        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
        this.metrics.setSizeOfLogQueue(queue.size());
        if (this.currentPath != null) {
          this.manager.cleanOldLogs(this.currentPath.getName(),
              this.peerId,
              this.replicationQueueInfo.isQueueRecovered());
          if (LOG.isTraceEnabled()) {
            LOG.trace("New log: " + this.currentPath);
          }
        }
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while reading edits", e);
    }
    return this.currentPath != null;
  }

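  /**
   * Open a reader on the current path, handling logs that have been archived or that
   * still live in a dead region server's WAL directory.
   *
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if we should continue with this file, false if we should read the next one
   */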
  protected boolean openReader(int sleepMultiplier) {
    try {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Opening log " + this.currentPath);
        }
        this.reader = repLogReader.openReader(this.currentPath);
      } catch (FileNotFoundException fnfe) {
        if (this.replicationQueueInfo.isQueueRecovered()) {
          // The log was not where we expected it; for a recovered queue, check the dead
          // region servers' WAL directories (there could be a chain of failed servers
          // to look at).
          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
          LOG.info("NB dead servers : " + deadRegionServers.size());
          final Path walDir = FSUtils.getWALRootDir(this.conf);
          for (String curDeadServerName : deadRegionServers) {
            final Path deadRsDirectory = new Path(walDir,
                DefaultWALProvider.getWALDirectoryName(curDeadServerName));
            Path[] locs = new Path[] {
                new Path(deadRsDirectory, currentPath.getName()),
                new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
                    currentPath.getName()),
            };
            for (Path possibleLogLocation : locs) {
              LOG.info("Possible location " + possibleLogLocation.toUri().toString());
              if (this.manager.getFs().exists(possibleLogLocation)) {
                // We found the right new location.
                LOG.info("Log " + this.currentPath + " still exists at " +
                    possibleLogLocation);
                // Returning true here with a null reader makes the caller sleep and retry.
                return true;
              }
            }
          }
          // When running via the ReplicationSyncUp tool, the log may still sit in some
          // region server's WAL directory; scan all of them for a match.
          if (stopper instanceof ReplicationSyncUp.DummyServer) {
            // Here manager.getLogDir() is the root WAL directory, so iterate over each
            // region server's folder underneath it.
            FileStatus[] rss = fs.listStatus(manager.getLogDir());
            for (FileStatus rs : rss) {
              Path p = rs.getPath();
              FileStatus[] logs = fs.listStatus(p);
              for (FileStatus log : logs) {
                p = new Path(p, log.getPath().getName());
                if (p.getName().equals(currentPath.getName())) {
                  currentPath = p;
                  LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
                  // Open the log at the new location.
                  this.openReader(sleepMultiplier);
                  return true;
                }
              }
            }
          }

          // The log could not be found in any known location. This can happen if the master
          // moved it between our checks, or if a recovered queue wasn't properly cleaned up
          // and its znode points to a log that was deleted long ago. Throw the IOException
          // so the outer catch below can handle it (and eventually dump the file).
          throw new IOException("File from recovered queue is " +
              "nowhere to be found", fnfe);
        } else {
          // If the log was archived, continue reading from there.
          Path archivedLogLocation =
              new Path(manager.getOldLogDir(), currentPath.getName());
          if (this.manager.getFs().exists(archivedLogLocation)) {
            currentPath = archivedLogLocation;
            LOG.info("Log " + this.currentPath + " was moved to " +
                archivedLogLocation);
            // Open the log at the new location.
            this.openReader(sleepMultiplier);
          }
        }
      }
    } catch (LeaseNotRecoveredException lnre) {
      // The WAL was not closed cleanly by its writer; try to recover the lease so it can be read.
      LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
      recoverLease(conf, currentPath);
      this.reader = null;
    } catch (IOException ioe) {
      if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
      LOG.warn(this.peerClusterZnode + " Got: ", ioe);
      this.reader = null;
      if (ioe.getCause() instanceof NullPointerException) {
        // Opening the reader failed with an NPE as the cause; just log and retry.
        LOG.warn("Got NPE opening reader, will retry.");
      } else if (sleepMultiplier == this.maxRetriesMultiplier) {
        // We have waited the maximum number of retries for this file; consider dumping it.
        LOG.warn("Waited too long for this file, considering dumping");
        return !processEndOfFile();
      }
    }
    return true;
  }

  private void recoverLease(final Configuration conf, final Path path) {
    try {
      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
        @Override
        public boolean progress() {
          LOG.debug("recover WAL lease: " + path);
          return isActive();
        }
      });
    } catch (IOException e) {
      LOG.warn("unable to recover lease for WAL: " + path, e);
    }
  }

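  /**
   * Check whether the current log appears empty: nothing has been read from it yet
   * (position 0), this is not a recovered queue, and no newer log has been enqueued.
   */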
  private boolean isCurrentLogEmpty() {
    return (this.repLogReader.getPosition() == 0 &&
        !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
  }

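  /**
   * Do the sleeping logic.
   *
   * @param msg why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */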
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

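  /**
   * Count the number of different row keys in the given edit (because of mini-batching,
   * a single WALEdit can carry several rows). Assumes the edit contains at least one Cell.
   * HFiles referenced by bulk load entries are counted as well, since each of them is
   * shipped as a separate operation.
   *
   * @param edit edit to count row keys from
   * @return number of different row keys, plus the number of referenced HFiles
   */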
  private int countDistinctRowKeys(WALEdit edit) {
    List<Cell> cells = edit.getCells();
    int distinctRowKeys = 1;
    int totalHFileEntries = 0;
    Cell lastCell = cells.get(0);
    int totalCells = edit.size();
    for (int i = 0; i < totalCells; i++) {
      // Count the HFiles referenced by bulk load events.
      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
        try {
          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
          List<StoreDescriptor> stores = bld.getStoresList();
          int totalStores = stores.size();
          for (int j = 0; j < totalStores; j++) {
            totalHFileEntries += stores.get(j).getStoreFileList().size();
          }
        } catch (IOException e) {
          LOG.error("Failed to deserialize bulk load entry from wal edit. "
              + "Its hfile count will not be added to the metric.", e);
        }
      }
      if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
        distinctRowKeys++;
      }
      lastCell = cells.get(i);
    }
    currentNbHFiles += totalHFileEntries;
    return distinctRowKeys + totalHFileEntries;
  }

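  /**
   * Do the shipping logic: hand the accumulated entries to the replication endpoint,
   * retrying until the batch is acknowledged or the source is stopped, then record the
   * new log position and update metrics.
   *
   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) written to
   *          when this method was called
   * @param entries the entries to ship
   */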
  protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
    int sleepMultiplier = 0;
    if (entries.isEmpty()) {
      LOG.warn("Was given 0 edits to ship");
      return;
    }
    while (this.isActive()) {
      try {
        if (this.throttler.isEnabled()) {
          long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
          if (sleepTicks > 0) {
            try {
              if (LOG.isTraceEnabled()) {
                LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
              }
              Thread.sleep(sleepTicks);
            } catch (InterruptedException e) {
              LOG.debug("Interrupted while sleeping for throttling control");
              Thread.currentThread().interrupt();
              // The current thread might have been interrupted to terminate;
              // go back to while() to confirm this.
              continue;
            }
            // Reset the throttler's cycle start tick when a throttling sleep occurs.
            this.throttler.resetStartTick();
          }
        }
        // Build the context for this batch of entries.
        ReplicationEndpoint.ReplicateContext replicateContext =
            new ReplicationEndpoint.ReplicateContext();
        replicateContext.setEntries(entries).setSize(currentSize);

        long startTimeNs = System.nanoTime();
        // Ship the batch through the endpoint; false means it could not be replicated.
        boolean replicated = replicationEndpoint.replicate(replicateContext);
        long endTimeNs = System.nanoTime();

        if (!replicated) {
          continue;
        } else {
          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
        }

        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
          // Clean up hfile references.
          int size = entries.size();
          for (int i = 0; i < size; i++) {
            cleanUpHFileRefs(entries.get(i).getEdit());
          }
          // Log the new position and clean old WALs.
          this.manager.logPositionAndCleanOldLogs(this.currentPath,
              this.peerClusterZnode, this.repLogReader.getPosition(),
              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
          this.lastLoggedPosition = this.repLogReader.getPosition();
        }
        if (this.throttler.isEnabled()) {
          this.throttler.addPushSize(currentSize);
        }
        this.totalReplicatedEdits += entries.size();
        this.totalReplicatedOperations += currentNbOperations;
        this.metrics.shipBatch(this.currentNbOperations, this.currentSize / 1024, currentNbHFiles);
        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime());
        if (LOG.isTraceEnabled()) {
          LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
              + this.totalReplicatedOperations + " operations in " +
              ((endTimeNs - startTimeNs) / 1000000) + " ms");
        }
        break;
      } catch (Exception ex) {
        LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
            org.apache.hadoop.util.StringUtils.stringifyException(ex));
        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

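  /**
   * Check whether the peer is enabled.
   *
   * @return true if the peer is enabled, otherwise false
   */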
  protected boolean isPeerEnabled() {
    return this.replicationPeers.getStatusOfPeer(this.peerId);
  }

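  /**
   * If the queue isn't empty, switch to the next log file. Else, if this is a recovered
   * queue, we are done and the source can be closed. Otherwise keep trying to read from
   * the current file.
   *
   * @return true if we're done with the current file, false if we should continue trying
   *         to read from it
   */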
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
      justification = "Yeah, this is how it works")
  protected boolean processEndOfFile() {
    // If the queue has more logs, we are done with the current file and can move on.
    if (this.queue.size() != 0) {
      // A negative trailer size means the WAL was not closed cleanly.
      final long trailerSize = this.repLogReader.currentTrailerSize();
      final long currentPosition = this.repLogReader.getPosition();
      FileStatus stat = null;
      try {
        stat = fs.getFileStatus(this.currentPath);
      } catch (IOException exception) {
        LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
            + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
            + ", stats: " + getStats());
        metrics.incrUnknownFileLengthForClosedWAL();
      }
      if (stat != null) {
        if (trailerSize < 0) {
          if (currentPosition < stat.getLen()) {
            final long skippedBytes = stat.getLen() - currentPosition;
            LOG.info("Reached the end of WAL file '" + currentPath
                + "'. It was not closed cleanly, so we did not parse " + skippedBytes
                + " bytes of data.");
            metrics.incrUncleanlyClosedWALs();
            metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
          }
        } else if (currentPosition + trailerSize < stat.getLen()) {
          LOG.warn("Processing end of WAL file '" + currentPath + "'. At position "
              + currentPosition + ", which is too far away from reported file length "
              + stat.getLen() + ". Restarting WAL reading (see HBASE-15983 for details). stats: "
              + getStats());
          repLogReader.setPosition(0);
          metrics.incrRestartedWALReading();
          metrics.incrRepeatedFileBytes(currentPosition);
          return false;
        }
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("Reached the end of a log, stats: " + getStats()
            + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
      }
      this.currentPath = null;
      this.repLogReader.finishCurrentFile();
      this.reader = null;
      metrics.incrCompletedWAL();
      return true;
    } else if (this.replicationQueueInfo.isQueueRecovered()) {
      this.manager.closeRecoveredQueue(this);
      LOG.info("Finished recovering the queue with the following stats " + getStats());
      metrics.incrCompletedRecoveryQueue();
      this.running = false;
      return true;
    }
    return false;
  }

  @Override
  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler =
        new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(final Thread t, final Throwable e) {
            LOG.error("Unexpected exception in ReplicationSource," +
                " currentPath=" + currentPath, e);
          }
        };
    Threads.setDaemonThreadRunning(
        this, n + ".replicationSource," + this.peerClusterZnode, handler);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  public void terminate(String reason, Exception cause, boolean join) {
    if (cause == null) {
      LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.running = false;
    this.interrupt();
    ListenableFuture<Service.State> future = null;
    if (this.replicationEndpoint != null) {
      future = this.replicationEndpoint.stop();
    }
    if (join) {
      Threads.shutdown(this, this.sleepForRetries);
      if (future != null) {
        try {
          future.get();
        } catch (Exception e) {
          LOG.warn("Got exception:" + e);
        }
      }
    }
  }

  @Override
  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  @Override
  public String getPeerClusterId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    return this.currentPath;
  }

  private boolean isActive() {
    return !this.stopper.isStopped() && this.running && !isInterrupted();
  }

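  /**
   * Comparator used to order logs by their start time (the timestamp suffix of the file name).
   */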
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
    }

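    /**
     * Split a path to get its start time; for example
     * 10.20.20.171%3A60020.1277499063250 returns 1277499063250.
     *
     * @param p path to split
     * @return start time
     */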
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  @Override
  public String getStats() {
    long position = this.repLogReader.getPosition();
    return "Total replicated edits: " + totalReplicatedEdits +
        ", currently replicating from: " + this.currentPath +
        " at position: " + position;
  }

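  /**
   * Get the metrics for this replication source.
   *
   * @return the source's MetricsSource instance
   */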
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }
}