Chapter 6. Consuming Events/Messages from Kafka on a Secured Cluster
Prerequisite: Make sure that you have enabled access to the topic (via Ranger or native ACLs) for the user associated with the consumer process. We recommend that you use Ranger to manage permissions. For more information, see the Apache Ranger User Guide for Kafka.
During the installation process, Ambari configures a series of Kafka client and producer settings, and creates a JAAS configuration file for the Kafka client. It is not necessary to modify these values, but for more information see Kafka Configuration Options.
Note: Only the Kafka Java API is supported for Kerberos. Third-party clients are not supported.
To consume events/messages:
- Specify the path to the JAAS configuration file as one of your JVM parameters. For example: - -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf- For more information about the - kafka_client_jaasfile, see "JAAS Configuration File for the Kafka Client" in Kafka Configuration Options.
- kinitwith the principal's keytab.
- Launch - kafka-console-consumer.shwith the following configuration settings. (Note: these settings are the same as in previous versions, except for the addition of- --security-protocol PLAINTEXTSASL.)- ./bin/kafka-console-consumer.sh --zookeeper c6401.ambari.apache.org:2181 --topic test_topic --from-beginning --security-protocol PLAINTEXTSASL
Consumer Code Example for a Kerberos-Enabled Cluster
The following example shows sample code for a producer in a Kerberos-enabled Kafka cluster.
    Note that the SECURITY_PROTOCOL_CONFIG property is set to
    SASL_PLAINTEXT.
package com.hortonworks.example.kafka.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
public class BasicConsumerExample {
   public static void main(String[] args) {
       Properties consumerConfig = new Properties();
       consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:6667");
       // specify the protocol for SSL Encryption
       consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
       consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
       consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
       consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
       consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
       KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);
       TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();
       consumer.subscribe(Collections.singletonList("test-topic"), rebalanceListener);
       while (true) {
           ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
           for (ConsumerRecord<byte[], byte[]> record : records) {
               System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
           }
           consumer.commitSync();
       }
   }
   private static class  TestConsumerRebalanceListener implements ConsumerRebalanceListener {
       @Override
       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           System.out.println("Called onPartitionsRevoked with partitions:" + partitions);
       }
       @Override
       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
           System.out.println("Called onPartitionsAssigned with partitions:" + partitions);
       }
   }
}To run the example, issue the following command:
# java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf com.hortonworks.example.kafka.consumer.BasicConsumerExample
Troubleshooting
Issue: If you launch the consumer from the command-line
    interface without specifying the security-protocol option, you will see the
    following error:
2015-07-21 04:14:06,611] ERROR fetching topic metadata for topics 
[Set(test_topic)] from broker 
[ArrayBuffer(BrokerEndPoint(0,c6401.ambari.apache.org,6667), 
BrokerEndPoint(1,c6402.ambari.apache.org,6667))] failed 
(kafka.utils.CoreUtils$)
kafka.common.KafkaException: fetching topic metadata for topics 
[Set(test_topic)] from broker 
[ArrayBuffer(BrokerEndPoint(0,c6401.ambari.apache.org,6667), 
BrokerEndPoint(1,c6402.ambari.apache.org,6667))] failed 
     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
     at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)Solution: Add --security-protocol
      PLAINTEXTSASL to the kafka-console-consumer.sh runtime options.

