Crea el slash command `/md-lint` para barrer cualquier `.md` del repo contra un set mínimo de reglas (MD004, MD030, MD031, MD032, MD036, MD040, MD026, MD047, MD034) sin añadir markdownlint-cli2 como devDep. Aplica el primer pase: 7 fences sin lenguaje declarado pasan a `text` en check.md, md-lint.md, SKILL.md, EVENTS-RABBITMQ.md y HOUSE-STYLE.md. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
9.7 KiB
Events & RabbitMQ — sf-sim
Cómo se diseñan, publican y consumen los eventos en el monorepo. Lee esto antes de tocar eventBus, exchanges, colas, retries o cuando diseñes un nuevo evento.
Tabla de contenidos
- Tipo
DomainEvent - Routing keys
- Topología de RabbitMQ
- Publicar
- Consumir
- Reintentos y DLX
- Idempotencia
- Outbox y atomicidad publish+save
- Versionado de eventos
- Tipos de evento del repo
Tipo DomainEvent
packages/sim-shared/domain/DomainEvent.ts:
export type DomainEvent = {
key: string, // routing key
payload: object, // datos del evento
headers?: object & {
message_id?: string // uuidv7, idempotencia
},
occurredOn?: Date,
}
Reglas:
key: routing key, NO es el nombre del evento humano. Sigue el patrón de routing (sim.[compañia].[acción]).payload: datos mínimos para que el consumidor haga su trabajo. NO meter info derivable.headers.message_id: SIEMPRE rellenarlo al publicar (uuidv7). Liga el evento a unaOrder.occurredOn: opcional pero recomendable. Útil para auditoría y para detectar mensajes viejos.
Eventos típicos extendidos (SimEvents namespace en sim-shared/domain/SimEvents.ts):
export namespace SimEvents {
export type activation = DomainEvent & {
key: `sim.${string}.activate`,
payload: { iccid: string, offer?: string },
}
export type cancel = DomainEvent & {
key: `sim.${string}.cancel`,
payload: { iccid: string },
options: {}
}
// ...
}
Los template literals en key te dan autocompletado y errores de compilación si te equivocas con la routing key. Aprovéchalo: define el tipo del evento ANTES de empezar a publicar.
Routing keys
Patrón: sim.[compañia].[acción]
compañia:alai|nos|objenious(del Map endomain/companies.ts)acción:activate|preActivate|reactivate|cancel|pause|free|save|test|unknown
Ejemplos válidos: sim.alai.activate, sim.nos.cancel, sim.objenious.pause.
Por qué este patrón: permite a cada worker hacer binding por compañía con sim.[compañia].* y filtrar por acción si quiere con sim.*.activate. Cambiar este patrón rompe los bindings de los workers existentes.
Si necesitas un nuevo nivel (sim.[compañia].[acción].[sub]):
- Justifica por qué no cabe en payload o headers
- Documenta en el README
- Comprueba que los bindings actuales (
sim.[compañia].*) siguen capturando o ajusta los workers
Topología de RabbitMQ
┌─────────────────┐
│ sim.exchange │ ← topic, exchange principal (DURABLE)
│ (publish) │
└────────┬────────┘
│ binding sim.[compañia].*
▼
┌────────────────────┐
│ q.<servicio>.input │ ← cola del consumidor
└──────────┬─────────┘
│ NACK
▼
┌─────────────────────┐
│ <servicio>.delayed │ ← delay exchange por servicio
└──────────┬──────────┘
│ tras N segundos
▼ (republish a sim.exchange con x-retry-count++)
(back al inicio)
│
│ tras maxRetry
▼
┌──────────────┐
│ sim.dlx │ ← exchange dead-letter
└──────────────┘
- Exchange principal:
sim.exchange, tipotopic, durable. - DLX:
sim.dlx, topic, durable. Cola unica de inspección manual. - Delayed: cada servicio define su propio delayed exchange en su
eventBusConfig.ts. - Bindings: cada worker hace binding desde
sim.exchangecon la routing key que le toca.
Estructura inicial garantizada: RabbitMQEventBus.createChannel asegura que sim.exchange y sim.dlx existen al conectar. Las colas y bindings propios los crea cada servicio en su buildStructure callback.
Publicar
// 1. Construye el evento con tipo
const ev = <SimEvents.activation>{
key: `sim.${compañia}.activate`,
payload: { iccid, offer },
}
// 2. Mete message_id (uuidv7) en headers
const evWithId = {
...ev,
headers: { ...ev.headers, message_id: uuidv7() }
}
// 3. Publica
const result = await this.eventBus.publish([evWithId]);
Reglas:
- Mete
message_idANTES de publicar. Si lo metes después, has perdido la trazabilidad si el publish falla. publishrecibe un array. Aprovecha si tienes que mandar varios.- Si el caso de uso debe persistir además del publish, lee la sección Outbox y atomicidad.
Consumir
this.eventBus.consume(QUEUE_NAME, async (msg) => {
if (msg == null) return;
try {
const event = JSON.parse(msg.content.toString());
const result = await this.usecase.handle(event);
if (result.error != undefined) {
console.error("[!] Handler error", result.error);
return this.eventBus.nack(msg); // delay → retry → DLX
}
return this.eventBus.ack(msg);
} catch (e) {
console.error("[!] Excepción inesperada", e);
return this.eventBus.nack(msg);
}
});
Reglas:
- Si el handler devuelve error de negocio recuperable →
nack(entra a delay). - Si lanza excepción inesperada →
nack(asume que es transitoria; si no lo es, elmaxRetryla moverá a DLX). - NO uses
nack(msg, false, true)(requeue inmediato): satura el broker y no permite el delay. La implementación actual hace publish manual al delayed exchange.
Reintentos y DLX
Configuración global: en sim-shared/config (a definir; actualmente cada servicio decide).
Per-mensaje: header x-retry-count. El bus lo incrementa en cada nack. Cuando supera maxRetry, en vez de delay republica al dlxExchange.
Política recomendada:
| Tipo de error | Acción |
|---|---|
| Red / 5xx externo / timeout | nack (delay + retry) |
| 4xx del proveedor con código de "imposible" | publish directo a DLX |
| Error de validación del payload | publish directo a DLX (no se va a recuperar) |
| Error de BDD que parece transitorio | nack |
| Error desconocido | nack, dejar que maxRetry decida |
Inspección de DLX: la cola DLX es de procesamiento manual. Después de N reintentos asumimos que un humano debe mirar.
Auditoría: un servicio debe documentar su maxRetry y su política, no asumirla del shared.
Idempotencia
Un consumidor PUEDE recibir el mismo mensaje dos veces (reentregas, restart del consumer, retries). Diseña los handlers asumiéndolo.
Mecanismos:
- Por
correlation_id/message_id: laOrderen BDD tiene unique sobrecorrelation_id. Si intentas crear una con el mismo, devuelve la existente sin escribir. - Por estado del agregado: si la operación es "activar", y el agregado ya está activo, no-op idempotente.
- Outbox de procesados: tabla
processed_messages(message_id)con unique. INSERT antes de procesar; si conflict, ya estaba.
Anti-patrón: asumir "exactly once". RabbitMQ no lo garantiza. Diseña para "at least once" + idempotencia.
Outbox y atomicidad publish+save
El problema actual: en varios usecases del repo se hace:
await this.eventBus.publish([eventWithId]); // 1
await this.saveOrder(eventWithId); // 2
Si (1) ok y (2) falla → evento publicado sin order que lo trackee. Si invertimos el orden, mismo problema en sentido contrario.
Solución (transactional outbox):
- En la misma transacción que escribe el agregado, INSERT en una tabla
outbox(id, payload, status='pending'). - Un publisher (cron, worker, listener postgres) lee
outboxy publica al bus. - Cuando el publish confirma, marca
outbox.status = 'sent'.
Tradeoff: añade un componente publisher. Para el repo actual, en muchos casos la consecuencia de la inconsistencia es asumible (la Order se reconcilia con el resultado del worker). Documenta explícitamente cuándo aceptas la ventana y cuándo NO.
Cuándo es bloqueante: cuando un evento perdido implica pérdida de datos o de dinero (activaciones que el cliente paga, p.ej.). En esos casos, outbox es obligatorio.
Versionado de eventos
Aún no resuelto en el repo (TODO en README). Recomendación:
- Añadir versión al payload o en headers (
headers.version: 1). - NUNCA romper compatibilidad de un evento ya publicado: añade campos opcionales, no quites campos.
- Si necesitas un cambio incompatible, crea un
sim.[compañia].[acción].v2y migra consumers gradualmente.
Tipos de evento del repo
Definidos en sim-shared/domain/SimEvents.ts:
| Tipo | Routing key | Payload | Notas |
|---|---|---|---|
general |
sim.*.* |
{ iccid } |
Tipo genérico, evita usarlo si tienes uno específico |
activation |
sim.${compañia}.activate |
{ iccid, offer? } |
|
preActivation |
sim.${compañia}.preActivate |
{ iccid } |
|
reActivation |
sim.${compañia}.reactivate |
{ iccid } |
|
cancel |
sim.${compañia}.cancel |
{ iccid } |
Alias terminate para Objenious |
pause |
sim.${compañia}.pause |
{ iccid } |
Alias suspend |
free |
sim.${compañia}.free |
{ iccid } |
|
save |
sim.${compañia}.save |
{ iccid, imei } |
Si añades un nuevo tipo: defínelo aquí, expórtalo desde SimEvents, documenta routing key y payload, y avisa de qué workers van a recibirlo.