From 237eda41749e56df16cc908d6dda9ffdc22faf42 Mon Sep 17 00:00:00 2001 From: Alvar San Martin Date: Fri, 30 Jan 2026 10:42:48 +0100 Subject: [PATCH] Cola con delay -> cola dead letter --- packages/shared/domain/EventBus.port.ts | 4 +- packages/shared/infrastructure/HTTPClient.ts | 1 + .../shared/infrastructure/RabbitMQEventBus.ts | 52 +++++++++++--- .../aplication/Sim.controller.ts | 20 +++++- .../aplication/Sim.usecases.ts | 24 ++++++- .../config/eventBus.config.ts | 67 ++++++++++--------- .../aplication/Sim.controller.ts | 13 ++-- .../aplication/Sim.usecases.ts | 11 +++ 8 files changed, 142 insertions(+), 50 deletions(-) diff --git a/packages/shared/domain/EventBus.port.ts b/packages/shared/domain/EventBus.port.ts index cdf754c..1f06d7e 100644 --- a/packages/shared/domain/EventBus.port.ts +++ b/packages/shared/domain/EventBus.port.ts @@ -7,6 +7,6 @@ export interface EventBus { addSubscribers(subscribers: Array>): void; consume(queue: string, callback: (msg: ConsumeMessage | null) => void): void; - ack(msg: ConsumeMessage): void; - nack(msg: ConsumeMessage): void; + ack(msg: ConsumeMessage): Promise; + nack(msg: ConsumeMessage): Promise; } diff --git a/packages/shared/infrastructure/HTTPClient.ts b/packages/shared/infrastructure/HTTPClient.ts index 0b6dbbd..cf22310 100644 --- a/packages/shared/infrastructure/HTTPClient.ts +++ b/packages/shared/infrastructure/HTTPClient.ts @@ -53,6 +53,7 @@ export class HttpClient { if (error.response?.status == 401) { this.jwtManager.getAccessToken() } + return Promise.reject(error) } ) diff --git a/packages/shared/infrastructure/RabbitMQEventBus.ts b/packages/shared/infrastructure/RabbitMQEventBus.ts index 86de142..fde9bab 100644 --- a/packages/shared/infrastructure/RabbitMQEventBus.ts +++ b/packages/shared/infrastructure/RabbitMQEventBus.ts @@ -14,14 +14,17 @@ export type RMQConnectionParams = { } export class RabbitMQEventBus implements EventBus { - private buildStructure?: (chan: Channel) => void + private buildStructure?: (chan: Channel) => Promise + private maxRetry: number = 0 constructor(args: { connectionParams: RMQConnectionParams, - buildStructure?: (chan: Channel) => void + buildStructure?: (chan: Channel) => Promise, + maxRetry?: number }) { this.connectionOptions = args.connectionParams if (args.buildStructure != undefined) this.buildStructure = args.buildStructure + if (args.maxRetry != undefined) this.maxRetry = args.maxRetry } async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) { @@ -30,14 +33,34 @@ export class RabbitMQEventBus implements EventBus { await this.channel.consume(queue, callback) } - ack(msg: ConsumeMessage) { + async ack(msg: ConsumeMessage) { if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado"); return this.channel.ack(msg) } - nack(msg: ConsumeMessage) { + async nack(msg: ConsumeMessage, requeue?: boolean) { if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado"); - return this.channel.nack(msg) + + const headers = msg.properties.headers || {} + const numberRetry = headers['x-retry-count'] || 0 + const routingKey = msg.fields.routingKey + + if (numberRetry < this.maxRetry) { + await this.channel.publish("sim.ex.objenious.delayed", routingKey, msg.content, { + headers: { + ...headers, + 'x-retry-count': numberRetry + 1 + } + }) + } else { + await this.channel.publish("sim.ex.objenious.dlx", routingKey, msg.content, { + headers: { + ...headers + } + }) + } + + //return this.channel.nack(msg, false, requeue) } connection?: AmqpConnectionManager @@ -113,23 +136,36 @@ export class RabbitMQEventBus implements EventBus { }); connection.on("disconnect", (err: unknown) => { - console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ... :: ${err}`) + console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ... ::`, err) }) return connection; } protected async createChannel() { + const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion") const channel = this.connection.createChannel({ - setup: (channel: Channel) => { + setup: async (channel: Channel) => { // Exchanges comunes a todos channel.assertExchange("sim.exchange", "topic", { durable: true }) channel.assertExchange("sim.dlx", "topic", { durable: true }) // Estructuras propias de cada servicio if (this.buildStructure != undefined) { - this.buildStructure(channel) + let topoligaSuccess = false + while (!topoligaSuccess) { + try { + await this.buildStructure(channel) + topoligaSuccess = true + } catch (e) { + console.log("[RMQ] Error Creando la topologia de rabbitmq", e) + topoligaSuccess = false + await delay(5 * 1000) + } + + } } else { console.warn("[i] Se ha creado un canal sin garantizar que exista la/s cola/s que se van a usar") } diff --git a/packages/sim-consumidor-objenious/aplication/Sim.controller.ts b/packages/sim-consumidor-objenious/aplication/Sim.controller.ts index 058faef..05c607b 100644 --- a/packages/sim-consumidor-objenious/aplication/Sim.controller.ts +++ b/packages/sim-consumidor-objenious/aplication/Sim.controller.ts @@ -2,6 +2,8 @@ import { EventBus } from "#shared/domain/EventBus.port"; import { ConsumeMessage } from "amqplib"; import { SimUseCases } from "./Sim.usecases"; import { SimEvents } from "#shared/domain/SimEvents"; +import { constants } from "node:buffer"; +import { constrainedMemory } from "node:process"; /** * La clase usa generadores de funciones para mantener el contexto @@ -47,23 +49,35 @@ export class SimController { } const msgData = this.decodeMsg(msg) as SimEvents.activation - if (msgData == undefined || msgData.payload == undefined) Promise.reject("Mensaje invalido") + if (msgData == undefined || msgData.payload == undefined) throw new Error("Mensaje invalido") console.log("Mensaje procesado", msgData?.toString()) // TODO: Añadir un validador del mensaje const iccid = msgData.payload.iccid + const headers = msg.properties.headers + console.log("HEADERS: ", headers) + try { // Caso de uso de activaciones - await this.useCases.activate({ + const result = await this.useCases.activate({ dueDate: this.genDueDate(2 * 60).toISOString(), identifier: { identifierType: "ICCID", identifiers: [iccid] } })() - this.eventBus.ack(msg) + + console.log("Resultado de la peticion", result) + + if (result.error == undefined) { + console.log("Ack", msgData) + await this.eventBus.ack(msg) + } else { + console.log("Nack", msgData) + await this.eventBus.nack(msg) + } } catch (e) { this.eventBus.nack(msg) } diff --git a/packages/sim-consumidor-objenious/aplication/Sim.usecases.ts b/packages/sim-consumidor-objenious/aplication/Sim.usecases.ts index 93c9d61..368e6ef 100644 --- a/packages/sim-consumidor-objenious/aplication/Sim.usecases.ts +++ b/packages/sim-consumidor-objenious/aplication/Sim.usecases.ts @@ -1,5 +1,7 @@ import { ActivationData } from "#domain/DTOs/objeniousapi" import { HttpClient } from "#shared/infrastructure/HTTPClient" +import { AxiosError } from "axios" +import { error } from "node:console" // TODO: Pasar a un archivo de DTOs @@ -21,11 +23,27 @@ export class SimUseCases { ...activationData }) + try { - const e = await req - console.log("Activacion con exito", e.data.response) + const response = await req + console.log("[!] El status de la respuesta es", response.status) + + if (response.status == 200) { + console.log("Activacion con exito", response.data.response) + return { + error: undefined, + ok: true + } + } else { + return { + error: response.status + } + } } catch (error) { - console.error("Error activando ", error) + console.error("[Sim.usecase] Error activando ", (error as AxiosError).response?.status) + return { + error: "Error general de la petiacion" + } } } } diff --git a/packages/sim-consumidor-objenious/config/eventBus.config.ts b/packages/sim-consumidor-objenious/config/eventBus.config.ts index 24e487d..143b7db 100644 --- a/packages/sim-consumidor-objenious/config/eventBus.config.ts +++ b/packages/sim-consumidor-objenious/config/eventBus.config.ts @@ -20,43 +20,50 @@ export const rmqConnOptions = { export const rabbitmqEventBus = new RabbitMQEventBus({ connectionParams: rmqConnOptions, - buildStructure: buildQueues + buildStructure: buildQueues, + maxRetry: 5 }) -function buildQueues(channel: Channel) { - channel?.assertQueue("sim.objenious") - .then(e => { - console.log("[o] Creada la cola " + e.queue) - channel?.bindQueue("sim.objenious", "sim.exchange", "sim.objenious.*") - .then(e => { - console.log("[o] Bindeada la cola sim.objenious a sim.exchange ") - }) - .catch(e => { - console.error(e) - }) - }) - .catch(e => { - console.error(e) - }) +async function buildQueues(channel: Channel) { + const QUEUES = { + OBJ: "sim.objenious", + DLX: "sim.objenious.dlx", + DEL: "sim.objenious.delayed" + } + const EXCHANGES = { + MAIN: "sim.exchange", + DLX: "sim.ex.objenious.dlx", + DEL: "sim.ex.objenious.delayed" + } - channel?.assertQueue("sim.objenious.dlx") - .then(e => { - console.log("[o] Creada la cola " + e.queue) + const DELAY = 10 * 1000 + const BASE_OBENIOUS_KEY = "sim.objenious.#" + + await channel.assertExchange(EXCHANGES.DEL, "topic") + await channel.assertExchange(EXCHANGES.DLX, "topic") + await channel.assertExchange(EXCHANGES.MAIN, "topic") + + await channel.assertQueue(QUEUES.OBJ) + await channel.assertQueue(QUEUES.DLX) + await channel.assertQueue(QUEUES.DEL, { + durable: true, + arguments: { + 'x-message-ttl': DELAY, + 'x-dead-letter-exchange': EXCHANGES.MAIN, + } + }) + + // Cola dead-letter + await channel.bindQueue(QUEUES.DLX, EXCHANGES.DLX, "sim.objenious.#") + // Cola delay + await channel.bindQueue(QUEUES.DEL, EXCHANGES.DEL, BASE_OBENIOUS_KEY) + // Cola objenious -> main exchange + await channel.bindQueue(QUEUES.OBJ, EXCHANGES.MAIN, BASE_OBENIOUS_KEY) - channel?.bindQueue("sim.objenious.dlx", "sim.dlx", "sim.objenious.*") - .then(e => { - console.log("[o] Bindeada la cola sim.objenious.dlx a sim.dlx") - }) - .catch(e => { - console.error(e) - }) - }) - .catch(e => { - console.error(e) - }) } + export async function startRMQClient() { await rabbitmqEventBus.connect() return rabbitmqEventBus diff --git a/packages/sim-entrada-eventos/aplication/Sim.controller.ts b/packages/sim-entrada-eventos/aplication/Sim.controller.ts index e0475ca..4ec1a07 100644 --- a/packages/sim-entrada-eventos/aplication/Sim.controller.ts +++ b/packages/sim-entrada-eventos/aplication/Sim.controller.ts @@ -41,6 +41,7 @@ export class SimController { return; } + try { await this.simUseCases.activation({ iccid, compañia }) @@ -65,9 +66,10 @@ export class SimController { if (valido == false) return; // Si no es valido ya se ha enviado el error const { iccid } = req.body + const compañia = this.compañiaFromIccid(iccid) try { - await this.simUseCases.cancelation({ iccid }) + await this.simUseCases.cancelation({ iccid, compañia }) res.status(200).json({ iccid: iccid, operation: "cancelation" @@ -90,9 +92,10 @@ export class SimController { if (valido == false) return; // Si no es valido ya se ha enviado el error const { iccid } = req.body + const compañia = this.compañiaFromIccid(iccid) try { - await this.simUseCases.cancelation({ iccid }) + await this.simUseCases.cancelation({ iccid, compañia }) res.status(200).json({ iccid: iccid, operation: "cancelation" @@ -114,9 +117,10 @@ export class SimController { if (valido == false) return; // Si no es valido ya se ha enviado el error const { iccid } = req.body + const compañia = this.compañiaFromIccid(iccid) try { - await this.simUseCases.cancelation({ iccid }) + await this.simUseCases.cancelation({ iccid, compañia }) res.status(200).json({ iccid: iccid, operation: "liberacion" @@ -139,9 +143,10 @@ export class SimController { if (valido == false) return; // Si no es valido ya se ha enviado el error const { iccid } = req.body + const compañia = this.compañiaFromIccid(iccid) try { - await this.simUseCases.cancelation({ iccid }) + await this.simUseCases.cancelation({ iccid, compañia }) res.status(200).json({ iccid: iccid, operation: "cancelation" diff --git a/packages/sim-entrada-eventos/aplication/Sim.usecases.ts b/packages/sim-entrada-eventos/aplication/Sim.usecases.ts index 51ac33f..c0895ab 100644 --- a/packages/sim-entrada-eventos/aplication/Sim.usecases.ts +++ b/packages/sim-entrada-eventos/aplication/Sim.usecases.ts @@ -46,6 +46,17 @@ export class SimUsecases { return this.eventBus.publish([activationEvent]) } + async cancelation(args: { iccid: string, compañia: string }) { + + const activationEvent = { + key: `sim.${args.compañia}.cancelation`, + payload: { + iccid: args.iccid + } + } + console.log("[d] Cancelation ", activationEvent) + return this.eventBus.publish([activationEvent]) + } async pause(args: { iccid: string }) { const cancelationEvent = {