Files
sf-sim/packages/shared/infrastructure/RabbitMQEventBus.ts

152 lines
4.5 KiB
TypeScript

import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
import { connect, AmqpConnectionManager, ChannelWrapper, Channel } from "amqp-connection-manager"
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent";
import { EventBus } from "../domain/EventBus.port";
/**
 * Settings used to open the RabbitMQ connection.
 *
 * All fields are required; they are forwarded to the connection manager when
 * the bus creates its connection.
 */
export type RMQConnectionParams = {
  /** Account name used to authenticate against the broker. */
  username: string;
  /** Password that goes with `username`. */
  password: string;
  /** Virtual host to connect to. */
  vhost: string;
  /** Host name (or address) of the broker. */
  hostname: string;
  /** TCP port of the broker. */
  port: number;
  /** When `true` the `amqps` protocol is used, otherwise plain `amqp`. */
  secure: boolean;
};
/**
 * `EventBus` implementation that publishes domain events to RabbitMQ through
 * `amqp-connection-manager`.
 *
 * Typical use: construct it, call {@link connect} once, then use
 * {@link publish}, {@link consume}, {@link ack} and {@link nack}.
 * Reconnection is left to the connection manager (per its documentation it
 * reconnects on its own and re-runs the channel setup function), so this class
 * never opens a second connection by itself.
 */
export class RabbitMQEventBus implements EventBus {
  /** Topic exchange every domain event is published to. */
  private static readonly EVENTS_EXCHANGE = "sim.exchange";
  /** Second topic exchange asserted on setup (presumably the dead-letter exchange — confirm). */
  private static readonly DLX_EXCHANGE = "sim.dlx";

  /** Connection manager created by {@link connect}; `undefined` until then. */
  connection?: AmqpConnectionManager;
  /** Channel wrapper created by {@link connect}; `undefined` until then. */
  channel?: ChannelWrapper;
  /** `true` between the manager's `connect` event and its next `disconnect` / channel close. */
  connected = false;

  private readonly connectionOptions: RMQConnectionParams;
  /** Optional service-specific topology (queues, bindings…) declared on every channel setup. */
  private readonly buildStructure?: (chan: Channel) => void | Promise<void>;

  /**
   * @param args.connectionParams Broker address and credentials.
   * @param args.buildStructure Optional callback invoked with the raw channel on
   *   every (re)setup; it may return a promise, which is awaited.
   */
  constructor(args: {
    connectionParams: RMQConnectionParams,
    buildStructure?: (chan: Channel) => void | Promise<void>
  }) {
    this.connectionOptions = args.connectionParams;
    if (args.buildStructure != undefined) this.buildStructure = args.buildStructure;
  }

  /**
   * Starts consuming `queue`, handing every delivery to `callback`.
   *
   * @throws Error if {@link connect} has not created the channel yet.
   */
  async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
    // Checks before we start listening.
    const channel = this.requireChannel();
    await channel.consume(queue, callback);
  }

  /**
   * Acknowledges `msg` on the current channel.
   *
   * @throws Error if the channel has not been created yet.
   */
  ack(msg: ConsumeMessage) {
    return this.requireChannel().ack(msg);
  }

  /**
   * Negatively acknowledges `msg` on the current channel (library defaults
   * apply for `allUpTo` / `requeue`).
   *
   * @throws Error if the channel has not been created yet.
   */
  nack(msg: ConsumeMessage) {
    return this.requireChannel().nack(msg);
  }

  /**
   * Creates the connection manager and the channel wrapper.
   *
   * Failures are logged and NOT rethrown (best-effort, as before), so callers
   * that need certainty should check {@link channel} afterwards.
   */
  public async connect() {
    try {
      this.connection = await this.createConnection();
      if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion");
      this.channel = await this.createChannel();
      this.channel.on("close", () => {
        // No manual reconnect here: calling connect() again would build a brand
        // new connection manager without releasing the previous one, and the
        // manager already takes care of reconnecting after a broker drop.
        this.connected = false;
        console.log("[RMQ] Canal desconectado");
      });
    } catch (e) {
      console.error("[RMQ] Error estableciendo la conexion con el servidor", e);
    }
  }

  /**
   * Publishes every event to the events exchange, using `event.key` as routing key.
   *
   * The channel is created with `json: true`, so the event object itself is
   * handed over and the wrapper serialises it; pre-encoding it into a Buffer
   * would make the wrapper JSON-encode the Buffer a second time.
   *
   * The returned promise settles once every publish promise returned by the
   * channel wrapper has settled (per the library docs, that is when the broker
   * has confirmed the message). NOTE(review): while the broker is unreachable
   * the wrapper queues messages, so this promise may stay pending until it
   * reconnects — confirm that is acceptable for callers.
   *
   * @throws Error (as a rejection) if the channel has not been created yet, or
   *   whatever error the wrapper reports for a failed publish.
   */
  async publish(events: DomainEvent[]): Promise<void> {
    const channel = this.requireChannel();
    await Promise.all(
      events.map((event) =>
        channel.publish(RabbitMQEventBus.EVENTS_EXCHANGE, event.key, event, {})
      )
    );
  }

  /** Not supported by this bus yet; always throws. */
  addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
    throw new Error("Method not implemented.");
  }

  /**
   * Builds the connection manager from the stored connection parameters and
   * wires the listeners that keep {@link connected} up to date.
   */
  protected async createConnection() {
    const { hostname, port, secure, username, password, vhost } = this.connectionOptions;
    const protocol = secure ? 'amqps' : 'amqp';
    const connection = connect({
      protocol,
      hostname,
      port,
      username,
      password,
      vhost
    });
    connection.on("connect", () => {
      this.connected = true;
    });
    // Error payloads are passed as a separate argument: interpolating an object
    // into the message would only print "[object Object]".
    connection.on('error', (error: unknown) => {
      console.error("[RMQ] Rabbitmq connection error ::", error);
      console.log(`[RMQ] Reintentando conexion`);
    });
    connection.on("disconnect", (err: unknown) => {
      this.connected = false;
      console.error("[RMQ] Servidor Rabbitmq desconectado, reintentando ... ::", err);
    });
    return connection;
  }

  /**
   * Creates the channel wrapper and registers the setup function that declares
   * the shared exchanges plus the optional service-specific structure.
   *
   * @throws Error if there is no connection to create the channel on.
   */
  protected async createChannel() {
    if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion");
    const channel = this.connection.createChannel({
      json: true,
      // Every step is awaited so setup only completes once the topology exists
      // and a failed declaration is reported to the wrapper instead of turning
      // into an unhandled rejection.
      setup: async (chan: Channel) => {
        // Exchanges shared by every service.
        await chan.assertExchange(RabbitMQEventBus.EVENTS_EXCHANGE, "topic", { durable: true });
        await chan.assertExchange(RabbitMQEventBus.DLX_EXCHANGE, "topic", { durable: true });
        // Structures owned by each individual service.
        if (this.buildStructure != undefined) {
          await this.buildStructure(chan);
        } else {
          console.warn("[i] Se ha creado un canal sin garantizar que exista la/s cola/s que se van a usar");
        }
      },
    });
    //await channel.prefetch(PREFETCH_LIMIT)
    if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal");
    channel.on('error', (error: unknown) => {
      // Log only: creating a rejected promise here would be an unhandled
      // rejection, which terminates current Node.js versions.
      console.error("[RMQ] Rabbitmq channel error ::", error);
    });
    return channel;
  }

  /** Returns the current channel or throws the shared "not initialised" error. */
  private requireChannel(): ChannelWrapper {
    if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
    return this.channel;
  }
}