base de datos de orders con repositorio y test
This commit is contained in:
381
packages/sim-shared/infrastructure/OrderRepository.ts
Normal file
381
packages/sim-shared/infrastructure/OrderRepository.ts
Normal file
@@ -0,0 +1,381 @@
|
||||
/**
|
||||
* TODO: Usar
|
||||
*/
|
||||
import { PoolClient, QueryResult, QueryResultRow } from "pg";
|
||||
import { CreateOrderDTO, OrderTracking } from "../domain/Order.js";
|
||||
import { Result } from "../domain/Result.js";
|
||||
import { PgClient } from "./PgClient.js";
|
||||
|
||||
export class OrderRepository {
|
||||
constructor(
|
||||
private readonly pgClient: PgClient
|
||||
) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Comprobacion de la query y devolucion del primer resulado
|
||||
* Garantiza la gestion de errores
|
||||
*/
|
||||
private async getFirst<T extends QueryResultRow>(queryPromise: Promise<QueryResult<T>>) {
|
||||
try {
|
||||
const queryResult = await queryPromise
|
||||
return <Result<string, T>>{
|
||||
data: queryResult.rows[0]
|
||||
}
|
||||
} catch (e) {
|
||||
return <Result<string, T>>{
|
||||
error: e as string
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Se asume que se va a devolver una lista del tipo T
|
||||
*/
|
||||
private async getAll<T extends QueryResultRow>(queryPromise: Promise<QueryResult<T>>) {
|
||||
|
||||
try {
|
||||
const queryResult = await queryPromise
|
||||
return <Result<string, T[]>>{
|
||||
data: queryResult.rows
|
||||
}
|
||||
} catch (e) {
|
||||
return <Result<string, T>>{
|
||||
error: e as string
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO: OrderTracking necestia un tipo para la estructura del mensaje almacenado
|
||||
*/
|
||||
public async getOrderById(data: { id: number }) {
|
||||
const query = `
|
||||
SELECT * FROM order_tracking
|
||||
WHERE id = $1
|
||||
`
|
||||
const values = [data.id]
|
||||
const queryPromise = this.pgClient.query<OrderTracking<any>>(query, values)
|
||||
const result = this.getFirst(queryPromise);
|
||||
return result
|
||||
}
|
||||
|
||||
/**
|
||||
* Busqueda según la id de RabbitMq
|
||||
*/
|
||||
public async getOrderByQueueId(data: { correlation_id: string }, pool?: PoolClient) {
|
||||
const query = `
|
||||
SELECT * FROM order_tracking
|
||||
WHERE correlation_id = $1
|
||||
`
|
||||
const values = [data.correlation_id]
|
||||
const queryPromise = this.pgClient.query<OrderTracking<any>>(query, values)
|
||||
const result = this.getFirst(queryPromise);
|
||||
return result
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO:
|
||||
* - variable para el limit
|
||||
*/
|
||||
public async getPendingOrders() {
|
||||
const client = await this.pgClient.connect();
|
||||
const query = `
|
||||
SELECT * FROM order_tracking
|
||||
WHERE finish_date IS NULL
|
||||
ORDER BY start_date ASC
|
||||
`
|
||||
const values: string[] = []
|
||||
const queryPromise = client.query(query, values)
|
||||
const result = await this.getAll(queryPromise)
|
||||
client.release()
|
||||
return result
|
||||
}
|
||||
|
||||
public async createOrder(data: CreateOrderDTO) {
|
||||
const client = await this.pgClient.connect();
|
||||
await client.query("BEGIN")
|
||||
const query = `
|
||||
INSERT INTO order_tracking (
|
||||
correlation_id,
|
||||
exchange,
|
||||
routing_key,
|
||||
order_type,
|
||||
payload,
|
||||
status
|
||||
)
|
||||
VALUES (
|
||||
$1, -- correlation_id
|
||||
$2, -- exchange
|
||||
$3, -- routing_key
|
||||
$4, -- order_type (ej: 'activate')
|
||||
$5, -- payload (json object)
|
||||
'pending'
|
||||
)
|
||||
RETURNING id, correlation_id, status, start_date;
|
||||
`
|
||||
const values = [data.correlation_id, data.exchange, data.routing_key, data.order_type, data.payload]
|
||||
const queryPromise = client.query(query, values)
|
||||
const result = await this.getFirst(queryPromise)
|
||||
|
||||
if (result.error == undefined) {
|
||||
await client.query("COMMIT")
|
||||
} else {
|
||||
await client.query("ROLLBACK")
|
||||
}
|
||||
|
||||
client.release()
|
||||
return result
|
||||
}
|
||||
|
||||
/**
|
||||
* Actualizacion "correcta" del estado de un order
|
||||
*/
|
||||
public async updateOrder(args: {
|
||||
id: number,
|
||||
new_status: string,
|
||||
reason: string
|
||||
}) {
|
||||
const client = await this.pgClient.connect();
|
||||
await client.query('BEGIN');
|
||||
|
||||
// 1. Se consulta la order de base
|
||||
const qCurrentOrder = `
|
||||
SELECT * FROM order_tracking
|
||||
WHERE id = $1
|
||||
`
|
||||
const vCurrentOrder = [args.id]
|
||||
|
||||
const currentOrderResult = await this.getFirst(client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder))
|
||||
|
||||
if (currentOrderResult.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return currentOrderResult
|
||||
}
|
||||
|
||||
const currentOrder = currentOrderResult.data!
|
||||
|
||||
// 2. Si todo ok se actualiza el order
|
||||
const uOrderTracking = `
|
||||
UPDATE order_tracking
|
||||
SET
|
||||
status = $2,
|
||||
update_date = (now() at time zone 'utc')
|
||||
WHERE id = $1
|
||||
RETURNING id, status, update_date;
|
||||
`
|
||||
const vOrderTracking = [args.id, args.new_status]
|
||||
const updatedOrderResult = await this.getFirst(
|
||||
client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking)
|
||||
)
|
||||
|
||||
if (updatedOrderResult.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return updatedOrderResult
|
||||
}
|
||||
|
||||
// 3. Si todo ok se añade una entradad de order_history con los datos modificados
|
||||
const iOrderHistory = `
|
||||
INSERT INTO order_history (
|
||||
order_id,
|
||||
previous_status,
|
||||
new_status,
|
||||
change_reason
|
||||
)
|
||||
VALUES (
|
||||
$1, -- ID de la orden
|
||||
$2, -- Estado anterior
|
||||
$3, -- Nuevo estado
|
||||
$4 -- Razón (ej: "Consumer processed successfully" o "RabbitMQ NACK")
|
||||
);
|
||||
`
|
||||
const vOrderHistory = [args.id, currentOrder.status, args.new_status, args.reason]
|
||||
const newOrderHistory = await this.getFirst(
|
||||
client.query<{ id: number, status: string, update_date: string }>(iOrderHistory, vOrderHistory)
|
||||
)
|
||||
|
||||
if (newOrderHistory.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return updatedOrderResult
|
||||
}
|
||||
|
||||
await client.query("COMMIT")
|
||||
|
||||
const updatedOrder = await this.getFirst(
|
||||
client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder)
|
||||
)
|
||||
|
||||
return updatedOrder
|
||||
}
|
||||
|
||||
|
||||
public async finishOrder(args: { id: number, reason?: string }) {
|
||||
const client = await this.pgClient.connect();
|
||||
await client.query('BEGIN');
|
||||
|
||||
// 1. Se consulta la order de base
|
||||
const qCurrentOrder = `
|
||||
SELECT * FROM order_tracking
|
||||
WHERE id = $1
|
||||
`
|
||||
const vCurrentOrder = [args.id]
|
||||
|
||||
const currentOrderResult = await this.getFirst(client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder))
|
||||
|
||||
if (currentOrderResult.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return currentOrderResult
|
||||
}
|
||||
|
||||
const currentOrder = currentOrderResult.data!
|
||||
|
||||
// 2. Si todo ok se actualiza el order
|
||||
const uOrderTracking = `
|
||||
UPDATE order_tracking
|
||||
SET
|
||||
status = 'finished',
|
||||
update_date = (now() at time zone 'utc'),
|
||||
finish_date = (now() at time zone 'utc')
|
||||
WHERE id = $1
|
||||
RETURNING id, status, update_date;
|
||||
`
|
||||
const vOrderTracking = [args.id]
|
||||
const updatedOrderResult = await this.getFirst(
|
||||
client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking)
|
||||
)
|
||||
|
||||
if (updatedOrderResult.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return updatedOrderResult
|
||||
}
|
||||
|
||||
// 3. Si todo ok se guardo un nuevo registro de history
|
||||
const iOrderHistory = `
|
||||
INSERT INTO order_history (
|
||||
order_id,
|
||||
previous_status,
|
||||
new_status,
|
||||
change_reason
|
||||
)
|
||||
VALUES (
|
||||
$1, -- ID de la orden
|
||||
$2, -- Estado anterior
|
||||
'finished',
|
||||
$3 -- Siempre "finished successfully" a no ser que se especifique otra razón
|
||||
);
|
||||
`
|
||||
const vOrderHistory = [args.id, currentOrder.status, args.reason ?? "finished successfully"]
|
||||
const newOrderHistory = await this.getFirst(
|
||||
client.query<{ id: number, status: string, update_date: string }>(iOrderHistory, vOrderHistory)
|
||||
)
|
||||
|
||||
if (newOrderHistory.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return updatedOrderResult
|
||||
}
|
||||
|
||||
await client.query("COMMIT")
|
||||
|
||||
const updatedOrder = await this.getFirst(
|
||||
client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder)
|
||||
)
|
||||
|
||||
return updatedOrder
|
||||
}
|
||||
|
||||
public async errorOrder(args: {
|
||||
id: number,
|
||||
status: "failed" | "dlx",
|
||||
reason: string,
|
||||
error?: string,
|
||||
stackTrace?: string
|
||||
}) {
|
||||
const client = await this.pgClient.connect();
|
||||
await client.query('BEGIN');
|
||||
|
||||
// 1. Se consulta la order de base
|
||||
const qCurrentOrder = `
|
||||
SELECT * FROM order_tracking
|
||||
WHERE id = $1
|
||||
`
|
||||
const vCurrentOrder = [args.id]
|
||||
|
||||
const currentOrderResult = await this.getFirst(client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder))
|
||||
|
||||
if (currentOrderResult.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return currentOrderResult
|
||||
}
|
||||
|
||||
const currentOrder = currentOrderResult.data!
|
||||
|
||||
// 3. Si todo ok se actualiza el order
|
||||
// Si el status es dlx se asume que ha terminado y no va a reintentarse
|
||||
// Si es failed se asume que se ha movido a la cola de delay y en algún momento se va a reintentar
|
||||
const uOrderTracking = `
|
||||
UPDATE order_tracking
|
||||
SET
|
||||
status = $2,
|
||||
update_date = (now() at time zone 'utc'),
|
||||
finish_date = CASE WHEN $2 = 'dlx' THEN (now() at time zone 'utc') ELSE null,
|
||||
retry_count = retry_count + 1,
|
||||
error_message = $3,
|
||||
error_stacktrace = $4
|
||||
WHERE id = $1
|
||||
RETURNING id, status, update_date;
|
||||
`
|
||||
const vOrderTracking = [args.id, args.status, args.error, args.stackTrace]
|
||||
const updatedOrderResult = await this.getFirst(
|
||||
client.query<{ id: number, status: string, update_date: string }>(uOrderTracking, vOrderTracking)
|
||||
)
|
||||
|
||||
if (updatedOrderResult.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return updatedOrderResult
|
||||
}
|
||||
|
||||
// 3. Si todo ok se guardo un nuevo registro de history
|
||||
const iOrderHistory = `
|
||||
INSERT INTO order_history (
|
||||
order_id,
|
||||
previous_status,
|
||||
new_status,
|
||||
change_reason
|
||||
)
|
||||
VALUES (
|
||||
$1, -- ID de la orden
|
||||
$2, -- Estado anterior
|
||||
$3, -- En este caso particular 'dlx' o 'failed'
|
||||
$4 -- En este caso el motivo de fallo completo
|
||||
);
|
||||
`
|
||||
const vOrderHistory = [args.id, currentOrder.status, args.reason ?? "finished successfully", args.stackTrace]
|
||||
const newOrderHistory = await this.getFirst(
|
||||
client.query<{ id: number, status: string, update_date: string }>(iOrderHistory, vOrderHistory)
|
||||
)
|
||||
|
||||
if (newOrderHistory.error != undefined) {
|
||||
await client.query("ROLLBACK")
|
||||
client.release()
|
||||
return updatedOrderResult
|
||||
}
|
||||
|
||||
await client.query("COMMIT")
|
||||
|
||||
const updatedOrder = await this.getFirst(
|
||||
client.query<OrderTracking<any>>(qCurrentOrder, vCurrentOrder)
|
||||
)
|
||||
|
||||
return updatedOrder
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user