1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22 import java.io.FileNotFoundException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.ReentrantLock;
28 import java.util.concurrent.LinkedTransferQueue;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.TimeUnit;
31 import java.util.Arrays;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.Comparator;
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.LinkedList;
38 import java.util.Set;
39
40 import org.apache.commons.collections.buffer.CircularFifoBuffer;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.fs.FSDataOutputStream;
45 import org.apache.hadoop.fs.FileAlreadyExistsException;
46 import org.apache.hadoop.fs.FileStatus;
47 import org.apache.hadoop.fs.FileSystem;
48 import org.apache.hadoop.fs.Path;
49 import org.apache.hadoop.fs.PathFilter;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.hbase.classification.InterfaceStability;
52 import org.apache.hadoop.hbase.procedure2.Procedure;
53 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
54 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
55 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
56 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
57 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
58 import org.apache.hadoop.hbase.util.Threads;
59 import org.apache.hadoop.ipc.RemoteException;
60
61 import com.google.common.annotations.VisibleForTesting;
62
63
64
65
66 @InterfaceAudience.Private
67 @InterfaceStability.Evolving
68 public class WALProcedureStore implements ProcedureStore {
69 private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
70
71 public interface LeaseRecovery {
72 void recoverFileLease(FileSystem fs, Path path) throws IOException;
73 }
74
75 private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
76 "hbase.procedure.store.wal.max.retries.before.roll";
77 private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
78
79 private static final String WAIT_BEFORE_ROLL_CONF_KEY =
80 "hbase.procedure.store.wal.wait.before.roll";
81 private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
82
83 private static final String ROLL_RETRIES_CONF_KEY =
84 "hbase.procedure.store.wal.max.roll.retries";
85 private static final int DEFAULT_ROLL_RETRIES = 3;
86
87 private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
88 "hbase.procedure.store.wal.sync.failure.roll.max";
89 private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
90
91 private static final String PERIODIC_ROLL_CONF_KEY =
92 "hbase.procedure.store.wal.periodic.roll.msec";
93 private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000;
94
95 private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
96 private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
97
98 private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
99 private static final boolean DEFAULT_USE_HSYNC = true;
100
101 private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
102 private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024;
103
104 private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
105 new CopyOnWriteArrayList<ProcedureStoreListener>();
106
107 private static final String STORE_WAL_SYNC_STATS_COUNT =
108 "hbase.procedure.store.wal.sync.stats.count";
109 private static final int DEFAULT_SYNC_STATS_COUNT = 10;
110
111 private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
112 private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
113 private final AtomicBoolean running = new AtomicBoolean(false);
114
115 private final ReentrantLock lock = new ReentrantLock();
116 private final Condition waitCond = lock.newCondition();
117 private final Condition slotCond = lock.newCondition();
118 private final Condition syncCond = lock.newCondition();
119
120 private final LeaseRecovery leaseRecovery;
121 private final Configuration conf;
122 private final FileSystem fs;
123 private final Path walDir;
124
125 private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
126 private final AtomicBoolean loading = new AtomicBoolean(true);
127 private final AtomicBoolean inSync = new AtomicBoolean(false);
128 private final AtomicLong totalSynced = new AtomicLong(0);
129 private final AtomicLong lastRollTs = new AtomicLong(0);
130
131 private LinkedTransferQueue<ByteSlot> slotsCache = null;
132 private Set<ProcedureWALFile> corruptedLogs = null;
133 private FSDataOutputStream stream = null;
134 private long flushLogId = 0;
135 private int slotIndex = 0;
136 private Thread syncThread;
137 private ByteSlot[] slots;
138
139 private int maxRetriesBeforeRoll;
140 private int maxSyncFailureRoll;
141 private int waitBeforeRoll;
142 private int rollRetries;
143 private int periodicRollMsec;
144 private long rollThreshold;
145 private boolean useHsync;
146 private int syncWaitMsec;
147
148
149 private CircularFifoBuffer syncMetricsBuffer;
150
151 public static class SyncMetrics {
152 private long timestamp;
153 private long syncWaitMs;
154 private long totalSyncedBytes;
155 private int syncedEntries;
156 private float syncedPerSec;
157
158 public long getTimestamp() {
159 return timestamp;
160 }
161
162 public long getSyncWaitMs() {
163 return syncWaitMs;
164 }
165
166 public long getTotalSyncedBytes() {
167 return totalSyncedBytes;
168 }
169
170 public long getSyncedEntries() {
171 return syncedEntries;
172 }
173
174 public float getSyncedPerSec() {
175 return syncedPerSec;
176 }
177 }
178
/**
 * Creates a store that keeps its logs under {@code walDir}. Nothing is read or
 * written until {@link #start(int)} and {@link #recoverLease()} are called.
 *
 * @param conf configuration read by {@link #start(int)}
 * @param fs file system that holds the logs
 * @param walDir directory that contains the state logs
 * @param leaseRecovery callback used to recover the lease of existing logs
 */
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
    final LeaseRecovery leaseRecovery) {
  this.conf = conf;
  this.fs = fs;
  this.walDir = walDir;
  this.leaseRecovery = leaseRecovery;
}
186
187 @Override
188 public void start(int numSlots) throws IOException {
189 if (running.getAndSet(true)) {
190 return;
191 }
192
193
194 loading.set(true);
195 slots = new ByteSlot[numSlots];
196 slotsCache = new LinkedTransferQueue();
197 while (slotsCache.size() < numSlots) {
198 slotsCache.offer(new ByteSlot());
199 }
200
201
202 maxRetriesBeforeRoll =
203 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
204 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
205 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
206 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
207 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
208 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
209 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
210 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
211
212
213 syncMetricsBuffer = new CircularFifoBuffer(
214 conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
215
216
217 syncThread = new Thread("WALProcedureStoreSyncThread") {
218 @Override
219 public void run() {
220 try {
221 syncLoop();
222 } catch (Throwable e) {
223 LOG.error("Got an exception from the sync-loop", e);
224 if (!isSyncAborted()) {
225 sendAbortProcessSignal();
226 }
227 }
228 }
229 };
230 syncThread.start();
231 }
232
233 @Override
234 public void stop(boolean abort) {
235 if (!running.getAndSet(false)) {
236 return;
237 }
238
239 LOG.info("Stopping the WAL Procedure Store");
240 sendStopSignal();
241
242 if (!abort) {
243 try {
244 while (syncThread.isAlive()) {
245 sendStopSignal();
246 syncThread.join(250);
247 }
248 } catch (InterruptedException e) {
249 LOG.warn("join interrupted", e);
250 Thread.currentThread().interrupt();
251 }
252 }
253
254
255 closeStream();
256
257
258
259
260 for (ProcedureWALFile log: logs) {
261 log.close();
262 }
263 logs.clear();
264 }
265
266 private void sendStopSignal() {
267 if (lock.tryLock()) {
268 try {
269 waitCond.signalAll();
270 syncCond.signalAll();
271 } finally {
272 lock.unlock();
273 }
274 }
275 }
276
277 @Override
278 public boolean isRunning() {
279 return running.get();
280 }
281
282 @Override
283 public int getNumThreads() {
284 return slots == null ? 0 : slots.length;
285 }
286
287 public ProcedureStoreTracker getStoreTracker() {
288 return storeTracker;
289 }
290
291 public ArrayList<ProcedureWALFile> getActiveLogs() {
292 lock.lock();
293 try {
294 return new ArrayList<ProcedureWALFile>(logs);
295 } finally {
296 lock.unlock();
297 }
298 }
299
300 public Set<ProcedureWALFile> getCorruptedLogs() {
301 return corruptedLogs;
302 }
303
304 @Override
305 public void registerListener(ProcedureStoreListener listener) {
306 this.listeners.add(listener);
307 }
308
309 @Override
310 public boolean unregisterListener(ProcedureStoreListener listener) {
311 return this.listeners.remove(listener);
312 }
313
314 @Override
315 public void recoverLease() throws IOException {
316 lock.lock();
317 try {
318 LOG.info("Starting WAL Procedure Store lease recovery");
319 FileStatus[] oldLogs = getLogFiles();
320 while (isRunning()) {
321
322 try {
323 flushLogId = initOldLogs(oldLogs);
324 } catch (FileNotFoundException e) {
325 LOG.warn("someone else is active and deleted logs. retrying.", e);
326 oldLogs = getLogFiles();
327 continue;
328 }
329
330
331 if (!rollWriter(flushLogId + 1)) {
332
333 LOG.debug("someone else has already created log " + flushLogId);
334 continue;
335 }
336
337
338 oldLogs = getLogFiles();
339 if (getMaxLogId(oldLogs) > flushLogId) {
340 if (LOG.isDebugEnabled()) {
341 LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
342 }
343 logs.getLast().removeFile();
344 continue;
345 }
346
347 LOG.info("Lease acquired for flushLogId: " + flushLogId);
348 break;
349 }
350 } finally {
351 lock.unlock();
352 }
353 }
354
355 @Override
356 public Iterator<Procedure> load() throws IOException {
357 if (logs.isEmpty()) {
358 throw new RuntimeException("recoverLease() must be called before loading data");
359 }
360
361
362 if (logs.size() == 1) {
363 if (LOG.isDebugEnabled()) {
364 LOG.debug("No state logs to replay.");
365 }
366 loading.set(false);
367 return null;
368 }
369
370
371 Iterator<ProcedureWALFile> it = logs.descendingIterator();
372 it.next();
373 try {
374 return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
375 @Override
376 public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
377 if (corruptedLogs == null) {
378 corruptedLogs = new HashSet<ProcedureWALFile>();
379 }
380 corruptedLogs.add(log);
381
382 }
383 });
384 } finally {
385 loading.set(false);
386 }
387 }
388
389 @Override
390 public void insert(final Procedure proc, final Procedure[] subprocs) {
391 if (LOG.isTraceEnabled()) {
392 LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
393 }
394
395 ByteSlot slot = acquireSlot();
396 try {
397
398 long[] subProcIds = null;
399 if (subprocs != null) {
400 ProcedureWALFormat.writeInsert(slot, proc, subprocs);
401 subProcIds = new long[subprocs.length];
402 for (int i = 0; i < subprocs.length; ++i) {
403 subProcIds[i] = subprocs[i].getProcId();
404 }
405 } else {
406 assert !proc.hasParent();
407 ProcedureWALFormat.writeInsert(slot, proc);
408 }
409
410
411 pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
412 } catch (IOException e) {
413
414
415 LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
416 ", subprocs=" + Arrays.toString(subprocs), e);
417 throw new RuntimeException(e);
418 } finally {
419 releaseSlot(slot);
420 }
421 }
422
423 @Override
424 public void update(final Procedure proc) {
425 if (LOG.isTraceEnabled()) {
426 LOG.trace("Update " + proc);
427 }
428
429 ByteSlot slot = acquireSlot();
430 try {
431
432 ProcedureWALFormat.writeUpdate(slot, proc);
433
434
435 pushData(PushType.UPDATE, slot, proc.getProcId(), null);
436 } catch (IOException e) {
437
438
439 LOG.fatal("Unable to serialize the procedure: " + proc, e);
440 throw new RuntimeException(e);
441 } finally {
442 releaseSlot(slot);
443 }
444 }
445
446 @Override
447 public void delete(final long procId) {
448 if (LOG.isTraceEnabled()) {
449 LOG.trace("Delete " + procId);
450 }
451
452 ByteSlot slot = acquireSlot();
453 try {
454
455 ProcedureWALFormat.writeDelete(slot, procId);
456
457
458 pushData(PushType.DELETE, slot, procId, null);
459 } catch (IOException e) {
460
461
462 LOG.fatal("Unable to serialize the procedure: " + procId, e);
463 throw new RuntimeException(e);
464 } finally {
465 releaseSlot(slot);
466 }
467 }
468
469 private ByteSlot acquireSlot() {
470 ByteSlot slot = slotsCache.poll();
471 return slot != null ? slot : new ByteSlot();
472 }
473
474 private void releaseSlot(final ByteSlot slot) {
475 slot.reset();
476 slotsCache.offer(slot);
477 }
478
479 private enum PushType { INSERT, UPDATE, DELETE };
480
/**
 * Queues a serialized entry in the next free slot and blocks the caller until
 * it is signalled on the sync condition.
 *
 * @param type kind of entry, used to update the store tracker
 * @param slot buffer that contains the serialized entry
 * @param procId id of the procedure the entry refers to
 * @param subProcIds ids of the sub-procedures inserted with it, or null
 * @return the id of the log that was current when the entry was queued
 * @throws RuntimeException if the store is not running, recoverLease() was not
 *           called, the sync is aborted or the thread is interrupted
 */
private long pushData(final PushType type, final ByteSlot slot,
    final long procId, final long[] subProcIds) {
  if (!isRunning()) {
    throw new RuntimeException("the store must be running before inserting data");
  }
  if (logs.isEmpty()) {
    throw new RuntimeException("recoverLease() must be called before inserting data");
  }

  long logId = -1;
  lock.lock();
  try {
    // Wait for a free slot: block while a sync is in progress or while every
    // slot is already taken.
    while (true) {
      if (!isRunning()) {
        throw new RuntimeException("store no longer running");
      } else if (isSyncAborted()) {
        throw new RuntimeException("sync aborted", syncException.get());
      } else if (inSync.get()) {
        syncCond.await();
      } else if (slotIndex == slots.length) {
        // All the slots are taken: tell the sync loop that it can stop waiting
        // for more data, then wait for the sync to complete.
        slotCond.signal();
        syncCond.await();
      } else {
        break;
      }
    }

    updateStoreTracker(type, procId, subProcIds);
    slots[slotIndex++] = slot;
    logId = flushLogId;

    // First entry of the batch: wake up the sync loop, which waits on waitCond
    // while no slot is queued.
    if (slotIndex == 1) {
      waitCond.signal();
    }

    // Last free slot taken: the sync loop has no reason to wait any longer.
    if (slotIndex == slots.length) {
      waitCond.signal();
      slotCond.signal();
    }

    // Wait for the sync loop, which signals syncCond at the end of each iteration.
    // NOTE(review): this await is not re-checked in a loop, so a spurious wakeup
    // would let the caller return before its slot has been synced - confirm.
    syncCond.await();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    sendAbortProcessSignal();
    throw new RuntimeException(e);
  } finally {
    lock.unlock();
    // Throwing from the finally block replaces any exception already in flight.
    if (isSyncAborted()) {
      throw new RuntimeException("sync aborted", syncException.get());
    }
  }
  return logId;
}
537
538 private void updateStoreTracker(final PushType type,
539 final long procId, final long[] subProcIds) {
540 switch (type) {
541 case INSERT:
542 if (subProcIds == null) {
543 storeTracker.insert(procId);
544 } else {
545 storeTracker.insert(procId, subProcIds);
546 }
547 break;
548 case UPDATE:
549 storeTracker.update(procId);
550 break;
551 case DELETE:
552 storeTracker.delete(procId);
553 break;
554 default:
555 throw new RuntimeException("invalid push type " + type);
556 }
557 }
558
559 private boolean isSyncAborted() {
560 return syncException.get() != null;
561 }
562
/**
 * Body of the sync thread. While the store is running it waits for queued
 * slots, writes them to the current log and wakes up the producers. The store
 * lock is held for the whole loop and released only while waiting on a condition.
 *
 * @throws Throwable the first failure met by the loop, which is also recorded
 *           in syncException
 */
private void syncLoop() throws Throwable {
  long totalSyncedToStore = 0;
  inSync.set(false);
  lock.lock();
  try {
    while (isRunning()) {
      try {
        // Nothing queued: run the periodic maintenance and wait for data
        if (slotIndex == 0) {
          // The periodic roll is skipped until load() has completed
          if (!loading.get()) {
            periodicRoll();
          }

          if (LOG.isTraceEnabled()) {
            float rollTsSec = getMillisFromLastRoll() / 1000.0f;
            LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                      StringUtils.humanSize(totalSynced.get()),
                      StringUtils.humanSize(totalSynced.get() / rollTsSec)));
          }

          // Wake up when a producer queues a slot or when the next periodic
          // roll is due, whichever comes first
          waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
          if (slotIndex == 0) {
            // No data: go back to the top of the loop and re-run the periodic roll
            continue;
          }
        }

        // Give the producers some time to fill the remaining slots, unless
        // every slot is already taken
        final long syncWaitSt = System.currentTimeMillis();
        if (slotIndex != slots.length) {
          slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
        }

        final long currentTs = System.currentTimeMillis();
        final long syncWaitMs = currentTs - syncWaitSt;
        final float rollSec = getMillisFromLastRoll() / 1000.0f;
        final float syncedPerSec = totalSyncedToStore / rollSec;
        if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
          LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
                    StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
                    StringUtils.humanSize(totalSyncedToStore),
                    StringUtils.humanSize(syncedPerSec)));
        }

        // Record the metrics. They are taken before the sync: syncedEntries is
        // the size of the batch about to be written, while the byte counters
        // do not include it yet.
        final SyncMetrics syncMetrics = new SyncMetrics();
        syncMetrics.timestamp = currentTs;
        syncMetrics.syncWaitMs = syncWaitMs;
        syncMetrics.syncedEntries = slotIndex;
        syncMetrics.totalSyncedBytes = totalSyncedToStore;
        syncMetrics.syncedPerSec = syncedPerSec;
        syncMetricsBuffer.add(syncMetrics);

        // Write the queued slots. Producers that find inSync set wait on
        // syncCond instead of queuing.
        inSync.set(true);
        long slotSize = syncSlots();
        logs.getLast().addToSize(slotSize);
        totalSyncedToStore = totalSynced.addAndGet(slotSize);
        slotIndex = 0;
        inSync.set(false);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        sendAbortProcessSignal();
        // Only the first failure is kept
        syncException.compareAndSet(null, e);
        throw e;
      } catch (Throwable t) {
        // Only the first failure is kept
        syncException.compareAndSet(null, t);
        throw t;
      } finally {
        // Runs on every iteration, including the "no data" one and the failure
        // paths, so that waiting producers can re-check the store state
        syncCond.signalAll();
      }
    }
  } finally {
    lock.unlock();
  }
}
638
639 public ArrayList<SyncMetrics> getSyncMetrics() {
640 lock.lock();
641 try {
642 return new ArrayList<SyncMetrics>(syncMetricsBuffer);
643 } finally {
644 lock.unlock();
645 }
646 }
647
648 private long syncSlots() throws Throwable {
649 int retry = 0;
650 int logRolled = 0;
651 long totalSynced = 0;
652 do {
653 try {
654 totalSynced = syncSlots(stream, slots, 0, slotIndex);
655 break;
656 } catch (Throwable e) {
657 LOG.warn("unable to sync slots, retry=" + retry);
658 if (++retry >= maxRetriesBeforeRoll) {
659 if (logRolled >= maxSyncFailureRoll) {
660 LOG.error("Sync slots after log roll failed, abort.", e);
661 sendAbortProcessSignal();
662 throw e;
663 }
664
665 if (!rollWriterOrDie()) {
666 throw e;
667 }
668
669 logRolled++;
670 retry = 0;
671 }
672 }
673 } while (running.get());
674 return totalSynced;
675 }
676
677 protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
678 throws IOException {
679 long totalSynced = 0;
680 for (int i = 0; i < count; ++i) {
681 ByteSlot data = slots[offset + i];
682 data.writeTo(stream);
683 totalSynced += data.size();
684 }
685
686 if (useHsync) {
687 stream.hsync();
688 } else {
689 stream.hflush();
690 }
691 sendPostSyncSignal();
692
693 if (LOG.isTraceEnabled()) {
694 LOG.trace("Sync slots=" + count + '/' + slots.length +
695 ", flushed=" + StringUtils.humanSize(totalSynced));
696 }
697 return totalSynced;
698 }
699
700 protected void sendPostSyncSignal() {
701 if (!this.listeners.isEmpty()) {
702 for (ProcedureStoreListener listener : this.listeners) {
703 listener.postSync();
704 }
705 }
706 }
707
708 private void sendAbortProcessSignal() {
709 if (!this.listeners.isEmpty()) {
710 for (ProcedureStoreListener listener : this.listeners) {
711 listener.abortProcess();
712 }
713 }
714 }
715
716 private boolean rollWriterOrDie() {
717 for (int i = 0; i < rollRetries; ++i) {
718 if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
719
720 try {
721 if (rollWriter()) {
722 return true;
723 }
724 } catch (IOException e) {
725 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
726 }
727 }
728 LOG.fatal("Unable to roll the log");
729 sendAbortProcessSignal();
730 throw new RuntimeException("unable to roll the log");
731 }
732
733 private boolean tryRollWriter() {
734 try {
735 return rollWriter();
736 } catch (IOException e) {
737 LOG.warn("Unable to roll the log", e);
738 return false;
739 }
740 }
741
742 public long getMillisToNextPeriodicRoll() {
743 if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
744 return periodicRollMsec - getMillisFromLastRoll();
745 }
746 return Long.MAX_VALUE;
747 }
748
749 public long getMillisFromLastRoll() {
750 return (System.currentTimeMillis() - lastRollTs.get());
751 }
752
753 @VisibleForTesting
754 protected void periodicRollForTesting() throws IOException {
755 lock.lock();
756 try {
757 periodicRoll();
758 } finally {
759 lock.unlock();
760 }
761 }
762
763 @VisibleForTesting
764 protected boolean rollWriterForTesting() throws IOException {
765 lock.lock();
766 try {
767 return rollWriter();
768 } finally {
769 lock.unlock();
770 }
771 }
772
773 private void periodicRoll() throws IOException {
774 if (storeTracker.isEmpty()) {
775 if (LOG.isTraceEnabled()) {
776 LOG.trace("no active procedures");
777 }
778 tryRollWriter();
779 removeAllLogs(flushLogId - 1);
780 } else {
781 if (storeTracker.isUpdated()) {
782 if (LOG.isTraceEnabled()) {
783 LOG.trace("all the active procedures are in the latest log");
784 }
785 removeAllLogs(flushLogId - 1);
786 }
787
788
789
790 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
791 tryRollWriter();
792 }
793
794 removeInactiveLogs();
795 }
796 }
797
798 private boolean rollWriter() throws IOException {
799
800 if (!rollWriter(flushLogId + 1)) {
801 LOG.warn("someone else has already created log " + flushLogId);
802 return false;
803 }
804
805
806
807 if (getMaxLogId(getLogFiles()) > flushLogId) {
808 LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
809 logs.getLast().removeFile();
810 return false;
811 }
812
813
814 return true;
815 }
816
817 private boolean rollWriter(final long logId) throws IOException {
818 assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
819 assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
820
821 ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
822 .setVersion(ProcedureWALFormat.HEADER_VERSION)
823 .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
824 .setMinProcId(storeTracker.getMinProcId())
825 .setLogId(logId)
826 .build();
827
828 FSDataOutputStream newStream = null;
829 Path newLogFile = null;
830 long startPos = -1;
831 newLogFile = getLogFilePath(logId);
832 try {
833 newStream = fs.create(newLogFile, false);
834 } catch (FileAlreadyExistsException e) {
835 LOG.error("Log file with id=" + logId + " already exists", e);
836 return false;
837 } catch (RemoteException re) {
838 LOG.warn("failed to create log file with id=" + logId, re);
839 return false;
840 }
841 try {
842 ProcedureWALFormat.writeHeader(newStream, header);
843 startPos = newStream.getPos();
844 } catch (IOException ioe) {
845 LOG.warn("Encountered exception writing header", ioe);
846 newStream.close();
847 return false;
848 }
849
850 closeStream();
851
852 storeTracker.resetUpdates();
853 stream = newStream;
854 flushLogId = logId;
855 totalSynced.set(0);
856 long rollTs = System.currentTimeMillis();
857 lastRollTs.set(rollTs);
858 logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
859
860 if (LOG.isDebugEnabled()) {
861 LOG.debug("Roll new state log: " + logId);
862 }
863 return true;
864 }
865
866 private void closeStream() {
867 try {
868 if (stream != null) {
869 try {
870 ProcedureWALFile log = logs.getLast();
871 log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
872 long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
873 log.addToSize(trailerSize);
874 } catch (IOException e) {
875 LOG.warn("Unable to write the trailer: " + e.getMessage());
876 }
877 stream.close();
878 }
879 } catch (IOException e) {
880 LOG.error("Unable to close the stream", e);
881 } finally {
882 stream = null;
883 }
884 }
885
886
887
888
889 private void removeInactiveLogs() {
890
891 while (logs.size() > 1) {
892 ProcedureWALFile log = logs.getFirst();
893 if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
894 break;
895 }
896 removeLogFile(log);
897 }
898 }
899
900 private void removeAllLogs(long lastLogId) {
901 if (logs.size() <= 1) return;
902
903 if (LOG.isDebugEnabled()) {
904 LOG.debug("Remove all state logs with ID less than " + lastLogId);
905 }
906 while (logs.size() > 1) {
907 ProcedureWALFile log = logs.getFirst();
908 if (lastLogId < log.getLogId()) {
909 break;
910 }
911 removeLogFile(log);
912 }
913 }
914
915 private boolean removeLogFile(final ProcedureWALFile log) {
916 try {
917 if (LOG.isDebugEnabled()) {
918 LOG.debug("Remove log: " + log);
919 }
920 log.removeFile();
921 logs.remove(log);
922 LOG.info("Remove log: " + log);
923 LOG.info("Removed logs: " + logs);
924 if (logs.size() == 0) { LOG.error("Expected at least one log"); }
925 assert logs.size() > 0 : "expected at least one log";
926 } catch (IOException e) {
927 LOG.error("Unable to remove log: " + log, e);
928 return false;
929 }
930 return true;
931 }
932
933
934
935
936 public Path getWALDir() {
937 return this.walDir;
938 }
939
940 public FileSystem getFileSystem() {
941 return this.fs;
942 }
943
944 protected Path getLogFilePath(final long logId) throws IOException {
945 return new Path(walDir, String.format("state-%020d.log", logId));
946 }
947
948 private static long getLogIdFromName(final String name) {
949 int end = name.lastIndexOf(".log");
950 int start = name.lastIndexOf('-') + 1;
951 while (start < end) {
952 if (name.charAt(start) != '0')
953 break;
954 start++;
955 }
956 return Long.parseLong(name.substring(start, end));
957 }
958
959 private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
960 @Override
961 public boolean accept(Path path) {
962 String name = path.getName();
963 return name.startsWith("state-") && name.endsWith(".log");
964 }
965 };
966
967 private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
968 new Comparator<FileStatus>() {
969 @Override
970 public int compare(FileStatus a, FileStatus b) {
971 final long aId = getLogIdFromName(a.getPath().getName());
972 final long bId = getLogIdFromName(b.getPath().getName());
973 return Long.compare(aId, bId);
974 }
975 };
976
977 private FileStatus[] getLogFiles() throws IOException {
978 try {
979 FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
980 Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
981 return files;
982 } catch (FileNotFoundException e) {
983 LOG.warn("Log directory not found: " + e.getMessage());
984 return null;
985 }
986 }
987
988 private static long getMaxLogId(final FileStatus[] logFiles) {
989 long maxLogId = 0;
990 if (logFiles != null && logFiles.length > 0) {
991 for (int i = 0; i < logFiles.length; ++i) {
992 maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
993 }
994 }
995 return maxLogId;
996 }
997
998
999
1000
1001 private long initOldLogs(final FileStatus[] logFiles) throws IOException {
1002 this.logs.clear();
1003
1004 long maxLogId = 0;
1005 if (logFiles != null && logFiles.length > 0) {
1006 for (int i = 0; i < logFiles.length; ++i) {
1007 final Path logPath = logFiles[i].getPath();
1008 leaseRecovery.recoverFileLease(fs, logPath);
1009 maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
1010
1011 ProcedureWALFile log = initOldLog(logFiles[i]);
1012 if (log != null) {
1013 this.logs.add(log);
1014 }
1015 }
1016 Collections.sort(this.logs);
1017 initTrackerFromOldLogs();
1018 }
1019 return maxLogId;
1020 }
1021
1022 private void initTrackerFromOldLogs() {
1023
1024 if (!logs.isEmpty()) {
1025 ProcedureWALFile log = logs.getLast();
1026 try {
1027 log.readTracker(storeTracker);
1028 } catch (IOException e) {
1029 LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
1030
1031 storeTracker.reset();
1032 storeTracker.setPartialFlag(true);
1033 }
1034 }
1035 }
1036
1037 private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
1038 ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
1039 if (logFile.getLen() == 0) {
1040 LOG.warn("Remove uninitialized log: " + logFile);
1041 log.removeFile();
1042 return null;
1043 }
1044 if (LOG.isDebugEnabled()) {
1045 LOG.debug("Opening state-log: " + logFile);
1046 }
1047 try {
1048 log.open();
1049 } catch (ProcedureWALFormat.InvalidWALDataException e) {
1050 LOG.warn("Remove uninitialized log: " + logFile, e);
1051 log.removeFile();
1052 return null;
1053 } catch (IOException e) {
1054 String msg = "Unable to read state log: " + logFile;
1055 LOG.error(msg, e);
1056 throw new IOException(msg, e);
1057 }
1058
1059 if (log.isCompacted()) {
1060 try {
1061 log.readTrailer();
1062 } catch (IOException e) {
1063 LOG.warn("Unfinished compacted log: " + logFile, e);
1064 log.removeFile();
1065 return null;
1066 }
1067 }
1068 return log;
1069 }
1070 }