import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
import { connect, AmqpConnectionManager, ChannelWrapper, Channel } from "amqp-connection-manager"
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent";
import { EventBus } from "../domain/EventBus.port";

/**
 * Settings used to open the broker connection.
 * `secure` selects the `amqps` protocol instead of `amqp`.
 */
export type RMQConnectionParams = {
    username: string,
    password: string,
    vhost: string,
    hostname: string,
    port: number,
    secure: boolean
}

/**
 * EventBus implementation backed by RabbitMQ through `amqp-connection-manager`.
 *
 * Typical usage: construct, `await connect()`, then `publish` / `consume`.
 * Consumed messages are settled with `ack` or `nack`; `nack` re-publishes the
 * message to a delayed exchange until `maxRetry` attempts have been made and
 * to a dead-letter exchange afterwards.
 */
export class RabbitMQEventBus implements EventBus {

    /** Connection manager created by the last `connect()` call, if any. */
    connection?: AmqpConnectionManager
    /** Channel wrapper created by the last `connect()` call, if any. */
    channel?: ChannelWrapper
    /** Mirrors the 'connect' / 'disconnect' events of the active connection manager. */
    connected: boolean = false

    private readonly connectionOptions: RMQConnectionParams
    /** Optional per-service topology builder; its resolved value is ignored. */
    private buildStructure?: (chan: Channel) => Promise<unknown>
    /** Number of delayed re-deliveries `nack` allows before dead-lettering. */
    private maxRetry = 0

    /**
     * @param args.connectionParams Broker connection settings.
     * @param args.buildStructure Optional callback run during channel setup to
     *        declare the queues/exchanges/bindings of the service.
     * @param args.maxRetry Maximum number of delayed retries in `nack` (defaults to 0).
     */
    constructor(args: {
        connectionParams: RMQConnectionParams,
        buildStructure?: (chan: Channel) => Promise<unknown>,
        maxRetry?: number
    }) {
        this.connectionOptions = args.connectionParams
        if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
        if (args.maxRetry != undefined) this.maxRetry = args.maxRetry
    }

    /**
     * Starts consuming `queue`, handing every delivery to `callback`.
     *
     * @throws Error when no channel has been created yet.
     */
    async consume(queue: string, callback: (msg: ConsumeMessage | null) => void): Promise<void> {
        // Checks before listening
        const channel = this.requireChannel()
        await channel.consume(queue, callback)
    }

    /**
     * Acknowledges `msg` on the current channel.
     *
     * @throws Error when no channel has been created yet.
     */
    async ack(msg: ConsumeMessage): Promise<void> {
        return this.requireChannel().ack(msg)
    }

    /**
     * Rejects `msg` by re-publishing it and acknowledging the original delivery.
     *
     * While the `x-retry-count` header is below `maxRetry` the message goes to
     * the delayed exchange with the counter incremented; otherwise it goes to
     * the dead-letter exchange with its headers unchanged.
     *
     * TODO:
     *  - This nack implementation should live in objenious
     *
     * @param msg Delivery to reject.
     * @param requeue Currently unused; kept for interface compatibility.
     * @throws Error when no channel has been created yet.
     */
    async nack(msg: ConsumeMessage, requeue?: boolean): Promise<void> {
        const channel = this.requireChannel()
        console.log("NACK: ", msg.properties.headers)

        const headers = msg.properties.headers || {}
        // Coerce the header to a number: a string value would otherwise turn
        // `numberRetry + 1` into a string concatenation.
        const parsedRetry = Number(headers['x-retry-count'] ?? 0)
        const numberRetry = Number.isFinite(parsedRetry) ? parsedRetry : 0
        const routingKey = msg.fields.routingKey

        if (numberRetry < this.maxRetry) {
            console.log("Delaying")
            await channel.publish("sim.ex.objenious.delayed", routingKey, msg.content, {
                headers: { ...headers, 'x-retry-count': numberRetry + 1 }
            })
        } else {
            console.log("DeadLetter")
            await channel.publish("sim.ex.objenious.dlx", routingKey, msg.content, {
                headers: { ...headers }
            })
        }

        // Is this needed? The original delivery is acked because a copy was re-published.
        channel.ack(msg)
        //return this.channel.nack(msg, false, requeue)
    }

    /**
     * Creates the connection manager and the channel wrapper.
     *
     * Any connection manager left over from a previous call is closed first so
     * repeated calls (including the automatic one scheduled when the channel
     * closes) do not leak connections. Failures are logged, not thrown.
     */
    public async connect(): Promise<void> {
        try {
            await this.releaseCurrentConnection()

            this.connection = await this.createConnection();
            if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")

            const channel = await this.createChannel()
            this.channel = channel
            channel.on("close", () => {
                console.log("[RMQ] Canal desconectado")
                // Only the active channel triggers a reconnect; a channel that was
                // replaced by a newer connect() call must not start another cycle.
                if (this.channel !== channel) return
                setTimeout(() => {
                    void this.connect().then(() => {
                        console.log("[RMQ] Canal reconectado")
                    })
                }, 1000)
            })
        } catch (e) {
            console.error("[RMQ] Error estableciendo la conexion con el servidor", e)
        }
    }

    /**
     * Publishes every event to the `sim.exchange` exchange, using `event.key`
     * as routing key and the JSON-serialised event as body.
     *
     * The returned promise settles once all events have been handed to the
     * channel; it does not wait for the broker confirmation, whose outcome is
     * only logged.
     *
     * @throws Error (as a rejection) when no channel has been created yet or
     *         when an event cannot be serialised.
     */
    async publish(events: DomainEvent[]): Promise<void> {
        const channel = this.requireChannel()
        const exchange = "sim.exchange"

        for (const event of events) {
            const routingKey = event.key
            const content = Buffer.from(JSON.stringify(event))
            channel.publish(exchange, routingKey, content, {}, (err) => {
                if (err == undefined) {
                    console.log("Evento publicado ", event)
                } else {
                    console.error("Error publicando", event)
                }
            })
        }
    }

    /**
     * Not implemented.
     *
     * NOTE(review): the parameter's type arguments were reconstructed as
     * `Array<DomainEventSubscriber<DomainEvent>>` - confirm against the
     * EventBus port.
     */
    addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
        throw new Error("Method not implemented.");
    }

    /**
     * Builds the connection manager from the configured parameters and wires
     * its logging / state listeners.
     */
    protected async createConnection(): Promise<AmqpConnectionManager> {
        const { hostname, port, secure, username, password, vhost } = this.connectionOptions
        const protocol = secure ? 'amqps' : 'amqp';

        const connection = connect({ protocol, hostname, port, username, password, vhost });

        connection.on('error', (error: unknown) => {
            console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
            console.log(`[RMQ] Reintentando conexion`)
        });
        connection.on('connect', () => {
            // Ignore events coming from a manager that is no longer the active one.
            if (this.connection === connection) this.connected = true
        })
        connection.on("disconnect", (err: unknown) => {
            if (this.connection === connection) this.connected = false
            // The message text (including its line break) is kept exactly as before.
            console.error("[RMQ] Servidor Rabbitmq desconectado, reintentando ... \n::", err)
        })

        return connection;
    }

    /**
     * Creates the channel wrapper on the current connection; its setup step
     * declares the shared exchanges and the service-specific topology.
     *
     * @throws Error when there is no connection or the wrapper cannot be created.
     */
    protected async createChannel(): Promise<ChannelWrapper> {
        if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion")

        const channel = this.connection.createChannel({
            setup: (chan: Channel) => this.setupTopology(chan),
        })
        //await channel.prefetch(PREFETCH_LIMIT)
        if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")

        channel.on('error', (error: unknown) => {
            // Log only: creating a rejected promise here would surface as an
            // unhandled rejection, since nothing can observe it.
            console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
        });
        return channel;
    }

    /** Returns the current channel or throws when it has not been created. */
    private requireChannel(): ChannelWrapper {
        if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
        return this.channel
    }

    /** Drops the current connection/channel references and closes the old manager. */
    private async releaseCurrentConnection(): Promise<void> {
        const stale = this.connection
        // Clear the references first so listeners of the stale channel see that
        // it is no longer the active one.
        this.channel = undefined
        this.connection = undefined
        this.connected = false
        if (stale == undefined) return
        try {
            await stale.close()
        } catch (e) {
            console.error("[RMQ] Error cerrando la conexion anterior", e)
        }
    }

    /** Declares the shared exchanges, then runs `buildStructure` until it succeeds. */
    private async setupTopology(channel: Channel): Promise<void> {
        const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

        // Exchanges common to all services. Awaited so a failure rejects the
        // setup step instead of being lost, and so they exist before the
        // service-specific structures are declared.
        await channel.assertExchange("sim.exchange", "topic", { durable: true })
        await channel.assertExchange("sim.dlx", "topic", { durable: true })

        // Structures specific to each service
        if (this.buildStructure == undefined) {
            console.warn("[i] Se ha creado un canal sin garantizar que exista la/s cola/s que se van a usar")
            return
        }

        // NOTE(review): retries forever on the same channel every 5 seconds -
        // confirm this is the desired behaviour when the failure is permanent.
        for (;;) {
            try {
                await this.buildStructure(channel)
                return
            } catch (e) {
                console.log("[RMQ] Error Creando la topologia de rabbitmq", e)
                await delay(5 * 1000)
            }
        }
    }
}