Files
sf-sim/packages/sim-consumidor-objenious/config/eventBus.config.ts

73 lines
2.0 KiB
TypeScript
Raw Normal View History

import { RabbitMQEventBus, RMQConnectionParams } from "sim-shared/infrastructure/RabbitMQEventBus.js"
import { Channel } from "amqp-connection-manager"
import { env } from "./env/index.js"
2026-01-16 11:14:35 +01:00
// Pull the RabbitMQ settings out of the environment module in one place.
const {
  RABBITMQ_USER: rmqUser,
  RABBITMQ_PASSWORD: rmqPass,
  RABBITMQ_HOST: rmqHost,
  RABBITMQ_PORT: rmqPortRaw,
  RABBITMQ_VHOST: rmqVhost,
} = env

// Number() yields NaN when the value is missing or not numeric; no validation is done here.
const rmqPort = Number(rmqPortRaw)
// The secure flag is hard-coded to false in this configuration.
const rmqSecure = false

/** Connection parameters passed to the RabbitMQ event bus. */
export const rmqConnOptions = {
  username: rmqUser,
  password: rmqPass,
  vhost: rmqVhost,
  hostname: rmqHost,
  port: rmqPort,
  secure: rmqSecure,
} as RMQConnectionParams
2026-04-22 12:31:46 +02:00
/**
 * Names of the RabbitMQ queues declared by `buildQueues`.
 *
 * - `OBJ`: bound to the main exchange.
 * - `OBJDLX`: bound to the dead-letter exchange.
 * - `OBJDEL`: bound to the delayed exchange (declared with a TTL).
 *
 * `as const` keeps the values as readonly string literals so they cannot be
 * reassigned by accident from another module.
 */
export const QUEUES = {
  OBJ: "sim.objenious",
  OBJDLX: "sim.objenious.dlx",
  OBJDEL: "sim.objenious.delayed",
} as const

/**
 * Names of the RabbitMQ exchanges declared by `buildQueues` (all of type "topic").
 *
 * - `MAIN`: main exchange.
 * - `DLX`: dead-letter exchange.
 * - `DEL`: delayed exchange.
 */
export const EXCHANGES = {
  MAIN: "sim.exchange",
  DLX: "sim.ex.objenious.dlx",
  DEL: "sim.ex.objenious.delayed",
} as const
2026-01-16 11:14:35 +01:00
/**
 * Module-level event bus instance for this consumer.
 *
 * It is constructed with the connection parameters above, `buildQueues` as the
 * structure builder, a `maxRetry` of 5, and the delayed / dead-letter exchange
 * names. How the bus uses `maxRetry` and the two exchanges is defined in
 * `RabbitMQEventBus`, not here.
 */
export const rabbitmqEventBus = new RabbitMQEventBus({
  connectionParams: rmqConnOptions,
  // `buildQueues` is a hoisted function declaration, so referencing it here is safe.
  buildStructure: buildQueues,
  maxRetry: 5,
  delayedExchange: EXCHANGES.DEL,
  dlxExchange: EXCHANGES.DLX,
})
2026-01-30 10:42:48 +01:00
/**
 * Declares the exchanges, queues and bindings used by this consumer.
 *
 * Calls are issued sequentially in a fixed order: exchanges first, then
 * queues, then bindings.
 *
 * @param channel AMQP channel on which the topology is asserted.
 */
async function buildQueues(channel: Channel) {
  // Time-to-live, in milliseconds, for messages sitting in the delayed queue.
  const delayMs = 10 * 1000
  // Routing pattern shared by every binding declared below.
  const objeniousPattern = "sim.objenious.#"

  // All three exchanges are topic exchanges.
  for (const exchange of [EXCHANGES.DEL, EXCHANGES.DLX, EXCHANGES.MAIN]) {
    await channel.assertExchange(exchange, "topic")
  }

  await channel.assertQueue(QUEUES.OBJ, { durable: true })
  await channel.assertQueue(QUEUES.OBJDLX, { durable: true })
  // Delayed queue: once the TTL elapses, RabbitMQ dead-letters the message to
  // the main exchange.
  await channel.assertQueue(QUEUES.OBJDEL, {
    durable: true,
    arguments: {
      "x-message-ttl": delayMs,
      "x-dead-letter-exchange": EXCHANGES.MAIN,
    },
  })

  const bindings = [
    // Dead-letter queue
    { queue: QUEUES.OBJDLX, exchange: EXCHANGES.DLX },
    // Delay queue
    { queue: QUEUES.OBJDEL, exchange: EXCHANGES.DEL },
    // Objenious queue -> main exchange
    { queue: QUEUES.OBJ, exchange: EXCHANGES.MAIN },
  ]
  for (const { queue, exchange } of bindings) {
    await channel.bindQueue(queue, exchange, objeniousPattern)
  }
}
2026-01-30 10:42:48 +01:00
/**
 * Connects the module-level RabbitMQ event bus and hands it back.
 *
 * @returns The `rabbitmqEventBus` instance, once `connect()` has resolved.
 */
export async function startRMQClient() {
  const bus = rabbitmqEventBus
  await bus.connect()
  return bus
}