Introduction to KafkaJS
KafkaJS is a modern Apache Kafka client for Node.js, written in JavaScript. It is built around the properties that make Kafka attractive in production: high throughput, low latency, durability, and scalability. This guide walks through the core KafkaJS APIs (client setup, producer, consumer, and admin) with explanations and code snippets to help you get started.
Installation
npm install kafkajs
Creating a Kafka Client
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['broker1:9092', 'broker2:9092']
});
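In practice the broker list usually comes from configuration rather than source code. The following is a minimal sketch, assuming two hypothetical environment variables, `KAFKA_CLIENT_ID` and `KAFKA_BROKERS` (a comma-separated list). `buildKafkaConfig` is an illustrative helper, not part of KafkaJS:

```javascript
// Hypothetical helper: builds the options object passed to `new Kafka(...)`.
// The variable names KAFKA_CLIENT_ID and KAFKA_BROKERS are assumptions.
const buildKafkaConfig = (env = process.env) => {
  // Split "host1:9092, host2:9092" into a clean array of broker addresses.
  const brokers = (env.KAFKA_BROKERS || '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);

  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKERS must list at least one host:port pair');
  }

  return {
    clientId: env.KAFKA_CLIENT_ID || 'my-app',
    brokers,
  };
};
```

The result can be passed straight to the constructor, for example `new Kafka(buildKafkaConfig())`.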
Producing Messages
const producer = kafka.producer();
const produce = async () => {
  await producer.connect();
  try {
    // send() takes a batch of messages for a single topic
    await producer.send({
      topic: 'my-topic',
      messages: [
        { value: 'Hello KafkaJS user!' },
        { value: 'Another message' }
      ]
    });
  } finally {
    // Disconnect even if send() fails
    await producer.disconnect();
  }
};
produce().catch(console.error);
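Each entry in `messages` can carry a `key` and `headers` alongside `value`. With the default partitioner, messages that share a key go to the same partition, which preserves their relative order. The sketch below assumes application events shaped like `{ id, type, payload }`, a hypothetical shape chosen for illustration. `toKafkaMessages` is not part of KafkaJS:

```javascript
// Hypothetical helper: maps application events to KafkaJS message objects.
// The event shape { id, type, payload } is an assumption for this example.
const toKafkaMessages = (events) =>
  events.map((event) => ({
    // Same key => same partition, so events for one id stay ordered.
    key: String(event.id),
    // Kafka stores bytes; serialize structured data explicitly.
    value: JSON.stringify({ type: event.type, payload: event.payload }),
    headers: { 'content-type': 'application/json' },
  }));
```

A call such as `producer.send({ topic: 'my-topic', messages: toKafkaMessages(events) })` would then publish the whole batch.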
Consuming Messages
const consumer = kafka.consumer({ groupId: 'test-group' });
const consume = async () => {
  await consumer.connect();
  // fromBeginning applies only when the group has no committed offset yet
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        // value is a Buffer, or null for tombstone messages
        value: message.value ? message.value.toString() : null,
      });
    },
  });
};
consume().catch(console.error);
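KafkaJS delivers `message.key` and `message.value` as Buffers, and either can be `null` (a tombstone record, for example, has a `null` value). Below is a sketch of a decoding step that could run inside `eachMessage`, assuming values are usually JSON. `decodeMessage` is a hypothetical helper, not a KafkaJS API:

```javascript
// Hypothetical helper: turns the eachMessage payload into a plain object.
// Assumes values are usually JSON; non-JSON text is returned unchanged.
const decodeMessage = ({ topic, partition, message }) => {
  const key = message.key ? message.key.toString() : null;

  let value = null; // stays null for tombstones
  if (message.value) {
    const text = message.value.toString();
    try {
      value = JSON.parse(text);
    } catch (err) {
      value = text; // not JSON: fall back to the raw string
    }
  }

  return { topic, partition, offset: message.offset, key, value };
};
```

Inside the handler this could be used as `eachMessage: async (payload) => console.log(decodeMessage(payload))`.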
Admin API
const admin = kafka.admin();
const manageKafka = async () => {
  await admin.connect();
  try {
    const topics = await admin.listTopics();
    console.log('Topics:', topics);
    // Resolves to false, without throwing, if 'new-topic' already exists
    await admin.createTopics({
      topics: [{ topic: 'new-topic', numPartitions: 1 }]
    });
    // Assumes 'old-topic' exists
    await admin.deleteTopics({
      topics: ['old-topic'],
    });
  } finally {
    // Disconnect even if one of the calls above fails
    await admin.disconnect();
  }
};
manageKafka().catch(console.error);
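Because `listTopics` returns the names of the topics that already exist, it can be combined with `createTopics` to create only what is missing. In this sketch, `missingTopics` and `ensureTopics` are hypothetical helpers, and `admin` is assumed to be an already connected KafkaJS admin client:

```javascript
// Hypothetical helper: keeps only the topic configs whose name is not yet known.
const missingTopics = (existing, desired) => {
  const known = new Set(existing);
  return desired.filter(({ topic }) => !known.has(topic));
};

// Hypothetical helper: creates the missing topics and returns their names.
// `admin` is assumed to be a connected KafkaJS admin client.
const ensureTopics = async (admin, desired) => {
  const toCreate = missingTopics(await admin.listTopics(), desired);
  if (toCreate.length > 0) {
    await admin.createTopics({ topics: toCreate });
  }
  return toCreate.map(({ topic }) => topic);
};
```

For example, `await ensureTopics(admin, [{ topic: 'new-topic', numPartitions: 1 }])` would skip the create call entirely when `new-topic` is already present.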
Full Example
Here’s how you can combine the above APIs into a full application:
const { Kafka } = require('kafkajs');
const run = async () => {
  const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['broker1:9092']
  });
  const producer = kafka.producer();
  const consumer = kafka.consumer({ groupId: 'test-group' });
  const admin = kafka.admin();
  // Make sure the topic exists before producing or consuming
  await admin.connect();
  await admin.createTopics({ topics: [{ topic: 'app-topic', numPartitions: 1 }] });
  await admin.disconnect();
  const produce = async () => {
    await producer.connect();
    await producer.send({
      topic: 'app-topic',
      messages: [{ value: 'Hello from full example!' }],
    });
    await producer.disconnect();
  };
  const consume = async () => {
    await consumer.connect();
    await consumer.subscribe({ topic: 'app-topic', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log({
          topic,
          partition,
          offset: message.offset,
          // value is a Buffer, or null for tombstone messages
          value: message.value ? message.value.toString() : null,
        });
      },
    });
  };
  // Produce first, then start consuming; failures propagate to run().catch
  await produce();
  await consume();
};
run().catch(console.error);
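The consumer in this example keeps running until it is disconnected, so long-running applications typically disconnect their clients when the process is asked to stop. This is a sketch of one way to do that. `registerShutdown` is a hypothetical helper, and the choice of `SIGINT` and `SIGTERM` is an assumption about the deployment environment:

```javascript
// Hypothetical helper: disconnects KafkaJS clients on a termination signal.
// `clients` is any array of objects with an async disconnect() method,
// such as the producer and consumer created above.
const registerShutdown = (clients, proc = process) => {
  const signals = ['SIGINT', 'SIGTERM'];
  let shuttingDown = false;

  const shutdown = async () => {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    // allSettled waits for every client even if one disconnect rejects.
    const results = await Promise.allSettled(
      clients.map((client) => client.disconnect())
    );
    const failed = results.some((result) => result.status === 'rejected');
    proc.exit(failed ? 1 : 0);
  };

  signals.forEach((signal) => proc.once(signal, shutdown));
  return shutdown;
};
```

Calling `registerShutdown([consumer, producer])` once, before starting the consumer, would be enough to wire this into the example.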
With these examples as a starting point, you can use KafkaJS to produce and consume messages and to manage topics from your Node.js applications.