Introduction
Apache Kafka est un système de streaming distribué incontournable pour gérer des flux de données en temps réel, utilisé par Netflix, Uber ou LinkedIn pour traiter des milliards d'événements par jour. Implémenter des clients Kafka (producers et consumers) en Node.js permet d'intégrer cette puissance dans des applications backend modernes. Ce tutoriel intermediate se concentre sur kafkajs, la librairie la plus performante et maintenue pour Node.js en 2026.
Pourquoi c'est crucial ? Les clients Kafka gèrent la résilience, la partitionnement et les offsets pour des pipelines de données fiables. Nous démarrerons Kafka localement via Docker, créerons un producer batché avec clés de partition, un consumer en group avec rebalancement, et aborderons la gestion d'erreurs avancée. À la fin, vous aurez un setup production-ready, scalable horizontalement. Temps estimé : 20 minutes pour un prototype fonctionnel. (128 mots)
Prérequis
- Node.js 20+ installé
- Docker et Docker Compose pour Kafka local
- Connaissances de base en Kafka (topics, partitions, brokers)
- Éditeur de code avec support TypeScript (VS Code recommandé)
- Terminal pour exécuter les commandes
Démarrer Kafka localement
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.7.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.7.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'Ce docker-compose.yml lance Zookeeper et un broker Kafka standalone sur localhost:9092. Les variables d'environnement configurent les listeners pour l'accès host et inter-conteneurs. Exécutez docker compose up -d ensuite ; cela crée automatiquement les topics au premier envoi.
Lancer Kafka et vérifier
Sauvegardez le fichier et exécutez docker compose up -d dans le terminal. Vérifiez les logs avec docker compose logs kafka. Kafka est prêt quand vous voyez 'started (kafka.server.KafkaServer)'. Ce setup simule un cluster mono-noeud, parfait pour le dev ; en prod, scalez avec plusieurs brokers.
Initialiser le projet Node.js
mkdir kafka-clients && cd kafka-clients
npm init -y
npm install kafkajs
npm install -D typescript @types/node ts-node
npx tsc --initOn crée un dossier projet, initialise npm, installe kafkajs (librairie zero-deps, async-native), TypeScript et ts-node pour exécution directe. Le tsc --init génère un tsconfig de base ; ajustez-le pour ES2022 et strict mode.
Configurer TypeScript
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"outDir": "./dist",
"rootDir": "./src"
},
"ts-node": {
"esm": true
}
}Ce tsconfig active le mode strict pour catcher les erreurs tôt, cible ES2022 pour top-level await, et configure ts-node pour ESM. Créez un dossier src/ ; cela assure une compilation propre et une exécution fluide des scripts TS.
Mettre à jour package.json pour scripts
Ajoutez ces scripts dans package.json pour faciliter les lancements :
``json`
"scripts": {
"producer": "tsx src/producer.ts",
"consumer": "tsx src/consumer.ts",
"dev": "tsx watch src/"
}
Installez tsx globalement si besoin : npm i -g tsx. (tsx` remplace ts-node pour ESM natif en 2026.)
Créer un producer batché avec partitions
import { Kafka, Producer, ProducerRecord } from 'kafkajs';
async function runProducer() {
const kafka = new Kafka({
clientId: 'my-producer',
brokers: ['localhost:9092'],
});
const producer: Producer = kafka.producer({
allowAutoTopicCreation: true,
transactionTimeout: 30000,
idempotent: true,
maxInFlightRequestsPerConnection: 1,
});
await producer.connect();
// Envoyer 10 messages batchés avec clé de partition
const messages: ProducerRecord[] = [];
for (let i = 0; i < 10; i++) {
messages.push({
topic: 'test-topic',
messages: [{ key: `key-${i % 3}`, value: `Message ${i} en ${new Date().toISOString()}` }],
});
}
const result = await producer.sendBatch({ topicMessages: messages });
console.log('Messages envoyés:', result);
await producer.disconnect();
}
runProducer().catch(console.error);Ce producer connecte idempotent (exactly-once semantics), utilise sendBatch pour efficacité, et assigne des clés pour colocaliser les messages par partition (modulo 3). maxInFlightRequestsPerConnection:1 évite les réordonnancements ; surveillez les métriques dans les logs Kafka.
Créer un consumer en group avec offsets
import { Kafka, Consumer, EachMessagePayload, logLevel } from 'kafkajs';
async function runConsumer() {
const kafka = new Kafka({
clientId: 'my-consumer',
brokers: ['localhost:9092'],
logLevel: logLevel.INFO,
});
const consumer: Consumer = kafka.consumer({ groupId: 'test-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
const prefix = `${topic}[${partition} | ${message.offset}] / ${message.key?.toString()}`;
console.log(`${prefix} = ${message.value?.toString()}`);
// Commit manuel après traitement
await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
},
});
}
runConsumer().catch(console.error);Ce consumer utilise un groupId pour load-balancing, fromBeginning: true pour replay, et commit manuel (autoCommit: false) pour at-least-once précis. Le callback eachMessage logue clé/offset/valeur ; gérez les retries ici pour idempotence.
Tester le flux complet
- Lancez le consumer :
npm run consumer(il attend). - Dans un autre terminal :
npm run producer(envoie 10 messages). - Observez les logs consumer : messages reçus avec offsets incrémentés.
Producer avec gestion d'erreurs retry
import { Kafka, Producer, ProducerRecord, KafkaJSNonRetriableError } from 'kafkajs';
async function runProducerWithRetry() {
const kafka = new Kafka({ clientId: 'producer-retry', brokers: ['localhost:9092'] });
const producer = kafka.producer({
retry: { maxRetryTime: 30000, initialRetryBackoffMs: 300 },
allowAutoTopicCreation: true,
});
await producer.connect();
try {
const messages: ProducerRecord[] = [
{
topic: 'retry-topic',
messages: [{ key: 'retry-key', value: 'Message avec retry' }],
},
];
const result = await producer.sendBatch({ topicMessages: messages });
console.log('Succès:', result);
} catch (error) {
if (error instanceof KafkaJSNonRetriableError) {
console.error('Erreur fatale:', error);
} else {
console.error('Retryable error, géré par config');
}
} finally {
await producer.disconnect();
}
}
runProducerWithRetry();Ajoute des retries configurables (backoff exponentiel) pour transient errors comme network blips. Distinguez KafkaJSNonRetriableError pour dead-letter queues. Le finally assure disconnect propre, évitant leaks de connexions.
Bonnes pratiques
- Idempotence et transactions : Activez
idempotent: trueettransactional.idpour exactly-once en prod. - Monitoring : Intégrez Prometheus via kafkajs instrumentation ; trackez latency et throughput.
- Sécurité : Utilisez SASL/SSL pour brokers cloud (Confluent, Aiven) ; stockez creds en env vars.
- Batching dynamique : Ajustez
batchSizebasé sur message rate pour optimiser I/O. - Graceful shutdown : Trappez SIGTERM pour drain queues avant disconnect.
Erreurs courantes à éviter
- Oublier les offsets : Sans commit manuel, redémarrages causent duplicatas ; utilisez
pause/resumepour backpressure. - Trop de in-flight requests : >5 par connexion réordonne ; limitez à 1 pour ordering strict.
- Auto-topic creation en prod : Désactivez-le ; créez topics manuellement avec partitions optimales (6-12).
- Ignorer rebalances : Écoutez
consumer.events.GROUP_REBALANCEpour cleaner state partagé.
Pour aller plus loin
- Documentation kafkajs : kafkajs.github.io
- Schema Registry avec Avro : Intégrez
@kafkajs/confluent-schema-registry - Kafka Streams en JS : Explorez
kafkajs-streams - Formations Learni : Maîtrisez Kafka en profondeur avec nos formations certifiantes (Confluent Developer).