Learn Apache Kafka

Kafka Consumer in Java

Creating Kafka Consumer using Java

A consumer, like a producer, is instantiated by providing a Properties object as its configuration. Similar to the StringSerializer in the producer, we have a StringDeserializer in the consumer to convert bytes back into objects. group.id is a mandatory property, and here it is an arbitrary value. This value becomes important to the Kafka broker when several consumers form a consumer group. With this group id, the Kafka broker ensures that the same message is not consumed more than once by a consumer group, meaning a message can be consumed by only one member of a consumer group.

 

The following steps are taken to create a consumer:

1.     Create consumer properties.

2.     Create a consumer.

3.     Subscribe the consumer to a specific topic.

4.     Poll for new data.

 

Create consumer properties:

key.deserializer: The Deserializer class for the key; it must implement the 'org.apache.kafka.common.serialization.Deserializer' interface.

 

value.deserializer: The Deserializer class for the value; it must implement the 'org.apache.kafka.common.serialization.Deserializer' interface.

 

bootstrap.servers: A list of host/port pairs used to establish the initial connection to the Kafka cluster. The list need not contain the full set of servers in the cluster; only the servers required for bootstrapping are needed, and the client discovers the rest from them.
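For instance, several bootstrap servers can be supplied as a single comma-separated string (the broker host names below are placeholders, not real servers):

```java
import java.util.Properties;

public class BootstrapConfig {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Only a subset of the cluster's brokers is needed here;
        // "broker1" and "broker2" are hypothetical host names.
        properties.put("bootstrap.servers", "broker1:9092,broker2:9092");
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}
```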

 

group.id: A unique string that identifies the consumer group this consumer belongs to. This property is required when the consumer uses either the Kafka-based offset management strategy or the group management functionality via subscribing to a topic.

 

auto.offset.reset: This property defines what to do when there is no initial offset, or when the current offset no longer exists on the server. The following values can be used to reset the offset:

 

earliest: Automatically resets the offset to the earliest available offset.

 

latest: Automatically resets the offset to the latest offset.

 

none: Throws an exception to the consumer if no previous offset is found for the consumer's group.

 

anything else: Any other value throws an exception to the consumer.
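As a sketch, the reset behaviour is configured like any other consumer property; here "earliest" would make a consumer group with no committed offset read the topic from the beginning:

```java
import java.util.Properties;

public class OffsetResetConfig {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // "earliest" replays all existing records when the group
        // has no committed offset yet
        properties.put("auto.offset.reset", "earliest");
        System.out.println(properties.getProperty("auto.offset.reset"));
    }
}
```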

 

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "cfamily-group");

 

Creating the consumer

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

 

Subscribe the consumer to a specific topic

List<String> topics = new ArrayList<>();
topics.add("cfamily");
kafkaConsumer.subscribe(topics);

 

Poll for some new data

while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
             System.out.println(String.format("Topic - %s, Partition - %d, Value: %s", record.topic(), record.partition(), record.value()));
        }
}


Example:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "cfamily-group");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        List<String> topics = new ArrayList<>();
        topics.add("cfamily");
        kafkaConsumer.subscribe(topics);
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Topic - %s, Partition - %d, Value: %s",
                            record.topic(), record.partition(), record.value()));
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            kafkaConsumer.close();
        }
    }
}