KafkaBolt Integration: Trident APIs
To use KafkaBolt from the Trident API, create an instance of
org.apache.storm.kafka.trident.TridentKafkaStateFactory (which builds the
org.apache.storm.kafka.trident.TridentKafkaState used to write to Kafka) and attach it to
your topology with partitionPersist(). The following example shows construction of a Kafka
bolt using the Trident APIs, followed by details about the code:
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(props)
.withKafkaTopicSelector(new DefaultTopicSelector("test"))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

1. Instantiate a KafkaBolt.
The Trident API uses a combination of the org.apache.storm.kafka.trident.TridentKafkaStateFactory and org.apache.storm.kafka.trident.TridentKafkaUpdater classes:

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory();
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

2. Configure the KafkaBolt with a Tuple-to-Message Mapper.
The KafkaBolt must map Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message". Storm provides the org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper class to support this default behavior and provide backward compatibility; the class is used by both the core-storm and Trident APIs.

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));

In the Trident API you must specify the field names for the Storm tuple key and the Kafka message, because the Trident version of FieldNameBasedTupleToKafkaMapper does not provide a default constructor.

However, some Kafka bolts may require more than two fields. You can write your own implementation of the TupleToKafkaMapper and TridentTupleToKafkaMapper interfaces to customize the mapping of Storm tuples to Kafka messages. Both interfaces define two methods:

K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
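For example, a custom Trident mapper could serialize several tuple fields into a single message value. The following sketch is illustrative only: the class name is hypothetical, and the package path for TridentTupleToKafkaMapper is assumed to match the mapper package shown above.

import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
import org.apache.storm.trident.tuple.TridentTuple;

// Hypothetical mapper that builds the Kafka key and message from multiple tuple fields.
public class WordCountTupleMapper implements TridentTupleToKafkaMapper<String, String> {

    @Override
    public String getKeyFromTuple(TridentTuple tuple) {
        // Use the "word" field as the Kafka message key.
        return tuple.getStringByField("word");
    }

    @Override
    public String getMessageFromTuple(TridentTuple tuple) {
        // Combine "word" and "count" into a single message value.
        return tuple.getStringByField("word") + ":" + tuple.getStringByField("count");
    }
}

You would then pass an instance of this mapper to withTridentTupleToKafkaMapper() in place of FieldNameBasedTupleToKafkaMapper.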
3. Configure the KafkaBolt with a Kafka Topic Selector.

Note: To ignore a message, return NULL from the getTopics() method.

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
    .withKafkaTopicSelector(new DefaultTopicSelector("test"))
    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));

If you need to write to multiple Kafka topics, you can write your own implementation of the KafkaTopicSelector interface; for example:

public interface KafkaTopicSelector {
    String getTopics(Tuple/TridentTuple tuple);
}
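As a sketch, a custom selector could route each tuple to the topic named in one of its fields. The class name and the "topic" field below are hypothetical, and the selector package path is assumed to match the one used by DefaultTopicSelector.

import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
import org.apache.storm.trident.tuple.TridentTuple;

// Hypothetical selector that routes each tuple to the topic named in its "topic" field.
public class FieldBasedTopicSelector implements KafkaTopicSelector {

    @Override
    public String getTopics(TridentTuple tuple) {
        // Returning null causes the message to be ignored.
        return tuple.getStringByField("topic");
    }
}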
4. Configure the KafkaBolt with Kafka Producer properties.

You can specify producer properties in your Storm topology by calling TridentKafkaStateFactory.withProducerProperties(). See the Apache Kafka Producer Configs documentation for more information.
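As an illustrative sketch, you could add standard Kafka producer settings such as retries or linger.ms to the same Properties object before passing it to the factory; the tuning values shown here are examples only, not recommendations.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Illustrative tuning values only; see the Kafka Producer Configs documentation.
props.put("retries", "3");
props.put("linger.ms", "5");

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
    .withProducerProperties(props);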

