diff --git a/packages/sim-objenious-cron/config/intranetPostgresConfig.ts b/packages/sim-objenious-cron/config/intranetPostgresConfig.ts index 7797136..001134b 100644 --- a/packages/sim-objenious-cron/config/intranetPostgresConfig.ts +++ b/packages/sim-objenious-cron/config/intranetPostgresConfig.ts @@ -10,7 +10,7 @@ import { env } from './env/index.js'; export const pgPoolIntranet = new Pool({ user: env.POSTGRES_USER, host: env.POSTGRES_HOST, - database: "intranet", + database: "postgres", password: env.POSTGRES_PASSWORD, port: Number(env.POSTGRES_PORT) || 5432, }); diff --git a/packages/sim-objenious-cron/index.ts b/packages/sim-objenious-cron/index.ts index e9cf2ef..38a83f6 100644 --- a/packages/sim-objenious-cron/index.ts +++ b/packages/sim-objenious-cron/index.ts @@ -5,6 +5,9 @@ import { httpInstance } from "./config/httpClient.config.js" import { CheckObjeniousRequests } from "./tasks/check_objenious_request.js" import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js" import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js" +import { TaskVolcadoLineas } from "./tasks/volcado_lineas.js" +import { ObjeniousLinesRepository } from "./infranstructure/ObjeniousLinesRepository.js" +import { postgresClientIntranet } from "./config/intranetPostgresConfig.js" async function startCron() { const commonSettings = { @@ -14,10 +17,13 @@ async function startCron() { const httpClient = httpInstance const pgClient = new PgClient({ pool: pgPool }) + + console.log("[i] Comprobando conexion con la BDD ") await pgClient.checkDatabaseConnection() - await pgClient.checkDatabaseConnection() + const operationRepository = new ObjeniousOperationsRepository(pgClient) const orderRepository = new OrderRepository(pgClient) + const objeniousLineRepository = new ObjeniousLinesRepository(postgresClientIntranet) const objTask = new CheckObjeniousRequests( operationRepository, @@ -25,23 +31,28 @@ async function startCron() { httpClient, ) - await objTask.getPendingOperations() + const volcadoLineasTask = new TaskVolcadoLineas(httpClient, objeniousLineRepository) + const PERIODO_PETICIONES = 10 * 60 * 60 const interval = setInterval(async () => { - console.log("Updating...") - await objTask.getPendingOperations() - console.log("Update finished") - }, 10 * 60 * 1000) - /* - const task = cron.createTask("* * * * *", async () => { - } - , { - ...commonSettings, - name: "Test" - }) -*/ + try { + await objTask.getPendingOperations() + } catch (e) { + console.error("[x] Error de actualizacion de las lineas ") + } + }, PERIODO_PETICIONES) + + const PERIODO_VOLCADO = 60 * 60 * 1000 + const volcadoInterval = setInterval(async () => { + try { + await volcadoLineasTask.loadLines() + } catch (e) { + console.error("[x] Volcado de lineas de Objenious Fallido", e) + } + }, PERIODO_VOLCADO) + + await volcadoLineasTask.loadLines() - //await objTask.getPendingOperations() } diff --git a/packages/sim-objenious-cron/infranstructure/ObjeniousLinesRepository.test.ts b/packages/sim-objenious-cron/infranstructure/ObjeniousLinesRepository.test.ts index 83a9f80..7fa5623 100644 --- a/packages/sim-objenious-cron/infranstructure/ObjeniousLinesRepository.test.ts +++ b/packages/sim-objenious-cron/infranstructure/ObjeniousLinesRepository.test.ts @@ -1,4 +1,4 @@ -import test, { describe } from "node:test"; +import test, { after, before, describe } from "node:test"; import { CreateObjeniousLineDTO } from "sim-shared/domain/objeniousLine.js"; import { ObjeniousLinesRepository } from "./ObjeniousLinesRepository.js"; import { postgrClient } from "../config/postgreConfig.js"; @@ -9,7 +9,7 @@ describe("Line insertion test", async () => { const pgClient = postgrClient // En prod hay que usar el de Intrantet para usar la otra base de datos const lineRepository = new ObjeniousLinesRepository(pgClient) const lineaTest: CreateObjeniousLineDTO = { - simId: "1234", + simId: 1234, iccid: "9999999999999", msisdn: "34654674732", imei: "219789481293", @@ -27,6 +27,19 @@ describe("Line insertion test", async () => { raw: { test: "test" } as any // Para este test no hace falta } + // Clean up before and after tests to ensure isolation + const cleanup = async () => { + await pgClient.query("DELETE FROM objenious_lines WHERE simId = 1234"); + }; + + before(async () => { + await cleanup() + }) + + after(async () => { + await cleanup() + }) + test("Should insert new line", async () => { const res = await lineRepository.insertOrUpdate(lineaTest) assert.ok(res != undefined, "The line wasn't created") @@ -34,7 +47,13 @@ describe("Line insertion test", async () => { test("Should not update a line if the hash is the same", async () => { const res = await lineRepository.insertOrUpdate(lineaTest) - console.log("Update", res) - assert.ok(res != undefined, "The line wasn't updated") + assert.ok(res == undefined, "The line have been updated") + }) + + test("Should update a line if the hash changes", async () => { + const updated = structuredClone(lineaTest) + lineaTest.billingActivationDate = new Date() + const res = await lineRepository.insertOrUpdate(lineaTest) + assert.ok(res != undefined, "The line have been updated") }) }) diff --git a/packages/sim-objenious-cron/infranstructure/ObjeniousLinesRepository.ts b/packages/sim-objenious-cron/infranstructure/ObjeniousLinesRepository.ts index 7499db6..4bba217 100644 --- a/packages/sim-objenious-cron/infranstructure/ObjeniousLinesRepository.ts +++ b/packages/sim-objenious-cron/infranstructure/ObjeniousLinesRepository.ts @@ -1,3 +1,8 @@ +/** + * Repositorio para el volcado de lineas de objenious en intranet + * solo para uso en el volcado. + */ +import { createHash } from "node:crypto"; import { PoolClient } from "pg"; import { CreateObjeniousLineDTO } from "sim-shared/domain/objeniousLine.js"; import { PgClient } from "sim-shared/infrastructure/PgClient.js"; @@ -11,7 +16,8 @@ export class ObjeniousLinesRepository { private generateLineHash(data: CreateObjeniousLineDTO) { try { const lineStr = JSON.stringify(data) - return lineStr + const hash = createHash("sha256").update(lineStr).digest("base64url") + return hash } catch (e) { console.error("[x] Error generando el hash de la linea", data) return undefined diff --git a/packages/sim-objenious-cron/tasks/check_objenious_request.ts b/packages/sim-objenious-cron/tasks/check_objenious_request.ts index 5b85fbe..8e4d5f4 100644 --- a/packages/sim-objenious-cron/tasks/check_objenious_request.ts +++ b/packages/sim-objenious-cron/tasks/check_objenious_request.ts @@ -1,4 +1,4 @@ -import { env } from "#config/env/index.js"; +import { env } from "../config/env/index.js"; import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js"; import axios from "axios"; import { IOperationsRepository, Objenious, ObjeniousOperation, ObjeniousOperationChange, StatusEnum } from "sim-shared/domain/operationsRepository.port.js"; @@ -16,6 +16,7 @@ export class CheckObjeniousRequests { * TODO: meter a una funcion a parte task con los 3 pasos */ public async getPendingOperations() { + console.log("[i] Inicio revision de peticiones") // 1. Se obtienen todas las operaciones pendientes de la BDD const pendingOperations = await this.operationsRepository.getPendingOperations() @@ -49,6 +50,8 @@ export class CheckObjeniousRequests { console.log("[cron] Solicitando status para", merged.map(e => e.id)) const result = await this.getMassActionsStatus(merged) + + console.log("[o] Revisión de eventos completa") } /** diff --git a/packages/sim-objenious-cron/tasks/volcado_lineas.ts b/packages/sim-objenious-cron/tasks/volcado_lineas.ts index 1155702..e8cbd8a 100644 --- a/packages/sim-objenious-cron/tasks/volcado_lineas.ts +++ b/packages/sim-objenious-cron/tasks/volcado_lineas.ts @@ -1,68 +1,133 @@ -import { ObjeniousLine, ObjeniousLineResponse } from "sim-shared/domain/objeniousLine.js"; +import assert from "node:assert"; +import { lineToCreateLineDto, ObjeniousLine, ObjeniousLineResponse } from "sim-shared/domain/objeniousLine.js"; import { tryCatch, Result } from "sim-shared/domain/Result.js"; import { HttpClient } from "sim-shared/infrastructure/HTTPClient.js"; +import { ObjeniousLinesRepository } from "../infranstructure/ObjeniousLinesRepository.js"; +import { AxiosResponse } from "axios"; +import { constants } from "node:buffer"; -const MAX_PAGE_SIZE = 1000 +const MAX_PAGE_SIZE = 100 -export class VolcadoLineas { +export class TaskVolcadoLineas { constructor( - private readonly httpClient: HttpClient + private readonly httpClient: HttpClient, + private readonly linesRepository: ObjeniousLinesRepository, ) { } /** * Mover al repo */ - private async getLinesByStatus(args?: { + private async * getLinesByStatus(args?: { pageSize?: number, pageNumber?: number, status?: string - }): Promise> { + }): AsyncGenerator, Result, any> { const path = "/lines" const pageSize = args?.pageSize ?? MAX_PAGE_SIZE; - const status = args?.status ?? null; - let currentPage = args?.pageNumber ?? 0; - let totalPages = 1; - let allLines: ObjeniousLine[] = [] - const loadNextLine = async () => { - const nextPage = await tryCatch(this.httpClient.client.get(path, { - params: { - simStatus: status, - pageSize: pageSize, - pageNumber: currentPage - } + let currentPage = args?.pageNumber ?? 0; + let totalPages: number | undefined = undefined; // Como limite de paginas, igual es pasarse pero hasta que se lea + + const params: Record = {} + + const loadNextLine = async (page: number): Promise> => { + if (args?.status != undefined) params["simStatus"] = args.status + params["pageSize"] = pageSize + params["pageNumber"] = page + console.log("Params", params) + console.log(`[i] Cargando pagina ${currentPage} de ${totalPages ?? "(desc)"}`) + const nextPage = await tryCatch>(this.httpClient.client.get(path, { + params: params })) + if (nextPage.error != undefined) { + console.error(nextPage.error.msg) return { error: nextPage.error.msg.message } } // Se aumenta para la siguiente ejecucion - currentPage = nextPage.data.pageNumber + 1 - allLines = [...allLines, ...nextPage.data.content] - totalPages = nextPage.data.totalPages + console.log(`[i] Página ${currentPage} completa, total: ${nextPage.data.data.totalPages}`) + totalPages = nextPage.data.data.totalPages + + return { + data: nextPage.data.data.content + } + } // El inicio se ejecuta siempre - await loadNextLine() + const lines = await loadNextLine(currentPage) + + if (lines.error != undefined) { + console.error("[x] Error obteniendo las lineas, cancelando operación"); + return { + error: "Error cargando lineas" + } + } + + currentPage++; + + yield { + data: lines.data + } // Copia para evitar bucles infinitos por error de la api const maxPages = totalPages - - for (let i = currentPage; i < maxPages; i++) { - await loadNextLine() + assert.ok(maxPages != undefined, "No se ha defindo el numero de paginas") // Nunca deberia pasar pero así se evitan bucles infnitos + console.log("maxPages", maxPages) + for (let i = currentPage; i < maxPages!; i++) { + console.log("Bucle i:", i, "page: ", currentPage) + yield await loadNextLine(currentPage); + currentPage++; } return { - data: allLines + data: [] + } + } + + private async saveLines(lines: ObjeniousLine[]) { + const linesToCreate = lines.map(lineToCreateLineDto) + let created: number[] = [] + + + for (const line of linesToCreate) { + // Si es lento pasar a Promise.all + const res = await this.linesRepository.insertOrUpdate(line) + if (res?.id != undefined) + created.push(res.id) } } public async loadLines() { + console.log("[i] Iniciando task de volcado de lineas de Objenious") + // Carga todas las lineas en memoria, hay que comprobar que no se gaste demasiada + const linesIterator = this.getLinesByStatus() + let lines = await linesIterator.next() + + if (lines.value.error != undefined || lines.value.data == undefined) { + console.error("[x] Error cargando las lineas a volcar", lines.value.error) + return; + } + + await this.saveLines(lines.value.data) + + while (!lines.done) { + console.log() + lines = await linesIterator.next() + if (lines.value.error != undefined || lines.value.data == undefined) { + console.error("[x] Error cargando las lineas a volcar", lines.value.error) + return; + } + await this.saveLines(lines.value.data) + } + + console.log("[i] Terminado task de volcado de lineas de Objenious") } } diff --git a/packages/sim-shared/domain/objeniousLine.ts b/packages/sim-shared/domain/objeniousLine.ts index aa763fb..c04eb30 100644 --- a/packages/sim-shared/domain/objeniousLine.ts +++ b/packages/sim-shared/domain/objeniousLine.ts @@ -93,23 +93,52 @@ export type ObjeniousLine = { export type ObjeniousLineDb = { id: number; - simId?: string; + simId?: number; iccid: string; msisdn?: string; imei?: string; imeiChangeDate?: Date; offerCode?: string; status?: string; - preactivationDate?: Date; - activationDate?: Date; + preactivationDate?: Date | null; + activationDate?: Date | null; commercialStatus?: string; - commercialStatusDate?: Date; + commercialStatusDate?: Date | null; billingStatus?: string; - billingStatusChangeDate?: Date; - billingActivationDate?: Date; - createDate?: Date; + billingStatusChangeDate?: Date | null; + billingActivationDate?: Date | null; + createDate?: Date | null; raw: ObjeniousLine; } // DTO para inserción (omite el ID autogenerado) export type CreateObjeniousLineDTO = Omit; + +export function lineToCreateLineDto(line: ObjeniousLine): CreateObjeniousLineDTO { + + const dateOrNull = (data: string | null) => { + if (data == null) return null; + return new Date(data) + } + + const transformed: CreateObjeniousLineDTO = { + simId: line.identifier.simId, + iccid: line.identifier.iccid, + msisdn: line.identifier.msisdn, + imei: line.identifier.imei, + imeiChangeDate: new Date(line.device.imeiChangeDate), + offerCode: line.offer.code, + status: line.status.status, + preactivationDate: dateOrNull(line.status.preactivationDate), + activationDate: dateOrNull(line.status.activationDate), + commercialStatus: line.status.commercialStatus, + commercialStatusDate: dateOrNull(line.status.commercialStatusDate), + billingStatus: line.status.billingStatus, + billingStatusChangeDate: dateOrNull(line.status.activationDate), + billingActivationDate: dateOrNull(line.status.activationDate), + createDate: dateOrNull(line.status.activationDate), + raw: line + } + + return transformed; +}