diff --git a/packages/sim-consumidor-nos/.env b/packages/sim-consumidor-nos/.env index 6979d6b..597b9fa 100644 --- a/packages/sim-consumidor-nos/.env +++ b/packages/sim-consumidor-nos/.env @@ -1,5 +1,3 @@ -PORT=3000 -RABBITMQ_USER=guest -RABBITMQ_PASSWORD=guest +NOS_BASE_URL=localhost ENVIORMENT=development diff --git a/packages/sim-consumidor-nos/config/env/index.ts b/packages/sim-consumidor-nos/config/env/index.ts index 021873e..4aecfbb 100644 --- a/packages/sim-consumidor-nos/config/env/index.ts +++ b/packages/sim-consumidor-nos/config/env/index.ts @@ -1,5 +1,16 @@ import { loadEnvFile } from "node:process"; -loadEnvFile("../../.env") +import path from "node:path"; + +try { + loadEnvFile(path.join("./.env")) // base +} catch (e) { + console.error("Error cargando el .env desde ./.env") +} +try { + loadEnvFile(path.join("../../.env")) // Global +} catch (e) { + console.error("Error cargando el .env desde ../../.env") +} export const env = { ENVIRONMENT: process.env.ENVIORMENT, @@ -18,5 +29,8 @@ export const env = { RABBITMQ_SECURE: process.env.RABBITMQ_SECURE, RABBITMQ_RETRY_INTERVAL: process.env.RABBITMQ_INTERVAL, RABBITMQ_VHOST: String(process.env.RABBITMQ_VHOST), + + // ESPECIFICO NOS + NOS_BASE_URL: String(process.env.NOS_BASE_URL) }; diff --git a/packages/sim-consumidor-nos/config/eventBus.config.ts b/packages/sim-consumidor-nos/config/eventBus.config.ts new file mode 100644 index 0000000..5c84601 --- /dev/null +++ b/packages/sim-consumidor-nos/config/eventBus.config.ts @@ -0,0 +1,69 @@ +import { RabbitMQEventBus, RMQConnectionParams } from "sim-shared/infrastructure/RabbitMQEventBus.js" +import { Channel } from "amqp-connection-manager" +import { env } from "./env/index.js" + +const rmqUser = env.RABBITMQ_USER +const rmqPass = env.RABBITMQ_PASSWORD +const rmqHost = env.RABBITMQ_HOST +const rmqPort = Number(env.RABBITMQ_PORT) +const rmqSecure = false +const rmqVhost = env.RABBITMQ_VHOST + +export const rmqConnOptions = { + username: rmqUser, + password: rmqPass, + vhost: rmqVhost, + hostname: rmqHost, + port: rmqPort, + secure: rmqSecure, +} + +export const rabbitmqEventBus = new RabbitMQEventBus({ + connectionParams: rmqConnOptions, + buildStructure: buildQueues, + maxRetry: 5 +}) + +async function buildQueues(channel: Channel) { + const QUEUES = { + NOS: "sim.nos", + NOSDLX: "sim.nos.dlx", + NOSDEL: "sim.nos.delayed", + } + + const EXCHANGES = { + MAIN: "sim.exchange", + DLX: "sim.ex.nos.dlx", + DEL: "sim.ex.nos.delayed" + } + + const DELAY = 10 * 1000 + const BASE_NOS_KEY = "sim.nos.#" + + await channel.assertExchange(EXCHANGES.DEL, "topic") + await channel.assertExchange(EXCHANGES.DLX, "topic") + await channel.assertExchange(EXCHANGES.MAIN, "topic") + + await channel.assertQueue(QUEUES.NOS) + await channel.assertQueue(QUEUES.NOSDLX) + await channel.assertQueue(QUEUES.NOSDEL, { + durable: true, + arguments: { + 'x-message-ttl': DELAY, + 'x-dead-letter-exchange': EXCHANGES.MAIN, + } + }) + + // Cola dead-letter + await channel.bindQueue(QUEUES.NOSDLX, EXCHANGES.DLX, "sim.nos.#") + // Cola delay + await channel.bindQueue(QUEUES.NOSDEL, EXCHANGES.DEL, BASE_NOS_KEY) + // Cola nos -> main exchange + await channel.bindQueue(QUEUES.NOS, EXCHANGES.MAIN, BASE_NOS_KEY) + +} + +export async function startRMQClient() { + await rabbitmqEventBus.connect() + return rabbitmqEventBus +} diff --git a/packages/sim-consumidor-nos/config/eventBusConfig.ts b/packages/sim-consumidor-nos/config/eventBusConfig.ts deleted file mode 100644 index 9a0e1f3..0000000 --- a/packages/sim-consumidor-nos/config/eventBusConfig.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { RabbitMQEventBus, RMQConnectionParams } from "sim-shared/infrastructure/RabbitMQEventBus.js" -import { env } from "./env" - -const rmqUser = env.RABBITMQ_USER -const rmqPass = env.RABBITMQ_PASSWORD -const rmqHost = env.RABBITMQ_HOST -const rmqPort = Number(env.RABBITMQ_PORT) -const rmqSecure = false -const rmqVhost = env.RABBITMQ_VHOST - -export const rmqConnOptions = { - username: rmqUser, - password: rmqPass, - vhost: rmqVhost, - hostname: rmqHost, - port: rmqPort, - secure: rmqSecure, -} - -export const rabbitmqEventBus = new RabbitMQEventBus({ - connectionParams: rmqConnOptions -}) - -export async function startRMQClient() { - await rabbitmqEventBus.connect().catch(async e => { - console.error("Error en la conexion RMQ") - await rabbitmqEventBus.connect() - }) - - // Bindings especificos, deberia meterlos en la clase - try { - await rabbitmqEventBus.channel?.assertQueue("sim.nos") - } catch { - console.log("[i] Cola de sims de nos creada") - await rabbitmqEventBus.channel?.bindQueue("sim.nos", "sim.exchange", "sim.nos.*") - } - - return rabbitmqEventBus -} diff --git a/packages/sim-consumidor-nos/index.ts b/packages/sim-consumidor-nos/index.ts index 36cce2a..3214b9d 100644 --- a/packages/sim-consumidor-nos/index.ts +++ b/packages/sim-consumidor-nos/index.ts @@ -1,5 +1,5 @@ -import { startRMQClient } from "#config/eventBusConfig" +import { startRMQClient } from "#config/eventBus.config.js" import { SimNosController } from "./aplication/SimNOS.controller.js" async function startWorker() { diff --git a/packages/sim-consumidor-nos/package.json b/packages/sim-consumidor-nos/package.json index ee4549f..40057f5 100644 --- a/packages/sim-consumidor-nos/package.json +++ b/packages/sim-consumidor-nos/package.json @@ -7,7 +7,8 @@ "test": "echo \"Error: no test specified\" && exit 1", "build": "yarn tsc --project tsconfig.json && yarn tsc-alias && cp package.json ../../dist/packages/sim-consumidor-nos/", "esbuild": "esbuild index.ts --platform=node", - "start": "node ../../dist/packages/sim-consumidor-nos/index.js" + "start": "node ../../dist/packages/sim-consumidor-nos/index.js", + "dev": "tsx watch index.ts" }, "author": "", "license": "ISC", diff --git a/packages/sim-consumidor-objenious/config/eventBus.config.ts b/packages/sim-consumidor-objenious/config/eventBus.config.ts index 4c5db47..81f583d 100644 --- a/packages/sim-consumidor-objenious/config/eventBus.config.ts +++ b/packages/sim-consumidor-objenious/config/eventBus.config.ts @@ -27,8 +27,8 @@ export const rabbitmqEventBus = new RabbitMQEventBus({ async function buildQueues(channel: Channel) { const QUEUES = { OBJ: "sim.objenious", - DLX: "sim.objenious.dlx", - DEL: "sim.objenious.delayed" + OBJDLX: "sim.objenious.dlx", + OBJDEL: "sim.objenious.delayed", } const EXCHANGES = { @@ -45,8 +45,8 @@ async function buildQueues(channel: Channel) { await channel.assertExchange(EXCHANGES.MAIN, "topic") await channel.assertQueue(QUEUES.OBJ) - await channel.assertQueue(QUEUES.DLX) - await channel.assertQueue(QUEUES.DEL, { + await channel.assertQueue(QUEUES.OBJDLX) + await channel.assertQueue(QUEUES.OBJDEL, { durable: true, arguments: { 'x-message-ttl': DELAY,