Introduction
Apache Kafka is the de facto standard for real-time data streaming, handling millions of events per second at companies like Netflix and Uber. In 2026, with the rise of AI workloads and edge computing, a well-configured Kafka cluster is essential for decoupling microservices, processing high-volume logs, and feeding ML pipelines. This tutorial walks through deploying a highly available 3-node cluster with Docker Compose, implementing producers and consumers in Python and Java, and configuring replication and monitoring. You will end up with a production-oriented setup tunable for 10k+ messages per second, with an emphasis on resilience and performance, and you will know how to optimize it for real-world use cases such as change data capture (CDC) or IoT events.
Prerequisites
- Docker and Docker Compose 2.20+ installed
- Python 3.11+ with `pip install confluent-kafka`
- Java 21+ with Maven
- Minimum 8 GB RAM (for local cluster)
- Working knowledge of distributed-systems concepts such as replication and idempotence
Cluster Deployment with Docker Compose
```yaml
version: '3.8'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.6.0
    hostname: zookeeper-1
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888
    ports:
      - "22181:2181"
  zookeeper-2:
    image: confluentinc/cp-zookeeper:7.6.0
    hostname: zookeeper-2
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888
    ports:
      - "32181:2181"
  kafka-1:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka-1
    depends_on:
      - zookeeper-1
      - zookeeper-2
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
  kafka-2:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka-2
    depends_on:
      - zookeeper-1
      - zookeeper-2
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
  kafka-3:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka-3
    depends_on:
      - zookeeper-1
      - zookeeper-2
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29094,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
```

This docker-compose.yml deploys three Kafka brokers plus a two-node ZooKeeper ensemble. The dual INTERNAL/EXTERNAL listeners are essential: brokers replicate among themselves over the Docker network (kafka-1:29092, etc.) while clients on the host connect via localhost:909x. Variables like KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 keep Kafka's internal topics replicated. Start the stack with `docker compose up -d`. Two pitfalls: advertised listeners must match the addresses clients actually reach, or external connections fail; and a two-node ZooKeeper ensemble still needs both nodes for quorum, so run three ZooKeeper nodes (or use KRaft) in production.
Verifying the Cluster
Once the stack is launched, check the logs with `docker compose logs kafka-1`; you should see all three brokers register. Then run `kafka-topics --bootstrap-server localhost:9092 --list` (install the Kafka CLI tools if needed). With replication factor 2, data survives the loss of one broker, much like mirrored RAID; note, however, that writes requiring min.insync.replicas=2 will be rejected until the lost replica catches back up.
Creating Partitioned Topics
```bash
#!/bin/bash
set -euo pipefail
KAFKA_HOME=/opt/kafka  # adjust if the tools are installed elsewhere

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders --partitions 6 --replication-factor 2 \
  --config cleanup.policy=delete \
  --config retention.ms=86400000 \
  --config segment.ms=3600000

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic user-events --partitions 12 --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.insync.replicas=2

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders
```

This script creates two topics: orders (6 partitions, 1-day retention) and user-events (12 partitions, log compaction, which keeps the latest value per key). min.insync.replicas=2 forces acknowledgements from at least two brokers. Run it after installing the Kafka tools. Pitfall: relying on auto-created topics with default settings (replication factor 1) risks data loss; always create production topics explicitly.
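Partition counts matter because Kafka assigns each keyed message to a partition by hashing its key (the default partitioner uses murmur2). The sketch below illustrates the principle using stdlib CRC32 as a stand-in hash, not Kafka's actual algorithm:

```python
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Deterministically map a key to a partition.

    Illustrative only: Kafka's default partitioner uses murmur2,
    not CRC32, but the principle is identical.
    """
    return zlib.crc32(key) % num_partitions

# The same key always maps to the same partition, which is what
# gives per-key ordering within a topic.
assert pick_partition(b"order-42", 6) == pick_partition(b"order-42", 6)

# Different keys spread across the available partitions.
spread = {pick_partition(f"order-{i}".encode(), 6) for i in range(100)}
print(f"100 keys landed on {len(spread)} of 6 partitions")
```

Because the mapping depends on the partition count, adding partitions to an existing topic remaps keys and breaks per-key ordering for old data, so size partition counts generously up front.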
Idempotent Python Producer
Now let's dive into application code. A Python producer built on confluent-kafka enables idempotence and retries, which is crucial for avoiding duplicates during reconnections.
Python Producer with Transactions
```python
from confluent_kafka import Producer
import json
import time

conf = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'enable.idempotence': True,
    'acks': 'all',
    'retries': 5,
    # Conservative: idempotence already preserves order with up to 5 in flight.
    'max.in.flight.requests.per.connection': 1,
    'transactional.id': 'producer-tx-1'
}
p = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

p.init_transactions()
for i in range(10):
    data = {'order_id': i, 'amount': 100.0 + i, 'timestamp': time.time()}
    p.begin_transaction()
    p.produce('orders',
              key=str(i),
              value=json.dumps(data).encode('utf-8'),
              callback=delivery_report)
    p.commit_transaction()
    time.sleep(1)
p.flush()
```

This producer uses transactions for exactly-once semantics, with idempotence enabled and acks='all'. Keying by order_id keeps each order's events on a single partition. Run it with python producer.py. Pitfalls: each transactional.id must be unique per producer instance, or fencing errors occur; and committing a one-message transaction per iteration, as here, is for illustration only: batch multiple sends per transaction in production. Note that with idempotence enabled, ordering is preserved with up to 5 in-flight requests, so max.in.flight=1 is stricter than strictly necessary.
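enable.idempotence works because the broker tracks, per partition, the last sequence number appended for each producer id and silently drops retransmissions it has already written. A toy stdlib simulation of that dedup rule (not the real broker implementation):

```python
class ToyPartitionLog:
    """Simulates broker-side idempotent-producer dedup: a retried
    send carrying an already-seen sequence number is dropped."""

    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> last appended sequence

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        if self.last_seq.get(producer_id, -1) >= seq:
            return False  # duplicate retry: already appended, drop it
        self.last_seq[producer_id] = seq
        self.records.append(value)
        return True

log = ToyPartitionLog()
log.append(producer_id=1, seq=0, value="order-0")
# The ack for seq 0 is lost in transit, so the producer retries it:
log.append(producer_id=1, seq=0, value="order-0")
log.append(producer_id=1, seq=1, value="order-1")
print(log.records)  # ['order-0', 'order-1'] -- the retry created no duplicate
```

Without idempotence, the retried batch would have been appended twice; this is exactly the duplicate the real broker rejects on your behalf.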
Python Consumer with Manual Offsets
The consumer commits offsets manually after processing, as you would in an ETL job, and scales horizontally via consumer groups.
Grouped Python Consumer
```python
from confluent_kafka import Consumer, KafkaError
import json

conf = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'session.timeout.ms': 10000,
    'heartbeat.interval.ms': 3000
}
c = Consumer(conf)
c.subscribe(['orders'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(f'Error: {msg.error()}')
            break
        data = json.loads(msg.value().decode('utf-8'))
        print(f'Processed: {data} at offset {msg.offset()}')
        # Simulate processing here
        c.commit(asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    c.close()
```

The consumer group order-processors reads the topic's partitions in parallel. Committing manually after processing gives at-least-once delivery. Launch multiple instances to scale; Kafka spreads partitions across them. Pitfalls: a session.timeout.ms that is too low triggers unnecessary rebalances, and synchronous per-message commits are slow, so commit in batches in production; disable auto-commit for fine-grained control, as done here.
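The commit-after-processing pattern above is exactly what makes delivery at-least-once: if the consumer crashes between processing a record and committing its offset, the record is processed again on restart. A stdlib simulation of that replay behaviour:

```python
def consume(log, start_offset, process, committed, crash_before_commit_at=None):
    """Process records from start_offset, committing after each one.
    Committing *after* processing is what makes this at-least-once:
    a crash between the two replays the in-flight record on restart."""
    for offset in range(start_offset, len(log)):
        process(log[offset])
        if offset == crash_before_commit_at:
            return committed["offset"]      # crashed before the commit
        committed["offset"] = offset + 1    # next offset to read
    return committed["offset"]

log = ["order-0", "order-1", "order-2"]
seen = []
committed = {"offset": 0}

# First run crashes after processing offset 1 but before committing it.
resume = consume(log, 0, seen.append, committed, crash_before_commit_at=1)
# The restart resumes from the last committed offset and replays order-1.
consume(log, resume, seen.append, committed)
print(seen)  # ['order-0', 'order-1', 'order-1', 'order-2']
```

order-1 is seen twice, which is why handlers downstream of an at-least-once consumer must be idempotent (e.g. keyed upserts rather than blind inserts).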
Kafka Streams in Java (Aggregation)
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;
import java.util.Properties;

public class OrderAggregator {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("orders");
        ObjectMapper mapper = new ObjectMapper();

        stream
            .mapValues(value -> {
                try {
                    JsonNode node = mapper.readTree(value);
                    return String.valueOf(node.get("amount").asDouble());
                } catch (Exception e) {
                    return "0.0"; // malformed record: contributes nothing to the sum
                }
            })
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(
                    Duration.ofMinutes(5), Duration.ofMinutes(1)))
            .reduce((agg, val) -> String.valueOf(
                    Double.parseDouble(agg) + Double.parseDouble(val)))
            .toStream()
            .to("order-sums", Produced.with(
                    WindowedSerdes.sessionWindowedSerdeFrom(String.class),
                    Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

This Kafka Streams app sums order amounts per key over 5-minute session windows and writes the results to order-sums. Build it with Maven (kafka-streams and jackson-databind dependencies). Note the use of SessionWindows.ofInactivityGapAndGrace, which replaces the deprecated SessionWindows.with(...).grace(...). Pitfall: windows without a bounded grace period let state stores grow indefinitely; the 1-minute grace here drops late events once the window has closed.
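The session-windowing logic is easier to reason about in isolation. This stdlib Python sketch reproduces the grouping the topology applies per key (it is not Kafka Streams itself, just the windowing rule):

```python
def session_windows(timestamped_amounts, gap_s):
    """Group (timestamp, amount) events into sessions: a new session
    starts whenever the gap since the previous event exceeds gap_s.
    Returns the per-session sums (what the reduce() step computes)."""
    sessions = []
    last_ts = None
    for ts, amount in sorted(timestamped_amounts):
        if last_ts is None or ts - last_ts > gap_s:
            sessions.append(0.0)  # inactivity gap exceeded: open a new session
        sessions[-1] += amount
        last_ts = ts
    return sessions

# Events at t=0s, 60s, 120s, then a quiet spell until t=700s (gap > 300s).
events = [(0, 100.0), (60, 101.0), (120, 102.0), (700, 103.0)]
print(session_windows(events, gap_s=300))  # [303.0, 103.0]
```

The first three events fall within the 5-minute inactivity gap and merge into one session; the fourth arrives after a longer silence and opens a new one, mirroring how the Java topology emits one sum per session window.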
Monitoring and Tuning
For production, integrate Prometheus and Grafana. Expose JMX metrics via KAFKA_JMX_OPTS (or the Prometheus JMX exporter Java agent). For a 10k msg/s target, reasonable starting points are num.network.threads=8 and log.segment.bytes=1073741824 (1 GiB), but always validate with your own load tests.
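For capacity planning, a common back-of-the-envelope rule sizes partition counts so that neither the produce side nor the consumer group becomes the bottleneck. The throughput figures below are illustrative assumptions, not benchmarks:

```python
import math

def partitions_for(target_tps, per_partition_produce_tps, per_consumer_tps):
    """Back-of-the-envelope partition sizing: enough partitions that
    both the producers and the consumer group can sustain target_tps.
    All throughput figures are assumptions to be replaced with your
    own measurements, not benchmarks."""
    return max(
        math.ceil(target_tps / per_partition_produce_tps),
        math.ceil(target_tps / per_consumer_tps),
    )

# e.g. a 10k msg/s target, assuming ~2k msg/s per partition on the
# produce side and ~1k msg/s per single consumer instance:
print(partitions_for(10_000, 2_000, 1_000))  # 10
```

Since a consumer group can use at most one consumer per partition, the consumer-side term usually dominates; round up further to leave headroom for growth, because adding partitions later remaps keys.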
Best Practices
- Replication factor ≥ 3 in production, so data survives two broker failures.
- Use exactly-once (idempotence + transactions) for finance/CDC.
- Use consumer groups to scale consumption horizontally.
- Monitor UnderReplicatedPartitions and p99 latency.
- Separate topics by workload (logs vs. business events).
Common Errors to Avoid
- Misconfigured advertised listeners: Clients fail to reconnect outside Docker.
- min.insync.replicas=1: Data loss on broker failure.
- No dead letter queue: Poison messages block partitions.
- Ignoring rebalances: too-infrequent heartbeats get the consumer evicted from the group and its partitions reassigned.
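The dead-letter-queue pitfall deserves a sketch: if a poison record is retried forever, its partition never advances. The stdlib snippet below shows only the routing decision, with a bounded retry budget; a real consumer would produce the diverted record to a DLQ topic (the name orders.dlq here is hypothetical):

```python
MAX_ATTEMPTS = 3  # illustrative retry budget

def handle(record, process, attempts, dlq):
    """Retry a failing record a bounded number of times, then divert
    it to a dead-letter list instead of blocking the partition forever.
    Sketch of the routing decision only; a real consumer would produce
    the record to a DLQ topic such as 'orders.dlq' (hypothetical name)."""
    try:
        process(record)
        attempts.pop(record, None)
        return "processed"
    except ValueError:
        attempts[record] = attempts.get(record, 0) + 1
        if attempts[record] >= MAX_ATTEMPTS:
            dlq.append(record)
            attempts.pop(record)
            return "dead-lettered"
        return "retry"

def process(record):
    raise ValueError("poison message")  # this record always fails

attempts, dlq = {}, []
outcomes = [handle("order-13", process, attempts, dlq) for _ in range(3)]
print(outcomes, dlq)  # ['retry', 'retry', 'dead-lettered'] ['order-13']
```

After the diversion the consumer commits the offset and moves on, so one bad record can no longer stall the whole partition; the DLQ topic can then be inspected or replayed separately.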
Next Steps
Dive into Kafka Connect for CDC, or migrate to KRaft for ZooKeeper-free clusters. Explore Schema Registry for Avro schemas. Check out our Learni training on data streaming for Confluent certifications.