Checkpointing
Checkpointing is triggered by an internal checkpoint spout at the interval specified by topology.state.checkpoint.interval.ms. If the topology contains at least one IStatefulBolt, the topology builder adds the checkpoint spout automatically.
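The trigger rule can be made concrete with a minimal Python sketch. This is not Storm code. BoltSpec, needs_checkpoint_spout, and CheckpointSpoutSketch are hypothetical names, and only the config key comes from the text above. The sketch assumes the spout emits a new transaction id whenever the configured interval has elapsed, and it ignores whether the previous checkpoint has finished.

```python
from dataclasses import dataclass

# Config key named in the text; its value is an interval in milliseconds.
CHECKPOINT_INTERVAL_KEY = "topology.state.checkpoint.interval.ms"


@dataclass
class BoltSpec:
    """Hypothetical bolt description; stateful=True stands in for IStatefulBolt."""
    name: str
    stateful: bool = False


def needs_checkpoint_spout(bolts):
    """The checkpoint spout is added only if at least one bolt is stateful."""
    return any(b.stateful for b in bolts)


class CheckpointSpoutSketch:
    """Hypothetical stand-in for the internal checkpoint spout's timer.

    Simplification: emits purely on elapsed time and does not wait for the
    previous checkpoint to complete.
    """

    def __init__(self, conf):
        self.interval_ms = conf[CHECKPOINT_INTERVAL_KEY]
        self.last_emit_ms = None
        self.txid = 0

    def maybe_emit(self, now_ms):
        """Return a new transaction id if the interval has elapsed, else None."""
        if self.last_emit_ms is None or now_ms - self.last_emit_ms >= self.interval_ms:
            self.last_emit_ms = now_ms
            self.txid += 1
            return self.txid
        return None


if __name__ == "__main__":
    bolts = [BoltSpec("split"), BoltSpec("count", stateful=True)]
    print(needs_checkpoint_spout(bolts))  # True
    spout = CheckpointSpoutSketch({CHECKPOINT_INTERVAL_KEY: 1000})
    print([spout.maybe_emit(t) for t in (0, 500, 1000, 1999, 2000)])  # [1, None, 2, None, 3]
```

With a 1000 ms interval, calls at 500 ms and 1999 ms fall inside the interval and emit nothing.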
For stateful topologies, the topology builder wraps each IStatefulBolt in a StatefulBoltExecutor, which handles state commits when it receives checkpoint tuples. Non-stateful bolts are wrapped in a CheckpointTupleForwarder, which simply forwards checkpoint tuples so that they can flow through the topology's directed acyclic graph (DAG).
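The wrapping rule can be modeled as follows. StatefulExecutorSketch and ForwarderSketch are hypothetical stand-ins for StatefulBoltExecutor and CheckpointTupleForwarder. Storing a copy of the bolt's state per transaction id is a simplification chosen for illustration, not a description of how Storm persists state.

```python
from dataclasses import dataclass, field

CHECKPOINT_STREAM = "$checkpoint"


@dataclass
class BoltSpec:
    name: str
    stateful: bool = False  # stands in for "implements IStatefulBolt"


@dataclass
class StatefulExecutorSketch:
    """Hypothetical stand-in for StatefulBoltExecutor."""
    bolt: BoltSpec
    saved: list = field(default_factory=list)

    def on_checkpoint(self, txid, state):
        self.saved.append((txid, dict(state)))  # snapshot this bolt's state
        return (CHECKPOINT_STREAM, txid)  # then pass the checkpoint downstream


@dataclass
class ForwarderSketch:
    """Hypothetical stand-in for CheckpointTupleForwarder."""
    bolt: BoltSpec

    def on_checkpoint(self, txid, state=None):
        return (CHECKPOINT_STREAM, txid)  # nothing to save; forward only


def wrap(bolt):
    """Choose a wrapper the way the text describes the topology builder doing it."""
    return StatefulExecutorSketch(bolt) if bolt.stateful else ForwarderSketch(bolt)


if __name__ == "__main__":
    counter = wrap(BoltSpec("count", stateful=True))
    print(counter.on_checkpoint(7, {"storm": 3}))  # ('$checkpoint', 7)
    print(counter.saved)  # [(7, {'storm': 3})]
```

Both wrappers forward the checkpoint, which is what lets it keep moving through bolts that have no state of their own.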
Checkpoint tuples flow through a separate internal stream called $checkpoint. The topology builder wires the checkpoint stream across the whole topology, with the checkpoint spout at the root.
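A rough model of this wiring follows. It assumes, without confirmation from this text, that a bolt reading from a user spout takes checkpoint tuples directly from the checkpoint spout, while a bolt reading from another bolt takes them from that bolt's $checkpoint stream. wire_checkpoint_stream and the "$checkpointspout" component id are hypothetical.

```python
CHECKPOINT_STREAM = "$checkpoint"
CHECKPOINT_SPOUT = "$checkpointspout"  # hypothetical id for the internal spout


def wire_checkpoint_stream(spouts, bolt_inputs):
    """Return, per bolt, the (source, stream) pairs it subscribes to for checkpoints.

    spouts: names of user spouts.
    bolt_inputs: bolt name -> names of components it reads data tuples from.
    Assumption: a bolt fed by a user spout takes checkpoint tuples straight from
    the checkpoint spout; a bolt fed by another bolt takes them from that bolt's
    $checkpoint stream, so the checkpoint spout ends up at the root.
    """
    wiring = {}
    for bolt, sources in bolt_inputs.items():
        subs = {
            (CHECKPOINT_SPOUT if src in spouts else src, CHECKPOINT_STREAM)
            for src in sources
        }
        wiring[bolt] = sorted(subs)
    return wiring


if __name__ == "__main__":
    topology = {"split": ["sentences"], "count": ["split"], "report": ["split", "count"]}
    for bolt, subs in wire_checkpoint_stream({"sentences"}, topology).items():
        print(bolt, subs)
```

The checkpoint edges mirror the data edges, so every bolt is reachable from the checkpoint spout along $checkpoint.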

At each checkpoint interval, the checkpoint spout emits checkpoint tuples. When a bolt receives a checkpoint tuple, its state is saved and the checkpoint tuple is forwarded to the next component. Each bolt waits until the checkpoint has arrived on all of its input streams before it saves its state, so the saved states form a consistent snapshot across the topology. Once the checkpoint spout receives acks from all bolts, the state commit is complete and the checkpoint spout records the transaction as committed.
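The alignment rule is what keeps the snapshot consistent: a bolt with two inputs must not save until the checkpoint has arrived on both. The following sketch simulates one checkpoint on a small DAG. propagate_checkpoint and checkpoint_committed are hypothetical helpers, and treating each save as that bolt's ack is a simplifying assumption.

```python
from collections import defaultdict, deque


def propagate_checkpoint(bolt_inputs, root):
    """Simulate one checkpoint tuple flowing through a DAG on the $checkpoint stream.

    bolt_inputs: bolt -> upstream components it receives checkpoints from.
    root: the component that emits the checkpoint (the checkpoint spout).
    Returns bolts in the order they save state. A bolt saves only after the
    checkpoint has arrived on all of its inputs, then forwards it downstream.
    """
    downstream = defaultdict(set)
    for bolt, ups in bolt_inputs.items():
        for up in ups:
            downstream[up].add(bolt)
    arrived = defaultdict(set)  # bolt -> inputs the checkpoint has reached so far
    queue = deque([root])  # components that have forwarded the checkpoint
    save_order = []
    while queue:
        src = queue.popleft()
        for bolt in sorted(downstream[src]):  # sorted only for a deterministic order
            arrived[bolt].add(src)
            if arrived[bolt] == set(bolt_inputs[bolt]):
                save_order.append(bolt)  # aligned on every input: save state
                queue.append(bolt)  # then forward the checkpoint
    return save_order


def checkpoint_committed(bolt_inputs, root):
    """Treat each save as an ack; the spout records a commit only if every bolt acked."""
    return set(propagate_checkpoint(bolt_inputs, root)) == set(bolt_inputs)


if __name__ == "__main__":
    dag = {"split": ["$cp"], "count": ["split"], "report": ["split", "count"]}
    print(propagate_checkpoint(dag, "$cp"))  # ['split', 'count', 'report']
    print(checkpoint_committed(dag, "$cp"))  # True
```

Here report reads from both split and count, so it saves last even though the checkpoint reaches it from split first. If some input never delivers the checkpoint, that bolt never saves and the transaction is not recorded as committed.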
This checkpoint mechanism builds on Storm's existing acking mechanism to replay tuples. It borrows concepts from the asynchronous snapshot algorithm used by Flink and from the Chandy-Lamport algorithm for distributed snapshots. Internally, checkpointing uses a three-phase commit protocol, which includes a prepare phase and a commit phase, so that state across the topology is saved consistently and atomically.
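The point of a prepare phase followed by a commit phase is that no bolt's state is committed unless every bolt has prepared. The sketch below models only that prepare, commit, and rollback idea. StagedState and run_checkpoint are hypothetical names, and the sketch does not reproduce Storm's full three-phase protocol or its message types.

```python
class StagedState:
    """Hypothetical bolt state with a staging area for prepared transactions."""

    def __init__(self):
        self.committed = {}
        self.pending = {}  # txid -> snapshot staged during prepare

    def prepare(self, txid, snapshot):
        self.pending[txid] = dict(snapshot)  # stage, but do not expose yet

    def commit(self, txid):
        self.committed = self.pending.pop(txid)  # staged snapshot becomes current

    def rollback(self, txid):
        self.pending.pop(txid, None)  # discard staged snapshot, keep last commit


def run_checkpoint(txid, bolts, snapshots, fail_prepare=()):
    """Coordinator side: commit only if every bolt prepared successfully.

    bolts: name -> StagedState; snapshots: name -> current in-memory state.
    fail_prepare: names whose prepare is simulated as failing (never acked).
    """
    prepared = []
    for name, state in bolts.items():
        if name in fail_prepare:
            for done in prepared:  # undo bolts that already prepared
                bolts[done].rollback(txid)
            return False
        state.prepare(txid, snapshots[name])
        prepared.append(name)
    for state in bolts.values():  # every bolt prepared: commit everywhere
        state.commit(txid)
    return True


if __name__ == "__main__":
    bolts = {"count": StagedState(), "sum": StagedState()}
    print(run_checkpoint(1, bolts, {"count": {"a": 1}, "sum": {"t": 5}}))  # True
    print(run_checkpoint(2, bolts, {"count": {"a": 2}, "sum": {"t": 9}}, {"sum"}))  # False
    print(bolts["count"].committed)  # {'a': 1}
```

Because the second transaction fails during prepare, count keeps the state from transaction 1 instead of ending up ahead of sum.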

