448 lines
13 KiB
TypeScript
448 lines
13 KiB
TypeScript
/**
|
|
* TODO: Usar
|
|
*/
|
|
import { PoolClient, QueryResult, QueryResultRow } from "pg";
|
|
import { CreateOrderDTO, ErrorOrderDTO, FinishOrderDTO, OrderTracking, UpdateOrderDTO } from "../domain/Order.js";
|
|
import { Result, tryCatch } from "../domain/Result.js";
|
|
import { PgClient } from "./PgClient.js";
|
|
import assert from "node:assert";
|
|
|
|
/**
 * Groups every *Order* operation.
 * An *Order* tracks an operation that has successfully entered any message queue,
 * regardless of the target country/company of the card.
 *
 * Query failures are reported through a `Result<Error, Data>` value: successful
 * operations leave `error` undefined, failed ones leave `data` undefined.
 * Failures to acquire a connection, or to run BEGIN / COMMIT / ROLLBACK, still
 * reject the returned promise; the connection is released in every case.
 */
export class OrderRepository {
    constructor(
        private readonly pgClient: PgClient,
    ) {
    }

    /**
     * Turns whatever a rejected query produced into the string carried by `Result.error`.
     * Falls back to `String(e)` when an `Error` has an empty message so the result is
     * never an empty (falsy) string.
     */
    private static describeError(e: unknown): string {
        if (typeof e === "string") {
            return e;
        }
        if (e instanceof Error) {
            return e.message || String(e);
        }
        return String(e);
    }

    /**
     * Awaits the query and returns its first row.
     * Never rejects: a failed query is returned as `{ error }`.
     * `data` is undefined when the query matched no rows.
     */
    private async getFirst<T extends QueryResultRow>(queryPromise: Promise<QueryResult<T>>): Promise<Result<string, T>> {
        try {
            const { rows } = await queryPromise;
            return { data: rows[0] } as Result<string, T>;
        } catch (e) {
            return { error: OrderRepository.describeError(e) } as Result<string, T>;
        }
    }

    /**
     * Awaits the query and returns every row as a list of T.
     * Never rejects: a failed query is returned as `{ error }`.
     */
    private async getAll<T extends QueryResultRow>(queryPromise: Promise<QueryResult<T>>): Promise<Result<string, T[]>> {
        try {
            const { rows } = await queryPromise;
            return { data: rows } as Result<string, T[]>;
        } catch (e) {
            return { error: OrderRepository.describeError(e) } as Result<string, T[]>;
        }
    }

    /**
     * Shared flow of updateOrder / finishOrder / errorOrder, run inside one transaction:
     *   1. read the current order (by `id` when given, otherwise by `correlation_id`),
     *   2. run the `update` statement,
     *   3. run the `history` statement,
     *   4. COMMIT and read the order again to return its final state.
     * Any failed step rolls the transaction back and is returned as `{ error }`.
     *
     * @param key     identifies the order; `id` wins when both fields are set
     * @param update  UPDATE statement and the values built from the current order
     * @param history INSERT statement for order_history and the values built from the current order
     */
    private async applyStatusChange(
        key: { id?: unknown, correlation_id?: unknown },
        update: { text: string, values: (current: OrderTracking<any>) => unknown[] },
        history: { text: string, values: (current: OrderTracking<any>) => unknown[] },
    ): Promise<Result<string, OrderTracking<any>>> {
        // Decide from the value, not from key presence: `{ id: undefined, correlation_id }`
        // must be looked up by correlation_id.
        const byId = key.id != undefined;
        const idType = byId ? "id" : "correlation_id";
        const idValue = byId ? key.id : key.correlation_id;

        // idType is one of two fixed column names, so interpolating it is safe.
        const selectOrder = `
            SELECT * FROM order_tracking
            WHERE ${idType} = $1
        `;

        const client = await this.pgClient.connect();
        try {
            const abort = async (error: string): Promise<Result<string, OrderTracking<any>>> => {
                await client.query("ROLLBACK");
                return { error };
            };

            await client.query("BEGIN");

            // 1. Read the current order. The query error is checked first so that a
            //    database failure is not reported as "order does not exist".
            const found = await this.getFirst(client.query<OrderTracking<any>>(selectOrder, [idValue]));
            if (found.error != undefined) {
                return await abort(found.error);
            }
            const current = found.data;
            if (current == undefined || current.id == undefined) {
                return await abort("El order a actualizar no existe " + idType + ": " + String(idValue));
            }

            // 2. Update the order itself.
            const updated = await this.getFirst(
                client.query<{ id: number, status: string, update_date: string }>(update.text, update.values(current))
            );
            if (updated.error != undefined) {
                return await abort(updated.error);
            }

            // 3. Record the transition in order_history.
            const logged = await this.getFirst(
                client.query<{ id: number }>(history.text, history.values(current))
            );
            if (logged.error != undefined) {
                return await abort(logged.error);
            }

            await client.query("COMMIT");

            // 4. Return the order as stored after the commit.
            return await this.getFirst(client.query<OrderTracking<any>>(selectOrder, [idValue]));
        } finally {
            client.release();
        }
    }

    /**
     * Looks an order up by its primary key.
     * The type <T> represents the content of the order's message.
     */
    public async getOrderById<T>(data: { id: number }): Promise<Result<string, OrderTracking<T>>> {
        const query = `
            SELECT * FROM order_tracking
            WHERE id = $1
        `;
        const values = [data.id];
        return this.getFirst(this.pgClient.query<OrderTracking<T>>(query, values));
    }

    /**
     * Looks an order up by its RabbitMq id (`correlation_id`).
     *
     * @param data identifies the order
     * @param pool optional client to run the query on (e.g. one that holds an open
     *             transaction); when omitted the repository's own client is used
     */
    public async getOrderByQueueId<T>(data: { correlation_id: string }, pool?: PoolClient): Promise<Result<string, OrderTracking<T>>> {
        const query = `
            SELECT * FROM order_tracking
            WHERE correlation_id = $1
        `;
        const values = [data.correlation_id];
        const queryPromise = pool
            ? pool.query<OrderTracking<T>>(query, values)
            : this.pgClient.query<OrderTracking<T>>(query, values);
        return this.getFirst(queryPromise);
    }

    /**
     * Orders that have not finished yet (`finish_date IS NULL`), oldest `start_date` first.
     *
     * @param options optional filters: `limit` and `offset` page the result,
     *                `start` is the lowest order id to include
     * @returns the matching orders, or the query error
     */
    public async getPendingOrders<T>(options?: {
        limit?: number,
        offset?: number,
        start?: number // first id to include
    }): Promise<Result<string, OrderTracking<T>[]>> {
        // Filters are sent as bound parameters instead of being spliced into the SQL text.
        const values: number[] = [];
        // Array.push returns the new length, which is exactly the placeholder index.
        const bind = (value: number): string => `$${values.push(value)}`;

        const startFragment = (options?.start != undefined) ? `AND id >= ${bind(options.start)}` : "";
        const offsetFragment = (options?.offset != undefined) ? `OFFSET ${bind(options.offset)}` : "";
        const limitFragment = (options?.limit != undefined) ? `LIMIT ${bind(options.limit)}` : "";

        const query = `
            SELECT * FROM order_tracking
            WHERE finish_date IS NULL
            ${startFragment}
            ORDER BY start_date ASC
            ${offsetFragment}
            ${limitFragment}
        `;

        const client = await this.pgClient.connect();
        try {
            return await this.getAll(client.query<OrderTracking<T>>(query, values));
        } finally {
            client.release();
        }
    }

    /**
     * Inserts a new order with status 'pending' and returns the stored row.
     * The insert runs in its own transaction: committed on success, rolled back on error.
     */
    public async createOrder<T>(data: CreateOrderDTO): Promise<Result<string, OrderTracking<T>>> {
        const query = `
            INSERT INTO order_tracking (
                correlation_id,
                exchange,
                routing_key,
                order_type,
                payload,
                status,
                webhook_host,
                webhook_endpoint
            )
            VALUES (
                $1, -- correlation_id
                $2, -- exchange
                $3, -- routing_key
                $4, -- order_type (ej: 'activate')
                $5, -- payload (json object)
                'pending',
                $6, -- webhook_host,
                $7 -- webhook_endpoint
            )
            RETURNING
                id,
                correlation_id,
                exchange,
                routing_key,
                order_type,
                payload,
                status,
                webhook_host,
                webhook_endpoint
        `;
        const values = [data.correlation_id, data.exchange, data.routing_key, data.order_type, data.payload, data.webhook_host, data.webhook_endpoint];

        const client = await this.pgClient.connect();
        try {
            await client.query("BEGIN");
            // TODO check whether start_date is converted to Date by default, add a status enum
            const result = await this.getFirst(client.query<OrderTracking<T>>(query, values));
            await client.query(result.error == undefined ? "COMMIT" : "ROLLBACK");
            return result;
        } finally {
            // Runs even when BEGIN / COMMIT / ROLLBACK reject, so the connection never leaks.
            client.release();
        }
    }

    /**
     * Regular ("successful") status update of an order, recorded in order_history.
     * Exactly one of `args.id` / `args.correlation_id` must be set.
     *
     * @throws AssertionError when both or neither identifier is provided
     */
    public async updateOrder(args: UpdateOrderDTO): Promise<Result<string, OrderTracking<any>>> {
        // XOR id or correlation_id
        assert((args.id != undefined) !== (args.correlation_id != undefined));

        const uOrderTracking = `
            UPDATE order_tracking
            SET
                status = $2::order_status,
                update_date = (now() at time zone 'utc')
            WHERE id = $1
            RETURNING id, status, update_date;
        `;
        const iOrderHistory = `
            INSERT INTO order_history (
                order_id,
                previous_status,
                new_status,
                change_reason
            )
            VALUES (
                $1, -- ID de la orden
                $2, -- Estado anterior
                $3::order_status, -- Nuevo estado
                $4 -- Razón (ej: "Consumer processed successfully" o "RabbitMQ NACK")
            )
            RETURNING id;
        `;

        return this.applyStatusChange(
            args,
            { text: uOrderTracking, values: (current) => [current.id, args.new_status] },
            { text: iOrderHistory, values: (current) => [current.id, current.status, args.new_status, args.reason] },
        );
    }

    /**
     * Marks an order as 'finished', setting its finish date, and records the
     * transition in order_history (reason defaults to "finished successfully").
     * Exactly one of `args.id` / `args.correlation_id` must be set.
     *
     * @throws AssertionError when both or neither identifier is provided
     */
    public async finishOrder(args: FinishOrderDTO): Promise<Result<string, OrderTracking<any>>> {
        // Validate before a connection is acquired so a bad call cannot leak one.
        assert((args.id != undefined) !== (args.correlation_id != undefined));

        // NOTE(review): this statement stamps dates with now() while updateOrder and
        // errorOrder use (now() at time zone 'utc') — confirm which matches the column types.
        const uOrderTracking = `
            UPDATE order_tracking
            SET
                status = 'finished',
                update_date = now(),
                finish_date = now()
            WHERE id = $1
            RETURNING id, status, update_date;
        `;
        const iOrderHistory = `
            INSERT INTO order_history (
                order_id,
                previous_status,
                new_status,
                change_reason
            )
            VALUES (
                $1, -- ID de la orden
                $2, -- Estado anterior
                'finished',
                $3 -- Siempre "finished successfully" a no ser que se especifique otra razón
            )
            RETURNING id;
        `;

        return this.applyStatusChange(
            args,
            { text: uOrderTracking, values: (current) => [current.id] },
            { text: iOrderHistory, values: (current) => [current.id, current.status, args.reason ?? "finished successfully"] },
        );
    }

    /**
     * Records a failed processing attempt: stores the error message and stack trace,
     * increments retry_count and adds an order_history entry.
     * The order is located by `args.id` when given, otherwise by `args.correlation_id`;
     * an unknown order is reported as `{ error }`.
     */
    public async errorOrder(args: ErrorOrderDTO): Promise<Result<string, OrderTracking<any>>> {
        // If the status is 'dlx' the order is considered finished and will not be retried.
        // If it is 'failed' it is assumed to be in the delay queue and will be retried later.
        const uOrderTracking = `
            UPDATE order_tracking
            SET
                status = $2::order_status,
                update_date = (now() at time zone 'utc'),
                finish_date = CASE WHEN $2::order_status = 'dlx' THEN (now() at time zone 'utc') ELSE null END,
                retry_count = retry_count + 1,
                error_message = $3,
                error_stacktrace = $4
            WHERE id = $1
            RETURNING id, status, update_date;
        `;
        const iOrderHistory = `
            INSERT INTO order_history (
                order_id,
                previous_status,
                new_status,
                change_reason
            )
            VALUES (
                $1, -- ID de la orden
                $2, -- Estado anterior
                $3::order_status, -- En este caso particular 'dlx' o 'failed'
                $4 -- En este caso el motivo de fallo completo
            )
            RETURNING id;
        `;

        return this.applyStatusChange(
            args,
            // Both statements use the id read from the database, so they also work when
            // the order was located by correlation_id (args.id is undefined then).
            { text: uOrderTracking, values: (current) => [current.id, args.status, args.error, args.stackTrace] },
            { text: iOrderHistory, values: (current) => [current.id, current.status, args.status, args.reason] },
        );
    }
}
|