How to Implement Kafka Clients in Node.js in 2026

Introduction

Apache Kafka is an essential distributed streaming platform for real-time data flows; companies like Netflix, Uber, and LinkedIn rely on it to process billions of events daily. Implementing Kafka clients (producers and consumers) in Node.js lets you bring that power into modern backend apps. This intermediate tutorial focuses on kafkajs, the most widely used pure-JavaScript Kafka client for Node.js in 2026.

Why it matters: Kafka clients handle resilience, partitioning, and offsets for reliable data pipelines. We'll spin up local Kafka with Docker, build a batched producer with partition keys and a grouped consumer with rebalancing, and cover advanced error handling. By the end, you'll have a production-ready, horizontally scalable setup. Estimated time: 20 minutes for a working prototype.

Prerequisites

  • Node.js 20+ installed
  • Docker and Docker Compose for local Kafka
  • Basic Kafka knowledge (topics, partitions, brokers)
  • Code editor with TypeScript support (VS Code recommended)
  • Terminal for running commands

Start Kafka Locally

docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.7.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

This docker-compose.yml launches ZooKeeper and a single Kafka broker reachable on localhost:9092. The environment variables configure two listeners: one for host access (localhost:9092) and one for inter-container communication (kafka:29092). With KAFKA_AUTO_CREATE_TOPICS_ENABLE set, the broker auto-creates topics on first send.

Launch Kafka and Verify

Save the file and run docker compose up -d in your terminal. Check logs with docker compose logs kafka; Kafka is ready when you see 'started (kafka.server.KafkaServer)'. This single-node setup is ideal for development; use multiple brokers in production.

Initialize the Node.js Project

terminal
mkdir kafka-clients && cd kafka-clients
npm init -y
npm install kafkajs
npm install -D typescript @types/node tsx
npx tsc --init

Create a project folder, init npm, and install kafkajs (a dependency-free, promise-based client) along with TypeScript and tsx for running TS files directly. tsc --init generates a base tsconfig; the next step tweaks it for ES2022 and strict mode.

Configure TypeScript

tsconfig.json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "NodeNext",
    "moduleResolution": "NodeNext",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "outDir": "./dist",
    "rootDir": "./src"
  }
}

This tsconfig enables strict mode to catch errors early, targets ES2022 for top-level await, and uses NodeNext module resolution for modern Node.js. Create a src/ folder for your sources; outDir and rootDir keep compiled output cleanly separated.

Update package.json for Scripts

Add these scripts to package.json for easy launches:

"scripts": {
  "producer": "tsx src/producer.ts",
  "consumer": "tsx src/consumer.ts",
  "dev": "tsx watch src/producer.ts"
}

If tsx isn't already installed, add it as a dev dependency: npm i -D tsx. (tsx replaces ts-node for native ESM support in 2026.)

Create a Batched Producer with Partitions

src/producer.ts
import { Kafka, Producer, TopicMessages } from 'kafkajs';

async function runProducer() {
  const kafka = new Kafka({
    clientId: 'my-producer',
    brokers: ['localhost:9092'],
  });
  const producer: Producer = kafka.producer({
    allowAutoTopicCreation: true,
    transactionTimeout: 30000,
    idempotent: true,
    maxInFlightRequests: 1,
  });

  await producer.connect();

  // Send 10 batched messages with partition keys
  const topicMessages: TopicMessages[] = [];
  for (let i = 0; i < 10; i++) {
    topicMessages.push({
      topic: 'test-topic',
      messages: [{ key: `key-${i % 3}`, value: `Message ${i} at ${new Date().toISOString()}` }],
    });
  }

  const result = await producer.sendBatch({ topicMessages });
  console.log('Messages sent:', result);

  await producer.disconnect();
}

runProducer().catch(console.error);

This producer connects with idempotence enabled (retries cannot write duplicates), uses sendBatch for efficiency, and assigns keys to colocate messages by partition (i % 3 yields three key groups). maxInFlightRequests: 1 prevents reordering on retries; monitor delivery results in the producer's log output.

Create a Grouped Consumer with Offsets

src/consumer.ts
import { Kafka, Consumer, EachMessagePayload, logLevel } from 'kafkajs';

async function runConsumer() {
  const kafka = new Kafka({
    clientId: 'my-consumer',
    brokers: ['localhost:9092'],
    logLevel: logLevel.INFO,
  });
  const consumer: Consumer = kafka.consumer({ groupId: 'test-group' });

  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.key?.toString()}`;
      console.log(`${prefix} = ${message.value?.toString()}`);

      // Manual commit after processing
      await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
    },
  });
}

runConsumer().catch(console.error);

This consumer uses a groupId for load balancing across instances, fromBeginning: true to replay existing messages, and manual commits (autoCommit: false) for precise at-least-once delivery. The eachMessage callback logs key/offset/value; keep processing idempotent here, since messages can be redelivered after a failure.

Test the Full Pipeline

  1. Start the consumer: npm run consumer (it waits).
  2. In another terminal: npm run producer (sends 10 messages).
  3. Watch the consumer logs: messages arrive with incrementing offsets.
  4. Stop with Ctrl+C; restart to verify the offsets persisted (no duplicates).

Producer with Retry Error Handling

src/producer-retry.ts
import { Kafka, TopicMessages, KafkaJSNonRetriableError } from 'kafkajs';

async function runProducerWithRetry() {
  const kafka = new Kafka({ clientId: 'producer-retry', brokers: ['localhost:9092'] });
  const producer = kafka.producer({
    retry: { maxRetryTime: 30000, initialRetryTime: 300 },
    allowAutoTopicCreation: true,
  });

  await producer.connect();

  try {
    const topicMessages: TopicMessages[] = [
      {
        topic: 'retry-topic',
        messages: [{ key: 'retry-key', value: 'Message with retry' }],
      },
    ];
    const result = await producer.sendBatch({ topicMessages });
    console.log('Success:', result);
  } catch (error) {
    if (error instanceof KafkaJSNonRetriableError) {
      console.error('Fatal error:', error);
    } else {
      console.error('Retriable error (retries exhausted):', error);
    }
  } finally {
    await producer.disconnect();
  }
}

runProducerWithRetry();

Adds configurable retries with exponential backoff (initialRetryTime roughly doubles up to maxRetryTime) for transient errors such as network blips. Catch KafkaJSNonRetriableError separately to route failed messages to a dead-letter queue. The finally block guarantees a clean disconnect, preventing connection leaks.

Best Practices

  • Idempotence and transactions: Enable idempotent: true and set a transactionalId for exactly-once processing in production.
  • Monitoring: Integrate Prometheus via kafkajs instrumentation; track latency and throughput.
  • Security: Use SASL/SSL for cloud brokers (Confluent, Aiven); store creds in env vars.
  • Dynamic batching: Group more messages into each sendBatch call as throughput grows; kafkajs batches per request rather than exposing a standalone batchSize setting.
  • Graceful shutdown: Trap SIGTERM to drain queues before disconnect.

Common Errors to Avoid

  • Forgetting offsets: Without manual commits, restarts cause duplicates; use pause/resume for backpressure.
  • Too many in-flight requests: >5 per connection reorders; limit to 1 for strict ordering.
  • Auto-topic creation in prod: Disable it; create topics manually with optimal partitions (6-12).
  • Ignoring rebalances: Listen for consumer.events.GROUP_JOIN (emitted after every rebalance) to reset any shared state.

Next Steps

  • kafkajs docs: kafka.js.org
  • Schema Registry with Avro: Integrate @kafkajs/confluent-schema-registry
  • Kafka Streams in JS: Explore kafkajs-streams
  • Learni Training: Master Kafka deeply with our certified courses (Confluent Developer).