KafkaSpout Integration: Core Storm APIs
The core-storm API represents a Kafka spout with the KafkaSpout
class.
To initialize KafkaSpout, define an instance of SpoutConfig, a subclass of
KafkaConfig that represents the configuration information needed to ingest
data from a Kafka cluster. KafkaSpout also requires an implementation of the
BrokerHosts interface.
BrokerHosts Interface
The BrokerHosts interface maps Kafka brokers to topic partitions.
Constructors for KafkaSpout (and, for the Trident API,
TridentKafkaConfig) require an implementation of the
BrokerHosts interface.
The storm-kafka component provides two implementations of
BrokerHosts, ZkHosts and
StaticHosts:
Use ZkHosts if you want to track broker-to-partition mapping dynamically. This class uses Kafka's ZooKeeper entries to track the mapping. You can instantiate an object as follows:

```java
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
```

where:

- `brokerZkStr` is the `IP:port` address of the ZooKeeper host; for example, `localhost:2181`.
- `brokerZkPath` is the root directory under which topics and partition information are stored. By default this is `/brokers`, which is also the default used by Kafka.

By default, the broker-to-partition mapping refreshes every 60 seconds. If you want to change the refresh frequency, set `host.refreshFreqSecs` to your chosen value.
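For example, a minimal sketch of a dynamically tracked mapping (the ZooKeeper address `localhost:2181` is an illustrative assumption):

```java
// Track broker-to-partition mapping through Kafka's ZooKeeper entries.
ZkHosts zkHosts = new ZkHosts("localhost:2181"); // assumed ZooKeeper address
zkHosts.refreshFreqSecs = 30; // refresh the mapping every 30 seconds instead of the default 60
```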
Use StaticHosts for static broker-to-partition mapping. To construct an instance of this class, you must first construct an instance of GlobalPartitionInformation; for example:

```java
Broker brokerForPartition0 = new Broker("localhost");       // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092, with the port specified explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");  // localhost:9092, specified as one string

GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2

StaticHosts hosts = new StaticHosts(partitionInfo);
```
KafkaConfig Class and SpoutConfig Subclass
Next, define an instance of SpoutConfig, a subclass of KafkaConfig.
KafkaConfig contains several fields used to configure the behavior of a Kafka
spout in a Storm topology; SpoutConfig extends KafkaConfig, adding fields for
ZooKeeper connection info and for controlling behavior specific to KafkaSpout.
KafkaConfig provides the following constructors, each of which requires an
implementation of the BrokerHosts interface:
```java
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
```
KafkaConfig Parameters
- `hosts`: One or more hosts that are Kafka ZooKeeper broker nodes (see "BrokerHosts Interface").
- `topic`: Name of the Kafka topic that KafkaSpout will consume from.
- `clientId`: Optional parameter used as part of the ZooKeeper path, specifying where the spout's current offset is stored.
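As a sketch, the constructors can be used as follows (the ZooKeeper address, topic name, and client ID are illustrative assumptions):

```java
BrokerHosts hosts = new ZkHosts("localhost:2181");           // assumed ZooKeeper address
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "mytopic"); // assumed topic name
// or, storing the spout's offset under an explicit client ID in ZooKeeper:
KafkaConfig kafkaConfigWithId = new KafkaConfig(hosts, "mytopic", "myclient");
```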
KafkaConfig Fields
- `fetchSizeBytes`: Number of bytes to attempt to fetch in one request to a Kafka server. The default is 1 MB.
- `socketTimeoutMs`: Number of milliseconds to wait before a socket fails an operation with a timeout. The default value is 10 seconds.
- `bufferSizeBytes`: Buffer size (in bytes) for network requests. The default is 1 MB.
- `scheme`: The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into a Storm tuple. The default implementation, `RawMultiScheme`, returns a tuple with the raw bytes and no additional processing. The API provides several implementations of the `Scheme` class, including:
  - `storm.kafka.StringScheme`
  - `storm.kafka.KeyValueSchemeAsMultiScheme`
  - `storm.kafka.StringKeyValueScheme`
Important: In Apache Storm versions prior to 1.0, `MultiScheme` methods accepted a `byte[]` parameter instead of a `ByteBuffer`. In Storm version 1.0, `MultiScheme` and related scheme APIs changed; they now accept a `ByteBuffer` instead of a `byte[]`. As a result, Kafka spouts built with Storm versions earlier than 1.0 do not work with Storm versions 1.0 and later. When running topologies with Storm version 1.0 and later, ensure that your version of `storm-kafka` is at least 1.0. Rebuild pre-1.0 shaded topology .jar files that bundle `storm-kafka` classes with `storm-kafka` version 1.0 before running them in clusters with Storm 1.0 and later.

- `ignoreZkOffsets`: To force the spout to ignore any consumer state information stored in ZooKeeper, set `ignoreZkOffsets` to `true`. If `true`, the spout always begins reading from the offset defined by `startOffsetTime`. For more information, see "How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures."
- `startOffsetTime`: Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values:
  - `kafka.api.OffsetRequest.EarliestTime()` starts streaming from the beginning of the topic.
  - `kafka.api.OffsetRequest.LatestTime()` streams only new messages.
- `maxOffsetBehind`: Controls how long a spout attempts to retry the processing of a failed tuple. If a failed tuple's offset falls more than `maxOffsetBehind` behind the spout's most recently emitted offset, the spout stops retrying the tuple. The default is `Long.MAX_VALUE`.
- `useStartOffsetTimeIfOffsetOutOfRange`: Controls whether the spout restarts from the offset defined by `startOffsetTime` when Kafka reports an out-of-range offset. The default value is `true`.
- `metricsTimeBucketSizeInSecs`: Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds.
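These fields are public members of the config object and can be set directly after construction. A minimal sketch, with illustrative values rather than recommendations:

```java
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "mytopic"); // hosts and topic as in the earlier sketch
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // emit each message as a string tuple
kafkaConfig.fetchSizeBytes = 2 * 1024 * 1024; // fetch up to 2 MB per request (default: 1 MB)
kafkaConfig.ignoreZkOffsets = true;           // ignore consumer state stored in ZooKeeper ...
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // ... and read from the beginning of the topic
```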
Instantiate SpoutConfig as follows:
```java
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String nodeId)
```
SpoutConfig Parameters
- `hosts`: One or more hosts that are Kafka ZooKeeper broker nodes (see "BrokerHosts Interface").
- `topic`: Name of the Kafka topic that KafkaSpout will consume from.
- `zkroot`: Root directory in ZooKeeper under which KafkaSpout consumer offsets are stored. The default is `/brokers`.
- `nodeId`: ZooKeeper node under which KafkaSpout stores offsets for each topic-partition. The node ID must be unique for each topology. The topology uses this path to recover in failure scenarios, or when there is maintenance that requires killing the topology.
`zkroot` and `nodeId` are used to construct the ZooKeeper path where Storm
stores the Kafka offset; you can find offsets at `zkroot + "/" + nodeId`. For
example, a `zkroot` of `/kafka-offsets` and a `nodeId` of `topology1` (both
illustrative values) would place offsets under `/kafka-offsets/topology1`.
To start processing messages from where the last operation left off, use the
same `zkroot` and `nodeId`. To start from the beginning of the Kafka topic,
set `KafkaConfig.ignoreZkOffsets` to `true`.
Example
The following example illustrates the use of the KafkaSpout
class and related interfaces:
```java
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + zkrootDir, node);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
```
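A spout built this way is then wired into a topology like any other spout. A minimal sketch, assuming the `kafkaSpout` instance from the example above (the component ID "kafka-spout" and the parallelism hint of 1 are illustrative assumptions):

```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 1); // assumed component ID and parallelism hint
// Downstream bolts would subscribe to the spout's stream here, for example:
// builder.setBolt("log-bolt", new LogBolt()).shuffleGrouping("kafka-spout"); // LogBolt is hypothetical
StormTopology topology = builder.createTopology();
```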

