Introduction
Apache Kafka est le standard de facto pour le streaming de données en temps réel, gérant des millions d'événements par seconde dans des environnements comme Netflix ou Uber. En 2026, avec la montée des workloads IA et edge computing, un cluster Kafka bien configuré est indispensable pour découpler microservices, traiter des logs massifs ou alimenter des pipelines ML. Ce tutoriel expert vous guide pas à pas pour déployer un cluster 3-nœuds haute disponibilité avec Docker Compose, implémenter producers et consumers en Python et Java, configurer la réplication et le monitoring. Vous obtiendrez un setup production-ready, tunable pour 10k+ TPS, avec focus sur la résilience et les performances. À la fin, vous saurez optimiser pour des cas réels comme le CDC ou les événements IoT. (128 mots)
Prérequis
- Docker et Docker Compose 2.20+ installés
- Python 3.11+ avec
pip install confluent-kafka - Java 21+ avec Maven
- 8 Go RAM minimum (pour cluster local)
- Connaissances avancées en réseaux distribués et idempotence
Déploiement cluster avec Docker Compose
version: '3.8'
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:7.6.0
hostname: zookeeper-1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "22181:2181"
zookeeper-2:
image: confluentinc/cp-zookeeper:7.6.0
hostname: zookeeper-2
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888
ports:
- "32181:2181"
kafka-1:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka-1
depends_on:
- zookeeper-1
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
kafka-2:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka-2
depends_on:
- zookeeper-2
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
kafka-3:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka-3
ports:
- "9094:9094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2Ce docker-compose.yml déploie un cluster Kafka 3 brokers + 2 Zookeeper pour haute disponibilité. Les vars comme KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 assurent la résilience. Lancez avec docker compose up -d. Piège : Vérifiez les ports advertised pour éviter les connexions externes ratées.
Vérification du cluster
Une fois lancé, vérifiez avec docker compose logs kafka-1. Vous devriez voir les brokers enregistrés. Utilisez kafka-topics --bootstrap-server localhost:9092 --list (installez kafka-tools si besoin). Ce setup tolère la perte d'un broker grâce à la réplication factor 2, comme un RAID distribué.
Création de topics partitionnés
#!/bin/bash
KAFKA_HOME=/opt/kafka # Ajustez si tools installés localement
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic orders --partitions 6 --replication-factor 2 --config cleanup.policy=delete --config retention.ms=86400000 \
--config segment.ms=3600000
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic user-events --partitions 12 --replication-factor 3 --config cleanup.policy=compact \
--config min.insync.replicas=2
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic ordersCe script crée deux topics : 'orders' (6 partitions, TTL 1 jour) et 'user-events' (12 partitions, compaction pour exactly-once). min.insync.replicas=2 force l'ack multi-broker. Exécutez après installation des Kafka tools. Piège : Oublier les configs par défaut mène à des pertes de données.
Producer Python idempotent
Passons au code applicatif. Un producer Python avec Confluent Kafka gère l'idempotence et les retries, essentiel pour éviter les doublons en cas de reconnexion.
Producer Python avec transactions
from confluent_kafka import Producer
import json
import sys
import time
conf = {
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'enable.idempotence': True,
'acks': 'all',
'retries': 5,
'max.in.flight.requests.per.connection': 1,
'transactional.id': 'producer-tx-1'
}
p = Producer(conf)
def delivery_report(err, msg):
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
p.init_transactions()
for i in range(10):
data = {'order_id': i, 'amount': 100.0 + i, 'timestamp': time.time()}
p.begin_transaction()
p.produce('orders',
key=str(i),
value=json.dumps(data).encode('utf-8'),
callback=delivery_report)
p.commit_transaction()
time.sleep(1)
p.flush()Ce producer utilise transactions pour exactly-once semantics, avec idempotence activée et acks=all. Les clés partitionnent par order_id. Lancez avec python producer.py. Piège : Sans transactional.id unique, les tx échouent ; max.in.flight=1 évite les reorders.
Consumer Python avec offsets manuels
Le consumer gère les offsets manuels pour résumer les traitements, comme dans un ETL. Il scale horizontalement via consumer groups.
Consumer Python groupé
from confluent_kafka import Consumer, KafkaError
import json
conf = {
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'session.timeout.ms': 10000,
'heartbeat.interval.ms': 3000
}
c = Consumer(conf)
c.subscribe(['orders'])
try:
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f'Error: {msg.error()}')
break
data = json.loads(msg.value().decode('utf-8'))
print(f'Processed: {data} at offset {msg.offset()}')
# Simule traitement
c.commit(asynchronous=False)
except KeyboardInterrupt:
pass
finally:
c.close()Consumer group 'order-processors' lit en parallèle les partitions. commit manuel post-traitement assure at-least-once. Lancez plusieurs instances pour scalabilité. Piège : session.timeout trop bas cause rebalances inutiles ; désactivez auto-commit pour contrôle fin.
Kafka Streams en Java (agrégation)
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Properties;
public class OrderAggregator {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("orders");
ObjectMapper mapper = new ObjectMapper();
stream
.mapValues(value -> {
try {
JsonNode node = mapper.readTree(value);
double amount = node.get("amount").asDouble();
return String.valueOf(amount);
} catch (Exception e) {
return "0.0";
}
})
.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
.reduce((agg, val) -> String.valueOf(Double.parseDouble(agg) + Double.parseDouble(val)))
.toStream()
.to("order-sums", Produced.with(WindowedSerdes.sessionWindowedSerdeFrom(String.class), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}Ce Kafka Streams app agrège les montants d'orders par session de 5min, output sur 'order-sums'. Utilise JSON parsing et windowing pour analytics temps réel. Compilez avec Maven (kafka-streams dependency). Piège : Choisir windowing inadapté cause memory leaks ; grace period nettoie les états orphelins.
Monitoring et tuning
Pour la prod, intégrez Prometheus + Grafana. Exposez JMX metrics via KAFKA_JMX_OPTS. Tunez num.network.threads=8 et log.segment.bytes=1GB pour 10k TPS.
Bonnes pratiques
- Réplication factor ≥3 en prod pour tolérance 2 faults.
- Utilisez exactly-once (idempotence + tx) pour finance/CDC.
- Partagez consumer groups pour scalabilité horizontale.
- Monitorez UnderReplicatedPartitions et latency p99.
- Séparez topics par workload (logs vs événements business).
Erreurs courantes à éviter
- Advertised listeners mal configurés : Clients ne reconnectent pas hors Docker.
- min.insync.replicas=1 : Perte données sur broker down.
- Pas de dead letter queue : Messages poison bloquent partitions.
- Ignore rebalances : Heartbeats trop espacés causent assignements perdus.
Pour aller plus loin
Plongez dans Kafka Connect pour CDC ou KRaft (sans Zookeeper). Explorez Schema Registry pour Avro. Découvrez nos formations Learni sur le streaming data pour certifs Confluent.