Learn Apache Kafka

Kafka consumer and producer with a custom serializer

Kafka allows us to create our own serializer and deserializer so that we can produce and consume different data types like JSON, POJO, Avro, etc. In this post we will see how to produce and consume a “User” POJO object. To stream POJO objects, one needs to create a custom serializer and deserializer.

 

Maven Dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
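
If the project is built with sbt instead of Maven, the equivalent dependencies would look roughly like this (a sketch; adjust the versions to whatever your build uses):

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.1.0",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
)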

 

1. First, we will create the User POJO class.

package com.cfamilycomputers.kafka.beans

// Simple User bean. Jackson needs the no-arg constructor plus getters and setters
// to serialize and deserialize instances of this class.
class User() {
  private var name: String = ""
  private var age: Int = 0

  def this(name: String, age: Int) {
    this()
    this.name = name
    this.age = age
  }

  def getName: String = this.name
  def getAge: Int = this.age

  // Setters are required so Jackson can populate the fields during deserialization
  def setName(name: String): Unit = { this.name = name }
  def setAge(age: Int): Unit = { this.age = age }

  override def toString: String = "User(" + name + ", " + age + ")"
}

 

2. Create a User serializer class by extending the Kafka Serializer interface.

package com.cfamilycomputers.kafka.jackson
import java.util
import com.cfamilycomputers.kafka.beans.User
import org.apache.kafka.common.serialization.Serializer
import org.codehaus.jackson.map.ObjectMapper
class UserSerializer extends Serializer[User] {

  private val objectMapper = new ObjectMapper()

  override def configure(map: util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to configure
  }

  override def serialize(topic: String, data: User): Array[Byte] = {
    // Convert the User object to a JSON string and return its bytes
    if (data == null) null
    else objectMapper.writeValueAsString(data).getBytes
  }

  override def close(): Unit = {
    // nothing to close
  }
}

 

3. Create a User deserializer class by extending the Kafka Deserializer interface.

package com.cfamilycomputers.kafka.jackson
import java.util
import com.cfamilycomputers.kafka.beans.User
import org.apache.kafka.common.serialization.Deserializer
import org.codehaus.jackson.map.ObjectMapper
class UserDeserializer extends Deserializer[User] {

  private val mapper = new ObjectMapper()

  override def configure(map: util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to configure
  }

  override def deserialize(topic: String, bytes: Array[Byte]): User = {
    // Convert the JSON bytes back into a User object
    if (bytes == null) null
    else mapper.readValue(bytes, classOf[User])
  }

  override def close(): Unit = {
    // nothing to close
  }
}
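
As a quick sanity check, the serializer and deserializer can be exercised together outside of Kafka. The small object below is only an illustrative sketch (the name SerdeRoundTripCheck is made up for this post); it turns a User into JSON bytes and reads it back:

package com.cfamilycomputers.kafka.jackson
import com.cfamilycomputers.kafka.beans.User
object SerdeRoundTripCheck extends App {
  val serializer = new UserSerializer()
  val deserializer = new UserDeserializer()
  val original = new User("Alice", 30)
  // serialize to JSON bytes, then deserialize back into a User
  val bytes = serializer.serialize("user_topic", original)
  val restored = deserializer.deserialize("user_topic", bytes)
  println(restored) // prints User(Alice, 30)
}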

 

4. Create a Kafka consumer and use UserDeserializer for the “value.deserializer” property.

package com.cfamilycomputers.kafka.jackson
import java.util.Properties
import com.cfamilycomputers.kafka.beans.User
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
object KafkaConsumerWithUserObject extends App {
  val prop: Properties = new Properties()
  prop.put("group.id", "test")
  prop.put("bootstrap.servers", "192.168.1.100:9092")
  prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  prop.put("value.deserializer", "com.cfamilycomputers.kafka.jackson.UserDeserializer")
  prop.put("enable.auto.commit", "true")
  prop.put("auto.commit.interval.ms", "1000")

  val consumer = new KafkaConsumer[String, User](prop)
  val topics = List("user_topic")
  try {
    consumer.subscribe(topics.asJava)
    while (true) {
      val records = consumer.poll(java.time.Duration.ofMillis(100))
      for (record <- records.asScala) {
        println("Topic: " + record.topic() + ", Key: " + record.key() + ", Value: " + record.value().getName +
          ", Offset: " + record.offset() + ", Partition: " + record.partition())
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    consumer.close()
  }
}

 

5. Create a Kafka producer and use UserSerializer for the “value.serializer” property.

package com.cfamilycomputers.kafka.jackson
import java.util.Properties
import com.cfamilycomputers.kafka.beans.User
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducerWithUserObject extends App {
  val props:Properties = new Properties()
  props.put("bootstrap.servers","192.168.1.100:9092")
  props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer","com.cfamilycomputers.kafka.jackson.UserSerializer")
  props.put("acks","all")
  val producer = new KafkaProducer[String, User](props)
  try{
    for(i <- 0 to 100) {
      val user = new User("My Name - "+i,i)
      val record = new ProducerRecord[String, User]("user_topic",i.toString,user)
      val metadata = producer.send(record)
      printf(s"sent record(key=%s value=%s) " +
        "meta(partition=%d, offset=%d)\n",
        record.key(), record.value(), metadata.get().partition(),
        metadata.get().offset());
    }
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    producer.close()
  }
}
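
Before running the consumer and producer, the “user_topic” topic must exist on the broker (unless automatic topic creation is enabled). One way to create it from code is Kafka's AdminClient, which ships in the same kafka-clients dependency. The object below is only a sketch (the name CreateUserTopic is made up, and the partition/replication settings assume a single-broker test cluster):

package com.cfamilycomputers.kafka.jackson
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
object CreateUserTopic extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092")
  val admin = AdminClient.create(props)
  // one partition, replication factor 1; adjust for a real cluster
  val topic = new NewTopic("user_topic", 1, 1.toShort)
  admin.createTopics(Collections.singleton(topic)).all().get()
  admin.close()
}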

 

6. Run KafkaConsumerWithUserObject

 

7. Run KafkaProducerWithUserObject