Files

521 lines
15 KiB
TypeScript
Raw Permalink Normal View History

/**
* TODO: Usar
*/
import { PoolClient, QueryResult, QueryResultRow } from "pg";
2026-04-27 09:33:55 +02:00
import { CreateOrderDTO, ErrorOrderDTO, FinishOrderDTO, OrderQuery, OrderTracking, UpdateOrderDTO } from "../domain/Order.js";
import { Result, tryCatch } from "../domain/Result.js";
import { PgClient } from "./PgClient.js";
import assert from "node:assert";
2026-02-23 13:35:36 +01:00
/**
 * Groups every persistence operation for *Orders*.
 *
 * An *Order* tracks an operation that was successfully published to any message
 * queue, regardless of the target country/company of the card.
 *
 * Every public method resolves to a `Result<Error, Data>` so that database failures
 * are reported as values: successful operations leave `error` undefined, failed ones
 * leave `data` undefined. The only exception is the argument assertion in
 * `updateOrder` / `finishOrder`, which rejects with an `AssertionError` when the
 * caller does not provide exactly one of `id` / `correlation_id`.
 */
export class OrderRepository {
    /** Plain, optionally schema-qualified SQL identifier. Anything else is rejected before interpolation. */
    private static readonly SAFE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$/;

    /** Filter expression shape: operator prefix, whitespace, value (e.g. `-eq 'pending'`). */
    private static readonly FILTER_PATTERN = /^(-\w+)\s+(.+)$/;

    /** Error text returned when the order targeted by a status change cannot be found. */
    private static readonly ORDER_NOT_FOUND = "El order a actualizar no existe ";

    /**
     * Maps filter prefixes to SQL comparison operators.
     */
    private readonly OPERATOR_MAP: Record<string, string> = {
        "-eq": "=",
        "-neq": "!=",
        "-gt": ">",
        "-gte": ">=",
        "-lt": "<",
        "-lte": "<=",
        "-like": "LIKE",
    };

    constructor(
        private readonly pgClient: PgClient,
    ) {
    }

    /**
     * Turns any thrown value into a non-empty message so that `Result.error`
     * really holds a string, as its type declares.
     */
    private toErrorMessage(e: unknown): string {
        if (typeof e === "string") {
            return e;
        }
        if (e instanceof Error) {
            // Some errors (e.g. AggregateError on connection failures) carry an empty message.
            return e.message || e.name;
        }
        return String(e);
    }

    /** Builds the failed variant of a `Result` from any thrown value. */
    private fail<T>(e: unknown): Result<string, T> {
        return { error: this.toErrorMessage(e) } as Result<string, T>;
    }

    /**
     * Awaits the query and returns its first row.
     * Query failures are captured in `error`; when the query matches no rows,
     * `data` is undefined and `error` is undefined as well.
     */
    private async getFirst<T extends QueryResultRow>(queryPromise: Promise<QueryResult<T>>): Promise<Result<string, T>> {
        try {
            const queryResult = await queryPromise;
            return { data: queryResult.rows[0] } as Result<string, T>;
        } catch (e) {
            return this.fail<T>(e);
        }
    }

    /**
     * Awaits the query and returns every row as a list of `T`.
     * Query failures are captured in `error`.
     */
    private async getAll<T extends QueryResultRow>(queryPromise: Promise<QueryResult<T>>): Promise<Result<string, T[]>> {
        try {
            const queryResult = await queryPromise;
            return { data: queryResult.rows } as Result<string, T[]>;
        } catch (e) {
            return this.fail<T[]>(e);
        }
    }

    /**
     * Ensures a table/column name is a plain identifier before it is interpolated
     * into SQL text (identifiers cannot be sent as bind parameters).
     * @throws Error when the name contains anything other than letters, digits, `_` or one `.`
     */
    private assertSafeIdentifier(name: string, kind: string): void {
        if (!OrderRepository.SAFE_IDENTIFIER.test(name)) {
            throw new Error(`Invalid ${kind} name in query: ${JSON.stringify(name)}`);
        }
    }

    /**
     * Converts a pagination value to a non-negative integer.
     * @throws Error when the value is not a non-negative safe integer
     */
    private toPaginationValue(value: unknown, label: string): number {
        const parsed = Number(value);
        if (!Number.isSafeInteger(parsed) || parsed < 0) {
            throw new Error(`Invalid ${label}: ${JSON.stringify(value)}`);
        }
        return parsed;
    }

    /**
     * Builds a generic `SELECT *` over a table from a set of conditions.
     *
     * Each condition maps a column name to a filter such as `-eq 'pending'` or
     * `-like foo`. Filters that do not match that shape, or that use an unknown
     * operator prefix, are ignored. Filter values, `limit` and `offset` are always
     * sent as bind parameters; table and column names are validated because they
     * have to be interpolated.
     *
     * TODO:
     * - Allow choosing the returned columns instead of `*`
     * - Guarantee the number of response parameters
     *
     * @throws Error on an unsafe table/column name or an invalid limit/offset
     */
    private generateTableQuery(table: string, query: OrderQuery) {
        const { conditions, limit, offset } = query;
        this.assertSafeIdentifier(table, "table");

        const whereClauses: string[] = [];
        const queryValues: unknown[] = [];

        // 1. Conditions
        for (const [column, filter] of Object.entries(conditions ?? {})) {
            const match = filter.match(OrderRepository.FILTER_PATTERN);
            if (!match) {
                continue;
            }
            const [, prefix, value] = match;
            const operator = this.OPERATOR_MAP[prefix];
            if (!operator) {
                continue;
            }
            this.assertSafeIdentifier(column, "column");
            // Strip one leading/trailing single quote from the value
            const cleanValue = value.replace(/^'|'$/g, "");
            queryValues.push(operator === "LIKE" ? `%${cleanValue}%` : cleanValue);
            // PostgreSQL placeholders are 1-based, so the new length is the placeholder index
            whereClauses.push(`${column} ${operator} $${queryValues.length}`);
        }

        // 2. Full query
        // TODO: replace * with explicit columns
        let sql = `SELECT * FROM ${table}`;
        if (whereClauses.length > 0) {
            sql += ` WHERE ${whereClauses.join(" AND ")}`;
        }

        // 3. Pagination
        if (limit !== undefined) {
            queryValues.push(this.toPaginationValue(limit, "limit"));
            sql += ` LIMIT $${queryValues.length}`;
        }
        if (offset !== undefined) {
            queryValues.push(this.toPaginationValue(offset, "offset"));
            sql += ` OFFSET $${queryValues.length}`;
        }

        return {
            sql,
            values: queryValues,
        };
    }

    /**
     * Runs `work` inside a transaction on a dedicated pooled client.
     *
     * The transaction is committed when `work` resolves without `error` and rolled
     * back otherwise (including when anything throws). The client is always
     * released, and every failure is reported through the returned `Result`.
     */
    private async withTransaction<R>(
        work: (client: PoolClient) => Promise<Result<string, R>>,
    ): Promise<Result<string, R>> {
        let client: PoolClient;
        try {
            client = await this.pgClient.connect();
        } catch (e) {
            return this.fail<R>(e);
        }

        try {
            let outcome: Result<string, R>;
            try {
                await client.query("BEGIN");
                outcome = await work(client);
                if (outcome.error == undefined) {
                    await client.query("COMMIT");
                }
            } catch (e) {
                outcome = this.fail<R>(e);
            }
            if (outcome.error != undefined) {
                try {
                    await client.query("ROLLBACK");
                } catch {
                    // The connection is most likely unusable; the original failure is the one worth reporting.
                }
            }
            return outcome;
        } finally {
            client.release();
        }
    }

    /**
     * Picks the lookup column and its value from the same check, so that an
     * explicit `id: undefined` next to a `correlation_id` cannot pair the `id`
     * column with the correlation value. `id` wins when both are present.
     */
    private resolveTarget(args: { id?: unknown, correlation_id?: unknown }): { column: "id" | "correlation_id", value: unknown } {
        return (args.id != undefined)
            ? { column: "id", value: args.id }
            : { column: "correlation_id", value: args.correlation_id };
    }

    /**
     * Shared transactional flow for every status change:
     *  1. read the current order,
     *  2. run the caller's UPDATE (`$1` is always the order id, `updateValues` follow),
     *  3. append an `order_history` row with the previous and new status,
     *  4. re-read the order and return it.
     * The re-read happens before COMMIT so an error is never returned for a change
     * that was actually persisted.
     */
    private applyStatusChange(
        target: { column: "id" | "correlation_id", value: unknown },
        change: { updateSql: string, updateValues: unknown[], newStatus: unknown, reason: unknown },
    ): Promise<Result<string, OrderTracking<any>>> {
        // `target.column` is one of two literals, never caller-provided text.
        const selectSql = `
            SELECT * FROM order_tracking
            WHERE ${target.column} = $1
        `;
        const insertHistorySql = `
            INSERT INTO order_history (
                order_id,
                previous_status,
                new_status,
                change_reason
            )
            VALUES ($1, $2, $3::order_status, $4)
            RETURNING id;
        `;

        return this.withTransaction<OrderTracking<any>>(async (client) => {
            // 1. Current order. A query failure is reported before the not-found case
            //    so the real database error is not masked.
            const current = await this.getFirst(
                client.query<OrderTracking<any>>(selectSql, [target.value])
            );
            if (current.error != undefined) {
                return current;
            }
            const order = current.data;
            if (order?.id == undefined) {
                return {
                    error: OrderRepository.ORDER_NOT_FOUND + target.column + ": " + String(target.value)
                };
            }

            // 2. Update by primary key (faster than filtering by correlation_id)
            const updated = await this.getFirst(
                client.query<{ id: number, status: string, update_date: string }>(
                    change.updateSql,
                    [order.id, ...change.updateValues],
                )
            );
            if (updated.error != undefined) {
                return { error: updated.error };
            }

            // 3. History entry
            const history = await this.getFirst(
                client.query<{ id: number }>(
                    insertHistorySql,
                    [order.id, order.status, change.newStatus, change.reason],
                )
            );
            if (history.error != undefined) {
                return { error: history.error };
            }

            // 4. Updated order, as seen by this transaction
            return this.getFirst(
                client.query<OrderTracking<any>>(selectSql, [target.value])
            );
        });
    }

    /**
     * Lists orders matching the given conditions, limit and offset.
     * An invalid query (unsafe column name, bad limit/offset) is reported
     * through `error` without reaching the database.
     */
    public async getOrdersByQuery(args: OrderQuery): Promise<Result<string, OrderTracking<Record<string, any>>[]>> {
        try {
            const { sql, values } = this.generateTableQuery('order_tracking', args);
            return await this.getAll(
                this.pgClient.query<OrderTracking<Record<string, any>>>(sql, values)
            );
        } catch (e) {
            return this.fail<OrderTracking<Record<string, any>>[]>(e);
        }
    }

    /**
     * Looks an order up by its primary key.
     * The type `T` represents the content of the order's message.
     */
    public async getOrderById<T>(data: { id: number }): Promise<Result<string, OrderTracking<T>>> {
        const query = `
            SELECT * FROM order_tracking
            WHERE id = $1
        `;
        return this.getFirst(
            this.pgClient.query<OrderTracking<T>>(query, [data.id])
        );
    }

    /**
     * Looks an order up by its RabbitMQ correlation id.
     * @param pool optional client to run the query on (e.g. the caller's open
     *             transaction); the repository's own client is used otherwise
     */
    public async getOrderByQueueId<T>(data: { correlation_id: string }, pool?: PoolClient): Promise<Result<string, OrderTracking<T>>> {
        const query = `
            SELECT * FROM order_tracking
            WHERE correlation_id = $1
        `;
        const values = [data.correlation_id];
        const queryPromise = pool
            ? pool.query<OrderTracking<T>>(query, values)
            : this.pgClient.query<OrderTracking<T>>(query, values);
        return this.getFirst(queryPromise);
    }

    /**
     * Lists the orders that have not finished yet (`finish_date IS NULL`),
     * oldest `start_date` first.
     * @param options optional `limit`, `offset` and `start` (lowest id to include)
     * @returns the matching orders, or the database error
     */
    public async getPendingOrders<T>(options?: {
        limit?: number,
        offset?: number,
        start?: number // lowest id to include
    }) {
        // Every option is sent as a bind parameter rather than interpolated.
        const values: number[] = [];
        let query = "SELECT * FROM order_tracking WHERE finish_date IS NULL";
        if (options?.start != undefined) {
            values.push(options.start);
            query += ` AND id >= $${values.length}`;
        }
        query += " ORDER BY start_date ASC";
        if (options?.offset != undefined) {
            values.push(options.offset);
            query += ` OFFSET $${values.length}`;
        }
        if (options?.limit != undefined) {
            values.push(options.limit);
            query += ` LIMIT $${values.length}`;
        }
        return this.getAll(
            this.pgClient.query<OrderTracking<T>>(query, values)
        );
    }

    /**
     * Inserts a new order in `pending` status and returns the stored row.
     */
    public async createOrder<T extends any>(data: CreateOrderDTO): Promise<Result<string, OrderTracking<T>>> {
        const query = `
            INSERT INTO order_tracking (
                correlation_id,
                exchange,
                routing_key,
                order_type,
                payload,
                status,
                webhook_host,
                webhook_endpoint
            )
            VALUES ($1, $2, $3, $4, $5, 'pending', $6, $7)
            RETURNING
                id,
                correlation_id,
                exchange,
                routing_key,
                order_type,
                payload,
                status,
                webhook_host,
                webhook_endpoint
        `;
        const values = [
            data.correlation_id,
            data.exchange,
            data.routing_key,
            data.order_type, // e.g. 'activate'
            data.payload,    // JSON object
            data.webhook_host,
            data.webhook_endpoint,
        ];
        // TODO: check whether start_date is converted to Date by default; add a status enum
        return this.withTransaction<OrderTracking<T>>((client) =>
            this.getFirst(client.query<OrderTracking<T>>(query, values))
        );
    }

    /**
     * Regular ("successful") status update of an order, recorded in `order_history`.
     * Exactly one of `id` / `correlation_id` must be provided.
     */
    public async updateOrder(args: UpdateOrderDTO): Promise<Result<string, OrderTracking<any>>> {
        // XOR: id or correlation_id, never both and never neither
        assert((args.id != undefined) != (args.correlation_id != undefined));
        const updateSql = `
            UPDATE order_tracking
            SET
                status = $2::order_status,
                update_date = (now() at time zone 'utc')
            WHERE id = $1
            RETURNING id, status, update_date;
        `;
        return this.applyStatusChange(this.resolveTarget(args), {
            updateSql,
            updateValues: [args.new_status],
            newStatus: args.new_status,
            reason: args.reason, // e.g. "Consumer processed successfully" or "RabbitMQ NACK"
        });
    }

    /**
     * Marks an order as `finished`, stamping `finish_date`, and records it in
     * `order_history`. Exactly one of `id` / `correlation_id` must be provided.
     */
    public async finishOrder(args: FinishOrderDTO): Promise<Result<string, OrderTracking<any>>> {
        // Validated before acquiring a connection so a bad call cannot leak one.
        assert((args.id != undefined) != (args.correlation_id != undefined));
        // NOTE(review): this uses now() while updateOrder/errorOrder use
        // (now() at time zone 'utc'); confirm which one matches the column type.
        const updateSql = `
            UPDATE order_tracking
            SET
                status = 'finished',
                update_date = now(),
                finish_date = now()
            WHERE id = $1
            RETURNING id, status, update_date;
        `;
        return this.applyStatusChange(this.resolveTarget(args), {
            updateSql,
            updateValues: [],
            newStatus: "finished",
            // Always "finished successfully" unless another reason is given
            reason: args.reason ?? "finished successfully",
        });
    }

    /**
     * Records a failed processing attempt: stores the error, increments
     * `retry_count` and appends an `order_history` row.
     *
     * A `dlx` status is treated as final (the order will not be retried), so
     * `finish_date` is set; any other status (e.g. `failed`, moved to the delay
     * queue to be retried later) clears it.
     */
    public async errorOrder(args: ErrorOrderDTO): Promise<Result<string, OrderTracking<any>>> {
        const updateSql = `
            UPDATE order_tracking
            SET
                status = $2::order_status,
                update_date = (now() at time zone 'utc'),
                finish_date = CASE WHEN $2::order_status = 'dlx' THEN (now() at time zone 'utc') ELSE null END,
                retry_count = retry_count + 1,
                error_message = $3,
                error_stacktrace = $4
            WHERE id = $1
            RETURNING id, status, update_date;
        `;
        return this.applyStatusChange(this.resolveTarget(args), {
            updateSql,
            updateValues: [args.status, args.error, args.stackTrace],
            newStatus: args.status, // 'dlx' or 'failed' in this case
            reason: args.reason,    // the full failure reason
        });
    }
}