Implementing Windowing in Core Storm
Bolts that need windowing support can implement the bolt interface
                    IWindowedBolt:
public interface IWindowedBolt extends IComponent {
   void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
   /**
     * Process tuples falling within the window and optionally emit 
     * new tuples based on the tuples in the input window.
     */
   void execute(TupleWindow inputWindow);
   void cleanup(); 
}Every time the window slides (the sliding interval elapses), the execute
            method is invoked.
The TupleWindow parameter gives access to current tuples in the window,
                tuples that expired, and new tuples that were added since the last window was
                computed. This information can be used to optimize the efficiency of windowing
                computations.
Bolts that need windowing support would typically extend
            BaseWindowedBolt, which has APIs for specifying window length and sliding
            intervals. For example:
public class SlidingWindowBolt extends BaseWindowedBolt {
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){
       this.collector = collector;
   }
   @Override
   public void execute(TupleWindow inputWindow) {
     for(Tuple tuple: inputWindow.get()) {
       // do the windowing computation
          ...
     }
     collector.emit(new Values(computedValue));
   }
}The BaseWindowedBolt has APIs to define type of window, window length,
            and sliding interval. You can specify window length and sliding interval as a count of
            the number of tuples, a duration of time, or both count and duration. The following
            window configuration settings are supported:
/* * Tuple count based sliding window that slides after slidingInterval number of tuples */ withWindow(Count windowLength, Count slidingInterval) /* * Tuple count based window that slides with every incoming tuple */ withWindow(Count windowLength) /* * Tuple count based sliding window that slides after slidingInterval time duration */ withWindow(Count windowLength, Duration slidingInterval) /* * Time duration based sliding window that slides after slidingInterval time duration */ withWindow(Duration windowLength, Duration slidingInterval) /* * Time duration based window that slides with every incoming tuple */ withWindow(Duration windowLength) /* * Time duration based sliding window that slides after slidingInterval number of tuples */ withWindow(Duration windowLength, Count slidingInterval) /* * Count based tumbling window that tumbles after the specified count of tuples */ withTumblingWindow(BaseWindowedBolt.Count count) /* * Time duration based tumbling window that tumbles after the specified time duration */ withTumblingWindow(BaseWindowedBolt.Duration duration)
To add windowed bolts to the topology, use the TopologyBuilder (as you
                would with non-windowed bolts). For example:
TopologyBuilder builder = new TopologyBuilder();
/*
 * A windowed bolt that computes sum over a sliding window with window length of
 * 30 events that slides after every 10 events.
 */
builder.setBolt("sum", new WindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1)
       .shuffleGrouping("spout");    For a sample topology that shows how to use the APIs to compute a sliding window sum
                and a tumbling window average, see the
                    SlidingWindowTopology.java file in the
                    storm-starter GitHub directory.
For examples of tumbling and sliding windows, see the Apache document Windowing Support in Core Storm.
The following subsections describe additional aspects of windowing calculations: timestamps, watermarks, guarantees, and state management.

