Introduction
Apache Kafka est devenu le standard pour les architectures orientées événements. Les clients Kafka permettent d'envoyer et de consommer des messages de manière fiable à grande échelle. Ce tutoriel intermédiaire vous guide dans l'implémentation de producers et consumers avec Node.js et la librairie kafkajs. Vous apprendrez à gérer les connexions, les erreurs et les configurations avancées pour des environnements de production. Chaque étape inclut du code complet et fonctionnel.
Prérequis
- Node.js 20+
- Connaissances de base en TypeScript
- Un cluster Kafka accessible (local ou cloud)
- npm ou pnpm installé
Installation des dépendances
npm install kafkajs
npm install --save-dev typescript @types/nodeNous installons kafkajs, la librairie Kafka la plus utilisée pour Node.js, ainsi que TypeScript pour un typage strict.
Configuration du client Kafka
import { Kafka } from 'kafkajs';
export const kafka = new Kafka({
clientId: 'learni-app',
brokers: ['localhost:9092'],
retry: {
initialRetryTime: 100,
retries: 8
}
});Ce fichier centralise la configuration du client. Le retry permet de gérer les déconnexions temporaires du broker.
Création du Producer
import { kafka } from './client';
const producer = kafka.producer();
export async function sendMessage(topic: string, message: object) {
await producer.connect();
await producer.send({
topic,
messages: [{ value: JSON.stringify(message) }],
});
await producer.disconnect();
}Le producer se connecte, envoie un message sérialisé en JSON puis se déconnecte. Cette approche est simple mais peut être optimisée avec une connexion persistante en production.
Création du Consumer
import { kafka } from './client';
const consumer = kafka.consumer({ groupId: 'learni-group' });
export async function startConsumer(topic: string) {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
await consumer.run({
eachMessage: async ({ message }) => {
console.log('Message reçu:', message.value?.toString());
},
});
}Le consumer s'abonne à un topic et traite chaque message via un callback. Le groupId permet le load-balancing entre plusieurs instances.
Gestion des erreurs et reconnexion
consumer.on('consumer.crash', async (err) => {
console.error('Consumer crash:', err);
await consumer.disconnect();
setTimeout(() => startConsumer('orders'), 5000);
});Cet écouteur gère les crashes et tente une reconnexion automatique après 5 secondes, évitant ainsi les arrêts définitifs du consumer.
Bonnes pratiques
- Toujours utiliser un groupId unique par type de consumer
- Activer la compression des messages pour réduire la bande passante
- Implémenter un mécanisme de retry et de dead-letter queue
- Monitorer les offsets avec des outils comme Kafka UI
- Fermer proprement les connexions avec disconnect() lors de l'arrêt de l'application
Erreurs courantes à éviter
- Oublier d'appeler connect() avant send() ou subscribe()
- Utiliser le même groupId pour des consumers ayant des traitements différents
- Ne pas gérer les erreurs de désérialisation JSON
- Laisser les connexions ouvertes sans timeout en cas de redémarrage du broker
Pour aller plus loin
Découvrez nos formations avancées sur les systèmes distribués et Kafka : https://learni-group.com/formations