Orders con endpoints para monitorizacion

This commit is contained in:
2026-02-25 12:20:52 +01:00
parent c416114c50
commit 02c80cd503
16 changed files with 373 additions and 63 deletions

View File

@@ -7,8 +7,8 @@ export type DomainEventType = string
export type DomainEvent = {
key: string,
payload: Object,
headers?: Object & {
payload: object,
headers?: object & {
message_id?: string
},
occurredOn?: Date,

View File

@@ -15,6 +15,16 @@ export type OrderType =
| 'reactivate'
| 'unknown';
export const OrderTypeOptions = new Set<OrderType>([
'activate',
'preactivate',
'cancel',
'pause',
'reactivate',
'unknown'
])
// Interfaz para la tabla order_tracking
export interface OrderTracking<T> {
id: number;
@@ -31,6 +41,9 @@ export interface OrderTracking<T> {
start_date: string | Date;
update_date: string | Date;
finish_date?: string | Date | null;
// desde la 1.1.0
webhook_host?: string | null;
webhook_endpoint?: string | null;
}
// Interfaz para la tabla order_history
@@ -46,5 +59,5 @@ export interface OrderHistory {
// Tipo útil para la creación (Omitiendo campos generados por la DB)
export type CreateOrderDTO = Pick<
OrderTracking<any>, // Aqui realmente no importan los campos
'correlation_id' | 'exchange' | 'routing_key' | 'order_type' | 'payload'
'correlation_id' | 'exchange' | 'routing_key' | 'order_type' | 'payload' | 'webhook_host' | 'webhook_endpoint'
>;

View File

@@ -63,7 +63,7 @@ describe("Test OrderRepository", {}, (ctx) => {
})
it("Find by correlation id should return a valid order", async () => {
const result = await orderRepo.getOrderByQueueId({ correlation_id: order1.correlation_id })
const result = await orderRepo.getOrderByQueueId({ message_id: order1.correlation_id })
assert(result.error == undefined)
assert(result.data != undefined)

View File

@@ -72,31 +72,41 @@ export class OrderRepository {
/**
* Busqueda según la id de RabbitMq
*/
public async getOrderByQueueId<T>(data: { correlation_id: string }, pool?: PoolClient) {
public async getOrderByQueueId<T>(data: { message_id: string }, pool?: PoolClient) {
const query = `
SELECT * FROM order_tracking
WHERE correlation_id = $1
`
const values = [data.correlation_id]
const values = [data.message_id]
const queryPromise = this.pgClient.query<OrderTracking<T>>(query, values)
const result = await this.getFirst(queryPromise);
return result
}
/**
*/
* Operaciones que no han concluido con filtros de limit, offset y start
* @param options ()
* @returns
*/
public async getPendingOrders<T>(options?: {
limit?: number
limit?: number,
offset?: number,
start?: number // id de inicio
}) {
const client = await this.pgClient.connect();
const offsetFragment = (options?.offset != undefined) ? `OFFSET ${options?.offset}` : ""
const limitFragment = (options?.limit != undefined) ? `LIMIT ${options?.limit}` : ""
const startFragment = (options?.start != undefined) ? `AND id >= ${options?.start}` : ""
const query = `
SELECT * FROM order_tracking
WHERE finish_date IS NULL
WHERE finish_date IS NULL
${startFragment}
ORDER BY start_date ASC
${offsetFragment}
${limitFragment}
`
if (options?.limit != undefined) {
}
const values: string[] = []
const queryPromise = client.query<OrderTracking<T>>(query, values)
const result = await this.getAll(queryPromise)
@@ -104,7 +114,7 @@ export class OrderRepository {
return result
}
public async createOrder(data: CreateOrderDTO) {
public async createOrder<T extends any>(data: CreateOrderDTO): Promise<Result<string, OrderTracking<T>>> {
const client = await this.pgClient.connect();
await client.query("BEGIN")
const query = `
@@ -114,7 +124,9 @@ export class OrderRepository {
routing_key,
order_type,
payload,
status
status,
webhook_host,
webhook_endpoint
)
VALUES (
$1, -- correlation_id
@@ -122,12 +134,23 @@ export class OrderRepository {
$3, -- routing_key
$4, -- order_type (ej: 'activate')
$5, -- payload (json object)
'pending'
'pending',
$6, -- webhook_host,
$7 -- webhook_endpoint
)
RETURNING id, correlation_id, status, start_date;
RETURNING
id,
correlation_id,
exchange,
routing_key,
order_type,
payload,
status,
webhook_host,
webhook_endpoint
`
const values = [data.correlation_id, data.exchange, data.routing_key, data.order_type, data.payload]
const queryPromise = client.query<{ id: number, correlation_id: string, status: string, start_date: string }>(query, values)
const values = [data.correlation_id, data.exchange, data.routing_key, data.order_type, data.payload, data.webhook_host, data.webhook_endpoint]
const queryPromise = client.query<OrderTracking<T>>(query, values)
// TODO comprobar si start_date convierte a Date por defecto, añadir enum de status
const result = await this.getFirst(queryPromise)

View File

@@ -107,20 +107,24 @@ export class RabbitMQEventBus implements EventBus {
const exchange = "sim.exchange"
const routingKey = event.key
const content = Buffer.from(JSON.stringify(event))
await this.channel?.publish(exchange, routingKey, content, {
const isPublished = await this.channel?.publish(exchange, routingKey, content, {
headers: {
...event.headers
}
}, (err, ok) => {
if (err == undefined) {
console.log("Evento publicado ", event)
successEvents.push(event)
} else {
console.error("Error publicando", event)
errorEvents.push(event)
}
})
// Hay que revisarlo pero en principio la libreria se encarga que el mensaje se publique
// si o si
successEvents.push(event)
}
return res({
success: successEvents,
error: errorEvents
@@ -168,7 +172,8 @@ export class RabbitMQEventBus implements EventBus {
if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion")
const channel = this.connection.createChannel({
setup: async (channel: Channel) => {
confirm: true,
setup: async (channel: ConfirmChannel) => {
// Exchanges comunes a todos
channel.assertExchange("sim.exchange", "topic", { durable: true })
channel.assertExchange("sim.dlx", "topic", { durable: true })
@@ -202,6 +207,6 @@ export class RabbitMQEventBus implements EventBus {
Promise.reject(error);
});
return channel as ChannelWrapper;
return channel;
}
}