import { IOperationsRepository, ObjeniousOperation, ObjeniousOperationChange } from "sim-shared/domain/operationsRepository.port.js";
import { Result, tryCatch } from "sim-shared/domain/Result.js";
import { PgClient } from "sim-shared/infrastructure/PgClient.js";
import { ObjeniousLine, ObjeniousLineResponse } from "../domain/objeniousLine.js";
import { HttpClient } from "./HTTPClient.js";
import assert from "node:assert";
import { AxiosResponse } from "axios";

/**
 * Repository for Objenious operations. It combines two data sources:
 *  - the Objenious HTTP API (through `HttpClient`) to read line information, and
 *  - PostgreSQL (through `PgClient`) to persist operations and their status changes.
 *
 * NOTE(review): the generic arguments of `Result` / `Promise` / `AsyncGenerator` were
 * not legible in the reviewed copy of this file. They are written here as
 * `Result<Data, string>` (data first, string error) — confirm the parameter order
 * against `sim-shared/domain/Result.js` and `IOperationsRepository`.
 */
export class ObjeniousOperationsRepository implements IOperationsRepository {
  /** Default page size used by `getLinesByStatusAPI` when the caller gives none. */
  private readonly MAX_PAGE_SIZE = 1000;

  constructor(
    private readonly http: HttpClient,
    private readonly pgClient: PgClient
  ) { }

  /**
   * Queries the state of one or more lines directly from the Objenious API.
   * TODO: there is no pagination here, unlike getLinesByStatusAPI.
   *
   * @param identifierType Kind of identifier contained in `identifiers`.
   * @param identifiers Identifiers to look up; an empty list short-circuits to
   *   `{ data: [] }` without calling the API.
   * @returns `{ data }` with the `content` of the response, or `{ error }` with the
   *   message of the failed request.
   */
  public async getLinesAPI(
    identifierType: "ICCID" | "IMSI" | "IMEI" | "MSISDN" | "REFERENCE",
    identifiers: string[]
  ): Promise<Result<ObjeniousLine[], string>> {
    if (identifiers.length === 0) {
      return { data: [] };
    }

    // TODO: check identifiers.length < MAX_PAGE_SIZE (unlikely to be exceeded)
    const path = "/lines";
    const params = {
      "identifier.identifierType": identifierType,
      // Array#toString() joins the identifiers with commas
      "identifier.identifiers": identifiers.toString()
    };

    const res = await tryCatch<AxiosResponse<ObjeniousLineResponse>>(
      this.http.client.get(path, { params })
    );
    if (res.error != undefined) {
      return { error: res.error?.message };
    }

    return { data: res.data.data.content };
  }

  /**
   * Iterates over the lines of the Objenious API page by page.
   *
   * The first page is always requested. If it fails, the generator finishes
   * immediately with `{ error }` as its *return* value and yields nothing (so a
   * `for await` consumer simply sees an empty iteration). Otherwise each page is
   * yielded as a `Result`; a failure on a later page is yielded as `{ error }` and
   * iteration continues with the next page.
   *
   * The number of pages is taken from the `totalPages` reported by the first
   * successful response and is not re-read afterwards, so a changing value from
   * the API cannot turn this into an endless loop.
   *
   * @param args.pageSize Page size; defaults to `MAX_PAGE_SIZE`.
   * @param args.pageNumber First page to request; defaults to 0.
   * @param args.status Optional value sent as the `simStatus` filter.
   * @param args.iccids Optional ICCIDs to filter by.
   * @throws AssertionError when the first response does not report `totalPages`.
   */
  public async *getLinesByStatusAPI(args?: {
    pageSize?: number,
    pageNumber?: number,
    status?: string,
    iccids?: string[]
  }): AsyncGenerator<Result<ObjeniousLine[], string>, Result<ObjeniousLine[], string>, any> {
    const path = "/lines";
    const pageSize = args?.pageSize ?? this.MAX_PAGE_SIZE;
    const firstPage = args?.pageNumber ?? 0;
    let totalPages: number | undefined = undefined;

    // Filters shared by every page request (they do not change between pages).
    const baseParams: Record<string, string | number> = {};
    // Only filtering by ICCID is supported for now; the filter types may need to grow.
    if (args?.iccids != undefined) {
      baseParams["identifier.identifierType"] = "ICCID";
      baseParams["identifier.identifiers"] = args.iccids.toString();
    }
    if (args?.status != undefined) baseParams["simStatus"] = args.status;
    baseParams["pageSize"] = pageSize;

    // Requests a single page and records the total number of pages it reports.
    const loadPage = async (page: number): Promise<Result<ObjeniousLine[], string>> => {
      // Fresh object per request so no request ever observes another page's number.
      const params = { ...baseParams, pageNumber: page };
      console.log(`[i] Cargando pagina ${page} de ${totalPages ?? "(desc)"}`);

      const response = await tryCatch<AxiosResponse<ObjeniousLineResponse>>(
        this.http.client.get(path, { params })
      );
      if (response.error != undefined) {
        console.error(response.error);
        return { error: response.error.message };
      }

      console.log(`[i] Página ${page} completa, total: ${response.data.data.totalPages}`);
      totalPages = response.data.data.totalPages;
      return { data: response.data.data.content };
    };

    // The first page is always loaded.
    const first = await loadPage(firstPage);
    if (first.error != undefined) {
      console.error("[x] Error obteniendo las lineas, cancelando operación");
      return { error: "Error cargando lineas" };
    }
    yield { data: first.data };

    // Snapshot of the page count, to avoid endless loops caused by an API error.
    // The assertion widens the type back: the compiler does not track the
    // assignment made inside `loadPage` and would otherwise treat it as `undefined`.
    const maxPages = totalPages as number | undefined;
    // Should never happen, but it guards the loop bound below.
    assert.ok(maxPages != undefined, "No se ha defindo el numero de paginas");
    console.log("maxPages", maxPages);

    for (let page = firstPage + 1; page < maxPages; page++) {
      console.log("Bucle i:", page, "page: ", page);
      yield await loadPage(page);
    }

    return { data: [] };
  }

  /**
   * Inserts a new row in `objenious_operation` and returns it.
   * Database errors are not caught here: the returned promise rejects.
   */
  async createOperation(data: ObjeniousOperation): Promise<Result<ObjeniousOperation, string>> {
    const query = `
      INSERT INTO objenious_operation (operation, iccids, status, max_retry, request_id)
      VALUES ($1, $2, $3, $4, $5)
      RETURNING *`;
    const values = [data.operation, data.iccids, data.status, data.max_retry, data.request_id];

    const { rows } = await this.pgClient.query(query, values);
    return { data: rows[0] } as Result<ObjeniousOperation, string>;
  }

  /**
   * Returns the most recent operation (highest `id`) stored for the given ICCID
   * whose `error` column is null. `data` is `undefined` when no row matches.
   * Database errors are not caught here: the returned promise rejects.
   */
  async getLastOperationOfLine(iccid: string) {
    // DESC so that LIMIT 1 keeps the newest row; ascending order returned the
    // oldest operation instead of the last one.
    const query = `
      SELECT *
      FROM public.objenious_operation
      WHERE iccids = $1 and error is null
      ORDER BY id DESC
      LIMIT 1
    `;
    const values = [iccid];

    const { rows } = await this.pgClient.query(query, values);
    return { data: rows[0] } as Result<ObjeniousOperation, string>;
  }

  /**
   * Applies a status change to an operation inside a single transaction:
   *  1. updates the base row in `objenious_operation`, and
   *  2. appends a row to `objenious_operation_change` describing the transition.
   *
   * @returns `{ data }` with the updated operation row, or `{ error }` carrying the
   *   caught exception after the transaction has been rolled back.
   */
  async updateOperation(data: ObjeniousOperationChange): Promise<Result<ObjeniousOperation, string>> {
    const client = await this.pgClient.connect();
    const {
      new_status,
      previous_status,
      error,
      new_request_id,
      new_mass_action_id,
      operation_id,
      new_objenious_status,
      previous_objenious_status
    } = data;

    try {
      await client.query("BEGIN");

      // 1. Update objenious_operation (the base operation).
      // RETURNING * is required: without it `rows` is empty and the caller never
      // receives the updated operation.
      // NOTE(review): objenious_status is written without COALESCE, unlike the
      // other optional columns — confirm that clearing it is intended.
      const updateParams = [operation_id, new_status, error, new_request_id, new_mass_action_id, new_objenious_status];
      const updateOpQuery = `
        UPDATE objenious_operation
        SET status = $2::status_enum,
            error = COALESCE($3,error),
            request_id = COALESCE($4, request_id),
            mass_action_id = COALESCE($5, mass_action_id),
            last_change_date = now() at time zone 'utc',
            end_date = CASE WHEN $2 IN ('finished','error') THEN now() at time zone 'utc' ELSE end_date END,
            objenious_status = $6
        WHERE id = $1
        RETURNING *`;
      const operation = await client.query(updateOpQuery, updateParams);

      // 2. New row in objenious_operation_change (records the state transition).
      const insertChangeQuery = `
        INSERT INTO objenious_operation_change
          (operation_id, new_status, previous_status, error, new_request_id, new_mass_action_id, new_objenious_status, previous_objenious_status)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`;
      await client.query(insertChangeQuery, [
        operation_id,
        new_status,
        previous_status,
        error,
        new_request_id,
        new_mass_action_id,
        new_objenious_status,
        previous_objenious_status
      ]);

      await client.query("COMMIT");
      return { data: operation.rows[0] } as Result<ObjeniousOperation, string>;
    } catch (e) {
      console.error("Error añadiendo actualizacion de la operation", e);
      console.error("datos errorneos", data);
      try {
        await client.query("ROLLBACK");
      } catch (rollbackError) {
        // A failed rollback must not hide the original error from the caller.
        console.error("Error haciendo ROLLBACK de la operation", rollbackError);
      }
      return { data: undefined, error: e } as Result<ObjeniousOperation, string>;
    } finally {
      client.release();
    }
  }

  /**
   * Returns every operation that has no `end_date` yet, oldest `start_date` first.
   * Database errors are converted to `{ error }` with the stringified exception.
   */
  async getPendingOperations(): Promise<Result<ObjeniousOperation[], string>> {
    // Per the original author's note, this query relies on the 'pending_operations' index.
    const query = `SELECT * FROM objenious_operation WHERE end_date IS NULL ORDER BY start_date ASC`;
    try {
      const { rows } = await this.pgClient.query(query);
      return { error: undefined, data: rows };
    } catch (e) {
      return { error: String(e), data: undefined };
    }
  }
}