Estructura del consumidor

This commit is contained in:
2026-01-16 11:14:35 +01:00
parent 20482915de
commit b29348b88d
18 changed files with 174 additions and 61 deletions

View File

@@ -1 +1,5 @@
# sim-cola-eventos
Monorepo de servicios / workers para centralizar los procesos de las SIM con sus subscripciones
[[./imgs/diagrama-servicios-sim.png]]

View File

@@ -19,20 +19,6 @@ CREATE table if not exists sim_cards (
deleted_at TIMESTAMP
);
CREATE TABLE if not exists sim_operations (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
sim_id BIGINT,
operation_type TEXT NOT NULL,
happened_at TIMESTAMP,
CONSTRAINT valid_operations CHECK (
operation_type in ('free','preactivate','activate','pause','cancel')
),
CONSTRAINT fk_sim_id
FOREIGN KEY(sim_id)
REFERENCES sim_cards(id)
);
CREATE TABLE if not exists sim_envio (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
@@ -94,6 +80,21 @@ CREATE TABLE sim_subscription (
FOREIGN KEY(subscription_type_id) REFERENCES sim_subscription_types(id)
);
CREATE TABLE if not exists sim_subscription_operations (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
sim_id BIGINT,
operation_type TEXT NOT NULL,
happened_at TIMESTAMP,
CONSTRAINT valid_operations CHECK (
operation_type in ('free','preactivate','activate','pause','cancel')
),
CONSTRAINT fk_subscription_id
FOREIGN KEY(subscription_id)
REFERENCES sim_subscriptions(id)
);
-- Se supone que indica un cambio
CREATE TABLE sim_subscription_historic (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,

View File

@@ -51,7 +51,21 @@
],
"queues": [
{
"name": "sim.queue",
"name": "sim.logs",
"vhost": "sim-vhost",
"durable": true,
"auto_delete": false,
"arguments": {}
},
{
"name": "sim.activations",
"vhost": "sim-vhost",
"durable": true,
"auto_delete": false,
"arguments": {}
},
{
"name": "sim.cancelations",
"vhost": "sim-vhost",
"durable": true,
"auto_delete": false,
@@ -62,7 +76,23 @@
{
"source": "sim.exchange",
"vhost": "sim-vhost",
"destination": "sim.queue",
"destination": "sim.activations",
"destination_type": "queue",
"routing_key": "sim.activation",
"arguments": {}
},
{
"source": "sim.exchange",
"vhost": "sim-vhost",
"destination": "sim.cancelations",
"destination_type": "queue",
"routing_key": "sim.cancelation",
"arguments": {}
},
{
"source": "sim.exchange",
"vhost": "sim-vhost",
"destination": "sim.logs",
"destination_type": "queue",
"routing_key": "sim.*",
"arguments": {}

Binary file not shown.

After

Width:  |  Height:  |  Size: 88 KiB

View File

@@ -1,6 +1,11 @@
import { ConsumeMessage } from "amqplib";
import { DomainEvent, DomainEventSubscriber } from "./DomainEvent";
export interface EventBus {
publish(events: Array<DomainEvent>): Promise<void>;
// Sacado de NEKI, posiblemente no haga falta
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void;
consume(queue: string, callback: (msg: ConsumeMessage | null) => void): void;
ack(msg: ConsumeMessage): void;
}

View File

@@ -0,0 +1,9 @@
import { RabbitMQEventBus } from "./RabbitMQEventBus";
export class RabbitMQConsumer {
constructor(
connection: RabbitMQEventBus
) {
}
}

View File

@@ -1,7 +1,6 @@
import { type ChannelModel, type ConfirmChannel, connect as amqConnect } from "amqplib";
import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent";
import { EventBus } from "../domain/EventBus.port";
import { buffer } from "node:stream/consumers";
export type RMQConnectionParams = {
username: string,
@@ -21,6 +20,23 @@ export class RabbitMQEventBus implements EventBus {
this.checkStructure();
}
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
// Comproaciones antes de escuchar
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
this.checkStructure()
// El binding (cola -> [routingkey] -> exchange) lo hago por configuracion. Meter colas a demanda?
await this.channel.prefetch(1)
await this.channel.consume(queue, callback)
}
ack(msg: ConsumeMessage) {
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
return this.channel.ack(msg)
}
connection?: ChannelModel
channel?: ConfirmChannel
connected: Boolean = false
@@ -59,8 +75,10 @@ export class RabbitMQEventBus implements EventBus {
* TODO: Faltan las colas fijas según las operaciones
*/
private checkStructure() {
this.channel?.assertQueue("sim.queue")
this.channel?.assertExchange("sim.exchange", "direct")
this.channel?.assertQueue("sim.activations")
this.channel?.assertQueue("sim.cancelations")
this.channel?.assertQueue("sim.logs")
this.channel?.assertExchange("sim.exchange", "topic")
}
protected async createConnection() {

View File

@@ -0,0 +1,37 @@
import { EventBus } from "#shared/domain/EventBus.port";
import { ConsumeMessage } from "amqplib";
export class SimActivationController {
private eventBus: EventBus;
private activationUseCases: any;
constructor(
eventBus: EventBus
) {
this.eventBus = eventBus
// No se si hay un sistema mejor
// convertor en const () => {} para conservar el contexto??
this.activateSim = this.activateSim.bind(this)
}
public activateSim(msg: ConsumeMessage | null) {
if (!this.validateActivationMsg(msg)) {
throw new Error("Error consumiendo el mensaje no es valido")
}
console.log("mensaje procesado", String(msg?.content))
// Caso de uso de activaciones
//
this.eventBus.ack(msg!)
}
/**
* TODO:
* - Loguear motivos de la no validacion
*/
private validateActivationMsg(msg: ConsumeMessage | null) {
if (msg == undefined) return false;
return true;
}
}

View File

@@ -1,7 +1,6 @@
import { loadEnvFile } from "node:process";
loadEnvFile("../../.env")
export const env = {
ENVIRONMENT: process.env.ENVIORMENT,
POSTGRES_USER: process.env.POSTGRES_USER,

View File

@@ -0,0 +1,27 @@
import { RabbitMQEventBus, RMQConnectionParams } from "#shared/infrastructure/RabbitMQEventBus"
import { env } from "./env"
const rmqUser = env.RABBITMQ_USER
const rmqPass = env.RABBITMQ_PASSWORD
const rmqHost = env.RABBITMQ_HOST
const rmqPort = Number(env.RABBITMQ_PORT)
const rmqSecure = false
const rmqVhost = env.RABBITMQ_VHOST
export const rmqConnOptions = <RMQConnectionParams>{
username: rmqUser,
password: rmqPass,
vhost: rmqVhost,
hostname: rmqHost,
port: rmqPort,
secure: rmqSecure,
}
export const rabbitmqEventBus = new RabbitMQEventBus({
connectionParams: rmqConnOptions
})
export async function startRMQClient() {
await rabbitmqEventBus.connect()
return rabbitmqEventBus
}

View File

@@ -0,0 +1,20 @@
import { startRMQClient } from "#config/eventBusConfig"
import { SimActivationController } from "aplication/SimActivation.controller"
async function startWorker() {
const rmqClient = await startRMQClient()
const simActivationController = new SimActivationController(rmqClient)
rmqClient.consume("sim.activations", simActivationController.activateSim)
}
startWorker()
.then(e => {
console.log("[o] Worker de activacion de SIM iniciado")
})
.catch(e => {
console.log("[x] Error iniciando worker de activacion de SIM")
})
export default {}

View File

@@ -1,37 +0,0 @@
import { env } from "./config/env"
import { RabbitConnection } from "#shared/adapters/queues/RabbitMQClient"
import { ConsumeMessage } from "amqplib"
const rmqUser = env.RABBITMQ_USER
const rmqPass = env.RABBITMQ_PASSWORD
const rmqHost = env.RABBITMQ_HOST
const rmqPort = Number(env.RABBITMQ_PORT)
const rmqSecure = false
const rmqVhost = env.RABBITMQ_VHOST
async function test() {
const rbmq = new RabbitConnection({
username: rmqUser,
password: rmqPass,
vhost: String(rmqVhost),
hostname: rmqHost,
port: rmqPort,
secure: rmqSecure
})
await rbmq.connect()
console.log("[Consumidor] iniciado")
await rbmq.channel?.consume("sim.queue", (buff: ConsumeMessage | null) => {
const decoded = buff?.content.toString()
console.log(" [Consumidor] Mensaje recibido ", decoded)
}, {
noAck: true
})
}
test()
export default {}

View File

@@ -21,6 +21,6 @@ export const rabbitmqEventBus = new RabbitMQEventBus({
connectionParams: rmqConnOptions
})
export async function startRMQServer() {
export async function startRMQClient() {
await rabbitmqEventBus.connect()
}

View File

@@ -2284,9 +2284,9 @@ __metadata:
languageName: node
linkType: hard
"sim-consumidor@workspace:packages/sim-consumidor":
"sim-consumidor@workspace:packages/sim-consumidor-activaciones":
version: 0.0.0-use.local
resolution: "sim-consumidor@workspace:packages/sim-consumidor"
resolution: "sim-consumidor@workspace:packages/sim-consumidor-activaciones"
dependencies:
"@tsconfig/node22": "npm:*"
"@types/amqplib": "npm:^0.10.8"