Bug bucle infinito de mensajes delay <-> cola
This commit is contained in:
@@ -38,14 +38,21 @@ export class RabbitMQEventBus implements EventBus {
|
||||
return this.channel.ack(msg)
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO:
|
||||
* - Esta implementacion del nack debe estar en objenious
|
||||
*/
|
||||
async nack(msg: ConsumeMessage, requeue?: boolean) {
|
||||
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
|
||||
|
||||
console.log("NACK: ", msg.properties.headers)
|
||||
|
||||
const headers = msg.properties.headers || {}
|
||||
const numberRetry = headers['x-retry-count'] || 0
|
||||
const routingKey = msg.fields.routingKey
|
||||
|
||||
if (numberRetry < this.maxRetry) {
|
||||
console.log("Dalaying")
|
||||
await this.channel.publish("sim.ex.objenious.delayed", routingKey, msg.content, {
|
||||
headers: {
|
||||
...headers,
|
||||
@@ -53,6 +60,7 @@ export class RabbitMQEventBus implements EventBus {
|
||||
}
|
||||
})
|
||||
} else {
|
||||
console.log("DeadLetter")
|
||||
await this.channel.publish("sim.ex.objenious.dlx", routingKey, msg.content, {
|
||||
headers: {
|
||||
...headers
|
||||
@@ -60,6 +68,8 @@ export class RabbitMQEventBus implements EventBus {
|
||||
})
|
||||
}
|
||||
|
||||
// Hace falta?
|
||||
this.channel.ack(msg)
|
||||
//return this.channel.nack(msg, false, requeue)
|
||||
}
|
||||
|
||||
@@ -100,7 +110,11 @@ export class RabbitMQEventBus implements EventBus {
|
||||
const routingKey = event.key
|
||||
const content = Buffer.from(JSON.stringify(event))
|
||||
this.channel?.publish(exchange, routingKey, content, {}, (err, ok) => {
|
||||
console.log("Confirmacion", err, ok)
|
||||
if (err == undefined) {
|
||||
console.log("Evento publicado ", event)
|
||||
} else {
|
||||
console.error("Error publicando", event)
|
||||
}
|
||||
})
|
||||
}
|
||||
return res()
|
||||
|
||||
Reference in New Issue
Block a user