Skip to content
Learni
View all tutorials
Bases de données NoSQL

How to Implement Redis Streams in Node.js in 2026

Lire en français

Introduction

Redis Streams, introduced in version 5.0, revolutionize data stream handling with an append-only structure optimized for asynchronous messaging. Unlike ephemeral lists or Pub/Sub, Streams persist messages with timestamped IDs, support consumer groups for horizontal scaling, and guarantee strict event ordering.

Why use it in 2026? In a world of distributed applications (IoT, logs, microservices), Redis Streams shines for Kafka-like queues without the hassle. Picture an e-commerce setup: orders arrive via producers and get processed by multiple consumers without duplication or loss. This intermediate tutorial equips you with a complete Node.js setup: installation, XADD for producing, XREAD/XREADGROUP for consuming, and ACKs for confirmation. By the end, you'll have a resilient, production-ready system. (128 words)

Prerequisites

  • Node.js 20+ installed
  • Redis 7+ (Docker recommended)
  • Basic knowledge of asynchronous JavaScript (async/await, Promises)
  • npm or yarn for dependencies
  • An editor like VS Code

Launch Redis with Docker

docker-run-redis.sh
docker run --name redis-streams -p 6379:6379 -d redis:7-alpine redis-server --appendonly yes

This script launches a persistent Redis container with AOF enabled to prevent data loss. Port 6379 is exposed locally. Verify with docker ps and redis-cli ping to confirm the connection.

Understanding Stream Structure

A Stream is an ordered list of messages, where each entry is a tuple {ID, map}. The ID is auto-generated with '*' (ms-usec-seq) or set manually. Key commands: XADD to add, XREAD for sequential reads, XGROUP to create groups for parallel consumers like in Kafka. Think of a Stream as an immutable log journal, where consumers "bookmark" their position via groups.

Install the Client and Basic Producer

package.json-setup.sh
npm init -y
npm install redis@5
npm install -D @types/node typescript ts-node

Install the official Redis client v5 (native Streams support) and TS tools. Create tsconfig.json with {"compilerOptions": {"target": "ES2022", "module": "NodeNext"}} for modern compatibility.

Producer: Add Messages with XADD

producer.ts
import { createClient } from 'redis';

async function produce() {
  const client = createClient();
  await client.connect();

  for (let i = 1; i <= 5; i++) {
    const id = await client.xAdd('orders', '*', {
      userId: `user${i}`,
      product: `item${i}`,
      amount: (10 + i * 5).toString(),
      timestamp: Date.now().toString()
    });
    console.log(`Message ID: ${id}`);
  }

  await client.quit();
}

produce().catch(console.error);

This producer adds 5 messages to an 'orders' stream with structured fields. The '*' ID is auto-generated and unique. Run with npx ts-node producer.ts: check IDs in redis-cli XINFO STREAM orders.

Simple Message Reading

Before groups, test XREAD for blocking reads. BLOCK 0 waits indefinitely, COUNT 10 limits results. Note: without groups, it's single-consumer and not at-least-once guaranteed.

Simple Consumer with XREAD

simple-consumer.ts
import { createClient } from 'redis';

async function consume() {
  const client = createClient();
  await client.connect();

  const messages = await client.xRead(
    { key: 'orders', id: '$' }, // '$' = new messages only
    { BLOCK: 0, COUNT: 10 }
  );

  if (messages) {
    for (const stream of messages) {
      for (const msg of stream.messages) {
        console.log(`ID: ${msg.id}, Data:`, msg.message);
      }
    }
  }

  await client.quit();
}

consume().catch(console.error);

This consumer reads new messages ('$') in a blocking manner. Run the producer first. Pitfall: without ACK, messages persist; use ID > for messages after a specific ID.

Introduction to Consumer Groups

XGROUP partitions a stream into groups: consumers claim messages via XREADGROUP, enabling scalability and fault tolerance (pending entries). First create the group, then consume.

Create a Consumer Group

setup-group.ts
import { createClient } from 'redis';

async function setupGroup() {
  const client = createClient();
  await client.connect();

  await client.xGroupCreate('orders', 'workers', '0', false);
  console.log('Group "workers" created on stream "orders"');

  await client.quit();
}

setupGroup().catch(console.error);

'0' starts from the stream's beginning; MKSTREAM true auto-creates if missing (false here). Verify with redis-cli XINFO GROUPS orders. A group includes a PEL (Pending Entries List) for redeliveries.

Consumer with XREADGROUP and ACK

group-consumer.ts
import { createClient } from 'redis';

async function consumeGroup(consumerName: string) {
  const client = createClient();
  await client.connect();

  while (true) {
    const messages = await client.xReadGroup(
      'workers',
      consumerName,
      { key: 'orders', id: '>' }, // '>' = new messages
      { BLOCK: 5000, COUNT: 5 }
    );

    if (messages) {
      for (const stream of messages) {
        for (const msg of stream.messages) {
          console.log(`Consumer ${consumerName} processing:`, msg.message);
          // Business logic here
          await client.xAck('orders', 'workers', msg.id);
        }
      }
    }
  }
}

const consumerName = `worker-${process.pid}`;
consumeGroup(consumerName).catch(console.error);

Infinite loop consumes new messages ('>'). Each consumer has a unique name. XACK confirms: without it, messages go back to PEL. Launch multiples: ts-node group-consumer.ts to see load balancing.

Handle Pending Messages (XPENDING/XCLAIM)

pending-handler.ts
import { createClient } from 'redis';

async function handlePending(consumerName: string) {
  const client = createClient();
  await client.connect();

  // List pendings
  const pendings = await client.xPending('orders', 'workers', '-', '+', 10, consumerName);
  console.log('Pendings:', pendings);

  // Claim and reprocess
  for (const pending of pendings || []) {
    const claimed = await client.xClaim('orders', 'workers', consumerName, 60000, [pending.id]);
    for (const msg of claimed || []) {
      console.log('Reprocessed:', msg.message);
      await client.xAck('orders', 'workers', msg.id);
    }
  }

  await client.quit();
}

handlePending('worker-123').catch(console.error);

On startup, check XPENDING to reclaim timeouts. XCLAIM transfers ownership. Perfect for consumer crashes: un-ACKed messages get redelivered after 60s here.

Best Practices

  • Always ACK after successful processing to avoid infinite redeliveries.
  • Use unique consumer names (PID + hostname) to prevent collisions.
  • Limit BLOCK/COUNT for performance: 5-10 messages per read.
  • Monitor PEL length with XINFO: >1000 signals issues.
  • Trim the stream with XTRIM (MAXLEN ~100k) for bounded storage.

Common Errors to Avoid

  • Forget await client.connect(): get 'not connected' errors.
  • Use id: '0' without groups: reads everything, not scalable.
  • No fault handling: wrap in try/catch with reconnection (client.on('error'))
  • Ignore XCLAIM timebomb: idle > group timeout = auto-reclaim.

Next Steps

Dive deeper with Redis 7.4+ (native JSON Streams). Check the Redis Streams docs. Integrate BullMQ for jobs. Explore our Learni trainings on Redis and Node.js for advanced masterclasses.