Introduction
Redis Streams, introduced in version 5.0, provide an append-only data structure optimized for asynchronous messaging. Unlike fire-and-forget Pub/Sub, Streams keep messages after delivery, give each one a time-based ID, support consumer groups for horizontal scaling, and preserve strict entry ordering.
Why use it in 2026? In a world of distributed applications (IoT, logs, microservices), Redis Streams shines for Kafka-like queues without the hassle. Picture an e-commerce setup: orders arrive via producers and get processed by multiple consumers without duplication or loss. This intermediate tutorial equips you with a complete Node.js setup: installation, XADD for producing, XREAD/XREADGROUP for consuming, and ACKs for confirmation. By the end, you'll have a resilient, production-ready system.
Prerequisites
- Node.js 20+ installed
- Redis 7+ (Docker recommended)
- Basic knowledge of asynchronous JavaScript (async/await, Promises)
- npm or yarn for dependencies
- An editor like VS Code
Launch Redis with Docker
docker run --name redis-streams -p 6379:6379 -d redis:7-alpine redis-server --appendonly yes
This command launches a Redis container with AOF persistence enabled to prevent data loss. Port 6379 is exposed locally. Verify with docker ps and redis-cli ping to confirm the connection.
Understanding Stream Structure
A Stream is an ordered, append-only list of entries, where each entry is a pair {ID, field-value map}. The ID is auto-generated when you pass '*' (format: <millisecondsTime>-<sequenceNumber>) or can be set manually. Key commands: XADD to append, XREAD for sequential reads, XGROUP to create groups for parallel consumers, much like consumer groups in Kafka. Think of a Stream as an append-only log, where consumers "bookmark" their position via groups.
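To make the ID format concrete, here is a minimal sketch of parsing and comparing stream IDs. The helper names parseStreamId and compareStreamIds are hypothetical (they are not part of Redis or any client library), and the sketch assumes both parts fit in a JavaScript number, which holds for millisecond timestamps and realistic sequence numbers.

```typescript
// Hypothetical helpers for illustration; not provided by Redis or node-redis.
interface StreamId {
  ms: number;  // Unix time in milliseconds at which the entry was added
  seq: number; // sequence number distinguishing entries within the same millisecond
}

// Splits an ID such as "1700000000000-0" into its two numeric parts.
function parseStreamId(id: string): StreamId {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) {
    throw new Error(`Invalid stream ID: ${id}`);
  }
  return { ms: Number(match[1]), seq: Number(match[2]) };
}

// Returns -1, 0 or 1, comparing the millisecond part first and the sequence second.
function compareStreamIds(a: string, b: string): number {
  const x = parseStreamId(a);
  const y = parseStreamId(b);
  if (x.ms !== y.ms) return x.ms < y.ms ? -1 : 1;
  if (x.seq !== y.seq) return x.seq < y.seq ? -1 : 1;
  return 0;
}
```

Comparing IDs numerically matters because a plain string comparison would sort "9-0" after "10-0".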
Install the Client and Basic Producer
npm init -y
npm install redis@5
npm install -D @types/node typescript ts-node
Install the official Redis client v5 (native Streams support) and TS tools. Create tsconfig.json with {"compilerOptions": {"target": "ES2022", "module": "NodeNext"}} for modern compatibility.
Producer: Add Messages with XADD
import { createClient } from 'redis';
async function produce() {
  const client = createClient();
  await client.connect();
  for (let i = 1; i <= 5; i++) {
    const id = await client.xAdd('orders', '*', {
      userId: `user${i}`,
      product: `item${i}`,
      amount: (10 + i * 5).toString(),
      timestamp: Date.now().toString()
    });
    console.log(`Message ID: ${id}`);
  }
  await client.quit();
}
produce().catch(console.error);
This producer adds 5 messages to an 'orders' stream with structured fields. The '*' ID is auto-generated and unique. Run with npx ts-node producer.ts, then check the IDs with redis-cli XINFO STREAM orders.
Simple Message Reading
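Before groups, test XREAD for blocking reads. BLOCK 0 waits indefinitely, COUNT 10 limits results. Note: without groups, every client that calls XREAD receives every message, and Redis tracks no delivery state, so there is no acknowledgment and no at-least-once guarantee if a consumer restarts.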
Simple Consumer with XREAD
import { createClient } from 'redis';
async function consume() {
  const client = createClient();
  await client.connect();
  const messages = await client.xRead(
    { key: 'orders', id: '$' }, // '$' = new messages only
    { BLOCK: 0, COUNT: 10 }
  );
  if (messages) {
    for (const stream of messages) {
      for (const msg of stream.messages) {
        console.log(`ID: ${msg.id}, Data:`, msg.message);
      }
    }
  }
  await client.quit();
}
consume().catch(console.error);
This consumer blocks until new messages arrive ('$' means "only entries added after this call"). Start it first, then run the producer in another terminal; messages added before the call are not returned. Pitfall: XREAD has no ACK and never removes anything, so messages stay in the stream; to continue where you left off, pass the last ID you received instead of '$' (the special ID '>' is only valid with XREADGROUP).
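Resuming an XREAD loop from the last received ID can be sketched as follows. The helper nextCursor is hypothetical, and the reply shape (an array of objects with name and messages, or null on timeout) is an assumption based on the fields the consumer above iterates over.

```typescript
// Assumed reply shape, mirroring the stream.messages / msg.id / msg.message fields used above.
interface StreamMessage {
  id: string;
  message: Record<string, string>;
}
interface StreamReply {
  name: string; // stream key (assumed field name)
  messages: StreamMessage[];
}

// Hypothetical helper: returns the ID to pass to the next XREAD call for one stream.
// A null reply (blocking timeout) or an empty batch leaves the cursor unchanged.
function nextCursor(reply: StreamReply[] | null, key: string, current: string): string {
  if (!reply) return current;
  let cursor = current;
  for (const stream of reply) {
    if (stream.name !== key) continue;
    const last = stream.messages[stream.messages.length - 1];
    if (last) cursor = last.id;
  }
  return cursor;
}

// Possible use inside a read loop (requires a connected client):
//   let cursor = '$';
//   const reply = await client.xRead({ key: 'orders', id: cursor }, { BLOCK: 5000, COUNT: 10 });
//   cursor = nextCursor(reply, 'orders', cursor);
```

Passing '$' on every iteration would silently skip messages that arrive between two calls, which is why the cursor switches to a concrete ID after the first batch.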
Introduction to Consumer Groups
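XGROUP CREATE attaches a consumer group to a stream. The consumers in a group share the stream's messages via XREADGROUP, with each message delivered to only one of them, which enables scalability and fault tolerance (through the pending entries list). First create the group, then consume.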
Create a Consumer Group
import { createClient } from 'redis';
async function setupGroup() {
  const client = createClient();
  await client.connect();
  // '0' = the group starts reading from the beginning of the stream
  await client.xGroupCreate('orders', 'workers', '0');
  console.log('Group "workers" created on stream "orders"');
  await client.quit();
}
setupGroup().catch(console.error);
'0' starts the group at the stream's beginning. Passing { MKSTREAM: true } as a fourth argument would create the stream if it is missing; it is omitted here, so run the producer first. Verify with redis-cli XINFO GROUPS orders. A group maintains a PEL (Pending Entries List) that tracks messages delivered but not yet acknowledged.
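Running the setup a second time fails because Redis rejects creating a group that already exists with an error starting with BUSYGROUP. A minimal sketch of making the step idempotent, assuming the client surfaces that text in the error's message; isBusyGroupError and ensureGroup are hypothetical helpers, not client APIs:

```typescript
// Hypothetical helper: true when an error looks like Redis's "group already exists" reply.
// Assumes the client exposes the server's error text in err.message.
function isBusyGroupError(err: unknown): boolean {
  return err instanceof Error && err.message.includes('BUSYGROUP');
}

// Wraps any group-creation call so that a pre-existing group is not treated as a failure.
async function ensureGroup(create: () => Promise<unknown>): Promise<'created' | 'exists'> {
  try {
    await create();
    return 'created';
  } catch (err) {
    if (isBusyGroupError(err)) return 'exists';
    throw err; // anything else (missing stream, connection error) is still a real failure
  }
}

// Possible use (requires a connected client):
//   await ensureGroup(() => client.xGroupCreate('orders', 'workers', '0'));
```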
Consumer with XREADGROUP and ACK
import { createClient } from 'redis';
async function consumeGroup(consumerName: string) {
  const client = createClient();
  await client.connect();
  while (true) {
    const messages = await client.xReadGroup(
      'workers',
      consumerName,
      { key: 'orders', id: '>' }, // '>' = messages never delivered to this group
      { BLOCK: 5000, COUNT: 5 }
    );
    if (messages) {
      for (const stream of messages) {
        for (const msg of stream.messages) {
          console.log(`Consumer ${consumerName} processing:`, msg.message);
          // Business logic here
          await client.xAck('orders', 'workers', msg.id);
        }
      }
    }
  }
}
const consumerName = `worker-${process.pid}`;
consumeGroup(consumerName).catch(console.error);
The infinite loop consumes new messages ('>'). Each consumer has a unique name. A message enters the group's PEL as soon as it is delivered, and XACK removes it; without XACK it stays pending indefinitely. Launch several instances (npx ts-node group-consumer.ts in separate terminals) to see messages distributed across consumers.
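The delivery and acknowledgment cycle can be illustrated with a toy in-memory model. This is only a sketch of the semantics (entries enter the PEL on delivery and leave it on ACK), not how Redis is implemented; the class GroupModel and its methods are hypothetical.

```typescript
// Toy model of one consumer group over a fixed list of entry IDs. Illustration only.
class GroupModel {
  private entryIds: string[];
  private nextIndex = 0;                   // position of the first never-delivered entry
  private pel = new Map<string, string>(); // entry ID -> consumer that currently owns it

  constructor(entryIds: string[]) {
    this.entryIds = entryIds;
  }

  // Models XREADGROUP with '>': hands out never-delivered entries and records them as pending.
  readNew(consumer: string, count: number): string[] {
    const batch = this.entryIds.slice(this.nextIndex, this.nextIndex + count);
    this.nextIndex += batch.length;
    for (const id of batch) this.pel.set(id, consumer);
    return batch;
  }

  // Models XACK: returns 1 if the entry was pending and is now removed, 0 otherwise.
  ack(id: string): number {
    return this.pel.delete(id) ? 1 : 0;
  }

  // IDs still awaiting acknowledgment, in delivery order.
  pending(): string[] {
    return Array.from(this.pel.keys());
  }
}
```

In this model two consumers never receive the same entry through readNew, and an entry that is never acknowledged simply stays in pending(), which is the situation the pending-message handling addresses.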
Handle Pending Messages (XPENDING/XCLAIM)
import { createClient } from 'redis';
async function handlePending(consumerName: string) {
  const client = createClient();
  await client.connect();
  // List up to 10 pending entries for the whole group (XPENDING with a range)
  const pendings = await client.xPendingRange('orders', 'workers', '-', '+', 10);
  console.log('Pendings:', pendings);
  // Claim entries idle for at least 60 s and reprocess them
  for (const pending of pendings || []) {
    const claimed = await client.xClaim('orders', 'workers', consumerName, 60000, [pending.id]);
    for (const msg of claimed || []) {
      if (!msg) continue; // entry was deleted from the stream
      console.log('Reprocessed:', msg.message);
      await client.xAck('orders', 'workers', msg.id);
    }
  }
  await client.quit();
}
handlePending('worker-123').catch(console.error);
On startup, call XPENDING to find entries that were delivered but never acknowledged. XCLAIM transfers ownership to the calling consumer, but only for entries that have been idle for at least the given minimum (60 seconds here), so messages left behind by a crashed consumer can be reprocessed. Redis does not redeliver them on its own.
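Deciding which pending entries to claim can be separated from the Redis calls. The sketch below is one possible policy, not part of the tutorial's setup: entries idle long enough are claimed, and entries already delivered too many times are set aside instead of being retried forever. The function triagePending, the maxDeliveries threshold, and the field names (assumed to match what the client returns for a pending range) are all assumptions.

```typescript
// Assumed shape of one pending entry; field names are an assumption about the client's reply.
interface PendingEntry {
  id: string;
  millisecondsSinceLastDelivery: number;
  deliveriesCounter: number;
}

interface Triage {
  claim: string[];      // IDs worth claiming and reprocessing
  deadLetter: string[]; // IDs delivered too often; candidates for manual inspection
}

// Hypothetical policy: skip entries still being worked on, retry idle ones,
// and stop retrying once an entry has reached maxDeliveries.
function triagePending(entries: PendingEntry[], minIdleMs: number, maxDeliveries: number): Triage {
  const result: Triage = { claim: [], deadLetter: [] };
  for (const entry of entries) {
    if (entry.millisecondsSinceLastDelivery < minIdleMs) continue;
    if (entry.deliveriesCounter >= maxDeliveries) {
      result.deadLetter.push(entry.id);
    } else {
      result.claim.push(entry.id);
    }
  }
  return result;
}
```

The claim list can then be passed to XCLAIM, while the deadLetter IDs could be logged or copied to another stream before being acknowledged.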
Best Practices
- Always ACK after successful processing to avoid infinite redeliveries.
- Use unique consumer names (PID + hostname) to prevent collisions.
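- Set a finite BLOCK timeout and a modest COUNT (for example 5-10 messages per read) so each loop iteration stays short.
- Monitor PEL length with XINFO GROUPS or XPENDING: a count that keeps growing (say, beyond 1,000) signals stuck or failing consumers.
- Trim the stream with XTRIM (for example MAXLEN ~ 100000) to keep storage bounded.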
Common Errors to Avoid
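- Forgetting await client.connect(): commands fail because the client is not connected.
- Using id: '0' without groups: every read starts from the beginning of the stream, which does not scale.
- No fault handling: wrap processing in try/catch and register client.on('error') so connection errors and reconnections are handled.
- Assuming pending messages are reclaimed automatically: Redis never reassigns them on its own, so run XCLAIM (or XAUTOCLAIM) with a minimum idle time.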
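For the reconnection point, node-redis accepts a socket.reconnectStrategy function that receives the retry count and returns the delay in milliseconds before the next attempt. A minimal sketch of a capped exponential backoff; the function name reconnectDelay and the specific delays are arbitrary choices for illustration:

```typescript
// Hypothetical backoff policy: 100 ms, 200 ms, 400 ms, ... capped at 5 seconds.
function reconnectDelay(retries: number): number {
  const baseMs = 100;
  const capMs = 5000;
  return Math.min(baseMs * Math.pow(2, retries), capMs);
}

// Possible wiring (requires the redis package):
//   const client = createClient({ socket: { reconnectStrategy: reconnectDelay } });
//   client.on('error', (err) => console.error('Redis error:', err));
```

Registering the 'error' listener matters as well: without one, a connection error emitted by the client would be an unhandled 'error' event in Node.js.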
Next Steps
Dive deeper with a recent Redis release (7.4+) and the Redis Streams docs, which cover commands not used here. Integrate BullMQ for jobs. Explore our Learni trainings on Redis and Node.js for advanced masterclasses.