Cola con delay -> cola dead letter
This commit is contained in:
@@ -7,6 +7,6 @@ export interface EventBus {
|
||||
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void;
|
||||
|
||||
consume(queue: string, callback: (msg: ConsumeMessage | null) => void): void;
|
||||
ack(msg: ConsumeMessage): void;
|
||||
nack(msg: ConsumeMessage): void;
|
||||
ack(msg: ConsumeMessage): Promise<void>;
|
||||
nack(msg: ConsumeMessage): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ export class HttpClient {
|
||||
if (error.response?.status == 401) {
|
||||
this.jwtManager.getAccessToken()
|
||||
}
|
||||
return Promise.reject(error)
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -14,14 +14,17 @@ export type RMQConnectionParams = {
|
||||
}
|
||||
|
||||
export class RabbitMQEventBus implements EventBus {
|
||||
private buildStructure?: (chan: Channel) => void
|
||||
private buildStructure?: (chan: Channel) => Promise<void>
|
||||
private maxRetry: number = 0
|
||||
|
||||
constructor(args: {
|
||||
connectionParams: RMQConnectionParams,
|
||||
buildStructure?: (chan: Channel) => void
|
||||
buildStructure?: (chan: Channel) => Promise<void>,
|
||||
maxRetry?: number
|
||||
}) {
|
||||
this.connectionOptions = args.connectionParams
|
||||
if (args.buildStructure != undefined) this.buildStructure = args.buildStructure
|
||||
if (args.maxRetry != undefined) this.maxRetry = args.maxRetry
|
||||
}
|
||||
|
||||
async consume(queue: string, callback: (msg: ConsumeMessage | null) => void) {
|
||||
@@ -30,14 +33,34 @@ export class RabbitMQEventBus implements EventBus {
|
||||
await this.channel.consume(queue, callback)
|
||||
}
|
||||
|
||||
ack(msg: ConsumeMessage) {
|
||||
async ack(msg: ConsumeMessage) {
|
||||
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
|
||||
return this.channel.ack(msg)
|
||||
}
|
||||
|
||||
nack(msg: ConsumeMessage) {
|
||||
async nack(msg: ConsumeMessage, requeue?: boolean) {
|
||||
if (this.channel == undefined) throw new Error("[RMQ] Canal no iniciallizado");
|
||||
return this.channel.nack(msg)
|
||||
|
||||
const headers = msg.properties.headers || {}
|
||||
const numberRetry = headers['x-retry-count'] || 0
|
||||
const routingKey = msg.fields.routingKey
|
||||
|
||||
if (numberRetry < this.maxRetry) {
|
||||
await this.channel.publish("sim.ex.objenious.delayed", routingKey, msg.content, {
|
||||
headers: {
|
||||
...headers,
|
||||
'x-retry-count': numberRetry + 1
|
||||
}
|
||||
})
|
||||
} else {
|
||||
await this.channel.publish("sim.ex.objenious.dlx", routingKey, msg.content, {
|
||||
headers: {
|
||||
...headers
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
//return this.channel.nack(msg, false, requeue)
|
||||
}
|
||||
|
||||
connection?: AmqpConnectionManager
|
||||
@@ -113,23 +136,36 @@ export class RabbitMQEventBus implements EventBus {
|
||||
});
|
||||
|
||||
connection.on("disconnect", (err: unknown) => {
|
||||
console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ... :: ${err}`)
|
||||
console.error(`[RMQ] Servidor Rabbitmq desconectado, reintentando ... ::`, err)
|
||||
})
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
protected async createChannel() {
|
||||
const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
|
||||
|
||||
if (this.connection == undefined) throw new Error("[RMQ] Intentando crear un canal sin una conexion")
|
||||
const channel = this.connection.createChannel({
|
||||
setup: (channel: Channel) => {
|
||||
setup: async (channel: Channel) => {
|
||||
// Exchanges comunes a todos
|
||||
channel.assertExchange("sim.exchange", "topic", { durable: true })
|
||||
channel.assertExchange("sim.dlx", "topic", { durable: true })
|
||||
|
||||
// Estructuras propias de cada servicio
|
||||
if (this.buildStructure != undefined) {
|
||||
this.buildStructure(channel)
|
||||
let topoligaSuccess = false
|
||||
while (!topoligaSuccess) {
|
||||
try {
|
||||
await this.buildStructure(channel)
|
||||
topoligaSuccess = true
|
||||
} catch (e) {
|
||||
console.log("[RMQ] Error Creando la topologia de rabbitmq", e)
|
||||
topoligaSuccess = false
|
||||
await delay(5 * 1000)
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
console.warn("[i] Se ha creado un canal sin garantizar que exista la/s cola/s que se van a usar")
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ import { EventBus } from "#shared/domain/EventBus.port";
|
||||
import { ConsumeMessage } from "amqplib";
|
||||
import { SimUseCases } from "./Sim.usecases";
|
||||
import { SimEvents } from "#shared/domain/SimEvents";
|
||||
import { constants } from "node:buffer";
|
||||
import { constrainedMemory } from "node:process";
|
||||
|
||||
/**
|
||||
* La clase usa generadores de funciones para mantener el contexto
|
||||
@@ -47,23 +49,35 @@ export class SimController {
|
||||
}
|
||||
const msgData = this.decodeMsg(msg) as SimEvents.activation
|
||||
|
||||
if (msgData == undefined || msgData.payload == undefined) Promise.reject("Mensaje invalido")
|
||||
if (msgData == undefined || msgData.payload == undefined) throw new Error("Mensaje invalido")
|
||||
console.log("Mensaje procesado", msgData?.toString())
|
||||
|
||||
// TODO: Añadir un validador del mensaje
|
||||
|
||||
const iccid = msgData.payload.iccid
|
||||
|
||||
const headers = msg.properties.headers
|
||||
console.log("HEADERS: ", headers)
|
||||
|
||||
try {
|
||||
// Caso de uso de activaciones
|
||||
await this.useCases.activate({
|
||||
const result = await this.useCases.activate({
|
||||
dueDate: this.genDueDate(2 * 60).toISOString(),
|
||||
identifier: {
|
||||
identifierType: "ICCID",
|
||||
identifiers: [iccid]
|
||||
}
|
||||
})()
|
||||
this.eventBus.ack(msg)
|
||||
|
||||
console.log("Resultado de la peticion", result)
|
||||
|
||||
if (result.error == undefined) {
|
||||
console.log("Ack", msgData)
|
||||
await this.eventBus.ack(msg)
|
||||
} else {
|
||||
console.log("Nack", msgData)
|
||||
await this.eventBus.nack(msg)
|
||||
}
|
||||
} catch (e) {
|
||||
this.eventBus.nack(msg)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import { ActivationData } from "#domain/DTOs/objeniousapi"
|
||||
import { HttpClient } from "#shared/infrastructure/HTTPClient"
|
||||
import { AxiosError } from "axios"
|
||||
import { error } from "node:console"
|
||||
|
||||
// TODO: Pasar a un archivo de DTOs
|
||||
|
||||
@@ -21,11 +23,27 @@ export class SimUseCases {
|
||||
...activationData
|
||||
})
|
||||
|
||||
|
||||
try {
|
||||
const e = await req
|
||||
console.log("Activacion con exito", e.data.response)
|
||||
const response = await req
|
||||
console.log("[!] El status de la respuesta es", response.status)
|
||||
|
||||
if (response.status == 200) {
|
||||
console.log("Activacion con exito", response.data.response)
|
||||
return {
|
||||
error: undefined,
|
||||
ok: true
|
||||
}
|
||||
} else {
|
||||
return {
|
||||
error: response.status
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error activando ", error)
|
||||
console.error("[Sim.usecase] Error activando ", (error as AxiosError).response?.status)
|
||||
return {
|
||||
error: "Error general de la petiacion"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,43 +20,50 @@ export const rmqConnOptions = <RMQConnectionParams>{
|
||||
|
||||
export const rabbitmqEventBus = new RabbitMQEventBus({
|
||||
connectionParams: rmqConnOptions,
|
||||
buildStructure: buildQueues
|
||||
buildStructure: buildQueues,
|
||||
maxRetry: 5
|
||||
})
|
||||
|
||||
function buildQueues(channel: Channel) {
|
||||
channel?.assertQueue("sim.objenious")
|
||||
.then(e => {
|
||||
console.log("[o] Creada la cola " + e.queue)
|
||||
channel?.bindQueue("sim.objenious", "sim.exchange", "sim.objenious.*")
|
||||
.then(e => {
|
||||
console.log("[o] Bindeada la cola sim.objenious a sim.exchange ")
|
||||
})
|
||||
.catch(e => {
|
||||
console.error(e)
|
||||
})
|
||||
})
|
||||
.catch(e => {
|
||||
console.error(e)
|
||||
})
|
||||
async function buildQueues(channel: Channel) {
|
||||
const QUEUES = {
|
||||
OBJ: "sim.objenious",
|
||||
DLX: "sim.objenious.dlx",
|
||||
DEL: "sim.objenious.delayed"
|
||||
}
|
||||
|
||||
const EXCHANGES = {
|
||||
MAIN: "sim.exchange",
|
||||
DLX: "sim.ex.objenious.dlx",
|
||||
DEL: "sim.ex.objenious.delayed"
|
||||
}
|
||||
|
||||
channel?.assertQueue("sim.objenious.dlx")
|
||||
.then(e => {
|
||||
console.log("[o] Creada la cola " + e.queue)
|
||||
const DELAY = 10 * 1000
|
||||
const BASE_OBENIOUS_KEY = "sim.objenious.#"
|
||||
|
||||
await channel.assertExchange(EXCHANGES.DEL, "topic")
|
||||
await channel.assertExchange(EXCHANGES.DLX, "topic")
|
||||
await channel.assertExchange(EXCHANGES.MAIN, "topic")
|
||||
|
||||
await channel.assertQueue(QUEUES.OBJ)
|
||||
await channel.assertQueue(QUEUES.DLX)
|
||||
await channel.assertQueue(QUEUES.DEL, {
|
||||
durable: true,
|
||||
arguments: {
|
||||
'x-message-ttl': DELAY,
|
||||
'x-dead-letter-exchange': EXCHANGES.MAIN,
|
||||
}
|
||||
})
|
||||
|
||||
// Cola dead-letter
|
||||
await channel.bindQueue(QUEUES.DLX, EXCHANGES.DLX, "sim.objenious.#")
|
||||
// Cola delay
|
||||
await channel.bindQueue(QUEUES.DEL, EXCHANGES.DEL, BASE_OBENIOUS_KEY)
|
||||
// Cola objenious -> main exchange
|
||||
await channel.bindQueue(QUEUES.OBJ, EXCHANGES.MAIN, BASE_OBENIOUS_KEY)
|
||||
|
||||
channel?.bindQueue("sim.objenious.dlx", "sim.dlx", "sim.objenious.*")
|
||||
.then(e => {
|
||||
console.log("[o] Bindeada la cola sim.objenious.dlx a sim.dlx")
|
||||
})
|
||||
.catch(e => {
|
||||
console.error(e)
|
||||
})
|
||||
})
|
||||
.catch(e => {
|
||||
console.error(e)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
export async function startRMQClient() {
|
||||
await rabbitmqEventBus.connect()
|
||||
return rabbitmqEventBus
|
||||
|
||||
@@ -41,6 +41,7 @@ export class SimController {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
await this.simUseCases.activation({ iccid, compañia })
|
||||
|
||||
@@ -65,9 +66,10 @@ export class SimController {
|
||||
if (valido == false) return; // Si no es valido ya se ha enviado el error
|
||||
|
||||
const { iccid } = req.body
|
||||
const compañia = this.compañiaFromIccid(iccid)
|
||||
|
||||
try {
|
||||
await this.simUseCases.cancelation({ iccid })
|
||||
await this.simUseCases.cancelation({ iccid, compañia })
|
||||
res.status(200).json({
|
||||
iccid: iccid,
|
||||
operation: "cancelation"
|
||||
@@ -90,9 +92,10 @@ export class SimController {
|
||||
if (valido == false) return; // Si no es valido ya se ha enviado el error
|
||||
|
||||
const { iccid } = req.body
|
||||
const compañia = this.compañiaFromIccid(iccid)
|
||||
|
||||
try {
|
||||
await this.simUseCases.cancelation({ iccid })
|
||||
await this.simUseCases.cancelation({ iccid, compañia })
|
||||
res.status(200).json({
|
||||
iccid: iccid,
|
||||
operation: "cancelation"
|
||||
@@ -114,9 +117,10 @@ export class SimController {
|
||||
if (valido == false) return; // Si no es valido ya se ha enviado el error
|
||||
|
||||
const { iccid } = req.body
|
||||
const compañia = this.compañiaFromIccid(iccid)
|
||||
|
||||
try {
|
||||
await this.simUseCases.cancelation({ iccid })
|
||||
await this.simUseCases.cancelation({ iccid, compañia })
|
||||
res.status(200).json({
|
||||
iccid: iccid,
|
||||
operation: "liberacion"
|
||||
@@ -139,9 +143,10 @@ export class SimController {
|
||||
if (valido == false) return; // Si no es valido ya se ha enviado el error
|
||||
|
||||
const { iccid } = req.body
|
||||
const compañia = this.compañiaFromIccid(iccid)
|
||||
|
||||
try {
|
||||
await this.simUseCases.cancelation({ iccid })
|
||||
await this.simUseCases.cancelation({ iccid, compañia })
|
||||
res.status(200).json({
|
||||
iccid: iccid,
|
||||
operation: "cancelation"
|
||||
|
||||
@@ -46,6 +46,17 @@ export class SimUsecases {
|
||||
return this.eventBus.publish([activationEvent])
|
||||
}
|
||||
|
||||
async cancelation(args: { iccid: string, compañia: string }) {
|
||||
|
||||
const activationEvent = <SimEvents.general>{
|
||||
key: `sim.${args.compañia}.cancelation`,
|
||||
payload: {
|
||||
iccid: args.iccid
|
||||
}
|
||||
}
|
||||
console.log("[d] Cancelation ", activationEvent)
|
||||
return this.eventBus.publish([activationEvent])
|
||||
}
|
||||
|
||||
async pause(args: { iccid: string }) {
|
||||
const cancelationEvent = <SimEvents.pause>{
|
||||
|
||||
Reference in New Issue
Block a user