Rabbit PubSub
Este documento describe los servicios IRabbitPubSub
y RabbitPubSubService
, que proporcionan una capa de abstracción para publicar y suscribirse a eventos de dominio utilizando RabbitMQ como broker de mensajería. RabbitPubSubService
implementa la interfaz IRabbitPubSub
y ofrece métodos para la publicación y suscripción asíncrona de eventos, incluyendo el manejo de la configuración de exchanges, queues y dead-letter exchanges (DLX).
namespace CodeDesignPlus.Net.RabbitMQ.Abstractions;
/// <summary>/// Interface for RabbitMQ publish-subscribe service./// </summary>public interface IRabbitPubSub : IMessage{}
namespace CodeDesignPlus.Net.PubSub.Abstractions;
/// <summary>/// Interface for managing the publishing and subscribing of domain events./// </summary>public interface IMessage{ /// <summary> /// Publishes a domain event asynchronously. /// </summary> /// <param name="event">The domain event to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken);
/// <summary> /// Publishes a list of domain events asynchronously. /// </summary> /// <param name="event">The list of domain events to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken);
/// <summary> /// Subscribes to a domain event asynchronously. /// </summary> /// <typeparam name="TEvent">The type of the domain event to subscribe to.</typeparam> /// <typeparam name="TEventHandler">The type of the event handler (callback).</typeparam> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous subscribe operation.</returns> Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken) where TEvent : IDomainEvent where TEventHandler : IEventHandler<TEvent>;
/// <summary> /// Unsubscribes from a domain event asynchronously. /// </summary> /// <typeparam name="TEvent">The type of the domain event to unsubscribe from.</typeparam> /// <typeparam name="TEventHandler">The type of the event handler (callback).</typeparam> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous unsubscribe operation.</returns> Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken) where TEvent : IDomainEvent where TEventHandler : IEventHandler<TEvent>;}
¿Cómo Funciona?
RabbitPubSubService
facilita la comunicación asíncrona entre diferentes partes de una aplicación (o entre microservicios) mediante la publicación y suscripción de eventos de dominio. Cuando un evento se publica, el servicio lo serializa y lo envía al exchange correspondiente en RabbitMQ. Los servicios suscritos a este evento, a través del servicio, reciben el mensaje y pueden procesarlo. El servicio gestiona la configuración de exchanges, queues (incluyendo dead-letter queues - DLQ) y el binding entre ellos. También maneja la deserialización de los mensajes recibidos y el despacho de eventos a sus respectivos manejadores. Utiliza ChannelProvider
para gestionar los canales y exchanges en RabbitMQ.
Métodos
RabbitPubSubService
implementa la interfaz IRabbitPubSub
, que define los siguientes métodos:
PublishAsync
Type: Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
Publica un evento de dominio en RabbitMQ de forma asíncrona. Toma como argumento el evento a publicar y un CancellationToken
para poder cancelar la operación. El método serializa el evento a JSON y lo publica al exchange correspondiente, previamente declarado.
PublishAsync (sobrecarga)
Type: Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
Publica una lista de eventos de dominio en RabbitMQ de forma asíncrona. Toma una lista de eventos a publicar y un CancellationToken
. Llama al método PublishAsync
por cada evento en la lista.
SubscribeAsync
Type: Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
Se suscribe de forma asíncrona a un evento de dominio específico y a su manejador. Recibe un CancellationToken
y el tipo del evento y su manejador. Crea el exchange y cola correspondientes y se suscribe a la cola, procesando los mensajes que reciba. También establece un consumerTag
en ChannelProvider
que se usa para desuscribirse.
UnsubscribeAsync
Type: Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
Cancela la suscripción a un evento de dominio específico utilizando el consumerTag
en el ChannelProvider
. Recibe el CancellationToken
y el tipo del evento y su manejador.
Implementación
using CodeDesignPlus.Net.Exceptions;
namespace CodeDesignPlus.Net.RabbitMQ.Services;
/// <summary>/// Service to publish and subscribe to domain events using RabbitMQ./// </summary>public class RabbitPubSubService : IRabbitPubSub{ private readonly ILogger<RabbitPubSubService> logger; private readonly IServiceProvider serviceProvider; private readonly IDomainEventResolver domainEventResolverService; private readonly CoreOptions coreOptions; private readonly IChannelProvider channelProvider; private readonly Dictionary<string, object> argumentsQueue;
/// <summary> /// Initializes a new instance of the <see cref="RabbitPubSubService"/> class. /// </summary> /// <param name="logger">The logger instance.</param> /// <param name="serviceProvider">The service provider.</param> /// <param name="domainEventResolverService">The domain event resolver service.</param> /// <param name="channelProvider">The channel provider.</param> /// <param name="coreOptions">The core options.</param> /// <param name="rabbitMQOptions">The RabbitMQ options.</param> public RabbitPubSubService(ILogger<RabbitPubSubService> logger, IServiceProvider serviceProvider, IDomainEventResolver domainEventResolverService, IChannelProvider channelProvider, IOptions<CoreOptions> coreOptions, IOptions<RabbitMQOptions> rabbitMQOptions) { ArgumentNullException.ThrowIfNull(logger); ArgumentNullException.ThrowIfNull(serviceProvider); ArgumentNullException.ThrowIfNull(domainEventResolverService); ArgumentNullException.ThrowIfNull(coreOptions); ArgumentNullException.ThrowIfNull(rabbitMQOptions); ArgumentNullException.ThrowIfNull(channelProvider);
this.logger = logger; this.serviceProvider = serviceProvider; this.domainEventResolverService = domainEventResolverService; this.coreOptions = coreOptions.Value; this.channelProvider = channelProvider;
this.argumentsQueue = rabbitMQOptions.Value.QueueArguments.GetArguments();
this.logger.LogInformation("RabbitPubSubService initialized."); }
/// <summary> /// Publishes a domain event asynchronously. /// </summary> /// <param name="event">The domain event to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(@event);
this.logger.LogInformation("Publishing event: {TEvent}.", @event.GetType().Name);
return this.PrivatePublishAsync(@event, cancellationToken); }
/// <summary> /// Publishes a list of domain events asynchronously. /// </summary> /// <param name="event">The list of domain events to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken) { var tasks = @event.Select(x => PublishAsync(x, cancellationToken));
return Task.WhenAll(tasks); }
/// <summary> /// Publishes a domain event to RabbitMQ. /// </summary> /// <param name="event">The domain event to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> private async Task PrivatePublishAsync(IDomainEvent @event, CancellationToken cancellationToken) { var channel = await this.channelProvider.GetChannelPublishAsync(@event.GetType(), cancellationToken);
var exchangeName = await this.channelProvider.ExchangeDeclareAsync(@event.GetType(), cancellationToken);
var message = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(message);
var properties = new BasicProperties { Persistent = true, AppId = coreOptions.AppName, Type = @event.GetType().Name, Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()), MessageId = @event.EventId.ToString(), CorrelationId = Guid.NewGuid().ToString(), ContentEncoding = "utf-8", ContentType = "application/json" };
await channel.BasicPublishAsync( exchange: exchangeName, routingKey: string.Empty, mandatory: true, basicProperties: properties, body: body );
this.logger.LogInformation("Event {TEvent} published ", @event.GetType().Name); }
/// <summary> /// Subscribes to a domain event asynchronously. /// </summary> /// <typeparam name="TEvent">The type of the domain event.</typeparam> /// <typeparam name="TEventHandler">The type of the event handler.</typeparam> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous subscribe operation.</returns> public async Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken) where TEvent : IDomainEvent where TEventHandler : IEventHandler<TEvent> { var channel = await this.channelProvider.GetChannelConsumerAsync<TEvent, TEventHandler>(cancellationToken);
var queueNameAttribute = typeof(TEventHandler).GetCustomAttribute<QueueNameAttribute>(); var queueName = queueNameAttribute.GetQueueName(coreOptions.AppName, coreOptions.Business, coreOptions.Version);
var exchangeName = this.domainEventResolverService.GetKeyDomainEvent<TEvent>();
await ConfigQueueAsync(channel, queueName, exchangeName); await ConfigQueueDlxAsync(channel, queueName, exchangeName);
this.logger.LogInformation("Subscribed to event: {TEvent}.", typeof(TEvent).Name);
var eventConsumer = new AsyncEventingBasicConsumer(channel);
eventConsumer.ReceivedAsync += async (_, ea) => await RecivedEvent<TEvent, TEventHandler>(channel, ea, cancellationToken);
var consumerTag = await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: eventConsumer, cancellationToken: cancellationToken);
this.channelProvider.SetConsumerTag<TEvent, TEventHandler>(consumerTag); }
/// <summary> /// Processes the received event. /// </summary> /// <typeparam name="TEvent">The type of the domain event.</typeparam> /// <typeparam name="TEventHandler">The type of the event handler.</typeparam> /// <param name="channel">The RabbitMQ channel.</param> /// <param name="eventArguments">The event arguments.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous operation.</returns> public async Task RecivedEvent<TEvent, TEventHandler>(IChannel channel, BasicDeliverEventArgs eventArguments, CancellationToken cancellationToken) where TEvent : IDomainEvent where TEventHandler : IEventHandler<TEvent> { try { this.logger.LogDebug("Processing event: {TEvent}.", typeof(TEvent).Name);
using var scope = this.serviceProvider.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<IEventContext>();
var body = eventArguments.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var @event = JsonSerializer.Deserialize<TEvent>(message);
context.SetCurrentDomainEvent(@event);
var eventHandler = scope.ServiceProvider.GetRequiredService<TEventHandler>();
await eventHandler.HandleAsync(@event, cancellationToken).ConfigureAwait(false);
await channel.BasicAckAsync(deliveryTag: eventArguments.DeliveryTag, multiple: false, cancellationToken: cancellationToken); } catch (CodeDesignPlusException ex) { this.logger.LogWarning(ex, "Warning processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message); } catch (Exception ex) { this.logger.LogError(ex, "Error processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message); } finally { await channel.BasicNackAsync(deliveryTag: eventArguments.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken); } }
/// <summary> /// Configures the RabbitMQ queue. /// </summary> /// <param name="channel">The RabbitMQ channel.</param> /// <param name="queue">The queue name.</param> /// <param name="exchangeName">The exchange name.</param> private async Task ConfigQueueAsync(IChannel channel, string queue, string exchangeName) { argumentsQueue["x-dead-letter-exchange"] = GetExchangeNameDlx(exchangeName); await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout, durable: true); await channel.QueueDeclareAsync(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: argumentsQueue); await channel.QueueBindAsync(queue: queue, exchange: exchangeName, routingKey: string.Empty); }
/// <summary> /// Configures the dead-letter exchange (DLX) queue. /// </summary> /// <param name="channel">The RabbitMQ channel.</param> /// <param name="queue">The queue name.</param> /// <param name="exchangeName">The exchange name.</param> private static async Task ConfigQueueDlxAsync(IChannel channel, string queue, string exchangeName) { exchangeName = GetExchangeNameDlx(exchangeName); queue = GetQueueNameDlx(queue);
await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout, durable: true); await channel.QueueDeclareAsync(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null); await channel.QueueBindAsync(queue: queue, exchange: exchangeName, routingKey: string.Empty); }
/// <summary> /// Gets the dead-letter exchange (DLX) name. /// </summary> /// <param name="exchangeName">The original exchange name.</param> /// <returns>The DLX name.</returns> public static string GetExchangeNameDlx(string exchangeName) => $"{exchangeName}.dlx";
/// <summary> /// Gets the dead-letter queue (DLQ) name. /// </summary> /// <param name="queueName">The original queue name.</param> /// <returns>The DLQ name.</returns> public static string GetQueueNameDlx(string queueName) => $"{queueName}.dlx";
/// <summary> /// Unsubscribes from a domain event asynchronously. /// </summary> /// <typeparam name="TEvent">The type of the domain event.</typeparam> /// <typeparam name="TEventHandler">The type of the event handler.</typeparam> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous unsubscribe operation.</returns> public async Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken) where TEvent : IDomainEvent where TEventHandler : IEventHandler<TEvent> { var consumerTag = this.channelProvider.GetConsumerTag<TEvent, TEventHandler>();
if (!string.IsNullOrEmpty(consumerTag)) { var channel = await this.channelProvider.GetChannelConsumerAsync<TEvent, TEventHandler>(cancellationToken); await channel.BasicCancelAsync(consumerTag, cancellationToken: cancellationToken); logger.LogInformation("Unsubscribed from event: {TEvent}.", typeof(TEvent).Name); } }}
El servicio RabbitPubSubService
se implementa como una clase que hereda de IRabbitPubSub
y se encarga de:
- Recibir dependencias: Recibe una instancia de
ILogger<RabbitPubSubService>
para logging,IServiceProvider
para la resolución de dependencias de los handlers,IDomainEventResolver
para obtener el nombre del exchange,IChannelProvider
para obtener los canales y gestionar los exchanges,IOptions<CoreOptions>
para obtener información general de la aplicación, yIOptions<RabbitMQOptions>
para obtener la configuración de RabbitMQ. - Publicar eventos: Al recibir un evento, obtiene el canal de publicación a través del
ChannelProvider
, declara el exchange si no existe, serializa el evento a JSON, y lo publica en el exchange con propiedades que incluyenAppId
,Type
,Timestamp
,MessageId
yCorrelationId
. - Suscribirse a eventos: Al suscribirse a un evento, obtiene el canal de consumo a través del
ChannelProvider
, declara la cola (utilizandoQueueNameAttribute
en el event handler) y su dead-letter queue (DLQ) , realiza el binding y comienza a consumir los mensajes, que son procesados por el event handler correspondiente. - Manejo de dead-letter queue (DLQ): Configura un DLQ para cada queue, de forma que los mensajes que no puedan ser procesados después de múltiples intentos se envían a esta DLQ, evitando que se pierdan.
- Manejo de errores: Captura excepciones durante el procesamiento de eventos y envía un
BasicNack
a RabbitMQ para que el mensaje sea enviado a la dead-letter queue. - Deserialización: Deserializa los mensajes que recibe del bus a su tipo de evento correspondiente.
- Implementación de tags: Permite establecer y obtener el tag de un consumidor específico para poder cancelar suscripciones.
Las dependencias del servicio son:
ILogger<RabbitPubSubService>
: Para el registro de eventos.IServiceProvider
: Para resolver los handlers de eventos.IDomainEventResolver
: Para obtener el nombre del exchange a partir del tipo de evento.IChannelProvider
: Para gestionar los canales y exchanges de RabbitMQ.IOptions<CoreOptions>
: Para acceder a la configuración central de la aplicación.IOptions<RabbitMQOptions>
: Para obtener la configuración de RabbitMQ.
La clase RabbitMQOptions
contendría al menos la siguiente propiedad:
QueueArguments
(Dictionary<string, object>
): Argumentos para las queues.
Ejemplo de Uso
En este ejemplo, se muestra cómo la librería CodeDesignPlus.Net.PubSub
hace uso de la interfaz IMessage
para publicar eventos de dominio. La clase PubSubService
implementa la interfaz IPubSub
y se encarga de publicar eventos de dominio de forma asíncrona. Al recibir un evento, verifica si la configuración UseQueue
está activada y, si es así, encola el evento utilizando IEventQueue
. De lo contrario, publica el evento directamente utilizando IMessage
. La clase PubSubService
recibe las dependencias necesarias en su constructor, como IMessage
, IOptions<PubSubOptions>
, IServiceProvider
y ILogger<PubSubService>
, y las utiliza para realizar la publicación de eventos.
namespace CodeDesignPlus.Net.PubSub.Services;
/// <summary>/// Service responsible for publishing domain events./// </summary>public class PubSubService : IPubSub{ private readonly IMessage message; private readonly IOptions<PubSubOptions> options; private readonly IServiceProvider serviceProvider; private readonly ILogger<PubSubService> logger;
/// <summary> /// Initializes a new instance of the <see cref="PubSubService"/> class. /// </summary> /// <param name="message">The message service used for publishing events.</param> /// <param name="options">The options for configuring the PubSub service.</param> /// <param name="serviceProvider">The service provider for resolving dependencies.</param> /// <param name="logger">The logger for logging information.</param> /// <exception cref="ArgumentNullException">Thrown when any of the parameters are null.</exception> public PubSubService(IMessage message, IOptions<PubSubOptions> options, IServiceProvider serviceProvider, ILogger<PubSubService> logger) { ArgumentNullException.ThrowIfNull(message); ArgumentNullException.ThrowIfNull(options); ArgumentNullException.ThrowIfNull(serviceProvider); ArgumentNullException.ThrowIfNull(logger);
this.message = message; this.options = options; this.serviceProvider = serviceProvider; this.logger = logger;
this.logger.LogDebug("PubSubService initialized."); }
/// <summary> /// Publishes a single domain event asynchronously. /// </summary> /// <param name="event">The domain event to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken) { if (this.options.Value.UseQueue) { this.logger.LogDebug("UseQueue is true, enqueuing event of type {Name}.", @event.GetType().Name);
var eventQueueService = this.serviceProvider.GetRequiredService<IEventQueue>();
return eventQueueService.EnqueueAsync(@event, cancellationToken); }
this.logger.LogDebug("UseQueue is false, publishing event of type {Name}.", @event.GetType().Name);
return this.message.PublishAsync(@event, cancellationToken); }
/// <summary> /// Publishes a list of domain events asynchronously. /// </summary> /// <param name="event">The list of domain events to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken) { var tasks = @event.Select(@event => this.PublishAsync(@event, cancellationToken));
return Task.WhenAll(tasks); }}
Conclusiones
RabbitPubSubService
ofrece una solución completa y robusta para la implementación de un patrón publish/subscribe utilizando RabbitMQ como broker de mensajes en una aplicación .NET. La gestión de queues, exchanges y dlq, combinada con una abstracción para la publicación y suscripción a eventos, hace que sea sencillo desacoplar componentes y construir aplicaciones escalables y robustas. Es importante usar correctamente el scope al momento de usar el manejador de los eventos.
Referencias Adicionales
- Documentación oficial de RabbitMQ
- Patrón Publish/Subscribe
- Dead Letter Exchanges in RabbitMQ
- Uso de Scopes en DI de .NET
Espero que esta documentación te sea de utilidad. Si tienes alguna pregunta o necesitas aclarar algo, no dudes en consultarme.