2026-01-13 15:41:59 +01:00
|
|
|
import { type ChannelModel, type ConfirmChannel, connect as amqConnect } from "amqplib";
|
|
|
|
|
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent";
|
|
|
|
|
import { EventBus } from "../domain/EventBus.port";
|
|
|
|
|
import { buffer } from "node:stream/consumers";
|
|
|
|
|
|
|
|
|
|
export type RMQConnectionParams = {
|
|
|
|
|
username: string,
|
|
|
|
|
password: string,
|
|
|
|
|
vhost: string,
|
|
|
|
|
hostname: string,
|
|
|
|
|
port: number,
|
|
|
|
|
secure: boolean
|
|
|
|
|
}
|
2026-01-12 13:08:56 +01:00
|
|
|
|
|
|
|
|
const PREFETCH_LIMIT = 1
|
2026-01-13 15:41:59 +01:00
|
|
|
export class RabbitMQEventBus implements EventBus {
|
|
|
|
|
constructor(args: {
|
|
|
|
|
connectionParams: RMQConnectionParams
|
|
|
|
|
}) {
|
|
|
|
|
this.connectionOptions = args.connectionParams
|
|
|
|
|
this.checkStructure();
|
|
|
|
|
}
|
2026-01-12 13:08:56 +01:00
|
|
|
|
|
|
|
|
connection?: ChannelModel
|
|
|
|
|
channel?: ConfirmChannel
|
|
|
|
|
connected: Boolean = false
|
|
|
|
|
|
2026-01-13 15:41:59 +01:00
|
|
|
private connectionOptions: RMQConnectionParams
|
|
|
|
|
|
|
|
|
|
public async connect() {
|
|
|
|
|
this.connection = await this.createConnection();
|
|
|
|
|
if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")
|
|
|
|
|
this.channel = await this.createConfirmChannel()
|
2026-01-12 13:08:56 +01:00
|
|
|
}
|
|
|
|
|
|
2026-01-13 15:41:59 +01:00
|
|
|
publish(events: DomainEvent[]): Promise<void> {
|
|
|
|
|
return new Promise((res, rej) => {
|
|
|
|
|
try {
|
|
|
|
|
for (const event of events) {
|
|
|
|
|
const exchange = "sim.exchange"
|
|
|
|
|
const routingKey = event.key
|
|
|
|
|
const content = Buffer.from(JSON.stringify(event))
|
|
|
|
|
this.channel?.publish(exchange, routingKey, content)
|
|
|
|
|
}
|
|
|
|
|
return res()
|
|
|
|
|
} catch (err) {
|
|
|
|
|
return rej(err)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
|
|
|
|
|
throw new Error("Method not implemented.");
|
2026-01-12 13:08:56 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Verificacion que la estructura definida en el JSON corresponde con
|
|
|
|
|
* la esperada
|
|
|
|
|
* TODO: Faltan las colas fijas según las operaciones
|
|
|
|
|
*/
|
|
|
|
|
private checkStructure() {
|
|
|
|
|
this.channel?.assertQueue("sim.queue")
|
|
|
|
|
this.channel?.assertExchange("sim.exchange", "direct")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected async createConnection() {
|
|
|
|
|
const { hostname, port, secure } = { ...this.connectionOptions }
|
|
|
|
|
const { username, password } = { ...this.connectionOptions };
|
|
|
|
|
const protocol = secure ? 'amqps' : 'amqp';
|
|
|
|
|
const vhost = this.connectionOptions.vhost
|
|
|
|
|
|
2026-01-13 15:41:59 +01:00
|
|
|
const connection = await amqConnect({
|
2026-01-12 13:08:56 +01:00
|
|
|
protocol,
|
|
|
|
|
hostname,
|
|
|
|
|
port,
|
|
|
|
|
username,
|
|
|
|
|
password,
|
|
|
|
|
vhost
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
connection.on('error', (error: unknown) => {
|
|
|
|
|
console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
|
|
|
|
|
Promise.reject(error);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
return connection;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected async createConfirmChannel() {
|
|
|
|
|
const channel = await this.connection?.createConfirmChannel()
|
|
|
|
|
await channel?.prefetch(PREFETCH_LIMIT)
|
|
|
|
|
|
|
|
|
|
if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")
|
|
|
|
|
|
|
|
|
|
channel.on('error', (error: unknown) => {
|
|
|
|
|
console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
|
|
|
|
|
Promise.reject(error);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
return channel;
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|