Files
sf-sim/packages/sim-shared/infrastructure/OrderRepository.ts

388 lines
11 KiB
TypeScript
Raw Normal View History

/**
* TODO: Usar
*/
import { PoolClient, QueryResult, QueryResultRow } from "pg";
import { CreateOrderDTO, OrderTracking } from "../domain/Order.js";
import { Result } from "../domain/Result.js";
import { PgClient } from "./PgClient.js";
export class OrderRepository {
constructor(
private readonly pgClient: PgClient
) {
}
/**
* Comprobacion de la query y devolucion del primer resulado
* Garantiza la gestion de errores
*/
private async getFirst<T extends QueryResultRow>(queryPromise: Promise<QueryResult<T>>) {
try {
const queryResult = await queryPromise
return <Result<string, T>>{
data: queryResult.rows[0]
}
} catch (e) {
return <Result<string, T>>{
error: e as string
}
}
}
/**
* Se asume que se va a devolver una lista del tipo T
*/
private async getAll<T extends QueryResultRow>(queryPromise: Promise<QueryResult<T>>) {
try {
const queryResult = await queryPromise
return <Result<string, T[]>>{
data: queryResult.rows
}
} catch (e) {
return <Result<string, T[]>>{
error: e as string
}
}
}
/**
* TODO: OrderTracking necestia un tipo para la estructura del mensaje almacenado
*/
public async getOrderById(data: { id: number }) {
const query = `
SELECT * FROM order_tracking
WHERE id = $1
`
const values = [data.id]
const queryPromise = this.pgClient.query<OrderTracking<any>>(query, values)
const result = await this.getFirst(queryPromise);
return result
}
/**
* Busqueda según la id de RabbitMq
*/
public async getOrderByQueueId(data: { correlation_id: string }, pool?: PoolClient) {
const query = `
SELECT * FROM order_tracking
WHERE correlation_id = $1
`
const values = [data.correlation_id]
const queryPromise = this.pgClient.query<OrderTracking<any>>(query, values)
const result = await this.getFirst(queryPromise);
return result
}
/**
* TODO:
* - variable para el limit
*/
public async getPendingOrders() {
const client = await this.pgClient.connect();
const query = `
SELECT * FROM order_tracking
WHERE finish_date IS NULL
ORDER BY start_date ASC
`
const values: string[] = []
const queryPromise = client.query(query, values)
const result = await this.getAll(queryPromise)
client.release()
return result
}
public async createOrder(data: CreateOrderDTO) {
const client = await this.pgClient.connect();
await client.query("BEGIN")
const query = `
INSERT INTO order_tracking (
correlation_id,
exchange,
routing_key,
order_type,
payload,
status
)
VALUES (
$1, -- correlation_id
$2, -- exchange
$3, -- routing_key
$4, -- order_type (ej: 'activate')
$5, -- payload (json object)
'pending'
)
RETURNING id, correlation_id, status, start_date;
`
const values = [data.correlation_id, data.exchange, data.routing_key, data.order_type, data.payload]
const queryPromise = client.query(query, values)
const result = await this.getFirst(queryPromise)
if (result.error == undefined) {
await client.query("COMMIT")
} else {
await client.query("ROLLBACK")
}
client.release()
return result
}
/**
* Actualizacion "correcta" del estado de un order
*/
public async updateOrder(args: {
id: number,
new_status: string,
reason: string
}) {
const client = await this.pgClient.connect();
await client.query('BEGIN');
// 1. Se consulta la order de base
const qCurrentOrder = `
SELECT * FROM order_tracking
WHERE id = $1
`
const vCurrentOrder = [args.id]
const currentOrderResult = await this.getFirst(client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder))
if (currentOrderResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return currentOrderResult
}
const currentOrder = currentOrderResult.data!
// 2. Si todo ok se actualiza el order
const uOrderTracking = `
UPDATE order_tracking
SET
status = $2::order_status,
update_date = (now() at time zone 'utc')
WHERE id = $1
RETURNING id, status, update_date;
`
const vOrderTracking = [args.id, args.new_status]
const updatedOrderResult = await this.getFirst(
client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking)
)
if (updatedOrderResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return updatedOrderResult
}
// 3. Si todo ok se añade una entradad de order_history con los datos modificados
const iOrderHistory = `
INSERT INTO order_history (
order_id,
previous_status,
new_status,
change_reason
)
VALUES (
$1, -- ID de la orden
$2, -- Estado anterior
$3::order_status, -- Nuevo estado
$4 -- Razón (ej: "Consumer processed successfully" o "RabbitMQ NACK")
)
RETURNING id;
`
const vOrderHistory = [args.id, currentOrder.status, args.new_status, args.reason]
const newOrderHistoryResult = await this.getFirst(
client.query<{ id: number }>(iOrderHistory, vOrderHistory)
)
if (newOrderHistoryResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return newOrderHistoryResult
}
await client.query("COMMIT")
const updatedOrder = await this.getFirst(
client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder)
)
client.release()
return updatedOrder
}
public async finishOrder(args: { id: number, reason?: string }) {
const client = await this.pgClient.connect();
await client.query('BEGIN');
// 1. Se consulta la order de base
const qCurrentOrder = `
SELECT * FROM order_tracking
WHERE id = $1
`
const vCurrentOrder = [args.id]
const currentOrderResult = await this.getFirst(client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder))
if (currentOrderResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return currentOrderResult
}
const currentOrder = currentOrderResult.data!
// 2. Si todo ok se actualiza el order
const uOrderTracking = `
UPDATE order_tracking
SET
status = 'finished',
update_date = (now() at time zone 'utc'),
finish_date = (now() at time zone 'utc')
WHERE id = $1
RETURNING id, status, update_date;
`
const vOrderTracking = [args.id]
const updatedOrderResult = await this.getFirst(
client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking)
)
if (updatedOrderResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return updatedOrderResult
}
// 3. Si todo ok se guardo un nuevo registro de history
const iOrderHistory = `
INSERT INTO order_history (
order_id,
previous_status,
new_status,
change_reason
)
VALUES (
$1, -- ID de la orden
$2, -- Estado anterior
'finished',
$3 -- Siempre "finished successfully" a no ser que se especifique otra razón
)
RETURNING id;
`
const vOrderHistory = [args.id, currentOrder.status, args.reason ?? "finished successfully"]
const newOrderHistoryResult = await this.getFirst(
client.query<{ id: number }>(iOrderHistory, vOrderHistory)
)
if (newOrderHistoryResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return newOrderHistoryResult
}
await client.query("COMMIT")
const updatedOrder = await this.getFirst(
client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder)
)
client.release()
return updatedOrder
}
public async errorOrder(args: {
id: number,
status: "failed" | "dlx",
reason: string,
error?: string,
stackTrace?: string
}) {
const client = await this.pgClient.connect();
await client.query('BEGIN');
// 1. Se consulta la order de base
const qCurrentOrder = `
SELECT * FROM order_tracking
WHERE id = $1
`
const vCurrentOrder = [args.id]
const currentOrderResult = await this.getFirst(client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder))
if (currentOrderResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return currentOrderResult
}
const currentOrder = currentOrderResult.data!
// 3. Si todo ok se actualiza el order
// Si el status es dlx se asume que ha terminado y no va a reintentarse
// Si es failed se asume que se ha movido a la cola de delay y en algún momento se va a reintentar
const uOrderTracking = `
UPDATE order_tracking
SET
status = $2::order_status,
update_date = (now() at time zone 'utc'),
finish_date = CASE WHEN $2::order_status = 'dlx' THEN (now() at time zone 'utc') ELSE null END,
retry_count = retry_count + 1,
error_message = $3,
error_stacktrace = $4
WHERE id = $1
RETURNING id, status, update_date;
`
const vOrderTracking = [args.id, args.status, args.error, args.stackTrace]
const updatedOrderResult = await this.getFirst(
client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking)
)
if (updatedOrderResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return updatedOrderResult
}
// 3. Si todo ok se guardo un nuevo registro de history
const iOrderHistory = `
INSERT INTO order_history (
order_id,
previous_status,
new_status,
change_reason
)
VALUES (
$1, -- ID de la orden
$2, -- Estado anterior
$3::order_status, -- En este caso particular 'dlx' o 'failed'
$4 -- En este caso el motivo de fallo completo
)
RETURNING id;
`
const vOrderHistory = [args.id, currentOrder.status, args.status, args.reason]
const newOrderHistoryResult = await this.getFirst(
client.query<{ id: number }>(iOrderHistory, vOrderHistory)
)
if (newOrderHistoryResult.error != undefined) {
await client.query("ROLLBACK")
client.release()
return newOrderHistoryResult
}
await client.query("COMMIT")
const updatedOrder = await this.getFirst(
client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder)
)
client.release()
return updatedOrder
}
}