Files
sf-sim/packages/shared/infrastructure/RabbitMQEventBus.ts

127 lines
3.6 KiB
TypeScript

import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent";
import { EventBus } from "../domain/EventBus.port";
export type RMQConnectionParams = {
username: string,
password: string,
vhost: string,
hostname: string,
port: number,
secure: boolean
}
const PREFETCH_LIMIT = 1
export class RabbitMQEventBus implements EventBus {
constructor(args: {
connectionParams: RMQConnectionParams
}) {
this.connectionOptions = args.connectionParams
this.checkStructure();
}
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
// Comproaciones antes de escuchar
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
this.checkStructure()
// El binding (cola -> [routingkey] -> exchange) lo hago por configuracion. Meter colas a demanda?
await this.channel.prefetch(1)
await this.channel.consume(queue, callback)
}
ack(msg: ConsumeMessage) {
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
return this.channel.ack(msg)
}
nack(msg: ConsumeMessage) {
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
return this.channel.nack(msg)
}
connection?: ChannelModel
channel?: ConfirmChannel
connected: Boolean = false
private connectionOptions: RMQConnectionParams
public async connect() {
this.connection = await this.createConnection();
if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")
this.channel = await this.createConfirmChannel()
}
publish(events: DomainEvent[]): Promise<void> {
return new Promise((res, rej) => {
try {
for (const event of events) {
const exchange = "sim.exchange"
const routingKey = event.key
const content = Buffer.from(JSON.stringify(event))
this.channel?.publish(exchange, routingKey, content)
}
return res()
} catch (err) {
return rej(err)
}
})
}
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
throw new Error("Method not implemented.");
}
/**
* Verificacion que la estructura definida en el JSON corresponde con
* la esperada
* TODO: Faltan las colas fijas según las operaciones
*/
private checkStructure() {
this.channel?.assertQueue("sim.activations")
this.channel?.assertQueue("sim.cancelations")
this.channel?.assertQueue("sim.logs")
this.channel?.assertExchange("sim.exchange", "topic")
}
protected async createConnection() {
const { hostname, port, secure } = { ...this.connectionOptions }
const { username, password } = { ...this.connectionOptions };
const protocol = secure ? 'amqps' : 'amqp';
const vhost = this.connectionOptions.vhost
const connection = await amqConnect({
protocol,
hostname,
port,
username,
password,
vhost
});
connection.on('error', (error: unknown) => {
console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
Promise.reject(error);
});
return connection;
}
protected async createConfirmChannel() {
const channel = await this.connection?.createConfirmChannel()
await channel?.prefetch(PREFETCH_LIMIT)
if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")
channel.on('error', (error: unknown) => {
console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
Promise.reject(error);
});
return channel;
}
}