26 Commits

Author SHA1 Message Date
f78a333e1e Merge pull request 'WEBINT-338_tiempo_suspension' (#2) from WEBINT-338_tiempo_suspension into main
Reviewed-on: #2
2026-04-28 13:40:03 +00:00
01c55cba0f No se traquea el .env 2026-04-28 15:35:00 +02:00
10b2ae244c ignore 2026-04-28 15:33:08 +02:00
2dba2ebfae Query de sims por networkStatus y tiempo de suspension 2026-04-28 15:23:27 +02:00
d7eb4ad326 Gateway para francia funciona 2026-04-27 17:24:40 +02:00
d818441bde Docu 2026-04-27 16:30:22 +02:00
c91965567d Actualizacion docs 2026-04-27 15:17:20 +02:00
d063b47bec Merge branch 'main' into WEBINT-338_tiempo_suspension 2026-04-27 14:00:08 +02:00
6112de297b Bug duplicados solo en la primera linea 2026-04-27 13:46:34 +02:00
166c940295 Fix 2026-04-27 13:32:55 +02:00
246e4cb83b Comentarios 2026-04-27 13:03:47 +02:00
4517796ef3 Calculo del tiempo en suspension 2026-04-27 12:12:12 +02:00
e1450c6e97 POST -> PATCH 2026-04-22 15:22:37 +02:00
e40a19bbfb Bug de correlation_id undefined 2026-04-22 13:31:24 +02:00
fbdb64f3a1 Arreglo de bugs ordes 2026-04-22 12:59:23 +02:00
9a29f49669 Fix orders nos 2026-04-22 12:31:46 +02:00
c2081191ae Trazabilidad de nos, arreglo de orders 2026-04-21 17:39:09 +02:00
f0f3827fd0 Registro del estado/resultado de las operaciones de NOS 2026-04-21 15:51:16 +02:00
ee8f84bc57 Typescript 6 2026-04-21 13:33:01 +02:00
f95677d503 Error pageNOS 2026-04-21 12:50:54 +02:00
59b0b57ec2 Merge branch 'main' into WEBINT-334-migracion-nos 2026-04-21 10:47:08 +02:00
9174b0b6a4 Nombres de colas 2026-04-21 10:22:53 +02:00
e62c49ce91 Endpoints NOS 2026-04-21 10:11:21 +02:00
32990b4dcd Controladores y rutas 2026-04-17 15:49:53 +02:00
da2413002b Repositorio de nos completo 2026-04-17 14:06:41 +02:00
fdbb81ba64 Inicio port NOS 2026-04-16 17:46:32 +02:00
79 changed files with 2581 additions and 995 deletions

32
.env
View File

@@ -1,32 +0,0 @@
PORT=3000
API_HOSTNAME=0.0.0.0
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
ENVIORMENT=development
#RABBITMQ_HOST=rabbitmq-sim-broker
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_SECURE=false
RABBITMQ_VHOST=sim-vhost
# Hay cosas que unificar de varios servicios
#POSTGRES_HOST=postgresql-sim
POSTGRES_HOST=localhost
POSTGRES_DB=postgres
POSTGRES_DATABASE=postgres
POSTGRES_PORT=5433
POSTGRES_USER=postgres
POSTGRES_PASSWORD='1234'
# Para el postgres local para generar el script de resultado de migraciones
PGHOST=localhost
PGUSER=alvar
PGPASSWORD=alvar
PGPORT=5433
# Proxy
CONNECTIONS_URL=https://sim-connections.savefamilygps.net

2
.gitignore vendored
View File

@@ -20,3 +20,5 @@ node_modules
dist/*
.env

View File

@@ -12,13 +12,13 @@ La compañia a la que pertenece cada peticion y por tanto el servicio que lo va
## Decisiones pendientes
- [x] La capa worker según acción y la de operaciones de proveedores, se podrían unir en una sola con un enrutamiento por acción y compañía, pasando de tener claves `sim.[acción]` a `sim.[compañia].[acción]`. *Se ha aplicado el cambio ahora las routing keys tienen la estructura `sim.[compañia].[acción]`*
- [x] La estructura de RMQ se genera por medio del JSON, igual habría que definir cada cola en el worker que la consuma para poder añadir workers sin parar el RMQ. *Se ha aplicado el cambio, ahora solo se define en el json el broker principal para garantizar que exita sin servicios consumidores. Sin embargo tal como estan estructurdos los proyectos no es posible reiniciar solo un servicio*
- [x] La capa worker según acción y la de operaciones de proveedores, se podrían unir en una sola con un enrutamiento por acción y compañía, pasando de tener claves `sim.[acción]` a `sim.[compañia].[acción]`. _Se ha aplicado el cambio ahora las routing keys tienen la estructura `sim.[compañia].[acción]`_
- [x] La estructura de RMQ se genera por medio del JSON, igual habría que definir cada cola en el worker que la consuma para poder añadir workers sin parar el RMQ. _Se ha aplicado el cambio, ahora solo se define en el json el broker principal para garantizar que exita sin servicios consumidores. Sin embargo tal como estan estructurdos los proyectos no es posible reiniciar solo un servicio_
- [ ] Versionado de la API.
- [x] Método para sacar la compañía a partir del iccid, o buscar en la BDD si no es posible. *De momento es un objeto Map en el servicio de gateway*
- [ ] Cola de mensajes que no se han podido procesar. Distinguir según error de red; se reintenta; o error del propio mensaje; se envía a la cola de errores. v2 Se ha creado una cola de delay pero no se distingue el tipo de error, despues de n reintentos el mensaje va a la cola de dead-letter.
- [ ] Seguimiento de las peticiones de Objenious, por cada peticion hay qye hacer un seguimiento del request y de los mass action para saber si las activaciones han tenido exito. Habria que crear otra cola para consultar cada x tiempo o mejor un cron?
- [ ] Actualizar en la base de datos el estado de las peticiones de las sim y añadir el número de telefono cuando se activen o cuando se cumpla una accion.
- [x] Método para sacar la compañía a partir del iccid, o buscar en la BDD si no es posible. _De momento es un objeto Map en el servicio de gateway_
- [ ] Cola de mensajes que no se han podido procesar. Distinguir según error de red; se reintenta; o error del propio mensaje; se envía a la cola de errores. v2 Se ha creado una cola de delay pero no se distingue el tipo de error, después de n reintentos el mensaje va a la cola de dead-letter.
- [x] Seguimiento de las peticiones de Objenious, por cada peticion hay qye hacer un seguimiento del request y de los mass action para saber si las activaciones han tenido exito. Habria que crear otra cola para consultar cada x tiempo o mejor un cron?
- [x] Actualizar en la base de datos el estado de las peticiones de las sim y añadir el número de telefono cuando se activen o cuando se cumpla una accion.
## Versión con consumidores basados en la compañia
@@ -32,8 +32,14 @@ OBJENIOUS (33)2011a
## Diagrama de las colas de Rabbitmq
Actualmente la topologia de las colas consiste en un exchage principal que recibe todos los mensajes y los redistribuye en las colas de cada empresa y a la de logs. Para evitar reintentos de mensajes instantaneos, que podrian ser inutiles si algún servicio se ha caido, se ha añadido una cola de delay que alamcena los mesajes fallidos durante n segundos antes de ser reenviados al exchange principal. Si despues de n reintentos el mensaje sigue fallando se envia a la cola de dead-letter para ser procesado manualmente.
Actualmente la topología de las colas consiste en un exchage principal que recibe todos los mensajes y los redistribuye en las colas de cada empresa y a la de logs. Para evitar reintentos de mensajes instantáneos, que podrían ser inútiles si algún servicio se ha caído, se ha añadido una cola de delay que almacena los mensajes fallidos durante n segundos antes de ser reenviados al exchange principal. Si después de n reintentos el mensaje sigue fallando se envía a la cola de dead-letter para ser procesado manualmente.
![img](./imgs/diagrama-rabbit.png)
La decisión del numero de reintentos y la cola de dlx se hace en los servicios, con una configuracion global en shared.
La decisión del numero de reintentos y la cola de dlx se hace en los servicios, con una configuración global en shared.
## Puertos internos para comunicaciones entre sub-servicios
- **3000**: Gateway (sim-entrada-eventos)
- **3001**: Consumidor NOS (sim-consumidor-nos)
- **3002**: Consumidor Objenious (sim-consumidor-objenious)

File diff suppressed because one or more lines are too long

View File

@@ -11,7 +11,7 @@ post {
}
body:form-urlencoded {
iccid: 8933201125065160380
iccid: 8935103196306448300
offer: SAVEFAMILY1
}

View File

@@ -11,7 +11,7 @@ post {
}
body:form-urlencoded {
iccid: 8933201125068890074
iccid: 8933201125068887054
}
settings {

View File

@@ -0,0 +1,26 @@
meta {
name: France Suspended Lines
type: http
seq: 17
}
get {
url: {{baseurl}}/france/lines?status=SUSPENDED&limit=100
body: none
auth: inherit
}
params:query {
status: SUSPENDED
limit: 100
}
vars:pre-request {
iccid: 8933201125065160331
~baseurl: http://localhost:3002
}
settings {
encodeUrl: true
timeout: 0
}

View File

@@ -0,0 +1,21 @@
meta {
name: France Suspended Time
type: http
seq: 15
}
get {
url: {{baseurl}}/france/lines/{{iccid}}/suspended-time
body: none
auth: inherit
}
vars:pre-request {
iccid: 8933201125065160331
~baseurl: http://localhost:3002
}
settings {
encodeUrl: true
timeout: 0
}

View File

@@ -15,7 +15,7 @@ params:query {
}
body:form-urlencoded {
iccid: 8933201125068886700
iccid: 8933201125065160331
}
settings {

View File

@@ -11,7 +11,7 @@ post {
}
body:form-urlencoded {
iccid: 8933201125065160380
iccid: 8935103196306448300
~offer: SAVEFAMILY1
}

View File

@@ -0,0 +1,23 @@
info:
name: Select Page
type: http
seq: 6
http:
method: GET
url: "{{baseurl}}/selectPage"
params:
- name: iccid
value: "8935103196306448300"
type: query
disabled: true
body:
type: json
data: ""
auth: inherit
settings:
encodeUrl: true
timeout: 0
followRedirects: true
maxRedirects: 5

25
docs/sim-nos/Select.yml Normal file
View File

@@ -0,0 +1,25 @@
info:
name: Select
type: http
seq: 5
http:
method: GET
url: "{{baseurl}}/select?iccid=8935103196306448300"
params:
- name: iccid
value: "8935103196306448300"
type: query
body:
type: json
data: |-
{
"iccid": "8933201125068890066"
}
auth: inherit
settings:
encodeUrl: true
timeout: 0
followRedirects: true
maxRedirects: 5

View File

@@ -0,0 +1,7 @@
name: local
color: "#2E8A54"
variables:
- name: baseurl
value: http://localhost:3001
- secret: true
name: token

View File

@@ -0,0 +1,7 @@
name: prod
color: "#CE4F3B"
variables:
- name: baseurl
value: https://nosconnectcenter-api.iot-x.com
- secret: true
name: token

View File

@@ -0,0 +1,10 @@
opencollection: 1.0.0
info:
name: sim-nos
bundled: false
extensions:
bruno:
ignore:
- node_modules
- .git

View File

@@ -0,0 +1,22 @@
info:
name: subscriber actions
type: http
seq: 1
http:
method: GET
url: "{{baseurl}}/subscribers/{{iccid}}/actions"
auth:
type: bearer
token: "{{token}}"
runtime:
variables:
- name: iccid
value: "8935103196306448300"
settings:
encodeUrl: true
timeout: 0
followRedirects: true
maxRedirects: 5

View File

@@ -0,0 +1,22 @@
info:
name: subscriber info
type: http
seq: 2
http:
method: GET
url: "{{baseurl}}/subscribers/{{iccid}}"
auth:
type: bearer
token: "{{token}}"
runtime:
variables:
- name: iccid
value: "8935103196306448300"
settings:
encodeUrl: true
timeout: 0
followRedirects: true
maxRedirects: 5

View File

@@ -0,0 +1,22 @@
info:
name: subscriber products available
type: http
seq: 4
http:
method: GET
url: "{{baseurl}}/subscribers/{{iccid}}/products/available"
auth:
type: bearer
token: "{{token}}"
runtime:
variables:
- name: iccid
value: "8935103196306448300"
settings:
encodeUrl: true
timeout: 0
followRedirects: true
maxRedirects: 5

View File

@@ -0,0 +1,22 @@
info:
name: subscribers
type: http
seq: 3
http:
method: GET
url: "{{baseurl}}/subscribers"
auth:
type: bearer
token: "{{token}}"
runtime:
variables:
- name: iccid
value: "8935103196306448300"
settings:
encodeUrl: true
timeout: 0
followRedirects: true
maxRedirects: 5

View File

@@ -7,11 +7,11 @@
],
"scripts": {
"test": "vitest watch",
"build": "yarn workspaces foreach -A --exclude sim-consumidor-nos run build && cp .env dist/ && yarn setup:runtime",
"build": "rm -rf ./dist && yarn workspaces foreach -Api run build && cp .env dist/ && yarn setup:runtime",
"setup:runtime": "mkdir -p dist/packages/node_modules && ln -sf ../sim-shared dist/packages/node_modules/sim-shared && ln -sf ../sf-consumidor-objenious dist/packages/node_modules/sim-consumidor-objenious",
"start": "yarn setup:runtime && yarn workspaces foreach -Apiv --exclude sim-consumidor-nos run start",
"start": "yarn setup:runtime && yarn workspaces foreach -Apiv run start",
"typecheck": "npx tsc --noEmit",
"dev": "yarn workspaces foreach -Apiv --exclude sim-consumidor-nos run dev ",
"dev": "yarn workspaces foreach -Apiv run dev",
"lint": "eslint .",
"lint:fix": "eslint --fix .",
"format": "prettier --write .",
@@ -28,7 +28,7 @@
"dotenv": "^17.2.3",
"express": "^5.2.1",
"pg": "^8.18.0",
"typescript": "^5.9.3",
"typescript": "^6.0.3",
"uuidv7": "^1.1.0",
"vite": "^7.3.1",
"vite-tsconfig-paths": "^6.0.5"

View File

@@ -1,3 +1,8 @@
NOS_BASE_URL=localhost
APP_PORT=3001
APP_HOST="0.0.0.0"
ENVIORMENT=development
NOS_ACCESS_TOKEN=2YGhecTr4+uKbVKxaqBlk2edsrHA2OQY
NOS_BASE_URL=https://nosconnectcenter-api.iot-x.com

View File

@@ -1,65 +1,178 @@
import { EventBus } from "sim-shared/domain/EventBus.port.js";
import { ConsumeMessage } from "amqplib";
import { Request, Response } from "express"
import { SimNosUsecases } from "./SimNOS.usecases.js";
import { EventBus } from "sim-shared/domain/EventBus.port.js";
import { Result } from "sim-shared/domain/Result.js";
import { SimEvents } from "sim-shared/domain/SimEvents.js";
import { iccidValidator } from "./httpValidators.js";
export class SimNosController {
private eventBus: EventBus;
private activationUseCases: any;
private routes = new Map<string, () => void>([
["activate", async () => { console.log("caso de uso activate") }],
["pause", async () => { console.log("caso de uso pause") }],
["cancel", async () => { console.log("caso de uso cancel") }],
])
constructor(
eventBus: EventBus
private uscases: SimNosUsecases,
private eventBus: EventBus,
) {
this.eventBus = eventBus
// No se si hay un sistema mejor
// convertor en const () => {} para conservar el contexto??
this.recibeMsg = this.recibeMsg.bind(this)
}
public async recibeMsg(msg: ConsumeMessage | null) {
if (!this.validateActivationMsg(msg)) {
throw new Error("Error consumiendo el mensaje no es valido")
}
private validateMsg(msg: ConsumeMessage | null) {
if (msg == undefined) return false;
const msgData = this.decodeMsg(msg) as SimEvents.general
if (msgData == undefined || msgData.payload == undefined) throw new Error("Mensaje invalido")
return msgData;
}
msg = msg!
const msgParsed = JSON.parse(String(msg.content))
const msgKey = msg.fields.routingKey.split(".")
const accion = msgKey[2]
if (accion == undefined) {
console.error("La routingKey es incorrecta: " + accion)
this.eventBus.nack(msg)
return;
}
if (this.routes.get(accion) == undefined) {
console.error("No hay una ruta definida para la accion")
this.eventBus.nack(msg)
return;
private decodeMsg(msg: ConsumeMessage): object | undefined {
if (msg.content == undefined) {
console.warn('[Sim.controller] Mensaje vacío');
return undefined;
}
try {
this.routes.get(accion)!()
} catch (err) {
console.log("Error procesando el mensaje")
this.eventBus.nack(msg)
} finally {
this.eventBus.ack(msg)
// Convertir el Buffer a String (UTF-8)
const contentJson = JSON.parse(Buffer.from(msg.content).toString('utf8'))
return contentJson;
} catch (error) {
console.error('Error al decodificar JSON:', error);
console.error(Buffer.from(msg.content).toString(("utf8")))
// Aquí podrías decidir devolver el string crudo o null
return undefined;
}
}
/**
* TODO:
* - Loguear motivos de la no validacion
* Metodo duplicado se puede generalizar la a una clase sharedController con las funciones basicas
*/
private async tryUseCase<T extends any>
(msg: ConsumeMessage, usecase: () => Promise<Result<string, T>>): Promise<Result<string, T>> {
try {
const result = await usecase()
if (result.error == undefined) {
await this.eventBus.ack(msg)
return result
} else {
console.error("Error procesando el caso de uso (NOS)", result.error)
this.eventBus.nack(msg)
return result
}
} catch (e) {
console.error("Error general procesando el caso de uso (NOS)")
this.eventBus.nack(msg)
return {
error: String(e)
}
}
}
public activate() {
return async (msg: ConsumeMessage) => {
console.log("[i] Evento activate ", msg.fields)
const data = this.validateMsg(msg) as SimEvents.activation
const iccid = data.payload.iccid
const correlation_id = data.headers?.message_id
const res = await this.tryUseCase(msg, this.uscases.activate({
iccid: iccid,
correlation_id: correlation_id
}))
return res;
}
}
public suspend() {
return async (msg: ConsumeMessage) => {
console.log("Evento suspend ", msg.fields)
const data = this.validateMsg(msg) as SimEvents.suspend
const iccid = data.payload.iccid
const correlation_id = data.headers?.message_id
const res = await this.tryUseCase(msg, this.uscases.suspend({
iccid: iccid,
correlation_id: correlation_id
}))
return res;
}
}
public terminate() {
return async (msg: ConsumeMessage) => {
console.log("Evento termiante no soportado ", msg.fields)
}
}
public reActivate() {
return async (msg: ConsumeMessage) => {
console.log("Evento reActivate ", msg.fields)
const data = this.validateMsg(msg) as SimEvents.reActivation
const iccid = data.payload.iccid
const correlation_id = data.headers?.message_id
const res = await this.tryUseCase(msg, this.uscases.reactivate({
iccid: iccid,
correlation_id: correlation_id
}))
return res;
}
}
/**
* Select especificamente por REST para evitar pasar por las colas.
* La respuesta es instantanea no se tiene que registrar como operación.
*/
private validateActivationMsg(msg: ConsumeMessage | null) {
if (msg == undefined) return false;
return true;
public selectREST() {
return async (req: Request, res: Response) => {
const { query } = req
const body = { iccid: query.iccid as string }
console.log("Evento select", body)
const validateBody = iccidValidator.validate(body);
if (validateBody.error != undefined) {
res.status(402).json(validateBody)
return;
}
const iccid: string | string[] = body.iccid
if (Array.isArray(iccid)) {
// TODO: Automatizar la paginacion
//const usecaseRes = this.uscases.selectMany({ iccid })
} else {
const usecaseRes = await this.uscases.selectOne({ iccid })
if (usecaseRes.error != undefined) {
res.status(500).json(usecaseRes)
return;
} else {
res.send(usecaseRes.data)
return;
}
}
res.status(200).json(validateBody)
}
}
public selectPageREST() {
return async (req: Request, res: Response) => {
const { offset, limit, filter, orderBy } = req.query
const params = {
offset: (offset != undefined) ? Number(offset) : undefined,
limit: (limit != undefined) ? Number(limit) : undefined,
filter: (filter != undefined) ? String(filter) : undefined,
orderBy: (orderBy != undefined) ? String(orderBy) : undefined
}
const usecaseRes = await this.uscases.selectPage(params)
if (usecaseRes.error != undefined) {
res.status(500).json(usecaseRes)
return;
} else {
res.status(200).send(usecaseRes.data)
return;
}
}
}
}

View File

@@ -0,0 +1,79 @@
/**
* Dirige cada mensaje dependiendo de el tipo de acción que contenga
* Podría hacerse con varias colas, pero así se controla mejor que
* las operaciones se hagan de 1 en 1.
*/
import { ConsumeMessage } from "amqplib";
import { SimNosController } from "./SimNOS.controller.js";
import { EventBus } from "sim-shared/domain/EventBus.port.js";
import { Result } from "sim-shared/domain/Result.js";
type FuncType = ((m: ConsumeMessage) => Promise<Result<string, any>>)
export class SimNosRouter {
private readonly routes: Map<string, FuncType>;
constructor(
private readonly simController: SimNosController,
private readonly eventBus: EventBus
) {
this.routes = new Map<string, FuncType>([
//["select", undefined],
["activate", this.simController.activate()],
["pause", this.simController.suspend()],
["reactivate", this.simController.reActivate()],
//["cancel", this.simController.terminate()],
//["preActivate", this.simController.preActivate()]
]);
}
/**
* Enruta el mensaje a la acción correspondiente basándose en la routing key
* TODO: No estoy seguro que deba meter el nack aqui
* - De moemento el ack-nack se gestiona en los controller, por si acaso hay casos
* limite en
*/
public route = async (msg: ConsumeMessage | null): Promise<void> => {
if (!msg) {
console.error("[Router] Mensaje vacío");
return;
}
const action = this.extractAction(msg);
if (!action) {
console.error("[Router] La routing key no tiene una acción definida", msg.fields.routingKey);
this.eventBus.nack(msg)
return;
}
const handler = this.routes.get(action);
if (!handler) {
console.error(`[Router] La acción '${action}' no tiene un controlador asociado`);
this.eventBus.nack(msg)
return;
}
try {
console.log("[Router] Ejecutando operación:", action);
// El controlador devuelve una función (thunk) que debe ser ejecutada
const executeParams = handler(msg);
if (typeof executeParams === "function") {
const res = await executeParams;
}
} catch (error) {
console.error(`[Router] Error al ejecutar la operación '${action}':`, error);
this.eventBus.nack(msg)
}
};
private extractAction(msg: ConsumeMessage): string | undefined {
// Se asume que la acción está en la tercera posición: domain.compañia.accion
return msg.fields.routingKey.split(".")[2];
}
}

View File

@@ -0,0 +1,156 @@
/**
* Documentación de referencia:
* https://pelion-help.iot-x.com/nos/en-US/Content/API/APIReference/API%20Reference.htm?tocpath=_____7
*
* En nos el correlation_id ya va a ser obligatorio en todos los mensajes
*
* TODO:
* - Control de errores más preciso
*
*/
import { NosHttpClient } from "#infrastructure/NosHttpClient.js";
import { NosRepository } from "#infrastructure/NosRepository.js";
import { ErrorOrderDTO, FinishOrderDTO, UpdateOrderDTO } from "sim-shared/domain/Order.js";
import { Result } from "sim-shared/domain/Result.js";
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
export class SimNosUsecases {
constructor(
private httpClient: NosHttpClient,
private nosRepository: NosRepository,
private orderRepository: OrderRepository
) {
}
private async setRunning(correlation_id: string) {
// En NOS el updateOrder se hace con el correlation_id que viene en la cabecera del
// mensaje consumido
const updateData: UpdateOrderDTO = {
new_status: "running",
correlation_id: correlation_id
}
const order = await this.orderRepository.updateOrder(updateData)
return order
}
private async setFinished(correlation_id: string) {
// En NOS el updateOrder se hace con el correlation_id que viene en la cabecera del
// mensaje consumido
const updateData: FinishOrderDTO = {
correlation_id: correlation_id
}
const order = await this.orderRepository.finishOrder(updateData)
return order
}
private async setFailed(correlation_id: string, reason: string, detail?: string) {
// En NOS el updateOrder se hace con el correlation_id que viene en la cabecera del
// mensaje consumido
const updateData: ErrorOrderDTO = {
status: "failed",
correlation_id: correlation_id,
reason: reason,
error: reason,
stackTrace: detail
}
console.log("SET FAILED DATA:", updateData)
const order = await this.orderRepository.errorOrder(updateData)
console.log("SET FAILED RES:", order)
return order
}
public usecaseTemplate<T, R>(
func: (_: T) => Promise<Result<string, R>>,
args: T,
correlation_id?: string | undefined
) {
return async () => {
// Operacion pending -> running
if (correlation_id != undefined)
this.setRunning(correlation_id)
.then()
.catch(e => console.error("Error actualizando el order", e))
try {
const res = await func(args)
if (res.error != undefined) {
console.log("Error peticion: ", res, correlation_id)
if (correlation_id != undefined)
this.setFailed(correlation_id, res.error)
.then(e => console.log("failed", e))
.catch(e => console.error(e))
return res;
} else {
if (correlation_id != undefined)
this.setFinished(correlation_id).then()
return res;
}
} catch (e) {
if (correlation_id != undefined)
this.setFailed(correlation_id, "Error general de operacion de SIM (NOS) ", String(e)).then()
return {
error: "Error general de operacion de SIM (NOS) " + String(e)
}
}
}
}
public activate(args: {
iccid: string,
correlation_id?: string
}) {
return this.usecaseTemplate(
(args) => this.nosRepository.activateSim(args), args.iccid, args.correlation_id)
}
public suspend(args: {
iccid: string,
correlation_id?: string
}) {
return this.usecaseTemplate(
(args) => this.nosRepository.bar(args), args.iccid, args.correlation_id)
}
public reactivate(args: {
iccid: string,
correlation_id?: string
}) {
return this.usecaseTemplate(
(args) => this.nosRepository.unbar(args), args.iccid, args.correlation_id)
}
public terminate(args: { iccid: string }) {
throw new Error("No hay termination para NOS")
}
/* Importante: Las operaciones de lectua no dejan registro en orders */
public async selectOne(args: {
iccid: string
}) {
const res = await this.nosRepository.getLineInfo(args.iccid)
return res
}
public async selectPage(args: {
offset?: number,
limit?: number,
filter?: string,
orderBy?: string
}) {
const res = await this.nosRepository.getLinePage(args)
return res
}
/**
public selectMany(args: {
iccid: string[]
}) {
return {}
}
*/
}

View File

@@ -0,0 +1,39 @@
import { BodyValidator, Validator } from "sim-shared/aplication/BodyValidator.js";
const iccidNotNull = <Validator<{ iccid: unknown }>>{
field: "iccid",
errorMsg: "El iccid no está definido",
validationFunc: (a: { iccid: unknown }) => {
return (a.iccid != null && a.iccid != undefined)
}
}
const iccidValueOrArray = <Validator<{ iccid: unknown }>>{
field: "iccid",
errorMsg: "El iccid debe de ser un único valor o una lista",
validationFunc: (a: { iccid: unknown }) => {
return (typeof a.iccid == "string" || Array.isArray(a.iccid))
}
}
const iccidLongitudValidator = <Validator<{ iccid: string | string[] }>>{
field: "iccid",
errorMsg: "La longitud del iccid/s es incorrecta debera ser de 19 caracteres",
validationFunc: (a: { iccid: string | string[] }) => {
if (Array.isArray(a.iccid)) {
const res = (a.iccid as string[]).filter(e => e.length != 19)
if (res.length > 0) return false;
} else {
return (a.iccid as string).length == 19
}
},
}
export const iccidValidator = new BodyValidator<{ iccid: string | string[] }>(
[
iccidNotNull,
iccidValueOrArray,
iccidLongitudValidator,
]
)

View File

@@ -30,7 +30,11 @@ export const env = {
RABBITMQ_RETRY_INTERVAL: process.env.RABBITMQ_INTERVAL,
RABBITMQ_VHOST: String(process.env.RABBITMQ_VHOST),
APP_PORT: Number(process.env.APP_PORT),
APP_HOST: String(process.env.APP_HOST),
// ESPECIFICO NOS
NOS_BASE_URL: String(process.env.NOS_BASE_URL)
NOS_BASE_URL: String(process.env.NOS_BASE_URL),
NOS_ACCESS_TOKEN: String(process.env.NOS_ACCESS_TOKEN)
};

View File

@@ -1,6 +1,6 @@
import { RabbitMQEventBus, RMQConnectionParams } from "sim-shared/infrastructure/RabbitMQEventBus.js"
import { Channel } from "amqp-connection-manager"
import { env } from "./env/index.js"
import { env } from "./env/env.js"
const rmqUser = env.RABBITMQ_USER
const rmqPass = env.RABBITMQ_PASSWORD
@@ -18,24 +18,28 @@ export const rmqConnOptions = <RMQConnectionParams>{
secure: rmqSecure,
}
const QUEUES = {
NOS: "sim.nos",
NOSDLX: "sim.nos.dlx",
NOSDEL: "sim.nos.delayed",
}
const EXCHANGES = {
MAIN: "sim.exchange",
DLX: "sim.ex.nos.dlx",
DEL: "sim.ex.nos.delayed"
}
export const rabbitmqEventBus = new RabbitMQEventBus({
connectionParams: rmqConnOptions,
buildStructure: buildQueues,
maxRetry: 5
maxRetry: 2,
delayedExchange: EXCHANGES.DEL,
dlxExchange: EXCHANGES.DLX
})
async function buildQueues(channel: Channel) {
const QUEUES = {
NOS: "sim.nos",
NOSDLX: "sim.nos.dlx",
NOSDEL: "sim.nos.delayed",
}
const EXCHANGES = {
MAIN: "sim.exchange",
DLX: "sim.ex.nos.dlx",
DEL: "sim.ex.nos.delayed"
}
const DELAY = 10 * 1000
const BASE_NOS_KEY = "sim.nos.#"
@@ -60,7 +64,6 @@ async function buildQueues(channel: Channel) {
await channel.bindQueue(QUEUES.NOSDEL, EXCHANGES.DEL, BASE_NOS_KEY)
// Cola nos -> main exchange
await channel.bindQueue(QUEUES.NOS, EXCHANGES.MAIN, BASE_NOS_KEY)
}
export async function startRMQClient() {

View File

@@ -0,0 +1,18 @@
import { Pool, QueryResult } from 'pg';
import { PgClient } from 'sim-shared/infrastructure/PgClient.js'
import { env } from './env/env.js';
// Configuracion de la conexion a la BDD, deberia ser la
// Misma para todos los servicios pero hasta que se unifique todo
// se hace una por servicio.
export const pgPool = new Pool({
user: env.POSTGRES_USER,
host: env.POSTGRES_HOST,
database: env.POSTGRES_DATABASE,
password: env.POSTGRES_PASSWORD,
port: Number(env.POSTGRES_PORT) || 5433,
});
export const pgClient = new PgClient({
pool: pgPool
})

View File

@@ -0,0 +1,131 @@
export namespace NosApi {
export type ActivationData = {
/**
The unique physical subscriber identifier:
Cellular - the ICCID
Non - IP - the EUI
Satellite - the IMEI
*/
physicalId: string,
/**
example: 447000000001
The unique network subscriber identifier:
Cellular subscriber - the MSISDN
Non - IP subscriber - the Device EUI
Satellite subscriber - the Subscription ID
*/
subscriberId: string,
/**
example: 9999
If the subscriber uses Circuit Switched Data(CSD), this field displays its data number.If the subscriber does not use CSD, this field is null.
*/
dataNumber: string
/**
example: 172.0.0.1
The subscriber IP address.
*/
ip: string
/**
example: 234150000000001
The subscriber IMSI.
*/
imsi: string
}
type OkResponse<T> = {
error?: undefined | null,
content: T
}
type ErrorResponse<E = GeneralError> = {
content?: undefined | null,
error: E
}
type GeneralError = {
children?: string[],
code: string,
message: string
}
export type ActivateResponseOK = OkResponse<ActivationData>
export type ActivateResponseError = ErrorResponse
export type ActivateResponse = ActivateResponseOK | ActivateResponseError
export type LineDataResponseOK = OkResponse<LineData>
export type LineDataResponseError = ErrorResponse
export type LineDataResponse = LineDataResponseOK | LineDataResponseError
export type PageDataResponseOk = OkResponse<LineData[]>
export type PageDataResponseError = OkResponse<LineData[]>
export type PageResponse = PageDataResponseOk | PageDataResponseError
export type LineData = {
physicalId: string
subscriberId: string
imsi: string
nickname: string
operatorCode: string
tariffName: string
lineRental: number
contractLength: number
isBarred: boolean
isActive: boolean
terminateDate: any
groupId: number
subscriberType: any
connectionDate: string
expiryDate: string
networkState: NetworkState
billingState: BillingState
operatorName: string
imei: string
dataUsage?: number
eid?: string
smdpProvider?: string
related?: {
parent: RelatedItem,
profiles: RelatedItem[]
}
}
type RelatedItem = {
physicalId: string
isEnabledOnParent: boolean
}
export type NetworkState = {
currentStateId: number
currentState: string
isTransferring: boolean
lastTransferred: number
isOnline: boolean
lastSeenOnline: number
}
export type BillingState = {
currentStateId: number
currentState: string
}
export type BarData = {
product: string
description: string
enabled: boolean
}
export type BarResponseOk = OkResponse<BarData>
export type BarResponseError = ErrorResponse
export type BarResponse = BarResponseOk | BarResponseError
}

View File

@@ -1,14 +1,74 @@
import { startRMQClient } from "#config/eventBus.config.js"
import express from "express"
import cors from 'cors';
import { SimNosRouter } from "./aplication/SimNOS.router.js"
import { SimNosController } from "./aplication/SimNOS.controller.js"
import { SimNosUsecases } from "./aplication/SimNOS.usecases.js"
import { NosHttpClient } from "./infrastructure/NosHttpClient.js"
import { env } from "#config/env/env.js"
import { NosRepository } from "./infrastructure/NosRepository.js"
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
import { pgClient } from "#config/postgreConfig.js";
import { startRMQClient } from "#config/eventBus.config.js";
const RMQ_QUEUE = "sim.nos"
const NOS_BASE_URL = env.NOS_BASE_URL
const PORT = env.APP_PORT
const HOSTNAME = env.APP_HOST
async function startWorker() {
// Instancia de dependencias
const rmqClient = await startRMQClient()
const nosHttpClient = new NosHttpClient(
NOS_BASE_URL
)
const nosRepository = new NosRepository(
nosHttpClient
)
const orderRepository = new OrderRepository(
pgClient
)
const simUsecases = new SimNosUsecases(
nosHttpClient,
nosRepository,
orderRepository
)
const simController = new SimNosController(
simUsecases,
rmqClient
)
rmqClient.consume("sim.nos", simController.recibeMsg)
const simRouter = new SimNosRouter(
simController,
rmqClient
)
// RMQ
rmqClient.consume(RMQ_QUEUE, simRouter.route)
.then(() => console.log("Cliente rmq creado con exito"))
.catch(e => console.error("Error conectando con RABBITMQ", e))
// Express
const app = express()
app.use(cors());
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
app.get("/select", simController.selectREST())
app.get("/selectPage", simController.selectPageREST())
app.listen(PORT, HOSTNAME, (e) => {
if (e == undefined) {
console.log("[o] Servidor iniciado en el puerto %d", PORT)
} else {
console.error("Error express ", e)
}
})
}
startWorker()

View File

@@ -0,0 +1,41 @@
import axios, { AxiosInstance } from "axios";
import { env } from "#config/env/env.js"
export class NosHttpClient {
public client: AxiosInstance;
constructor(
private baseURL: string,
//private jwtManager: JWTProvider<any>
) {
this.client = axios.create({
baseURL: baseURL
})
// Interceptor para los headers fijos
this.client.interceptors.request.use(
async (config) => {
// Configuracion especifica de NOS (El token simepre es el mismo?)
const token = env.NOS_ACCESS_TOKEN;
config.headers.Authorization = `Bearer ${token}`
config.headers.set("content-type", "application/json")
return config
},
(error) => Promise.reject(error)
)
}
get post() {
return this.client.post
}
get patch() {
return this.client.patch
}
get get() {
return this.client.get
}
}

View File

@@ -0,0 +1,177 @@
import { Result } from "sim-shared/domain/Result.js";
import { NosHttpClient } from "./NosHttpClient.js";
import { NosApi } from "#domain/NosAPI.js";
import axios, { AxiosError, AxiosResponse } from "axios";
export class NosRepository {
constructor(
private httpClient: NosHttpClient
) {
}
/**
* E => Tipo de error
* T => Tipo de dato para cod 200
*
* TODO:
* - Mejor gestion de los errores
* - E no se aplica todavia por no hacer la transformacion del error
*/
private async manageNosRequest<E, T>(promise: Promise<AxiosResponse<T>>): Promise<Result<string, T>> {
try {
const res = await promise
return {
data: res.data
}
} catch (e) {
if (axios.isAxiosError(e)) {
const error = e as AxiosError
return {
error: error.code + " : " + String(error.response?.statusText)
}
} else {
return {
error: String(e)
}
}
}
}
public async getLineInfo(iccid: string): Promise<Result<string, NosApi.LineData>> {
const PATH = "/subscribers/" + iccid
console.log("PAth", PATH)
const lineRequest = this.httpClient.get<NosApi.LineDataResponseOK>(PATH)
const lineResponse = await this.manageNosRequest<string, NosApi.LineDataResponseOK>(lineRequest)
if (lineResponse.error != undefined) {
return lineResponse
} else {
return {
data: lineResponse.data.content
}
}
}
/**
* El metodo de NOS de paginar las lineas
* maximo por pagina 100, default 25
* no devuelve el offset ni el numero de elementos restantes
* hay que llevar la cuenta
*/
public async getLinePage(args: {
limit?: number,
offset?: number,
filter?: string,
orderBy?: string
}): Promise<Result<string, any>> {
const PATH = "/subscribers"
const LIMIT = 100
const options = {
limit: args.limit ?? LIMIT,
offset: args.offset ?? 0,
filter: args.filter,
orderBy: args.orderBy
}
const pageRequest = this.httpClient.get(PATH, {
params: options
})
const pageResponse = await this.manageNosRequest<string, any>(pageRequest)
if (pageResponse.error != undefined) {
return pageResponse
} else {
return {
data: pageResponse.data.content
}
}
}
public async getLinesInfo(iccid: string[]) /*Promise<Result<string, NosApi.LineData>>*/ {
throw new Error("NOS no permite buscar iccid en bulk, se puede hacer un apaño pero está en proceso")
const PATH = "/subscribers"
const LIMIT = 100
const steps = Math.ceil(iccid.length / LIMIT)
const options = {
limit: LIMIT,
offset: 0,
}
const req = this.httpClient.post<NosApi.LineDataResponseOK>(PATH)
const resp = await this.manageNosRequest<string, NosApi.LineDataResponseOK>(req)
if (resp.error != undefined) {
return resp
} else {
return {
//@ts-expect-error
data: resp.data.content
}
}
}
public async activateSim(iccid: string): Promise<Result<string, NosApi.ActivationData>> {
const PATH = '/provisioning'
const PRODUCT_ID = 1330 // No se que es, preguntar a Ivan
const data = {
productSetId: PRODUCT_ID
}
const req = this.httpClient.post<NosApi.ActivateResponseOK>(PATH, data)
const resp = await this.manageNosRequest<string, NosApi.ActivateResponseOK>(req)
if (resp.error != undefined) {
return resp
} else {
return {
data: resp.data.content
}
}
}
/**
* "A bar is a service provisioning action that results in a subscriber being blocked from accessing an operator's network. The bar remains in place until the operator is sent an unbar request."
* Se entiende que un "bar" es una suspension temporal
*/
public async bar(iccid: string) {
const PATH = `/subscribers/${iccid}/products`
const data = {
product: "BAR DN TOTAL",
action: "enable"
}
const req = this.httpClient.patch<NosApi.BarResponseOk>(PATH, data)
const resp = await this.manageNosRequest<string, NosApi.BarResponseOk>(req)
if (resp.error != undefined) {
return resp
} else {
return {
data: resp.data.content
}
}
}
public async unbar(iccid: string) {
const PATH = `/subscribers/${iccid}/products`
const data = {
product: "BAR DN TOTAL",
action: "disable"
}
const req = this.httpClient.patch<NosApi.BarResponseOk>(PATH, data)
const resp = await this.manageNosRequest<string, NosApi.BarResponseOk>(req)
if (resp.error != undefined) {
return resp
} else {
return {
data: resp.data.content
}
}
}
}

View File

@@ -1,8 +1,42 @@
{
"name": "sim-consumidor-nos",
"version": "1.0.0",
"type": "module",
"description": "consumidor generico de eventos de NOS",
"main": "index.ts",
"imports": {
"#config/*.js": {
"types": "./config/*.ts",
"default": "./config/*.js"
},
"#config/*": {
"types": "./config/*.ts",
"default": "./config/*.js"
},
"#infrastructure/*.js": {
"types": "./infrastructure/*.ts",
"default": "./infrastructure/*.js"
},
"#infrastructure/*": {
"types": "./infrastructure/*.ts",
"default": "./infrastructure/*.js"
},
"#domain/*.js": {
"types": "./domain/*.ts",
"default": "./domain/*.js"
},
"#domain/*": {
"types": "./domain/*.ts",
"default": "./domain/*.js"
},
"#aplication/*.js": {
"types": "./aplication/*.ts",
"default": "./aplication/*.js"
},
"#aplication/*": {
"types": "./aplication/*.ts",
"default": "./aplication/*.js"
}
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"build": "yarn tsc --project tsconfig.json && yarn tsc-alias && cp package.json ../../dist/packages/sim-consumidor-nos/",
@@ -13,54 +47,13 @@
"author": "",
"license": "ISC",
"packageManager": "yarn@4.12.0",
"imports": {
"#config/*.js": {
"types": "./config/*.ts",
"default": "./config/*.js"
},
"#config/*": {
"types": "./config/*.ts",
"default": "./config/*.js"
},
"#adapters/*.js": {
"types": "./adapters/*.ts",
"default": "./adapters/*.js"
},
"#adapters/*": {
"types": "./adapters/*.ts",
"default": "./adapters/*.js"
},
"#domain/*.js": {
"types": "./domain/*.ts",
"default": "./domain/*.js"
},
"#domain/*": {
"types": "./domain/*.ts",
"default": "./domain/*.js"
},
"#ports/*.js": {
"types": "./ports/*.ts",
"default": "./ports/*.js"
},
"#ports/*": {
"types": "./ports/*.ts",
"default": "./ports/*.js"
},
"#tests/*.js": {
"types": "./__tests__/*.ts",
"default": "./__tests__/*.js"
},
"#tests/*": {
"types": "./__tests__/*.ts",
"default": "./__tests__/*.js"
}
},
"dependencies": {
"@tsconfig/node22": "*",
"amqplib": "^0.10.9",
"cors": "*",
"dotenv": "*",
"express": "*",
"sim-shared": "sim-shared:*",
"typescript": "*"
},
"devDependencies": {
@@ -71,6 +64,7 @@
"@types/supertest": "*",
"prettier": "*",
"supertest": "*",
"tsc-alias": "^1.8.16",
"tsx": "*",
"vitest": "*"
}

View File

@@ -0,0 +1,11 @@
# NOS
## Particularidades de las operaciones de NOS
- Documentación de la API: [DOC](https://pelion-help.iot-x.com/nos/en/Content/API/APIReference/API%20Reference.htm?tocpath=_____7)
- No se necesita la pre-activación de las SIM.
- La suspensión y reactivación se llama "bar" y "unbar".
- El token de Authentication dura exactamente 1 año, solo se puede refrescar
desde la web.
- En la documentación la URL de la API es <https://nos-api.iot-x.com> pero la
de producción es <https://nosconnectcenter-api.iot-x.com>.

View File

@@ -0,0 +1,5 @@
## ENV PARA DATOS DE TEST - shared nunca se lanza en produccion
NOTIFICATION_URL="https://sf-sim-activation.savefamilygps.net/send-activation-mail"
# NOTIFICATION_URL="localhost"
SIM_ACTIVATION_API_KEY=9e48c4ac-1ab0-4397-b3f3-6c239200dfe6

View File

@@ -2,16 +2,40 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "../../dist",
"baseUrl": ".",
"rootDir": "../../",
"paths": {
"#config/*": [
"./config/*"
],
"#infrastructure/*": [
"./infrastructure/*"
],
"#domain/*": [
"./domain/*"
],
"#aplication/*": [
"./aplication/*"
],
"config/*": [
"./config/*"
],
"infrastructure/*": [
"./infrastructure/*"
],
"domain/*": [
"./domain/*"
]
}
},
"exclude": [
"node_modules"
],
"include": [
"**/*.ts",
"src/**/*.d.ts"
"**/*.d.ts",
"../../packages/sim-shared/**/*.ts"
],
"files": [
"index.ts"
]
}
}

View File

@@ -1,4 +1,6 @@
# claves de Objenious
HOST=0.0.0.0
OBJ_PEM_PATH=./obj.pem
OBJ_AUTHORIZATION=XOc7FtwXD8hUX2SFVX94XSty8wkOmChkwDNF09O_aIxPubMDdFUdCDCB4zpzSIxi8nOcTg7r_LM_nmd5qm7uLbksf_XArjI8iAyhjKz_2BAXPhmvKs4Fc9f3vv5LDfCVrPB9lP8P7rJ66_qnWs4jvhLQxSfn29m96hgXeCf8oySdIDUjN2q9Js3KAS5LL52Ri6ryvUeO1PvMhaPQMWRqoHIqTV1wPfPtiqQwcjUPmu5GeW164Kq1JLgV3KaGzfCZ9Qv9lbv30EJrukXxWuLCAhBS0kzrBXZoWvf2pb9uh3Am_93_dDxiIGQfIap9ZU_m8ZD1HPgvZOMCY6ZkxQconQ
OBJ_CLI_ASSERTION=XOc7FtwXD8hUX2SFVX94XSty8wkOmChkwDNF09O_aIxPubMDdFUdCDCB4zpzSIxi8nOcTg7r_LM_nmd5qm7uLbksf_XArjI8iAyhjKz_2BAXPhmvKs4Fc9f3vv5LDfCVrPB9lP8P7rJ66_qnWs4jvhLQxSfn29m96hgXeCf8oySdIDUjN2q9Js3KAS5LL52Ri6ryvUeO1PvMhaPQMWRqoHIqTV1wPfPtiqQwcjUPmu5GeW164Kq1JLgV3KaGzfCZ9Qv9lbv30EJrukXxWuLCAhBS0kzrBXZoWvf2pb9uh3Am_93_dDxiIGQfIap9ZU_m8ZD1HPgvZOMCY6ZkxQconQ

View File

@@ -10,6 +10,7 @@ import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
import { PauseCancelTaskRepository } from "#adapters/PauseCancelTaskRepository.js";
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js";
import { ActionData } from "#domain/DTOs/objeniousapi.js";
import { ObjeniousLinesRepository } from "sim-shared/infrastructure/ObjeniousLinesRepository.js";
describe("SimController Integration Tests (Real UseCases)", () => {
let eventBusMock: any;
@@ -32,11 +33,13 @@ describe("SimController Integration Tests (Real UseCases)", () => {
);
const orderRepository = new OrderRepository(postgrClient);
const pauseRepository = new PauseCancelTaskRepository(postgrClient);
const linesRepository = new ObjeniousLinesRepository(postgrClient) // tiene que apuntar a "intranet"
useCases = new SimUseCases({
httpClient: httpInstance,
operationRepository: operationRepository,
orderRepository: orderRepository,
pauseRepository: pauseRepository
pauseRepository: pauseRepository,
objeniousLinesRepository: linesRepository
});
// @ts-expect-error
useCases.findActivationDate = async (data: ActionData) => new Date()

View File

@@ -4,6 +4,9 @@ import { SimUseCases } from "./Sim.usecases.js";
import { SimEvents } from "sim-shared/domain/SimEvents.js";
import { Result } from "sim-shared/domain/Result.js";
import { ActionData } from "#domain/DTOs/objeniousapi.js";
import { Request, Response } from "express"
import { PaginationArgs, QueryPaginationArgs } from "sim-shared/domain/PaginationArgs.js";
import { paginationValidator } from "./httpValidators.js";
/**
* La clase usa generadores de funciones para mantener el contexto
@@ -21,7 +24,6 @@ export class SimController {
) {
this.eventBus = eventBus
this.useCases = useCases
}
private decodeMsg(msg: ConsumeMessage): object | undefined {
@@ -109,7 +111,7 @@ export class SimController {
return async (msg: ConsumeMessage) => {
let msgData;
try {
msgData = this.validateMsg(msg) as SimEvents.pause
msgData = this.validateMsg(msg) as SimEvents.suspend
} catch (e) {
throw new Error("Error de preactivacion consumiendo el mensaje no es valido" + String(e))
}
@@ -136,7 +138,7 @@ export class SimController {
return async (msg: ConsumeMessage) => {
let msgData;
try {
msgData = this.validateMsg(msg) as SimEvents.pause
msgData = this.validateMsg(msg) as SimEvents.suspend
} catch (e) {
throw new Error("Error de reactivacion consumiendo el mensaje no es valido" + String(e))
}
@@ -165,7 +167,7 @@ export class SimController {
return async (msg: ConsumeMessage) => {
let msgData;
try {
msgData = this.validateMsg(msg) as SimEvents.pause
msgData = this.validateMsg(msg) as SimEvents.suspend
} catch (e) {
throw new Error("Error de suspension consumiendo el mensaje no es valido" + String(e))
}
@@ -195,7 +197,7 @@ export class SimController {
return async (msg: ConsumeMessage) => {
let msgData;
try {
msgData = this.validateMsg(msg) as SimEvents.pause
msgData = this.validateMsg(msg) as SimEvents.suspend
} catch (e) {
throw new Error("Error consumiendo el mensaje no es valido" + String(e))
}
@@ -220,6 +222,36 @@ export class SimController {
}
}
public queryLines() {
const DEFAULT_LIMIT = 1000
const DEFAULT_OFFSET = 0
return async (req: Request, res: Response) => {
const queryParams = req.query
const paginationArgs: QueryPaginationArgs = {
limit: queryParams.limit as string | undefined,
offset: queryParams.offset as string | undefined
}
const validationRes = paginationValidator.validate(paginationArgs)
if (validationRes.error != undefined) {
res.status(402).json(validationRes)
return;
}
const paginationValues = {
limit: (queryParams.limit != undefined) ? Number(queryParams.limit) : DEFAULT_LIMIT,
offset: (queryParams.offset != undefined) ? Number(queryParams.offset) : DEFAULT_OFFSET
}
const status = req.query.status
const queryRes = await this.useCases.getLinesByQuery({ status: status as string | undefined }, paginationValues)
res.json(queryRes)
}
}
/**
* TODO:
* - Loguear motivos de la no validacion

View File

@@ -7,6 +7,9 @@ import assert from "node:assert"
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js"
import { CreatePauseCancelTaskDTO, PauseCancelTaskRepository } from "#adapters/PauseCancelTaskRepository.js"
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js"
import { ObjeniousLinesRepository } from "sim-shared/infrastructure/ObjeniousLinesRepository.js"
import { error } from "node:console"
import { ObjeniousLine, ObjeniousLineDb } from "sim-shared/domain/objeniousLine.js"
// TODO:
// - Pasar a un archivo de DTOs
@@ -17,17 +20,19 @@ export class SimUseCases {
private readonly objeniousRepository: ObjeniousOperationsRepository
private readonly orderRepository: OrderRepository
private readonly pauseRepository: PauseCancelTaskRepository
private readonly objeniousLinesRepository: ObjeniousLinesRepository
constructor(args: {
httpClient: HttpClient,
operationRepository: ObjeniousOperationsRepository,
orderRepository: OrderRepository,
pauseRepository: PauseCancelTaskRepository
pauseRepository: PauseCancelTaskRepository,
objeniousLinesRepository: ObjeniousLinesRepository
}) {
this.httpClient = args.httpClient
this.objeniousRepository = args.operationRepository
this.orderRepository = args.orderRepository
this.pauseRepository = args.pauseRepository
this.objeniousLinesRepository = args.objeniousLinesRepository
}
private async logOperation(data: ObjeniousOperation) {
@@ -119,7 +124,6 @@ export class SimUseCases {
const iccid = activationData.identifier.identifiers
// Comporbación excepcional para saber si la linea está suspendida
const statusLinea = await this.objeniousRepository.getLinesAPI("ICCID", [String(iccid)])
console.log("statusLinea, ", iccid, statusLinea)
if (statusLinea.data != undefined && statusLinea.data[0].status.networkStatus == "SUSPENDED") {
const res = await this.reActivate(activationData)()
return res;
@@ -269,7 +273,7 @@ export class SimUseCases {
/**
* Metodo muy especifico para obtener la fecha e activacion o en su defecto
* la actual para aber cuando se va a completar el periodo de test de una linea
* la actual para saber cuando se va a completar el periodo de test de una linea
*/
private async findActivationDate(actionData: ActionData) {
const iccid = actionData.identifier.identifiers
@@ -278,7 +282,7 @@ export class SimUseCases {
// Si no se pueden sacar datos de la linea guardo momentaneamente el error
// pero no se cancela la operacion, el error puede ser de objenious y no nos
// puede afectar
console.log("LineData", lineData.data)
//console.log("LineData", lineData.data)
if (lineData.error != undefined) {
console.error(lineData.error)
} else {
@@ -321,13 +325,14 @@ export class SimUseCases {
}
// TODO REGISTRAR EL ORDER
/*
if (correlation_id != undefined) {
await this.orderRepository.createOrder({
correlation_id: correlation_id,
order_type: "pause"
})
}
*/
let activationDate;
try {
activationDate = await this.findActivationDate(suspendData)
@@ -434,10 +439,53 @@ export class SimUseCases {
identifier: terminationData.identifier
},
url: OPERATION_URL,
iccid: terminationData.identifier.identifiers[0], //
iccid: terminationData.identifier.identifiers[0],
operation: "terminate"
})
}
/**
* Calcula el tiempo que una linea ha estado en suspensión
*/
public async getSuspendedTime(iccid: string):
Promise<Result<string, { total_milliseconds: number, total_days: number }>> {
try {
const result = await this.objeniousRepository.getSuspendedTime(iccid);
if (result.error !== undefined) {
return { error: result.error as string, data: undefined };
}
return {
data: {
total_milliseconds: result.data!.total_milliseconds,
total_days: result.data!.total_days
}
};
} catch (error) {
console.error("[Sim.usecases] Error getting suspended time", error);
return { error: "Error getting suspended time", data: undefined };
}
}
/**
* Busqueda de líneas **en nuestro volcado** según una query y con paginacion
*/
public async getLinesByQuery(query: { status?: string | undefined }, pagination: { limit: number, offset: number })
: Promise<Result<string, {
data: ObjeniousLineDb[],
offset: number,
rowCount: number
}>> {
try {
const linesQuery = await this.objeniousLinesRepository.getLinesByStatus(query, pagination)
return {
data: linesQuery,
}
} catch (e) {
return {
error: String(e)
}
}
}
}

View File

@@ -0,0 +1,18 @@
import { BodyValidator, Validator } from "sim-shared/aplication/BodyValidator.js";
import { QueryPaginationArgs } from "sim-shared/domain/PaginationArgs.js";
const limitPositiveOrUndefined = <Validator<QueryPaginationArgs>>{
field: "limit",
validationFunc: (args) => (args.limit == undefined || !isNaN(+args.limit) && parseInt(args.limit) >= 0),
errorMsg: "El campo limit debe ser un numero o undefined (default 0)"
}
const offsetPositiveOrUndefined = <Validator<QueryPaginationArgs>>{
field: "offset",
validationFunc: (args) => (args.offset == undefined || isNaN(+args.offset) && parseInt(args.offset) >= 1),
errorMsg: "El campo offset debe ser un numero o undefined (default 0)"
}
export const paginationValidator = new BodyValidator<QueryPaginationArgs & {}>([
limitPositiveOrUndefined,
offsetPositiveOrUndefined
])

View File

@@ -4,7 +4,10 @@ import path from "node:path";
loadEnvFile(path.join("../../.env")) // Global
loadEnvFile(path.join("./.env")) // base
export const env = {
PORT: parseInt(process.env.OBJENIOUS_CONSUMER_PORT || "3002"),
ENVIRONMENT: process.env.ENVIORMENT,
POSTGRES_USER: process.env.POSTGRES_USER,
POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD,

View File

@@ -18,24 +18,26 @@ export const rmqConnOptions = <RMQConnectionParams>{
secure: rmqSecure,
}
export const QUEUES = {
OBJ: "sim.objenious",
OBJDLX: "sim.objenious.dlx",
OBJDEL: "sim.objenious.delayed",
}
export const EXCHANGES = {
MAIN: "sim.exchange",
DLX: "sim.ex.objenious.dlx",
DEL: "sim.ex.objenious.delayed"
}
export const rabbitmqEventBus = new RabbitMQEventBus({
connectionParams: rmqConnOptions,
buildStructure: buildQueues,
maxRetry: 5
maxRetry: 5,
delayedExchange: EXCHANGES.DEL,
dlxExchange: EXCHANGES.DLX
})
async function buildQueues(channel: Channel) {
const QUEUES = {
OBJ: "sim.objenious",
OBJDLX: "sim.objenious.dlx",
OBJDEL: "sim.objenious.delayed",
}
const EXCHANGES = {
MAIN: "sim.exchange",
DLX: "sim.ex.objenious.dlx",
DEL: "sim.ex.objenious.delayed"
}
const DELAY = 10 * 1000
const BASE_OBENIOUS_KEY = "sim.objenious.#"
@@ -55,9 +57,9 @@ async function buildQueues(channel: Channel) {
})
// Cola dead-letter
await channel.bindQueue(QUEUES.DLX, EXCHANGES.DLX, "sim.objenious.#")
await channel.bindQueue(QUEUES.OBJDLX, EXCHANGES.DLX, "sim.objenious.#")
// Cola delay
await channel.bindQueue(QUEUES.DEL, EXCHANGES.DEL, BASE_OBENIOUS_KEY)
await channel.bindQueue(QUEUES.OBJDEL, EXCHANGES.DEL, BASE_OBENIOUS_KEY)
// Cola objenious -> main exchange
await channel.bindQueue(QUEUES.OBJ, EXCHANGES.MAIN, BASE_OBENIOUS_KEY)

View File

@@ -0,0 +1,20 @@
/**
* Cliente de postgres para la intranet. Se usa solo porque hace falta para el
* volcado de datos, si se usa en mas partes algo estás haciendo mal.
*/
import { Pool } from 'pg';
import { PgClient } from 'sim-shared/infrastructure/PgClient.js'
import { env } from './env/index.js';
export const pgPoolIntranet = new Pool({
user: env.POSTGRES_USER,
host: env.POSTGRES_HOST,
database: "intranet",
password: env.POSTGRES_PASSWORD,
port: Number(env.POSTGRES_PORT) || 5432,
});
export const postgresClientIntranet = new PgClient({
pool: pgPoolIntranet
})

View File

@@ -10,6 +10,13 @@ import { SimRouter } from "./aplication/Sim.router.js"
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js"
import { PauseCancelTaskRepository } from "#adapters/PauseCancelTaskRepository.js"
import express from "express"
import cors from "cors"
import assert from "node:assert";
import { env } from "#config/env/index.js"
import { ObjeniousLinesRepository } from "sim-shared/infrastructure/ObjeniousLinesRepository.js"
import { postgresClientIntranet } from "#config/intranetPostgresConfig.js"
async function startWorker() {
const rmqClient = await startRMQClient()
@@ -26,20 +33,76 @@ async function startWorker() {
const orderRepository = new OrderRepository(pgClient)
const pauseRepository = new PauseCancelTaskRepository(pgClient)
const simActivationController = new SimController(
rmqClient,
new SimUseCases({
httpClient: httpClient,
operationRepository: operationRepository,
orderRepository: orderRepository,
pauseRepository: pauseRepository
})
const linesRepository = new ObjeniousLinesRepository(
postgresClientIntranet
)
const simRouter = new SimRouter(simActivationController, rmqClient)
const simUseCases = new SimUseCases({
httpClient: httpClient,
operationRepository: operationRepository,
orderRepository: orderRepository,
pauseRepository: pauseRepository,
objeniousLinesRepository: linesRepository
})
const simController = new SimController(
rmqClient,
simUseCases
)
const simRouter = new SimRouter(simController, rmqClient)
// de momento solo una cola por simplificar
rmqClient.consume("sim.objenious", simRouter.route)
// Servidor express
const port = env.PORT
const app = express()
app.use(cors())
app.use(express.json())
app.get("/health", async (req, res) => {
res.json({ ok: "true" })
})
// TODO: meter el template de controller con los validadores
app.get("/lines/:iccid/suspended-time", async (req, res) => {
const iccid = req.params.iccid
if (!iccid) {
res.status(400).json({ error: "iccid is required" })
return
}
const result = await simUseCases.getSuspendedTime(iccid)
if (result.error !== undefined) {
res.status(500).json({ error: result.error })
return
}
res.json(result) // {data:{...}} || {error:{...}}
})
/**
* Opciones query:
* - state
*
* Respuestas:
* - OK: data: {
* lines: ObjeniousLineDb[],
* offset: number,
* rowCount: number
* }
*
* - ERR: error: {
* message: string
* }
*/
app.get("/lines", simController.queryLines())
assert.ok(port, "Puerto del servicio no definido")
app.listen(port, () => {
console.log(`[o] HTTP server listening on port ${port}`)
})
}
startWorker()

View File

@@ -124,3 +124,5 @@ export class PauseCancelTaskRepository {
}
}
export default PauseCancelTask

View File

@@ -13,10 +13,10 @@
"types": "./config/*.ts",
"default": "./config/*.js"
},
"#shared/*.js": {
"sim-shared/*.js": {
"default": "../sim-shared/*.js"
},
"#shared/*": {
"sim-shared/*": {
"default": "../sim-shared/*.js"
},
"#adapters/*.js": {

View File

@@ -9,6 +9,10 @@
],
"include": [
"**/*.ts",
"**/*.d.ts",
"../../packages/sim-shared/**/*.ts"
],
"files": [
"index.ts"
]
}

View File

@@ -1,8 +1,8 @@
import { BodyValidator } from "sim-shared/aplication/BodyValidator.js"
import { OrderUsecases } from "./Order.usecases.js"
import { Request, Response } from "express"
import { PaginationArgs } from "#domain/common.js"
import { idValidator, uuidValidator } from "./httpValidators.js"
import { PaginationArgs } from "sim-shared/domain/PaginationArgs.js"
export class OrderController {
private orderUseCases: OrderUsecases

View File

@@ -1,4 +1,4 @@
import { PaginationArgs } from "#domain/common.js";
import { PaginationArgs } from "sim-shared/domain/PaginationArgs.js";
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";

View File

@@ -3,7 +3,7 @@ import { SimUsecases } from "./Sim.usecases.js"
import { activationValidator, iccidValidator } from "./httpValidators.js"
import { companyFromIccid } from "#domain/companies.js"
import { BodyValidator } from "sim-shared/aplication/BodyValidator.js"
import { tryCatch } from "packages/sim-shared/domain/Result.js"
import { tryCatch } from "sim-shared/domain/Result.js"
export class SimController {

View File

@@ -1,10 +1,10 @@
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
import { Result } from "sim-shared/domain/Result.js";
import assert from "node:assert";
import { EventBus } from "sim-shared/domain/EventBus.port";
import { SimEvents } from "sim-shared/domain/SimEvents";
import { SimEvents } from "sim-shared/domain/SimEvents.js";
import { uuidv7 } from "uuidv7";
import { CreateOrderDTO, OrderTracking, OrderType, OrderTypeOptions } from "sim-shared/domain/Order.js";
import { EventBus } from "sim-shared/domain/EventBus.port.js";
/**
* Casos de uso de tarjetas sim. Garantiza que todos los metodos usan el mismo bus de mensajes

View File

@@ -3,6 +3,7 @@ import path from "node:path";
loadEnvFile(path.join("../../.env")) // Global
//loadEnvFile(path.join("./.env")) // Global
export const env = {
ENVIRONMENT: process.env.ENVIORMENT,
@@ -22,5 +23,7 @@ export const env = {
RABBITMQ_SECURE: process.env.RABBITMQ_SECURE,
RABBITMQ_RETRY_INTERVAL: process.env.RABBITMQ_INTERVAL,
RABBITMQ_VHOST: String(process.env.RABBITMQ_VHOST),
CONNECTIONS_URL: String(process.env.CONNECTIONS_URL)
CONNECTIONS_URL: String(process.env.CONNECTIONS_URL),
OBJENIOUS_CONSUMER_URL: process.env.OBJENIOUS_CONSUMER_URL,
NOS_CONSUMER_URL: process.env.NOS_CONSUMER_URL,
};

View File

@@ -18,7 +18,10 @@ export const rmqConnOptions = <RMQConnectionParams>{
}
export const rabbitmqEventBus = new RabbitMQEventBus({
connectionParams: rmqConnOptions
connectionParams: rmqConnOptions,
// La entrada de eventos no tiene que definir exchanges de dlx o delay pero es obligatorio
delayedExchange: "-",
dlxExchange: "-"
})
export async function startRMQClient() {

View File

@@ -1,6 +0,0 @@
export type PaginationArgs = {
limit?: number,
offset?: number,
start?: number
}

View File

@@ -6,6 +6,7 @@ import { rabbitmqEventBus } from '#config/eventBusConfig.js';
import { env } from "#config/env/index.js"
import { orderRoutes } from "#adapters/orderRoutes.http.js";
import { connectionsRoutes } from "#adapters/simconnectionsRoutes.js";
import { franceRoutes } from "#adapters/franceRoutes.http.js";
const PORT = env.API_PORT
const HOSTNAME = "0.0.0.0"
@@ -19,7 +20,6 @@ rabbitmqEventBus.connect()
console.error("[!] El cliente RMQ no se ha podido iniciar", e)
})
// Middleware
app.use(cors());
@@ -32,6 +32,9 @@ app.use("/orders", orderRoutes)
app.use("/docs", express.static(path.join(process.cwd(), '../../docs')))
// Rutas especificas para casos especiales como el tiempo de suspension de francia
app.use("/france", franceRoutes)
app.get("/health", (req, res) => {
res.status(200).json({ status: "ok" })
})
@@ -39,4 +42,5 @@ app.get("/health", (req, res) => {
app.listen(PORT, HOSTNAME, () => {
console.log("[o] Servidor iniciado en el puerto %d", PORT)
})
export default {}

View File

@@ -0,0 +1,33 @@
import { Router, Request } from "express"
import { ClientRequest, } from "http"
import { createProxyMiddleware } from "http-proxy-middleware"
import { env } from "#config/env/index.js"
import assert from "node:assert"
const franceRoutes = Router()
const FRANCE_URL = env.OBJENIOUS_CONSUMER_URL
assert.ok(FRANCE_URL, "No se ha definido una URL para el servicio consumidor de Francia")
franceRoutes.use("", createProxyMiddleware({
target: FRANCE_URL,
changeOrigin: true,
pathRewrite: {
'^/france/*': '/'
},
on: {
proxyReq: (proxyReq: ClientRequest, req: Request) => {
/* Debug de las peticiones */
const protocol = req.protocol;
const host = req.get('host');
const originalFullUrl = `${protocol}://${host}${req.originalUrl}`;
const destinationFullUrl = `${FRANCE_URL}${proxyReq.path}`;
//console.log(`[Proxy Req]: ${req.method} ${req.url} -> ${proxyReq.path}`);
}
}
}
))
//orderRoutes.get("/:iccid/suspended-time",)
export { franceRoutes }

View File

@@ -1,4 +1,4 @@
import { rabbitmqEventBus } from '#config/eventBusConfig.js';
import { rabbitmqEventBus } from '../config/eventBusConfig.js';
import { SimUsecases } from '../aplication/Sim.usecases.js';
import { SimController } from '../aplication/Sim.controller.js';
import { Router } from 'express';

View File

@@ -9,6 +9,7 @@
],
"include": [
"**/*.ts",
"**/*.d.ts",
"../../packages/sim-shared/**/*.ts"
],
"files": [

View File

@@ -6,11 +6,11 @@ import { CheckObjeniousRequests } from "./tasks/check_objenious_request.js"
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js"
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js"
import { TaskVolcadoLineas } from "./tasks/volcado_lineas.js"
import { ObjeniousLinesRepository } from "./infranstructure/ObjeniousLinesRepository.js"
import { ObjeniousLinesRepository } from "sim-shared/infrastructure/ObjeniousLinesRepository.js"
import { postgresClientIntranet } from "./config/intranetPostgresConfig.js"
import { PauseCancelTaskRepository } from "packages/sim-consumidor-objenious/infrastructure/PauseCancelTaskRepository.js"
import { PauseCancelTaskRepository } from "sim-consumidor-objenious/infrastructure/PauseCancelTaskRepository.js"
import { PauseTerminateTask } from "./tasks/check_pause_terminate.js"
import { SimUseCases } from "packages/sim-consumidor-objenious/aplication/Sim.usecases.js"
import { SimUseCases } from "sim-consumidor-objenious/aplication/Sim.usecases.js"
async function startCron() {
const commonSettings = {
@@ -52,7 +52,8 @@ async function startCron() {
httpClient: httpClient,
operationRepository: operationRepository,
orderRepository: orderRepository,
pauseRepository: pauseRepo
pauseRepository: pauseRepo,
objeniousLinesRepository: objeniousLineRepository
})
const pauseTask = new PauseTerminateTask(
@@ -62,7 +63,7 @@ async function startCron() {
orderRepository
)
await objTask.getPendingOperations()
//await objTask.getPendingOperations()
const PERIODO_PETICIONES = 10 * 60 * 1000
const interval = setInterval(async () => {
try {
@@ -73,6 +74,7 @@ async function startCron() {
}, PERIODO_PETICIONES)
const PERIODO_VOLCADO = 60 * 60 * 1000
//await volcadoLineasTask.loadLines()
const volcadoInterval = setInterval(async () => {
try {
await volcadoLineasTask.loadLines()
@@ -81,7 +83,7 @@ async function startCron() {
}
}, PERIODO_VOLCADO)
await pauseTask.run()
// await pauseTask.run()
const PERIODO_CANCELACIONES = 60 * 60 * 1000;
const clacelacionesInterval = setInterval(async () => {
await pauseTask.run()

View File

@@ -3,7 +3,7 @@ import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
import axios from "axios";
import { IOperationsRepository, Objenious, ObjeniousOperation, ObjeniousOperationChange, StatusEnum } from "sim-shared/domain/operationsRepository.port.js";
import { HttpClient } from "sim-shared/infrastructure/HTTPClient.js";
import { ObjeniousOperationsRepository } from "packages/sim-shared/infrastructure/ObjeniousOperationRepository.js";
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js";
export class CheckObjeniousRequests {
constructor(
@@ -37,6 +37,11 @@ export class CheckObjeniousRequests {
// Todas las validas
const operacionesValidas = pendingOperations.data
.filter((e) => e.request_id != undefined)
.filter((e) => e.operation != "terminate" && e.operation != "suspend")
// Filtrar suspension / terminacion
// Validas sin MassId
const solicitarMassId = operacionesValidas
.filter((e) => e.mass_action_id == undefined)

View File

@@ -2,7 +2,7 @@ import { ObjeniousLine } from "sim-shared/domain/objeniousLine.js";
import { PauseCancelTaskRepository } from "sim-consumidor-objenious/infrastructure/PauseCancelTaskRepository.js";
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js";
import { SimUseCases } from "sim-consumidor-objenious/aplication/Sim.usecases.js";
import { OrderRepository } from "packages/sim-shared/infrastructure/OrderRepository.js";
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
const logger =
{
@@ -66,6 +66,7 @@ export class PauseTerminateTask {
}
while (!lines.done) {
lines = await lineGenerator.next()
if (lines.value.error != undefined || lines.value.data == undefined) {
logger.error("Error cargando las lineas", lines.value.error)
finError(lines.value.error)
@@ -74,7 +75,6 @@ export class PauseTerminateTask {
lineasActualizadas.push(...lines.value.data)
}
lines = await lineGenerator.next()
}
console.log("Cargado: ", lineasActualizadas)
@@ -126,6 +126,7 @@ export class PauseTerminateTask {
switch (operacionTipo) {
case "suspend":
result = await this.simUsecases.suspend(actionData)()
console.log("SUSPENDED", result)
break;
case "terminate":
result = await this.simUsecases.terminate(actionData)()

View File

@@ -1,5 +1,5 @@
import { lineToCreateLineDto, ObjeniousLine } from "sim-shared/domain/objeniousLine.js";
import { ObjeniousLinesRepository } from "../infranstructure/ObjeniousLinesRepository.js";
import { ObjeniousLinesRepository } from "sim-shared/infrastructure/ObjeniousLinesRepository.js";
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js";
export class TaskVolcadoLineas {

View File

@@ -9,6 +9,7 @@
],
"include": [
"**/*.ts",
"**/*.d.ts",
"../../packages/sim-shared/**/*.ts",
"../../packages/sim-consumidor-objenious/**/*.ts"
]

View File

@@ -0,0 +1,14 @@
/**
* Los parametros según entran por la query pueden ser de tipo string
*/
export type QueryPaginationArgs = {
limit?: string,
offset?: string,
start?: string// start = offset; Por compatibilidad, normalmente solo limit y offset
}
export type PaginationArgs = {
limit?: number,
offset?: number,
start?: number // start = offset; Por compatibilidad, normalmente solo limit y offset
}

View File

@@ -47,6 +47,7 @@ export namespace SimEvents {
options: {
}
}
export type suspend = pause
export type free = DomainEvent & {
key: `sim.${string}.free`,

View File

@@ -6,6 +6,7 @@ export interface IOperationsRepository {
createOperation(data: ObjeniousOperation): Promise<Result<string, ObjeniousOperation>>
updateOperation(data: ObjeniousOperationChange): Promise<Result<string, ObjeniousOperation>>
getPendingOperations(): Promise<Result<string, ObjeniousOperation[]>>
getSuspendedTime(iccid: string): Promise<Result<string, { total_milliseconds: number, total_days: number }>>
}
export type ObjeniousOperation = {

View File

@@ -1,12 +1,12 @@
import test, { after, before, describe } from "node:test";
import { CreateObjeniousLineDTO } from "sim-shared/domain/objeniousLine.js";
import { ObjeniousLinesRepository } from "./ObjeniousLinesRepository.js";
import { postgrClient } from "../config/postgreConfig.js";
import { ObjeniousLinesRepository } from "sim-shared/infrastructure/ObjeniousLinesRepository.js";
import assert from "node:assert";
import { postgresClient } from "../config/config.test.js";
describe("Line insertion test", async () => {
//const pgClient = postgreClientIntranet
const pgClient = postgrClient // En prod hay que usar el de Intrantet para usar la otra base de datos
const pgClient = postgresClient// En prod hay que usar el de Intrantet para usar la otra base de datos
const lineRepository = new ObjeniousLinesRepository(pgClient)
const lineaTest: CreateObjeniousLineDTO = {
simId: 1234,

View File

@@ -4,7 +4,7 @@
*/
import { createHash } from "node:crypto";
import { PoolClient } from "pg";
import { CreateObjeniousLineDTO } from "sim-shared/domain/objeniousLine.js";
import { CreateObjeniousLineDTO, ObjeniousLineDb } from "sim-shared/domain/objeniousLine.js";
import { PgClient } from "sim-shared/infrastructure/PgClient.js";
export class ObjeniousLinesRepository {
@@ -24,6 +24,58 @@ export class ObjeniousLinesRepository {
}
}
/**
* Hay que hacer la query un poco mas general
*/
public async getLinesByStatus(query: { status?: string | undefined }, pagination: { limit: number, offset: number }) {
// $1 y $2 se reservan para paginación
const values: (string | number)[] = [
pagination.limit,
pagination.offset,
]
const paginationStr = `
LIMIT $1
OFFSET $2
`
const conditionsStr = `
WHERE
raw -> 'status' ->> 'networkStatus' = $3
`
let queryStr = `
SELECT * FROM objenious_lines
`
if (query.status != undefined) {
queryStr = queryStr + conditionsStr
values.push(query.status)
}
queryStr = queryStr + `
${paginationStr};
`
let client: PoolClient | undefined = undefined;
try {
client = await this.pgClient.connect();
const res = await client.query<ObjeniousLineDb>(queryStr, values);
return {
data: res.rows,
offset: pagination.offset,
rowCount: res.rowCount ?? 0,
}
} catch (err) {
console.error('Error en la query:', err);
throw err;
} finally {
if (client != undefined) {
client.release()
}
}
}
public async insertOrUpdate(data: CreateObjeniousLineDTO) {
const query = `
INSERT INTO objenious_lines (

View File

@@ -1,4 +1,4 @@
import { before, describe, it } from "node:test";
import { before, after, describe, it } from "node:test";
import { ObjeniousOperationsRepository } from "./ObjeniousOperationRepository.js";
import { httpObjClient, postgresClient } from "../config/config.test.js";
import { ObjeniousOperation } from "../domain/operationsRepository.port.js";
@@ -21,6 +21,7 @@ describe("[Integration] Test API requests", () => {
httpObjClient,
postgresClient
)
const suspend_iccid = "test_suspended_time_iccid";
before(async () => {
await repository.createOperation(correctOperation)
@@ -36,4 +37,40 @@ describe("[Integration] Test API requests", () => {
* - Se ignoran las erroneas
*/
})
it("Calculates suspended time accurately", async () => {
// Se crean registros con un iccid concocido
await postgresClient.query(`
INSERT INTO objenious_operation (operation, iccids, status, error, start_date, end_date) VALUES
('suspend', $1, 'finished', NULL, NOW() - INTERVAL '3 days', NOW() - INTERVAL '3 days'),
('reactivate', $1, 'finished', NULL, NOW() - INTERVAL '2 days', NOW() - INTERVAL '2 days'),
('suspend', $1, 'finished', NULL, NOW() - INTERVAL '1 day', NOW() - INTERVAL '1 day');
`, [suspend_iccid]);
const result = await repository.getSuspendedTime(suspend_iccid);
if (result.error) {
throw new Error("Query returned an error: " + result.error);
}
// Se esperan mas o menos 2 dias para cada periodo, total 4 (Puede cambiar un poco por zona horaria)
// 2 dias en ms
const expectedApproxMs = 2 * 24 * 60 * 60 * 1000;
const msDiff = Math.abs(result.data!.total_milliseconds - expectedApproxMs);
// Margen de 5s
if (msDiff > 5000) {
throw new Error(`Expected approx ${expectedApproxMs} ms, got ${result.data!.total_milliseconds}`);
}
// como se incluye el dia de sespension los dias van a variar de 2 a 3
if (result.data!.total_days < 2) {
throw new Error("total_days should be at least 2");
}
});
after(async () => {
// Eliminacion de los iccid de periodo de suspensiones
await postgresClient.query(`DELETE FROM objenious_operation WHERE iccids = '${suspend_iccid}'`);
});
})

View File

@@ -231,4 +231,66 @@ export class ObjeniousOperationsRepository implements IOperationsRepository {
}
}
}
/**
* Obtiene el tiempo en suspensión de una linea en miliseguntos y dias efetivos
* Todo el calculo se hace en postgres. Puede que haga falta traer las transiciones
* que normalmente son pocas, para hacer filtros personalizados.
*/
async getSuspendedTime(iccid: string): Promise<Result<string, { total_milliseconds: number, total_days: number }>> {
const query = `
WITH ordered_events AS (
-- 1. Selecciona y normaliza los eventos relevantes del historial
-- Se define el 'estado' final (suspendido vs activo) basado en la operación
SELECT operation, end_date,
CASE WHEN operation = 'suspend' THEN 'suspended' ELSE 'active' END as state
FROM objenious_operation
WHERE iccids = $1 AND status = 'finished' AND error IS NULL
AND operation IN ('suspend', 'activate', 'reactivate', 'terminate')
ORDER BY end_date
),
state_transitions AS (
-- 2. Detecta cambios de estado comparando con la fila anterior (LAG)
SELECT state, end_date,
LAG(state) OVER (ORDER BY end_date) as prev_state
FROM ordered_events
),
filtered_transitions AS (
-- 3. Filtra solo las filas donde el estado realmente ha cambaido
-- Se obtiene la fecha de inicio del siguiente intervalo (LEAD)
SELECT state, end_date,
LEAD(end_date) OVER (ORDER BY end_date) as next_end_date
FROM state_transitions
WHERE state IS DISTINCT FROM prev_state
),
intervals AS (
-- 4. Calcula la duración de los periodos en los que el estado fue 'suspended'
-- Se usa NOW() para el intervalo abierto (último estado hasta hoy)
SELECT EXTRACT(EPOCH FROM (COALESCE(next_end_date, NOW() at time zone 'utc') - end_date)) * 1000 as ms_duration,
(COALESCE(next_end_date, NOW() at time zone 'utc')::date - end_date::date) + 1 as days_duration
FROM filtered_transitions
WHERE state = 'suspended'
)
-- 5. Suma total de tiempo en estado de suspensión
SELECT COALESCE(SUM(ms_duration)::bigint, 0) as total_milliseconds,
COALESCE(SUM(days_duration), 0) as total_days
FROM intervals;
`;
try {
const { rows } = await this.pgClient.query<{ total_milliseconds: string, total_days: string }>(query, [iccid]);
return {
data: {
total_milliseconds: parseFloat(rows[0].total_milliseconds),
total_days: parseInt(rows[0].total_days)
}
};
} catch (e) {
console.error("Error calculating suspended time", e);
return {
error: String(e),
data: undefined
};
}
}
}

View File

@@ -20,7 +20,6 @@ const order2 = <CreateOrderDTO>{
payload: { iccid: "5678", action: "activate" }
}
describe("Test OrderRepository", {}, (ctx) => {
const orderRepo = new OrderRepository(postgresClient)
let testIds: number[] = []

View File

@@ -380,6 +380,7 @@ export class OrderRepository {
const id = currentOrderResult.data.id // Saco el id para evitar busacr por correlation_id que es mas lento
const currentOrder = currentOrderResult.data!
//console.log("Current Order", currentOrder)
// 3. Si todo ok se actualiza el order
// Si el status es dlx se asume que ha terminado y no va a reintentarse
@@ -401,6 +402,8 @@ export class OrderRepository {
client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking)
)
console.log("updatedOrderResult", updatedOrderResult)
if (updatedOrderResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
@@ -423,7 +426,7 @@ export class OrderRepository {
)
RETURNING id;
`
const vOrderHistory = [args.id, currentOrder.status, args.status, args.reason]
const vOrderHistory = [currentOrder.id, currentOrder.status, args.status, args.reason]
const newOrderHistoryResult = await this.getFirst(
client.query<{ id: number }>(iOrderHistory, vOrderHistory)
)

View File

@@ -21,15 +21,22 @@ export class RabbitMQEventBus implements EventBus {
channel?: ChannelWrapper
connected: Boolean = false
private delayedExchange: string;
private dlxExchange: string;
private connectionOptions: RMQConnectionParams
constructor(args: {
connectionParams: RMQConnectionParams,
buildStructure?: (chan: Channel) => Promise<void>,
maxRetry?: number
maxRetry?: number,
delayedExchange: string,
dlxExchange: string
}) {
this.connectionOptions = args.connectionParams
if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
if (args.maxRetry != undefined) this.maxRetry = args.maxRetry
this.delayedExchange = args.delayedExchange
this.dlxExchange = args.dlxExchange
}
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
@@ -50,23 +57,25 @@ export class RabbitMQEventBus implements EventBus {
async nack(msg: ConsumeMessage, requeue?: boolean) {
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
console.log("NACK: ", msg.properties.headers)
console.log("[i] NACK: ", msg.properties.headers)
const headers = msg.properties.headers || {}
const numberRetry = headers['x-retry-count'] || 0
const routingKey = msg.fields.routingKey
if (numberRetry < this.maxRetry) {
console.log("Delaying")
await this.channel.publish("sim.ex.objenious.delayed", routingKey, msg.content, {
console.log("[i] Delaying ")
// "sim.ex.objenious.delayed"
await this.channel.publish(this.delayedExchange, routingKey, msg.content, {
headers: {
...headers,
'x-retry-count': numberRetry + 1
}
})
} else {
console.log("DeadLetter")
await this.channel.publish("sim.ex.objenious.dlx", routingKey, msg.content, {
console.log("[i] DeadLetter")
//"sim.ex.objenious.dlx"
await this.channel.publish(this.dlxExchange, routingKey, msg.content, {
headers: {
...headers
}

View File

@@ -9,6 +9,6 @@
],
"include": [
"**/*.ts",
"../../packages/sim-shared/**/*.ts",
"**/*.d.ts",
]
}

View File

@@ -3,7 +3,7 @@
"compilerOptions": {
"outDir": "dist",
"rootDir": "./",
"baseUrl": "./",
//"baseUrl": "./", //eliminar para v6
"composite": true,
"esModuleInterop": true,
"sourceMap": true,
@@ -12,6 +12,11 @@
"resolveJsonModule": true,
"module": "nodenext",
"moduleResolution": "nodenext",
"paths": {
"sim-consumidor-objenious": [
"./packages/sim-consumidor-objenious/*"
]
}
},
"include": [
"**/*.ts",
@@ -20,5 +25,5 @@
],
"exclude": [
"dist"
]
],
}

1628
yarn.lock

File diff suppressed because it is too large Load Diff