Orders para test y flujo de migraciones mas simple
This commit is contained in:
@@ -3,13 +3,14 @@ import { SimUsecases } from "./Sim.usecases.js"
|
||||
import { activationValidator, iccidValidator } from "./httpValidators.js"
|
||||
import { companyFromIccid } from "#domain/companies.js"
|
||||
import { BodyValidator } from "sim-shared/aplication/BodyValidator.js"
|
||||
import { error } from "node:console"
|
||||
|
||||
|
||||
export class SimController {
|
||||
private simUseCases: SimUsecases
|
||||
|
||||
constructor(args: {
|
||||
simUseCases: SimUsecases
|
||||
simUseCases: SimUsecases,
|
||||
}) {
|
||||
this.simUseCases = args.simUseCases
|
||||
|
||||
@@ -79,6 +80,18 @@ export class SimController {
|
||||
}
|
||||
}
|
||||
|
||||
public preactivationTest() {
|
||||
return this.controllerGenerator({
|
||||
validator: iccidValidator,
|
||||
useCase: this.simUseCases.test,
|
||||
onError: (data, error) => console.error(error),
|
||||
onSuccess: (data) => {
|
||||
console.log("OK", data)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
public preactivation() {
|
||||
return async (req: Request, res: Response) => {
|
||||
console.warn("[!] Se deberia de usar la peticion /sim/activate directamente")
|
||||
|
||||
@@ -1,23 +1,65 @@
|
||||
import { OrderRepository } from "sim-shared/infrastructure/OrderRepository.js";
|
||||
import assert from "node:assert";
|
||||
import { EventBus } from "sim-shared/domain/EventBus.port";
|
||||
import { SimEvents } from "sim-shared/domain/SimEvents";
|
||||
import { uuidv7 } from "uuidv7";
|
||||
import { CreateOrderDTO, OrderType } from "sim-shared/domain/Order.js";
|
||||
|
||||
/**
|
||||
* TODO:
|
||||
* - Conexion con la BDD
|
||||
* - Conexion con RabbitMQ
|
||||
* - Pasar a clase cuando existan las conexiones
|
||||
* Casos de uso de tarjetas sim. Garantiza que todos los metodos usan el mismo bus de mensajes
|
||||
* y repositorio de registro de las ordenes.
|
||||
*/
|
||||
export class SimUsecases {
|
||||
private eventBus: EventBus
|
||||
private eventBus: EventBus;
|
||||
private orderRepository: OrderRepository;
|
||||
|
||||
constructor(args: {
|
||||
eventBus: EventBus
|
||||
eventBus: EventBus,
|
||||
orderRepository: OrderRepository
|
||||
}
|
||||
) {
|
||||
this.eventBus = args.eventBus
|
||||
this.orderRepository = args.orderRepository
|
||||
}
|
||||
|
||||
|
||||
async test(args: { iccid: string }) {
|
||||
assert(args.iccid != undefined)
|
||||
const uuid = uuidv7()
|
||||
const event = <SimEvents.general>{
|
||||
key: `sim.test.test`,
|
||||
payload: {
|
||||
iccid: args.iccid
|
||||
},
|
||||
headers: {
|
||||
message_id: uuid
|
||||
}
|
||||
}
|
||||
|
||||
const publish = await this.eventBus.publish([event])
|
||||
|
||||
/**
|
||||
* TODO:
|
||||
* De momento solo para mensajes publicados de 1 en 1 y si se les ha añadido cabecera
|
||||
* Si se ha saltado el proceso de añadir un ID no se
|
||||
*/
|
||||
if (publish.success.length == 1) {
|
||||
if (event.headers?.message_id != undefined) {
|
||||
const orderType = (event.key.split(".")[2] as OrderType ?? "unknown")
|
||||
assert(orderType)
|
||||
const order: CreateOrderDTO = {
|
||||
correlation_id: event.headers.message_id,
|
||||
order_type: orderType,
|
||||
routing_key: event.key,
|
||||
payload: event
|
||||
}
|
||||
this.orderRepository.createOrder(order)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* WIP
|
||||
* Crea una nueva sim de la que no se tenia registro anteriormente
|
||||
* Si ya existia se modifican los campos pero no se hace un cambio
|
||||
* de estado.
|
||||
|
||||
@@ -8,8 +8,10 @@ export type DomainEventType = string
|
||||
export type DomainEvent = {
|
||||
key: string,
|
||||
payload: Object,
|
||||
options: Object,
|
||||
occurredOn: Date,
|
||||
headers?: Object & {
|
||||
message_id?: string
|
||||
},
|
||||
occurredOn?: Date,
|
||||
}
|
||||
|
||||
export interface DomainEventSubscriber<T extends DomainEvent> {
|
||||
|
||||
@@ -2,7 +2,7 @@ import { ConsumeMessage } from "amqplib";
|
||||
import { DomainEvent, DomainEventSubscriber } from "./DomainEvent.js";
|
||||
|
||||
export interface EventBus {
|
||||
publish(events: Array<DomainEvent>): Promise<void>;
|
||||
publish(events: Array<DomainEvent>): Promise<{ success: DomainEvent[], error: DomainEvent[] }>;
|
||||
// Sacado de NEKI, posiblemente no haga falta
|
||||
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void;
|
||||
|
||||
|
||||
@@ -17,6 +17,11 @@ export class RabbitMQEventBus implements EventBus {
|
||||
private buildStructure?: (chan: Channel) => Promise<void>
|
||||
private maxRetry: number = 0
|
||||
|
||||
connection?: AmqpConnectionManager
|
||||
channel?: ChannelWrapper
|
||||
connected: Boolean = false
|
||||
|
||||
private connectionOptions: RMQConnectionParams
|
||||
constructor(args: {
|
||||
connectionParams: RMQConnectionParams,
|
||||
buildStructure?: (chan: Channel) => Promise<void>,
|
||||
@@ -73,11 +78,6 @@ export class RabbitMQEventBus implements EventBus {
|
||||
//return this.channel.nack(msg, false, requeue)
|
||||
}
|
||||
|
||||
connection?: AmqpConnectionManager
|
||||
channel?: ChannelWrapper
|
||||
connected: Boolean = false
|
||||
|
||||
private connectionOptions: RMQConnectionParams
|
||||
|
||||
public async connect() {
|
||||
|
||||
@@ -96,28 +96,35 @@ export class RabbitMQEventBus implements EventBus {
|
||||
} catch (e) {
|
||||
console.error("[RMQ] Error estableciendo la conexion con el servidor", e)
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
publish(events: DomainEvent[]): Promise<void> {
|
||||
|
||||
return new Promise((res, rej) => {
|
||||
publish(events: DomainEvent[]): Promise<{ success: DomainEvent[], error: DomainEvent[] }> {
|
||||
return new Promise(async (res, rej) => {
|
||||
const successEvents: DomainEvent[] = []
|
||||
const errorEvents: DomainEvent[] = []
|
||||
try {
|
||||
for (const event of events) {
|
||||
const exchange = "sim.exchange"
|
||||
const routingKey = event.key
|
||||
const content = Buffer.from(JSON.stringify(event))
|
||||
this.channel?.publish(exchange, routingKey, content, {}, (err, ok) => {
|
||||
await this.channel?.publish(exchange, routingKey, content, {
|
||||
headers: {
|
||||
...event.headers
|
||||
}
|
||||
}, (err, ok) => {
|
||||
if (err == undefined) {
|
||||
console.log("Evento publicado ", event)
|
||||
successEvents.push(event)
|
||||
} else {
|
||||
console.error("Error publicando", event)
|
||||
errorEvents.push(event)
|
||||
}
|
||||
})
|
||||
}
|
||||
return res()
|
||||
return res({
|
||||
success: successEvents,
|
||||
error: errorEvents
|
||||
})
|
||||
} catch (err) {
|
||||
return rej(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user