Apache Kafka Tutorial – Learn about the Apache Kafka Consumer with an example Java application that works as a Kafka consumer. This updated guide explains what a Kafka consumer does, how polling and consumer groups work, which Java consumer properties are required, and how to run a simple Java application that reads records from a Kafka topic.

What is a Kafka Consumer in Apache Kafka?

A Kafka Consumer is an application that reads data from Kafka Topics. It subscribes to one or more topics in the Kafka cluster and receives records from the topic partitions assigned to it.

A consumer does not receive records by a one-time request only. In most Java applications, the consumer runs a polling loop. On each poll, the Kafka client fetches available records, the application processes them, and the consumer continues polling for more records. This is why Kafka consumer examples usually contain a repeated call to poll().
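
The loop shape can be sketched without a running broker. In this toy example, an in-memory java.util.concurrent.BlockingQueue stands in for a topic partition, and the class and method names (ToyPollLoop, poll, consumeAll) are hypothetical. The real client call is KafkaConsumer.poll(Duration), which likewise returns immediately when records are available and otherwise waits up to the timeout, possibly returning an empty batch.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model only: an in-memory queue stands in for one topic partition.
final class ToyPollLoop {
    private final BlockingQueue<String> partition = new LinkedBlockingQueue<>();

    void append(String value) {
        partition.add(value);
    }

    // Hypothetical stand-in for KafkaConsumer.poll(Duration): returns at once if
    // records are waiting, otherwise waits up to the timeout and may return empty.
    List<String> poll(Duration timeout) {
        List<String> batch = new ArrayList<>();
        try {
            String first = partition.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                partition.drainTo(batch); // take whatever else is already available
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the loop
        }
        return batch;
    }

    // Poll, process, repeat. This toy loop stops after maxEmptyPolls empty polls
    // in a row; a real consumer keeps polling until the application shuts it down.
    static List<String> consumeAll(ToyPollLoop source, int maxEmptyPolls) {
        List<String> processed = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls && !Thread.currentThread().isInterrupted()) {
            List<String> batch = source.poll(Duration.ofMillis(100));
            if (batch.isEmpty()) {
                emptyPolls++;
                continue;
            }
            emptyPolls = 0;
            processed.addAll(batch); // business logic for each record would go here
        }
        return processed;
    }

    public static void main(String[] args) {
        ToyPollLoop topic = new ToyPollLoop();
        for (int i = 1; i <= 3; i++) {
            topic.append("Message_" + i);
        }
        System.out.println(consumeAll(topic, 2)); // [Message_1, Message_2, Message_3]
    }
}
```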

A consumer's liveness is tracked through heartbeats. The consumer sends heartbeats to the group coordinator, which is a broker in the cluster (the older ZooKeeper-based consumer relied on ZooKeeper for this). If heartbeats stop arriving, the coordinator treats the consumer as gone and rebalances its partitions among the remaining consumers in the group. Heartbeats add some overhead to the cluster, so their frequency is configurable through heartbeat.interval.ms, trading faster failure detection against extra traffic.

In modern Kafka terminology, the consumer group coordinator manages group membership and partition assignment. Heartbeats help the coordinator know that a consumer is still active. If a consumer stops heartbeating or stops polling for too long, Kafka may remove it from the group and reassign its partitions to other consumers in the same group.
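
The two failure conditions described above can be modeled roughly as follows. LivenessCheck is a hypothetical class, not Kafka's implementation; it only mirrors the documented settings session.timeout.ms (how long the group may go without a heartbeat from the consumer) and max.poll.interval.ms (how long the application may go between poll() calls, 300000 ms by default). It also encodes the Kafka documentation's guideline that heartbeat.interval.ms is typically set no higher than one third of session.timeout.ms.

```java
// Simplified model (not Kafka's code) of when a group member is treated as failed.
final class LivenessCheck {
    private final long sessionTimeoutMs;   // models session.timeout.ms
    private final long maxPollIntervalMs;  // models max.poll.interval.ms

    LivenessCheck(long sessionTimeoutMs, long maxPollIntervalMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    // Failed if heartbeats stopped for longer than the session timeout,
    // or if the application stopped calling poll() for too long.
    boolean isFailed(long nowMs, long lastHeartbeatMs, long lastPollMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs
                || nowMs - lastPollMs > maxPollIntervalMs;
    }

    // Guideline from the Kafka docs: heartbeat.interval.ms is typically set
    // no higher than one third of session.timeout.ms.
    static boolean heartbeatIntervalLooksSafe(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // 30000 ms is the session timeout this tutorial configures;
        // 300000 ms is the documented default for max.poll.interval.ms.
        LivenessCheck check = new LivenessCheck(30_000, 300_000);
        System.out.println(check.isFailed(100_000, 95_000, 99_000)); // false
        System.out.println(check.isFailed(100_000, 60_000, 99_000)); // true: no heartbeat for 40 s
        System.out.println(heartbeatIntervalLooksSafe(3_000, 30_000)); // true
    }
}
```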

Consumers can also be grouped. Consumers in the same consumer group share the partitions of the topics they subscribe to. If a topic has N partitions and a consumer group with N consumers subscribes to it, each consumer reads data from exactly one partition of the topic. This is just a heads-up that consumers can be grouped; we shall go into the details of consumer groups in our next tutorial.
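
How the partitions are split depends on the assignment strategy (the partition.assignment.strategy property). The sketch below follows the idea behind Kafka's range strategy (RangeAssignor) for a single topic: members are sorted, partitions are divided as evenly as possible in order, and the first members receive one extra partition when the division is uneven. It is a simplified illustration with made-up consumer names, not the RangeAssignor source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified range-style assignment for a single topic (illustration only).
final class RangeStyleAssignment {

    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return result; // nobody to assign to
        }
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // members are ordered before partitions are divided
        int perConsumer = partitionCount / sorted.size();
        int withExtra = partitionCount % sorted.size(); // first members get one more
        int nextPartition = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(nextPartition++);
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 3)); // {c1=[0], c2=[1], c3=[2]}
        System.out.println(assign(Arrays.asList("c1", "c2"), 3));       // {c1=[0, 1], c2=[2]}
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 2)); // {c1=[0], c2=[1], c3=[]}
    }
}
```

With N partitions and N consumers each consumer gets one partition; with more consumers than partitions, the extra consumers stay idle.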

Kafka's Consumer API lets an application connect to a Kafka cluster and consume data streams.

The following picture shows how a consumer works in Apache Kafka.

Consumer in Apache Kafka

How Kafka Consumer Works in a Java Application

In a Java Kafka consumer application, the client connects to Kafka using bootstrap.servers, joins a consumer group using group.id, subscribes to one or more topics, and repeatedly polls for records. Each record has a topic, partition, offset, key, value, timestamp, and optional headers.

  • Topic subscription: The consumer declares which topic or topics it wants to read.
  • Partition assignment: Kafka assigns topic partitions to consumers in the same group.
  • Polling: The Java consumer calls poll() to fetch available records.
  • Processing: The application reads each ConsumerRecord and performs the required business logic.
  • Offset management: Kafka tracks which records have been consumed by storing committed offsets for a consumer group.
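
One detail about committed offsets is easy to get wrong: according to the KafkaConsumer documentation, the committed offset should be the offset of the next record the application will consume, that is, the last processed offset plus one. The bookkeeping sketch below uses a hypothetical OffsetTracker class keyed by partition number; the real client represents the same information with TopicPartition and OffsetAndMetadata. The sample offsets resemble the ones in the output later in this tutorial.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative bookkeeping only; not part of the Kafka client API.
final class OffsetTracker {
    // partition number -> offset to commit (the next offset to read)
    private final Map<Integer, Long> toCommit = new HashMap<>();

    // Call after a record has been fully processed.
    void markProcessed(int partition, long recordOffset) {
        // Commit position = last processed offset + 1, never moving backwards.
        toCommit.merge(partition, recordOffset + 1, Long::max);
    }

    // Where a restarted member of the group would resume for this partition,
    // or -1 if nothing has been committed yet.
    long resumePosition(int partition) {
        return toCommit.getOrDefault(partition, -1L);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.markProcessed(0, 111467);
        tracker.markProcessed(0, 111468);
        System.out.println(tracker.resumePosition(0)); // 111469
        System.out.println(tracker.resumePosition(1)); // -1
    }
}
```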

Kafka Consumer with Example Java Application

The following is a step-by-step process to write a simple consumer example in Apache Kafka.

  • Create a Java project for the Kafka consumer.
  • Add Kafka client libraries to the Java build path.
  • Configure consumer properties such as broker address, group id, offset behavior, and deserializers.
  • Create a KafkaConsumer instance.
  • Subscribe the consumer to a Kafka topic.
  • Poll records and process each consumed message.
  • Run Kafka, start a producer, and verify the consumer output.

Java Project Setup for Kafka Consumer Example

1. Create Java Project

Create a new Java project called KafkaExamples in your favorite IDE. In this example, we shall use Eclipse, but the process should remain the same for most other IDEs.

2. Add Jars to Build Path

Add the following jars to the Java project build path. Note: the jars are available in the lib folder of the Apache Kafka download from [https://kafka.apache.org/downloads].

  • kafka_2.11-0.11.0.0.jar
  • kafka-clients-0.11.0.0.jar
  • scala-library-2.12.3.jar
  • slf4j-api-1.7.25.jar
  • slf4j-log4j12-1.7.25.jar
  • log4j-1.2.17.jar

For a newer Java project, a build tool such as Maven or Gradle is easier than adding individual jar files manually. The main dependency for a plain Kafka Java consumer is kafka-clients. Use a version that matches the Kafka version used in your environment.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
</dependency>

The remaining example keeps the original Java class structure so that the consumer flow is clear for a beginner working in a basic IDE project.

Kafka Consumer Java Class and Configuration Properties

3. New SampleConsumer Thread

Create a new class for a sample consumer, SampleConsumer.java, that extends Thread, so that the consumer can be launched as a new thread on demand.

public class SampleConsumer extends Thread {
    // ...
}

4. Properties of Kafka Consumer

Provide the consumer settings: the Kafka server URL and port, the consumer group ID (this example reuses the CLIENT_ID constant as the group id), and the deserializers for the key and value.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CLIENT_ID);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

Note: Make sure that the server URL and port match the values in /<kafka_directory>/config/server.properties.

The main Kafka consumer properties used in this Java example are explained below.

  • bootstrap.servers: Kafka broker address used by the Java consumer to connect to the cluster.
  • group.id: Name of the consumer group. Kafka uses this to manage partition assignment and committed offsets.
  • enable.auto.commit: When set to true, the consumer commits offsets automatically at the configured interval.
  • auto.commit.interval.ms: Time interval at which offsets are automatically committed when auto commit is enabled.
  • session.timeout.ms: Maximum time Kafka waits without a heartbeat before treating the consumer as unavailable.
  • key.deserializer: Deserializer used to convert record key bytes into a Java object.
  • value.deserializer: Deserializer used to convert record value bytes into a Java object.
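
The ConsumerConfig constants stand for the plain property names listed above, so the same settings can also be written with string keys. The helper below is a hypothetical convenience method built only on java.util.Properties; the values mirror this tutorial, and the resulting object could be passed to new KafkaConsumer<>(props) in a project that has kafka-clients on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: builds the tutorial's consumer settings with plain
// string keys (the same names the ConsumerConfig constants stand for).
final class ConsumerProps {

    static Properties forTutorial(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forTutorial("localhost:9092", "SampleConsumer");
        // Print the settings in a stable order for readability
        props.stringPropertyNames().stream().sorted()
                .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```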

In this tutorial, the producer sends an Integer key and a String value. Therefore, the consumer uses IntegerDeserializer for the key and StringDeserializer for the value. If your producer sends different data types, the consumer deserializers must match the produced data format.
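
To see why a mismatch breaks consumption, the stdlib-only sketch below re-creates the byte layouts involved. Kafka's IntegerSerializer writes an int as 4 big-endian bytes and its IntegerDeserializer rejects data that is not exactly 4 bytes long; StringSerializer and StringDeserializer use UTF-8 unless configured otherwise. The WireFormats class is an illustration, not the Kafka classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Re-creates the wire formats for illustration; these are not Kafka classes.
final class WireFormats {

    // Same layout as Kafka's IntegerSerializer: 4 bytes, big-endian.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array(); // ByteBuffer defaults to big-endian
    }

    static int decodeInt(byte[] data) {
        if (data.length != 4) {
            // Kafka's IntegerDeserializer also rejects anything that is not 4 bytes
            throw new IllegalArgumentException("expected 4 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    // UTF-8, the default charset of StringSerializer/StringDeserializer.
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String decodeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decodeInt(encodeInt(7)));                 // 7
        System.out.println(decodeString(encodeString("Message_7"))); // Message_7
        try {
            decodeInt(encodeString("Message_7")); // written as a String, read as an Integer
        } catch (IllegalArgumentException e) {
            System.out.println("Mismatch: " + e.getMessage()); // Mismatch: expected 4 bytes, got 9
        }
    }
}
```

A mismatch does not always fail loudly: a string that happens to be exactly 4 bytes long, such as "abcd", decodes into a meaningless integer instead of an error.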

5. Create Kafka Consumer with the Properties

With the properties that have been mentioned above, create a new KafkaConsumer.

KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);

Subscribe and Poll Records in Kafka Java Consumer

6. Subscribe Consumer to a Topic

The consumer has to subscribe to a topic from which it can receive records.

consumer.subscribe(Collections.singletonList(this.topic));

The topic name must match the topic to which your Kafka producer writes records. In this example, both the producer and consumer use testTopic.

7. Fetch Records for the Topic

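Fetch records for the topic that the consumer has subscribed to by calling poll(). In older clients the argument is a long timeout in milliseconds: the maximum time poll() blocks when no records are available. If records are already available, poll() returns them without waiting for the full timeout.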

ConsumerRecords<Integer, String> records = consumer.poll(1000);

In recent Kafka client versions, poll(Duration) is preferred over the older poll(long) form, which has been deprecated since Kafka 2.0. The idea remains the same: the consumer waits up to the specified time for records and returns a collection of records that the application can process. This form needs an import of java.time.Duration.

ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));

8. Consume the records

You may consume the records as per your need or use case. Here, in this tutorial, we shall print those messages to the console output.

for (ConsumerRecord<Integer, String> record : records) {
    System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}

Each consumed record contains the key, value, topic, partition, and offset. The offset is the record position inside a partition. It is useful for tracking how far a consumer group has read from a topic partition.
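
Offsets also make it possible to estimate how far behind a consumer group is, usually called consumer lag: for each partition, the log end offset (the offset the next produced record will receive) minus the group's committed offset. The method below is a hypothetical one-line sketch of that arithmetic; in practice both numbers come from the cluster, for example from bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group SampleConsumer, which reports CURRENT-OFFSET, LOG-END-OFFSET and LAG columns. The sample values assume the run shown later in this tutorial, where the last record lands at offset 111476.

```java
// Lag arithmetic for one partition (illustration only).
final class ConsumerLag {

    // logEndOffset: offset the next produced record will receive.
    // committedOffset: next offset the group will read (last processed + 1).
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    public static void main(String[] args) {
        // Last record at offset 111476, so the log end offset is 111477.
        System.out.println(lag(111477, 111477)); // 0: everything consumed
        System.out.println(lag(111477, 111470)); // 7: offsets 111470..111476 still unread
    }
}
```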

Run Kafka Locally Before Starting Java Consumer

9. Start Zookeeper and Kafka Cluster

Navigate to the root of the Kafka directory and run each of the following commands in a separate terminal to start ZooKeeper and the Kafka broker.

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

The commands above match older Kafka distributions that use ZooKeeper. In newer Kafka setups, you may run Kafka in KRaft mode without ZooKeeper. Use the startup commands that match your Kafka download and make sure the broker is reachable at localhost:9092 before running the Java consumer.

Create the sample topic if it does not already exist.

bin/kafka-topics.sh --create \
  --topic testTopic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1

You can also check whether the topic is available before running the Java consumer.

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

10. Start the Kafka Producer

Well! There has to be a Producer of records for the Consumer to feed on. Start the Kafka Producer by following Kafka Producer with Java Example. Also note that, if you are changing the Topic name, make sure you use the same topic name for the Kafka Producer Example and Kafka Consumer Example Java Applications.

11. Start the SampleConsumer thread

SampleConsumer consumerThread = new SampleConsumer("testTopic");
consumerThread.start();

The class name used while starting the thread must match the consumer class in your project. In this tutorial, the class is SampleConsumer.

Example Java Application that works as Kafka Consumer

SampleConsumer.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka Consumer with Example Java Application
 */
public class SampleConsumer extends Thread {
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleConsumer";

    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public SampleConsumer(String topic) {
        super("KafkaConsumerExample"); // thread name
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        // CLIENT_ID is used as the consumer group id in this example
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CLIENT_ID);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        try {
            // Subscribe once; the group coordinator assigns partitions to this consumer
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // Block for at most one second waiting for new records
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                }
            }
        } catch (WakeupException e) {
            // Expected: shutdown() interrupts a blocking poll() with wakeup()
        } finally {
            consumer.close(); // leave the group and release resources
        }
    }

    // Safe to call from another thread: wakeup() is the one thread-safe KafkaConsumer method
    public void shutdown() {
        consumer.wakeup();
    }
}

KafkaConsumerDemo.java

/**
*  Kafka Consumer with Example Java Application
*/
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        SampleConsumer consumerThread = new SampleConsumer("testTopic");
        consumerThread.start();
    }
}

Run KafkaConsumerDemo.java.

Kafka Producer Console Output

message(1, Message_1) sent to partition(0), offset(111467) in 419 ms
message(2, Message_2) sent to partition(0), offset(111468) in 80 ms
message(3, Message_3) sent to partition(0), offset(111469) in 76 ms
message(4, Message_4) sent to partition(0), offset(111470) in 76 ms
message(5, Message_5) sent to partition(0), offset(111471) in 76 ms
message(6, Message_6) sent to partition(0), offset(111472) in 75 ms
message(7, Message_7) sent to partition(0), offset(111473) in 73 ms
message(8, Message_8) sent to partition(0), offset(111474) in 81 ms
message(9, Message_9) sent to partition(0), offset(111475) in 75 ms
message(10, Message_10) sent to partition(0), offset(111476) in 75 ms

Kafka Consumer Console Output

Received message: (1, Message_1) at offset 111467
Received message: (2, Message_2) at offset 111468
Received message: (3, Message_3) at offset 111469
Received message: (4, Message_4) at offset 111470
Received message: (5, Message_5) at offset 111471
Received message: (6, Message_6) at offset 111472
Received message: (7, Message_7) at offset 111473
Received message: (8, Message_8) at offset 111474
Received message: (9, Message_9) at offset 111475
Received message: (10, Message_10) at offset 111476

Kafka Consumer Output and Offset Verification

The consumer output confirms that the Java application has read records from testTopic. In the sample output, the key and value are printed along with the offset. The producer output and consumer output show matching offsets, which means the records produced to the topic were later consumed by the Java consumer.

If you want a quick independent check, you can use the Kafka console consumer for the same topic.

bin/kafka-console-consumer.sh \
  --topic testTopic \
  --from-beginning \
  --bootstrap-server localhost:9092

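By default, the console consumer prints only record values, and because of --from-beginning it also prints any older records still retained in the topic. An excerpt of its output for the sample messages may look like this.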

Message_1
Message_2
Message_3
Message_4
Message_5

Auto Commit and Manual Commit in Kafka Consumer

The example uses enable.auto.commit=true, so Kafka commits offsets automatically at regular intervals. This is simple for a beginner example, but production applications often need more control over when an offset is committed.

If you commit an offset before processing is fully complete, a crash may cause records to be skipped on restart. If you process a record but do not commit the offset, the same record may be read again after a restart. Choose offset commit behavior based on whether your application can safely process duplicate records.
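
The trade-off can be simulated without Kafka. In the hypothetical CommitOrdering sketch below, a consumer handles offsets in order, crashes midway through one record, and a restarted consumer resumes from the last committed offset. Committing before processing loses the record that was in flight (at-most-once), while committing after processing handles it twice (at-least-once). Committing once per record is a simplification; auto commit in the real client commits periodically, so more records can be affected.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of commit ordering; no Kafka classes are involved.
final class CommitOrdering {

    // Handles offsets 0..total-1. The first run crashes between the two steps
    // (commit and process) for offset crashAt; a restarted run then resumes from
    // the committed offset. Returns every offset that got processed, in order.
    static List<Integer> run(int total, int crashAt, boolean commitBeforeProcessing) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0; // next offset to read, as Kafka stores it

        for (int offset = committed; offset < total; offset++) {
            if (commitBeforeProcessing) {
                committed = offset + 1;   // step 1: commit
                if (offset == crashAt) {
                    break;                // crash before processing
                }
                processed.add(offset);    // step 2: process
            } else {
                processed.add(offset);    // step 1: process
                if (offset == crashAt) {
                    break;                // crash before committing
                }
                committed = offset + 1;   // step 2: commit
            }
        }

        // Restart: a new consumer in the same group resumes at the committed offset
        for (int offset = committed; offset < total; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2, true));  // [0, 1, 3, 4]: offset 2 skipped
        System.out.println(run(5, 2, false)); // [0, 1, 2, 2, 3, 4]: offset 2 repeated
    }
}
```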

A manual commit setup commonly disables auto commit and calls commit after successful processing.

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// After processing records successfully
consumer.commitSync();

Common Kafka Consumer Errors in Java Applications

If the Java Kafka consumer does not receive messages, check these common causes first.

  • Kafka broker is not running: Confirm that Kafka is started and reachable at localhost:9092.
  • Topic name mismatch: The producer and consumer must use the same topic name, such as testTopic.
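  • No new records for the group: A consumer group resumes from its committed offsets. If the group already consumed the existing records, produce new records while the consumer is running. A new group.id has no committed offsets, so it starts according to auto.offset.reset, which defaults to latest; set auto.offset.reset to earliest (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) if a test group should read the records already in the topic.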
  • Wrong deserializer: The configured key and value deserializers must match the data written by the producer.
  • Consumer stopped polling: Long processing between polls can make Kafka consider the consumer inactive, which can trigger a rebalance.
  • Class name mismatch: Make sure the class used in KafkaConsumerDemo.java matches the actual consumer class name in your project.
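
A new group.id has no committed offsets, so its starting point comes from the auto.offset.reset property: latest (the default) starts at the end of the partition, earliest starts at the oldest retained record, and none raises an error. This tutorial's properties leave it at the default, so a brand-new group only sees records produced after it joins. The decision is sketched below with a hypothetical StartPosition class; it ignores other cases, such as a committed offset that retention has already deleted, and the log start offset of 0 in the example assumes nothing has been deleted yet.

```java
import java.util.OptionalLong;

// Hypothetical model of how the starting offset is chosen for one partition.
final class StartPosition {

    static long choose(OptionalLong committed, String autoOffsetReset,
                       long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // existing group: resume where it left off
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;    // read everything still retained
            case "latest":
                return logEndOffset;      // default: only records produced from now on
            case "none":
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown auto.offset.reset: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Log end offset 111477 matches the sample output above
        System.out.println(choose(OptionalLong.empty(), "latest", 0, 111477));    // 111477
        System.out.println(choose(OptionalLong.empty(), "earliest", 0, 111477));  // 0
        System.out.println(choose(OptionalLong.of(111470), "latest", 0, 111477)); // 111470
    }
}
```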

Kafka Consumer Java Example Best Practices

The sample code is intentionally small. For a real Kafka consumer application in Java, keep these practices in mind.

  • Use a controlled polling loop and provide a clean shutdown path.
  • Close the consumer with consumer.close() when the application stops.
  • Handle deserialization errors and processing exceptions clearly.
  • Use manual offset commits when your application must commit only after successful processing.
  • Keep processing time within consumer timeout limits, or tune the relevant consumer properties carefully.
  • Use a stable group.id for an application that should continue from previously committed offsets.
  • Use a different group.id for test runs when you want to read messages independently.
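
For the timeout point, a rough calculation helps. One poll() returns at most max.poll.records records (500 by default), and the application must finish processing them and call poll() again within max.poll.interval.ms (300000 ms by default). The hypothetical helper below estimates a batch size that fits a chosen fraction of that interval, given an estimated processing time per record; the 50% margin in the example is an arbitrary assumption, not a Kafka recommendation.

```java
// Back-of-the-envelope sizing for max.poll.records (illustration only).
final class PollSizing {

    // Largest batch that can be processed within safetyFraction of
    // max.poll.interval.ms, given an estimated per-record processing time.
    // Never returns less than 1; if even one record exceeds the budget,
    // max.poll.interval.ms itself would need raising.
    static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs, double safetyFraction) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        long budgetMs = (long) (maxPollIntervalMs * safetyFraction);
        long records = budgetMs / perRecordMs;
        return (int) Math.max(1, Math.min(Integer.MAX_VALUE, records));
    }

    public static void main(String[] args) {
        // At 2 s per record, the default 500 records would take 1000 s,
        // far above the default 300 s limit.
        System.out.println(safeMaxPollRecords(300_000, 2_000, 0.5)); // 75
    }
}
```

The result would then be set through ConsumerConfig.MAX_POLL_RECORDS_CONFIG (max.poll.records).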

Kafka Consumer in Java FAQ

How to write a Kafka consumer in Java?

To write a Kafka consumer in Java, add the Kafka client dependency, configure bootstrap.servers, group.id, and deserializers, create a KafkaConsumer, subscribe to a topic, and call poll() in a loop to receive records.

How to consume Kafka message in Java?

Use consumer.poll() to fetch records from Kafka. Then iterate through the returned ConsumerRecords collection and read each ConsumerRecord using methods such as key(), value(), partition(), and offset().

How to use Kafka in a Java application?

Use the Kafka Java client library. A Java application can use KafkaProducer to write records, KafkaConsumer to read records, and the Admin client to manage topics and other Kafka resources.

Why does a Kafka Java consumer usually have a while loop?

A Kafka consumer is designed to keep polling for records as long as the application is running. A loop keeps the consumer active, receives new records, and maintains membership in the consumer group through polling and heartbeats.

Is Kafka better with Java or Python?

Kafka can be used with both Java and Python. Java is a common choice for JVM services and direct use of the official Kafka client API. Python is useful for scripts, data workflows, and applications already built in Python. The better choice depends on the application stack, performance needs, and team experience.

Conclusion

In this Apache Kafka Tutorial – Kafka Consumer with Example Java Application, we have learnt about the Kafka consumer and presented a step-by-step guide to building a Kafka consumer application in Java.