View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.IOException;
22  import java.io.FileNotFoundException;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  import java.util.concurrent.atomic.AtomicLong;
25  import java.util.concurrent.atomic.AtomicReference;
26  import java.util.concurrent.locks.Condition;
27  import java.util.concurrent.locks.ReentrantLock;
28  import java.util.concurrent.LinkedTransferQueue;
29  import java.util.concurrent.CopyOnWriteArrayList;
30  import java.util.concurrent.TimeUnit;
31  import java.util.Arrays;
32  import java.util.ArrayList;
33  import java.util.Collections;
34  import java.util.Comparator;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.LinkedList;
38  import java.util.Set;
39  
40  import org.apache.commons.collections.buffer.CircularFifoBuffer;
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FSDataOutputStream;
45  import org.apache.hadoop.fs.FileAlreadyExistsException;
46  import org.apache.hadoop.fs.FileStatus;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.fs.PathFilter;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.classification.InterfaceStability;
52  import org.apache.hadoop.hbase.procedure2.Procedure;
53  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
54  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
55  import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
56  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
57  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
58  import org.apache.hadoop.hbase.util.Threads;
59  import org.apache.hadoop.ipc.RemoteException;
60  
61  import com.google.common.annotations.VisibleForTesting;
62  
63  /**
64   * WAL implementation of the ProcedureStore.
65   */
66  @InterfaceAudience.Private
67  @InterfaceStability.Evolving
68  public class WALProcedureStore implements ProcedureStore {
  private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);

  /**
   * Hook used to recover the file-system lease on an existing WAL file.
   * An implementation is supplied through the constructor and kept in {@code leaseRecovery};
   * presumably it is invoked on old logs during {@link #recoverLease()} (the call site is not
   * visible in this part of the file) — TODO confirm.
   */
  public interface LeaseRecovery {
    void recoverFileLease(FileSystem fs, Path path) throws IOException;
  }
74  
  // Number of consecutive failed sync attempts tolerated before syncSlots() rolls the writer.
  private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.max.retries.before.roll";
  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;

  // Base delay used by rollWriterOrDie() between roll attempts (multiplied by the attempt
  // number). Passed to Threads.sleepWithoutInterrupt, presumably milliseconds — TODO confirm.
  private static final String WAIT_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.wait.before.roll";
  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;

  // Number of roll attempts rollWriterOrDie() makes before signalling abort.
  private static final String ROLL_RETRIES_CONF_KEY =
    "hbase.procedure.store.wal.max.roll.retries";
  private static final int DEFAULT_ROLL_RETRIES = 3;

  // Number of sync-failure-triggered rolls syncSlots() allows before it gives up and aborts.
  private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.sync.failure.roll.max";
  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;

  // Time-based roll interval in ms (compared with getMillisFromLastRoll()); <= 0 disables it.
  private static final String PERIODIC_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.periodic.roll.msec";
  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h

  // How long the sync loop waits (ms) for more entries to batch before flushing a partial set.
  private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

  // true: syncSlots() calls hsync() on the stream; false: it calls hflush().
  private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
  private static final boolean DEFAULT_USE_HSYNC = true;

  // Bytes synced to the current log after which periodicRoll() tries to roll the writer.
  private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M

  // Listeners notified on postSync() and abortProcess(); copy-on-write so they can be iterated
  // without holding the store lock.
  private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
    new CopyOnWriteArrayList<ProcedureStoreListener>();

  // Capacity of the circular buffer of SyncMetrics samples exposed to the web UI.
  private static final String STORE_WAL_SYNC_STATS_COUNT =
      "hbase.procedure.store.wal.sync.stats.count";
  private static final int DEFAULT_SYNC_STATS_COUNT = 10;
110 
  // Tracked WAL files. rollWriter(long) appends new logs at the end; the last entry is the log
  // currently being written, and the cleaners remove entries from the head.
  private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
  private final AtomicBoolean running = new AtomicBoolean(false);

  // Single lock guarding slots/slotIndex/stream/logs, plus its three conditions:
  //  waitCond - signalled by pushData() when data arrives or slots fill; the idle sync loop waits on it
  //  slotCond - signalled when the slots are full; the sync loop waits on it while batching
  //  syncCond - signalled by the sync loop after every iteration; producers wait on it
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition waitCond = lock.newCondition();
  private final Condition slotCond = lock.newCondition();
  private final Condition syncCond = lock.newCondition();

  private final LeaseRecovery leaseRecovery;
  private final Configuration conf;
  private final FileSystem fs;
  private final Path walDir;

  // First fatal error seen by the sync loop; non-null means the store is aborting.
  private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
  // true until load() completes; periodic rolls are skipped while loading.
  private final AtomicBoolean loading = new AtomicBoolean(true);
  // Set by the sync loop around syncSlots().
  private final AtomicBoolean inSync = new AtomicBoolean(false);
  // Bytes synced to the current log; reset to 0 by every successful roll.
  private final AtomicLong totalSynced = new AtomicLong(0);
  // System.currentTimeMillis() of the last successful roll (0 = never rolled).
  private final AtomicLong lastRollTs = new AtomicLong(0);

  private LinkedTransferQueue<ByteSlot> slotsCache = null;
  // Logs reported corrupted during load(); stays null when none were reported.
  private Set<ProcedureWALFile> corruptedLogs = null;
  // Output stream of the current (last) log.
  private FSDataOutputStream stream = null;
  // Id of the current (last) log.
  private long flushLogId = 0;
  // Number of filled entries in slots[] waiting to be synced.
  private int slotIndex = 0;
  private Thread syncThread;
  private ByteSlot[] slots;

  // Tunings read from the configuration in start(); see the *_CONF_KEY constants.
  private int maxRetriesBeforeRoll;
  private int maxSyncFailureRoll;
  private int waitBeforeRoll;
  private int rollRetries;
  private int periodicRollMsec;
  private long rollThreshold;
  private boolean useHsync;
  private int syncWaitMsec;

  // Variables used for UI display
  private CircularFifoBuffer syncMetricsBuffer;
150 
151   public static class SyncMetrics {
152     private long timestamp;
153     private long syncWaitMs;
154     private long totalSyncedBytes;
155     private int syncedEntries;
156     private float syncedPerSec;
157 
158     public long getTimestamp() {
159       return timestamp;
160     }
161 
162     public long getSyncWaitMs() {
163       return syncWaitMs;
164     }
165 
166     public long getTotalSyncedBytes() {
167       return totalSyncedBytes;
168     }
169 
170     public long getSyncedEntries() {
171       return syncedEntries;
172     }
173 
174     public float getSyncedPerSec() {
175       return syncedPerSec;
176     }
177   }
178 
179   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
180       final LeaseRecovery leaseRecovery) {
181     this.fs = fs;
182     this.conf = conf;
183     this.walDir = walDir;
184     this.leaseRecovery = leaseRecovery;
185   }
186 
187   @Override
188   public void start(int numSlots) throws IOException {
189     if (running.getAndSet(true)) {
190       return;
191     }
192 
193     // Init buffer slots
194     loading.set(true);
195     slots = new ByteSlot[numSlots];
196     slotsCache = new LinkedTransferQueue();
197     while (slotsCache.size() < numSlots) {
198       slotsCache.offer(new ByteSlot());
199     }
200 
201     // Tunings
202     maxRetriesBeforeRoll =
203       conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
204     maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
205     waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
206     rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
207     rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
208     periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
209     syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
210     useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
211 
212     // WebUI
213     syncMetricsBuffer = new CircularFifoBuffer(
214       conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));
215 
216     // Init sync thread
217     syncThread = new Thread("WALProcedureStoreSyncThread") {
218       @Override
219       public void run() {
220         try {
221           syncLoop();
222         } catch (Throwable e) {
223           LOG.error("Got an exception from the sync-loop", e);
224           if (!isSyncAborted()) {
225             sendAbortProcessSignal();
226           }
227         }
228       }
229     };
230     syncThread.start();
231   }
232 
233   @Override
234   public void stop(boolean abort) {
235     if (!running.getAndSet(false)) {
236       return;
237     }
238 
239     LOG.info("Stopping the WAL Procedure Store");
240     sendStopSignal();
241 
242     if (!abort) {
243       try {
244         while (syncThread.isAlive()) {
245           sendStopSignal();
246           syncThread.join(250);
247         }
248       } catch (InterruptedException e) {
249         LOG.warn("join interrupted", e);
250         Thread.currentThread().interrupt();
251       }
252     }
253 
254     // Close the writer
255     closeStream();
256 
257     // Close the old logs
258     // they should be already closed, this is just in case the load fails
259     // and we call start() and then stop()
260     for (ProcedureWALFile log: logs) {
261       log.close();
262     }
263     logs.clear();
264   }
265 
266   private void sendStopSignal() {
267     if (lock.tryLock()) {
268       try {
269         waitCond.signalAll();
270         syncCond.signalAll();
271       } finally {
272         lock.unlock();
273       }
274     }
275   }
276 
277   @Override
278   public boolean isRunning() {
279     return running.get();
280   }
281 
282   @Override
283   public int getNumThreads() {
284     return slots == null ? 0 : slots.length;
285   }
286 
287   public ProcedureStoreTracker getStoreTracker() {
288     return storeTracker;
289   }
290 
291   public ArrayList<ProcedureWALFile> getActiveLogs() {
292     lock.lock();
293     try {
294       return new ArrayList<ProcedureWALFile>(logs);
295     } finally {
296       lock.unlock();
297     }
298   }
299 
300   public Set<ProcedureWALFile> getCorruptedLogs() {
301     return corruptedLogs;
302   }
303 
304   @Override
305   public void registerListener(ProcedureStoreListener listener) {
306     this.listeners.add(listener);
307   }
308 
309   @Override
310   public boolean unregisterListener(ProcedureStoreListener listener) {
311     return this.listeners.remove(listener);
312   }
313 
314   @Override
315   public void recoverLease() throws IOException {
316     lock.lock();
317     try {
318       LOG.info("Starting WAL Procedure Store lease recovery");
319       FileStatus[] oldLogs = getLogFiles();
320       while (isRunning()) {
321         // Get Log-MaxID and recover lease on old logs
322         try {
323           flushLogId = initOldLogs(oldLogs);
324         } catch (FileNotFoundException e) {
325           LOG.warn("someone else is active and deleted logs. retrying.", e);
326           oldLogs = getLogFiles();
327           continue;
328         }
329 
330         // Create new state-log
331         if (!rollWriter(flushLogId + 1)) {
332           // someone else has already created this log
333           LOG.debug("someone else has already created log " + flushLogId);
334           continue;
335         }
336 
337         // We have the lease on the log
338         oldLogs = getLogFiles();
339         if (getMaxLogId(oldLogs) > flushLogId) {
340           if (LOG.isDebugEnabled()) {
341             LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
342           }
343           logs.getLast().removeFile();
344           continue;
345         }
346 
347         LOG.info("Lease acquired for flushLogId: " + flushLogId);
348         break;
349       }
350     } finally {
351       lock.unlock();
352     }
353   }
354 
355   @Override
356   public Iterator<Procedure> load() throws IOException {
357     if (logs.isEmpty()) {
358       throw new RuntimeException("recoverLease() must be called before loading data");
359     }
360 
361     // Nothing to do, If we have only the current log.
362     if (logs.size() == 1) {
363       if (LOG.isDebugEnabled()) {
364         LOG.debug("No state logs to replay.");
365       }
366       loading.set(false);
367       return null;
368     }
369 
370     // Load the old logs
371     Iterator<ProcedureWALFile> it = logs.descendingIterator();
372     it.next(); // Skip the current log
373     try {
374       return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
375         @Override
376         public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
377           if (corruptedLogs == null) {
378             corruptedLogs = new HashSet<ProcedureWALFile>();
379           }
380           corruptedLogs.add(log);
381           // TODO: sideline corrupted log
382         }
383       });
384     } finally {
385       loading.set(false);
386     }
387   }
388 
389   @Override
390   public void insert(final Procedure proc, final Procedure[] subprocs) {
391     if (LOG.isTraceEnabled()) {
392       LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
393     }
394 
395     ByteSlot slot = acquireSlot();
396     try {
397       // Serialize the insert
398       long[] subProcIds = null;
399       if (subprocs != null) {
400         ProcedureWALFormat.writeInsert(slot, proc, subprocs);
401         subProcIds = new long[subprocs.length];
402         for (int i = 0; i < subprocs.length; ++i) {
403           subProcIds[i] = subprocs[i].getProcId();
404         }
405       } else {
406         assert !proc.hasParent();
407         ProcedureWALFormat.writeInsert(slot, proc);
408       }
409 
410       // Push the transaction data and wait until it is persisted
411       pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
412     } catch (IOException e) {
413       // We are not able to serialize the procedure.
414       // this is a code error, and we are not able to go on.
415       LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
416                 ", subprocs=" + Arrays.toString(subprocs), e);
417       throw new RuntimeException(e);
418     } finally {
419       releaseSlot(slot);
420     }
421   }
422 
423   @Override
424   public void update(final Procedure proc) {
425     if (LOG.isTraceEnabled()) {
426       LOG.trace("Update " + proc);
427     }
428 
429     ByteSlot slot = acquireSlot();
430     try {
431       // Serialize the update
432       ProcedureWALFormat.writeUpdate(slot, proc);
433 
434       // Push the transaction data and wait until it is persisted
435       pushData(PushType.UPDATE, slot, proc.getProcId(), null);
436     } catch (IOException e) {
437       // We are not able to serialize the procedure.
438       // this is a code error, and we are not able to go on.
439       LOG.fatal("Unable to serialize the procedure: " + proc, e);
440       throw new RuntimeException(e);
441     } finally {
442       releaseSlot(slot);
443     }
444   }
445 
446   @Override
447   public void delete(final long procId) {
448     if (LOG.isTraceEnabled()) {
449       LOG.trace("Delete " + procId);
450     }
451 
452     ByteSlot slot = acquireSlot();
453     try {
454       // Serialize the delete
455       ProcedureWALFormat.writeDelete(slot, procId);
456 
457       // Push the transaction data and wait until it is persisted
458       pushData(PushType.DELETE, slot, procId, null);
459     } catch (IOException e) {
460       // We are not able to serialize the procedure.
461       // this is a code error, and we are not able to go on.
462       LOG.fatal("Unable to serialize the procedure: " + procId, e);
463       throw new RuntimeException(e);
464     } finally {
465       releaseSlot(slot);
466     }
467   }
468 
469   private ByteSlot acquireSlot() {
470     ByteSlot slot = slotsCache.poll();
471     return slot != null ? slot : new ByteSlot();
472   }
473 
474   private void releaseSlot(final ByteSlot slot) {
475     slot.reset();
476     slotsCache.offer(slot);
477   }
478 
479   private enum PushType { INSERT, UPDATE, DELETE };
480 
  /**
   * Queues a serialized record into the next free slot and blocks until the sync thread
   * signals syncCond.
   *
   * @param type kind of mutation, forwarded to updateStoreTracker()
   * @param slot the serialized record; it is stored by reference in slots[], so the caller
   *             must not reset or reuse it until this method returns
   * @param procId id of the procedure being inserted, updated or deleted
   * @param subProcIds ids of the sub-procedures for an INSERT with children, otherwise null
   * @return the value of flushLogId (the current log id) when the slot was queued
   * @throws RuntimeException if the store is not running, recoverLease() was not called,
   *         the sync thread recorded a failure, or the wait was interrupted
   */
  private long pushData(final PushType type, final ByteSlot slot,
      final long procId, final long[] subProcIds) {
    if (!isRunning()) {
      throw new RuntimeException("the store must be running before inserting data");
    }
    if (logs.isEmpty()) {
      throw new RuntimeException("recoverLease() must be called before inserting data");
    }

    long logId = -1;
    lock.lock();
    try {
      // Wait for the sync to be completed
      while (true) {
        if (!isRunning()) {
          throw new RuntimeException("store no longer running");
        } else if (isSyncAborted()) {
          throw new RuntimeException("sync aborted", syncException.get());
        } else if (inSync.get()) {
          syncCond.await();
        } else if (slotIndex == slots.length) {
          // All slots are taken: nudge the sync loop to flush now, then wait for it.
          slotCond.signal();
          syncCond.await();
        } else {
          break;
        }
      }

      updateStoreTracker(type, procId, subProcIds);
      slots[slotIndex++] = slot;
      logId = flushLogId;

      // Notify that there is new data
      if (slotIndex == 1) {
        waitCond.signal();
      }

      // Notify that the slots are full
      if (slotIndex == slots.length) {
        waitCond.signal();
        slotCond.signal();
      }

      // NOTE(review): this single await is not re-checked in a loop. A spurious wakeup, or
      // the syncCond.signalAll() from sendStopSignal(), can return here before this slot has
      // been synced. Confirm whether callers rely on durability at return time.
      syncCond.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      sendAbortProcessSignal();
      throw new RuntimeException(e);
    } finally {
      lock.unlock();
      // NOTE(review): throwing from finally replaces any exception already propagating
      // from the try/catch above.
      if (isSyncAborted()) {
        throw new RuntimeException("sync aborted", syncException.get());
      }
    }
    return logId;
  }
537 
538   private void updateStoreTracker(final PushType type,
539       final long procId, final long[] subProcIds) {
540     switch (type) {
541       case INSERT:
542         if (subProcIds == null) {
543           storeTracker.insert(procId);
544         } else {
545           storeTracker.insert(procId, subProcIds);
546         }
547         break;
548       case UPDATE:
549         storeTracker.update(procId);
550         break;
551       case DELETE:
552         storeTracker.delete(procId);
553         break;
554       default:
555         throw new RuntimeException("invalid push type " + type);
556     }
557   }
558 
559   private boolean isSyncAborted() {
560     return syncException.get() != null;
561   }
562 
  /**
   * Body of the sync thread.
   *
   * <p>While the store is running it holds {@code lock}, except while awaiting a condition,
   * and repeats:
   * <ol>
   *   <li>with no filled slot, runs periodicRoll() (unless still loading) and waits on
   *       waitCond until data arrives or the next periodic roll is due;</li>
   *   <li>otherwise waits up to syncWaitMsec on slotCond to batch more entries (skipped
   *       when the slots are already full);</li>
   *   <li>records a SyncMetrics sample, writes and syncs slots[0..slotIndex) via
   *       syncSlots(), and resets slotIndex.</li>
   * </ol>
   * syncCond is signalled at the end of every iteration (the inner finally also runs on
   * {@code continue}), waking producers blocked in pushData(). Any failure is stored in
   * syncException (only the first one is kept) and rethrown, which ends the loop.
   *
   * @throws Throwable the failure that terminated the loop
   */
  private void syncLoop() throws Throwable {
    // Bytes synced to the current log as of the last round (totalSynced is reset on roll).
    long totalSyncedToStore = 0;
    inSync.set(false);
    lock.lock();
    try {
      while (isRunning()) {
        try {
          // Wait until new data is available
          if (slotIndex == 0) {
            if (!loading.get()) {
              periodicRoll();
            }

            if (LOG.isTraceEnabled()) {
              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                        StringUtils.humanSize(totalSynced.get()),
                        StringUtils.humanSize(totalSynced.get() / rollTsSec)));
            }

            // NOTE(review): when a periodic roll is overdue and keeps failing, this timeout is
            // <= 0 and the wait presumably returns immediately, so the loop may spin — verify.
            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
            if (slotIndex == 0) {
              // no data.. probably a stop() or a periodic roll
              continue;
            }
          }
          // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
          final long syncWaitSt = System.currentTimeMillis();
          if (slotIndex != slots.length) {
            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
          }

          final long currentTs = System.currentTimeMillis();
          final long syncWaitMs = currentTs - syncWaitSt;
          final float rollSec = getMillisFromLastRoll() / 1000.0f;
          final float syncedPerSec = totalSyncedToStore / rollSec;
          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
            LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
                      StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
                      StringUtils.humanSize(totalSyncedToStore),
                      StringUtils.humanSize(syncedPerSec)));
          }

          // update webui circular buffers (TODO: get rid of allocations)
          final SyncMetrics syncMetrics = new SyncMetrics();
          syncMetrics.timestamp = currentTs;
          syncMetrics.syncWaitMs = syncWaitMs;
          syncMetrics.syncedEntries = slotIndex;
          syncMetrics.totalSyncedBytes = totalSyncedToStore;
          syncMetrics.syncedPerSec = syncedPerSec;
          syncMetricsBuffer.add(syncMetrics);

          // sync
          inSync.set(true);
          long slotSize = syncSlots();
          logs.getLast().addToSize(slotSize);
          totalSyncedToStore = totalSynced.addAndGet(slotSize);
          slotIndex = 0;
          inSync.set(false);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          sendAbortProcessSignal();
          syncException.compareAndSet(null, e);
          throw e;
        } catch (Throwable t) {
          syncException.compareAndSet(null, t);
          throw t;
        } finally {
          // Wake producers after every iteration, including idle and failed ones.
          syncCond.signalAll();
        }
      }
    } finally {
      lock.unlock();
    }
  }
638 
639   public ArrayList<SyncMetrics> getSyncMetrics() {
640     lock.lock();
641     try {
642       return new ArrayList<SyncMetrics>(syncMetricsBuffer);
643     } finally {
644       lock.unlock();
645     }
646   }
647 
648   private long syncSlots() throws Throwable {
649     int retry = 0;
650     int logRolled = 0;
651     long totalSynced = 0;
652     do {
653       try {
654         totalSynced = syncSlots(stream, slots, 0, slotIndex);
655         break;
656       } catch (Throwable e) {
657         LOG.warn("unable to sync slots, retry=" + retry);
658         if (++retry >= maxRetriesBeforeRoll) {
659           if (logRolled >= maxSyncFailureRoll) {
660             LOG.error("Sync slots after log roll failed, abort.", e);
661             sendAbortProcessSignal();
662             throw e;
663           }
664 
665           if (!rollWriterOrDie()) {
666             throw e;
667           }
668 
669           logRolled++;
670           retry = 0;
671         }
672       }
673     } while (running.get());
674     return totalSynced;
675   }
676 
677   protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
678       throws IOException {
679     long totalSynced = 0;
680     for (int i = 0; i < count; ++i) {
681       ByteSlot data = slots[offset + i];
682       data.writeTo(stream);
683       totalSynced += data.size();
684     }
685 
686     if (useHsync) {
687       stream.hsync();
688     } else {
689       stream.hflush();
690     }
691     sendPostSyncSignal();
692 
693     if (LOG.isTraceEnabled()) {
694       LOG.trace("Sync slots=" + count + '/' + slots.length +
695                 ", flushed=" + StringUtils.humanSize(totalSynced));
696     }
697     return totalSynced;
698   }
699 
700   protected void sendPostSyncSignal() {
701     if (!this.listeners.isEmpty()) {
702       for (ProcedureStoreListener listener : this.listeners) {
703         listener.postSync();
704       }
705     }
706   }
707 
708   private void sendAbortProcessSignal() {
709     if (!this.listeners.isEmpty()) {
710       for (ProcedureStoreListener listener : this.listeners) {
711         listener.abortProcess();
712       }
713     }
714   }
715 
716   private boolean rollWriterOrDie() {
717     for (int i = 0; i < rollRetries; ++i) {
718       if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
719 
720       try {
721         if (rollWriter()) {
722           return true;
723         }
724       } catch (IOException e) {
725         LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
726       }
727     }
728     LOG.fatal("Unable to roll the log");
729     sendAbortProcessSignal();
730     throw new RuntimeException("unable to roll the log");
731   }
732 
733   private boolean tryRollWriter() {
734     try {
735       return rollWriter();
736     } catch (IOException e) {
737       LOG.warn("Unable to roll the log", e);
738       return false;
739     }
740   }
741 
742   public long getMillisToNextPeriodicRoll() {
743     if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
744       return periodicRollMsec - getMillisFromLastRoll();
745     }
746     return Long.MAX_VALUE;
747   }
748 
749   public long getMillisFromLastRoll() {
750     return (System.currentTimeMillis() - lastRollTs.get());
751   }
752 
753   @VisibleForTesting
754   protected void periodicRollForTesting() throws IOException {
755     lock.lock();
756     try {
757       periodicRoll();
758     } finally {
759       lock.unlock();
760     }
761   }
762 
763   @VisibleForTesting
764   protected boolean rollWriterForTesting() throws IOException {
765     lock.lock();
766     try {
767       return rollWriter();
768     } finally {
769       lock.unlock();
770     }
771   }
772 
773   private void periodicRoll() throws IOException {
774     if (storeTracker.isEmpty()) {
775       if (LOG.isTraceEnabled()) {
776         LOG.trace("no active procedures");
777       }
778       tryRollWriter();
779       removeAllLogs(flushLogId - 1);
780     } else {
781       if (storeTracker.isUpdated()) {
782         if (LOG.isTraceEnabled()) {
783           LOG.trace("all the active procedures are in the latest log");
784         }
785         removeAllLogs(flushLogId - 1);
786       }
787 
788       // if the log size has exceeded the roll threshold
789       // or the periodic roll timeout is expired, try to roll the wal.
790       if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
791         tryRollWriter();
792       }
793 
794       removeInactiveLogs();
795     }
796   }
797 
798   private boolean rollWriter() throws IOException {
799     // Create new state-log
800     if (!rollWriter(flushLogId + 1)) {
801       LOG.warn("someone else has already created log " + flushLogId);
802       return false;
803     }
804 
805     // We have the lease on the log,
806     // but we should check if someone else has created new files
807     if (getMaxLogId(getLogFiles()) > flushLogId) {
808       LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
809       logs.getLast().removeFile();
810       return false;
811     }
812 
813     // We have the lease on the log
814     return true;
815   }
816 
817   private boolean rollWriter(final long logId) throws IOException {
818     assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
819     assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
820 
821     ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
822       .setVersion(ProcedureWALFormat.HEADER_VERSION)
823       .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
824       .setMinProcId(storeTracker.getMinProcId())
825       .setLogId(logId)
826       .build();
827 
828     FSDataOutputStream newStream = null;
829     Path newLogFile = null;
830     long startPos = -1;
831     newLogFile = getLogFilePath(logId);
832     try {
833       newStream = fs.create(newLogFile, false);
834     } catch (FileAlreadyExistsException e) {
835       LOG.error("Log file with id=" + logId + " already exists", e);
836       return false;
837     } catch (RemoteException re) {
838       LOG.warn("failed to create log file with id=" + logId, re);
839       return false;
840     }
841     try {
842       ProcedureWALFormat.writeHeader(newStream, header);
843       startPos = newStream.getPos();
844     } catch (IOException ioe) {
845       LOG.warn("Encountered exception writing header", ioe);
846       newStream.close();
847       return false;
848     }
849 
850     closeStream();
851 
852     storeTracker.resetUpdates();
853     stream = newStream;
854     flushLogId = logId;
855     totalSynced.set(0);
856     long rollTs = System.currentTimeMillis();
857     lastRollTs.set(rollTs);
858     logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));
859 
860     if (LOG.isDebugEnabled()) {
861       LOG.debug("Roll new state log: " + logId);
862     }
863     return true;
864   }
865 
866   private void closeStream() {
867     try {
868       if (stream != null) {
869         try {
870           ProcedureWALFile log = logs.getLast();
871           log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
872           long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
873           log.addToSize(trailerSize);
874         } catch (IOException e) {
875           LOG.warn("Unable to write the trailer: " + e.getMessage());
876         }
877         stream.close();
878       }
879     } catch (IOException e) {
880       LOG.error("Unable to close the stream", e);
881     } finally {
882       stream = null;
883     }
884   }
885 
886   // ==========================================================================
887   //  Log Files cleaner helpers
888   // ==========================================================================
889   private void removeInactiveLogs() {
890     // Verify if the ProcId of the first oldest is still active. if not remove the file.
891     while (logs.size() > 1) {
892       ProcedureWALFile log = logs.getFirst();
893       if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
894         break;
895       }
896       removeLogFile(log);
897     }
898   }
899 
900   private void removeAllLogs(long lastLogId) {
901     if (logs.size() <= 1) return;
902 
903     if (LOG.isDebugEnabled()) {
904       LOG.debug("Remove all state logs with ID less than " + lastLogId);
905     }
906     while (logs.size() > 1) {
907       ProcedureWALFile log = logs.getFirst();
908       if (lastLogId < log.getLogId()) {
909         break;
910       }
911       removeLogFile(log);
912     }
913   }
914 
915   private boolean removeLogFile(final ProcedureWALFile log) {
916     try {
917       if (LOG.isDebugEnabled()) {
918         LOG.debug("Remove log: " + log);
919       }
920       log.removeFile();
921       logs.remove(log);
922       LOG.info("Remove log: " + log);
923       LOG.info("Removed logs: " + logs);
924       if (logs.size() == 0) { LOG.error("Expected at least one log"); }
925       assert logs.size() > 0 : "expected at least one log";
926     } catch (IOException e) {
927       LOG.error("Unable to remove log: " + log, e);
928       return false;
929     }
930     return true;
931   }
932 
933   // ==========================================================================
934   //  FileSystem Log Files helpers
935   // ==========================================================================
936   public Path getWALDir() {
937     return this.walDir;
938   }
939 
940   public FileSystem getFileSystem() {
941     return this.fs;
942   }
943 
944   protected Path getLogFilePath(final long logId) throws IOException {
945     return new Path(walDir, String.format("state-%020d.log", logId));
946   }
947 
948   private static long getLogIdFromName(final String name) {
949     int end = name.lastIndexOf(".log");
950     int start = name.lastIndexOf('-') + 1;
951     while (start < end) {
952       if (name.charAt(start) != '0')
953         break;
954       start++;
955     }
956     return Long.parseLong(name.substring(start, end));
957   }
958 
959   private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
960     @Override
961     public boolean accept(Path path) {
962       String name = path.getName();
963       return name.startsWith("state-") && name.endsWith(".log");
964     }
965   };
966 
967   private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
968       new Comparator<FileStatus>() {
969     @Override
970     public int compare(FileStatus a, FileStatus b) {
971       final long aId = getLogIdFromName(a.getPath().getName());
972       final long bId = getLogIdFromName(b.getPath().getName());
973       return Long.compare(aId, bId);
974     }
975   };
976 
977   private FileStatus[] getLogFiles() throws IOException {
978     try {
979       FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
980       Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
981       return files;
982     } catch (FileNotFoundException e) {
983       LOG.warn("Log directory not found: " + e.getMessage());
984       return null;
985     }
986   }
987 
988   private static long getMaxLogId(final FileStatus[] logFiles) {
989     long maxLogId = 0;
990     if (logFiles != null && logFiles.length > 0) {
991       for (int i = 0; i < logFiles.length; ++i) {
992         maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
993       }
994     }
995     return maxLogId;
996   }
997 
998   /**
999    * @return Max-LogID of the specified log file set
1000    */
1001   private long initOldLogs(final FileStatus[] logFiles) throws IOException {
1002     this.logs.clear();
1003 
1004     long maxLogId = 0;
1005     if (logFiles != null && logFiles.length > 0) {
1006       for (int i = 0; i < logFiles.length; ++i) {
1007         final Path logPath = logFiles[i].getPath();
1008         leaseRecovery.recoverFileLease(fs, logPath);
1009         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
1010 
1011         ProcedureWALFile log = initOldLog(logFiles[i]);
1012         if (log != null) {
1013           this.logs.add(log);
1014         }
1015       }
1016       Collections.sort(this.logs);
1017       initTrackerFromOldLogs();
1018     }
1019     return maxLogId;
1020   }
1021 
1022   private void initTrackerFromOldLogs() {
1023     // TODO: Load the most recent tracker available
1024     if (!logs.isEmpty()) {
1025       ProcedureWALFile log = logs.getLast();
1026       try {
1027         log.readTracker(storeTracker);
1028       } catch (IOException e) {
1029         LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
1030         // try the next one...
1031         storeTracker.reset();
1032         storeTracker.setPartialFlag(true);
1033       }
1034     }
1035   }
1036 
1037   private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
1038     ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
1039     if (logFile.getLen() == 0) {
1040       LOG.warn("Remove uninitialized log: " + logFile);
1041       log.removeFile();
1042       return null;
1043     }
1044     if (LOG.isDebugEnabled()) {
1045       LOG.debug("Opening state-log: " + logFile);
1046     }
1047     try {
1048       log.open();
1049     } catch (ProcedureWALFormat.InvalidWALDataException e) {
1050       LOG.warn("Remove uninitialized log: " + logFile, e);
1051       log.removeFile();
1052       return null;
1053     } catch (IOException e) {
1054       String msg = "Unable to read state log: " + logFile;
1055       LOG.error(msg, e);
1056       throw new IOException(msg, e);
1057     }
1058 
1059     if (log.isCompacted()) {
1060       try {
1061         log.readTrailer();
1062       } catch (IOException e) {
1063         LOG.warn("Unfinished compacted log: " + logFile, e);
1064         log.removeFile();
1065         return null;
1066       }
1067     }
1068     return log;
1069   }
1070 }