73 lines
2.0 KiB
TypeScript
73 lines
2.0 KiB
TypeScript
import { RabbitMQEventBus, RMQConnectionParams } from "sim-shared/infrastructure/RabbitMQEventBus.js"
|
|
import { Channel } from "amqp-connection-manager"
|
|
import { env } from "./env/index.js"
|
|
|
|
// Broker connection settings, read from the `env` module in a fixed order.
const {
  RABBITMQ_USER: rmqUser,
  RABBITMQ_PASSWORD: rmqPass,
  RABBITMQ_HOST: rmqHost,
} = env
// The port is coerced with Number(); a missing or non-numeric value yields NaN.
const rmqPort = Number(env.RABBITMQ_PORT)
// Hard-coded rather than read from env.
// NOTE(review): presumably toggles TLS on the connection; confirm against RMQConnectionParams.
const rmqSecure = false
const { RABBITMQ_VHOST: rmqVhost } = env
|
|
|
|
/**
 * Connection parameters handed to the RabbitMQ event bus, assembled from the
 * module-level `rmq*` settings.
 */
export const rmqConnOptions = {
  username: rmqUser,
  password: rmqPass,
  vhost: rmqVhost,
  hostname: rmqHost,
  port: rmqPort,
  secure: rmqSecure,
} as RMQConnectionParams
|
|
|
|
/**
 * Names of the queues declared by `buildQueues`.
 *
 * - `OBJ`: queue bound to the main exchange.
 * - `OBJDLX`: queue bound to the dead-letter exchange.
 * - `OBJDEL`: queue bound to the delayed exchange.
 *
 * Declared `as const` so the names are readonly literal types and cannot be
 * reassigned by importers.
 */
export const QUEUES = {
  OBJ: "sim.objenious",
  OBJDLX: "sim.objenious.dlx",
  OBJDEL: "sim.objenious.delayed",
} as const
|
|
|
|
/**
 * Names of the exchanges declared by `buildQueues`.
 *
 * - `MAIN`: main exchange.
 * - `DLX`: dead-letter exchange.
 * - `DEL`: delayed exchange.
 *
 * Declared `as const` so the names are readonly literal types and cannot be
 * reassigned by importers.
 */
export const EXCHANGES = {
  MAIN: "sim.exchange",
  DLX: "sim.ex.objenious.dlx",
  DEL: "sim.ex.objenious.delayed",
} as const
|
|
/**
 * Module-level event bus instance, configured with this file's connection
 * options, topology builder and exchange names.
 *
 * `buildQueues` is a function declaration further down the file; it is
 * hoisted, so referencing it here is safe.
 */
export const rabbitmqEventBus = new RabbitMQEventBus({
  connectionParams: rmqConnOptions,
  buildStructure: buildQueues,
  // NOTE(review): presumably the number of redelivery attempts before dead-lettering; confirm in RabbitMQEventBus.
  maxRetry: 5,
  delayedExchange: EXCHANGES.DEL,
  dlxExchange: EXCHANGES.DLX,
})
|
|
|
|
/**
 * Declares the exchanges, queues and bindings for the Objenious flow on the
 * given channel. Passed to the event bus as its `buildStructure` callback.
 *
 * All three exchanges are of type "topic" and all three queues are durable.
 * The delayed queue is additionally declared with a 10 s message TTL and
 * `EXCHANGES.MAIN` as its dead-letter exchange. Every binding uses the
 * routing key "sim.objenious.#".
 *
 * @param channel Channel on which every assertion and binding is issued.
 */
async function buildQueues(channel: Channel) {
  const delayMs = 10 * 1000
  const objeniousRoutingKey = "sim.objenious.#"

  // Exchanges are asserted one at a time, in this order.
  for (const exchangeName of [EXCHANGES.DEL, EXCHANGES.DLX, EXCHANGES.MAIN]) {
    await channel.assertExchange(exchangeName, "topic")
  }

  await channel.assertQueue(QUEUES.OBJ, { durable: true })
  await channel.assertQueue(QUEUES.OBJDLX, { durable: true })
  await channel.assertQueue(QUEUES.OBJDEL, {
    durable: true,
    arguments: {
      "x-message-ttl": delayMs,
      "x-dead-letter-exchange": EXCHANGES.MAIN,
    },
  })

  const bindings = [
    // Dead-letter queue
    { queue: QUEUES.OBJDLX, exchange: EXCHANGES.DLX },
    // Delay queue
    { queue: QUEUES.OBJDEL, exchange: EXCHANGES.DEL },
    // Objenious queue -> main exchange
    { queue: QUEUES.OBJ, exchange: EXCHANGES.MAIN },
  ]
  for (const { queue, exchange } of bindings) {
    await channel.bindQueue(queue, exchange, objeniousRoutingKey)
  }
}
|
|
|
|
|
|
/**
 * Connects the module-level event bus and hands it back to the caller.
 *
 * @returns The `rabbitmqEventBus` instance, once its `connect()` call has resolved.
 */
export async function startRMQClient() {
  const bus = rabbitmqEventBus
  await bus.connect()
  return bus
}
|