Files
sf-sim/packages/shared/adapters/queues/RabbitMQClient.ts

79 lines
2.0 KiB
TypeScript
Raw Normal View History

import client, { ChannelModel, ConfirmChannel, connect, Connection } from "amqplib"
const PREFETCH_LIMIT = 1
export class RabbitConnection {
connection?: ChannelModel
channel?: ConfirmChannel
connected: Boolean = false
private connectionOptions: {
hostname: string,
port: number,
secure: boolean,
username: string,
password: string,
vhost: string
}
constructor(opts: typeof this.connectionOptions) {
this.connectionOptions = opts
this.checkStructure();
}
/**
* Verificacion que la estructura definida en el JSON corresponde con
* la esperada
* TODO: Faltan las colas fijas según las operaciones
*/
private checkStructure() {
this.channel?.assertQueue("sim.queue")
this.channel?.assertExchange("sim.exchange", "direct")
}
public async connect() {
this.connection = await this.createConnection();
if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")
this.channel = await this.createConfirmChannel()
}
protected async createConnection() {
const { hostname, port, secure } = { ...this.connectionOptions }
const { username, password } = { ...this.connectionOptions };
const protocol = secure ? 'amqps' : 'amqp';
const vhost = this.connectionOptions.vhost
const connection = await client.connect({
protocol,
hostname,
port,
username,
password,
vhost
});
connection.on('error', (error: unknown) => {
console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
Promise.reject(error);
});
return connection;
}
protected async createConfirmChannel() {
const channel = await this.connection?.createConfirmChannel()
await channel?.prefetch(PREFETCH_LIMIT)
if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")
channel.on('error', (error: unknown) => {
console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
Promise.reject(error);
});
return channel;
}
}