Introduction
Apache Kafka is a distributed streaming platform for real-time data flows; companies like Netflix, Uber, and LinkedIn use it to process billions of events daily. Implementing Kafka clients (producers and consumers) in Node.js lets you bring that power into modern backend apps. This intermediate tutorial uses kafkajs, one of the most popular and actively maintained Kafka libraries for Node.js.
Why it matters: Kafka clients handle resilience, partitioning, and offsets for reliable data pipelines. We'll spin up a local Kafka with Docker, build a batched producer with partition keys and a grouped consumer with rebalancing, and cover error handling with retries. By the end, you'll have a production-ready, horizontally scalable setup. Estimated time: about 20 minutes for a working prototype.
Prerequisites
- Node.js 20+ installed
- Docker and Docker Compose for local Kafka
- Basic Kafka knowledge (topics, partitions, brokers)
- Code editor with TypeScript support (VS Code recommended)
- Terminal for running commands
Start Kafka Locally
```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.7.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
```
This docker-compose.yml launches Zookeeper and a single Kafka broker reachable on localhost:9092. The environment variables configure one listener for host access and one for inter-container traffic, and `KAFKA_AUTO_CREATE_TOPICS_ENABLE` means topics are created automatically on first send.
Launch Kafka and Verify
Save the file and run `docker compose up -d` in your terminal. Check the logs with `docker compose logs kafka`; the broker is ready when you see 'started (kafka.server.KafkaServer)'. This gives you a single-node cluster, ideal for development; scale to multiple brokers in production.
Initialize the Node.js Project
```bash
mkdir kafka-clients && cd kafka-clients
npm init -y
npm install kafkajs
npm install -D typescript @types/node tsx
npx tsc --init
```
Create a project folder, init npm, and install kafkajs (a zero-dependency, async-native library) plus TypeScript and tsx for running TS files directly. `npx tsc --init` generates a base tsconfig; tweak it for ES2022 and strict mode as shown next.
Configure TypeScript
```json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "NodeNext",
    "moduleResolution": "NodeNext",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "outDir": "./dist",
    "rootDir": "./src"
  }
}
```
This tsconfig enables strict mode to catch errors early and targets ES2022 for features like top-level await. Create a src/ folder for your source files; `rootDir` and `outDir` keep compiled output cleanly separated in dist/.
Update package.json for Scripts
Add these scripts to package.json for easy launches:
```json
"scripts": {
  "producer": "tsx src/producer.ts",
  "consumer": "tsx src/consumer.ts",
  "dev": "tsx watch src/producer.ts"
}
```
We installed tsx as a dev dependency earlier; it has largely replaced ts-node for running TypeScript with native ESM support.
Create a Batched Producer with Partitions
```typescript
import { Kafka, Producer, TopicMessages } from 'kafkajs';

async function runProducer() {
  const kafka = new Kafka({
    clientId: 'my-producer',
    brokers: ['localhost:9092'],
  });

  const producer: Producer = kafka.producer({
    allowAutoTopicCreation: true,
    transactionTimeout: 30000,
    idempotent: true,
    maxInFlightRequests: 1,
  });

  await producer.connect();

  // Send 10 batched messages with partition keys
  const topicMessages: TopicMessages[] = [];
  for (let i = 0; i < 10; i++) {
    topicMessages.push({
      topic: 'test-topic',
      messages: [{ key: `key-${i % 3}`, value: `Message ${i} at ${new Date().toISOString()}` }],
    });
  }

  const result = await producer.sendBatch({ topicMessages });
  console.log('Messages sent:', result);

  await producer.disconnect();
}

runProducer().catch(console.error);
```
This producer connects with idempotence enabled (exactly-once writes per partition), uses `sendBatch` for efficiency, and assigns keys (modulo 3) so related messages land on the same partition. `maxInFlightRequests: 1` prevents reordering when retries occur; monitor metrics in the Kafka logs.
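The Best Practices section below also mentions transactions; here is what that looks like with kafkajs. A minimal sketch, assuming the same local broker; the `orders` topic and the `transactionalId` value are illustrative:
```typescript
import { Kafka } from 'kafkajs';

// Minimal transactional-producer sketch; topic and transactionalId are illustrative.
async function runTransactionalProducer() {
  const kafka = new Kafka({ clientId: 'tx-producer', brokers: ['localhost:9092'] });
  const producer = kafka.producer({
    transactionalId: 'order-tx-1', // must stay stable across restarts
    idempotent: true,
    maxInFlightRequests: 1,
  });

  await producer.connect();
  const transaction = await producer.transaction();
  try {
    await transaction.send({
      topic: 'orders',
      messages: [{ key: 'order-1', value: JSON.stringify({ status: 'created' }) }],
    });
    // All-or-nothing: read_committed consumers never see aborted writes
    await transaction.commit();
  } catch (err) {
    await transaction.abort();
    throw err;
  } finally {
    await producer.disconnect();
  }
}

runTransactionalProducer().catch(console.error);
```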
Create a Grouped Consumer with Offsets
```typescript
import { Kafka, Consumer, EachMessagePayload, logLevel } from 'kafkajs';

async function runConsumer() {
  const kafka = new Kafka({
    clientId: 'my-consumer',
    brokers: ['localhost:9092'],
    logLevel: logLevel.INFO,
  });

  const consumer: Consumer = kafka.consumer({ groupId: 'test-group' });

  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.key?.toString()}`;
      console.log(`${prefix} = ${message.value?.toString()}`);
      // Manual commit after processing; commit offset + 1 (the next offset to read)
      await consumer.commitOffsets([
        { topic, partition, offset: (Number(message.offset) + 1).toString() },
      ]);
    },
  });
}

runConsumer().catch(console.error);
```
This consumer joins a consumer group via `groupId` for load balancing, reads from the start with `fromBeginning: true`, and commits manually (`autoCommit: false`) for precise at-least-once delivery. The `eachMessage` callback logs key, offset, and value; it is also where you would add retries and idempotent processing.
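If processing is slower than consumption, kafkajs lets you pause and resume partitions to apply backpressure. A minimal sketch of that pattern, where `processMessage` is a hypothetical stand-in for your real handler and the 5-second retry delay is an arbitrary choice:
```typescript
import { Kafka, EachMessagePayload } from 'kafkajs';

// Hypothetical slow handler; stands in for a DB write or HTTP call.
async function processMessage(payload: EachMessagePayload): Promise<void> {
  console.log(`processing offset ${payload.message.offset}`);
}

async function runBackpressureConsumer() {
  const kafka = new Kafka({ clientId: 'slow-consumer', brokers: ['localhost:9092'] });
  const consumer = kafka.consumer({ groupId: 'slow-group' });

  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async (payload) => {
      try {
        await processMessage(payload);
      } catch (err) {
        const { topic, partition } = payload;
        // Pause only the failing partition and retry after 5s
        consumer.pause([{ topic, partitions: [partition] }]);
        setTimeout(() => consumer.resume([{ topic, partitions: [partition] }]), 5000);
        throw err; // rethrow so the offset is not committed
      }
    },
  });
}

runBackpressureConsumer().catch(console.error);
```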
Test the Full Pipeline
- Start the consumer: `npm run consumer` (it waits for messages).
- In another terminal, start the producer: `npm run producer` (it sends 10 messages).
- Watch the consumer logs: messages arrive with incrementing offsets.
Producer with Retry Error Handling
```typescript
import { Kafka, TopicMessages, KafkaJSNonRetriableError } from 'kafkajs';

async function runProducerWithRetry() {
  const kafka = new Kafka({ clientId: 'producer-retry', brokers: ['localhost:9092'] });

  const producer = kafka.producer({
    retry: { maxRetryTime: 30000, initialRetryTime: 300 },
    allowAutoTopicCreation: true,
  });

  await producer.connect();
  try {
    const topicMessages: TopicMessages[] = [
      {
        topic: 'retry-topic',
        messages: [{ key: 'retry-key', value: 'Message with retry' }],
      },
    ];
    const result = await producer.sendBatch({ topicMessages });
    console.log('Success:', result);
  } catch (error) {
    if (error instanceof KafkaJSNonRetriableError) {
      console.error('Fatal error:', error);
    } else {
      console.error('Retryable error that exhausted the retry config:', error);
    }
  } finally {
    await producer.disconnect();
  }
}

runProducerWithRetry().catch(console.error);
```
This adds configurable retries with exponential backoff (`initialRetryTime`, `maxRetryTime`) for transient failures like network blips. Errors that still reach the catch block can be distinguished: route `KafkaJSNonRetriableError` to a dead-letter queue. The finally block guarantees a clean disconnect, preventing connection leaks.
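One way to act on that distinction is to re-publish failed messages to a dead-letter topic with error metadata. A minimal sketch, assuming the same local broker; the `retry-topic.dlq` name and the header layout are illustrative conventions, not anything kafkajs mandates:
```typescript
import { Kafka, Message } from 'kafkajs';

const kafka = new Kafka({ clientId: 'dlq-producer', brokers: ['localhost:9092'] });
const dlqProducer = kafka.producer();

// Re-publish a failed message to a dead-letter topic with error context in headers.
// Connecting per call keeps the sketch simple; reuse one connection in real code.
async function sendToDeadLetter(original: Message, error: Error): Promise<void> {
  await dlqProducer.connect();
  await dlqProducer.send({
    topic: 'retry-topic.dlq', // '.dlq' suffix is an illustrative convention
    messages: [
      {
        key: original.key,
        value: original.value,
        headers: {
          'x-error-message': error.message,
          'x-failed-at': new Date().toISOString(),
        },
      },
    ],
  });
  await dlqProducer.disconnect();
}
```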
Best Practices
- Idempotence and transactions: Enable `idempotent: true` and set a `transactionalId` for exactly-once delivery in production.
- Monitoring: Integrate Prometheus via kafkajs instrumentation events; track latency and throughput.
- Security: Use SASL/SSL for cloud brokers (Confluent, Aiven); store credentials in env vars.
- Dynamic batching: Tune how many messages you group per `sendBatch` call based on message rate to optimize I/O.
- Graceful shutdown: Trap SIGTERM to drain in-flight work before disconnecting, as sketched below.
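A minimal shutdown sketch for that last point, assuming a producer and consumer like the ones built earlier in this tutorial:
```typescript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'graceful-app', brokers: ['localhost:9092'] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'test-group' });

// Disconnect clients on shutdown signals so in-flight requests finish
// and the group coordinator is notified (faster rebalance for survivors).
async function shutdown(signal: string): Promise<void> {
  console.log(`${signal} received, closing Kafka clients...`);
  try {
    await consumer.disconnect(); // stops fetching and leaves the group cleanly
    await producer.disconnect(); // flushes in-flight sends
  } finally {
    process.exit(0);
  }
}

['SIGTERM', 'SIGINT'].forEach((signal) => {
  process.on(signal, () => void shutdown(signal));
});
```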
Common Errors to Avoid
- Forgetting offsets: Without manual commits, restarts reprocess messages and cause duplicates; combine manual commits with `pause`/`resume` for backpressure.
- Too many in-flight requests: More than one in-flight request per connection can reorder messages on retry; set `maxInFlightRequests: 1` for strict ordering.
- Auto-topic creation in prod: Disable it; create topics explicitly with a sensible partition count (6-12 is a common starting point), as sketched below.
- Ignoring rebalances: Listen for `consumer.events.REBALANCING` to clean up shared state.
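For the auto-creation point, topics can be created explicitly with the kafkajs admin client. A minimal sketch, assuming the local broker from this tutorial; the topic name and partition count are illustrative:
```typescript
import { Kafka } from 'kafkajs';

// Create topics explicitly instead of relying on auto-creation.
async function createTopics(): Promise<void> {
  const kafka = new Kafka({ clientId: 'admin-client', brokers: ['localhost:9092'] });
  const admin = kafka.admin();

  await admin.connect();
  try {
    const created = await admin.createTopics({
      topics: [
        {
          topic: 'test-topic',  // illustrative name
          numPartitions: 6,     // pick based on expected consumer parallelism
          replicationFactor: 1, // 1 for this single-broker dev setup; 3 in production
        },
      ],
      waitForLeaders: true,
    });
    console.log(created ? 'Topics created' : 'Topics already existed');
  } finally {
    await admin.disconnect();
  }
}

createTopics().catch(console.error);
```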
Next Steps
- kafkajs docs: kafkajs.github.io
- Schema Registry with Avro: Integrate `@kafkajs/confluent-schema-registry`
- Kafka Streams in JS: Explore `kafkajs-streams`
- Learni Training: Master Kafka deeply with our certified courses (Confluent Developer).