1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
21
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.io.OutputStream;
26 import java.lang.reflect.InvocationTargetException;
27 import java.lang.reflect.Method;
28 import java.net.URLEncoder;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.NavigableMap;
37 import java.util.Set;
38 import java.util.TreeMap;
39 import java.util.UUID;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.ConcurrentSkipListMap;
44 import java.util.concurrent.CopyOnWriteArrayList;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.LinkedBlockingQueue;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicBoolean;
52 import java.util.concurrent.atomic.AtomicInteger;
53 import java.util.concurrent.atomic.AtomicLong;
54 import java.util.concurrent.locks.ReentrantLock;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58 import org.apache.hadoop.conf.Configuration;
59 import org.apache.hadoop.fs.FSDataOutputStream;
60 import org.apache.hadoop.fs.FileStatus;
61 import org.apache.hadoop.fs.FileSystem;
62 import org.apache.hadoop.fs.Path;
63 import org.apache.hadoop.fs.PathFilter;
64 import org.apache.hadoop.hbase.Cell;
65 import org.apache.hadoop.hbase.CellUtil;
66 import org.apache.hadoop.hbase.HBaseConfiguration;
67 import org.apache.hadoop.hbase.HConstants;
68 import org.apache.hadoop.hbase.HRegionInfo;
69 import org.apache.hadoop.hbase.HTableDescriptor;
70 import org.apache.hadoop.hbase.KeyValue;
71 import org.apache.hadoop.hbase.TableName;
72 import org.apache.hadoop.hbase.classification.InterfaceAudience;
73 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
74 import org.apache.hadoop.hbase.util.Bytes;
75 import org.apache.hadoop.hbase.util.ClassSize;
76 import org.apache.hadoop.hbase.util.DrainBarrier;
77 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78 import org.apache.hadoop.hbase.util.FSUtils;
79 import org.apache.hadoop.hbase.util.HasThread;
80 import org.apache.hadoop.hbase.util.Threads;
81 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
82 import org.apache.hadoop.hbase.wal.WAL;
83 import org.apache.hadoop.hbase.wal.WALFactory;
84 import org.apache.hadoop.hbase.wal.WALKey;
85 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
86 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
87 import org.apache.hadoop.hbase.wal.WALSplitter;
88 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
89 import org.apache.hadoop.util.StringUtils;
90 import org.apache.htrace.NullScope;
91 import org.apache.htrace.Span;
92 import org.apache.htrace.Trace;
93 import org.apache.htrace.TraceScope;
94
95 import com.google.common.annotations.VisibleForTesting;
96 import com.google.common.collect.Maps;
97 import com.lmax.disruptor.BlockingWaitStrategy;
98 import com.lmax.disruptor.EventHandler;
99 import com.lmax.disruptor.ExceptionHandler;
100 import com.lmax.disruptor.LifecycleAware;
101 import com.lmax.disruptor.TimeoutException;
102 import com.lmax.disruptor.dsl.Disruptor;
103 import com.lmax.disruptor.dsl.ProducerType;
104
105
106
107 import com.google.common.annotations.VisibleForTesting;
108 import com.lmax.disruptor.BlockingWaitStrategy;
109 import com.lmax.disruptor.EventHandler;
110 import com.lmax.disruptor.ExceptionHandler;
111 import com.lmax.disruptor.LifecycleAware;
112 import com.lmax.disruptor.TimeoutException;
113 import com.lmax.disruptor.dsl.Disruptor;
114 import com.lmax.disruptor.dsl.ProducerType;
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143 @InterfaceAudience.Private
144 public class FSHLog implements WAL {
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179 static final Log LOG = LogFactory.getLog(FSHLog.class);
180
181 private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100;
182
183
184
185
186
187
188
189
190
191 private final Disruptor<RingBufferTruck> disruptor;
192
193 private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000;
194
195
196
197
198 private final ExecutorService appendExecutor;
199
200
201
202
203
204
205
206 private final RingBufferEventHandler ringBufferEventHandler;
207
208
209
210
211
212
213
214 private final Map<Thread, SyncFuture> syncFuturesByHandler;
215
216
217
218
219
220 private volatile long highestUnsyncedSequence = -1;
221
222
223
224
225
226
227 private final AtomicLong highestSyncedSequence = new AtomicLong(0);
228
229
230
231
232 protected final FileSystem fs;
233
234
235
236
237 private final Path fullPathLogDir;
238
239
240
241 private final Path fullPathArchiveDir;
242
243
244
245
246 private final PathFilter ourFiles;
247
248
249
250
251 private final String logFilePrefix;
252
253
254
255
256 private final String logFileSuffix;
257
258
259
260
261 private final String prefixPathStr;
262
263 private final WALCoprocessorHost coprocessorHost;
264
265
266
267
268 protected final Configuration conf;
269
270 private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
271
272 @Override
273 public void registerWALActionsListener(final WALActionsListener listener) {
274 this.listeners.add(listener);
275 }
276
277 @Override
278 public boolean unregisterWALActionsListener(final WALActionsListener listener) {
279 return this.listeners.remove(listener);
280 }
281
282 @Override
283 public WALCoprocessorHost getCoprocessorHost() {
284 return coprocessorHost;
285 }
286
287
288
289 private FSDataOutputStream hdfs_out;
290
291
292
293
294 private final int minTolerableReplication;
295
296
297 private final Method getNumCurrentReplicas;
298 private final Method getPipeLine;
299 private final int slowSyncNs;
300
301 private final static Object [] NO_ARGS = new Object []{};
302
303 private final long walSyncTimeout;
304
305
306
307
308
309 private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
310
311 private final int lowReplicationRollLimit;
312
313
314
315
316 private volatile boolean lowReplicationRollEnabled = true;
317
318
319
320
321 volatile Writer writer;
322
323
324 private final DrainBarrier closeBarrier = new DrainBarrier();
325
326
327
328
329
330
331
332
333 private final ReentrantLock rollWriterLock = new ReentrantLock(true);
334
335 private volatile boolean closed = false;
336 private final AtomicBoolean shutdown = new AtomicBoolean(false);
337
338
339 private final AtomicLong filenum = new AtomicLong(-1);
340
341
342 private final AtomicInteger numEntries = new AtomicInteger(0);
343
344
345 private final long logrollsize;
346
347
348
349
350 private AtomicLong totalLogSize = new AtomicLong(0);
351
352
353
354
355
356
357 private final int maxLogs;
358
359
360 private final int closeErrorsTolerated;
361
362 private final AtomicInteger closeErrorCount = new AtomicInteger();
363
364
365
366
367
368
369
370
371
372
373
374 private final Object regionSequenceIdLock = new Object();
375
376
377
378
379
380
381
382
383 private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
384 = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
385 Bytes.BYTES_COMPARATOR);
386
387
388
389
390
391
392
393
394
395 private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
396 new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
397
398
399
400
401
402
403
404
405
406
407
408
409 private Map<byte[], Long> highestRegionSequenceIds = new HashMap<byte[], Long>();
410
411
412
413
414
415 final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
416 @Override
417 public int compare(Path o1, Path o2) {
418 long t1 = getFileNumFromFileName(o1);
419 long t2 = getFileNumFromFileName(o2);
420 if (t1 == t2) {
421 return 0;
422 }
423 return (t1 > t2) ? 1 : -1;
424 }
425 };
426
427
428
429
430
431 private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
432 new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
433
434
435
436
437
438 static class RingBufferExceptionHandler implements ExceptionHandler {
439 @Override
440 public void handleEventException(Throwable ex, long sequence, Object event) {
441 LOG.error("Sequence=" + sequence + ", event=" + event, ex);
442 throw new RuntimeException(ex);
443 }
444
445 @Override
446 public void handleOnStartException(Throwable ex) {
447 LOG.error(ex);
448 throw new RuntimeException(ex);
449 }
450
451 @Override
452 public void handleOnShutdownException(Throwable ex) {
453 LOG.error(ex);
454 throw new RuntimeException(ex);
455 }
456 }
457
458
459
460
461
462
463
464
465
466
/**
 * Convenience constructor: archives to HConstants.HREGION_OLDLOGDIR_NAME, registers no
 * listeners, fails if a WAL already exists in the directory, and uses the default
 * file-name prefix with no suffix.
 */
public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
    throws IOException {
  this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf,
      null /* listeners */,
      true /* failIfWALExists */,
      null /* prefix */,
      null /* suffix */);
}
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
/**
 * Creates a WAL whose files live in {@code rootDir/logDir} and are archived to
 * {@code rootDir/archiveDir}. It creates both directories if needed, rolls the first
 * writer, and starts the ring buffer that serves appends.
 *
 * @param fs filesystem to write to
 * @param rootDir base directory that logDir and archiveDir are resolved against
 * @param logDir WAL directory, created if missing
 * @param archiveDir archive directory, created if missing
 * @param conf configuration read for sizes, limits, timeouts and thread counts
 * @param listeners listeners registered before the first roll; may be null
 * @param failIfWALExists if true, fail when logDir already holds files matching this WAL
 * @param prefix file-name prefix; null/empty means "wal"; otherwise URL-encoded
 * @param suffix file-name suffix; null means none; if non-empty it must start with
 *        WAL_FILE_NAME_DELIMITER; URL-encoded
 * @throws IOException if a directory cannot be created, a WAL already exists while
 *         failIfWALExists is set, or the first roll fails
 * @throws IllegalArgumentException if a non-empty suffix lacks the delimiter prefix
 */
public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
    final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners,
    final boolean failIfWALExists, final String prefix, final String suffix)
    throws IOException {
  this.fs = fs;
  this.fullPathLogDir = new Path(rootDir, logDir);
  this.fullPathArchiveDir = new Path(rootDir, archiveDir);
  this.conf = conf;

  if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) {
    throw new IOException("Unable to mkdir " + fullPathLogDir);
  }

  if (!fs.exists(this.fullPathArchiveDir)) {
    if (!fs.mkdirs(this.fullPathArchiveDir)) {
      throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
    }
  }

  this.logFilePrefix =
      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");

  if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
    throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
        "' but instead was '" + suffix + "'");
  }

  FSUtils.setStoragePolicy(fs, conf, this.fullPathLogDir, HConstants.WAL_STORAGE_POLICY,
      HConstants.DEFAULT_WAL_STORAGE_POLICY);
  this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
  this.prefixPathStr = new Path(fullPathLogDir,
      logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

  // Accepts paths shaped like <prefixPathStr><file number><logFileSuffix>.
  this.ourFiles = new PathFilter() {
    @Override
    public boolean accept(final Path fileName) {
      final String fileNameString = fileName.toString();
      if (!fileNameString.startsWith(prefixPathStr)) {
        return false;
      }
      if (logFileSuffix.isEmpty()) {
        // commons-lang 2 isNumeric("") is true. Require a non-empty number so a bare
        // "<prefix>." never reaches Long.parseLong in getFileNumFromFileName.
        final String fileNumPart = fileNameString.substring(prefixPathStr.length());
        return !fileNumPart.isEmpty()
            && org.apache.commons.lang.StringUtils.isNumeric(fileNumPart);
      }
      return fileNameString.endsWith(logFileSuffix);
    }
  };

  if (failIfWALExists) {
    final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
    if (null != walFiles && 0 != walFiles.length) {
      throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
    }
  }

  // Register listeners before the first roll below so they observe it.
  if (listeners != null) {
    for (WALActionsListener i: listeners) {
      registerWALActionsListener(i);
    }
  }
  this.coprocessorHost = new WALCoprocessorHost(this, conf);

  final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
      FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
  this.logrollsize =
      (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));

  this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
  this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
      FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
  this.lowReplicationRollLimit =
      conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
  this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
  int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);

  LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) +
      ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
      ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
      this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);

  // Create the first writer. NOTE(review): this runs before ringBufferEventHandler is
  // assigned (replaceWriter() skips the safe point while it is null) and invokes the
  // overridable createWriterInstance() from inside the constructor.
  rollWriter();

  // slowSyncNs is an int. Multiply in long and clamp, so settings above ~2147 ms saturate
  // instead of overflowing into a negative threshold.
  final long slowSyncNanos = 1000000L * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
      DEFAULT_SLOW_SYNC_TIME_MS);
  this.slowSyncNs =
      (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, slowSyncNanos));
  this.walSyncTimeout = conf.getLong("hbase.regionserver.hlog.sync.timeout",
      DEFAULT_WAL_SYNC_TIMEOUT_MS);

  // hdfs_out is whatever the rollWriter() call above installed (null for non-protobuf writers).
  this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
  this.getPipeLine = getGetPipeline(this.hdfs_out);

  String hostingThreadName = Thread.currentThread().getName();
  this.appendExecutor = Executors.
      newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));

  final int preallocatedEventCount =
      this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);

  this.disruptor =
      new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
          this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());

  // Claim (without publishing) one sequence before registering the handler, presumably so
  // real sequences start above 0 (replaceWriter() asserts sequence > 0).
  // NOTE(review): keep this ahead of handleEventsWith; confirm ordering requirement.
  this.disruptor.getRingBuffer().next();
  this.ringBufferEventHandler =
      new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
          maxHandlersCount);
  this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
  this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});

  this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);

  this.disruptor.start();
}
628
629
630
631
632
633 protected FileStatus[] getFiles() throws IOException {
634 return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
635 }
636
637
638
639
640
641
642
643
644
645 @VisibleForTesting
646 OutputStream getOutputStream() {
647 FSDataOutputStream fsdos = this.hdfs_out;
648 if (fsdos == null) {
649 return null;
650 }
651 return fsdos.getWrappedStream();
652 }
653
654 @Override
655 public byte [][] rollWriter() throws FailedLogCloseException, IOException {
656 return rollWriter(false);
657 }
658
659
660
661
662
663 private Path getNewPath() throws IOException {
664 this.filenum.set(System.currentTimeMillis());
665 Path newPath = getCurrentFileName();
666 while (fs.exists(newPath)) {
667 this.filenum.incrementAndGet();
668 newPath = getCurrentFileName();
669 }
670 return newPath;
671 }
672
673 Path getOldPath() {
674 long currentFilenum = this.filenum.get();
675 Path oldPath = null;
676 if (currentFilenum > 0) {
677
678 oldPath = computeFilename(currentFilenum);
679 }
680 return oldPath;
681 }
682
683
684
685
686
687 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
688 throws IOException {
689 if (!this.listeners.isEmpty()) {
690 for (WALActionsListener i : this.listeners) {
691 i.preLogRoll(oldPath, newPath);
692 }
693 }
694 }
695
696
697
698
699
700 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
701 throws IOException {
702 if (!this.listeners.isEmpty()) {
703 for (WALActionsListener i : this.listeners) {
704 i.postLogRoll(oldPath, newPath);
705 }
706 }
707 }
708
709
710
711
712
713 private void preemptiveSync(final ProtobufLogWriter nextWriter) {
714 long startTimeNanos = System.nanoTime();
715 try {
716 nextWriter.sync();
717 postSync(System.nanoTime() - startTimeNanos, 0);
718 } catch (IOException e) {
719
720 LOG.warn("pre-sync failed but an optimization so keep going", e);
721 }
722 }
723
724 @Override
725 public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
726 rollWriterLock.lock();
727 try {
728
729 if (!force && (this.writer != null && this.numEntries.get() <= 0)) {
730 return null;
731 }
732 byte [][] regionsToFlush = null;
733 if (this.closed) {
734 LOG.debug("WAL closed. Skipping rolling of writer");
735 return regionsToFlush;
736 }
737 if (!closeBarrier.beginOp()) {
738 LOG.debug("WAL closing. Skipping rolling of writer");
739 return regionsToFlush;
740 }
741 TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
742 try {
743 Path oldPath = getOldPath();
744 Path newPath = getNewPath();
745
746 Writer nextWriter = this.createWriterInstance(newPath);
747 FSDataOutputStream nextHdfsOut = null;
748 if (nextWriter instanceof ProtobufLogWriter) {
749 nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
750
751
752 preemptiveSync((ProtobufLogWriter)nextWriter);
753 }
754 tellListenersAboutPreLogRoll(oldPath, newPath);
755
756 newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
757 tellListenersAboutPostLogRoll(oldPath, newPath);
758
759 if (getNumRolledLogFiles() > 0) {
760 cleanOldLogs();
761 regionsToFlush = findRegionsToForceFlush();
762 }
763 } finally {
764 closeBarrier.endOp();
765 assert scope == NullScope.INSTANCE || !scope.isDetached();
766 scope.close();
767 }
768 return regionsToFlush;
769 } finally {
770 rollWriterLock.unlock();
771 }
772 }
773
774
775
776
777
778
779
780 protected Writer createWriterInstance(final Path path) throws IOException {
781 return DefaultWALProvider.createWriter(conf, fs, path, false);
782 }
783
784 private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
785 long result = HConstants.NO_SEQNUM;
786 for (Long seqNum: seqIdMap.values()) {
787 if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
788 result = seqNum.longValue();
789 }
790 }
791 return result;
792 }
793
794 private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
795 Map<byte[], T> mapToCopy) {
796 Map<byte[], Long> copied = Maps.newHashMap();
797 for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
798 long lowestSeqId = getLowestSeqId(entry.getValue());
799 if (lowestSeqId != HConstants.NO_SEQNUM) {
800 copied.put(entry.getKey(), lowestSeqId);
801 }
802 }
803 return copied;
804 }
805
806
807
808
809
810
811
812
813
814
815
816
817 private void cleanOldLogs() throws IOException {
818 Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
819 Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
820 List<Path> logsToArchive = new ArrayList<Path>();
821
822 synchronized (regionSequenceIdLock) {
823 lowestFlushingRegionSequenceIdsLocal =
824 copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
825 oldestUnflushedRegionSequenceIdsLocal =
826 copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
827 }
828 for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
829
830 Path log = e.getKey();
831 Map<byte[], Long> sequenceNums = e.getValue();
832
833 if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
834 oldestUnflushedRegionSequenceIdsLocal)) {
835 logsToArchive.add(log);
836 LOG.debug("WAL file ready for archiving " + log);
837 }
838 }
839 for (Path p : logsToArchive) {
840 this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
841 archiveLogFile(p);
842 this.byWalRegionSequenceIds.remove(p);
843 }
844 }
845
846
847
848
849
850
851
852
853
854
855
856
857 static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
858 Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
859 for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
860
861
862 long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
863 oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
864 long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
865 oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
866
867 long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
868 if (minSeqNum <= regionSeqIdEntry.getValue()) return false;
869 }
870 return true;
871 }
872
873
874
875
876
877
878
879
880
881
882
883
884 private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
885 List<byte[]> regionsToFlush = null;
886
887 synchronized (regionSequenceIdLock) {
888 for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
889 ConcurrentMap<byte[], Long> m =
890 this.oldestUnflushedStoreSequenceIds.get(e.getKey());
891 if (m == null || m.isEmpty()) {
892 continue;
893 }
894 long unFlushedVal = Collections.min(m.values());
895 if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
896 if (regionsToFlush == null)
897 regionsToFlush = new ArrayList<byte[]>();
898 regionsToFlush.add(e.getKey());
899 }
900 }
901 }
902 return regionsToFlush == null ? null : regionsToFlush
903 .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
904 }
905
906
907
908
909
910
911
912
913 byte[][] findRegionsToForceFlush() throws IOException {
914 byte [][] regions = null;
915 int logCount = getNumRolledLogFiles();
916 if (logCount > this.maxLogs && logCount > 0) {
917 Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
918 this.byWalRegionSequenceIds.firstEntry();
919 regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
920 }
921 if (regions != null) {
922 StringBuilder sb = new StringBuilder();
923 for (int i = 0; i < regions.length; i++) {
924 if (i > 0) {
925 sb.append(", ");
926 }
927 sb.append(Bytes.toStringBinary(regions[i]));
928 }
929 LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" +
930 this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
931 sb.toString());
932 }
933 return regions;
934 }
935
936
937
938
939
940 @VisibleForTesting
941 protected void afterCreatingZigZagLatch() {}
942
943
944
945
946 @VisibleForTesting
947 protected void beforeWaitOnSafePoint() {};
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
/**
 * Swaps the current writer for {@code nextWriter} and records bookkeeping for the old file.
 *
 * If ringBufferEventHandler exists, a ring-buffer sequence is claimed and a safe point
 * attained first. Presumably this keeps appends/syncs off the old writer while it is closed.
 * A failed close of the old writer is tolerated (logged) only when no entries are unflushed
 * and the consecutive failure count is within closeErrorsTolerated. On success the old path's
 * per-region highest sequence ids go into byWalRegionSequenceIds and its length is added to
 * totalLogSize. The finally block always releases the safe point and then waits on the sync
 * future, ignoring its IOException.
 *
 * @param oldPath path of the writer being replaced; null on the first roll
 * @param newPath path of nextWriter
 * @param nextWriter writer to install
 * @param nextHdfsOut underlying stream of nextWriter, or null
 * @return newPath
 * @throws FailedLogCloseException wrapping any IOException raised inside the main try
 */
967 Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
968 final FSDataOutputStream nextHdfsOut)
969 throws IOException {
970
971
972
973
974
// Safe-point state. It stays null/-1 when ringBufferEventHandler is null, which is the case
// for the roll issued from the constructor.
975 SyncFuture syncFuture = null;
976 SafePointZigZagLatch zigzagLatch = null;
977 long sequence = -1L;
978 if (this.ringBufferEventHandler != null) {
979
980
981
982
983
984
985
986
// Claim a sequence first, then ask the handler for a safe-point latch.
987 sequence = getSequenceOnRingBuffer();
988 zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
989 }
// Test hook.
990 afterCreatingZigZagLatch();
// NOTE(review): span name reads "FSHFile" rather than "FSHLog"; likely a typo, confirm.
991 TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
992 try {
993
994
995
996
997
998 try {
999 if (zigzagLatch != null) {
1000
1001
1002 assert sequence > 0L : "Failed to get sequence from ring buffer";
1003 Trace.addTimelineAnnotation("awaiting safepoint");
// Publish a sync at the claimed sequence and block until the safe point is reached.
1004 syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence));
1005 }
1006 } catch (FailedSyncBeforeLogCloseException e) {
// A failed pre-close sync is fatal only if appended entries are still unsynced.
1007
1008 if (isUnflushedEntries()) {
1009 throw e;
1010 }
1011 LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
1012 e.getMessage());
1013 }
1014
1015
1016
// Close the old writer. Success resets closeErrorCount; failure increments it and is
// tolerated only when nothing is unflushed and the count is within closeErrorsTolerated.
1017 try {
1018 if (this.writer != null) {
1019 Trace.addTimelineAnnotation("closing writer");
1020 this.writer.close();
1021 Trace.addTimelineAnnotation("writer closed");
1022 }
1023 this.closeErrorCount.set(0);
1024 } catch (IOException ioe) {
1025 int errors = closeErrorCount.incrementAndGet();
1026 if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
1027 LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" +
1028 ioe.getMessage() + "\", errors=" + errors +
1029 "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
1030 } else {
1031 throw ioe;
1032 }
1033 }
// Install the new writer and reset the per-file entry count.
1034 this.writer = nextWriter;
1035 this.hdfs_out = nextHdfsOut;
1036 int oldNumEntries = this.numEntries.get();
1037 this.numEntries.set(0);
1038 final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
1039 if (oldPath != null) {
// File the old WAL's per-region highest ids under its path and start a new map.
1040 this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds);
1041 this.highestRegionSequenceIds = new HashMap<byte[], Long>();
// NOTE(review): an IOException here is reported below as "Failed close of WAL writer",
// even though the new writer is already installed.
1042 long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
1043 this.totalLogSize.addAndGet(oldFileLen);
1044 LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
1045 ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
1046 newPathString);
1047 } else {
1048 LOG.info("New WAL " + newPathString);
1049 }
1050 } catch (InterruptedException ie) {
// NOTE(review): the interrupt flag is restored, but the method still returns newPath as if
// the roll succeeded, even when the writer was never swapped. Callers then notify
// postLogRoll. Confirm this is intended.
1051
1052 Thread.currentThread().interrupt();
1053 } catch (IOException e) {
1054 long count = getUnflushedEntriesCount();
1055 LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
1056 throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
1057 } finally {
1058 try {
1059
// Always release the safe point, then wait on the pre-close sync. Its IOException is
// stale by now and is only traced.
1060 if (zigzagLatch != null) {
1061 zigzagLatch.releaseSafePoint();
1062
1063
1064
1065
1066
1067
1068 if (syncFuture != null) {
1069 try {
1070 blockOnSync(syncFuture);
1071 } catch (IOException ioe) {
1072 if (LOG.isTraceEnabled()) {
1073 LOG.trace("Stale sync exception", ioe);
1074 }
1075 }
1076 }
1077 }
1078 } finally {
1079 scope.close();
1080 }
1081 }
1082 return newPath;
1083 }
1084
1085 long getUnflushedEntriesCount() {
1086 long highestSynced = this.highestSyncedSequence.get();
1087 return highestSynced > this.highestUnsyncedSequence?
1088 0: this.highestUnsyncedSequence - highestSynced;
1089 }
1090
1091 boolean isUnflushedEntries() {
1092 return getUnflushedEntriesCount() > 0;
1093 }
1094
1095
1096
1097
1098
1099 public static Path getWALArchivePath(Path archiveDir, Path p) {
1100 return new Path(archiveDir, p.getName());
1101 }
1102
1103 private void archiveLogFile(final Path p) throws IOException {
1104 Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
1105
1106 if (!this.listeners.isEmpty()) {
1107 for (WALActionsListener i : this.listeners) {
1108 i.preLogArchive(p, newPath);
1109 }
1110 }
1111 LOG.info("Archiving " + p + " to " + newPath);
1112 if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
1113 throw new IOException("Unable to rename " + p + " to " + newPath);
1114 }
1115
1116 if (!this.listeners.isEmpty()) {
1117 for (WALActionsListener i : this.listeners) {
1118 i.postLogArchive(p, newPath);
1119 }
1120 }
1121 }
1122
1123
1124
1125
1126
1127
1128
1129 protected Path computeFilename(final long filenum) {
1130 if (filenum < 0) {
1131 throw new RuntimeException("WAL file number can't be < 0");
1132 }
1133 String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
1134 return new Path(fullPathLogDir, child);
1135 }
1136
1137
1138
1139
1140
1141
1142 public Path getCurrentFileName() {
1143 return computeFilename(this.filenum.get());
1144 }
1145
1146
1147
1148
1149
1150 public long getFilenum() {
1151 return filenum.get();
1152 }
1153
1154 @Override
1155 public String toString() {
1156 return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
1157 }
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167 protected long getFileNumFromFileName(Path fileName) {
1168 if (fileName == null) throw new IllegalArgumentException("file name can't be null");
1169 if (!ourFiles.accept(fileName)) {
1170 throw new IllegalArgumentException("The log file " + fileName +
1171 " doesn't belong to this WAL. (" + toString() + ")");
1172 }
1173 final String fileNameString = fileName.toString();
1174 String chompedPath = fileNameString.substring(prefixPathStr.length(),
1175 (fileNameString.length() - logFileSuffix.length()));
1176 return Long.parseLong(chompedPath);
1177 }
1178
1179 @Override
1180 public void close() throws IOException {
1181 shutdown();
1182 final FileStatus[] files = getFiles();
1183 if (null != files && 0 != files.length) {
1184 for (FileStatus file : files) {
1185 Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
1186
1187 if (!this.listeners.isEmpty()) {
1188 for (WALActionsListener i : this.listeners) {
1189 i.preLogArchive(file.getPath(), p);
1190 }
1191 }
1192
1193 if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1194 throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1195 }
1196
1197 if (!this.listeners.isEmpty()) {
1198 for (WALActionsListener i : this.listeners) {
1199 i.postLogArchive(file.getPath(), p);
1200 }
1201 }
1202 }
1203 LOG.debug("Moved " + files.length + " WAL file(s) to " +
1204 FSUtils.getPath(this.fullPathArchiveDir));
1205 }
1206 LOG.info("Closed WAL: " + toString() );
1207 }
1208
1209 @Override
1210 public void shutdown() throws IOException {
1211 if (shutdown.compareAndSet(false, true)) {
1212 try {
1213
1214 closeBarrier.stopAndDrainOps();
1215 } catch (InterruptedException e) {
1216 LOG.error("Exception while waiting for cache flushes and log rolls", e);
1217 Thread.currentThread().interrupt();
1218 }
1219
1220
1221
1222
1223 if (this.disruptor != null) {
1224 long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
1225 try {
1226 this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
1227 } catch (TimeoutException e) {
1228 LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
1229 "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
1230 this.disruptor.halt();
1231 this.disruptor.shutdown();
1232 }
1233 }
1234
1235 if (this.appendExecutor != null) this.appendExecutor.shutdown();
1236
1237
1238 if (!this.listeners.isEmpty()) {
1239 for (WALActionsListener i : this.listeners) {
1240 i.logCloseRequested();
1241 }
1242 }
1243 this.closed = true;
1244 if (LOG.isDebugEnabled()) {
1245 LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
1246 }
1247 if (this.writer != null) {
1248 this.writer.close();
1249 this.writer = null;
1250 }
1251 }
1252 }
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262 @SuppressWarnings("deprecation")
1263 protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
1264 long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
1265
1266 return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
1267 }
1268
1269 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
1270 justification="Will never be null")
1271 @Override
1272 public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
1273 final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
1274 final List<Cell> memstoreCells) throws IOException {
1275 if (this.closed) throw new IOException("Cannot append; log is closed");
1276
1277
1278 TraceScope scope = Trace.startSpan("FSHLog.append");
1279
1280
1281
1282
1283 FSWALEntry entry = null;
1284 long sequence = this.disruptor.getRingBuffer().next();
1285 try {
1286 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1287
1288
1289
1290 entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri,
1291
1292 (memstoreCells != null)? memstoreCells: edits == null? null: edits.getCells());
1293 truck.loadPayload(entry, scope.detach());
1294 } finally {
1295 this.disruptor.getRingBuffer().publish(sequence);
1296 }
1297 return sequence;
1298 }
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315 private class SyncRunner extends HasThread {
1316 private volatile long sequence;
1317
1318 private final BlockingQueue<SyncFuture> syncFutures;
1319 private volatile SyncFuture takeSyncFuture = null;
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331 SyncRunner(final String name, final int maxHandlersCount) {
1332 super(name);
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346 this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3);
1347 }
1348
1349 void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) {
1350
1351 this.sequence = sequence;
1352 for (int i = 0; i < syncFutureCount; ++i) {
1353 this.syncFutures.add(syncFutures[i]);
1354 }
1355 }
1356
1357
1358
1359
1360
1361
1362
1363
1364 private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
1365 final Throwable t) {
1366 if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
1367
1368 return 1;
1369 }
1370
1371
1372
1373
1374
1375
1376
1377 private int releaseSyncFutures(final long currentSequence, final Throwable t) {
1378 int syncCount = 0;
1379 for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
1380 if (syncFuture.getRingBufferSequence() > currentSequence) break;
1381 releaseSyncFuture(syncFuture, currentSequence, t);
1382 if (!this.syncFutures.remove(syncFuture)) {
1383 throw new IllegalStateException(syncFuture.toString());
1384 }
1385 syncCount++;
1386 }
1387 return syncCount;
1388 }
1389
1390
1391
1392
1393
1394 private long updateHighestSyncedSequence(long sequence) {
1395 long currentHighestSyncedSequence;
1396
1397 do {
1398 currentHighestSyncedSequence = highestSyncedSequence.get();
1399 if (currentHighestSyncedSequence >= sequence) {
1400
1401
1402 sequence = currentHighestSyncedSequence;
1403 break;
1404 }
1405 } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence));
1406 return sequence;
1407 }
1408
1409 boolean areSyncFuturesReleased() {
1410
1411
1412 return syncFutures.size() <= 0
1413 && takeSyncFuture == null;
1414 }
1415
1416 public void run() {
1417 long currentSequence;
1418 while (!isInterrupted()) {
1419 int syncCount = 0;
1420
1421 try {
1422 while (true) {
1423 takeSyncFuture = null;
1424
1425 takeSyncFuture = this.syncFutures.take();
1426 currentSequence = this.sequence;
1427 long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
1428 if (syncFutureSequence > currentSequence) {
1429 throw new IllegalStateException("currentSequence=" + syncFutureSequence +
1430 ", syncFutureSequence=" + syncFutureSequence);
1431 }
1432
1433 long currentHighestSyncedSequence = highestSyncedSequence.get();
1434 if (currentSequence < currentHighestSyncedSequence) {
1435 syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
1436
1437 continue;
1438 }
1439 break;
1440 }
1441
1442
1443 TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
1444 long start = System.nanoTime();
1445 Throwable lastException = null;
1446 try {
1447 Trace.addTimelineAnnotation("syncing writer");
1448 writer.sync();
1449 Trace.addTimelineAnnotation("writer synced");
1450 currentSequence = updateHighestSyncedSequence(currentSequence);
1451 } catch (IOException e) {
1452 LOG.error("Error syncing, request close of WAL", e);
1453 lastException = e;
1454 } catch (Exception e) {
1455 LOG.warn("UNEXPECTED", e);
1456 lastException = e;
1457 } finally {
1458
1459 takeSyncFuture.setSpan(scope.detach());
1460
1461 syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
1462
1463 syncCount += releaseSyncFutures(currentSequence, lastException);
1464 if (lastException != null) requestLogRoll();
1465 else checkLogRoll();
1466 }
1467 postSync(System.nanoTime() - start, syncCount);
1468 } catch (InterruptedException e) {
1469
1470 Thread.currentThread().interrupt();
1471 } catch (Throwable t) {
1472 LOG.warn("UNEXPECTED, continuing", t);
1473 }
1474 }
1475 }
1476 }
1477
1478
1479
1480
1481 void checkLogRoll() {
1482
1483 if (!rollWriterLock.tryLock()) return;
1484 boolean lowReplication;
1485 try {
1486 lowReplication = checkLowReplication();
1487 } finally {
1488 rollWriterLock.unlock();
1489 }
1490 try {
1491 if (lowReplication || writer != null && writer.getLength() > logrollsize) {
1492 requestLogRoll(lowReplication);
1493 }
1494 } catch (IOException e) {
1495 LOG.warn("Writer.getLength() failed; continuing", e);
1496 }
1497 }
1498
1499
1500
1501
1502 private boolean checkLowReplication() {
1503 boolean logRollNeeded = false;
1504
1505
1506 try {
1507 int numCurrentReplicas = getLogReplication();
1508 if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
1509 if (this.lowReplicationRollEnabled) {
1510 if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
1511 LOG.warn("HDFS pipeline error detected. " + "Found "
1512 + numCurrentReplicas + " replicas but expecting no less than "
1513 + this.minTolerableReplication + " replicas. "
1514 + " Requesting close of WAL. current pipeline: "
1515 + Arrays.toString(getPipeLine()));
1516 logRollNeeded = true;
1517
1518
1519
1520 this.consecutiveLogRolls.getAndIncrement();
1521 } else {
1522 LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
1523 + "the total number of live datanodes is lower than the tolerable replicas.");
1524 this.consecutiveLogRolls.set(0);
1525 this.lowReplicationRollEnabled = false;
1526 }
1527 }
1528 } else if (numCurrentReplicas >= this.minTolerableReplication) {
1529 if (!this.lowReplicationRollEnabled) {
1530
1531
1532
1533 if (this.numEntries.get() <= 1) {
1534 return logRollNeeded;
1535 }
1536
1537
1538 this.lowReplicationRollEnabled = true;
1539 LOG.info("LowReplication-Roller was enabled.");
1540 }
1541 }
1542 } catch (Exception e) {
1543 LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
1544 ", continuing...");
1545 }
1546 return logRollNeeded;
1547 }
1548
1549 private SyncFuture publishSyncOnRingBuffer(long sequence) {
1550 return publishSyncOnRingBuffer(sequence, null);
1551 }
1552
1553 private long getSequenceOnRingBuffer() {
1554 return this.disruptor.getRingBuffer().next();
1555 }
1556
1557 private SyncFuture publishSyncOnRingBuffer(Span span) {
1558 long sequence = this.disruptor.getRingBuffer().next();
1559 return publishSyncOnRingBuffer(sequence, span);
1560 }
1561
1562 private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) {
1563 SyncFuture syncFuture = getSyncFuture(sequence, span);
1564 try {
1565 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1566 truck.loadPayload(syncFuture);
1567 } finally {
1568 this.disruptor.getRingBuffer().publish(sequence);
1569 }
1570 return syncFuture;
1571 }
1572
1573
1574 private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
1575 return blockOnSync(publishSyncOnRingBuffer(span));
1576 }
1577
1578 private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
1579
1580 try {
1581 syncFuture.get(walSyncTimeout);
1582 return syncFuture.getSpan();
1583 } catch (TimeoutIOException tioe) {
1584
1585
1586
1587 this.syncFuturesByHandler.remove(Thread.currentThread());
1588 throw tioe;
1589 } catch (InterruptedException ie) {
1590 LOG.warn("Interrupted", ie);
1591 throw convertInterruptedExceptionToIOException(ie);
1592 } catch (ExecutionException e) {
1593 throw ensureIOException(e.getCause());
1594 }
1595 }
1596
1597 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
1598 Thread.currentThread().interrupt();
1599 IOException ioe = new InterruptedIOException();
1600 ioe.initCause(ie);
1601 return ioe;
1602 }
1603
1604 private SyncFuture getSyncFuture(final long sequence, Span span) {
1605 SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
1606 if (syncFuture == null) {
1607 syncFuture = new SyncFuture();
1608 this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
1609 }
1610 return syncFuture.reset(sequence, span);
1611 }
1612
1613 private void postSync(final long timeInNanos, final int handlerSyncs) {
1614 if (timeInNanos > this.slowSyncNs) {
1615 String msg =
1616 new StringBuilder().append("Slow sync cost: ")
1617 .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
1618 .append(Arrays.toString(getPipeLine())).toString();
1619 Trace.addTimelineAnnotation(msg);
1620 LOG.info(msg);
1621 }
1622 if (!listeners.isEmpty()) {
1623 for (WALActionsListener listener : listeners) {
1624 listener.postSync(timeInNanos, handlerSyncs);
1625 }
1626 }
1627 }
1628
1629 private long postAppend(final Entry e, final long elapsedTime) throws IOException {
1630 long len = 0;
1631 if (!listeners.isEmpty()) {
1632 for (Cell cell : e.getEdit().getCells()) {
1633 len += CellUtil.estimatedSerializedSizeOf(cell);
1634 }
1635 for (WALActionsListener listener : listeners) {
1636 listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
1637 }
1638 }
1639 return len;
1640 }
1641
1642
1643
1644
1645
1646
1647 private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
1648
1649
1650 Method m = null;
1651 if (os != null) {
1652 Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
1653 try {
1654 m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
1655 m.setAccessible(true);
1656 } catch (NoSuchMethodException e) {
1657 LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
1658 "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
1659 } catch (SecurityException e) {
1660 LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
1661 "not available; fsOut=" + wrappedStreamClass.getName(), e);
1662 m = null;
1663 }
1664 }
1665 if (m != null) {
1666 if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
1667 }
1668 return m;
1669 }
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683 @VisibleForTesting
1684 int getLogReplication()
1685 throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1686 final OutputStream stream = getOutputStream();
1687 if (this.getNumCurrentReplicas != null && stream != null) {
1688 Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
1689 if (repl instanceof Integer) {
1690 return ((Integer)repl).intValue();
1691 }
1692 }
1693 return 0;
1694 }
1695
1696 @Override
1697 public void sync() throws IOException {
1698 TraceScope scope = Trace.startSpan("FSHLog.sync");
1699 try {
1700 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1701 } finally {
1702 assert scope == NullScope.INSTANCE || !scope.isDetached();
1703 scope.close();
1704 }
1705 }
1706
1707 @Override
1708 public void sync(long txid) throws IOException {
1709 if (this.highestSyncedSequence.get() >= txid){
1710
1711 return;
1712 }
1713 TraceScope scope = Trace.startSpan("FSHLog.sync");
1714 try {
1715 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1716 } finally {
1717 assert scope == NullScope.INSTANCE || !scope.isDetached();
1718 scope.close();
1719 }
1720 }
1721
1722
1723 public void requestLogRoll() {
1724 requestLogRoll(false);
1725 }
1726
1727 private void requestLogRoll(boolean tooFewReplicas) {
1728 if (!this.listeners.isEmpty()) {
1729 for (WALActionsListener i: this.listeners) {
1730 i.logRollRequested(tooFewReplicas);
1731 }
1732 }
1733 }
1734
1735
1736
1737 public int getNumRolledLogFiles() {
1738 return byWalRegionSequenceIds.size();
1739 }
1740
1741
1742
1743 public int getNumLogFiles() {
1744
1745 return getNumRolledLogFiles() + 1;
1746 }
1747
1748
1749
1750 public long getLogFileSize() {
1751 return this.totalLogSize.get();
1752 }
1753
1754 @Override
1755 public Long startCacheFlush(final byte[] encodedRegionName,
1756 Set<byte[]> flushedFamilyNames) {
1757 Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
1758 if (!closeBarrier.beginOp()) {
1759 LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
1760 " - because the server is closing.");
1761 return null;
1762 }
1763 long oldestUnflushedSequenceId = HConstants.NO_SEQNUM;
1764 synchronized (regionSequenceIdLock) {
1765 ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1766 oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1767 if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
1768 for (byte[] familyName: flushedFamilyNames) {
1769 Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
1770 if (seqId != null) {
1771 oldStoreSeqNum.put(familyName, seqId);
1772 }
1773 }
1774 if (!oldStoreSeqNum.isEmpty()) {
1775 Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
1776 encodedRegionName, oldStoreSeqNum);
1777 assert oldValue == null: "Flushing map not cleaned up for "
1778 + Bytes.toString(encodedRegionName);
1779 }
1780 if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
1781
1782
1783
1784
1785 oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
1786 } else {
1787 oldestUnflushedSequenceId =
1788 Collections.min(oldestUnflushedStoreSequenceIdsOfRegion.values());
1789 }
1790 }
1791 }
1792 if (oldStoreSeqNum.isEmpty()) {
1793
1794
1795
1796
1797
1798 LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
1799 + Bytes.toString(encodedRegionName) + "]");
1800 }
1801 return oldestUnflushedSequenceId;
1802 }
1803
1804 @Override
1805 public void completeCacheFlush(final byte [] encodedRegionName) {
1806 synchronized (regionSequenceIdLock) {
1807 this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
1808 }
1809 closeBarrier.endOp();
1810 }
1811
1812 private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
1813 byte[] encodedRegionName) {
1814 ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1815 oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1816 if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
1817 return oldestUnflushedStoreSequenceIdsOfRegion;
1818 }
1819 oldestUnflushedStoreSequenceIdsOfRegion =
1820 new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1821 ConcurrentMap<byte[], Long> alreadyPut =
1822 oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName,
1823 oldestUnflushedStoreSequenceIdsOfRegion);
1824 return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
1825 }
1826
/**
 * Undoes a flush started by {@code startCacheFlush}.
 *
 * <p>The sequence ids parked in {@code lowestFlushingStoreSequenceIds} are put back into the
 * region's oldest-unflushed map, and the close-barrier operation is ended. The value that the
 * put displaces for each family is the id recorded since the flush started. If that displaced
 * id is not greater than the restored (older) id, edits were sequenced out of order: an error
 * is logged and the JVM is halted with status 1.
 *
 * @param encodedRegionName region whose flush is being aborted
 */
@Override
public void abortCacheFlush(byte[] encodedRegionName) {
  Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
  // Per family: the value displaced when restoring (null if there was none).
  Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
  synchronized (regionSequenceIdLock) {
    storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
      encodedRegionName);
    if (storeSeqNumsBeforeFlushStarts != null) {
      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
        getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
      for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
          .entrySet()) {
        // put() returns the previous mapping, i.e. what was recorded after the flush started.
        currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
          oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
            familyNameAndSeqId.getValue()));
      }
    }
  }
  closeBarrier.endOp();
  // Sanity check outside the lock: a displaced id must be newer than the restored one.
  if (storeSeqNumsBeforeFlushStarts != null) {
    for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
      Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
      if (currentSeqNum != null
          && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
        String errorStr =
            "Region " + Bytes.toString(encodedRegionName) + " family "
                + Bytes.toString(familyNameAndSeqId.getKey())
                + " acquired edits out of order current memstore seq=" + currentSeqNum
                + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
        LOG.error(errorStr);
        // Immediate halt: no shutdown hooks run.
        Runtime.getRuntime().halt(1);
      }
    }
  }
}
1862
1863 @VisibleForTesting
1864 boolean isLowReplicationRollEnabled() {
1865 return lowReplicationRollEnabled;
1866 }
1867
1868 public static final long FIXED_OVERHEAD = ClassSize.align(
1869 ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1870 ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1871
1872
1873 private static void split(final Configuration conf, final Path p) throws IOException {
1874 FileSystem fs = FSUtils.getWALFileSystem(conf);
1875 if (!fs.exists(p)) {
1876 throw new FileNotFoundException(p.toString());
1877 }
1878 if (!fs.getFileStatus(p).isDirectory()) {
1879 throw new IOException(p + " is not a directory");
1880 }
1881
1882 final Path baseDir = FSUtils.getWALRootDir(conf);
1883 final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1884 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1885 }
1886
1887 @Override
1888 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1889 ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
1890 this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1891 return oldestUnflushedStoreSequenceIdsOfRegion != null ?
1892 getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
1893 }
1894
1895 @Override
1896 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
1897 byte[] familyName) {
1898 synchronized (regionSequenceIdLock) {
1899 Map<byte[], Long> m = this.lowestFlushingStoreSequenceIds.get(encodedRegionName);
1900 if (m != null) {
1901 Long earlist = m.get(familyName);
1902 if (earlist != null) {
1903 return earlist;
1904 }
1905 }
1906 m = this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
1907 if (m != null) {
1908 Long earlist = m.get(familyName);
1909 if (earlist != null) {
1910 return earlist;
1911 }
1912 }
1913 }
1914 return HConstants.NO_SEQNUM;
1915 }
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943 static class SafePointZigZagLatch {
1944
1945
1946
1947 private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
1948
1949
1950
1951 private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963 SyncFuture waitSafePoint(final SyncFuture syncFuture)
1964 throws InterruptedException, FailedSyncBeforeLogCloseException {
1965 while (true) {
1966 if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break;
1967 if (syncFuture.isThrowable()) {
1968 throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
1969 }
1970 }
1971 return syncFuture;
1972 }
1973
1974
1975
1976
1977
1978
1979
1980 void safePointAttained() throws InterruptedException {
1981 this.safePointAttainedLatch.countDown();
1982 this.safePointReleasedLatch.await();
1983 }
1984
1985
1986
1987
1988
1989 void releaseSafePoint() {
1990 this.safePointReleasedLatch.countDown();
1991 }
1992
1993
1994
1995
1996 boolean isCocked() {
1997 return this.safePointAttainedLatch.getCount() > 0 &&
1998 this.safePointReleasedLatch.getCount() > 0;
1999 }
2000 }
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
/**
 * Ring buffer consumer.
 *
 * <p>Append payloads are written to the WAL writer as they arrive. Sync payloads are collected
 * into {@code syncFutures}. At the end of a batch, the collected futures are handed to one of
 * the {@link SyncRunner}s, chosen round-robin. If a safe point was requested through
 * {@link #attainSafePoint()}, the handler then waits until it can attain it.
 */
class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
  private final SyncRunner [] syncRunners;
  // Sync futures collected since they were last handed to a runner.
  private final SyncFuture [] syncFutures;
  // Number of valid entries at the front of syncFutures.
  private volatile int syncFuturesCount = 0;
  // Latch installed by attainSafePoint(); null until a safe point is first requested.
  private volatile SafePointZigZagLatch zigzagLatch;
  // First failure from append or from offering syncs. While set, appends are skipped and
  // collected syncs are failed. Cleared when a safe point is attained.
  private Exception exception = null;
  // Monitor used only for the short timed waits in attainSafePoint(long).
  private final Object safePointWaiter = new Object();
  // Read by attainSafePoint(long) to stop waiting. NOTE(review): not written in this class;
  // presumably set by the enclosing log on shutdown — confirm.
  private volatile boolean shutdown = false;
  // Index of the runner that was last handed a batch.
  private int syncRunnerIndex;

  /**
   * @param syncRunnerCount number of SyncRunner threads to create (named "sync.N")
   * @param maxHandlersCount capacity of the sync-future batch; also passed to each runner
   */
  RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
    this.syncFutures = new SyncFuture[maxHandlersCount];
    this.syncRunners = new SyncRunner[syncRunnerCount];
    for (int i = 0; i < syncRunnerCount; i++) {
      this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
    }
  }

  /** Completes every collected future with {@code e} and empties the batch. */
  private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
    for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
    this.syncFuturesCount = 0;
  }

  /** @return true if any future collected in the current batch is not yet done */
  private boolean isOutstandingSyncs() {
    for (int i = 0; i < this.syncFuturesCount; i++) {
      if (!this.syncFutures[i].isDone()) return true;
    }
    return false;
  }

  /** @return true if any live runner still has futures queued or being serviced */
  private boolean isOutstandingSyncsFromRunners() {
    for (SyncRunner syncRunner : syncRunners) {
      if (syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Handles one ring buffer slot: collects a sync future, or appends an entry. At end of batch,
   * dispatches collected syncs and services any pending safe-point request. Throwables are
   * logged and not rethrown.
   */
  @Override

  public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
  throws Exception {
    try {
      if (truck.hasSyncFuturePayload()) {
        this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
        // Treat a full batch array as end of batch so it is dispatched before overflowing.
        if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
      } else if (truck.hasFSWALEntryPayload()) {
        TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
        try {
          FSWALEntry entry = truck.unloadFSWALEntryPayload();
          if (this.exception != null) {
            // An earlier failure is outstanding: skip the write but still stamp the entry's
            // region sequence id. NOTE(review): presumably so threads waiting on that id are
            // not left blocked — confirm against FSWALEntry.
            entry.stampRegionSequenceId();
            return;
          }
          append(entry);
        } catch (Exception e) {
          // Remember the failure, then fail every collected sync with a DamagedWALException
          // (used as-is if the failure already is one).
          this.exception = e;
          cleanupOutstandingSyncsOnException(sequence,
            this.exception instanceof DamagedWALException ? this.exception
              : new DamagedWALException("On sync", this.exception));
          return;
        } finally {
          assert scope == NullScope.INSTANCE || !scope.isDetached();
          scope.close();
        }
      } else {
        // Unknown payload: fail the collected syncs.
        cleanupOutstandingSyncsOnException(sequence,
          new IllegalStateException("Neither append nor sync"));
        return;
      }

      // Syncs are only dispatched at end of batch, and only if there are any.
      if (!endOfBatch || this.syncFuturesCount <= 0) return;

      if (LOG.isTraceEnabled()) {
        LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount);
      }

      if (this.exception == null) {
        // Round-robin over the runners.
        this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
        try {
          this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
            this.syncFuturesCount);
        } catch (Exception e) {
          // e.g. the runner's bounded queue is full: roll the log and mark the WAL damaged.
          requestLogRoll();
          this.exception = new DamagedWALException("Failed offering sync", e);
        }
      }

      if (this.exception != null) {
        cleanupOutstandingSyncsOnException(sequence,
          new DamagedWALException("On sync", this.exception));
      }
      attainSafePoint(sequence);
      this.syncFuturesCount = 0;
    } catch (Throwable t) {
      LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
    }
  }

  /**
   * Requests a safe point by installing a fresh latch. The consumer acts on it at the end of a
   * later batch.
   *
   * @return the installed latch, for the caller to wait on and then release
   */
  SafePointZigZagLatch attainSafePoint() {
    this.zigzagLatch = new SafePointZigZagLatch();
    return this.zigzagLatch;
  }

  /**
   * If a safe point is pending, waits for outstanding syncs to drain. It then clears any
   * recorded exception, signals the safe point, and blocks until the requester releases it.
   *
   * @param currentSequence sequence of the event that ended the current batch
   */
  private void attainSafePoint(final long currentSequence) {
    if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
    // Test hook (defined on the enclosing class).
    beforeWaitOnSafePoint();
    try {
      // Keep waiting while this batch's syncs are unfinished and not yet covered by
      // highestSyncedSequence (unless shut down or the latch is no longer cocked), or while
      // any live runner still holds futures.
      while ((!this.shutdown && this.zigzagLatch.isCocked() &&
          highestSyncedSequence.get() < currentSequence &&
          isOutstandingSyncs())
          || isOutstandingSyncsFromRunners()) {
        synchronized (this.safePointWaiter) {
          this.safePointWaiter.wait(0, 1);
        }
      }
      // Start clean after the safe point.
      this.exception = null;
      this.zigzagLatch.safePointAttained();
    } catch (InterruptedException e) {
      LOG.warn("Interrupted ", e);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Records {@code lRegionSequenceId} as the oldest unflushed id for each family of the region
   * that does not already have one.
   */
  private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
      Set<byte[]> familyNameSet, Long lRegionSequenceId) {
    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
        getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
    for (byte[] familyName : familyNameSet) {
      oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
    }
  }

  /**
   * Writes one entry to the WAL writer and updates the sequence-id bookkeeping.
   *
   * <p>The region sequence id is stamped first, even for empty edits. Empty edits return
   * without being written or counted in {@code numEntries}.
   *
   * @throws DamagedWALException wrapping any failure (a log roll is requested first)
   */
  void append(final FSWALEntry entry) throws Exception {
    // Test hook; runs outside the try, so anything it throws is not wrapped.
    atHeadOfRingBufferEventHandlerAppend();

    long start = EnvironmentEdgeManager.currentTime();
    byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
    try {
      regionSequenceId = entry.stampRegionSequenceId();

      // Nothing to write for an empty edit; the sequence id has still been stamped above.
      if (entry.getEdit().isEmpty()) {
        return;
      }

      // If the coprocessor did not handle the write and this is a replay, clear the key's
      // scopes.
      if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
          entry.getEdit())) {
        if (entry.getEdit().isReplay()) {
          entry.getKey().setScopes(null);
        }
      }
      if (!listeners.isEmpty()) {
        for (WALActionsListener i: listeners) {
          i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
            entry.getEdit());
        }
      }

      writer.append(entry);
      // Ring buffer sequences must be strictly increasing across appends.
      assert highestUnsyncedSequence < entry.getSequence();
      highestUnsyncedSequence = entry.getSequence();
      Long lRegionSequenceId = Long.valueOf(regionSequenceId);
      highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
      if (entry.isInMemstore()) {
        updateOldestUnflushedSequenceIds(encodedRegionName,
            entry.getFamilyNames(), lRegionSequenceId);
      }

      coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
      postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    } catch (Exception e) {
      String msg = "Failed appending " + regionSequenceId + ", requesting roll of WAL";
      LOG.warn(msg, e);
      requestLogRoll();
      throw new DamagedWALException(msg, e);
    }
    numEntries.incrementAndGet();
  }

  /** Starts every sync runner thread. */
  @Override
  public void onStart() {
    for (SyncRunner syncRunner: this.syncRunners) syncRunner.start();
  }

  /** Interrupts every sync runner thread. */
  @Override
  public void onShutdown() {
    for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt();
  }
}
2296
2297
2298
2299
/**
 * Test hook, called at the start of each ring buffer append. Does nothing here; marked
 * {@code @VisibleForTesting} so tests can override it (e.g. to inject a delay or a failure).
 */
@VisibleForTesting
void atHeadOfRingBufferEventHandlerAppend() {
  // Intentionally empty.
}
2304
2305 private static IOException ensureIOException(final Throwable t) {
2306 return (t instanceof IOException)? (IOException)t: new IOException(t);
2307 }
2308
2309 private static void usage() {
2310 System.err.println("Usage: FSHLog <ARGS>");
2311 System.err.println("Arguments:");
2312 System.err.println(" --dump Dump textual representation of passed one or more files");
2313 System.err.println(" For example: " +
2314 "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
2315 System.err.println(" --split Split the passed directory of WAL logs");
2316 System.err.println(" For example: " +
2317 "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
2318 }
2319
2320
2321
2322
2323
2324
2325
2326
2327 public static void main(String[] args) throws IOException {
2328 if (args.length < 2) {
2329 usage();
2330 System.exit(-1);
2331 }
2332
2333 if (args[0].compareTo("--dump") == 0) {
2334 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
2335 } else if (args[0].compareTo("--perf") == 0) {
2336 LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
2337 LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
2338 args[1]);
2339 System.exit(-1);
2340 } else if (args[0].compareTo("--split") == 0) {
2341 Configuration conf = HBaseConfiguration.create();
2342 for (int i = 1; i < args.length; i++) {
2343 try {
2344 Path logPath = new Path(args[i]);
2345 FSUtils.setFsDefault(conf, logPath);
2346 split(conf, logPath);
2347 } catch (IOException t) {
2348 t.printStackTrace(System.err);
2349 System.exit(-1);
2350 }
2351 }
2352 } else {
2353 usage();
2354 System.exit(-1);
2355 }
2356 }
2357
2358
2359
2360
2361
2362 private Method getGetPipeline(final FSDataOutputStream os) {
2363 Method m = null;
2364 if (os != null) {
2365 Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
2366 .getClass();
2367 try {
2368 m = wrappedStreamClass.getDeclaredMethod("getPipeline",
2369 new Class<?>[] {});
2370 m.setAccessible(true);
2371 } catch (NoSuchMethodException e) {
2372 LOG.info("FileSystem's output stream doesn't support"
2373 + " getPipeline; not available; fsOut="
2374 + wrappedStreamClass.getName());
2375 } catch (SecurityException e) {
2376 LOG.info(
2377 "Doesn't have access to getPipeline on "
2378 + "FileSystems's output stream ; fsOut="
2379 + wrappedStreamClass.getName(), e);
2380 m = null;
2381 }
2382 }
2383 return m;
2384 }
2385
2386
2387
2388
2389 @VisibleForTesting
2390 DatanodeInfo[] getPipeLine() {
2391 if (this.getPipeLine != null && this.hdfs_out != null) {
2392 Object repl;
2393 try {
2394 repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
2395 if (repl instanceof DatanodeInfo[]) {
2396 return ((DatanodeInfo[]) repl);
2397 }
2398 } catch (Exception e) {
2399 LOG.info("Get pipeline failed", e);
2400 }
2401 }
2402 return new DatanodeInfo[0];
2403 }
2404 }