Learn Apache Kafka

Kafka Producer in Scala

Kafka Producer Scala example:-

This Kafka Producer scala example publishes messages to a topic as a Record. Record is a key-value pair where the key is optional and value is mandatory. In this example we have key and value are string hence, we are using StringSerializer. In case if you have a key as a long value then you should use LongSerializer, the same applies for value as-well.

E.g:-

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducerApp extends App {
 
  val props:Properties = new Properties()
  props.put("bootstrap.servers","localhost:9092")
  props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks","all")
  val producer = new KafkaProducer[String, String](props)
  val topic = "cfamily"
  try {
    for (i <- 0 to 15) {
      val record = new ProducerRecord[String, String](topic, i.toString, "My Site is cfamily.com " + i)
      val metadata = producer.send(record)
      printf(s"sent record(key=%s value=%s) " +
        "meta(partition=%d, offset=%d)\n",
        record.key(), record.value(),
        metadata.get().partition(),
        metadata.get().offset())
    }
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    producer.close()
  }
}

 

Producer send method returns metadata where we can find; which partition message has written to and offset.