diff --git a/deployment/database/01-ordenes-sim.sql b/deployment/database/01-ordenes-sim.sql index e5791a4..31c0984 100644 --- a/deployment/database/01-ordenes-sim.sql +++ b/deployment/database/01-ordenes-sim.sql @@ -11,30 +11,53 @@ CREATE TYPE order_status AS ENUM ( ); CREATE TABLE IF NOT EXISTS order_tracking ( - order_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, correlation_id VARCHAR(255) NOT NULL, -- ID compartido con RabbitMQ (message_id) exchange VARCHAR(100), -- Exchange al que se envia (de momento solo hay 1 principal sin contar delay y dlx) routing_key VARCHAR(100), -- Routing key del mensaje - + order_type order_types NOT NULL DEFAULT 'unknown', + payload JSONB, -- Duda si es optimo guardar la copia, es útil en caso de fallo -- Campos de reintentos? - status order_status NOT NULL DEFAULT 'PENDING', + status order_status NOT NULL DEFAULT 'pending', retry_count INT DEFAULT 0, error_message TEXT, -- Razón del fallo - -- error_stacktrace TEXT, -- Recomendación, no se hasta que punto es necesario + error_stacktrace TEXT, - start_date TIMESTAMP NOT NULL DEFAULT now(), - update_date TIMESTAMP NOT NULL DEFAULT now(), - finished_date TIMESTAMP + start_date TIMESTAMP NOT NULL DEFAULT (now() at time zone 'utc'), + update_date TIMESTAMP NOT NULL DEFAULT (now() at time zone 'utc'), + finish_date TIMESTAMP ) -CREATE TABALE IF NOT EXISTS order_history( - history_id SERIAL PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - order_id UUID REFERENCES queue_operations(operation_id) ON DELETE CASCADE, +-- Busqueda según id de rabbit +CREATE INDEX IF NOT EXISTS idx_order_correlation + ON order_tracking(correlation_id); +-- Ordenenes que todavia no han finalizado +CREATE INDEX IF NOT EXISTS pending_orders + ON order_tracking(start_date) + WHERE order_tracking.finish_date IS NULL; + +CREATE TABLE IF NOT EXISTS order_history( + id SERIAL PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + order_id BIGINT NOT NULL, previous_status order_status NOT NULL, -- Siempre hay un estado anterior, para casos excepcioneale "unknown" new_status order_status NOT NULL, change_reason TEXT, - change_date TIMESTAMP NOT NULL DEFAULT NOW(), + change_date TIMESTAMP NOT NULL DEFAULT (now() at time zone 'utc'), + + CONSTRAINT fk_order_id + FOREIGN KEY(order_id) + REFERENCES order_tracking(id) + ON DELETE CASCADE ); + +-- fk de order +CREATE INDEX IF NOT EXISTS idx_order_id + ON order_history(order_id); + +-- busquedas por fecha +CREATE INDEX IF NOT EXISTS idx_order_change_date + ON order_history(change_date); + diff --git a/deployment/database/02-objenious.sql b/deployment/database/02-objenious.sql index fcf84b5..2dbb7c6 100644 --- a/deployment/database/02-objenious.sql +++ b/deployment/database/02-objenious.sql @@ -17,7 +17,7 @@ CREATE TABLE if not exists objenious_operation ( operation TEXT NOT NULL, start_date TIMESTAMP NOT NULL DEFAULT now(), last_change_date TIMESTAMP NOT NULL DEFAULT now(), - end_date TIMESTAMP, + finish_date TIMESTAMP, error TEXT, status status_enum, objenious_status TEXT diff --git a/docs/sim-api/Activation Email.bru b/docs/sim-api/Activation Email.bru index a6817c3..8f72b9e 100644 --- a/docs/sim-api/Activation Email.bru +++ b/docs/sim-api/Activation Email.bru @@ -16,7 +16,23 @@ headers { body:json { { - "iccids":["1234"] + "id": "11", + "retry_count": 0, + "max_retry": null, + "max_date_retry": null, + "iccids": [ + "8933201125068886080" + ], + "request_id": "14362", + "mass_action_id": "5208468", + "operation": "activate", + "start_date": "2026-02-13T11:08:42.499Z", + "last_change_date": "2026-02-16T09:24:36.073Z", + "end_date": "2026-02-16T09:24:36.073Z", + "error": null, + "status": "finished", + "objenious_status": "Terminé", + "msisdn": "33764399870" } } diff --git a/packages/_template/index.ts b/packages/_template/index.ts index 35acb93..8449439 100644 --- a/packages/_template/index.ts +++ b/packages/_template/index.ts @@ -1,3 +1,3 @@ -console.log("Template") +console.log(new Date().toISOString()) export default {} diff --git a/packages/sim-objenious-cron/config/postgreConfig.ts b/packages/sim-objenious-cron/config/postgreConfig.ts index adb003a..dab0552 100644 --- a/packages/sim-objenious-cron/config/postgreConfig.ts +++ b/packages/sim-objenious-cron/config/postgreConfig.ts @@ -1,4 +1,4 @@ -import { Pool, QueryResult } from 'pg'; +import { Pool } from 'pg'; import { PgClient } from 'sim-shared/infrastructure/PgClient.js' import { env } from './env/index.js'; diff --git a/packages/sim-shared/config/config.test.ts b/packages/sim-shared/config/config.test.ts new file mode 100644 index 0000000..22fb083 --- /dev/null +++ b/packages/sim-shared/config/config.test.ts @@ -0,0 +1,27 @@ +/** + * !Importate + * Configuración unicamente para lanzar los test, este código no debe de ejecutarse + * en produccion + */ + +import { env, loadEnvFile } from "node:process"; +import { Pool } from "pg"; +import { PgClient } from "../infrastructure/PgClient.js"; + +console.warn("[i!] Se está corriendo codigo de test") +loadEnvFile("../../.env") // Global + +// se hace una por servicio. +export const pgPool = new Pool({ + user: env.POSTGRES_USER, + host: env.POSTGRES_HOST, + database: env.POSTGRES_DATABASE, + password: env.POSTGRES_PASSWORD, + port: Number(env.POSTGRES_PORT) || 5432, +}); + +export const postgresClient = new PgClient({ + pool: pgPool +}) + +console.warn(`[T] TEST DB : ${env.POSTGRES_DATABASE}@${env.POSTGRES_HOST}`) diff --git a/packages/sim-shared/domain/Order.ts b/packages/sim-shared/domain/Order.ts new file mode 100644 index 0000000..9a58d5b --- /dev/null +++ b/packages/sim-shared/domain/Order.ts @@ -0,0 +1,50 @@ +// Reemplaza al enum OrderStatus +export type OrderStatus = + | 'pending' + | 'running' + | 'finished' + | 'failed' + | 'dlx'; + +// Reemplaza al enum OrderTypes +export type OrderType = + | 'activate' + | 'preactivate' + | 'cancel' + | 'pause' + | 'reactivate' + | 'unknown'; + +// Interfaz para la tabla order_tracking +export interface OrderTracking { + id: number; + correlation_id: string; + exchange?: string | null; + routing_key?: string | null; + order_type: OrderType; + payload?: Record | null; // Por no especificar el tipo del json hasta que no se cree + status: OrderStatus; + retry_count: number; + error_message?: string | null; + error_stacktrace?: string | null; + /* TODO: Importante decidir si trabajar con fecha y tener que crear los objetos o seguir como string */ + start_date: string | Date; + update_date: string | Date; + finish_date?: string | Date | null; +} + +// Interfaz para la tabla order_history +export interface OrderHistory { + id: number; + order_id: number; + previous_status: OrderStatus; + new_status: OrderStatus; + change_reason?: string | null; + change_date: Date; +} + +// Tipo útil para la creación (Omitiendo campos generados por la DB) +export type CreateOrderDTO = Pick< + OrderTracking, // Aqui realmente no importan los campos + 'correlation_id' | 'exchange' | 'routing_key' | 'order_type' | 'payload' +>; diff --git a/packages/sim-shared/domain/Result.ts b/packages/sim-shared/domain/Result.ts index f77cd79..e030747 100644 --- a/packages/sim-shared/domain/Result.ts +++ b/packages/sim-shared/domain/Result.ts @@ -1,7 +1,14 @@ /** * Result */ -export type Result = { - error: E | undefined, - data: D | undefined -} +export type Result = + { + error: E, + data: undefined + } + | + { + error: undefined, + data: D + } + diff --git a/packages/sim-shared/infrastructure/OperationRepository.ts b/packages/sim-shared/infrastructure/OperationRepository.ts index 14c02d6..581c325 100644 --- a/packages/sim-shared/infrastructure/OperationRepository.ts +++ b/packages/sim-shared/infrastructure/OperationRepository.ts @@ -45,8 +45,8 @@ export class OperationsRepository implements IOperationsRepository { error = COALESCE($3,error), request_id = COALESCE($4, request_id), mass_action_id = COALESCE($5, mass_action_id), - last_change_date = now(), - end_date = CASE WHEN $2 IN ('finished') THEN now() ELSE end_date END, + last_change_date = now() at time zone 'utc', + end_date = CASE WHEN $2 IN ('finished') THEN now() at time zone 'utc' ELSE end_date END, objenious_status = $6 WHERE id = $1`; diff --git a/packages/sim-shared/infrastructure/OrderRepository.test.ts b/packages/sim-shared/infrastructure/OrderRepository.test.ts new file mode 100644 index 0000000..3081cb0 --- /dev/null +++ b/packages/sim-shared/infrastructure/OrderRepository.test.ts @@ -0,0 +1,35 @@ +import { describe, it } from "node:test"; +import { OrderRepository } from "./OrderRepository.js"; +import { CreateOrderDTO } from "../domain/Order.js"; +import { postgresClient } from "../config/config.test.js"; +import assert from "node:assert"; + +const order1 = { + correlation_id: "fakeRMQid-1234", + exchange: "fake.ex", + routing_key: "test.order.idk", + order_type: "activate", + payload: { iccid: "1234", action: "activate" } +} + + +describe("Test OrderRepository", {}, () => { + const orderRepo = new OrderRepository(postgresClient) + + + it("Insert new Order", async () => { + const newOrder = order1 + const result = await orderRepo.createOrder(newOrder) + + assert(result.error == undefined) + assert(result.data != undefined) + + const order = result.data! + + assert(order.id != undefined) + assert(order.correlation_id == newOrder.correlation_id) + assert(order.status == 'pending') + + console.log("[T] Creada Order", typeof (result.data.start_date)) + }) +}) diff --git a/packages/sim-shared/infrastructure/OrderRepository.ts b/packages/sim-shared/infrastructure/OrderRepository.ts new file mode 100644 index 0000000..d319beb --- /dev/null +++ b/packages/sim-shared/infrastructure/OrderRepository.ts @@ -0,0 +1,381 @@ +/** + * TODO: Usar + */ +import { PoolClient, QueryResult, QueryResultRow } from "pg"; +import { CreateOrderDTO, OrderTracking } from "../domain/Order.js"; +import { Result } from "../domain/Result.js"; +import { PgClient } from "./PgClient.js"; + +export class OrderRepository { + constructor( + private readonly pgClient: PgClient + ) { + + } + + /** + * Comprobacion de la query y devolucion del primer resulado + * Garantiza la gestion de errores + */ + private async getFirst(queryPromise: Promise>) { + try { + const queryResult = await queryPromise + return >{ + data: queryResult.rows[0] + } + } catch (e) { + return >{ + error: e as string + } + } + } + + /** + * Se asume que se va a devolver una lista del tipo T + */ + private async getAll(queryPromise: Promise>) { + + try { + const queryResult = await queryPromise + return >{ + data: queryResult.rows + } + } catch (e) { + return >{ + error: e as string + } + } + } + + /** + * TODO: OrderTracking necestia un tipo para la estructura del mensaje almacenado + */ + public async getOrderById(data: { id: number }) { + const query = ` + SELECT * FROM order_tracking + WHERE id = $1 + ` + const values = [data.id] + const queryPromise = this.pgClient.query>(query, values) + const result = this.getFirst(queryPromise); + return result + } + + /** + * Busqueda según la id de RabbitMq + */ + public async getOrderByQueueId(data: { correlation_id: string }, pool?: PoolClient) { + const query = ` + SELECT * FROM order_tracking + WHERE correlation_id = $1 + ` + const values = [data.correlation_id] + const queryPromise = this.pgClient.query>(query, values) + const result = this.getFirst(queryPromise); + return result + } + + /** + * TODO: + * - variable para el limit + */ + public async getPendingOrders() { + const client = await this.pgClient.connect(); + const query = ` + SELECT * FROM order_tracking + WHERE finish_date IS NULL + ORDER BY start_date ASC + ` + const values: string[] = [] + const queryPromise = client.query(query, values) + const result = await this.getAll(queryPromise) + client.release() + return result + } + + public async createOrder(data: CreateOrderDTO) { + const client = await this.pgClient.connect(); + await client.query("BEGIN") + const query = ` + INSERT INTO order_tracking ( + correlation_id, + exchange, + routing_key, + order_type, + payload, + status + ) + VALUES ( + $1, -- correlation_id + $2, -- exchange + $3, -- routing_key + $4, -- order_type (ej: 'activate') + $5, -- payload (json object) + 'pending' + ) + RETURNING id, correlation_id, status, start_date; + ` + const values = [data.correlation_id, data.exchange, data.routing_key, data.order_type, data.payload] + const queryPromise = client.query(query, values) + const result = await this.getFirst(queryPromise) + + if (result.error == undefined) { + await client.query("COMMIT") + } else { + await client.query("ROLLBACK") + } + + client.release() + return result + } + + /** + * Actualizacion "correcta" del estado de un order + */ + public async updateOrder(args: { + id: number, + new_status: string, + reason: string + }) { + const client = await this.pgClient.connect(); + await client.query('BEGIN'); + + // 1. Se consulta la order de base + const qCurrentOrder = ` + SELECT * FROM order_tracking + WHERE id = $1 + ` + const vCurrentOrder = [args.id] + + const currentOrderResult = await this.getFirst(client.query>(qCurrentOrder, vCurrentOrder)) + + if (currentOrderResult.error != undefined) { + await client.query("ROLLBACK") + client.release() + return currentOrderResult + } + + const currentOrder = currentOrderResult.data! + + // 2. Si todo ok se actualiza el order + const uOrderTracking = ` + UPDATE order_tracking + SET + status = $2, + update_date = (now() at time zone 'utc') + WHERE id = $1 + RETURNING id, status, update_date; + ` + const vOrderTracking = [args.id, args.new_status] + const updatedOrderResult = await this.getFirst( + client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking) + ) + + if (updatedOrderResult.error != undefined) { + await client.query("ROLLBACK") + client.release() + return updatedOrderResult + } + + // 3. Si todo ok se añade una entradad de order_history con los datos modificados + const iOrderHistory = ` + INSERT INTO order_history ( + order_id, + previous_status, + new_status, + change_reason + ) + VALUES ( + $1, -- ID de la orden + $2, -- Estado anterior + $3, -- Nuevo estado + $4 -- Razón (ej: "Consumer processed successfully" o "RabbitMQ NACK") + ); + ` + const vOrderHistory = [args.id, currentOrder.status, args.new_status, args.reason] + const newOrderHistory = await this.getFirst( + client.query<{ id: number, status: string, update_date: string }>(iOrderHistory, vOrderHistory) + ) + + if (newOrderHistory.error != undefined) { + await client.query("ROLLBACK") + client.release() + return updatedOrderResult + } + + await client.query("COMMIT") + + const updatedOrder = await this.getFirst( + client.query>(qCurrentOrder, vCurrentOrder) + ) + + return updatedOrder + } + + + public async finishOrder(args: { id: number, reason?: string }) { + const client = await this.pgClient.connect(); + await client.query('BEGIN'); + + // 1. Se consulta la order de base + const qCurrentOrder = ` + SELECT * FROM order_tracking + WHERE id = $1 + ` + const vCurrentOrder = [args.id] + + const currentOrderResult = await this.getFirst(client.query>(qCurrentOrder, vCurrentOrder)) + + if (currentOrderResult.error != undefined) { + await client.query("ROLLBACK") + client.release() + return currentOrderResult + } + + const currentOrder = currentOrderResult.data! + + // 2. Si todo ok se actualiza el order + const uOrderTracking = ` + UPDATE order_tracking + SET + status = 'finished', + update_date = (now() at time zone 'utc'), + finish_date = (now() at time zone 'utc') + WHERE id = $1 + RETURNING id, status, update_date; + ` + const vOrderTracking = [args.id] + const updatedOrderResult = await this.getFirst( + client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking) + ) + + if (updatedOrderResult.error != undefined) { + await client.query("ROLLBACK") + client.release() + return updatedOrderResult + } + + // 3. Si todo ok se guardo un nuevo registro de history + const iOrderHistory = ` + INSERT INTO order_history ( + order_id, + previous_status, + new_status, + change_reason + ) + VALUES ( + $1, -- ID de la orden + $2, -- Estado anterior + 'finished', + $3 -- Siempre "finished successfully" a no ser que se especifique otra razón + ); + ` + const vOrderHistory = [args.id, currentOrder.status, args.reason ?? "finished successfully"] + const newOrderHistory = await this.getFirst( + client.query<{ id: number, status: string, update_date: string }>(iOrderHistory, vOrderHistory) + ) + + if (newOrderHistory.error != undefined) { + await client.query("ROLLBACK") + client.release() + return updatedOrderResult + } + + await client.query("COMMIT") + + const updatedOrder = await this.getFirst( + client.query>(qCurrentOrder, vCurrentOrder) + ) + + return updatedOrder + } + + public async errorOrder(args: { + id: number, + status: "failed" | "dlx", + reason: string, + error?: string, + stackTrace?: string + }) { + const client = await this.pgClient.connect(); + await client.query('BEGIN'); + + // 1. Se consulta la order de base + const qCurrentOrder = ` + SELECT * FROM order_tracking + WHERE id = $1 + ` + const vCurrentOrder = [args.id] + + const currentOrderResult = await this.getFirst(client.query>(qCurrentOrder, vCurrentOrder)) + + if (currentOrderResult.error != undefined) { + await client.query("ROLLBACK") + client.release() + return currentOrderResult + } + + const currentOrder = currentOrderResult.data! + + // 3. Si todo ok se actualiza el order + // Si el status es dlx se asume que ha terminado y no va a reintentarse + // Si es failed se asume que se ha movido a la cola de delay y en algún momento se va a reintentar + const uOrderTracking = ` + UPDATE order_tracking + SET + status = $2, + update_date = (now() at time zone 'utc'), + finish_date = CASE WHEN $2 = 'dlx' THEN (now() at time zone 'utc') ELSE null, + retry_count = retry_count + 1, + error_message = $3, + error_stacktrace = $4 + WHERE id = $1 + RETURNING id, status, update_date; + ` + const vOrderTracking = [args.id, args.status, args.error, args.stackTrace] + const updatedOrderResult = await this.getFirst( + client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking) + ) + + if (updatedOrderResult.error != undefined) { + await client.query("ROLLBACK") + client.release() + return updatedOrderResult + } + + // 3. Si todo ok se guardo un nuevo registro de history + const iOrderHistory = ` + INSERT INTO order_history ( + order_id, + previous_status, + new_status, + change_reason + ) + VALUES ( + $1, -- ID de la orden + $2, -- Estado anterior + $3, -- En este caso particular 'dlx' o 'failed' + $4 -- En este caso el motivo de fallo completo + ); + ` + const vOrderHistory = [args.id, currentOrder.status, args.reason ?? "finished successfully", args.stackTrace] + const newOrderHistory = await this.getFirst( + client.query<{ id: number, status: string, update_date: string }>(iOrderHistory, vOrderHistory) + ) + + if (newOrderHistory.error != undefined) { + await client.query("ROLLBACK") + client.release() + return updatedOrderResult + } + + await client.query("COMMIT") + + const updatedOrder = await this.getFirst( + client.query>(qCurrentOrder, vCurrentOrder) + ) + + return updatedOrder + } + +} diff --git a/packages/sim-shared/package.json b/packages/sim-shared/package.json index 80432eb..3ec66a3 100644 --- a/packages/sim-shared/package.json +++ b/packages/sim-shared/package.json @@ -38,7 +38,7 @@ } }, "scripts": { - "test": "echo \"Error: no test specified\" ", + "test": "node --import tsx --test ./**/*.test.ts", "dev": "echo \" Shared no es un modulo ejecutable \" ", "build": "tsc --build && tsc-alias -p tsconfig.json && cp package.json ../../dist/packages/sim-shared/" },