Skip to content
Learni
Voir tous les tutoriels
Bases de données NoSQL

Comment implémenter Redis Streams en Node.js 2026

Read in English

Introduction

Redis Streams, introduit en version 5.0, révolutionne la gestion des flux de données en offrant une structure de données append-only optimisée pour le messaging asynchrone. Contrairement aux listes ou Pub/Sub éphémères, les Streams persistent les messages avec des IDs timestampés, supportent les consumer groups pour le scaling horizontal et garantissent l'ordre strict des événements.

Pourquoi l'utiliser en 2026 ? Dans un monde d'applications distribuées (IoT, logs, microservices), Redis Streams excelle pour les cas comme les files d'attente Kafka-like sans la complexité. Imaginez un e-commerce : les commandes arrivent via producer, traitées par plusieurs consumers sans duplication ni perte. Ce tutoriel intermediate vous équipe d'un setup Node.js complet : installation, XADD pour producer, XREAD/XREADGROUP pour consommer, et ACK pour confirmer. À la fin, vous déployez un système résilient, prêt pour la prod. (128 mots)

Prérequis

  • Node.js 20+ installé
  • Redis 7+ (via Docker recommandé)
  • Connaissances de base en JavaScript asynchrone (async/await, Promises)
  • npm ou yarn pour les dépendances
  • Un éditeur comme VS Code

Lancer Redis via Docker

docker-run-redis.sh
docker run --name redis-streams -p 6379:6379 -d redis:7-alpine redis-server --appendonly yes

Ce script lance un conteneur Redis persistant avec AOF activé pour éviter les pertes de données. Le port 6379 est exposé localement. Vérifiez avec docker ps et redis-cli ping pour confirmer la connexion.

Comprendre la structure des Streams

Un Stream est une liste ordonnée de messages, chaque entrée étant un tuple {ID, map}. L'ID est auto-généré via '*' (ms-usec-seq) ou manuel. Les commandes clés : XADD ajoute, XREAD lit séquentiellement, XGROUP crée des groupes pour consumers parallèles comme dans Kafka. Analogie : un Stream est un journal immuable de logs, où les consumers "bookmarkent" leur position via groupes.

Installer le client et producer basique

package.json-setup.sh
npm init -y
npm install redis@5
npm install -D @types/node typescript ts-node

On installe le client Redis officiel v5 (support natif Streams) et outils TS. Créez tsconfig.json avec {"compilerOptions": {"target": "ES2022", "module": "NodeNext"}} pour compatibilité moderne.

Producer : Ajouter des messages avec XADD

producer.ts
import { createClient } from 'redis';

async function produce() {
  const client = createClient();
  await client.connect();

  for (let i = 1; i <= 5; i++) {
    const id = await client.xAdd('orders', '*', {
      userId: `user${i}`,
      product: `item${i}`,
      amount: (10 + i * 5).toString(),
      timestamp: Date.now().toString()
    });
    console.log(`Message ID: ${id}`);
  }

  await client.quit();
}

produce().catch(console.error);

Ce producer ajoute 5 messages à un stream 'orders' avec champs structurés. L'ID '*' est auto-généré unique. Exécutez avec npx ts-node producer.ts : observez les IDs dans redis-cli XINFO STREAM orders.

Lecture simple des messages

Avant les groupes, testons XREAD pour une lecture blocante. BLOCK 0 attend indéfiniment, COUNT 10 limite les résultats. Attention : sans groupes, c'est single-consumer et non-at-least-once.

Consumer simple avec XREAD

simple-consumer.ts
import { createClient } from 'redis';

async function consume() {
  const client = createClient();
  await client.connect();

  const messages = await client.xRead(
    { key: 'orders', id: '$' }, // '$' = nouveaux messages seulement
    { BLOCK: 0, COUNT: 10 }
  );

  if (messages) {
    for (const stream of messages) {
      for (const msg of stream.messages) {
        console.log(`ID: ${msg.id}, Data:`, msg.message);
      }
    }
  }

  await client.quit();
}

consume().catch(console.error);

Ce consumer lit les nouveaux messages ('$') de manière blocante. Relancez le producer d'abord. Piège : sans ACK, les messages persistent ; ID > pour après un ID spécifique.

Introduction aux Consumer Groups

XGROUP partitionne un stream en groupes : chaque consumer claim des messages via XREADGROUP, permettant scalabilité et tolérance aux pannes (pending entries). Créez d'abord le groupe, puis consommez.

Créer un consumer group

setup-group.ts
import { createClient } from 'redis';

async function setupGroup() {
  const client = createClient();
  await client.connect();

  await client.xGroupCreate('orders', 'workers', '0', false);
  console.log('Groupe "workers" créé sur stream "orders"');

  await client.quit();
}

setupGroup().catch(console.error);

'0' = depuis le début du stream ; MKSTREAM true auto-crée si absent (ici false). Vérifiez : redis-cli XINFO GROUPS orders. Un groupe = PEL (Pending Entries List) pour redélivraisons.

Consumer avec XREADGROUP et ACK

group-consumer.ts
import { createClient } from 'redis';

async function consumeGroup(consumerName: string) {
  const client = createClient();
  await client.connect();

  while (true) {
    const messages = await client.xReadGroup(
      'workers',
      consumerName,
      { key: 'orders', id: '>' }, // '>' = nouveaux messages
      { BLOCK: 5000, COUNT: 5 }
    );

    if (messages) {
      for (const stream of messages) {
        for (const msg of stream.messages) {
          console.log(`Consumer ${consumerName} traite:`, msg.message);
          // Traitement métier ici
          await client.xAck('orders', 'workers', msg.id);
        }
      }
    }
  }
}

const consumerName = `worker-${process.pid}`;
consumeGroup(consumerName).catch(console.error);

Boucle infinie pour consumer '>' (nouveaux). Chaque consumer a un nom unique. XACK confirme : sans, message repassé en PEL. Lancez plusieurs : ts-node group-consumer.ts pour voir load balancing.

Gérer les pending messages (XPENDING/XCLAIM)

pending-handler.ts
import { createClient } from 'redis';

async function handlePending(consumerName: string) {
  const client = createClient();
  await client.connect();

  // Lister pendings
  const pendings = await client.xPending('orders', 'workers', '-', '+', 10, consumerName);
  console.log('Pendings:', pendings);

  // Claim et re-traiter
  for (const pending of pendings || []) {
    const claimed = await client.xClaim('orders', 'workers', consumerName, 60000, [pending.id]);
    for (const msg of claimed || []) {
      console.log('Re-traité:', msg.message);
      await client.xAck('orders', 'workers', msg.id);
    }
  }

  await client.quit();
}

handlePending('worker-123').catch(console.error);

Au démarrage, checkez XPENDING pour reclaim timeouts. XCLAIM transfère ownership. Idéal pour consumer crash : messages non ACK repassés après 60s ici.

Bonnes pratiques

  • Toujours ACK après traitement réussi pour éviter redélivraisons infinies.
  • Utilisez consumer names uniques (PID + hostname) pour éviter collisions.
  • Limitez BLOCK/COUNT pour perf : 5-10 messages par read.
  • Monitorez PEL length avec XINFO : >1000 signale problèmes.
  • Trimmez le stream via XTRIM (MAXLEN ~100k) pour bounded storage.

Erreurs courantes à éviter

  • Oublier await client.connect() : erreurs 'not connected'.
  • Utiliser id: '0' sans groupe : lit tout, non scalable.
  • Pas de gestion pannes : wrappez en try/catch avec reconnexion (client.on('error'))
  • Ignorer timebomb sur XCLAIM : idle > groupe timeout = auto-reclaim.

Pour aller plus loin

Approfondissez avec Redis 7.4+ (Streams JSON natif). Étudiez Redis Streams docs. Intégrez BullMQ pour jobs. Découvrez nos formations Learni sur Redis et Node.js pour masterclasses avancées.

Comment implémenter Redis Streams Node.js 2026 | Learni