Learn what a Kafka Producer is, and follow a step-by-step guide to build a Producer Example in Apache Kafka using Java.

What is a Producer in Apache Kafka?

A producer is an application that is the source of a data stream. It generates messages (also called tokens) and publishes them to one or more topics in the Kafka cluster. Kafka's Producer API helps to package each message and deliver it to the Kafka Server.

The following picture demonstrates how a Producer works in Apache Kafka.

Figure: Producer Application in Apache Kafka

Producer Example in Apache Kafka

In this Apache Kafka Tutorial, we shall learn how to write a Producer in Apache Kafka with a Java example program. Following is a step-by-step process to write a simple Producer Example in Apache Kafka.

Create Java Project

Create a new Java Project called KafkaExamples in your favorite IDE. In this example, we shall use Eclipse, but the process should remain the same for most other IDEs.

Add Jars to Build Path

Add the following jars to the Java Project Build Path.

Note: The jars are available in the lib folder of the Apache Kafka download from https://kafka.apache.org/downloads. The exact version numbers in the file names depend on the Kafka release you downloaded.

  • kafka_2.11-0.11.0.0.jar
  • kafka-clients-0.11.0.0.jar
  • scala-library-2.12.3.jar
  • slf4j-api-1.7.25.jar
  • slf4j-log4j12-1.7.25.jar
  • log4j-1.2.17.jar
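
Before writing any producer code, it can help to confirm that the jars really are on the build path. The following is a minimal sketch that uses only the Java standard library. The class name ClasspathCheck and its helper method are hypothetical names chosen for illustration, and the two class names it looks up are assumed to be representative of kafka-clients and slf4j-api.

```java
/**
 * Hypothetical helper (not part of Kafka): reports whether a class can be
 * loaded from the current classpath / build path.
 */
final class ClasspathCheck {

    /** Returns true if the named class can be found on the classpath. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "org.apache.kafka.clients.producer.KafkaProducer", // from kafka-clients
            "org.slf4j.Logger"                                  // from slf4j-api
        };
        for (String name : required) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

If a line reports MISSING when this is run inside the KafkaExamples project, the corresponding jar is probably not on the Build Path yet.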

New SampleProducer Thread

Create a new class for a sample Producer, SampleProducer.java, that extends Thread, so that the Producer can be launched as a new thread on demand.

public class SampleProducer extends Thread {
    . . .
}

Properties of Kafka Producer

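Provide the Kafka Server URL and port, the Producer's ID (Client ID), and the Serializers for the Key and Value. The serializers must match the key and value types of the records; in this example the key is an Integer and the value is a String.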

Properties properties = new Properties();
properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
properties.put("client.id", "DemoProducer");
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Note: Make sure that the Server URL and port match the values in config/server.properties in your Kafka directory.
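
As a small illustration of the settings above, the sketch below collects them in a helper method and prints them, which makes it easier to compare them with the broker configuration. It uses only java.util.Properties; the class name ProducerPropertiesSketch and the method build are hypothetical names introduced here, not part of the Kafka API.

```java
import java.util.Properties;

final class ProducerPropertiesSketch {

    /** Builds the producer settings used in this tutorial from host, port and client id. */
    static Properties build(String host, int port, String clientId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", host + ":" + port);
        properties.put("client.id", clientId);
        // Integer keys and String values, matching the records sent in this tutorial
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("localhost", 9092, "DemoProducer");
        // Print the settings in sorted order so they are easy to review
        properties.stringPropertyNames().stream().sorted()
                .forEach(name -> System.out.println(name + " = " + properties.getProperty(name)));
    }
}
```

The returned Properties object can be passed to the KafkaProducer constructor in the same way as the inline version.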

Create Kafka Producer with the Properties

With the properties that have been mentioned above, create a new KafkaProducer.

KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);

Synchronous or Asynchronous

You may send the events from Producer to the Kafka Server synchronously or asynchronously.

Send Messages Synchronously

You may send messages synchronously (i.e., a new message is sent only after the previous one has been acknowledged) as shown below:

try {
    producer.send(new ProducerRecord<>(topic,
            messageNo,
            messageStr)).get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
    // handle the exception
}

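A synchronous send blocks the calling thread: send() returns a Future, and calling get() on it waits until the Kafka Server has acknowledged the record or the send has failed. get() throws InterruptedException if the waiting thread is interrupted and ExecutionException if the send itself failed, so both have to be handled.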
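
The two exceptions in the try/catch above come from waiting on a Future. The sketch below shows that behaviour without a running broker: fakeSend is a hypothetical stand-in for producer.send(...) built on CompletableFuture, and the class and method names are assumptions made for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class BlockingSendSketch {

    /** Hypothetical stand-in for producer.send(...): completes with an offset or fails. */
    static Future<Long> fakeSend(long offset, boolean fail) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("simulated send failure"));
        } else {
            future.complete(offset);
        }
        return future;
    }

    /** Waits on the future like the synchronous send does; returns a short status text. */
    static String sendAndWait(long offset, boolean fail) {
        try {
            Long ackedOffset = fakeSend(offset, fail).get(); // blocks until completion
            return "acked at offset " + ackedOffset;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return "interrupted";
        } catch (ExecutionException e) {
            // The original failure is wrapped; getCause() returns it
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait(42L, false)); // prints "acked at offset 42"
        System.out.println(sendAndWait(43L, true));  // prints "failed: simulated send failure"
    }
}
```

In real producer code, the cause wrapped in the ExecutionException is usually the more useful thing to log or inspect.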

Send Messages Asynchronously

You may send messages asynchronously as shown below:

producer.send(new ProducerRecord<>(topic,
        messageNo,
        messageStr), new DemoCallBack(startTime, messageNo, messageStr));

When a message is sent asynchronously, you pass send() an object that implements the Callback interface. Its onCompletion() method is called once the send completes, either because the Kafka Server acknowledged the record or because the send failed. We have provided a DemoCallBack class here for this purpose.
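
Since the callback runs for failed sends as well as successful ones, it should handle both outcomes. The sketch below illustrates that shape without the Kafka library: SendCallback is a hypothetical interface that mirrors onCompletion(metadata, exception), with a plain offset standing in for the record metadata. All names here are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class CallbackSketch {

    /** Hypothetical stand-in with the same shape as onCompletion(metadata, exception). */
    interface SendCallback {
        void onCompletion(Long offset, Exception exception);
    }

    /** Records one line per completed send, checking the failure case first. */
    static final class LoggingCallback implements SendCallback {
        final List<String> log = new ArrayList<>();

        @Override
        public void onCompletion(Long offset, Exception exception) {
            if (exception != null) {
                log.add("failed: " + exception.getMessage());
            } else {
                log.add("acked at offset " + offset);
            }
        }
    }

    public static void main(String[] args) {
        LoggingCallback callback = new LoggingCallback();
        callback.onCompletion(7L, null);                              // success path
        callback.onCompletion(null, new RuntimeException("timeout")); // failure path
        callback.log.forEach(System.out::println);
    }
}
```

Checking the exception first keeps the success branch from touching a value that may be missing when the send failed.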

Start Zookeeper and Kafka Cluster

Navigate to the root of the Kafka directory and run each of the following commands in a separate terminal to start Zookeeper and the Kafka Cluster. Start Zookeeper first, because the Kafka Server connects to it on startup.

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
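
Once both processes are running, a quick way to check that they are listening is to open a TCP connection to each port. The sketch below uses only the standard library; the class name BrokerPortCheck is hypothetical, port 9092 is the one used in this tutorial, and port 2181 is assumed to be the clientPort set in config/zookeeper.properties. A successful connection shows only that something is listening on the port, not that it is a healthy Kafka Server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        // 2181 is an assumption: adjust it to the clientPort in config/zookeeper.properties
        System.out.println("Zookeeper (2181): " + isReachable("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isReachable("localhost", 9092, 1000));
    }
}
```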

Start the SampleProducer thread

SampleProducer producerThread = new SampleProducer(TOPIC, isAsync);
producerThread.start();

Complete Java Producer Example in Apache Kafka

SampleProducer.java

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Producer Example in Apache Kafka
 * @author www.tutorialkart.com
 */
public class SampleProducer extends Thread {
	private final KafkaProducer<Integer, String> producer;
	private final String topic;
	private final boolean isAsync;

	public static final String KAFKA_SERVER_URL = "localhost";
	public static final int KAFKA_SERVER_PORT = 9092;
	public static final String CLIENT_ID = "SampleProducer";

	public SampleProducer(String topic, boolean isAsync) {
		Properties properties = new Properties();
		properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
		properties.put("client.id", CLIENT_ID);
		properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		producer = new KafkaProducer<>(properties);
		this.topic = topic;
		this.isAsync = isAsync;
	}

	public void run() {
		int messageNo = 1;
		while (true) {
			String messageStr = "Message_" + messageNo;
			long startTime = System.currentTimeMillis();
			if (isAsync) { // Send asynchronously
				producer.send(new ProducerRecord<>(topic,
						messageNo,
						messageStr), new DemoCallBack(startTime, messageNo, messageStr));
			} else { // Send synchronously
				try {
					producer.send(new ProducerRecord<>(topic,
							messageNo,
							messageStr)).get();
					System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
				} catch (InterruptedException | ExecutionException e) {
					e.printStackTrace();
					// handle the exception
				}
			}
			++messageNo;
		}
	}
}

class DemoCallBack implements Callback {

	private final long startTime;
	private final int key;
	private final String message;

	public DemoCallBack(long startTime, int key, String message) {
		this.startTime = startTime;
		this.key = key;
		this.message = message;
	}

	/**
	 * onCompletion method will be called when the record sent to the Kafka Server has been acknowledged.
	 * 
	 * @param metadata  The metadata contains the partition and offset of the record. Null if an error occurred.
	 * @param exception The exception thrown during processing of this record. Null if no error occurred.
	 */
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		long elapsedTime = System.currentTimeMillis() - startTime;
		if (metadata != null) {
			System.out.println(
					"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
					"), " +
					"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
		} else {
			exception.printStackTrace();
		}
	}
}

KafkaProducerDemo.java

public class KafkaProducerDemo {
	public static final String TOPIC = "testTopic";
	
    public static void main(String[] args) {
        boolean isAsync = false;
        SampleProducer producerThread = new SampleProducer(TOPIC, isAsync);
        // start the producer
        producerThread.start();

    }
}

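Run KafkaProducerDemo.java. The producer keeps sending messages until the program is stopped; the first lines of output look like the following.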

Sent message: (1, Message_1)
Sent message: (2, Message_2)
Sent message: (3, Message_3)
Sent message: (4, Message_4)
Sent message: (5, Message_5)
Sent message: (6, Message_6)
Sent message: (7, Message_7)
Sent message: (8, Message_8)
Sent message: (9, Message_9)
Sent message: (10, Message_10)
Sent message: (11, Message_11)
Sent message: (12, Message_12)

Here the messages are sent synchronously. You may change the value of isAsync to true to send messages asynchronously to the Kafka Cluster.

Conclusion

In this Apache Kafka Tutorial on the Kafka Producer Example, we have learnt what a Kafka Producer is and followed a step-by-step guide to build a Kafka Producer application using Java.