Files
sf-sim/packages/sim-shared/infrastructure/RabbitMQEventBus.ts
2026-05-11 12:15:37 +02:00

225 lines
6.8 KiB
TypeScript

import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
import { connect, AmqpConnectionManager, ChannelWrapper, Channel } from "amqp-connection-manager"
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent.js";
import { EventBus } from "../domain/EventBus.port.js";
export type RMQConnectionParams = {
username: string,
password: string,
vhost: string,
hostname: string,
port: number,
secure: boolean
}
export class RabbitMQEventBus implements EventBus {
private buildStructure?: (chan: Channel) => Promise<void>
private maxRetry: number = 0
connection?: AmqpConnectionManager
channel?: ChannelWrapper
connected: Boolean = false
private delayedExchange: string;
private dlxExchange: string;
private connectionOptions: RMQConnectionParams
constructor(args: {
connectionParams: RMQConnectionParams,
buildStructure?: (chan: Channel) => Promise<void>,
maxRetry?: number,
delayedExchange: string,
dlxExchange: string
}) {
this.connectionOptions = args.connectionParams
if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
if (args.maxRetry != undefined) this.maxRetry = args.maxRetry
this.delayedExchange = args.delayedExchange
this.dlxExchange = args.dlxExchange
}
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
// Comproaciones antes de escuchar
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
await this.channel.consume(queue, callback)
}
async ack(msg: ConsumeMessage) {
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
return this.channel.ack(msg)
}
/**
* Re-implementacion del nack con chequeo del numero de reinetentos.
* TODO:
* - Decidir si se chequean o no los reintentos con errores 429
* - Motivo del último error en el mensaje
*
* @param msg
* @param requeue
*/
async nack(msg: ConsumeMessage, requeue?: boolean) {
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
console.log("[i] NACK: ", msg.properties.headers)
const headers = msg.properties.headers || {}
const numberRetry = headers['x-retry-count'] || 0
const routingKey = msg.fields.routingKey
if (numberRetry < this.maxRetry) {
console.log("[i] Delaying ")
// "sim.ex.objenious.delayed"
await this.channel.publish(this.delayedExchange, routingKey, msg.content, {
headers: {
...headers,
'x-retry-count': numberRetry + 1
}
})
} else {
console.log("[i] DeadLetter")
//"sim.ex.objenious.dlx"
await this.channel.publish(this.dlxExchange, routingKey, msg.content, {
headers: {
...headers
}
})
}
this.channel.ack(msg)
}
public async connect() {
try {
this.connection = await this.createConnection();
if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")
this.channel = await this.createChannel()
this.channel.on("close", () => {
console.log("[RMQ] Canal desconectado")
setTimeout(async () => {
this.connect().then(e => {
console.log("[RMQ] Canal reconectado")
})
}, 1000)
})
} catch (e) {
console.error("[RMQ] Error estableciendo la conexion con el servidor", e)
}
}
publish(events: DomainEvent[]): Promise<{ success: DomainEvent[], error: DomainEvent[] }> {
return new Promise(async (res, rej) => {
const successEvents: DomainEvent[] = []
const errorEvents: DomainEvent[] = []
try {
for (const event of events) {
const exchange = "sim.exchange"
const routingKey = event.key
const content = Buffer.from(JSON.stringify(event))
const isPublished = await this.channel?.publish(exchange, routingKey, content, {
headers: {
...event.headers
}
}, (err, ok) => {
if (err == undefined) {
console.log("Evento publicado ", event)
} else {
console.error("Error publicando", event)
}
})
// Hay que revisarlo pero en principio la libreria se encarga que el mensaje se publique
// si o si
successEvents.push(event)
}
return res({
success: successEvents,
error: errorEvents
})
} catch (err) {
return rej(err)
}
})
}
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
throw new Error("Method not implemented.");
}
protected async createConnection() {
const { hostname, port, secure } = { ...this.connectionOptions }
const { username, password } = { ...this.connectionOptions };
const protocol = secure ? 'amqps' : 'amqp';
const vhost = this.connectionOptions.vhost
const connection = connect({
protocol,
hostname,
port,
username,
password,
vhost
});
connection.on('error', async (error: unknown) => {
console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
console.log(`[RMQ] Reintentando conexion`)
});
connection.on("disconnect", (err: unknown) => {
console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ... ::`, err)
})
return connection;
}
protected async createChannel(): Promise<ChannelWrapper> {
const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion")
const channel = this.connection.createChannel({
confirm: true,
setup: async (channel: ConfirmChannel) => {
// Exchanges comunes a todos
channel.assertExchange("sim.exchange", "topic", { durable: true })
channel.assertExchange("sim.dlx", "topic", { durable: true })
// Estructuras propias de cada servicio
if (this.buildStructure != undefined) {
let topoligaSuccess = false
while (!topoligaSuccess) {
try {
await this.buildStructure(channel)
topoligaSuccess = true
} catch (e) {
console.log("[RMQ] Error Creando la topologia de rabbitmq", e)
topoligaSuccess = false
await delay(5 * 1000)
}
}
} else {
console.warn("[i] Se ha creado un canal sin garantizar que exista la/s cola/s que se van a usar")
}
},
})
//await channel.prefetch(PREFETCH_LIMIT)
if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")
channel.on('error', (error: unknown) => {
console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
Promise.reject(error);
});
return channel;
}
}