import { type ChannelModel, type ConfirmChannel, connect as amqConnect } from "amqplib";
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent";
import { EventBus } from "../domain/EventBus.port";
import { buffer } from "node:stream/consumers";

/** Parameters needed to open the AMQP connection. */
export type RMQConnectionParams = {
  username: string,
  password: string,
  vhost: string,
  hostname: string,
  port: number,
  secure: boolean
}

/** Value passed to `channel.prefetch()` when the confirm channel is created. */
const PREFETCH_LIMIT = 1;

/** Exchange every domain event is published to. */
const EXCHANGE_NAME = "sim.exchange";

/** Queue asserted together with the exchange when connecting. */
const QUEUE_NAME = "sim.queue";

/**
 * `EventBus` implementation that publishes domain events to RabbitMQ through
 * an amqplib confirm channel.
 *
 * Constructing the bus performs no I/O: call {@link RabbitMQEventBus.connect}
 * before publishing.
 */
export class RabbitMQEventBus implements EventBus {
  /** Open AMQP connection; `undefined` until `connect()` has run. */
  connection?: ChannelModel;

  /** Confirm channel used for publishing; `undefined` until `connect()` has run. */
  channel?: ConfirmChannel;

  /** `true` once `connect()` has completed; reset when the connection closes. */
  connected: boolean = false;

  private readonly connectionOptions: RMQConnectionParams;

  /**
   * @param args.connectionParams Broker credentials and address used by `connect()`.
   */
  constructor(args: { connectionParams: RMQConnectionParams }) {
    this.connectionOptions = args.connectionParams;
    // The queue/exchange check needs an open channel, so it runs at the end
    // of connect() instead of here, where no channel exists yet.
  }

  /**
   * Opens the connection and the confirm channel, then asserts the expected
   * queue and exchange.
   *
   * @throws If the connection or the channel cannot be created, or if the
   * broker rejects one of the assertions.
   */
  public async connect(): Promise<void> {
    this.connection = await this.createConnection();
    if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion");

    // Keep the public flag in sync with the connection lifecycle.
    this.connection.on("close", () => {
      this.connected = false;
    });

    this.channel = await this.createConfirmChannel();
    await this.checkStructure();
    this.connected = true;
  }

  /**
   * Publishes every event to the exchange, using `event.key` as routing key
   * and the JSON-serialised event as message body.
   *
   * The returned promise settles only after `waitForConfirms()` does, so a
   * resolved promise means the pending publishes were confirmed.
   *
   * @param events Events to publish; an empty list resolves immediately.
   * @throws If the bus is not connected, if an event cannot be serialised, or
   * if `waitForConfirms()` rejects.
   */
  async publish(events: DomainEvent[]): Promise<void> {
    if (events.length === 0) return;

    const channel = this.channel;
    if (channel == undefined) {
      // Fail loudly instead of silently dropping the events.
      throw new Error("[RMQ] Cannot publish: channel is not open, call connect() first");
    }

    for (const event of events) {
      const content = Buffer.from(JSON.stringify(event));
      channel.publish(EXCHANGE_NAME, event.key, content);
    }

    await channel.waitForConfirms();
  }

  addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
    throw new Error("Method not implemented.");
  }

  /**
   * Verifies that the structure defined in the JSON matches the expected one
   * by asserting the queue and the exchange on the open channel.
   *
   * TODO: the fixed queues for each operation are still missing.
   *
   * @throws If no channel is open or if the broker rejects an assertion.
   */
  private async checkStructure(): Promise<void> {
    const channel = this.channel;
    if (channel == undefined) {
      throw new Error("[RMQ] Cannot check structure: channel is not open");
    }

    await channel.assertQueue(QUEUE_NAME);
    await channel.assertExchange(EXCHANGE_NAME, "direct");
  }

  /**
   * Opens the AMQP connection described by the connection parameters, using
   * `amqps` when `secure` is set and `amqp` otherwise.
   *
   * @returns The open connection, with an error logger attached.
   */
  protected async createConnection() {
    const { hostname, port, secure, username, password, vhost } = this.connectionOptions;
    const protocol = secure ? "amqps" : "amqp";

    const connection = await amqConnect({ protocol, hostname, port, username, password, vhost });

    connection.on("error", (error: unknown) => {
      // Only log here: a rejected promise that nobody awaits would surface
      // as an unhandled rejection.
      console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
    });

    return connection;
  }

  /**
   * Creates a confirm channel on the current connection and applies the
   * prefetch limit to it.
   *
   * @returns The configured channel, with an error logger attached.
   * @throws If there is no connection to create the channel on.
   */
  protected async createConfirmChannel() {
    const channel = await this.connection?.createConfirmChannel();
    if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal");

    await channel.prefetch(PREFETCH_LIMIT);

    channel.on("error", (error: unknown) => {
      // Only log here: a rejected promise that nobody awaits would surface
      // as an unhandled rejection.
      console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
    });

    return channel;
  }
}