diff --git a/deployment/rabbit/definitions.json b/deployment/rabbit/definitions.json index fd8f4ab..6e66b35 100644 --- a/deployment/rabbit/definitions.json +++ b/deployment/rabbit/definitions.json @@ -84,7 +84,7 @@ "vhost": "sim-vhost", "destination": "sim.logs", "destination_type": "queue", - "routing_key": "sim.*", + "routing_key": "sim.#", "arguments": {} } ] diff --git a/docs/sim-api/Activate.bru b/docs/sim-api/Activate.bru index 5cefd5c..c2c8f8b 100644 --- a/docs/sim-api/Activate.bru +++ b/docs/sim-api/Activate.bru @@ -15,8 +15,7 @@ params:query { } body:form-urlencoded { - iccid: 1234 - : + iccid: 12339912344 } settings { diff --git a/package.json b/package.json index c15c599..5d21bfb 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ }, "dependencies": { "@tsconfig/node22": "^22.0.5", + "amqp-connection-manager": "^5.0.0", "axios": "^1.13.3", "cors": "^2.8.5", "dotenv": "^17.2.3", diff --git a/packages/shared/infrastructure/RabbitMQEventBus.ts b/packages/shared/infrastructure/RabbitMQEventBus.ts index cb83bad..7d24999 100644 --- a/packages/shared/infrastructure/RabbitMQEventBus.ts +++ b/packages/shared/infrastructure/RabbitMQEventBus.ts @@ -1,4 +1,6 @@ import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib"; +import { connect, AmqpConnectionManager, ChannelWrapper, Channel } from "amqp-connection-manager" + import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent"; import { EventBus } from "../domain/EventBus.port"; @@ -11,23 +13,25 @@ export type RMQConnectionParams = { secure: boolean } +const RETRY_DELAY = 1000 const PREFETCH_LIMIT = 1 export class RabbitMQEventBus implements EventBus { + private buildStructure?: (chan: Channel) => void + constructor(args: { - connectionParams: RMQConnectionParams + connectionParams: RMQConnectionParams, + buildStructure?: (chan: Channel) => void }) { this.connectionOptions = args.connectionParams - this.checkStructure(); + if (args.buildStructure != undefined) this.buildStructure = args.buildStructure } async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) { // Comproaciones antes de escuchar if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado"); - this.checkStructure() // El binding (cola -> [routingkey] -> exchange) lo hago por configuracion. Meter colas a demanda? - - await this.channel.prefetch(1) + //await this.channel.prefetch(1) await this.channel.consume(queue, callback) } @@ -42,26 +46,45 @@ export class RabbitMQEventBus implements EventBus { return this.channel.nack(msg) } - connection?: ChannelModel - channel?: ConfirmChannel + connection?: AmqpConnectionManager + channel?: ChannelWrapper connected: Boolean = false private connectionOptions: RMQConnectionParams public async connect() { - this.connection = await this.createConnection(); - if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion") - this.channel = await this.createConfirmChannel() + + try { + this.connection = await this.createConnection(); + if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion") + this.channel = await this.createConfirmChannel() + this.channel.on("close", () => { + console.log("[RMQ] Canal desconectado") + setTimeout(async () => { + this.connect().then(e => { + console.log("[RMQ] Canal reconectado") + }) + }, 1000) + }) + } catch (e) { + console.error("[RMQ] Error estableciendo la conexion con el servidor", e) + } + + + } publish(events: DomainEvent[]): Promise { + return new Promise((res, rej) => { try { for (const event of events) { const exchange = "sim.exchange" const routingKey = event.key const content = Buffer.from(JSON.stringify(event)) - this.channel?.publish(exchange, routingKey, content) + this.channel?.publish(exchange, routingKey, content, {}, (err, ok) => { + console.log("Confirmacion", err, ok) + }) } return res() } catch (err) { @@ -74,17 +97,6 @@ export class RabbitMQEventBus implements EventBus { throw new Error("Method not implemented."); } - /** - * Verificacion que la estructura definida en el JSON corresponde con - * la esperada - * TODO: Faltan las colas fijas según las operaciones - */ - private checkStructure() { - this.channel?.assertQueue("sim.activations") - this.channel?.assertQueue("sim.cancelations") - this.channel?.assertQueue("sim.logs") - this.channel?.assertExchange("sim.exchange", "topic") - } protected async createConnection() { const { hostname, port, secure } = { ...this.connectionOptions } @@ -92,7 +104,7 @@ export class RabbitMQEventBus implements EventBus { const protocol = secure ? 'amqps' : 'amqp'; const vhost = this.connectionOptions.vhost - const connection = await amqConnect({ + const connection = connect({ protocol, hostname, port, @@ -101,17 +113,28 @@ export class RabbitMQEventBus implements EventBus { vhost }); - connection.on('error', (error: unknown) => { + connection.on('error', async (error: unknown) => { console.error(`[RMQ] Rabbitmq connection error :: ${error}`); - Promise.reject(error); + console.log(`[RMQ] Reintentando conexion`) }); + connection.on("disconnect", (err) => { + console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ...`) + }) + return connection; } protected async createConfirmChannel() { - const channel = await this.connection?.createConfirmChannel() - await channel?.prefetch(PREFETCH_LIMIT) + if (this.connection == undefined) throw new Error("Intentando crear un canal sin una conexion") + const channel = this.connection?.createChannel({ + json: true, + setup: (channel: Channel) => { + if (this.buildStructure != undefined) this.buildStructure(channel) + }, + }) + + //await channel.prefetch(PREFETCH_LIMIT) if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal") diff --git a/packages/sim-consumidor-nos/config/eventBusConfig.ts b/packages/sim-consumidor-nos/config/eventBusConfig.ts index c6eb638..e62c82a 100644 --- a/packages/sim-consumidor-nos/config/eventBusConfig.ts +++ b/packages/sim-consumidor-nos/config/eventBusConfig.ts @@ -22,7 +22,10 @@ export const rabbitmqEventBus = new RabbitMQEventBus({ }) export async function startRMQClient() { - await rabbitmqEventBus.connect() + await rabbitmqEventBus.connect().catch(async e => { + console.error("Error en la conexion RMQ") + await rabbitmqEventBus.connect() + }) // Bindings especificos, deberia meterlos en la clase try { diff --git a/packages/sim-consumidor-objenious/config/eventBusConfig.ts b/packages/sim-consumidor-objenious/config/eventBusConfig.ts index b524d7e..24e487d 100644 --- a/packages/sim-consumidor-objenious/config/eventBusConfig.ts +++ b/packages/sim-consumidor-objenious/config/eventBusConfig.ts @@ -1,4 +1,5 @@ import { RabbitMQEventBus, RMQConnectionParams } from "#shared/infrastructure/RabbitMQEventBus" +import { Channel } from "amqp-connection-manager" import { env } from "./env" const rmqUser = env.RABBITMQ_USER @@ -18,14 +19,11 @@ export const rmqConnOptions = { } export const rabbitmqEventBus = new RabbitMQEventBus({ - connectionParams: rmqConnOptions + connectionParams: rmqConnOptions, + buildStructure: buildQueues }) -export async function startRMQClient() { - await rabbitmqEventBus.connect() - const channel = rabbitmqEventBus.channel - // Bindings especificos, deberia meterlos en la clase - +function buildQueues(channel: Channel) { channel?.assertQueue("sim.objenious") .then(e => { console.log("[o] Creada la cola " + e.queue) @@ -57,6 +55,9 @@ export async function startRMQClient() { .catch(e => { console.error(e) }) +} +export async function startRMQClient() { + await rabbitmqEventBus.connect() return rabbitmqEventBus } diff --git a/packages/sim-entrada-eventos/aplication/Sim.controller.ts b/packages/sim-entrada-eventos/aplication/Sim.controller.ts index c4e9eb4..4ddd631 100644 --- a/packages/sim-entrada-eventos/aplication/Sim.controller.ts +++ b/packages/sim-entrada-eventos/aplication/Sim.controller.ts @@ -1,5 +1,6 @@ import { Request, Response } from "express" import { SimUsecases } from "aplication/Sim.usecases" +import { error } from "node:console" // Partiendo del caracter 3 2 de pais + 2 de compañia // Metiendolo a la BDD podria ser mas dinamico pero perderia @@ -31,8 +32,22 @@ export class SimController { const { iccid } = req.body const compañia = this.compañiaFromIccid(iccid) + if (compañia == undefined) { + res.status(500).json({ + errors: { + msg: "El iccid no pertenece a una compañia conocida" + } + }) + return; + } + try { await this.simUseCases.activation({ iccid, compañia }) + + res.status(200).json({ + iccid: iccid, + operation: "activation" + }).send() } catch (err) { console.error("Error activando la sim ", req.body) res.status(500).json({ @@ -41,11 +56,6 @@ export class SimController { } }).send() } - - res.status(200).json({ - iccid: iccid, - operation: "activation" - }).send() } async cancelation(req: Request, res: Response) { diff --git a/yarn.lock b/yarn.lock index e322957..27d2836 100644 --- a/yarn.lock +++ b/yarn.lock @@ -719,6 +719,17 @@ __metadata: languageName: node linkType: hard +"amqp-connection-manager@npm:^5.0.0": + version: 5.0.0 + resolution: "amqp-connection-manager@npm:5.0.0" + dependencies: + promise-breaker: "npm:^6.0.0" + peerDependencies: + amqplib: "*" + checksum: 10/bf6c346537f71f1cdf03f4d1832c2945b3f80274e8fca41fa96db35ecc3ea0162e1fbf4cba76326fb3784b2362c8c6dd14a74e3ca30e1fc6dcd3ba6dcc172828 + languageName: node + linkType: hard + "amqplib@npm:^0.10.9": version: 0.10.9 resolution: "amqplib@npm:0.10.9" @@ -2001,6 +2012,13 @@ __metadata: languageName: node linkType: hard +"promise-breaker@npm:^6.0.0": + version: 6.0.0 + resolution: "promise-breaker@npm:6.0.0" + checksum: 10/6f7ad5e55d3f434dc1e02907c3294dc4a44f9962d9af9de186095c75c8f76d11feeb927e96ec9e177fcc9209690defb5c64eeac4767e7c3dd4f120e9d14fb0c8 + languageName: node + linkType: hard + "promise-retry@npm:^2.0.1": version: 2.0.1 resolution: "promise-retry@npm:2.0.1" @@ -2394,6 +2412,7 @@ __metadata: "@types/express": "npm:^5.0.6" "@types/node": "npm:^25.0.3" "@types/supertest": "npm:^6.0.3" + amqp-connection-manager: "npm:^5.0.0" axios: "npm:^1.13.3" concurrently: "npm:^9.2.1" cors: "npm:^2.8.5"