/**
 * TODO: use (the original note was left unfinished and does not say what).
 */
import type { PoolClient, QueryResult, QueryResultRow } from "pg";
import type {
  CreateOrderDTO,
  ErrorOrderDTO,
  FinishOrderDTO,
  OrderTracking,
  UpdateOrderDTO,
} from "../domain/Order.js";
import { type Result, tryCatch } from "../domain/Result.js";
import type { PgClient } from "./PgClient.js";
import { AssertionError } from "node:assert";

/** Column used to locate an order, together with the value bound to `$1`. */
type OrderLookup = { column: "id" | "correlation_id"; value: unknown };

/** SQL text plus the positional values bound to its `$n` placeholders. */
type SqlStatement = { text: string; values: unknown[] };

/**
 * Groups every *Order* operation.
 *
 * An *Order* tracks an operation that was successfully published to any message
 * queue, regardless of the target country/company of the card.
 *
 * Every operation resolves to a `Result` so callers can handle database access
 * errors without try/catch: successful operations leave `error` undefined,
 * failed ones leave `data` undefined. Connection, BEGIN/COMMIT/ROLLBACK and
 * statement failures are all reported that way. The only thing that still
 * throws is the argument check of `updateOrder` / `finishOrder`
 * (an `AssertionError`, raised before any connection is taken).
 *
 * NOTE(review): the generic type arguments of this file were reconstructed
 * (`Result<T>`, `OrderTracking<T>`, `QueryResult<T>`, `CreateOrderDTO<T>` and
 * the per-method `<T>`); confirm them against `../domain/Order.js` and
 * `../domain/Result.js`, in particular whether `CreateOrderDTO` is generic.
 */
export class OrderRepository {
  constructor(private readonly pgClient: PgClient) {}

  /** Converts anything thrown by the driver into a plain message string. */
  private static toErrorMessage(e: unknown): string {
    return e instanceof Error ? e.message : String(e);
  }

  /** Builds the failed variant of a `Result`. */
  private static failure<R>(error: unknown): Result<R> {
    return { error } as Result<R>;
  }

  /**
   * Picks the lookup column and its value from the same test, so an explicit
   * `id: undefined` key can never pair the `id` column with a correlation id.
   */
  private static resolveLookup(args: { id?: unknown; correlation_id?: unknown }): OrderLookup {
    return args.id != undefined
      ? { column: "id", value: args.id }
      : { column: "correlation_id", value: args.correlation_id };
  }

  /**
   * Enforces "id XOR correlation_id".
   *
   * @throws AssertionError when both or neither identifier is provided.
   */
  private static assertSingleIdentifier(args: { id?: unknown; correlation_id?: unknown }): void {
    const hasId = args.id != undefined;
    const hasCorrelationId = args.correlation_id != undefined;
    if (hasId === hasCorrelationId) {
      throw new AssertionError({
        message: "Exactly one of id or correlation_id must be provided",
        actual: { id: args.id, correlation_id: args.correlation_id },
      });
    }
  }

  /**
   * Awaits the query and returns its first row.
   * Driver errors are captured into the `Result` instead of being thrown;
   * `data` is undefined when the query produced no rows.
   */
  private async getFirst<T extends QueryResultRow>(
    queryPromise: Promise<QueryResult<T>>,
  ): Promise<Result<T>> {
    try {
      const queryResult = await queryPromise;
      return { data: queryResult.rows[0] } as Result<T>;
    } catch (e: unknown) {
      return OrderRepository.failure<T>(OrderRepository.toErrorMessage(e));
    }
  }

  /**
   * Awaits the query and returns all of its rows as a list of `T`.
   * Driver errors are captured into the `Result` instead of being thrown.
   */
  private async getAll<T extends QueryResultRow>(
    queryPromise: Promise<QueryResult<T>>,
  ): Promise<Result<T[]>> {
    try {
      const queryResult = await queryPromise;
      return { data: queryResult.rows } as Result<T[]>;
    } catch (e: unknown) {
      return OrderRepository.failure<T[]>(OrderRepository.toErrorMessage(e));
    }
  }

  /**
   * Runs `work` between BEGIN and COMMIT on a dedicated connection.
   *
   * The transaction is committed when `work` resolves without `error` and
   * rolled back otherwise. The connection is always released; it is destroyed
   * instead of being returned to the pool when even ROLLBACK fails, because
   * its transaction state is then unknown.
   */
  private async runInTransaction<R>(
    work: (client: PoolClient) => Promise<Result<R>>,
  ): Promise<Result<R>> {
    let client: PoolClient;
    try {
      client = await this.pgClient.connect();
    } catch (e: unknown) {
      return OrderRepository.failure<R>(OrderRepository.toErrorMessage(e));
    }

    let destroyClient = false;
    try {
      await client.query("BEGIN");
      const outcome = await work(client);
      await client.query(outcome.error == undefined ? "COMMIT" : "ROLLBACK");
      return outcome;
    } catch (e: unknown) {
      try {
        await client.query("ROLLBACK");
      } catch {
        destroyClient = true;
      }
      return OrderRepository.failure<R>(OrderRepository.toErrorMessage(e));
    } finally {
      if (destroyClient) {
        client.release(true);
      } else {
        client.release();
      }
    }
  }

  /**
   * Shared body of `updateOrder`, `finishOrder` and `errorOrder`.
   *
   * Inside one transaction it (1) loads the current order, (2) runs the UPDATE
   * built by `buildUpdate` and (3) inserts the `order_history` row built by
   * `buildHistory`. After COMMIT it re-reads the order and returns it.
   * A failed lookup is reported with the database error; a lookup that finds
   * nothing is reported as "El order a actualizar no existe ...".
   */
  private async changeStatus<T>(
    lookup: OrderLookup,
    buildUpdate: (current: OrderTracking<T>) => SqlStatement,
    buildHistory: (current: OrderTracking<T>) => SqlStatement,
  ): Promise<Result<OrderTracking<T>>> {
    const selectSql = `
      SELECT *
      FROM order_tracking
      WHERE ${lookup.column} = $1
    `;
    const selectValues = [lookup.value];

    const transaction = await this.runInTransaction<{ id: number }>(async (client) => {
      // 1. Load the base order. The error is checked before the "not found"
      //    case so a database failure is not reported as a missing order.
      const current = await this.getFirst(
        client.query<OrderTracking<T>>(selectSql, selectValues),
      );
      if (current.error != undefined) {
        return OrderRepository.failure<{ id: number }>(current.error);
      }
      const currentOrder = current.data;
      if (currentOrder?.id == undefined) {
        return OrderRepository.failure<{ id: number }>(
          `El order a actualizar no existe ${lookup.column}: ${String(lookup.value)}`,
        );
      }

      // 2. Update the order itself.
      const update = buildUpdate(currentOrder);
      const updated = await this.getFirst(
        client.query<{ id: number; status: string; update_date: string }>(
          update.text,
          update.values,
        ),
      );
      if (updated.error != undefined) {
        return OrderRepository.failure<{ id: number }>(updated.error);
      }

      // 3. Record the change in order_history.
      const history = buildHistory(currentOrder);
      return this.getFirst(client.query<{ id: number }>(history.text, history.values));
    });

    if (transaction.error != undefined) {
      return OrderRepository.failure<OrderTracking<T>>(transaction.error);
    }

    // Re-read after COMMIT so the caller gets the persisted row.
    return this.getFirst(this.pgClient.query<OrderTracking<T>>(selectSql, selectValues));
  }

  /**
   * Looks an order up by its primary key.
   * The type parameter represents the content of the order's message.
   */
  public async getOrderById<T>(data: { id: number }): Promise<Result<OrderTracking<T>>> {
    const query = `
      SELECT *
      FROM order_tracking
      WHERE id = $1
    `;
    const values = [data.id];
    return this.getFirst(this.pgClient.query<OrderTracking<T>>(query, values));
  }

  /**
   * Looks an order up by its RabbitMQ id (`correlation_id`).
   *
   * NOTE(review): `pool` is accepted but not used; the query always runs
   * through `pgClient`. Confirm whether it was meant to run on that client.
   */
  public async getOrderByQueueId<T>(
    data: { correlation_id: string },
    pool?: PoolClient,
  ): Promise<Result<OrderTracking<T>>> {
    const query = `
      SELECT *
      FROM order_tracking
      WHERE correlation_id = $1
    `;
    const values = [data.correlation_id];
    return this.getFirst(this.pgClient.query<OrderTracking<T>>(query, values));
  }

  /**
   * Orders that have not finished yet (`finish_date IS NULL`), oldest first.
   *
   * @param options optional filters: `limit` and `offset` for paging, `start`
   * as the first order id to include. All of them are sent as bound
   * parameters, never interpolated into the SQL text.
   * @returns the matching orders, or the database error.
   */
  public async getPendingOrders<T>(options?: {
    limit?: number;
    offset?: number;
    start?: number; // first id to include
  }): Promise<Result<OrderTracking<T>[]>> {
    const values: number[] = [];
    const bind = (value: number): string => {
      values.push(value);
      return `$${values.length}`;
    };

    const startFragment = options?.start != undefined ? `AND id >= ${bind(options.start)}` : "";
    const offsetFragment = options?.offset != undefined ? `OFFSET ${bind(options.offset)}` : "";
    const limitFragment = options?.limit != undefined ? `LIMIT ${bind(options.limit)}` : "";

    const query = `
      SELECT *
      FROM order_tracking
      WHERE finish_date IS NULL
      ${startFragment}
      ORDER BY start_date ASC
      ${offsetFragment}
      ${limitFragment}
    `;
    return this.getAll(this.pgClient.query<OrderTracking<T>>(query, values));
  }

  /**
   * Inserts a new order with status 'pending' and returns the inserted row.
   */
  public async createOrder<T>(data: CreateOrderDTO<T>): Promise<Result<OrderTracking<T>>> {
    const query = `
      INSERT INTO order_tracking (
        correlation_id,
        exchange,
        routing_key,
        order_type,
        payload,
        status,
        webhook_host,
        webhook_endpoint
      ) VALUES (
        $1, -- correlation_id
        $2, -- exchange
        $3, -- routing_key
        $4, -- order_type (ej: 'activate')
        $5, -- payload (json object)
        'pending',
        $6, -- webhook_host,
        $7 -- webhook_endpoint
      )
      RETURNING id, correlation_id, exchange, routing_key, order_type, payload, status, webhook_host, webhook_endpoint
    `;
    const values = [
      data.correlation_id,
      data.exchange,
      data.routing_key,
      data.order_type,
      data.payload,
      data.webhook_host,
      data.webhook_endpoint,
    ];
    // TODO: check whether start_date is converted to Date by default; add a status enum.
    return this.runInTransaction((client) =>
      this.getFirst(client.query<OrderTracking<T>>(query, values)),
    );
  }

  /**
   * "Successful" update of an order's status.
   *
   * Sets `args.new_status` on the order and records the change, with
   * `args.reason`, in `order_history`.
   *
   * @throws AssertionError unless exactly one of `id` / `correlation_id` is set.
   */
  public async updateOrder<T>(args: UpdateOrderDTO): Promise<Result<OrderTracking<T>>> {
    // XOR: id or correlation_id
    OrderRepository.assertSingleIdentifier(args);

    return this.changeStatus<T>(
      OrderRepository.resolveLookup(args),
      (current) => ({
        text: `
          UPDATE order_tracking
          SET status = $2::order_status,
              update_date = (now() at time zone 'utc')
          WHERE id = $1
          RETURNING id, status, update_date;
        `,
        values: [current.id, args.new_status],
      }),
      (current) => ({
        text: `
          INSERT INTO order_history (
            order_id,
            previous_status,
            new_status,
            change_reason
          ) VALUES (
            $1, -- ID de la orden
            $2, -- Estado anterior
            $3::order_status, -- Nuevo estado
            $4 -- Razón (ej: "Consumer processed successfully" o "RabbitMQ NACK")
          )
          RETURNING id;
        `,
        values: [current.id, current.status, args.new_status, args.reason],
      }),
    );
  }

  /**
   * Marks an order as 'finished', sets its `finish_date` and records the
   * change in `order_history` with `args.reason` ("finished successfully"
   * when no reason is given).
   *
   * NOTE(review): this UPDATE uses `now()` while `updateOrder` and
   * `errorOrder` use `(now() at time zone 'utc')`; confirm which one matches
   * the column types before unifying them.
   *
   * @throws AssertionError unless exactly one of `id` / `correlation_id` is set.
   */
  public async finishOrder<T>(args: FinishOrderDTO): Promise<Result<OrderTracking<T>>> {
    // Validate before taking a connection so a bad call cannot leak one.
    OrderRepository.assertSingleIdentifier(args);

    return this.changeStatus<T>(
      OrderRepository.resolveLookup(args),
      (current) => ({
        text: `
          UPDATE order_tracking
          SET status = 'finished',
              update_date = now(),
              finish_date = now()
          WHERE id = $1
          RETURNING id, status, update_date;
        `,
        values: [current.id],
      }),
      (current) => ({
        text: `
          INSERT INTO order_history (
            order_id,
            previous_status,
            new_status,
            change_reason
          ) VALUES (
            $1, -- ID de la orden
            $2, -- Estado anterior
            'finished',
            $3 -- Siempre "finished successfully" a no ser que se especifique otra razón
          )
          RETURNING id;
        `,
        values: [current.id, current.status, args.reason ?? "finished successfully"],
      }),
    );
  }

  // TODO (from the original): support for filtering by correlation_id.
  /**
   * Records a failure on an order.
   *
   * Status 'dlx' is assumed to be final (no retry), so `finish_date` is set;
   * status 'failed' is assumed to mean the message went to the delay queue and
   * will be retried at some point, so `finish_date` is cleared. In both cases
   * `retry_count` is incremented and the error message and stack trace are
   * stored. The order is located by `id` when present, otherwise by
   * `correlation_id`; a missing order is returned as an error `Result`.
   */
  public async errorOrder<T>(args: ErrorOrderDTO): Promise<Result<OrderTracking<T>>> {
    return this.changeStatus<T>(
      OrderRepository.resolveLookup(args),
      // The primary key is used from here on: it avoids the slower
      // correlation_id lookup.
      (current) => ({
        text: `
          UPDATE order_tracking
          SET status = $2::order_status,
              update_date = (now() at time zone 'utc'),
              finish_date = CASE
                WHEN $2::order_status = 'dlx' THEN (now() at time zone 'utc')
                ELSE null
              END,
              retry_count = retry_count + 1,
              error_message = $3,
              error_stacktrace = $4
          WHERE id = $1
          RETURNING id, status, update_date;
        `,
        values: [current.id, args.status, args.error, args.stackTrace],
      }),
      (current) => ({
        text: `
          INSERT INTO order_history (
            order_id,
            previous_status,
            new_status,
            change_reason
          ) VALUES (
            $1, -- ID de la orden
            $2, -- Estado anterior
            $3::order_status, -- En este caso particular 'dlx' o 'failed'
            $4 -- En este caso el motivo de fallo completo
          )
          RETURNING id;
        `,
        values: [current.id, current.status, args.status, args.reason],
      }),
    );
  }
}