Skip to content
Learni
Voir tous les tutoriels
Kafka

Comment implémenter des clients Kafka en Node.js en 2026

Read in English

Introduction

Apache Kafka est un système de streaming distribué incontournable pour gérer des flux de données en temps réel, utilisé par Netflix, Uber ou LinkedIn pour traiter des milliards d'événements par jour. Implémenter des clients Kafka (producers et consumers) en Node.js permet d'intégrer cette puissance dans des applications backend modernes. Ce tutoriel intermediate se concentre sur kafkajs, la librairie la plus performante et maintenue pour Node.js en 2026.

Pourquoi c'est crucial ? Les clients Kafka gèrent la résilience, la partitionnement et les offsets pour des pipelines de données fiables. Nous démarrerons Kafka localement via Docker, créerons un producer batché avec clés de partition, un consumer en group avec rebalancement, et aborderons la gestion d'erreurs avancée. À la fin, vous aurez un setup production-ready, scalable horizontalement. Temps estimé : 20 minutes pour un prototype fonctionnel. (128 mots)

Prérequis

  • Node.js 20+ installé
  • Docker et Docker Compose pour Kafka local
  • Connaissances de base en Kafka (topics, partitions, brokers)
  • Éditeur de code avec support TypeScript (VS Code recommandé)
  • Terminal pour exécuter les commandes

Démarrer Kafka localement

docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.7.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

Ce docker-compose.yml lance Zookeeper et un broker Kafka standalone sur localhost:9092. Les variables d'environnement configurent les listeners pour l'accès host et inter-conteneurs. Exécutez docker compose up -d ensuite ; cela crée automatiquement les topics au premier envoi.

Lancer Kafka et vérifier

Sauvegardez le fichier et exécutez docker compose up -d dans le terminal. Vérifiez les logs avec docker compose logs kafka. Kafka est prêt quand vous voyez 'started (kafka.server.KafkaServer)'. Ce setup simule un cluster mono-noeud, parfait pour le dev ; en prod, scalez avec plusieurs brokers.

Initialiser le projet Node.js

terminal
mkdir kafka-clients && cd kafka-clients
npm init -y
npm install kafkajs
npm install -D typescript @types/node ts-node
npx tsc --init

On crée un dossier projet, initialise npm, installe kafkajs (librairie zero-deps, async-native), TypeScript et ts-node pour exécution directe. Le tsc --init génère un tsconfig de base ; ajustez-le pour ES2022 et strict mode.

Configurer TypeScript

tsconfig.json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "NodeNext",
    "moduleResolution": "NodeNext",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "outDir": "./dist",
    "rootDir": "./src"
  },
  "ts-node": {
    "esm": true
  }
}

Ce tsconfig active le mode strict pour catcher les erreurs tôt, cible ES2022 pour top-level await, et configure ts-node pour ESM. Créez un dossier src/ ; cela assure une compilation propre et une exécution fluide des scripts TS.

Mettre à jour package.json pour scripts

Ajoutez ces scripts dans package.json pour faciliter les lancements :

``json
"scripts": {
"producer": "tsx src/producer.ts",
"consumer": "tsx src/consumer.ts",
"dev": "tsx watch src/"
}
`

Installez tsx globalement si besoin : npm i -g tsx. (tsx` remplace ts-node pour ESM natif en 2026.)

Créer un producer batché avec partitions

src/producer.ts
import { Kafka, Producer, ProducerRecord } from 'kafkajs';

async function runProducer() {
  const kafka = new Kafka({
    clientId: 'my-producer',
    brokers: ['localhost:9092'],
  });
  const producer: Producer = kafka.producer({
    allowAutoTopicCreation: true,
    transactionTimeout: 30000,
    idempotent: true,
    maxInFlightRequestsPerConnection: 1,
  });

  await producer.connect();

  // Envoyer 10 messages batchés avec clé de partition
  const messages: ProducerRecord[] = [];
  for (let i = 0; i < 10; i++) {
    messages.push({
      topic: 'test-topic',
      messages: [{ key: `key-${i % 3}`, value: `Message ${i} en ${new Date().toISOString()}` }],
    });
  }

  const result = await producer.sendBatch({ topicMessages: messages });
  console.log('Messages envoyés:', result);

  await producer.disconnect();
}

runProducer().catch(console.error);

Ce producer connecte idempotent (exactly-once semantics), utilise sendBatch pour efficacité, et assigne des clés pour colocaliser les messages par partition (modulo 3). maxInFlightRequestsPerConnection:1 évite les réordonnancements ; surveillez les métriques dans les logs Kafka.

Créer un consumer en group avec offsets

src/consumer.ts
import { Kafka, Consumer, EachMessagePayload, logLevel } from 'kafkajs';

async function runConsumer() {
  const kafka = new Kafka({
    clientId: 'my-consumer',
    brokers: ['localhost:9092'],
    logLevel: logLevel.INFO,
  });
  const consumer: Consumer = kafka.consumer({ groupId: 'test-group' });

  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.key?.toString()}`;
      console.log(`${prefix} = ${message.value?.toString()}`);

      // Commit manuel après traitement
      await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
    },
  });
}

runConsumer().catch(console.error);

Ce consumer utilise un groupId pour load-balancing, fromBeginning: true pour replay, et commit manuel (autoCommit: false) pour at-least-once précis. Le callback eachMessage logue clé/offset/valeur ; gérez les retries ici pour idempotence.

Tester le flux complet

  1. Lancez le consumer : npm run consumer (il attend).
  2. Dans un autre terminal : npm run producer (envoie 10 messages).
  3. Observez les logs consumer : messages reçus avec offsets incrémentés.
Arrêtez avec Ctrl+C ; relancez pour vérifier les offsets persistés (pas de duplicatas).

Producer avec gestion d'erreurs retry

src/producer-retry.ts
import { Kafka, Producer, ProducerRecord, KafkaJSNonRetriableError } from 'kafkajs';

async function runProducerWithRetry() {
  const kafka = new Kafka({ clientId: 'producer-retry', brokers: ['localhost:9092'] });
  const producer = kafka.producer({
    retry: { maxRetryTime: 30000, initialRetryBackoffMs: 300 },
    allowAutoTopicCreation: true,
  });

  await producer.connect();

  try {
    const messages: ProducerRecord[] = [
      {
        topic: 'retry-topic',
        messages: [{ key: 'retry-key', value: 'Message avec retry' }],
      },
    ];
    const result = await producer.sendBatch({ topicMessages: messages });
    console.log('Succès:', result);
  } catch (error) {
    if (error instanceof KafkaJSNonRetriableError) {
      console.error('Erreur fatale:', error);
    } else {
      console.error('Retryable error, géré par config');
    }
  } finally {
    await producer.disconnect();
  }
}

runProducerWithRetry();

Ajoute des retries configurables (backoff exponentiel) pour transient errors comme network blips. Distinguez KafkaJSNonRetriableError pour dead-letter queues. Le finally assure disconnect propre, évitant leaks de connexions.

Bonnes pratiques

  • Idempotence et transactions : Activez idempotent: true et transactional.id pour exactly-once en prod.
  • Monitoring : Intégrez Prometheus via kafkajs instrumentation ; trackez latency et throughput.
  • Sécurité : Utilisez SASL/SSL pour brokers cloud (Confluent, Aiven) ; stockez creds en env vars.
  • Batching dynamique : Ajustez batchSize basé sur message rate pour optimiser I/O.
  • Graceful shutdown : Trappez SIGTERM pour drain queues avant disconnect.

Erreurs courantes à éviter

  • Oublier les offsets : Sans commit manuel, redémarrages causent duplicatas ; utilisez pause/resume pour backpressure.
  • Trop de in-flight requests : >5 par connexion réordonne ; limitez à 1 pour ordering strict.
  • Auto-topic creation en prod : Désactivez-le ; créez topics manuellement avec partitions optimales (6-12).
  • Ignorer rebalances : Écoutez consumer.events.GROUP_REBALANCE pour cleaner state partagé.

Pour aller plus loin

  • Documentation kafkajs : kafkajs.github.io
  • Schema Registry avec Avro : Intégrez @kafkajs/confluent-schema-registry
  • Kafka Streams en JS : Explorez kafkajs-streams
  • Formations Learni : Maîtrisez Kafka en profondeur avec nos formations certifiantes (Confluent Developer).