Reconexiones automaticas si se cae rabbitmq

This commit is contained in:
2026-01-28 10:25:49 +01:00
parent 50b3b307ec
commit ad217a277b
8 changed files with 98 additions and 42 deletions

View File

@@ -84,7 +84,7 @@
"vhost": "sim-vhost",
"destination": "sim.logs",
"destination_type": "queue",
"routing_key": "sim.*",
"routing_key": "sim.#",
"arguments": {}
}
]

View File

@@ -15,8 +15,7 @@ params:query {
}
body:form-urlencoded {
iccid: 1234
:
iccid: 12339912344
}
settings {

View File

@@ -17,6 +17,7 @@
},
"dependencies": {
"@tsconfig/node22": "^22.0.5",
"amqp-connection-manager": "^5.0.0",
"axios": "^1.13.3",
"cors": "^2.8.5",
"dotenv": "^17.2.3",

View File

@@ -1,4 +1,6 @@
import { type ChannelModel, type ConfirmChannel, ConsumeMessage, connect as amqConnect } from "amqplib";
import { connect, AmqpConnectionManager, ChannelWrapper, Channel } from "amqp-connection-manager"
import { DomainEvent, DomainEventSubscriber } from "../domain/DomainEvent";
import { EventBus } from "../domain/EventBus.port";
@@ -11,23 +13,25 @@ export type RMQConnectionParams = {
secure: boolean
}
const RETRY_DELAY = 1000
const PREFETCH_LIMIT = 1
export class RabbitMQEventBus implements EventBus {
private buildStructure?: (chan: Channel) => void
constructor(args: {
connectionParams: RMQConnectionParams
connectionParams: RMQConnectionParams,
buildStructure?: (chan: Channel) => void
}) {
this.connectionOptions = args.connectionParams
this.checkStructure();
if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
}
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
// Comproaciones antes de escuchar
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
this.checkStructure()
// El binding (cola -> [routingkey] -> exchange) lo hago por configuracion. Meter colas a demanda?
await this.channel.prefetch(1)
//await this.channel.prefetch(1)
await this.channel.consume(queue, callback)
}
@@ -42,26 +46,45 @@ export class RabbitMQEventBus implements EventBus {
return this.channel.nack(msg)
}
connection?: ChannelModel
channel?: ConfirmChannel
connection?: AmqpConnectionManager
channel?: ChannelWrapper
connected: Boolean = false
private connectionOptions: RMQConnectionParams
public async connect() {
this.connection = await this.createConnection();
if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")
this.channel = await this.createConfirmChannel()
try {
this.connection = await this.createConnection();
if (this.connection == undefined) throw new Error("[RMQ] Error crecreando la conexion")
this.channel = await this.createConfirmChannel()
this.channel.on("close", () => {
console.log("[RMQ] Canal desconectado")
setTimeout(async () => {
this.connect().then(e => {
console.log("[RMQ] Canal reconectado")
})
}, 1000)
})
} catch (e) {
console.error("[RMQ] Error estableciendo la conexion con el servidor", e)
}
}
publish(events: DomainEvent[]): Promise<void> {
return new Promise((res, rej) => {
try {
for (const event of events) {
const exchange = "sim.exchange"
const routingKey = event.key
const content = Buffer.from(JSON.stringify(event))
this.channel?.publish(exchange, routingKey, content)
this.channel?.publish(exchange, routingKey, content, {}, (err, ok) => {
console.log("Confirmacion", err, ok)
})
}
return res()
} catch (err) {
@@ -74,17 +97,6 @@ export class RabbitMQEventBus implements EventBus {
throw new Error("Method not implemented.");
}
/**
* Verificacion que la estructura definida en el JSON corresponde con
* la esperada
* TODO: Faltan las colas fijas según las operaciones
*/
private checkStructure() {
this.channel?.assertQueue("sim.activations")
this.channel?.assertQueue("sim.cancelations")
this.channel?.assertQueue("sim.logs")
this.channel?.assertExchange("sim.exchange", "topic")
}
protected async createConnection() {
const { hostname, port, secure } = { ...this.connectionOptions }
@@ -92,7 +104,7 @@ export class RabbitMQEventBus implements EventBus {
const protocol = secure ? 'amqps' : 'amqp';
const vhost = this.connectionOptions.vhost
const connection = await amqConnect({
const connection = connect({
protocol,
hostname,
port,
@@ -101,17 +113,28 @@ export class RabbitMQEventBus implements EventBus {
vhost
});
connection.on('error', (error: unknown) => {
connection.on('error', async (error: unknown) => {
console.error(`[RMQ] Rabbitmq connection error :: ${error}`);
Promise.reject(error);
console.log(`[RMQ] Reintentando conexion`)
});
connection.on("disconnect", (err) => {
console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ...`)
})
return connection;
}
protected async createConfirmChannel() {
const channel = await this.connection?.createConfirmChannel()
await channel?.prefetch(PREFETCH_LIMIT)
if (this.connection == undefined) throw new Error("Intentando crear un canal sin una conexion")
const channel = this.connection?.createChannel({
json: true,
setup: (channel: Channel) => {
if (this.buildStructure != undefined) this.buildStructure(channel)
},
})
//await channel.prefetch(PREFETCH_LIMIT)
if (channel == undefined) throw new Error("[RMQ] Error crecreando el canal")

View File

@@ -22,7 +22,10 @@ export const rabbitmqEventBus = new RabbitMQEventBus({
})
export async function startRMQClient() {
await rabbitmqEventBus.connect()
await rabbitmqEventBus.connect().catch(async e => {
console.error("Error en la conexion RMQ")
await rabbitmqEventBus.connect()
})
// Bindings especificos, deberia meterlos en la clase
try {

View File

@@ -1,4 +1,5 @@
import { RabbitMQEventBus, RMQConnectionParams } from "#shared/infrastructure/RabbitMQEventBus"
import { Channel } from "amqp-connection-manager"
import { env } from "./env"
const rmqUser = env.RABBITMQ_USER
@@ -18,14 +19,11 @@ export const rmqConnOptions = <RMQConnectionParams>{
}
export const rabbitmqEventBus = new RabbitMQEventBus({
connectionParams: rmqConnOptions
connectionParams: rmqConnOptions,
buildStructure: buildQueues
})
export async function startRMQClient() {
await rabbitmqEventBus.connect()
const channel = rabbitmqEventBus.channel
// Bindings especificos, deberia meterlos en la clase
function buildQueues(channel: Channel) {
channel?.assertQueue("sim.objenious")
.then(e => {
console.log("[o] Creada la cola " + e.queue)
@@ -57,6 +55,9 @@ export async function startRMQClient() {
.catch(e => {
console.error(e)
})
}
export async function startRMQClient() {
await rabbitmqEventBus.connect()
return rabbitmqEventBus
}

View File

@@ -1,5 +1,6 @@
import { Request, Response } from "express"
import { SimUsecases } from "aplication/Sim.usecases"
import { error } from "node:console"
// Partiendo del caracter 3 2 de pais + 2 de compañia
// Metiendolo a la BDD podria ser mas dinamico pero perderia
@@ -31,8 +32,22 @@ export class SimController {
const { iccid } = req.body
const compañia = this.compañiaFromIccid(iccid)
if (compañia == undefined) {
res.status(500).json({
errors: {
msg: "El iccid no pertenece a una compañia conocida"
}
})
return;
}
try {
await this.simUseCases.activation({ iccid, compañia })
res.status(200).json({
iccid: iccid,
operation: "activation"
}).send()
} catch (err) {
console.error("Error activando la sim ", req.body)
res.status(500).json({
@@ -41,11 +56,6 @@ export class SimController {
}
}).send()
}
res.status(200).json({
iccid: iccid,
operation: "activation"
}).send()
}
async cancelation(req: Request, res: Response) {

View File

@@ -719,6 +719,17 @@ __metadata:
languageName: node
linkType: hard
"amqp-connection-manager@npm:^5.0.0":
version: 5.0.0
resolution: "amqp-connection-manager@npm:5.0.0"
dependencies:
promise-breaker: "npm:^6.0.0"
peerDependencies:
amqplib: "*"
checksum: 10/bf6c346537f71f1cdf03f4d1832c2945b3f80274e8fca41fa96db35ecc3ea0162e1fbf4cba76326fb3784b2362c8c6dd14a74e3ca30e1fc6dcd3ba6dcc172828
languageName: node
linkType: hard
"amqplib@npm:^0.10.9":
version: 0.10.9
resolution: "amqplib@npm:0.10.9"
@@ -2001,6 +2012,13 @@ __metadata:
languageName: node
linkType: hard
"promise-breaker@npm:^6.0.0":
version: 6.0.0
resolution: "promise-breaker@npm:6.0.0"
checksum: 10/6f7ad5e55d3f434dc1e02907c3294dc4a44f9962d9af9de186095c75c8f76d11feeb927e96ec9e177fcc9209690defb5c64eeac4767e7c3dd4f120e9d14fb0c8
languageName: node
linkType: hard
"promise-retry@npm:^2.0.1":
version: 2.0.1
resolution: "promise-retry@npm:2.0.1"
@@ -2394,6 +2412,7 @@ __metadata:
"@types/express": "npm:^5.0.6"
"@types/node": "npm:^25.0.3"
"@types/supertest": "npm:^6.0.3"
amqp-connection-manager: "npm:^5.0.0"
axios: "npm:^1.13.3"
concurrently: "npm:^9.2.1"
cors: "npm:^2.8.5"