Files
sf-sim/packages/sim-shared/infrastructure/RabbitMQEventBus.ts

222 lines
6.7 KiB
TypeScript
Raw Normal View History

2026-01-16 11:14:35 +01:00
import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
import { connect, AmqpConnectionManager, ChannelWrapper, Channel } from "amqp-connection-manager"
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent.js";
import { EventBus } from "../domain/EventBus.port.js";
export type RMQConnectionParams = {
username: string,
password: string,
vhost: string,
hostname: string,
port: number,
secure: boolean
}
export class RabbitMQEventBus implements EventBus {
2026-01-30 10:42:48 +01:00
private buildStructure?: (chan: Channel) => Promise<void>
private maxRetry: number = 0
connection?: AmqpConnectionManager
channel?: ChannelWrapper
connected: Boolean = false
2026-04-22 12:31:46 +02:00
private delayedExchange: string;
private dlxExchange: string;
private connectionOptions: RMQConnectionParams
constructor(args: {
connectionParams: RMQConnectionParams,
2026-01-30 10:42:48 +01:00
buildStructure?: (chan: Channel) => Promise<void>,
2026-04-22 12:31:46 +02:00
maxRetry?: number,
delayedExchange: string,
dlxExchange: string
}) {
this.connectionOptions = args.connectionParams
if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
2026-01-30 10:42:48 +01:00
if (args.maxRetry != undefined) this.maxRetry = args.maxRetry
2026-04-22 12:31:46 +02:00
this.delayedExchange = args.delayedExchange
this.dlxExchange = args.dlxExchange
}
2026-01-16 11:14:35 +01:00
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
// Comproaciones antes de escuchar
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
await this.channel.consume(queue, callback)
}
2026-01-30 10:42:48 +01:00
async ack(msg: ConsumeMessage) {
2026-01-16 11:14:35 +01:00
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
return this.channel.ack(msg)
}
/**
* TODO:
* - Esta implementacion del nack debe estar en objenious
*/
2026-01-30 10:42:48 +01:00
async nack(msg: ConsumeMessage, requeue?: boolean) {
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
2026-01-30 10:42:48 +01:00
2026-04-22 12:31:46 +02:00
console.log("[i] NACK: ", msg.properties.headers)
2026-01-30 10:42:48 +01:00
const headers = msg.properties.headers || {}
const numberRetry = headers['x-retry-count'] || 0
const routingKey = msg.fields.routingKey
if (numberRetry < this.maxRetry) {
2026-04-22 12:31:46 +02:00
console.log("[i] Delaying ")
// "sim.ex.objenious.delayed"
await this.channel.publish(this.delayedExchange, routingKey, msg.content, {
2026-01-30 10:42:48 +01:00
headers: {
...headers,
'x-retry-count': numberRetry + 1
}
})
} else {
2026-04-22 12:31:46 +02:00
console.log("[i] DeadLetter")
//"sim.ex.objenious.dlx"
await this.channel.publish(this.dlxExchange, routingKey, msg.content, {
2026-01-30 10:42:48 +01:00
headers: {
...headers
}
})
}
// Hace falta?
this.channel.ack(msg)
2026-01-30 10:42:48 +01:00
//return this.channel.nack(msg, false, requeue)
}
public async connect() {
try {
this.connection = await this.createConnection();
if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")
this.channel = await this.createChannel()
this.channel.on("close", () => {
console.log("[RMQ] Canal desconectado")
setTimeout(async () => {
this.connect().then(e => {
console.log("[RMQ] Canal reconectado")
})
}, 1000)
})
} catch (e) {
console.error("[RMQ] Error estableciendo la conexion con el servidor", e)
}
}
publish(events: DomainEvent[]): Promise<{ success: DomainEvent[], error: DomainEvent[] }> {
return new Promise(async (res, rej) => {
const successEvents: DomainEvent[] = []
const errorEvents: DomainEvent[] = []
try {
for (const event of events) {
const exchange = "sim.exchange"
const routingKey = event.key
const content = Buffer.from(JSON.stringify(event))
const isPublished = await this.channel?.publish(exchange, routingKey, content, {
headers: {
...event.headers
}
}, (err, ok) => {
if (err == undefined) {
console.log("Evento publicado ", event)
} else {
console.error("Error publicando", event)
}
})
// Hay que revisarlo pero en principio la libreria se encarga que el mensaje se publique
// si o si
successEvents.push(event)
}
return res({
success: successEvents,
error: errorEvents
})
} catch (err) {
return rej(err)
}
})
}
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
throw new Error("Method not implemented.");
}
protected async createConnection() {
const { hostname, port, secure } = { ...this.connectionOptions }
const { username, password } = { ...this.connectionOptions };
const protocol = secure ? 'amqps' : 'amqp';
const vhost = this.connectionOptions.vhost
const connection = connect({
protocol,
hostname,
port,
username,
password,
vhost
});
connection.on('error', async (error: unknown) => {
console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
console.log(`[RMQ] Reintentando conexion`)
});
connection.on("disconnect", (err: unknown) => {
2026-01-30 10:42:48 +01:00
console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ... ::`, err)
})
return connection;
}
protected async createChannel(): Promise<ChannelWrapper> {
2026-01-30 10:42:48 +01:00
const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion")
const channel = this.connection.createChannel({
confirm: true,
setup: async (channel: ConfirmChannel) => {
// Exchanges comunes a todos
channel.assertExchange("sim.exchange", "topic", { durable: true })
channel.assertExchange("sim.dlx", "topic", { durable: true })
// Estructuras propias de cada servicio
if (this.buildStructure != undefined) {
2026-01-30 10:42:48 +01:00
let topoligaSuccess = false
while (!topoligaSuccess) {
try {
await this.buildStructure(channel)
topoligaSuccess = true
} catch (e) {
console.log("[RMQ] Error Creando la topologia de rabbitmq", e)
topoligaSuccess = false
await delay(5 * 1000)
}
}
} else {
console.warn("[i] Se ha creado un canal sin garantizar que exista la/s cola/s que se van a usar")
}
},
})
//await channel.prefetch(PREFETCH_LIMIT)
if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")
channel.on('error', (error: unknown) => {
console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
Promise.reject(error);
});
return channel;
}
}