Files
sf-sim/packages/sim-shared/infrastructure/ObjeniousOperationRepository.ts

324 lines
11 KiB
TypeScript

import { IOperationsRepository, ObjeniousOperation, ObjeniousOperationChange } from "sim-shared/domain/operationsRepository.port.js";
import { Result, tryCatch } from "sim-shared/domain/Result.js";
import { PgClient } from "sim-shared/infrastructure/PgClient.js";
import { ObjeniousLine, ObjeniousLineResponse } from "../domain/objeniousLine.js";
import { HttpClient } from "./HTTPClient.js";
import assert from "node:assert";
import { AxiosResponse } from "axios";
export class ObjeniousOperationsRepository implements IOperationsRepository {
constructor(
private http: HttpClient,
private readonly pgClient: PgClient
) {
}
public async getLineByIccid(iccid: string): Promise<Result<string, ObjeniousLine[]>> {
const path = "/lines"
const params = new URLSearchParams([
["identifier.identifierType", "ICCID"],
["identifier.identifiers", iccid]
])
const req = this.http.client.get<ObjeniousLineResponse>(path, {
params: params
})
const res = await tryCatch(req)
if (res.error != undefined) {
return {
error: res.error?.message
}
}
const lines = res.data.data.content
return {
data: lines
}
}
/**
* Consulta el estado de una o mas lineas directamente a la API de Objenious
* TODO: No hay paginacion como en getLinesByStatusAPI
*/
public async getLinesAPI(
identifierType: "ICCID" | "IMSI" | "IMEI" | "MSISDN" | "REFERENCE",
identifiers: string[]
): Promise<Result<string, ObjeniousLine[]>> {
if (identifiers.length == 0) {
return {
data: []
}
}
// Comprobar < MAX_PAGE_SIZE (Poco probable)
const path = "/lines"
const params = {
"identifier.identifierType": identifierType,
"identifier.identifiers": identifiers.toString()
}
const req = this.http.client.get<ObjeniousLineResponse>(path, {
params: params
})
const res = await tryCatch(req)
if (res.error != undefined) {
return {
error: res.error?.message
}
}
const lines = res.data.data.content
return {
data: lines
}
}
private MAX_PAGE_SIZE = 1000
public async * getLinesByStatusAPI(args?: {
pageSize?: number,
pageNumber?: number,
status?: string,
iccids?: string[]
}): AsyncGenerator<Result<string, ObjeniousLine[]>, Result<string, ObjeniousLine[]>, any> {
const path = "/lines"
const pageSize = args?.pageSize ?? this.MAX_PAGE_SIZE;
let currentPage = args?.pageNumber ?? 0;
let totalPages: number | undefined = undefined; // Como limite de paginas, igual es pasarse pero hasta que se lea
const params: Record<string, string | number> = {}
// Si se va a filtrar por iccids especificamente, en un futuro habra que ampliar el tipo de filtros
if (args?.iccids != undefined) {
params["identifier.identifierType"] = "ICCID"
params["identifier.identifiers"] = args.iccids.toString()
}
const loadNextLine = async (page: number): Promise<Result<string, ObjeniousLine[]>> => {
if (args?.status != undefined) params["simStatus"] = args.status
params["pageSize"] = pageSize
params["pageNumber"] = page
console.log(`[i] Cargando pagina ${currentPage} de ${totalPages ?? "(desc)"}`)
const nextPage = await tryCatch<AxiosResponse<ObjeniousLineResponse>>(this.http.client.get(path, {
params: params
}))
if (nextPage.error != undefined) {
console.error(nextPage.error)
return {
error: nextPage.error.message
}
}
// Se aumenta para la siguiente ejecucion
console.log(`[i] Página ${currentPage} completa, total: ${nextPage.data.data.totalPages}`)
totalPages = nextPage.data.data.totalPages
return {
data: nextPage.data.data.content
}
}
// El inicio se ejecuta siempre
const lines = await loadNextLine(currentPage)
if (lines.error != undefined) {
console.error("[x] Error obteniendo las lineas, cancelando operación");
return {
error: "Error cargando lineas"
}
}
currentPage++;
yield {
data: lines.data
}
// Copia para evitar bucles infinitos por error de la api
const maxPages = totalPages
assert.ok(maxPages != undefined, "No se ha defindo el numero de paginas") // Nunca deberia pasar pero así se evitan bucles infnitos
console.log("maxPages", maxPages)
for (let i = currentPage; i < maxPages!; i++) {
console.log("Bucle i:", i, "page: ", currentPage)
yield await loadNextLine(currentPage);
currentPage++;
}
return {
data: []
}
}
async createOperation(data: ObjeniousOperation): Promise<Result<string, ObjeniousOperation>> {
const query = `
INSERT INTO objenious_operation (operation, iccids, status, max_retry, request_id)
VALUES ($1, $2, $3, $4, $5)
RETURNING *`;
const values = [data.operation, data.iccids, data.status, data.max_retry, data.request_id];
const { rows } = await this.pgClient.query(query, values);
return <Result<string, ObjeniousOperation>>{
data: rows[0]
}
}
async getLastOperationOfLine(iccid: string) {
const query = `
SELECT * FROM public.objenious_operation
WHERE iccids = $1 and error is null
ORDER BY id asc limit 1
`
const values = [iccid];
const { rows } = await this.pgClient.query(query, values);
return <Result<string, ObjeniousOperation>>{
data: rows[0]
}
}
async updateOperation(data: ObjeniousOperationChange): Promise<Result<string, ObjeniousOperation>> {
const client = await this.pgClient.connect();
const {
new_status,
previous_status,
error,
new_request_id,
new_mass_action_id,
operation_id,
new_objenious_status,
previous_objenious_status
} = data
try {
await client.query('BEGIN');
// 1. Actualizar objenious_operation (la operacion base)
const updateParams = [operation_id, new_status, error, new_request_id, new_mass_action_id, new_objenious_status]
const updateOpQuery = `
UPDATE objenious_operation
SET
status = $2::status_enum,
error = COALESCE($3,error),
request_id = COALESCE($4, request_id),
mass_action_id = COALESCE($5, mass_action_id),
last_change_date = now() at time zone 'utc',
end_date = CASE WHEN $2 IN ('finished','error') THEN now() at time zone 'utc' ELSE end_date END,
objenious_status = $6
WHERE id = $1`;
const operation = await client.query<ObjeniousOperation>(
updateOpQuery, updateParams)
// 2. Nuevo registro en objenious_operation_change (indica un cambio de estado)
const insertChangeQuery = `
INSERT INTO objenious_operation_change (operation_id, new_status, previous_status, error, new_request_id, new_mass_action_id, new_objenious_status, previous_objenious_status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`;
await client.query(insertChangeQuery, [operation_id, new_status, previous_status, error, new_request_id, new_mass_action_id, new_objenious_status, previous_objenious_status]);
await client.query('COMMIT');
return <Result<string, ObjeniousOperation>>{
data: operation.rows[0]
}
} catch (e) {
console.error("Error añadiendo actualizacion de la operation", e)
console.error("datos errorneos", data)
await client.query('ROLLBACK');
return <Result<string, ObjeniousOperation>>{
data: undefined,
error: e
}
} finally {
client.release();
}
}
async getPendingOperations(): Promise<Result<string, ObjeniousOperation[]>> {
// Aprovecha el índice 'pending_operations'
const query = `SELECT * FROM objenious_operation WHERE end_date IS NULL ORDER BY start_date ASC`;
try {
const { rows } = await this.pgClient.query<ObjeniousOperation>(query);
return {
error: undefined,
data: rows
};
} catch (e) {
return {
error: String(e),
data: undefined
}
}
}
/**
* Obtiene el tiempo en suspensión de una linea en miliseguntos y dias efetivos
* Todo el calculo se hace en postgres. Puede que haga falta traer las transiciones
* que normalmente son pocas, para hacer filtros personalizados.
*/
async getSuspendedTime(iccid: string): Promise<Result<string, { total_milliseconds: number, total_days: number }>> {
const query = `
WITH ordered_events AS (
-- 1. Selecciona y normaliza los eventos relevantes del historial
-- Se define el 'estado' final (suspendido vs activo) basado en la operación
SELECT operation, end_date,
CASE WHEN operation = 'suspend' THEN 'suspended' ELSE 'active' END as state
FROM objenious_operation
WHERE iccids = $1 AND status = 'finished' AND error IS NULL
AND operation IN ('suspend', 'activate', 'reactivate', 'terminate')
ORDER BY end_date
),
state_transitions AS (
-- 2. Detecta cambios de estado comparando con la fila anterior (LAG)
SELECT state, end_date,
LAG(state) OVER (ORDER BY end_date) as prev_state
FROM ordered_events
),
filtered_transitions AS (
-- 3. Filtra solo las filas donde el estado realmente ha cambaido
-- Se obtiene la fecha de inicio del siguiente intervalo (LEAD)
SELECT state, end_date,
LEAD(end_date) OVER (ORDER BY end_date) as next_end_date
FROM state_transitions
WHERE state IS DISTINCT FROM prev_state
),
intervals AS (
-- 4. Calcula la duración de los periodos en los que el estado fue 'suspended'
-- Se usa NOW() para el intervalo abierto (último estado hasta hoy)
SELECT EXTRACT(EPOCH FROM (COALESCE(next_end_date, NOW() at time zone 'utc') - end_date)) * 1000 as ms_duration,
(COALESCE(next_end_date, NOW() at time zone 'utc')::date - end_date::date) + 1 as days_duration
FROM filtered_transitions
WHERE state = 'suspended'
)
-- 5. Suma total de tiempo en estado de suspensión
SELECT COALESCE(SUM(ms_duration)::bigint, 0) as total_milliseconds,
COALESCE(SUM(days_duration), 0) as total_days
FROM intervals;
`;
try {
const { rows } = await this.pgClient.query<{ total_milliseconds: string, total_days: string }>(query, [iccid]);
return {
data: {
total_milliseconds: parseFloat(rows[0].total_milliseconds),
total_days: parseInt(rows[0].total_days)
}
};
} catch (e) {
console.error("Error calculating suspended time", e);
return {
error: String(e),
data: undefined
};
}
}
}