Refactor de las rutas para lanzarse con node (sin tsx)
This commit is contained in:
200
packages/sim-shared/infrastructure/RabbitMQEventBus.ts
Normal file
200
packages/sim-shared/infrastructure/RabbitMQEventBus.ts
Normal file
@@ -0,0 +1,200 @@
|
||||
import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
|
||||
import { connect, AmqpConnectionManager, ChannelWrapper, Channel } from "amqp-connection-manager"
|
||||
|
||||
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent.js";
|
||||
import { EventBus } from "../domain/EventBus.port.js";
|
||||
|
||||
export type RMQConnectionParams = {
|
||||
username: string,
|
||||
password: string,
|
||||
vhost: string,
|
||||
hostname: string,
|
||||
port: number,
|
||||
secure: boolean
|
||||
}
|
||||
|
||||
export class RabbitMQEventBus implements EventBus {
|
||||
private buildStructure?: (chan: Channel) => Promise<void>
|
||||
private maxRetry: number = 0
|
||||
|
||||
constructor(args: {
|
||||
connectionParams: RMQConnectionParams,
|
||||
buildStructure?: (chan: Channel) => Promise<void>,
|
||||
maxRetry?: number
|
||||
}) {
|
||||
this.connectionOptions = args.connectionParams
|
||||
if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
|
||||
if (args.maxRetry != undefined) this.maxRetry = args.maxRetry
|
||||
}
|
||||
|
||||
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
|
||||
// Comproaciones antes de escuchar
|
||||
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
|
||||
await this.channel.consume(queue, callback)
|
||||
}
|
||||
|
||||
async ack(msg: ConsumeMessage) {
|
||||
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
|
||||
return this.channel.ack(msg)
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO:
|
||||
* - Esta implementacion del nack debe estar en objenious
|
||||
*/
|
||||
async nack(msg: ConsumeMessage, requeue?: boolean) {
|
||||
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
|
||||
|
||||
console.log("NACK: ", msg.properties.headers)
|
||||
|
||||
const headers = msg.properties.headers || {}
|
||||
const numberRetry = headers['x-retry-count'] || 0
|
||||
const routingKey = msg.fields.routingKey
|
||||
|
||||
if (numberRetry < this.maxRetry) {
|
||||
console.log("Delaying")
|
||||
await this.channel.publish("sim.ex.objenious.delayed", routingKey, msg.content, {
|
||||
headers: {
|
||||
...headers,
|
||||
'x-retry-count': numberRetry + 1
|
||||
}
|
||||
})
|
||||
} else {
|
||||
console.log("DeadLetter")
|
||||
await this.channel.publish("sim.ex.objenious.dlx", routingKey, msg.content, {
|
||||
headers: {
|
||||
...headers
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Hace falta?
|
||||
this.channel.ack(msg)
|
||||
//return this.channel.nack(msg, false, requeue)
|
||||
}
|
||||
|
||||
connection?: AmqpConnectionManager
|
||||
channel?: ChannelWrapper
|
||||
connected: Boolean = false
|
||||
|
||||
private connectionOptions: RMQConnectionParams
|
||||
|
||||
public async connect() {
|
||||
|
||||
try {
|
||||
this.connection = await this.createConnection();
|
||||
if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")
|
||||
this.channel = await this.createChannel()
|
||||
this.channel.on("close", () => {
|
||||
console.log("[RMQ] Canal desconectado")
|
||||
setTimeout(async () => {
|
||||
this.connect().then(e => {
|
||||
console.log("[RMQ] Canal reconectado")
|
||||
})
|
||||
}, 1000)
|
||||
})
|
||||
} catch (e) {
|
||||
console.error("[RMQ] Error estableciendo la conexion con el servidor", e)
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
publish(events: DomainEvent[]): Promise<void> {
|
||||
|
||||
return new Promise((res, rej) => {
|
||||
try {
|
||||
for (const event of events) {
|
||||
const exchange = "sim.exchange"
|
||||
const routingKey = event.key
|
||||
const content = Buffer.from(JSON.stringify(event))
|
||||
this.channel?.publish(exchange, routingKey, content, {}, (err, ok) => {
|
||||
if (err == undefined) {
|
||||
console.log("Evento publicado ", event)
|
||||
} else {
|
||||
console.error("Error publicando", event)
|
||||
}
|
||||
})
|
||||
}
|
||||
return res()
|
||||
} catch (err) {
|
||||
return rej(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
|
||||
protected async createConnection() {
|
||||
const { hostname, port, secure } = { ...this.connectionOptions }
|
||||
const { username, password } = { ...this.connectionOptions };
|
||||
const protocol = secure ? 'amqps' : 'amqp';
|
||||
const vhost = this.connectionOptions.vhost
|
||||
|
||||
const connection = connect({
|
||||
protocol,
|
||||
hostname,
|
||||
port,
|
||||
username,
|
||||
password,
|
||||
vhost
|
||||
});
|
||||
|
||||
connection.on('error', async (error: unknown) => {
|
||||
console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
|
||||
console.log(`[RMQ] Reintentando conexion`)
|
||||
});
|
||||
|
||||
connection.on("disconnect", (err: unknown) => {
|
||||
console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ... ::`, err)
|
||||
})
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
protected async createChannel(): Promise<ChannelWrapper> {
|
||||
const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
|
||||
|
||||
if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion")
|
||||
const channel = this.connection.createChannel({
|
||||
setup: async (channel: Channel) => {
|
||||
// Exchanges comunes a todos
|
||||
channel.assertExchange("sim.exchange", "topic", { durable: true })
|
||||
channel.assertExchange("sim.dlx", "topic", { durable: true })
|
||||
|
||||
// Estructuras propias de cada servicio
|
||||
if (this.buildStructure != undefined) {
|
||||
let topoligaSuccess = false
|
||||
while (!topoligaSuccess) {
|
||||
try {
|
||||
await this.buildStructure(channel)
|
||||
topoligaSuccess = true
|
||||
} catch (e) {
|
||||
console.log("[RMQ] Error Creando la topologia de rabbitmq", e)
|
||||
topoligaSuccess = false
|
||||
await delay(5 * 1000)
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
console.warn("[i] Se ha creado un canal sin garantizar que exista la/s cola/s que se van a usar")
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
//await channel.prefetch(PREFETCH_LIMIT)
|
||||
|
||||
if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")
|
||||
|
||||
channel.on('error', (error: unknown) => {
|
||||
console.error(`[RMQ] Rabbitmq channel error :: ${error}`);
|
||||
Promise.reject(error);
|
||||
});
|
||||
|
||||
return channel as ChannelWrapper;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user