Introduction
Apache Kafka has become a de facto standard for event-driven architectures. Kafka clients let applications produce and consume messages reliably at scale. This intermediate tutorial walks through implementing producers and consumers with Node.js and the kafkajs library. You will learn how to manage connections, handle errors, and apply advanced configuration for production environments. Each step includes complete, working code.
Prerequisites
- Node.js 20+
- Basic knowledge of TypeScript
- An accessible Kafka cluster (local or cloud)
- npm or pnpm installed
Installing Dependencies
npm install kafkajs
npm install --save-dev typescript @types/node

This installs kafkajs, a widely used Kafka client library for Node.js, along with TypeScript and the Node.js type definitions for static typing.
Kafka Client Configuration
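// client.ts
import { Kafka } from 'kafkajs';

// Shared client instance, imported by the producer and the consumer.
export const kafka = new Kafka({
  clientId: 'learni-app',
  brokers: ['localhost:9092'],
  retry: {
    initialRetryTime: 100, // base delay for the first retry, in ms
    retries: 8, // maximum number of retries per call
  },
});

This file centralizes the client configuration. The retry settings help the client ride out temporary broker disconnections.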
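To get a feel for what these retry settings mean in practice, the following sketch computes a nominal backoff schedule. It is an approximation, not the library's own code: the function name `nominalRetrySchedule` is hypothetical, the `multiplier` of 2 and the `maxRetryTime` cap of 30000 ms are assumed from the kafkajs documented defaults, and the random jitter that kafkajs applies to each delay is ignored.

```typescript
// Nominal delay before each retry, ignoring the random jitter kafkajs applies.
// multiplier and maxRetryTime are assumptions based on the kafkajs documented defaults.
function nominalRetrySchedule(
  initialRetryTime: number,
  retries: number,
  multiplier = 2,
  maxRetryTime = 30_000,
): number[] {
  const delays: number[] = [];
  let delay = initialRetryTime;
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(Math.min(delay, maxRetryTime)); // never wait longer than the cap
    delay *= multiplier;
  }
  return delays;
}

// Settings from the client configuration above: initialRetryTime 100, retries 8.
const schedule = nominalRetrySchedule(100, 8);
console.log(schedule.join(', ')); // 100, 200, 400, 800, 1600, 3200, 6400, 12800
console.log(schedule.reduce((sum, d) => sum + d, 0)); // 25500
```

Under these assumptions, a call gives up after roughly 25 seconds of cumulative waiting, which is a useful number to compare against how long a broker restart takes in your environment.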
Creating the Producer
import { kafka } from './client';

const producer = kafka.producer();

// Connects, sends one JSON-serialized message, then disconnects.
export async function sendMessage(topic: string, message: object) {
  await producer.connect();
  await producer.send({
    topic,
    messages: [{ value: JSON.stringify(message) }],
  });
  await producer.disconnect();
}

The producer connects, sends a JSON-serialized message, then disconnects. This approach is simple, but connecting on every call adds latency; in production, keep a persistent connection instead.
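One possible shape for the persistent connection mentioned above is a small wrapper that connects on first use and disconnects only at shutdown. This is a minimal sketch, not part of kafkajs: `ProducerLike` and `PersistentSender` are hypothetical names, and `ProducerLike` only describes the three producer methods already used in `sendMessage`, so the wrapper can be tried without a running broker.

```typescript
// Hypothetical minimal interface: the subset of producer methods used in this tutorial.
interface ProducerLike {
  connect(): Promise<void>;
  send(record: { topic: string; messages: { value: string }[] }): Promise<unknown>;
  disconnect(): Promise<void>;
}

class PersistentSender {
  private producer: ProducerLike;
  private connecting: Promise<void> | null = null;

  constructor(producer: ProducerLike) {
    this.producer = producer;
  }

  // Connect on first use only; concurrent callers share the same promise.
  private ensureConnected(): Promise<void> {
    if (!this.connecting) {
      this.connecting = this.producer.connect().catch((err) => {
        this.connecting = null; // allow a later retry after a failed connect
        throw err;
      });
    }
    return this.connecting;
  }

  async send(topic: string, message: object): Promise<void> {
    await this.ensureConnected();
    await this.producer.send({ topic, messages: [{ value: JSON.stringify(message) }] });
  }

  // Call once during application shutdown.
  async close(): Promise<void> {
    if (!this.connecting) return; // never connected, nothing to close
    await this.connecting.catch(() => undefined);
    this.connecting = null;
    await this.producer.disconnect();
  }
}
```

Assuming the `kafka` client from the configuration step, it could be used as `const sender = new PersistentSender(kafka.producer())`, followed by `await sender.send('orders', { id: 1 })` wherever messages are produced and `await sender.close()` during shutdown.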
Creating the Consumer
import { kafka } from './client';

const consumer = kafka.consumer({ groupId: 'learni-group' });

export async function startConsumer(topic: string) {
  await consumer.connect();
  // fromBeginning: true reads from the earliest offset when the group has no committed offset.
  await consumer.subscribe({ topic, fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // message.value is a Buffer, or null when the message has no value.
      console.log('Message received:', message.value?.toString());
    },
  });
}

The consumer subscribes to a topic and processes each message in the eachMessage callback. Consumers that share a groupId split the topic's partitions among themselves, which balances the load across multiple instances.
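The callback above only logs the raw value. Since the producer serializes messages as JSON, a handler will usually need to decode them, and a malformed message should not crash the consumer. The helper below is one way to do that; `decodeJson` and the `Decoded` type are hypothetical names introduced for illustration.

```typescript
// Result of decoding one message value: the parsed payload or a reason for rejection.
type Decoded<T> = { ok: true; data: T } | { ok: false; reason: string };

// Accepts anything with toString(), which covers the Buffer passed as message.value.
// The cast to T is unchecked; validate the shape separately if the data is untrusted.
function decodeJson<T = unknown>(
  value: { toString(): string } | null | undefined,
): Decoded<T> {
  if (value == null) return { ok: false, reason: 'empty message value' };
  try {
    return { ok: true, data: JSON.parse(value.toString()) as T };
  } catch (err) {
    const detail = err instanceof Error ? err.message : String(err);
    return { ok: false, reason: `invalid JSON: ${detail}` };
  }
}

const good = decodeJson<{ id: number }>('{"id": 42}');
const bad = decodeJson('not json');
console.log(good.ok, bad.ok); // true false
```

Inside eachMessage, the call would be `decodeJson(message.value)`, with the `ok: false` branch logging or forwarding the message instead of throwing.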
Error Handling and Reconnection
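// Assumes this listener lives in the same module as consumer and startConsumer.
consumer.on('consumer.crash', async ({ payload }) => {
  console.error('Consumer crash:', payload.error);
  // kafkajs restarts the consumer itself when payload.restart is true.
  if (payload.restart) return;
  await consumer.disconnect();
  setTimeout(() => startConsumer('orders').catch(console.error), 5000);
});

This listener logs the error that caused the crash. kafkajs restarts the consumer itself after retriable errors (payload.restart is true); otherwise the listener disconnects and starts the consumer again after 5 seconds, so a crash does not leave it stopped permanently.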
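A single restart after a fixed delay can fail again if the broker is still unavailable. As a sketch of one alternative, the helper below retries a start function with a delay that doubles on each attempt. `restartWithBackoff` is a hypothetical name, the defaults are arbitrary, and the `sleep` parameter exists only so the helper can be exercised without real waiting.

```typescript
// Retries `start` until it succeeds or `maxAttempts` is reached, doubling the delay each time.
// Returns the number of attempts it took; rethrows the last error if all attempts fail.
async function restartWithBackoff(
  start: () => Promise<void>,
  maxAttempts = 5,
  baseDelayMs = 1000,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise((resolve) => setTimeout(resolve, ms)),
): Promise<number> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await start();
      return attempt;
    } catch (err) {
      lastError = err;
      // Wait 1x, 2x, 4x, ... the base delay, but not after the final attempt.
      if (attempt < maxAttempts) await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
  throw lastError;
}
```

In the crash listener, the `setTimeout` call could then be replaced with `restartWithBackoff(() => startConsumer('orders'))`, with a final `.catch` that logs the error and alerts an operator once all attempts are exhausted.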
Best Practices
- Always use a unique groupId per consumer type
- Enable message compression to reduce bandwidth
- Implement retry mechanisms and dead-letter queues
- Monitor offsets with tools like Kafka UI
- Properly close connections with disconnect() when shutting down the application
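The retry and dead-letter practice from the list above can be sketched independently of any Kafka call. In this minimal example, `handleWithDeadLetter`, `Handler`, and `DeadLetterSink` are hypothetical names that are not part of kafkajs; the handler is retried a fixed number of times, and a message that keeps failing is handed to a dead-letter sink instead of blocking the partition.

```typescript
// Hypothetical shapes for illustration; neither is part of kafkajs.
type Handler = (value: string) => Promise<void>;
type DeadLetterSink = (value: string, error: string) => Promise<void>;

// Tries the handler up to maxAttempts times, then forwards the message to the dead-letter sink.
// Returns true when the handler succeeded, false when the message was dead-lettered.
async function handleWithDeadLetter(
  value: string,
  handler: Handler,
  deadLetter: DeadLetterSink,
  maxAttempts = 3,
): Promise<boolean> {
  let lastError = 'unknown error';
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(value);
      return true;
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
    }
  }
  await deadLetter(value, lastError);
  return false;
}
```

The sink could, for example, call the `sendMessage` function from the producer section with a separate topic such as `orders.dlq` (an assumed name), so failed messages can be inspected and replayed later.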
Common Errors to Avoid
- Forgetting to call connect() before send() or subscribe()
- Using the same groupId for consumers with different processing logic
- Not handling JSON deserialization errors
- Leaving connections open without timeouts during broker restarts
Going Further
Discover our advanced training on distributed systems and Kafka: https://learni-group.com/formations