import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
import { connect, AmqpConnectionManager, ChannelWrapper, Channel } from "amqp-connection-manager"
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent";
import { EventBus } from "../domain/EventBus.port";

/** Topic exchange every domain event is published to. */
const EVENTS_EXCHANGE = "sim.exchange";
/** Topic exchange asserted alongside the main one (named as a dead-letter exchange). */
const DEAD_LETTER_EXCHANGE = "sim.dlx";

/** Settings used to build the broker connection. */
export type RMQConnectionParams = {
  username: string,
  password: string,
  vhost: string,
  hostname: string,
  port: number,
  /** When true the connection uses `amqps`, otherwise `amqp`. */
  secure: boolean
}

/**
 * {@link EventBus} implementation backed by RabbitMQ through
 * `amqp-connection-manager`.
 *
 * Call {@link RabbitMQEventBus.connect} before publishing or consuming.
 */
export class RabbitMQEventBus implements EventBus {
  connection?: AmqpConnectionManager
  channel?: ChannelWrapper
  /** Updated from the connection manager's `connect` / `disconnect` events. */
  connected: boolean = false

  private connectionOptions: RMQConnectionParams
  /** Optional hook that declares service-specific queues/bindings during channel setup. */
  private buildStructure?: (chan: Channel) => void | Promise<void>

  /**
   * @param args.connectionParams Broker location and credentials.
   * @param args.buildStructure Optional hook invoked on every channel setup,
   *   after the common exchanges have been asserted. It may return a promise,
   *   in which case setup waits for it.
   */
  constructor(args: {
    connectionParams: RMQConnectionParams,
    buildStructure?: (chan: Channel) => void | Promise<void>
  }) {
    this.connectionOptions = args.connectionParams
    if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
  }

  /**
   * Starts consuming `queue`, handing each delivery to `callback`.
   *
   * Queue bindings (queue -> routing key -> exchange) are expected to be
   * declared elsewhere (see `buildStructure`); nothing is declared here.
   *
   * @throws Error when the channel has not been initialised.
   */
  async consume(queue: string, callback: (msg: ConsumeMessage | null) => void): Promise<void> {
    // Checks before starting to listen
    if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
    //await this.channel.prefetch(1)
    await this.channel.consume(queue, callback)
  }

  /**
   * Acknowledges `msg` on the current channel.
   *
   * @throws Error when the channel has not been initialised.
   */
  ack(msg: ConsumeMessage) {
    if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
    return this.channel.ack(msg)
  }

  /**
   * Negatively acknowledges `msg` on the current channel.
   *
   * @throws Error when the channel has not been initialised.
   */
  nack(msg: ConsumeMessage) {
    if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
    return this.channel.nack(msg)
  }

  /**
   * Creates the connection and the channel.
   *
   * Failures are logged and NOT rethrown, so the returned promise always
   * resolves; callers cannot detect a failed attempt from it.
   */
  public async connect(): Promise<void> {
    try {
      this.connection = await this.createConnection();
      if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")

      this.channel = await this.createConfirmChannel()
      this.channel.on("close", () => {
        console.log("[RMQ] Canal desconectado")
        // NOTE(review): this builds a brand-new connection manager without
        // closing the previous one; confirm whether the old one should be
        // closed before reconnecting.
        setTimeout(() => {
          // connect() swallows its own errors, so this chain cannot reject.
          void this.connect().then(() => {
            console.log("[RMQ] Canal reconectado")
          })
        }, 1000)
      })
    } catch (e) {
      console.error("[RMQ] Error estableciendo la conexion con el servidor", e)
    }
  }

  /**
   * Publishes every event to the events exchange, using `event.key` as the
   * routing key. Broker confirmations are only logged; the returned promise
   * resolves once all messages have been handed to the channel.
   *
   * @throws Error (as a rejection) when the channel has not been initialised,
   *   instead of silently dropping the events.
   */
  async publish(events: DomainEvent[]): Promise<void> {
    const channel = this.channel
    if (channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");

    for (const event of events) {
      const routingKey = event.key
      // NOTE(review): the channel is created with `json: true`, so handing it
      // an already-serialised Buffer may double-encode the payload — confirm
      // against what consumers expect before changing the wire format.
      const content = Buffer.from(JSON.stringify(event))
      channel.publish(EVENTS_EXCHANGE, routingKey, content, {}, (err, ok) => {
        console.log("Confirmacion", err, ok)
      })
    }
  }

  /** Not supported by this bus; always throws. */
  addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
    throw new Error("Method not implemented.");
  }

  /**
   * Builds the connection manager from the stored options and wires its
   * logging / state listeners.
   */
  protected async createConnection() {
    const { hostname, port, secure, username, password, vhost } = this.connectionOptions
    const protocol = secure ? 'amqps' : 'amqp';

    const connection = connect({ protocol, hostname, port, username, password, vhost });

    connection.on("connect", () => {
      this.connected = true
    })
    connection.on('error', (error: unknown) => {
      console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
      console.log(`[RMQ] Reintentando conexion`)
    });
    connection.on("disconnect", () => {
      this.connected = false
      console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ...`)
    })

    return connection;
  }

  /**
   * Creates the channel wrapper on the current connection and registers the
   * setup routine that asserts the common exchanges plus the optional
   * service-specific structure.
   *
   * @throws Error when there is no connection to create the channel on.
   */
  protected async createConfirmChannel() {
    if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion")

    const channel = this.connection.createChannel({
      json: true,
      // Awaited so setup only completes once the topology exists and any
      // assertion failure surfaces through the channel instead of floating.
      setup: async (channel: Channel) => {
        // Exchanges shared by every service
        await channel.assertExchange(EVENTS_EXCHANGE, "topic", { durable: true })
        await channel.assertExchange(DEAD_LETTER_EXCHANGE, "topic", { durable: true })

        // Structures owned by each service
        if (this.buildStructure != undefined) {
          await this.buildStructure(channel)
        } else {
          console.warn("[i] Se ha creado un canal sin garantizar que exista la/s cola/s que se van a usar")
        }
      },
    })
    //await channel.prefetch(PREFETCH_LIMIT)

    if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")

    // Only log here: creating a rejected promise nobody handles would raise
    // an unhandled rejection in the process.
    channel.on('error', (error: unknown) => {
      console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
    });

    return channel;
  }
}