KafkaBolt Integration: Trident APIs
To use KafkaBolt, create an instance of
            org.apache.storm.kafka.trident.TridentState and
            org.apache.storm.kafka.trident.TridentStateFactory, and attach them to
        your topology. 
The following example shows construction of a Kafka bolt using Trident APIs, followed by details about the code:
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
 
spout.setCycle(true);
 
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
 
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withProducerProperties(props)
        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
      - 
            Instantiate a KafkaBolt. The Trident API uses a combination of the storm.kafka.trident.TridentStateFactoryandstorm.kafka.trident.TridentKafkaStateFactoryclasses.TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout"); TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory(); stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
- 
            Configure the KafkaBolt with a Tuple-to-Message Mapper. The KafkaBolt must map Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message." Storm provides the storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapperclass to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));You must specify the field names for the Storm tuple key and the Kafka message for any implementation of the TridentKafkaStatein the Trident API. This interface does not provide a default constructor.However, some Kafka bolts may require more than two fields. You can write your own implementation of the TupleToKafkaMapperandTridentTupleToKafkaMapperinterfaces to customize the mapping of Storm tuples to Kafka messages. Both interfaces define two methods:K getKeyFromTuple(Tuple/TridentTuple tuple); V getMessageFromTuple(Tuple/TridentTuple tuple); 
- 
            Configure the KafkaBoltwith a Kafka Topic Selector. Note NoteTo ignore a message, return NULL from the getTopics()method.TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() .withKafkaTopicSelector(new DefaultTopicSelector("test")) .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));If you need to write to multiple Kafka topics, you can write your own implementation of the KafkaTopicSelectorinterface; for example:public interface KafkaTopicSelector { String getTopics(Tuple/TridentTuple tuple); }
- 
            Configure the KafkaBoltwith Kafka Producer properties.You can specify producer properties in your Storm topology by calling TridentKafkaStateFactory.withProducerProperties(). See the Apache Producer Configs documentation for more information.

