Fix orders nos
This commit is contained in:
@@ -21,15 +21,22 @@ export class RabbitMQEventBus implements EventBus {
|
||||
channel?: ChannelWrapper
|
||||
connected: Boolean = false
|
||||
|
||||
private delayedExchange: string;
|
||||
private dlxExchange: string;
|
||||
|
||||
private connectionOptions: RMQConnectionParams
|
||||
constructor(args: {
|
||||
connectionParams: RMQConnectionParams,
|
||||
buildStructure?: (chan: Channel) => Promise<void>,
|
||||
maxRetry?: number
|
||||
maxRetry?: number,
|
||||
delayedExchange: string,
|
||||
dlxExchange: string
|
||||
}) {
|
||||
this.connectionOptions = args.connectionParams
|
||||
if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
|
||||
if (args.maxRetry != undefined) this.maxRetry = args.maxRetry
|
||||
this.delayedExchange = args.delayedExchange
|
||||
this.dlxExchange = args.dlxExchange
|
||||
}
|
||||
|
||||
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
|
||||
@@ -50,23 +57,25 @@ export class RabbitMQEventBus implements EventBus {
|
||||
async nack(msg: ConsumeMessage, requeue?: boolean) {
|
||||
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
|
||||
|
||||
console.log("NACK: ", msg.properties.headers)
|
||||
console.log("[i] NACK: ", msg.properties.headers)
|
||||
|
||||
const headers = msg.properties.headers || {}
|
||||
const numberRetry = headers['x-retry-count'] || 0
|
||||
const routingKey = msg.fields.routingKey
|
||||
|
||||
if (numberRetry < this.maxRetry) {
|
||||
console.log("Delaying")
|
||||
await this.channel.publish("sim.ex.objenious.delayed", routingKey, msg.content, {
|
||||
console.log("[i] Delaying ")
|
||||
// "sim.ex.objenious.delayed"
|
||||
await this.channel.publish(this.delayedExchange, routingKey, msg.content, {
|
||||
headers: {
|
||||
...headers,
|
||||
'x-retry-count': numberRetry + 1
|
||||
}
|
||||
})
|
||||
} else {
|
||||
console.log("DeadLetter")
|
||||
await this.channel.publish("sim.ex.objenious.dlx", routingKey, msg.content, {
|
||||
console.log("[i] DeadLetter")
|
||||
//"sim.ex.objenious.dlx"
|
||||
await this.channel.publish(this.dlxExchange, routingKey, msg.content, {
|
||||
headers: {
|
||||
...headers
|
||||
}
|
||||
|
||||
@@ -9,6 +9,6 @@
|
||||
],
|
||||
"include": [
|
||||
"**/*.ts",
|
||||
"../../packages/sim-shared/**/*.ts",
|
||||
"**/*.d.ts",
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user