diff --git a/README.md b/README.md index 3debc9e..f52c1c4 100644 --- a/README.md +++ b/README.md @@ -1 +1,5 @@ # sim-cola-eventos + +Monorepo de servicios / workers para centralizar los procesos de las SIM con sus subscripciones + +[[./imgs/diagrama-servicios-sim.png]] diff --git a/deployment/database/init.sql b/deployment/database/init.sql index 3c8299b..426715b 100644 --- a/deployment/database/init.sql +++ b/deployment/database/init.sql @@ -19,20 +19,6 @@ CREATE table if not exists sim_cards ( deleted_at TIMESTAMP ); -CREATE TABLE if not exists sim_operations ( - id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - sim_id BIGINT, - operation_type TEXT NOT NULL, - happened_at TIMESTAMP, - - CONSTRAINT valid_operations CHECK ( - operation_type in ('free','preactivate','activate','pause','cancel') - ), - - CONSTRAINT fk_sim_id - FOREIGN KEY(sim_id) - REFERENCES sim_cards(id) -); CREATE TABLE if not exists sim_envio ( id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, @@ -94,6 +80,21 @@ CREATE TABLE sim_subscription ( FOREIGN KEY(subscription_type_id) REFERENCES sim_subscription_types(id) ); +CREATE TABLE if not exists sim_subscription_operations ( + id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + sim_id BIGINT, + operation_type TEXT NOT NULL, + happened_at TIMESTAMP, + + CONSTRAINT valid_operations CHECK ( + operation_type in ('free','preactivate','activate','pause','cancel') + ), + + CONSTRAINT fk_subscription_id + FOREIGN KEY(subscription_id) + REFERENCES sim_subscriptions(id) +); + -- Se supone que indica un cambio CREATE TABLE sim_subscription_historic ( id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, diff --git a/deployment/rabbit/definitions.json b/deployment/rabbit/definitions.json index ca05872..cdaca00 100644 --- a/deployment/rabbit/definitions.json +++ b/deployment/rabbit/definitions.json @@ -51,7 +51,21 @@ ], "queues": [ { - "name": "sim.queue", + "name": "sim.logs", + "vhost": "sim-vhost", + "durable": true, + "auto_delete": false, + "arguments": {} + }, + { + "name": "sim.activations", + "vhost": "sim-vhost", + "durable": true, + "auto_delete": false, + "arguments": {} + }, + { + "name": "sim.cancelations", "vhost": "sim-vhost", "durable": true, "auto_delete": false, @@ -62,7 +76,23 @@ { "source": "sim.exchange", "vhost": "sim-vhost", - "destination": "sim.queue", + "destination": "sim.activations", + "destination_type": "queue", + "routing_key": "sim.activation", + "arguments": {} + }, + { + "source": "sim.exchange", + "vhost": "sim-vhost", + "destination": "sim.cancelations", + "destination_type": "queue", + "routing_key": "sim.cancelation", + "arguments": {} + }, + { + "source": "sim.exchange", + "vhost": "sim-vhost", + "destination": "sim.logs", "destination_type": "queue", "routing_key": "sim.*", "arguments": {} diff --git a/imgs/diagrama-servicios-sim.png b/imgs/diagrama-servicios-sim.png new file mode 100644 index 0000000..bf3adbc Binary files /dev/null and b/imgs/diagrama-servicios-sim.png differ diff --git a/packages/shared/domain/EventBus.port.ts b/packages/shared/domain/EventBus.port.ts index ff8a257..4248a53 100644 --- a/packages/shared/domain/EventBus.port.ts +++ b/packages/shared/domain/EventBus.port.ts @@ -1,6 +1,11 @@ +import { ConsumeMessage } from "amqplib"; import { DomainEvent, DomainEventSubscriber } from "./DomainEvent"; export interface EventBus { publish(events: Array): Promise; + // Sacado de NEKI, posiblemente no haga falta addSubscribers(subscribers: Array>): void; + + consume(queue: string, callback: (msg: ConsumeMessage | null) => void): void; + ack(msg: ConsumeMessage): void; } diff --git a/packages/shared/infrastructure/RabbitMQConsumer.ts b/packages/shared/infrastructure/RabbitMQConsumer.ts new file mode 100644 index 0000000..b2066e1 --- /dev/null +++ b/packages/shared/infrastructure/RabbitMQConsumer.ts @@ -0,0 +1,9 @@ +import { RabbitMQEventBus } from "./RabbitMQEventBus"; + +export class RabbitMQConsumer { + constructor( + connection: RabbitMQEventBus + ) { + + } +} diff --git a/packages/shared/infrastructure/RabbitMQEventBus.ts b/packages/shared/infrastructure/RabbitMQEventBus.ts index c6860b1..aaa6fff 100644 --- a/packages/shared/infrastructure/RabbitMQEventBus.ts +++ b/packages/shared/infrastructure/RabbitMQEventBus.ts @@ -1,7 +1,6 @@ -import { type ChannelModel, type ConfirmChannel, connect as amqConnect } from "amqplib"; +import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib"; import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent"; import { EventBus } from "../domain/EventBus.port"; -import { buffer } from "node:stream/consumers"; export type RMQConnectionParams = { username: string, @@ -21,6 +20,23 @@ export class RabbitMQEventBus implements EventBus { this.checkStructure(); } + async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) { + // Comproaciones antes de escuchar + if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado"); + this.checkStructure() + + // El binding (cola -> [routingkey] -> exchange) lo hago por configuracion. Meter colas a demanda? + + await this.channel.prefetch(1) + + await this.channel.consume(queue, callback) + } + + ack(msg: ConsumeMessage) { + if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado"); + return this.channel.ack(msg) + } + connection?: ChannelModel channel?: ConfirmChannel connected: Boolean = false @@ -59,8 +75,10 @@ export class RabbitMQEventBus implements EventBus { * TODO: Faltan las colas fijas segĂșn las operaciones */ private checkStructure() { - this.channel?.assertQueue("sim.queue") - this.channel?.assertExchange("sim.exchange", "direct") + this.channel?.assertQueue("sim.activations") + this.channel?.assertQueue("sim.cancelations") + this.channel?.assertQueue("sim.logs") + this.channel?.assertExchange("sim.exchange", "topic") } protected async createConnection() { diff --git a/packages/sim-consumidor/.env b/packages/sim-consumidor-activaciones/.env similarity index 100% rename from packages/sim-consumidor/.env rename to packages/sim-consumidor-activaciones/.env diff --git a/packages/sim-consumidor-activaciones/aplication/SimActivation.controller.ts b/packages/sim-consumidor-activaciones/aplication/SimActivation.controller.ts new file mode 100644 index 0000000..9072fb1 --- /dev/null +++ b/packages/sim-consumidor-activaciones/aplication/SimActivation.controller.ts @@ -0,0 +1,37 @@ +import { EventBus } from "#shared/domain/EventBus.port"; +import { ConsumeMessage } from "amqplib"; + +export class SimActivationController { + private eventBus: EventBus; + private activationUseCases: any; + + constructor( + eventBus: EventBus + ) { + this.eventBus = eventBus + + // No se si hay un sistema mejor + // convertor en const () => {} para conservar el contexto?? + this.activateSim = this.activateSim.bind(this) + } + + public activateSim(msg: ConsumeMessage | null) { + if (!this.validateActivationMsg(msg)) { + throw new Error("Error consumiendo el mensaje no es valido") + } + console.log("mensaje procesado", String(msg?.content)) + + // Caso de uso de activaciones + // + this.eventBus.ack(msg!) + } + + /** + * TODO: + * - Loguear motivos de la no validacion + */ + private validateActivationMsg(msg: ConsumeMessage | null) { + if (msg == undefined) return false; + return true; + } +} diff --git a/packages/sim-consumidor/config/env/index.ts b/packages/sim-consumidor-activaciones/config/env/index.ts similarity index 99% rename from packages/sim-consumidor/config/env/index.ts rename to packages/sim-consumidor-activaciones/config/env/index.ts index 459f2f7..f803690 100644 --- a/packages/sim-consumidor/config/env/index.ts +++ b/packages/sim-consumidor-activaciones/config/env/index.ts @@ -1,7 +1,6 @@ import { loadEnvFile } from "node:process"; loadEnvFile("../../.env") - export const env = { ENVIRONMENT: process.env.ENVIORMENT, POSTGRES_USER: process.env.POSTGRES_USER, diff --git a/packages/sim-consumidor-activaciones/config/eventBusConfig.ts b/packages/sim-consumidor-activaciones/config/eventBusConfig.ts new file mode 100644 index 0000000..f74e427 --- /dev/null +++ b/packages/sim-consumidor-activaciones/config/eventBusConfig.ts @@ -0,0 +1,27 @@ +import { RabbitMQEventBus, RMQConnectionParams } from "#shared/infrastructure/RabbitMQEventBus" +import { env } from "./env" + +const rmqUser = env.RABBITMQ_USER +const rmqPass = env.RABBITMQ_PASSWORD +const rmqHost = env.RABBITMQ_HOST +const rmqPort = Number(env.RABBITMQ_PORT) +const rmqSecure = false +const rmqVhost = env.RABBITMQ_VHOST + +export const rmqConnOptions = { + username: rmqUser, + password: rmqPass, + vhost: rmqVhost, + hostname: rmqHost, + port: rmqPort, + secure: rmqSecure, +} + +export const rabbitmqEventBus = new RabbitMQEventBus({ + connectionParams: rmqConnOptions +}) + +export async function startRMQClient() { + await rabbitmqEventBus.connect() + return rabbitmqEventBus +} diff --git a/packages/sim-consumidor-activaciones/index.ts b/packages/sim-consumidor-activaciones/index.ts new file mode 100644 index 0000000..58bc29e --- /dev/null +++ b/packages/sim-consumidor-activaciones/index.ts @@ -0,0 +1,20 @@ + +import { startRMQClient } from "#config/eventBusConfig" +import { SimActivationController } from "aplication/SimActivation.controller" + +async function startWorker() { + const rmqClient = await startRMQClient() + const simActivationController = new SimActivationController(rmqClient) + + rmqClient.consume("sim.activations", simActivationController.activateSim) +} + +startWorker() + .then(e => { + console.log("[o] Worker de activacion de SIM iniciado") + }) + .catch(e => { + console.log("[x] Error iniciando worker de activacion de SIM") + }) + +export default {} diff --git a/packages/sim-consumidor/package.json b/packages/sim-consumidor-activaciones/package.json similarity index 100% rename from packages/sim-consumidor/package.json rename to packages/sim-consumidor-activaciones/package.json diff --git a/packages/sim-consumidor/tsconfig.json b/packages/sim-consumidor-activaciones/tsconfig.json similarity index 100% rename from packages/sim-consumidor/tsconfig.json rename to packages/sim-consumidor-activaciones/tsconfig.json diff --git a/packages/sim-consumidor/index.ts b/packages/sim-consumidor/index.ts deleted file mode 100644 index e42b90e..0000000 --- a/packages/sim-consumidor/index.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { env } from "./config/env" -import { RabbitConnection } from "#shared/adapters/queues/RabbitMQClient" -import { ConsumeMessage } from "amqplib" - -const rmqUser = env.RABBITMQ_USER -const rmqPass = env.RABBITMQ_PASSWORD -const rmqHost = env.RABBITMQ_HOST -const rmqPort = Number(env.RABBITMQ_PORT) -const rmqSecure = false -const rmqVhost = env.RABBITMQ_VHOST - - -async function test() { - const rbmq = new RabbitConnection({ - username: rmqUser, - password: rmqPass, - vhost: String(rmqVhost), - hostname: rmqHost, - port: rmqPort, - secure: rmqSecure - }) - await rbmq.connect() - console.log("[Consumidor] iniciado") - - await rbmq.channel?.consume("sim.queue", (buff: ConsumeMessage | null) => { - const decoded = buff?.content.toString() - console.log(" [Consumidor] Mensaje recibido ", decoded) - }, { - noAck: true - }) - -} - - - -test() -export default {} diff --git a/packages/sim-entrada-eventos/aplication/SimController.ts b/packages/sim-entrada-eventos/aplication/Sim.controller.ts similarity index 100% rename from packages/sim-entrada-eventos/aplication/SimController.ts rename to packages/sim-entrada-eventos/aplication/Sim.controller.ts diff --git a/packages/sim-entrada-eventos/config/eventBusConfig.ts b/packages/sim-entrada-eventos/config/eventBusConfig.ts index ebe7f11..3165ed9 100644 --- a/packages/sim-entrada-eventos/config/eventBusConfig.ts +++ b/packages/sim-entrada-eventos/config/eventBusConfig.ts @@ -21,6 +21,6 @@ export const rabbitmqEventBus = new RabbitMQEventBus({ connectionParams: rmqConnOptions }) -export async function startRMQServer() { +export async function startRMQClient() { await rabbitmqEventBus.connect() } diff --git a/yarn.lock b/yarn.lock index 0392e2e..f851539 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2284,9 +2284,9 @@ __metadata: languageName: node linkType: hard -"sim-consumidor@workspace:packages/sim-consumidor": +"sim-consumidor@workspace:packages/sim-consumidor-activaciones": version: 0.0.0-use.local - resolution: "sim-consumidor@workspace:packages/sim-consumidor" + resolution: "sim-consumidor@workspace:packages/sim-consumidor-activaciones" dependencies: "@tsconfig/node22": "npm:*" "@types/amqplib": "npm:^0.10.8"