Completada la tarea de volcado
This commit is contained in:
@@ -10,7 +10,7 @@ import { env } from './env/index.js';
|
|||||||
export const pgPoolIntranet = new Pool({
|
export const pgPoolIntranet = new Pool({
|
||||||
user: env.POSTGRES_USER,
|
user: env.POSTGRES_USER,
|
||||||
host: env.POSTGRES_HOST,
|
host: env.POSTGRES_HOST,
|
||||||
database: "intranet",
|
database: "postgres",
|
||||||
password: env.POSTGRES_PASSWORD,
|
password: env.POSTGRES_PASSWORD,
|
||||||
port: Number(env.POSTGRES_PORT) || 5432,
|
port: Number(env.POSTGRES_PORT) || 5432,
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -5,6 +5,9 @@ import { httpInstance } from "./config/httpClient.config.js"
|
|||||||
import { CheckObjeniousRequests } from "./tasks/check_objenious_request.js"
|
import { CheckObjeniousRequests } from "./tasks/check_objenious_request.js"
|
||||||
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js"
|
import { ObjeniousOperationsRepository } from "sim-shared/infrastructure/ObjeniousOperationRepository.js"
|
||||||
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js"
|
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js"
|
||||||
|
import { TaskVolcadoLineas } from "./tasks/volcado_lineas.js"
|
||||||
|
import { ObjeniousLinesRepository } from "./infranstructure/ObjeniousLinesRepository.js"
|
||||||
|
import { postgresClientIntranet } from "./config/intranetPostgresConfig.js"
|
||||||
|
|
||||||
async function startCron() {
|
async function startCron() {
|
||||||
const commonSettings = {
|
const commonSettings = {
|
||||||
@@ -14,10 +17,13 @@ async function startCron() {
|
|||||||
|
|
||||||
const httpClient = httpInstance
|
const httpClient = httpInstance
|
||||||
const pgClient = new PgClient({ pool: pgPool })
|
const pgClient = new PgClient({ pool: pgPool })
|
||||||
|
|
||||||
|
console.log("[i] Comprobando conexion con la BDD ")
|
||||||
await pgClient.checkDatabaseConnection()
|
await pgClient.checkDatabaseConnection()
|
||||||
await pgClient.checkDatabaseConnection()
|
|
||||||
const operationRepository = new ObjeniousOperationsRepository(pgClient)
|
const operationRepository = new ObjeniousOperationsRepository(pgClient)
|
||||||
const orderRepository = new OrderRepository(pgClient)
|
const orderRepository = new OrderRepository(pgClient)
|
||||||
|
const objeniousLineRepository = new ObjeniousLinesRepository(postgresClientIntranet)
|
||||||
|
|
||||||
const objTask = new CheckObjeniousRequests(
|
const objTask = new CheckObjeniousRequests(
|
||||||
operationRepository,
|
operationRepository,
|
||||||
@@ -25,23 +31,28 @@ async function startCron() {
|
|||||||
httpClient,
|
httpClient,
|
||||||
)
|
)
|
||||||
|
|
||||||
await objTask.getPendingOperations()
|
const volcadoLineasTask = new TaskVolcadoLineas(httpClient, objeniousLineRepository)
|
||||||
|
|
||||||
|
const PERIODO_PETICIONES = 10 * 60 * 60
|
||||||
const interval = setInterval(async () => {
|
const interval = setInterval(async () => {
|
||||||
console.log("Updating...")
|
try {
|
||||||
await objTask.getPendingOperations()
|
await objTask.getPendingOperations()
|
||||||
console.log("Update finished")
|
} catch (e) {
|
||||||
}, 10 * 60 * 1000)
|
console.error("[x] Error de actualizacion de las lineas ")
|
||||||
/*
|
}
|
||||||
const task = cron.createTask("* * * * *", async () => {
|
}, PERIODO_PETICIONES)
|
||||||
}
|
|
||||||
, {
|
const PERIODO_VOLCADO = 60 * 60 * 1000
|
||||||
...commonSettings,
|
const volcadoInterval = setInterval(async () => {
|
||||||
name: "Test"
|
try {
|
||||||
})
|
await volcadoLineasTask.loadLines()
|
||||||
*/
|
} catch (e) {
|
||||||
|
console.error("[x] Volcado de lineas de Objenious Fallido", e)
|
||||||
|
}
|
||||||
|
}, PERIODO_VOLCADO)
|
||||||
|
|
||||||
|
await volcadoLineasTask.loadLines()
|
||||||
|
|
||||||
//await objTask.getPendingOperations()
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import test, { describe } from "node:test";
|
import test, { after, before, describe } from "node:test";
|
||||||
import { CreateObjeniousLineDTO } from "sim-shared/domain/objeniousLine.js";
|
import { CreateObjeniousLineDTO } from "sim-shared/domain/objeniousLine.js";
|
||||||
import { ObjeniousLinesRepository } from "./ObjeniousLinesRepository.js";
|
import { ObjeniousLinesRepository } from "./ObjeniousLinesRepository.js";
|
||||||
import { postgrClient } from "../config/postgreConfig.js";
|
import { postgrClient } from "../config/postgreConfig.js";
|
||||||
@@ -9,7 +9,7 @@ describe("Line insertion test", async () => {
|
|||||||
const pgClient = postgrClient // En prod hay que usar el de Intrantet para usar la otra base de datos
|
const pgClient = postgrClient // En prod hay que usar el de Intrantet para usar la otra base de datos
|
||||||
const lineRepository = new ObjeniousLinesRepository(pgClient)
|
const lineRepository = new ObjeniousLinesRepository(pgClient)
|
||||||
const lineaTest: CreateObjeniousLineDTO = {
|
const lineaTest: CreateObjeniousLineDTO = {
|
||||||
simId: "1234",
|
simId: 1234,
|
||||||
iccid: "9999999999999",
|
iccid: "9999999999999",
|
||||||
msisdn: "34654674732",
|
msisdn: "34654674732",
|
||||||
imei: "219789481293",
|
imei: "219789481293",
|
||||||
@@ -27,6 +27,19 @@ describe("Line insertion test", async () => {
|
|||||||
raw: { test: "test" } as any // Para este test no hace falta
|
raw: { test: "test" } as any // Para este test no hace falta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean up before and after tests to ensure isolation
|
||||||
|
const cleanup = async () => {
|
||||||
|
await pgClient.query("DELETE FROM objenious_lines WHERE simId = 1234");
|
||||||
|
};
|
||||||
|
|
||||||
|
before(async () => {
|
||||||
|
await cleanup()
|
||||||
|
})
|
||||||
|
|
||||||
|
after(async () => {
|
||||||
|
await cleanup()
|
||||||
|
})
|
||||||
|
|
||||||
test("Should insert new line", async () => {
|
test("Should insert new line", async () => {
|
||||||
const res = await lineRepository.insertOrUpdate(lineaTest)
|
const res = await lineRepository.insertOrUpdate(lineaTest)
|
||||||
assert.ok(res != undefined, "The line wasn't created")
|
assert.ok(res != undefined, "The line wasn't created")
|
||||||
@@ -34,7 +47,13 @@ describe("Line insertion test", async () => {
|
|||||||
|
|
||||||
test("Should not update a line if the hash is the same", async () => {
|
test("Should not update a line if the hash is the same", async () => {
|
||||||
const res = await lineRepository.insertOrUpdate(lineaTest)
|
const res = await lineRepository.insertOrUpdate(lineaTest)
|
||||||
console.log("Update", res)
|
assert.ok(res == undefined, "The line have been updated")
|
||||||
assert.ok(res != undefined, "The line wasn't updated")
|
})
|
||||||
|
|
||||||
|
test("Should update a line if the hash changes", async () => {
|
||||||
|
const updated = structuredClone(lineaTest)
|
||||||
|
lineaTest.billingActivationDate = new Date()
|
||||||
|
const res = await lineRepository.insertOrUpdate(lineaTest)
|
||||||
|
assert.ok(res != undefined, "The line have been updated")
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,3 +1,8 @@
|
|||||||
|
/**
|
||||||
|
* Repositorio para el volcado de lineas de objenious en intranet
|
||||||
|
* solo para uso en el volcado.
|
||||||
|
*/
|
||||||
|
import { createHash } from "node:crypto";
|
||||||
import { PoolClient } from "pg";
|
import { PoolClient } from "pg";
|
||||||
import { CreateObjeniousLineDTO } from "sim-shared/domain/objeniousLine.js";
|
import { CreateObjeniousLineDTO } from "sim-shared/domain/objeniousLine.js";
|
||||||
import { PgClient } from "sim-shared/infrastructure/PgClient.js";
|
import { PgClient } from "sim-shared/infrastructure/PgClient.js";
|
||||||
@@ -11,7 +16,8 @@ export class ObjeniousLinesRepository {
|
|||||||
private generateLineHash(data: CreateObjeniousLineDTO) {
|
private generateLineHash(data: CreateObjeniousLineDTO) {
|
||||||
try {
|
try {
|
||||||
const lineStr = JSON.stringify(data)
|
const lineStr = JSON.stringify(data)
|
||||||
return lineStr
|
const hash = createHash("sha256").update(lineStr).digest("base64url")
|
||||||
|
return hash
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error("[x] Error generando el hash de la linea", data)
|
console.error("[x] Error generando el hash de la linea", data)
|
||||||
return undefined
|
return undefined
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { env } from "#config/env/index.js";
|
import { env } from "../config/env/index.js";
|
||||||
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
|
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
|
||||||
import axios from "axios";
|
import axios from "axios";
|
||||||
import { IOperationsRepository, Objenious, ObjeniousOperation, ObjeniousOperationChange, StatusEnum } from "sim-shared/domain/operationsRepository.port.js";
|
import { IOperationsRepository, Objenious, ObjeniousOperation, ObjeniousOperationChange, StatusEnum } from "sim-shared/domain/operationsRepository.port.js";
|
||||||
@@ -16,6 +16,7 @@ export class CheckObjeniousRequests {
|
|||||||
* TODO: meter a una funcion a parte task con los 3 pasos
|
* TODO: meter a una funcion a parte task con los 3 pasos
|
||||||
*/
|
*/
|
||||||
public async getPendingOperations() {
|
public async getPendingOperations() {
|
||||||
|
console.log("[i] Inicio revision de peticiones")
|
||||||
// 1. Se obtienen todas las operaciones pendientes de la BDD
|
// 1. Se obtienen todas las operaciones pendientes de la BDD
|
||||||
const pendingOperations = await this.operationsRepository.getPendingOperations()
|
const pendingOperations = await this.operationsRepository.getPendingOperations()
|
||||||
|
|
||||||
@@ -49,6 +50,8 @@ export class CheckObjeniousRequests {
|
|||||||
|
|
||||||
console.log("[cron] Solicitando status para", merged.map(e => e.id))
|
console.log("[cron] Solicitando status para", merged.map(e => e.id))
|
||||||
const result = await this.getMassActionsStatus(merged)
|
const result = await this.getMassActionsStatus(merged)
|
||||||
|
|
||||||
|
console.log("[o] Revisión de eventos completa")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -1,68 +1,133 @@
|
|||||||
import { ObjeniousLine, ObjeniousLineResponse } from "sim-shared/domain/objeniousLine.js";
|
import assert from "node:assert";
|
||||||
|
import { lineToCreateLineDto, ObjeniousLine, ObjeniousLineResponse } from "sim-shared/domain/objeniousLine.js";
|
||||||
import { tryCatch, Result } from "sim-shared/domain/Result.js";
|
import { tryCatch, Result } from "sim-shared/domain/Result.js";
|
||||||
import { HttpClient } from "sim-shared/infrastructure/HTTPClient.js";
|
import { HttpClient } from "sim-shared/infrastructure/HTTPClient.js";
|
||||||
|
import { ObjeniousLinesRepository } from "../infranstructure/ObjeniousLinesRepository.js";
|
||||||
|
import { AxiosResponse } from "axios";
|
||||||
|
import { constants } from "node:buffer";
|
||||||
|
|
||||||
const MAX_PAGE_SIZE = 1000
|
const MAX_PAGE_SIZE = 100
|
||||||
|
|
||||||
export class VolcadoLineas {
|
export class TaskVolcadoLineas {
|
||||||
constructor(
|
constructor(
|
||||||
private readonly httpClient: HttpClient
|
private readonly httpClient: HttpClient,
|
||||||
|
private readonly linesRepository: ObjeniousLinesRepository,
|
||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mover al repo
|
* Mover al repo
|
||||||
*/
|
*/
|
||||||
private async getLinesByStatus(args?: {
|
private async * getLinesByStatus(args?: {
|
||||||
pageSize?: number,
|
pageSize?: number,
|
||||||
pageNumber?: number,
|
pageNumber?: number,
|
||||||
status?: string
|
status?: string
|
||||||
}): Promise<Result<string, ObjeniousLine[]>> {
|
}): AsyncGenerator<Result<string, ObjeniousLine[]>, Result<string, ObjeniousLine[]>, any> {
|
||||||
|
|
||||||
const path = "/lines"
|
const path = "/lines"
|
||||||
const pageSize = args?.pageSize ?? MAX_PAGE_SIZE;
|
const pageSize = args?.pageSize ?? MAX_PAGE_SIZE;
|
||||||
const status = args?.status ?? null;
|
|
||||||
let currentPage = args?.pageNumber ?? 0;
|
|
||||||
let totalPages = 1;
|
|
||||||
let allLines: ObjeniousLine[] = []
|
|
||||||
|
|
||||||
const loadNextLine = async () => {
|
let currentPage = args?.pageNumber ?? 0;
|
||||||
const nextPage = await tryCatch<ObjeniousLineResponse>(this.httpClient.client.get(path, {
|
let totalPages: number | undefined = undefined; // Como limite de paginas, igual es pasarse pero hasta que se lea
|
||||||
params: {
|
|
||||||
simStatus: status,
|
const params: Record<string, string | number> = {}
|
||||||
pageSize: pageSize,
|
|
||||||
pageNumber: currentPage
|
const loadNextLine = async (page: number): Promise<Result<string, ObjeniousLine[]>> => {
|
||||||
}
|
if (args?.status != undefined) params["simStatus"] = args.status
|
||||||
|
params["pageSize"] = pageSize
|
||||||
|
params["pageNumber"] = page
|
||||||
|
console.log("Params", params)
|
||||||
|
console.log(`[i] Cargando pagina ${currentPage} de ${totalPages ?? "(desc)"}`)
|
||||||
|
const nextPage = await tryCatch<AxiosResponse<ObjeniousLineResponse>>(this.httpClient.client.get(path, {
|
||||||
|
params: params
|
||||||
}))
|
}))
|
||||||
|
|
||||||
if (nextPage.error != undefined) {
|
if (nextPage.error != undefined) {
|
||||||
|
console.error(nextPage.error.msg)
|
||||||
return {
|
return {
|
||||||
error: nextPage.error.msg.message
|
error: nextPage.error.msg.message
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Se aumenta para la siguiente ejecucion
|
// Se aumenta para la siguiente ejecucion
|
||||||
currentPage = nextPage.data.pageNumber + 1
|
console.log(`[i] Página ${currentPage} completa, total: ${nextPage.data.data.totalPages}`)
|
||||||
allLines = [...allLines, ...nextPage.data.content]
|
totalPages = nextPage.data.data.totalPages
|
||||||
totalPages = nextPage.data.totalPages
|
|
||||||
|
return {
|
||||||
|
data: nextPage.data.data.content
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// El inicio se ejecuta siempre
|
// El inicio se ejecuta siempre
|
||||||
await loadNextLine()
|
const lines = await loadNextLine(currentPage)
|
||||||
|
|
||||||
|
if (lines.error != undefined) {
|
||||||
|
console.error("[x] Error obteniendo las lineas, cancelando operación");
|
||||||
|
return {
|
||||||
|
error: "Error cargando lineas"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
currentPage++;
|
||||||
|
|
||||||
|
yield {
|
||||||
|
data: lines.data
|
||||||
|
}
|
||||||
|
|
||||||
// Copia para evitar bucles infinitos por error de la api
|
// Copia para evitar bucles infinitos por error de la api
|
||||||
const maxPages = totalPages
|
const maxPages = totalPages
|
||||||
|
assert.ok(maxPages != undefined, "No se ha defindo el numero de paginas") // Nunca deberia pasar pero así se evitan bucles infnitos
|
||||||
for (let i = currentPage; i < maxPages; i++) {
|
console.log("maxPages", maxPages)
|
||||||
await loadNextLine()
|
for (let i = currentPage; i < maxPages!; i++) {
|
||||||
|
console.log("Bucle i:", i, "page: ", currentPage)
|
||||||
|
yield await loadNextLine(currentPage);
|
||||||
|
currentPage++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
data: allLines
|
data: []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async saveLines(lines: ObjeniousLine[]) {
|
||||||
|
const linesToCreate = lines.map(lineToCreateLineDto)
|
||||||
|
let created: number[] = []
|
||||||
|
|
||||||
|
|
||||||
|
for (const line of linesToCreate) {
|
||||||
|
// Si es lento pasar a Promise.all
|
||||||
|
const res = await this.linesRepository.insertOrUpdate(line)
|
||||||
|
if (res?.id != undefined)
|
||||||
|
created.push(res.id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async loadLines() {
|
public async loadLines() {
|
||||||
|
console.log("[i] Iniciando task de volcado de lineas de Objenious")
|
||||||
|
// Carga todas las lineas en memoria, hay que comprobar que no se gaste demasiada
|
||||||
|
|
||||||
|
const linesIterator = this.getLinesByStatus()
|
||||||
|
let lines = await linesIterator.next()
|
||||||
|
|
||||||
|
if (lines.value.error != undefined || lines.value.data == undefined) {
|
||||||
|
console.error("[x] Error cargando las lineas a volcar", lines.value.error)
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.saveLines(lines.value.data)
|
||||||
|
|
||||||
|
while (!lines.done) {
|
||||||
|
console.log()
|
||||||
|
lines = await linesIterator.next()
|
||||||
|
if (lines.value.error != undefined || lines.value.data == undefined) {
|
||||||
|
console.error("[x] Error cargando las lineas a volcar", lines.value.error)
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await this.saveLines(lines.value.data)
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log("[i] Terminado task de volcado de lineas de Objenious")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,23 +93,52 @@ export type ObjeniousLine = {
|
|||||||
|
|
||||||
export type ObjeniousLineDb = {
|
export type ObjeniousLineDb = {
|
||||||
id: number;
|
id: number;
|
||||||
simId?: string;
|
simId?: number;
|
||||||
iccid: string;
|
iccid: string;
|
||||||
msisdn?: string;
|
msisdn?: string;
|
||||||
imei?: string;
|
imei?: string;
|
||||||
imeiChangeDate?: Date;
|
imeiChangeDate?: Date;
|
||||||
offerCode?: string;
|
offerCode?: string;
|
||||||
status?: string;
|
status?: string;
|
||||||
preactivationDate?: Date;
|
preactivationDate?: Date | null;
|
||||||
activationDate?: Date;
|
activationDate?: Date | null;
|
||||||
commercialStatus?: string;
|
commercialStatus?: string;
|
||||||
commercialStatusDate?: Date;
|
commercialStatusDate?: Date | null;
|
||||||
billingStatus?: string;
|
billingStatus?: string;
|
||||||
billingStatusChangeDate?: Date;
|
billingStatusChangeDate?: Date | null;
|
||||||
billingActivationDate?: Date;
|
billingActivationDate?: Date | null;
|
||||||
createDate?: Date;
|
createDate?: Date | null;
|
||||||
raw: ObjeniousLine;
|
raw: ObjeniousLine;
|
||||||
}
|
}
|
||||||
|
|
||||||
// DTO para inserción (omite el ID autogenerado)
|
// DTO para inserción (omite el ID autogenerado)
|
||||||
export type CreateObjeniousLineDTO = Omit<ObjeniousLineDb, 'id'>;
|
export type CreateObjeniousLineDTO = Omit<ObjeniousLineDb, 'id'>;
|
||||||
|
|
||||||
|
export function lineToCreateLineDto(line: ObjeniousLine): CreateObjeniousLineDTO {
|
||||||
|
|
||||||
|
const dateOrNull = (data: string | null) => {
|
||||||
|
if (data == null) return null;
|
||||||
|
return new Date(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
const transformed: CreateObjeniousLineDTO = {
|
||||||
|
simId: line.identifier.simId,
|
||||||
|
iccid: line.identifier.iccid,
|
||||||
|
msisdn: line.identifier.msisdn,
|
||||||
|
imei: line.identifier.imei,
|
||||||
|
imeiChangeDate: new Date(line.device.imeiChangeDate),
|
||||||
|
offerCode: line.offer.code,
|
||||||
|
status: line.status.status,
|
||||||
|
preactivationDate: dateOrNull(line.status.preactivationDate),
|
||||||
|
activationDate: dateOrNull(line.status.activationDate),
|
||||||
|
commercialStatus: line.status.commercialStatus,
|
||||||
|
commercialStatusDate: dateOrNull(line.status.commercialStatusDate),
|
||||||
|
billingStatus: line.status.billingStatus,
|
||||||
|
billingStatusChangeDate: dateOrNull(line.status.activationDate),
|
||||||
|
billingActivationDate: dateOrNull(line.status.activationDate),
|
||||||
|
createDate: dateOrNull(line.status.activationDate),
|
||||||
|
raw: line
|
||||||
|
}
|
||||||
|
|
||||||
|
return transformed;
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user