Apache Kafka is a high-throughput, distributed messaging system. Apache Storm provides a Kafka spout to facilitate ingesting data from Kafka 0.8x brokers. Storm developers should include downstream bolts in their topologies to process data ingested with the Kafka spout. The storm-kafka components include a core-storm spout, as well as a fully transactional Trident spout. The storm-kafka spouts provide the following key features: 
- 'Exactly once' tuple processing with the Trident API 
- Dynamic discovery of Kafka brokers and partitions 
Hortonworks recommends that Storm developers use the Trident API. However, use the core-storm API if sub-second latency is critical for your Storm topology.
The core-storm API represents a Kafka spout with the KafkaSpout class, and the Trident API provides an OpaqueTridentKafkaSpout class to represent the spout. To initialize KafkaSpout and OpaqueTridentKafkaSpout, Storm developers need an instance of a subclass of the KafkaConfig class, which represents the configuration information needed to ingest data from a Kafka cluster. The KafkaSpout constructor requires the SpoutConfig subclass, and the OpaqueTridentKafkaSpout constructor requires the TridentKafkaConfig subclass. In turn, the constructors for both SpoutConfig and TridentKafkaConfig require an implementation of the BrokerHosts interface, which is used to map Kafka brokers to topic partitions. The storm-kafka component provides two implementations of BrokerHosts: ZkHosts and StaticHosts. Use the ZkHosts implementation to dynamically track broker-to-partition mapping and the StaticHosts implementation when broker-to-partition mapping is static.
The following code samples demonstrate the use of these classes and interfaces.
Core-storm API
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Trident API
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
Storm-Kafka API Reference
Javadoc for the storm-kafka component is installed at <$STORM_HOME>/contrib/storm-kafka/storm-kafka-0.9.3.2.2.8.0-<buildnumber>-javadoc.jar. This section provides additional reference documentation for the primary classes and interfaces of the storm-kafka component. 
BrokerHosts Interface
The storm-kafka component provides two implementations of the BrokerHosts interface: ZkHosts and StaticHosts. Use the ZkHosts implementation to dynamically track broker-to-partition mapping and the StaticHosts implementation when broker-to-partition mapping is static. The constructor for StaticHosts requires an instance of GlobalPartitionInformation: 
Broker brokerForPartition0 = new Broker("localhost"); // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092, with the port specified explicitly
Broker brokerForPartition2 = new Broker("localhost:9092"); // localhost:9092, specified as one string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
KafkaConfig Class
Instantiate an instance of KafkaConfig with one of the following constructors, each of which requires an implementation of the BrokerHosts interface: 
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
Table 1.8. KafkaConfig Parameters
| KafkaConfig Parameter | Description | 
|---|---|
| hosts | Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts. |
| topic | Name of the Kafka topic. |
| clientId | Optional parameter used as part of the ZooKeeper path where the spout's current offset is stored. |
Both SpoutConfig from the core-storm API and TridentKafkaConfig from the Trident API extend KafkaConfig. Instantiate these classes with the following constructors: 
Core-storm API
Constructor
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)
Table 1.9. SpoutConfig Parameters
| SpoutConfig Parameter | Description | 
|---|---|
| hosts | Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts. |
| topic | Name of the Kafka topic. |
| zkRoot | Root directory in ZooKeeper where all topics and partition information is stored. By default, this is /brokers. |
| id | Unique identifier for this spout. |
Trident API
Constructors
public TridentKafkaConfig(BrokerHosts hosts, String topic)
public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)
Table 1.10. TridentKafkaConfig Parameters
| TridentKafkaConfig Parameter | Description |
|---|---|
| hosts | Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts. |
| topic | Name of the Kafka topic. |
| id | Unique identifier for this spout. |
KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology: 
Table 1.11. KafkaConfig Fields
| KafkaConfig Field | Description | 
|---|---|
|   | Specifies the number of bytes to attempt to fetch in one request to a Kafka server. The default is 1MB. | 
|   | Specifies the number of milliseconds to wait before a socket fails an operation with a timeout. The default value is 10 seconds. | 
|   | Specifies the buffer size in bytes for network requests. The default is 1MB. | 
|   |  The interface that specifies how a byte[] from a Kafka topic is transformed into a Storm tuple. The default,  | 
|   | Controls whether a Kafka spout fetches data from the beginning of a Kafka topic. The default is false. | 
|   |  Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values: *  | 
|   |  Specifies how long a spout attempts to retry the processing of a failed tuple. If a failing tuple's offset is less than  | 
|   | Controls whether a spout streams messages from the beginning of a topic when the spout throws an exception for an out-of-range offset. The default value is true. | 
|   | Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds. | 
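The scheme row in the table above describes how a byte[] read from a Kafka topic becomes a Storm tuple. As a rough illustration of that idea only, the following library-free sketch decodes a message as UTF-8 text and wraps it in a one-field list of tuple values. The names SchemeSketch, ByteScheme, and UTF8_STRING are hypothetical stand-ins and are not part of storm-kafka; the sketch also assumes that messages are UTF-8 encoded. In a real topology, assign one of the storm-kafka scheme classes to the scheme field, as the earlier samples do with StringScheme.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class SchemeSketch {

    /** Hypothetical stand-in for a scheme: turns raw message bytes into tuple values. */
    interface ByteScheme {
        List<Object> deserialize(byte[] message);
    }

    /** Decodes the message as UTF-8 text and returns it as a single-field tuple. */
    static final ByteScheme UTF8_STRING = message ->
            Collections.<Object>singletonList(new String(message, StandardCharsets.UTF_8));

    public static void main(String[] args) {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(UTF8_STRING.deserialize(raw)); // prints [hello kafka]
    }
}
```

Leaving the bytes undecoded, or splitting a message into several fields, would follow the same shape with a different deserialize body.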
Limitations
The current version of the Kafka spout contains the following limitations:
- Does not support Kafka 0.7x brokers. 
- Storm developers must include ${STORM_HOME}/lib/* in the CLASSPATH environment variable from the command line when running kafka-topology in local mode. Otherwise, developers will likely receive a java.lang.NoClassDefFoundError exception. For example:
  java -cp "/usr/lib/storm/contrib/storm-kafka-example-0.9.1.2.1.1.0-320-jar-with-dependencies.jar:/usr/lib/storm/lib/*" org.apache.storm.kafka.TestKafkaTopology <zookeeper_host>
- On secure Hadoop clusters, comment out the following statement in ${STORM_HOME}/bin/kafka-server-start.sh:
  EXTRA_ARGS="-name kafkaServer -loggc"
Kafka Cluster Configuration
The storm-kafka connector requires some configuration of the Apache Kafka installation. Kafka administrators must add a zookeeper.connect property with the hostnames and port numbers of the HDP ZooKeeper nodes to Kafka's server.properties file. For example:
zookeeper.connect=host1:2181,host2:2181,host3:2181
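A malformed zookeeper.connect value is easy to miss until the topology fails to find its brokers. The following is a minimal sketch, using only the JDK, of one way to check the property before deploying. It reads the text of a server.properties file and returns the host:port entries. The class and method names (ZkConnectCheck, zkNodes) and the sample file contents are hypothetical, and the sketch assumes a plain comma-separated list of host:port entries with no ZooKeeper chroot path suffix.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ZkConnectCheck {

    /**
     * Reads zookeeper.connect from the text of a server.properties file and
     * returns its host:port entries. Throws IllegalArgumentException if the
     * property is missing or an entry is not of the form host:port.
     */
    static List<String> zkNodes(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        String connect = props.getProperty("zookeeper.connect");
        if (connect == null || connect.trim().isEmpty()) {
            throw new IllegalArgumentException("zookeeper.connect is not set");
        }

        List<String> nodes = new ArrayList<>();
        for (String entry : connect.split(",")) {
            String node = entry.trim();
            int colon = node.lastIndexOf(':');
            // Require a non-empty host followed by an all-digit port.
            if (colon <= 0 || !node.substring(colon + 1).matches("\\d+")) {
                throw new IllegalArgumentException("Expected host:port, found: " + node);
            }
            nodes.add(node);
        }
        return nodes;
    }

    public static void main(String[] args) {
        // Hypothetical file contents used only for this illustration.
        String sample = "broker.id=0\n"
                + "zookeeper.connect=host1:2181,host2:2181,host3:2181\n";
        System.out.println(zkNodes(sample)); // prints [host1:2181, host2:2181, host3:2181]
    }
}
```

The same comma-separated string is the kind of value passed to the ZkHosts constructor in the core-storm sample, so checking it once can catch mistakes on both the Kafka side and the Storm side.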


