Working with Apache Kafka in Java Applications
Apache Kafka has grown into one of the most widely adopted platforms for building real-time streaming and messaging applications. Originally developed at LinkedIn, Kafka is now a top-level Apache Software Foundation project, known for its scalability, durability, and fault-tolerant event streaming capabilities. For Java developers, Kafka offers a rich API that integrates naturally with the Java ecosystem, allowing them to create both producers that send messages to Kafka topics and consumers that process those messages efficiently. Understanding how to work with Apache Kafka in Java applications involves learning about its architecture, configuration, data flow, and best practices to ensure high-performance and reliable messaging.
Introduction to Apache Kafka
Apache Kafka is a distributed publish-subscribe messaging system designed for handling large volumes of real-time data. Unlike traditional message brokers, Kafka is built to handle high throughput and to persist messages on disk for fault tolerance. Messages in Kafka are organized into topics, which are further split into partitions to allow parallel processing. Each partition is replicated across brokers to prevent data loss. Kafka’s design makes it ideal for event sourcing, log aggregation, stream processing, and data integration scenarios.
Java developers benefit from Kafka’s official Java client library, which offers Producer and Consumer APIs, along with an AdminClient for managing topics and configurations. Kafka Streams, another library in the Kafka ecosystem, allows developers to build real-time processing pipelines directly within Java applications.
Kafka Architecture Overview
To work effectively with Kafka in Java, it’s essential to understand its core components:
Brokers
A broker is a Kafka server responsible for storing messages and serving clients. In production, multiple brokers form a Kafka cluster, which distributes topics and partitions among them for scalability.
Topics and Partitions
A topic is a category or feed name where messages are published. Each topic is divided into partitions, which are ordered and immutable sequences of messages. Partitioning allows Kafka to process data in parallel, increasing throughput.
Producers
A producer is a Java application or service that sends records to Kafka topics. Producers can influence which partition a record lands on: by specifying the partition explicitly, by setting a record key (records with the same key always go to the same partition), or by omitting both and letting the default partitioner spread keyless records across the topic's partitions. The constructor variants sketched below illustrate these options.
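As a small illustrative fragment (the "orders" topic and the values are hypothetical), the ProducerRecord constructors expose these options:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// No key: the default partitioner chooses the partition
ProducerRecord<String, String> byPartitioner = new ProducerRecord<>("orders", "payload");
// Keyed: records with the same key always go to the same partition
ProducerRecord<String, String> byKey = new ProducerRecord<>("orders", "customer-42", "payload");
// Explicit: pin the record to partition 0
ProducerRecord<String, String> explicitPartition = new ProducerRecord<>("orders", 0, "customer-42", "payload");
```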
Consumers
A consumer is a Java application that subscribes to topics and processes messages from them. Consumers are organized into consumer groups for load balancing, where each message is consumed by only one member of the group.
ZooKeeper (Legacy Requirement)
Earlier versions of Kafka relied on Apache ZooKeeper for cluster coordination, topic configuration, and partition leader election. Since Kafka 2.8, the project has been migrating to KRaft, a self-managed metadata quorum that removes the ZooKeeper dependency; KRaft became production-ready in Kafka 3.3, and Kafka 4.0 drops ZooKeeper support entirely.
Setting Up Kafka for Java Development
Before writing Java code for Kafka, you need a local or remote Kafka instance. This usually involves:
- Downloading Kafka from the Apache Kafka website.
- Extracting it and starting ZooKeeper (only needed when the broker runs in ZooKeeper mode rather than KRaft; see the example commands below).
- Starting the Kafka broker.
- Creating topics using the Kafka command-line tools.
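For reference, in a ZooKeeper-based installation the startup steps typically look like this (paths are relative to the extracted Kafka directory; a KRaft setup uses different steps):

```bash
# Start ZooKeeper (ZooKeeper mode only)
bin/zookeeper-server-start.sh config/zookeeper.properties

# In a second terminal, start the Kafka broker
bin/kafka-server-start.sh config/server.properties
```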
For example, to create a topic named my-topic with one partition and replication factor of one:
```bash
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```
Once the broker is running, you can connect your Java application to it.
Adding Kafka Dependencies in a Java Project
If you are using Maven, include the Kafka client dependency in your pom.xml:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.8.0</version>
</dependency>
```
For Gradle:
```gradle
implementation 'org.apache.kafka:kafka-clients:3.8.0'
```
This gives you access to the KafkaProducer, KafkaConsumer, and AdminClient classes.
Writing a Kafka Producer in Java
A Kafka producer in Java sends messages to a Kafka topic. Here’s a basic example:
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // Minimal producer configuration: broker address and key/value serializers
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer (and flushes pending records) on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("my-topic", "key" + i, "message " + i);
                // send() is asynchronous; the callback runs once the broker acknowledges (or the send fails)
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Sent message to %s partition %d offset %d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }
}
```
This producer connects to localhost:9092 and sends ten messages to the my-topic topic. The send method is asynchronous, and the callback handles confirmation or errors.
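If a caller needs confirmation before proceeding (at the cost of throughput), the Future returned by send can be awaited instead of using a callback; a minimal sketch, placed inside the same try block as above:

```java
// Blocking send: get() waits for the acknowledgement and throws ExecutionException on failure
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Acknowledged at partition %d, offset %d%n", metadata.partition(), metadata.offset());
```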
Writing a Kafka Consumer in Java
A Kafka consumer reads messages from one or more topics. Here’s a basic consumer:
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // Consumer configuration: broker address, consumer group, and deserializers
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest offset when the group has no committed position yet
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key=%s value=%s partition=%d offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        }
    }
}
```
This consumer subscribes to my-topic and continuously polls for messages, printing them to the console.
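By default the consumer commits offsets automatically on a timer. When records must be fully processed before their offsets are committed, disable auto-commit and commit explicitly; a sketch of the changes relative to the example above:

```java
// Turn off automatic offset commits
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// ...then, inside the poll loop, after the batch has been processed:
consumer.commitSync(); // blocks until the broker acknowledges the committed offsets
```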
Managing Kafka Topics in Java
The AdminClient API lets Java applications create, delete, and inspect Kafka topics programmatically:
```java
import org.apache.kafka.clients.admin.*;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaTopicManager {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic("admin-created-topic", 1, (short) 1);
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            System.out.println("Topic created successfully.");
        }
    }
}
```
This approach can be useful when you want your application to automatically set up topics.
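The same client can also inspect the cluster. For example, listing existing topic names is a short addition that could go inside the try block above:

```java
// Fetch and print the names of all topics visible to the client
java.util.Set<String> topicNames = adminClient.listTopics().names().get();
topicNames.forEach(System.out::println);
```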
Working with Kafka Streams in Java
Kafka Streams is a client library that lets you process data streams in a declarative way. A simple example to count words from a Kafka topic:
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("text-topic");
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count();
        // The counts are Longs, so the output topic needs an explicit Long value serde
        wordCounts.toStream().to("wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```
This program reads messages from text-topic, splits them into words, counts occurrences, and writes the results to wordcount-output.
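In long-running services you typically also want the topology to shut down cleanly when the JVM exits; one common pattern (a small addition to the example above) is a shutdown hook:

```java
// Close the Streams instance and flush its state stores when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
```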
Best Practices for Using Kafka in Java Applications
- Set appropriate partition keys to ensure even distribution and ordered processing where required.
- Use asynchronous send for higher throughput, but handle exceptions in callbacks.
- Tune consumer poll intervals and commit offsets carefully to balance performance and reliability.
- Configure acknowledgments (acks), retries, and the idempotent producer to avoid both message loss and duplicates (see the configuration sketch after this list).
- Monitor lag in consumer groups to detect processing delays (the CLI example after this list shows one way).
- Use Avro or Protobuf for structured message serialization instead of plain strings.
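As an illustration of the delivery-guarantee item above, a producer that should neither lose nor duplicate records typically combines acknowledgement from all in-sync replicas with idempotence; the exact values below are assumptions to tune for your latency and durability needs:

```java
// Require acknowledgement from all in-sync replicas and deduplicate retried sends
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000"); // overall deadline for a send, in ms
```

Consumer lag can be inspected with the tooling shipped in the Kafka distribution, for example:

```bash
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
```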
Testing Kafka Java Applications
You can use EmbeddedKafka (from Spring Kafka) or Testcontainers to spin up Kafka instances for integration testing. This ensures that your producer and consumer code is tested in an environment close to production.
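For example, the Testcontainers Kafka module can start a disposable single-node broker for an integration test; the sketch below is illustrative, and the Docker image tag is an assumption to adapt to your setup:

```java
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaTestcontainersExample {
    public static void main(String[] args) {
        // Starts a one-node Kafka broker in Docker; the image tag is only an example
        try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))) {
            kafka.start();
            // Point producer/consumer properties at the container's advertised listeners
            System.out.println("bootstrap.servers = " + kafka.getBootstrapServers());
        }
    }
}
```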
Conclusion
Working with Apache Kafka in Java applications enables developers to handle high-throughput, real-time data processing with reliability and scalability. By understanding the architecture, APIs, and best practices, Java developers can create robust event-driven systems. From basic producer-consumer workflows to advanced stream processing with Kafka Streams, the integration possibilities are vast. With careful configuration and testing, Kafka-powered Java applications can efficiently manage data pipelines in modern distributed systems.
