Introduction
Redis Streams, introduit en version 5.0, révolutionne la gestion des flux de données en offrant une structure de données append-only optimisée pour le messaging asynchrone. Contrairement aux listes ou Pub/Sub éphémères, les Streams persistent les messages avec des IDs timestampés, supportent les consumer groups pour le scaling horizontal et garantissent l'ordre strict des événements.
Pourquoi l'utiliser en 2026 ? Dans un monde d'applications distribuées (IoT, logs, microservices), Redis Streams excelle pour les cas comme les files d'attente Kafka-like sans la complexité. Imaginez un e-commerce : les commandes arrivent via producer, traitées par plusieurs consumers sans duplication ni perte. Ce tutoriel intermediate vous équipe d'un setup Node.js complet : installation, XADD pour producer, XREAD/XREADGROUP pour consommer, et ACK pour confirmer. À la fin, vous déployez un système résilient, prêt pour la prod. (128 mots)
Prérequis
- Node.js 20+ installé
- Redis 7+ (via Docker recommandé)
- Connaissances de base en JavaScript asynchrone (async/await, Promises)
- npm ou yarn pour les dépendances
- Un éditeur comme VS Code
Lancer Redis via Docker
docker run --name redis-streams -p 6379:6379 -d redis:7-alpine redis-server --appendonly yesCe script lance un conteneur Redis persistant avec AOF activé pour éviter les pertes de données. Le port 6379 est exposé localement. Vérifiez avec docker ps et redis-cli ping pour confirmer la connexion.
Comprendre la structure des Streams
Un Stream est une liste ordonnée de messages, chaque entrée étant un tuple {ID, map. L'ID est auto-généré via '*' (ms-usec-seq) ou manuel. Les commandes clés : XADD ajoute, XREAD lit séquentiellement, XGROUP crée des groupes pour consumers parallèles comme dans Kafka. Analogie : un Stream est un journal immuable de logs, où les consumers "bookmarkent" leur position via groupes.
Installer le client et producer basique
npm init -y
npm install redis@5
npm install -D @types/node typescript ts-nodeOn installe le client Redis officiel v5 (support natif Streams) et outils TS. Créez tsconfig.json avec {"compilerOptions": {"target": "ES2022", "module": "NodeNext"}} pour compatibilité moderne.
Producer : Ajouter des messages avec XADD
import { createClient } from 'redis';
async function produce() {
const client = createClient();
await client.connect();
for (let i = 1; i <= 5; i++) {
const id = await client.xAdd('orders', '*', {
userId: `user${i}`,
product: `item${i}`,
amount: (10 + i * 5).toString(),
timestamp: Date.now().toString()
});
console.log(`Message ID: ${id}`);
}
await client.quit();
}
produce().catch(console.error);Ce producer ajoute 5 messages à un stream 'orders' avec champs structurés. L'ID '*' est auto-généré unique. Exécutez avec npx ts-node producer.ts : observez les IDs dans redis-cli XINFO STREAM orders.
Lecture simple des messages
Avant les groupes, testons XREAD pour une lecture blocante. BLOCK 0 attend indéfiniment, COUNT 10 limite les résultats. Attention : sans groupes, c'est single-consumer et non-at-least-once.
Consumer simple avec XREAD
import { createClient } from 'redis';
async function consume() {
const client = createClient();
await client.connect();
const messages = await client.xRead(
{ key: 'orders', id: '$' }, // '$' = nouveaux messages seulement
{ BLOCK: 0, COUNT: 10 }
);
if (messages) {
for (const stream of messages) {
for (const msg of stream.messages) {
console.log(`ID: ${msg.id}, Data:`, msg.message);
}
}
}
await client.quit();
}
consume().catch(console.error);Ce consumer lit les nouveaux messages ('$') de manière blocante. Relancez le producer d'abord. Piège : sans ACK, les messages persistent ; ID > pour après un ID spécifique.
Introduction aux Consumer Groups
XGROUP partitionne un stream en groupes : chaque consumer claim des messages via XREADGROUP, permettant scalabilité et tolérance aux pannes (pending entries). Créez d'abord le groupe, puis consommez.
Créer un consumer group
import { createClient } from 'redis';
async function setupGroup() {
const client = createClient();
await client.connect();
await client.xGroupCreate('orders', 'workers', '0', false);
console.log('Groupe "workers" créé sur stream "orders"');
await client.quit();
}
setupGroup().catch(console.error);'0' = depuis le début du stream ; MKSTREAM true auto-crée si absent (ici false). Vérifiez : redis-cli XINFO GROUPS orders. Un groupe = PEL (Pending Entries List) pour redélivraisons.
Consumer avec XREADGROUP et ACK
import { createClient } from 'redis';
async function consumeGroup(consumerName: string) {
const client = createClient();
await client.connect();
while (true) {
const messages = await client.xReadGroup(
'workers',
consumerName,
{ key: 'orders', id: '>' }, // '>' = nouveaux messages
{ BLOCK: 5000, COUNT: 5 }
);
if (messages) {
for (const stream of messages) {
for (const msg of stream.messages) {
console.log(`Consumer ${consumerName} traite:`, msg.message);
// Traitement métier ici
await client.xAck('orders', 'workers', msg.id);
}
}
}
}
}
const consumerName = `worker-${process.pid}`;
consumeGroup(consumerName).catch(console.error);Boucle infinie pour consumer '>' (nouveaux). Chaque consumer a un nom unique. XACK confirme : sans, message repassé en PEL. Lancez plusieurs : ts-node group-consumer.ts pour voir load balancing.
Gérer les pending messages (XPENDING/XCLAIM)
import { createClient } from 'redis';
async function handlePending(consumerName: string) {
const client = createClient();
await client.connect();
// Lister pendings
const pendings = await client.xPending('orders', 'workers', '-', '+', 10, consumerName);
console.log('Pendings:', pendings);
// Claim et re-traiter
for (const pending of pendings || []) {
const claimed = await client.xClaim('orders', 'workers', consumerName, 60000, [pending.id]);
for (const msg of claimed || []) {
console.log('Re-traité:', msg.message);
await client.xAck('orders', 'workers', msg.id);
}
}
await client.quit();
}
handlePending('worker-123').catch(console.error);Au démarrage, checkez XPENDING pour reclaim timeouts. XCLAIM transfère ownership. Idéal pour consumer crash : messages non ACK repassés après 60s ici.
Bonnes pratiques
- Toujours ACK après traitement réussi pour éviter redélivraisons infinies.
- Utilisez consumer names uniques (PID + hostname) pour éviter collisions.
- Limitez BLOCK/COUNT pour perf : 5-10 messages par read.
- Monitorez PEL length avec XINFO : >1000 signale problèmes.
- Trimmez le stream via XTRIM (MAXLEN ~100k) pour bounded storage.
Erreurs courantes à éviter
- Oublier
await client.connect(): erreurs 'not connected'. - Utiliser
id: '0'sans groupe : lit tout, non scalable. - Pas de gestion pannes : wrappez en try/catch avec reconnexion (client.on('error'))
- Ignorer timebomb sur XCLAIM : idle > groupe timeout = auto-reclaim.
Pour aller plus loin
Approfondissez avec Redis 7.4+ (Streams JSON natif). Étudiez Redis Streams docs. Intégrez BullMQ pour jobs. Découvrez nos formations Learni sur Redis et Node.js pour masterclasses avancées.