KafkaBolt Integration: Trident APIs
To use KafkaBolt, create instances of org.apache.storm.kafka.trident.TridentKafkaState and org.apache.storm.kafka.trident.TridentKafkaStateFactory, and attach them to your topology. The following example shows the construction of a Kafka bolt using the Trident APIs, followed by details about the code:
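// Imports assume Storm 1.x package names (org.apache.storm.*).
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Test spout that emits (word, count) tuples in batches of up to 4.
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
        new Values("storm", "1"),
        new Values("trident", "1"),
        new Values("needs", "1"),
        new Values("javadoc", "1")
);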
 
spout.setCycle(true);
 
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
 
// Set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withProducerProperties(props)
        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

- Instantiate a KafkaBolt.
  The Trident API uses a combination of the org.apache.storm.kafka.trident.TridentKafkaState and org.apache.storm.kafka.trident.TridentKafkaStateFactory classes.

      TridentTopology topology = new TridentTopology();
      Stream stream = topology.newStream("spout1", spout);
      TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory();
      stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
- Configure the KafkaBolt with a Tuple-to-Message Mapper.
  The KafkaBolt must map Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message". Storm provides the org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper class to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.

      TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
              .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));

  In the Trident API, you must specify the field names for the Storm tuple key and the Kafka message for any implementation of TridentKafkaState, because the Trident version of the mapper class does not provide a default constructor.

  Some Kafka bolts may require more than two fields. In that case, you can write your own implementation of the TupleToKafkaMapper and TridentTupleToKafkaMapper interfaces to customize the mapping of Storm tuples to Kafka messages. Both interfaces define two methods, which take a Tuple and a TridentTuple respectively:

      K getKeyFromTuple(Tuple/TridentTuple tuple);
      V getMessageFromTuple(Tuple/TridentTuple tuple);
- Configure the KafkaBolt with a Kafka Topic Selector.
  Note: To ignore a message, return null from the getTopics() method.

      TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
              .withKafkaTopicSelector(new DefaultTopicSelector("test"))
              .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));

  If you need to write to multiple Kafka topics, you can write your own implementation of the KafkaTopicSelector interface, which is defined as follows:

      public interface KafkaTopicSelector {
          String getTopics(Tuple/TridentTuple tuple);
      }
- Configure the KafkaBolt with Kafka Producer properties.
  You can specify producer properties in your Storm topology by calling TridentKafkaStateFactory.withProducerProperties(). See the Apache Kafka Producer Configs documentation for more information.
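
The custom tuple-to-message mapping described in the mapper step above can be sketched as follows. This is a minimal, self-contained illustration, not the Storm API itself: SimpleTupleToKafkaMapper is a hypothetical stand-in that mirrors the two mapper methods, the "source" field is invented for the example, and a Map<String, Object> stands in for a Storm tuple so the code runs without Storm on the classpath. In a real topology you would presumably implement TridentTupleToKafkaMapper and read the fields from the TridentTuple instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in mirroring the two mapper methods; NOT the Storm interface.
interface SimpleTupleToKafkaMapper<K, V> {
    K getKeyFromTuple(Map<String, Object> tuple);
    V getMessageFromTuple(Map<String, Object> tuple);
}

// Builds the Kafka message from three tuple fields instead of a single one.
class WordCountSourceMapper implements SimpleTupleToKafkaMapper<String, String> {
    @Override
    public String getKeyFromTuple(Map<String, Object> tuple) {
        // The "word" field becomes the Kafka message key.
        return String.valueOf(tuple.get("word"));
    }

    @Override
    public String getMessageFromTuple(Map<String, Object> tuple) {
        // Join word, count, and the (assumed) "source" field into one CSV-style message.
        return tuple.get("word") + "," + tuple.get("count") + "," + tuple.get("source");
    }
}

class MapperDemo {
    public static void main(String[] args) {
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("word", "storm");
        tuple.put("count", "1");
        tuple.put("source", "spout1");

        WordCountSourceMapper mapper = new WordCountSourceMapper();
        System.out.println("key=" + mapper.getKeyFromTuple(tuple));
        System.out.println("message=" + mapper.getMessageFromTuple(tuple));
    }
}
```

Keeping the key and message logic in separate methods means the key can stay stable (for example, for Kafka partitioning) while the message format evolves independently.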

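A selector that writes to multiple Kafka topics, as described in the topic selector step above, could look roughly like the sketch below. It makes several assumptions: SimpleTopicSelector is a hypothetical stand-in for the KafkaTopicSelector interface, the "type" field and the "events-" prefix are invented for illustration, and a Map<String, Object> again replaces the Storm tuple so the example is runnable on its own. Returning null follows the note above about ignoring a message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a topic selector; NOT the Storm interface.
interface SimpleTopicSelector {
    String getTopics(Map<String, Object> tuple);
}

// Routes each tuple to a topic derived from its (assumed) "type" field.
class FieldBasedTopicSelector implements SimpleTopicSelector {
    private final String prefix;

    FieldBasedTopicSelector(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public String getTopics(Map<String, Object> tuple) {
        Object type = tuple.get("type");
        if (type == null) {
            // No routing field present: return null so the message is ignored.
            return null;
        }
        return prefix + type;
    }
}

class SelectorDemo {
    public static void main(String[] args) {
        FieldBasedTopicSelector selector = new FieldBasedTopicSelector("events-");

        Map<String, Object> errorTuple = new HashMap<>();
        errorTuple.put("type", "error");
        System.out.println(selector.getTopics(errorTuple));

        Map<String, Object> untyped = new HashMap<>();
        System.out.println(selector.getTopics(untyped));
    }
}
```

Deriving the topic from a tuple field keeps routing data-driven, so adding a new destination topic does not require a change to the topology wiring.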
