Comprehensive KafkaJS Guide for Developers Enhancing Kafka with Node.js

Introduction to KafkaJS

KafkaJS is a modern client for Apache Kafka written in JavaScript. It is designed to support Kafka’s key features, such as high throughput, low latency, durability, and scalability, making it suitable for production environments. In this guide, we will cover a wide array of KafkaJS APIs with detailed explanations and code snippets to help you get started and make the most out of KafkaJS.

Installation

npm install kafkajs

Creating a Kafka Client


const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['broker1:9092', 'broker2:9092']
});

Producing Messages


const producer = kafka.producer();

const produce = async () => {
  await producer.connect();
  await producer.send({
    topic: 'my-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
      { value: 'Another message' }
    ]
  });
  await producer.disconnect();
}

produce().catch(console.error);

Consuming Messages


const consumer = kafka.consumer({ groupId: 'test-group' });

const consume = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
  
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
}

consume().catch(console.error);

Admin API


const admin = kafka.admin();

const manageKafka = async () => {
  await admin.connect();
  
  const topics = await admin.listTopics();
  console.log('Topics:', topics);
  
  await admin.createTopics({
    topics: [{ topic: 'new-topic', numPartitions: 1 }]
  });
  
  await admin.deleteTopics({
    topics: ['old-topic'],
  });
  
  await admin.disconnect();
}

manageKafka().catch(console.error);

Full Example

Here’s how you can combine the above APIs into a full application:


const { Kafka } = require('kafkajs');

const run = async () => {
  const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['broker1:9092']
  });
  
  const producer = kafka.producer();
  const consumer = kafka.consumer({ groupId: 'test-group' });
  const admin = kafka.admin();
  
  await admin.connect();
  await admin.createTopics({ topics: [{ topic: 'app-topic', numPartitions: 1 }] });
  await admin.disconnect();
  
  const produce = async () => {
    await producer.connect();
    await producer.send({
      topic: 'app-topic',
      messages: [{ value: 'Hello from full example!' }],
    });
    await producer.disconnect();
  };
  
  const consume = async () => {
    await consumer.connect();
    await consumer.subscribe({ topic: 'app-topic', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log({
          partition,
          offset: message.offset,
          value: message.value.toString(),
        });
      },
    });
  };
  
  produce().catch(console.error);
  consume().catch(console.error);
};

run().catch(console.error);

By following this guide and leveraging the above examples, you can efficiently use KafkaJS to handle message streaming in your Node.js applications.

Hash: f429dccd5b325a94f5b9b3bb899ec94c4bab42c160781546aabb5315c7dc756d

Leave a Reply

Your email address will not be published. Required fields are marked *