Cron completo y mejora de logs
This commit is contained in:
@@ -4,7 +4,12 @@
|
||||
* "Test"
|
||||
*/
|
||||
|
||||
CREATE TYPE SUSPENDTERMINATE AS ENUM ('suspend','terminate');
|
||||
|
||||
DO $$ BEGIN
|
||||
CREATE TYPE SUSPENDTERMINATE AS ENUM ('suspend','terminate');
|
||||
EXCEPTION
|
||||
WHEN duplicate_object THEN null;
|
||||
END $$;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS pause_cancel_tasks (
|
||||
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
|
||||
@@ -15,7 +20,8 @@ CREATE TABLE IF NOT EXISTS pause_cancel_tasks (
|
||||
next_check TIMESTAMPTZ, -- Si se ha comprobado se asignará la siguiente fecha de revision
|
||||
|
||||
completed_date TIMESTAMPTZ, -- Cuando se ha completado, para bien o mal.
|
||||
error TEXT
|
||||
error TEXT,
|
||||
actionData JSONB -- datos de la operacion original.
|
||||
);
|
||||
|
||||
-- Indice de las tareas que no han terminado
|
||||
|
||||
@@ -9,6 +9,7 @@ import { httpInstance } from "#config/httpClient.config.js";
|
||||
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
|
||||
import { PauseCancelTaskRepository } from "#adapters/PauseCancelTaskRepository.js";
|
||||
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js";
|
||||
import { ActionData } from "#domain/DTOs/objeniousapi.js";
|
||||
|
||||
describe("SimController Integration Tests (Real UseCases)", () => {
|
||||
let eventBusMock: any;
|
||||
@@ -31,13 +32,14 @@ describe("SimController Integration Tests (Real UseCases)", () => {
|
||||
);
|
||||
const orderRepository = new OrderRepository(postgrClient);
|
||||
const pauseRepository = new PauseCancelTaskRepository(postgrClient);
|
||||
|
||||
useCases = new SimUseCases({
|
||||
httpClient: httpInstance,
|
||||
operationRepository: operationRepository,
|
||||
orderRepository: orderRepository,
|
||||
pauseRepository: pauseRepository
|
||||
});
|
||||
// @ts-expect-error
|
||||
useCases.findActivationDate = async (data: ActionData) => new Date()
|
||||
|
||||
controller = new SimController(eventBusMock as unknown as EventBus, useCases);
|
||||
});
|
||||
@@ -70,6 +72,7 @@ describe("SimController Integration Tests (Real UseCases)", () => {
|
||||
const handler = controller.suspend();
|
||||
await handler(msg);
|
||||
|
||||
console.log("Nack: ", eventBusMock.nack.mock.callCount())
|
||||
// Verify that it reached the stage_suspend logic (which adds to pauseRepository)
|
||||
// We can query the DB or check if ACK was called
|
||||
assert.strictEqual(eventBusMock.ack.mock.callCount(), 1, "Message should be ACKed on success");
|
||||
|
||||
@@ -185,6 +185,7 @@ export class SimController {
|
||||
}
|
||||
|
||||
const useCaseRes = await this.tryUseCase(msg, this.useCases.stage_suspend(suspendData))
|
||||
console.log("res::", useCaseRes)
|
||||
/*
|
||||
const res = await this.tryUseCase(msg, this.useCases.suspend(actionData))
|
||||
*/
|
||||
|
||||
@@ -6,6 +6,7 @@ import { ObjeniousOperation, IOperationsRepository as OperationsRepositoryPort }
|
||||
import assert from "node:assert"
|
||||
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js"
|
||||
import { CreatePauseCancelTaskDTO, PauseCancelTaskRepository } from "#adapters/PauseCancelTaskRepository.js"
|
||||
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js"
|
||||
|
||||
// TODO:
|
||||
// - Pasar a un archivo de DTOs
|
||||
@@ -13,13 +14,13 @@ import { CreatePauseCancelTaskDTO, PauseCancelTaskRepository } from "#adapters/P
|
||||
|
||||
export class SimUseCases {
|
||||
private readonly httpClient: HttpClient
|
||||
private readonly objeniousRepository: OperationsRepositoryPort
|
||||
private readonly objeniousRepository: ObjeniousOperationsRepository
|
||||
private readonly orderRepository: OrderRepository
|
||||
private readonly pauseRepository: PauseCancelTaskRepository
|
||||
|
||||
constructor(args: {
|
||||
httpClient: HttpClient,
|
||||
operationRepository: OperationsRepositoryPort,
|
||||
operationRepository: ObjeniousOperationsRepository,
|
||||
orderRepository: OrderRepository,
|
||||
pauseRepository: PauseCancelTaskRepository
|
||||
}) {
|
||||
@@ -243,31 +244,64 @@ export class SimUseCases {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Metodo muy especifico para obtener la fecha e activacion o en su defecto
|
||||
* la actual para aber cuando se va a completar el periodo de test de una linea
|
||||
*/
|
||||
private async findActivationDate(actionData: ActionData) {
|
||||
const iccid = actionData.identifier.identifiers
|
||||
const lineData = await this.objeniousRepository.getLinesAPI("ICCID", iccid)
|
||||
let activationDate = new Date()
|
||||
// Si no se pueden sacar datos de la linea guardo momentaneamente el error
|
||||
// pero no se cancela la operacion, el error puede ser de objenious y no nos
|
||||
// puede afectar
|
||||
if (lineData.error != undefined) {
|
||||
console.error(lineData.error)
|
||||
} else {
|
||||
const activationDateStr = lineData.data[0].status.activationDate
|
||||
if (activationDateStr != undefined && activationDateStr != "") {
|
||||
activationDate = new Date(activationDateStr)
|
||||
}
|
||||
}
|
||||
return activationDate
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Paso previo a la suspension para evitar errores cuando el billing es test
|
||||
*/
|
||||
public stage_suspend(suspendData: ActionData): () => Promise<Result<string, boolean>> {
|
||||
return async (): Promise<Result<string, boolean>> => {
|
||||
const correlation_id = suspendData.correlation_id
|
||||
const iccid = suspendData.identifier.identifiers
|
||||
|
||||
const newTask: CreatePauseCancelTaskDTO = {
|
||||
iccid: suspendData.identifier.identifiers[0],
|
||||
activation_date: new Date(), // TODO: BUSCAR LA DE VERDAD
|
||||
next_check: undefined, // Que se haga instantaneamente al ser la primera
|
||||
operation_type: "suspend"
|
||||
}
|
||||
|
||||
const taskCreated = await this.pauseRepository.addTask(newTask)
|
||||
|
||||
// Caso que la task no se pueda crear en la BDD
|
||||
if (taskCreated.error != undefined) {
|
||||
console.error("[Sim.usecases]", taskCreated.error)
|
||||
const fail = (error: string) => {
|
||||
console.error("[Sim.usecases]", error)
|
||||
if (correlation_id != undefined) {
|
||||
this.orderRepository.updateOrder({
|
||||
correlation_id: correlation_id,
|
||||
new_status: "failed"
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
console.log("Preactivationdate", suspendData)
|
||||
const activationDate = await this.findActivationDate(suspendData)
|
||||
console.log("ActivationDate", activationDate)
|
||||
|
||||
const newTask: CreatePauseCancelTaskDTO = {
|
||||
iccid: iccid[0],
|
||||
activation_date: activationDate,
|
||||
next_check: undefined, // Que se haga instantaneamente al ser la primera
|
||||
operation_type: "suspend",
|
||||
actionData: suspendData
|
||||
}
|
||||
|
||||
const taskCreated = await this.pauseRepository.addTask(newTask)
|
||||
|
||||
// Caso que la task no se pueda crear en la BDD
|
||||
if (taskCreated.error != undefined) {
|
||||
fail(taskCreated.error)
|
||||
return {
|
||||
error: taskCreated.error
|
||||
}
|
||||
@@ -294,11 +328,13 @@ export class SimUseCases {
|
||||
return async (): Promise<Result<string, boolean>> => {
|
||||
const correlation_id = terminateData.correlation_id
|
||||
|
||||
const activationDate = await this.findActivationDate(terminateData)
|
||||
const newTask: CreatePauseCancelTaskDTO = {
|
||||
iccid: terminateData.identifier.identifiers[0],
|
||||
activation_date: new Date(), // TODO: BUSCAR LA DE VERDAD
|
||||
activation_date: activationDate,
|
||||
next_check: undefined, // Que se haga instantaneamente al ser la primera
|
||||
operation_type: "terminate"
|
||||
operation_type: "terminate",
|
||||
actionData: terminateData
|
||||
}
|
||||
|
||||
const taskCreated = await this.pauseRepository.addTask(newTask)
|
||||
|
||||
@@ -7,7 +7,15 @@ const testTask: CreatePauseCancelTaskDTO = {
|
||||
iccid: "1234",
|
||||
operation_type: "suspend",
|
||||
activation_date: new Date(),
|
||||
next_check: new Date()
|
||||
next_check: new Date(),
|
||||
actionData: {
|
||||
dueDate: new Date().toString(),
|
||||
correlation_id: "12223",
|
||||
identifier: {
|
||||
identifiers: ["1234"],
|
||||
identifierType: "ICCID"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
describe("Test PauseCancelTaskRepository - DB", () => {
|
||||
@@ -31,7 +39,7 @@ describe("Test PauseCancelTaskRepository - DB", () => {
|
||||
const created = await pauseRepo.addTask(testTask)
|
||||
assert.ok(created != undefined, "A value must be returned always")
|
||||
assert.ok(created.error == undefined, "Should not return a error")
|
||||
assert.ok(created.data != undefined, "Data mus be returned")
|
||||
assert.ok(created.data != undefined, "Data must be returned")
|
||||
createdIds.push(created.data.id)
|
||||
})
|
||||
|
||||
@@ -43,7 +51,7 @@ describe("Test PauseCancelTaskRepository - DB", () => {
|
||||
|
||||
assert.ok(updated != undefined, "A value must be returned always")
|
||||
assert.ok(updated.error == undefined, "Should not return a error")
|
||||
assert.ok(updated.data != undefined, "Data mus be returned")
|
||||
assert.ok(updated.data != undefined, "Data must be returned")
|
||||
})
|
||||
|
||||
it("Should finish a existing task", async () => {
|
||||
@@ -54,6 +62,16 @@ describe("Test PauseCancelTaskRepository - DB", () => {
|
||||
|
||||
assert.ok(finish != undefined, "A value must be returned always")
|
||||
assert.ok(finish.error == undefined, "Should not return a error")
|
||||
assert.ok(finish.data != undefined, "Data mus be returned")
|
||||
assert.ok(finish.data != undefined, "Data must be returned")
|
||||
})
|
||||
|
||||
it("Should get at least 1 pending task", async () => {
|
||||
const pending = await pauseRepo.getPending()
|
||||
|
||||
assert.ok(pending != undefined, "A value must be returned always")
|
||||
assert.ok(pending.error == undefined, "Should not return a error")
|
||||
assert.ok(pending.data != undefined, "Data must be returned")
|
||||
|
||||
console.log("--> ", pending.data)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -2,6 +2,7 @@ import { Result } from "sim-shared/domain/Result.js";
|
||||
import { QueryResult } from "pg";
|
||||
import { PgClient } from "sim-shared/infrastructure/PgClient.js";
|
||||
import { AxiosError } from "axios";
|
||||
import { ActionData } from "#domain/DTOs/objeniousapi.js";
|
||||
|
||||
export type PauseCancelTask = {
|
||||
id: number;
|
||||
@@ -12,9 +13,10 @@ export type PauseCancelTask = {
|
||||
next_check?: Date | null;
|
||||
completed_date?: Date | null;
|
||||
error?: string | null;
|
||||
actionData: ActionData
|
||||
}
|
||||
|
||||
export type CreatePauseCancelTaskDTO = Pick<PauseCancelTask, "iccid" | "activation_date" | "next_check" | "operation_type">
|
||||
export type CreatePauseCancelTaskDTO = Pick<PauseCancelTask, "iccid" | "activation_date" | "next_check" | "operation_type" | "actionData">
|
||||
export type UpdatePauseCancelTaskDTO = Pick<PauseCancelTask, "id" | "next_check">
|
||||
export type FinishPauseCancelTaskDTO = Pick<PauseCancelTask, "id" | "error">
|
||||
|
||||
@@ -55,12 +57,12 @@ export class PauseCancelTaskRepository {
|
||||
public async addTask(task: CreatePauseCancelTaskDTO): Promise<Result<string, PauseCancelTask>> {
|
||||
|
||||
const sql = `
|
||||
INSERT INTO pause_cancel_tasks (iccid, activation_date, next_check, last_checked, operation_type)
|
||||
VALUES ($1, $2, $3, now(), $4)
|
||||
INSERT INTO pause_cancel_tasks (iccid, activation_date, next_check, last_checked, operation_type, actionData)
|
||||
VALUES ($1, $2, $3, now(), $4, $5)
|
||||
RETURNING *;
|
||||
`;
|
||||
try {
|
||||
const values = [task.iccid, task.activation_date, task.next_check, task.operation_type];
|
||||
const values = [task.iccid, task.activation_date, task.next_check, task.operation_type, JSON.stringify(task.actionData)];
|
||||
const res: QueryResult<PauseCancelTask> = await this.pgClient.query(sql, values);
|
||||
return {
|
||||
data: res.rows[0]
|
||||
|
||||
157
packages/sim-objenious-cron/tasks/check_pause_terminate.ts
Normal file
157
packages/sim-objenious-cron/tasks/check_pause_terminate.ts
Normal file
@@ -0,0 +1,157 @@
|
||||
import { ObjeniousLine } from "sim-shared/domain/objeniousLine.js";
|
||||
import { PauseCancelTaskRepository } from "sim-consumidor-objenious/infrastructure/PauseCancelTaskRepository.js";
|
||||
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js";
|
||||
import { SimUseCases } from "sim-consumidor-objenious/aplication/Sim.usecases.js";
|
||||
import { OrderRepository } from "packages/sim-shared/infrastructure/OrderRepository.js";
|
||||
|
||||
const logger =
|
||||
{
|
||||
log: (...data: any[]) => console.log("[i] [TaskPauseTerminate]", data),
|
||||
error: (...data: any[]) => console.error("[x] [TaskPauseTerminate] ", data),
|
||||
}
|
||||
|
||||
|
||||
export class TaskPauseTerminate {
|
||||
constructor(
|
||||
private readonly objeniousRepo: ObjeniousOperationsRepository,
|
||||
private readonly pauseRepo: PauseCancelTaskRepository,
|
||||
private readonly simUsecases: SimUseCases,
|
||||
private readonly orderRepo: OrderRepository
|
||||
) {
|
||||
}
|
||||
|
||||
public async run() {
|
||||
const finError = (err: any) => {
|
||||
logger.error("Finalizado con errores proceso de comprobacion de lineas en pausa o canceladas")
|
||||
logger.error(err)
|
||||
}
|
||||
try {
|
||||
logger.log("Iniciando proceso de comprobacion de lineas en pausa o canceladas")
|
||||
|
||||
// 1. Se comprueba cuantas peticiones hay qye revisar
|
||||
const peticionesRevisar = await this.pauseRepo.getPending()
|
||||
|
||||
if (peticionesRevisar.error != undefined) {
|
||||
finError(peticionesRevisar.error)
|
||||
return 1;
|
||||
}
|
||||
|
||||
logger.log(`Se van a revisar ${peticionesRevisar.data?.length} peticiones`)
|
||||
|
||||
// 2. Se comprueba que alguna de las lineas haya dejado de estar en estado de test
|
||||
const iccids = peticionesRevisar.data.map(e => e.iccid)
|
||||
const lineasActualizadas: ObjeniousLine[] = []
|
||||
|
||||
const lineGenerator = this.objeniousRepo.getLinesByStatusAPI({
|
||||
iccids: iccids
|
||||
})
|
||||
|
||||
let lines = await lineGenerator.next()
|
||||
|
||||
if (lines.value.error != undefined || lines.value.data == undefined) {
|
||||
logger.error("Error cargando las lineas", lines.value.error)
|
||||
finError(lines.value.error)
|
||||
return 1;
|
||||
} else {
|
||||
lineasActualizadas.push(...lines.value.data)
|
||||
}
|
||||
|
||||
while (!lines.done) {
|
||||
if (lines.value.error != undefined || lines.value.data == undefined) {
|
||||
logger.error("Error cargando las lineas", lines.value.error)
|
||||
finError(lines.value.error)
|
||||
return 1;
|
||||
} else {
|
||||
lineasActualizadas.push(...lines.value.data)
|
||||
}
|
||||
|
||||
lines = await lineGenerator.next()
|
||||
}
|
||||
|
||||
// 3. Se separan las lineas que se tienen que actualizar al no ser test
|
||||
// y las que se tienen que reencolar al ser test
|
||||
const lineasNoTest = lineasActualizadas.filter(e => e.status.billingStatus != "TEST")
|
||||
const lineasTest = lineasActualizadas.filter(e => e.status.billingStatus == "TEST")
|
||||
|
||||
// 4. Las lineas de test se reencolan
|
||||
// El proximo reintento es en 1 dia
|
||||
const proximoReintento = new Date()
|
||||
proximoReintento.setDate(new Date().getDate() + 1)
|
||||
|
||||
// 5. Reintentos en 1 dia
|
||||
for (const linea of lineasTest) {
|
||||
const lineaId = peticionesRevisar.data
|
||||
.find(e => e.iccid == linea.identifier.iccid)?.id
|
||||
|
||||
if (lineaId == undefined) continue; // Esto puede ser un problema si se generaliza
|
||||
|
||||
this.pauseRepo.updateTask({
|
||||
id: lineaId,
|
||||
next_check: proximoReintento
|
||||
})
|
||||
}
|
||||
|
||||
// 6. Operaciones de pausa/cancelacion definitiva
|
||||
for (const linea of lineasNoTest) {
|
||||
const operacion = peticionesRevisar.data
|
||||
.find(e => e.iccid == linea.identifier.iccid)
|
||||
|
||||
if (operacion == undefined) continue;
|
||||
const dueDate = new Date()
|
||||
dueDate.setMinutes(new Date().getMinutes() + 15)
|
||||
|
||||
const operacionTipo = operacion.operation_type
|
||||
const actionData = operacion.actionData
|
||||
const correlation_id = operacion.actionData.correlation_id
|
||||
actionData.dueDate = dueDate.toString()
|
||||
|
||||
if (linea.status.billingStatus == "ACTIVATED") {
|
||||
let exito = false;
|
||||
let result = null;
|
||||
// IMPORTANTE COMRPOBAR EL DUE DATE
|
||||
switch (operacionTipo) {
|
||||
case "suspend":
|
||||
result = await this.simUsecases.suspend(actionData)()
|
||||
break;
|
||||
case "terminate":
|
||||
result = await this.simUsecases.terminate(actionData)()
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (result == undefined) {
|
||||
logger.error("Operacion desconocida", operacion)
|
||||
} else if (result?.error != undefined) {
|
||||
// error usecase
|
||||
logger.error(result.error)
|
||||
await this.pauseRepo.finishTask({
|
||||
id: operacion.id,
|
||||
error: result.error
|
||||
})
|
||||
if (correlation_id != undefined)
|
||||
await this.orderRepo.errorOrder({
|
||||
correlation_id: correlation_id,
|
||||
status: "dlx",
|
||||
reason: result.error
|
||||
})
|
||||
} else {
|
||||
// ok
|
||||
await this.pauseRepo.finishTask({ id: operacion.id })
|
||||
if (correlation_id != undefined)
|
||||
await this.orderRepo.finishOrder({ correlation_id })
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: SUSPENDED Y TERMINATED
|
||||
}
|
||||
|
||||
|
||||
logger.log("Finalizado con exito proceso de comprobacion de lineas en pausa o canceladas")
|
||||
} catch (e) {
|
||||
finError(e)
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
import { lineToCreateLineDto, ObjeniousLine } from "sim-shared/domain/objeniousLine.js";
|
||||
import { ObjeniousLinesRepository } from "../infranstructure/ObjeniousLinesRepository.js";
|
||||
import { ObjeniousOperationsRepository } from "packages/sim-shared/infrastructure/ObjeniousOperationRepository.js";
|
||||
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js";
|
||||
|
||||
export class TaskVolcadoLineas {
|
||||
constructor(
|
||||
@@ -40,7 +40,6 @@ export class TaskVolcadoLineas {
|
||||
await this.saveLines(lines.value.data)
|
||||
|
||||
while (!lines.done) {
|
||||
console.log()
|
||||
lines = await linesIterator.next()
|
||||
if (lines.value.error != undefined || lines.value.data == undefined) {
|
||||
console.error("[x] Error cargando las lineas a volcar", lines.value.error)
|
||||
|
||||
@@ -83,7 +83,7 @@ export type ObjeniousLine = {
|
||||
commercialStatus: string, //"test",
|
||||
commercialStatusDate: string, //"2026-03-17T11:41:01.493+00:00",
|
||||
networkStatus: string, // "ACTIVATED",
|
||||
billingStatus: string, //"TEST",
|
||||
billingStatus: "ACTIVATED" | "SUSPENDED" | "CANCELED" | "TEST",
|
||||
billingStatusChangeDate: string | null, // "2026-03-17T11:01:00.276+00:00",
|
||||
billingActivationDate: string | null //,
|
||||
createdDate: string | null,//"2026-01-30T01:50:02.060+00:00"
|
||||
|
||||
@@ -16,6 +16,7 @@ export class ObjeniousOperationsRepository implements IOperationsRepository {
|
||||
|
||||
/**
|
||||
* Consulta el estado de una o mas lineas directamente a la API de Objenious
|
||||
* TODO: No hay paginacion como en getLinesByStatusAPI
|
||||
*/
|
||||
public async getLinesAPI(
|
||||
identifierType: "ICCID" | "IMSI" | "IMEI" | "MSISDN" | "REFERENCE",
|
||||
@@ -59,7 +60,8 @@ export class ObjeniousOperationsRepository implements IOperationsRepository {
|
||||
public async * getLinesByStatusAPI(args?: {
|
||||
pageSize?: number,
|
||||
pageNumber?: number,
|
||||
status?: string
|
||||
status?: string,
|
||||
iccids?: string[]
|
||||
}): AsyncGenerator<Result<string, ObjeniousLine[]>, Result<string, ObjeniousLine[]>, any> {
|
||||
|
||||
const path = "/lines"
|
||||
@@ -70,6 +72,12 @@ export class ObjeniousOperationsRepository implements IOperationsRepository {
|
||||
|
||||
const params: Record<string, string | number> = {}
|
||||
|
||||
// Si se va a filtrar por iccids especificamente, en un futuro habra que ampliar el tipo de filtros
|
||||
if (args?.iccids != undefined) {
|
||||
params["identifier.identifierType"] = "ICCID"
|
||||
params["identifier.identifiers"] = args.iccids.toString()
|
||||
}
|
||||
|
||||
const loadNextLine = async (page: number): Promise<Result<string, ObjeniousLine[]>> => {
|
||||
if (args?.status != undefined) params["simStatus"] = args.status
|
||||
params["pageSize"] = pageSize
|
||||
|
||||
Reference in New Issue
Block a user