Storm-Kafka API Reference
Javadoc for the storm-kafka component is installed at
<$STORM_HOME>/contrib/storm-kafka/storm-kafka-0.9.3.2.2.6.0-<buildnumber>-javadoc.jar.
This section provides additional reference documentation for the primary classes and
interfaces of the storm-kafka component.
BrokerHosts Interface
The storm-kafka component provides two implementations of the
BrokerHosts interface: ZkHosts and
StaticHosts. Use the ZkHosts
implementation to track the broker-to-partition mapping dynamically through ZooKeeper,
and use the StaticHosts implementation when the broker-to-partition mapping
is static. The constructor for StaticHosts requires an instance
of GlobalPartitionInformation:
// Package names as used by storm-kafka 0.9.x
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.trident.GlobalPartitionInformation;

Broker brokerForPartition0 = new Broker("localhost"); // localhost:9092 (default port)
Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092, port specified explicitly
Broker brokerForPartition2 = new Broker("localhost:9092"); // localhost:9092 specified as one string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.add(0, brokerForPartition0); // map partition 0 to brokerForPartition0
partitionInfo.add(1, brokerForPartition1); // map partition 1 to brokerForPartition1
partitionInfo.add(2, brokerForPartition2); // map partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
KafkaConfig Class
Instantiate an instance of KafkaConfig with one of the following constructors, each of which requires an implementation of the BrokerHosts interface:
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
Table 5.1. KafkaConfig Parameters
| KafkaConfig Parameter | Description |
|---|---|
| hosts | Any implementation of the BrokerHosts interface. |
| topic | Name of the Kafka topic. |
| clientId | Optional parameter used as part of the ZooKeeper path where the spout's current offset is stored. |
Both SpoutConfig from the core-storm API and TridentKafkaConfig from the Trident API extend KafkaConfig. Instantiate these classes with the following constructors:
Core-Storm API
Constructor:
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)
Table 5.2. SpoutConfig Parameters
| SpoutConfig Parameter | Description |
|---|---|
| hosts | Any implementation of the BrokerHosts interface. |
| topic | Name of the Kafka topic. |
| zkRoot | Root directory in ZooKeeper where all topic and partition information is stored. By default, this is |
| id | Unique identifier for this spout. |
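Together, zkRoot and id determine where a spout keeps its offsets in ZooKeeper. The Java sketch below shows one way such a path could be composed. The class SpoutOffsetPath, the method committedPath, and the per-partition layout zkRoot/id/partition_N are assumptions for illustration, not the documented storm-kafka API.

```java
/**
 * Hypothetical helper, not part of storm-kafka: builds the ZooKeeper node where a
 * spout could store the committed offset for one partition.
 * Assumed layout: zkRoot + "/" + id + "/partition_" + partition.
 */
final class SpoutOffsetPath {
    private SpoutOffsetPath() {}

    static String committedPath(String zkRoot, String id, int partition) {
        if (zkRoot == null || !zkRoot.startsWith("/")) {
            throw new IllegalArgumentException("zkRoot must be an absolute ZooKeeper path: " + zkRoot);
        }
        if (id == null || id.isEmpty() || id.contains("/")) {
            throw new IllegalArgumentException("id must be a single non-empty path segment: " + id);
        }
        if (partition < 0) {
            throw new IllegalArgumentException("partition must be non-negative: " + partition);
        }
        // Drop trailing slashes so "/kafka-spout/" and "/kafka-spout" give the same node;
        // the root path "/" collapses to "" and the result still starts with "/".
        String root = zkRoot.replaceAll("/+$", "");
        return root + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Two spouts reading the same topic under different ids never share offset nodes.
        System.out.println(committedPath("/kafka-spout", "clicks-a", 0));
        System.out.println(committedPath("/kafka-spout", "clicks-b", 0));
    }
}
```

Under this assumed layout, redeploying a topology with the same id points the spout back at its earlier offset nodes, while a new id starts with no stored offsets.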
Trident API
Constructors:
public TridentKafkaConfig(BrokerHosts hosts, String topic)
public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)
Table 5.3. TridentKafkaConfig Parameters
| TridentKafkaConfig Parameter | Description |
|---|---|
| hosts | Any implementation of the BrokerHosts interface. |
| topic | Name of the Kafka topic. |
| id | Unique identifier for this spout. |
KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology:
Table 5.4. KafkaConfig Fields
| KafkaConfig Field | Description |
|---|---|
| | Specifies the number of bytes to attempt to fetch in one request to a Kafka server. The default is 1MB. |
| | Specifies the number of milliseconds to wait before a socket fails an operation with a timeout. The default value is 10 seconds. |
| | Specifies the buffer size in bytes for network requests. The default is 1MB. |
| | The interface that specifies how a byte[] from a Kafka topic is transformed into a Storm tuple. The default, |
| | To force the spout to ignore any consumer state information stored in ZooKeeper, set |
| | Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values: |
| | Specifies how long a spout attempts to retry the processing of a failed tuple. If a failing tuple's offset is less than |
| | Controls whether a spout streams messages from the beginning of a topic when the spout throws an exception for an out-of-range offset. The default value is true. |
| | Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds. |
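One field in Table 5.4 decides how each Kafka message, delivered as a byte[], becomes a Storm tuple. As a minimal sketch of that contract, the Java code below defines a hypothetical PayloadScheme interface (not Storm's actual scheme type) and an implementation that decodes each payload as a single UTF-8 string. The output field name "str" is also an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical interface standing in for the spout's scheme: it converts the raw
 * byte[] payload of one Kafka message into the values of one Storm tuple.
 */
interface PayloadScheme {
    List<Object> deserialize(byte[] payload);

    List<String> outputFields();
}

/** Example scheme that decodes each message as one UTF-8 string field. */
final class Utf8StringScheme implements PayloadScheme {
    @Override
    public List<Object> deserialize(byte[] payload) {
        // One message in, one single-field tuple out.
        return Collections.<Object>singletonList(new String(payload, StandardCharsets.UTF_8));
    }

    @Override
    public List<String> outputFields() {
        // Hypothetical field name; downstream bolts would read the value by this name.
        return Collections.singletonList("str");
    }

    public static void main(String[] args) {
        PayloadScheme scheme = new Utf8StringScheme();
        byte[] message = "page_view user=42".getBytes(StandardCharsets.UTF_8);
        System.out.println(scheme.outputFields() + " -> " + scheme.deserialize(message));
    }
}
```

Keeping decoding in a separate object means the spout itself stays format-agnostic; swapping in a JSON or Avro decoder would only change the scheme.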

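Three settings in Table 5.4 influence where a spout begins reading: the starting position (beginning of the topic or new messages only), the option to ignore consumer state stored in ZooKeeper, and the fallback for out-of-range offsets. The sketch below models how they could interact for a single partition. It uses the sentinel values -2 and -1 that Kafka 0.8's kafka.api.OffsetRequest.EarliestTime() and LatestTime() return; the class StartOffsetChooser, its parameter names, and the fallback rule are hypothetical.

```java
/**
 * Hypothetical sketch (not storm-kafka code) of how a spout could pick the first
 * offset to read from one partition. The sentinels mirror Kafka 0.8's
 * kafka.api.OffsetRequest.EarliestTime() (-2) and LatestTime() (-1).
 */
final class StartOffsetChooser {
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    private StartOffsetChooser() {}

    /**
     * @param startTime            EARLIEST_TIME for the beginning of the topic, LATEST_TIME for new messages only
     * @param storedOffset         offset previously committed for this partition, or null if none is used
     * @param earliestAvailable    oldest offset the broker still retains
     * @param nextOffset           offset the next produced message will receive
     * @param fallBackIfOutOfRange assumed meaning: restart from startTime when storedOffset is out of range
     */
    static long choose(long startTime, Long storedOffset, long earliestAvailable,
                       long nextOffset, boolean fallBackIfOutOfRange) {
        long fromStartTime;
        if (startTime == EARLIEST_TIME) {
            fromStartTime = earliestAvailable;
        } else if (startTime == LATEST_TIME) {
            fromStartTime = nextOffset;
        } else {
            // This sketch handles only the two sentinel values.
            throw new IllegalArgumentException("unsupported startTime: " + startTime);
        }
        if (storedOffset == null) {
            return fromStartTime; // no usable consumer state
        }
        boolean inRange = storedOffset >= earliestAvailable && storedOffset <= nextOffset;
        if (inRange) {
            return storedOffset; // resume where the spout left off
        }
        if (fallBackIfOutOfRange) {
            return fromStartTime; // stored offset was deleted or is invalid
        }
        throw new IllegalStateException("stored offset " + storedOffset + " is outside ["
                + earliestAvailable + ", " + nextOffset + "]");
    }
}
```

Passing null for storedOffset models a spout told to ignore ZooKeeper state: it then starts wherever startTime points. The fallback going to the startTime position, rather than always to the earliest offset, is an assumption of this sketch.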

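The retry-limit description in Table 5.4 breaks off before it states the cutoff. As an assumption, the sketch below treats the cutoff as the highest offset emitted so far minus a maximum lag: a failed tuple is re-emitted only while it trails by at most that many offsets. FailedTupleRetryPolicy, shouldRetry, and maxLag are hypothetical names.

```java
/**
 * Hypothetical sketch (not storm-kafka code) of a lag-based retry cutoff.
 * Assumption: a failed tuple is re-emitted only while its offset trails the
 * highest offset emitted so far by no more than maxLag.
 */
final class FailedTupleRetryPolicy {
    private FailedTupleRetryPolicy() {}

    static boolean shouldRetry(long failedOffset, long emittedToOffset, long maxLag) {
        if (failedOffset < 0 || emittedToOffset < failedOffset || maxLag < 0) {
            throw new IllegalArgumentException(
                    "expected 0 <= failedOffset <= emittedToOffset and maxLag >= 0");
        }
        // Both offsets are non-negative and ordered, so the subtraction cannot overflow.
        long lag = emittedToOffset - failedOffset;
        return lag <= maxLag;
    }

    public static void main(String[] args) {
        long emittedTo = 10_000L;
        long maxLag = 1_000L;
        for (long failed : new long[] {9_500L, 9_000L, 8_999L}) {
            System.out.println("offset " + failed + " retry=" + shouldRetry(failed, emittedTo, maxLag));
        }
    }
}
```

Comparing the lag instead of computing emittedToOffset - maxLag keeps the check safe even when maxLag is Long.MAX_VALUE, which effectively means "always retry".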