Creating Kafka Consumer using Java
A consumer is also instantiated by
providing properties object as configuration.Similar to the StringSerialization
in producer, we have StringDeserializer in consumer to convert bytes back to
Object.group.id is a must have property and here it is an arbitrary value.This
value becomes important for kafka broker when we have a consumer group of a
broker.With this group id, kafka broker ensures that the same message is not
consumed more then once by a consumer group meaning a message can be only
consumed by any one member a consumer group.
There are following steps taken to
create a consumer:
1. Create consumer properties.
2. Create a consumer.
3. Subscribe the consumer to a specific
topic.
4. Poll for some new data
Create consumer properties: -
key.deserializer: It is a Deserializer class for the
key, which is used to implement the
'org.apache.kafka.common.serialization.Deserializer' interface.
value.deserializer: A Deserializer class for value
which implements the 'org.apache.kafka.common.serialization.Desrializer'
interface.
bootstrap.servers: It is a list of host/port pairs
which is used to establish an initial connection with the Kafka cluster. It
does not contain a full set of servers that a client requires. Only the servers
which are required for bootstrapping are required.
group.id: It is a unique string which
identifies the consumer of a consumer group. This property is needed when a
consumer uses either Kafka based offset management strategy or group management
functionality via subscribing to a topic.
auto.offset.reset: This property is required when no
initial offset is present or if the current offset does not exist anymore on
the server. There are the following values used to reset the offset values:
earliest: This offset variable automatically
reset the value to its earliest offset.
latest: This offset variable reset the
offset value to its latest offset.
none: If no previous offset is found for
the previous group, it throws an exception to the consumer.
anything else: It throws an exception to the
consumer.
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "cfamily-group");
Creating the consumer
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
Subscribe the consumer to a specific
topic
List topics = new ArrayList();
topics.add("cfamily");
kafkaConsumer.subscribe(topics);
Poll for some new data
while (true){
ConsumerRecords records = kafkaConsumer.poll(10);
for (ConsumerRecord record: records){
System.out.println(String.format("Topic - %s, Partition - %d, Value: %s", record.topic(), record.partition(), record.value()));
}
}
E.g:-
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "cfamily-group");
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
List topics = new ArrayList();
topics.add("cfamily");
kafkaConsumer.subscribe(topics);
try{
while (true){
ConsumerRecords records = kafkaConsumer.poll(10);
for (ConsumerRecord record: records){
System.out.println(String.format("Topic - %s, Partition - %d, Value: %s", record.topic(), record.partition(), record.value()));
}
}
}catch (Exception e){
System.out.println(e.getMessage());
}finally {
kafkaConsumer.close();
}
}
}