Introduction to Ampqlib
Ampqlib is a powerful Node.js library for interacting with RabbitMQ message broker. It provides a robust interface for various messaging patterns and guarantees delivery models. This article will guide you through the essential APIs with practical examples and include a real-world application demonstrating their use.
Connecting to RabbitMQ
First, you need to install the library:
npm install amqplib
Then, establish a connection to the RabbitMQ server:
const amqp = require('amqplib');
async function connect() {
try {
const connection = await amqp.connect('amqp://localhost');
console.log('Connected to RabbitMQ');
} catch (error) {
console.error('Failed to connect to RabbitMQ:', error);
}
}
connect();
Creating a Channel
A channel is required to interact with RabbitMQ:
async function createChannel() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
console.log('Channel created');
return channel;
}
createChannel();
Declaring a Queue
Queues are responsible for storing messages:
async function createQueue(channel, queueName) {
await channel.assertQueue(queueName, { durable: true });
console.log(`Queue ${queueName} created`);
}
async function setup() {
const channel = await createChannel();
await createQueue(channel, 'myQueue');
}
setup();
Publishing Messages
Messages can be sent to the queue:
async function sendMessage(channel, queue, message) {
channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
console.log(`Sent ${message} to ${queue}`);
}
async function publishMessage() {
const channel = await createChannel();
await createQueue(channel, 'myQueue');
sendMessage(channel, 'myQueue', 'Hello World');
}
publishMessage();
Consuming Messages
Receive messages from the queue:
async function consumeMessages(channel, queue) {
await channel.consume(queue, msg => {
if (msg !== null) {
console.log(`Received ${msg.content.toString()}`);
channel.ack(msg);
}
});
}
async function startConsumer() {
const channel = await createChannel();
await createQueue(channel, 'myQueue');
consumeMessages(channel, 'myQueue');
}
startConsumer();
Handling Connections and Errors
Always manage errors gracefully:
const connection = await amqp.connect('amqp://localhost'); connection.on('error', err => {
console.error('Connection error:', err);
});
connection.on('close', () => {
console.log('Connection closed');
});
Example Application
Combining the above API usage in a single app:
const amqp = require('amqplib');
async function connectToRabbitMQ() {
try {
const connection = await amqp.connect('amqp://localhost');
connection.on('error', console.error);
connection.on('close', () => console.log('Connection closed'));
const channel = await connection.createChannel();
await channel.assertQueue('taskQueue', { durable: true });
channel.consume('taskQueue', msg => {
if (msg !== null) {
console.log(`Received: ${msg.content.toString()}`);
channel.ack(msg);
}
});
setInterval(() => {
const message = `Message at ${new Date()}`;
channel.sendToQueue('taskQueue', Buffer.from(message), { persistent: true });
console.log(`Sent: ${message}`);
}, 10000);
} catch (error) {
console.error('Connection error:', error);
}
}
connectToRabbitMQ();
By following the above steps, you can efficiently build robust applications using RabbitMQ with the help of the Ampqlib library.
Hash: 552d27fcbdca536822c4bf44e41a8f618788c3d54527ed57e2c8a6e4d12913d3