Aggregator
The Aggregator interface represents the most general form of
aggregation operations:
    public interface Aggregator<T> extends Operation {
        T init(Object batchId, TridentCollector collector);
        void aggregate(T val, TridentTuple tuple, TridentCollector collector);
        void complete(T val, TridentCollector collector);
    }
A key difference between Aggregator and other Trident aggregation
interfaces is that an instance of TridentCollector is passed as a
parameter to every method. This allows Aggregator implementations to emit tuples
at any time during execution.
Storm executes Aggregator instances as follows:
- Storm calls the init() method, which returns an object T representing the initial state of the aggregation. T is also passed to the aggregate() and complete() methods.
- Storm calls the aggregate() method repeatedly, to process each tuple in the batch.
- Storm calls complete() with the final value of the aggregation.
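The sequence above, together with the ability to emit from any method, can be illustrated without a running cluster. The following is a minimal sketch in plain Java, not Storm code: `Collector`, `SimpleAggregator`, `ProgressCount`, `tuple()`, and `runBatch()` are hypothetical stand-ins invented for this illustration, and tuples are modelled as plain lists. It assumes a single batch processed on one thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AggregatorLifecycleSketch {

    // Hypothetical stand-in for TridentCollector: it only receives emitted tuples.
    interface Collector {
        void emit(List<Object> values);
    }

    // Hypothetical stand-in mirroring the three Aggregator methods.
    interface SimpleAggregator<T> {
        T init(Object batchId, Collector collector);
        void aggregate(T state, List<Object> tuple, Collector collector);
        void complete(T state, Collector collector);
    }

    // Mutable per-batch state.
    static class CountState {
        long count = 0;
    }

    // Counts tuples, emits a progress tuple every `step` tuples,
    // and emits the total when the batch completes.
    static class ProgressCount implements SimpleAggregator<CountState> {
        private final long step;

        ProgressCount(long step) {
            this.step = step;
        }

        @Override
        public CountState init(Object batchId, Collector collector) {
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, List<Object> tuple, Collector collector) {
            state.count += 1;
            if (state.count % step == 0) {
                // Emitting mid-batch works because the collector is passed here too.
                collector.emit(tuple("progress", state.count));
            }
        }

        @Override
        public void complete(CountState state, Collector collector) {
            collector.emit(tuple("total", state.count));
        }
    }

    // Small helper for building a tuple as a list of values.
    static List<Object> tuple(Object... values) {
        return Arrays.asList(values);
    }

    // Drives one batch through init, then aggregate per tuple, then complete.
    static <T> List<List<Object>> runBatch(SimpleAggregator<T> agg, Object batchId,
                                           List<List<Object>> batch) {
        List<List<Object>> emitted = new ArrayList<>();
        Collector collector = emitted::add;
        T state = agg.init(batchId, collector);
        for (List<Object> t : batch) {
            agg.aggregate(state, t, collector);
        }
        agg.complete(state, collector);
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Object>> batch = Arrays.asList(
            tuple("the"), tuple("cow"), tuple("jumped"), tuple("over"), tuple("the"));
        // Prints [[progress, 2], [progress, 4], [total, 5]]
        System.out.println(runBatch(new ProgressCount(2), 1L, batch));
    }
}
```

The state object is created once per batch in `init()` and mutated in place, so nothing is shared between batches. In a real topology Storm plays the role of `runBatch()`.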
The word count example uses the built-in Count class that
implements the CombinerAggregator interface. The Count
class could also be implemented as an Aggregator:
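    // Package names below are for Apache Storm 1.0 and later.
    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    public class Count extends BaseAggregator<Count.CountState> {
        // Per-batch state; qualified in the header because the nested name is not in scope there.
        static class CountState {
            long count = 0;
        }

        @Override
        public CountState init(Object batchId, TridentCollector collector) {
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            state.count += 1;
        }

        @Override
        public void complete(CountState state, TridentCollector collector) {
            collector.emit(new Values(state.count));
        }
    }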

