Skip to content

Rabbit PubSub

Este documento describe los servicios IRabbitPubSub y RabbitPubSubService, que proporcionan una capa de abstracción para publicar y suscribirse a eventos de dominio utilizando RabbitMQ como broker de mensajería. RabbitPubSubService implementa la interfaz IRabbitPubSub y ofrece métodos para la publicación y suscripción asíncrona de eventos, incluyendo el manejo de la configuración de exchanges, queues y dead-letter exchanges (DLX).

namespace CodeDesignPlus.Net.RabbitMQ.Abstractions;
/// <summary>
/// Interface for RabbitMQ publish-subscribe service.
/// </summary>
public interface IRabbitPubSub : IMessage
{
}

¿Cómo Funciona?


RabbitPubSubService facilita la comunicación asíncrona entre diferentes partes de una aplicación (o entre microservicios) mediante la publicación y suscripción de eventos de dominio. Cuando un evento se publica, el servicio lo serializa y lo envía al exchange correspondiente en RabbitMQ. Los servicios suscritos a este evento, a través del servicio, reciben el mensaje y pueden procesarlo. El servicio gestiona la configuración de exchanges, queues (incluyendo dead-letter queues - DLQ) y el binding entre ellos. También maneja la deserialización de los mensajes recibidos y el despacho de eventos a sus respectivos manejadores. Utiliza ChannelProvider para gestionar los canales y exchanges en RabbitMQ.

Métodos


RabbitPubSubService implementa la interfaz IRabbitPubSub, que define los siguientes métodos:

PublishAsync

Type: Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)

Publica un evento de dominio en RabbitMQ de forma asíncrona. Toma como argumento el evento a publicar y un CancellationToken para poder cancelar la operación. El método serializa el evento a JSON y lo publica al exchange correspondiente, previamente declarado.

PublishAsync (sobrecarga)

Type: Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)

Publica una lista de eventos de dominio en RabbitMQ de forma asíncrona. Toma una lista de eventos a publicar y un CancellationToken. Llama al método PublishAsync por cada evento en la lista.

SubscribeAsync

Type: Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)

Se suscribe de forma asíncrona a un evento de dominio específico y a su manejador. Recibe un CancellationToken y el tipo del evento y su manejador. Crea el exchange y cola correspondientes y se suscribe a la cola, procesando los mensajes que reciba. También establece un consumerTag en ChannelProvider que se usa para desuscribirse.

UnsubscribeAsync

Type: Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)

Cancela la suscripción a un evento de dominio específico utilizando el consumerTag en el ChannelProvider. Recibe el CancellationToken y el tipo del evento y su manejador.

Implementación


using CodeDesignPlus.Net.Exceptions;
namespace CodeDesignPlus.Net.RabbitMQ.Services;
/// <summary>
/// Service to publish and subscribe to domain events using RabbitMQ.
/// </summary>
public class RabbitPubSubService : IRabbitPubSub
{
private readonly ILogger<RabbitPubSubService> logger;
private readonly IServiceProvider serviceProvider;
private readonly IDomainEventResolver domainEventResolverService;
private readonly CoreOptions coreOptions;
private readonly IChannelProvider channelProvider;
private readonly Dictionary<string, object> argumentsQueue;
/// <summary>
/// Initializes a new instance of the <see cref="RabbitPubSubService"/> class.
/// </summary>
/// <param name="logger">The logger instance.</param>
/// <param name="serviceProvider">The service provider.</param>
/// <param name="domainEventResolverService">The domain event resolver service.</param>
/// <param name="channelProvider">The channel provider.</param>
/// <param name="coreOptions">The core options.</param>
/// <param name="rabbitMQOptions">The RabbitMQ options.</param>
public RabbitPubSubService(ILogger<RabbitPubSubService> logger, IServiceProvider serviceProvider, IDomainEventResolver domainEventResolverService, IChannelProvider channelProvider, IOptions<CoreOptions> coreOptions, IOptions<RabbitMQOptions> rabbitMQOptions)
{
ArgumentNullException.ThrowIfNull(logger);
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(domainEventResolverService);
ArgumentNullException.ThrowIfNull(coreOptions);
ArgumentNullException.ThrowIfNull(rabbitMQOptions);
ArgumentNullException.ThrowIfNull(channelProvider);
this.logger = logger;
this.serviceProvider = serviceProvider;
this.domainEventResolverService = domainEventResolverService;
this.coreOptions = coreOptions.Value;
this.channelProvider = channelProvider;
this.argumentsQueue = rabbitMQOptions.Value.QueueArguments.GetArguments();
this.logger.LogInformation("RabbitPubSubService initialized.");
}
/// <summary>
/// Publishes a domain event asynchronously.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(@event);
this.logger.LogInformation("Publishing event: {TEvent}.", @event.GetType().Name);
return this.PrivatePublishAsync(@event, cancellationToken);
}
/// <summary>
/// Publishes a list of domain events asynchronously.
/// </summary>
/// <param name="event">The list of domain events to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
{
var tasks = @event.Select(x => PublishAsync(x, cancellationToken));
return Task.WhenAll(tasks);
}
/// <summary>
/// Publishes a domain event to RabbitMQ.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
private async Task PrivatePublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
{
var channel = await this.channelProvider.GetChannelPublishAsync(@event.GetType(), cancellationToken);
var exchangeName = await this.channelProvider.ExchangeDeclareAsync(@event.GetType(), cancellationToken);
var message = JsonSerializer.Serialize(@event);
var body = Encoding.UTF8.GetBytes(message);
var properties = new BasicProperties
{
Persistent = true,
AppId = coreOptions.AppName,
Type = @event.GetType().Name,
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
MessageId = @event.EventId.ToString(),
CorrelationId = Guid.NewGuid().ToString(),
ContentEncoding = "utf-8",
ContentType = "application/json"
};
await channel.BasicPublishAsync(
exchange: exchangeName,
routingKey: string.Empty,
mandatory: true,
basicProperties: properties,
body: body
);
this.logger.LogInformation("Event {TEvent} published ", @event.GetType().Name);
}
/// <summary>
/// Subscribes to a domain event asynchronously.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous subscribe operation.</returns>
public async Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
var channel = await this.channelProvider.GetChannelConsumerAsync<TEvent, TEventHandler>(cancellationToken);
var queueNameAttribute = typeof(TEventHandler).GetCustomAttribute<QueueNameAttribute>();
var queueName = queueNameAttribute.GetQueueName(coreOptions.AppName, coreOptions.Business, coreOptions.Version);
var exchangeName = this.domainEventResolverService.GetKeyDomainEvent<TEvent>();
await ConfigQueueAsync(channel, queueName, exchangeName);
await ConfigQueueDlxAsync(channel, queueName, exchangeName);
this.logger.LogInformation("Subscribed to event: {TEvent}.", typeof(TEvent).Name);
var eventConsumer = new AsyncEventingBasicConsumer(channel);
eventConsumer.ReceivedAsync += async (_, ea) => await RecivedEvent<TEvent, TEventHandler>(channel, ea, cancellationToken);
var consumerTag = await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: eventConsumer, cancellationToken: cancellationToken);
this.channelProvider.SetConsumerTag<TEvent, TEventHandler>(consumerTag);
}
/// <summary>
/// Processes the received event.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="channel">The RabbitMQ channel.</param>
/// <param name="eventArguments">The event arguments.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public async Task RecivedEvent<TEvent, TEventHandler>(IChannel channel, BasicDeliverEventArgs eventArguments, CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
try
{
this.logger.LogDebug("Processing event: {TEvent}.", typeof(TEvent).Name);
using var scope = this.serviceProvider.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<IEventContext>();
var body = eventArguments.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var @event = JsonSerializer.Deserialize<TEvent>(message);
context.SetCurrentDomainEvent(@event);
var eventHandler = scope.ServiceProvider.GetRequiredService<TEventHandler>();
await eventHandler.HandleAsync(@event, cancellationToken).ConfigureAwait(false);
await channel.BasicAckAsync(deliveryTag: eventArguments.DeliveryTag, multiple: false, cancellationToken: cancellationToken);
}
catch (CodeDesignPlusException ex)
{
this.logger.LogWarning(ex, "Warning processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
}
finally
{
await channel.BasicNackAsync(deliveryTag: eventArguments.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
}
}
/// <summary>
/// Configures the RabbitMQ queue.
/// </summary>
/// <param name="channel">The RabbitMQ channel.</param>
/// <param name="queue">The queue name.</param>
/// <param name="exchangeName">The exchange name.</param>
private async Task ConfigQueueAsync(IChannel channel, string queue, string exchangeName)
{
argumentsQueue["x-dead-letter-exchange"] = GetExchangeNameDlx(exchangeName);
await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout, durable: true);
await channel.QueueDeclareAsync(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: argumentsQueue);
await channel.QueueBindAsync(queue: queue, exchange: exchangeName, routingKey: string.Empty);
}
/// <summary>
/// Configures the dead-letter exchange (DLX) queue.
/// </summary>
/// <param name="channel">The RabbitMQ channel.</param>
/// <param name="queue">The queue name.</param>
/// <param name="exchangeName">The exchange name.</param>
private static async Task ConfigQueueDlxAsync(IChannel channel, string queue, string exchangeName)
{
exchangeName = GetExchangeNameDlx(exchangeName);
queue = GetQueueNameDlx(queue);
await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout, durable: true);
await channel.QueueDeclareAsync(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueBindAsync(queue: queue, exchange: exchangeName, routingKey: string.Empty);
}
/// <summary>
/// Gets the dead-letter exchange (DLX) name.
/// </summary>
/// <param name="exchangeName">The original exchange name.</param>
/// <returns>The DLX name.</returns>
public static string GetExchangeNameDlx(string exchangeName) => $"{exchangeName}.dlx";
/// <summary>
/// Gets the dead-letter queue (DLQ) name.
/// </summary>
/// <param name="queueName">The original queue name.</param>
/// <returns>The DLQ name.</returns>
public static string GetQueueNameDlx(string queueName) => $"{queueName}.dlx";
/// <summary>
/// Unsubscribes from a domain event asynchronously.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous unsubscribe operation.</returns>
public async Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
var consumerTag = this.channelProvider.GetConsumerTag<TEvent, TEventHandler>();
if (!string.IsNullOrEmpty(consumerTag))
{
var channel = await this.channelProvider.GetChannelConsumerAsync<TEvent, TEventHandler>(cancellationToken);
await channel.BasicCancelAsync(consumerTag, cancellationToken: cancellationToken);
logger.LogInformation("Unsubscribed from event: {TEvent}.", typeof(TEvent).Name);
}
}
}

El servicio RabbitPubSubService se implementa como una clase que hereda de IRabbitPubSub y se encarga de:

  1. Recibir dependencias: Recibe una instancia de ILogger<RabbitPubSubService> para logging, IServiceProvider para la resolución de dependencias de los handlers, IDomainEventResolver para obtener el nombre del exchange, IChannelProvider para obtener los canales y gestionar los exchanges, IOptions<CoreOptions> para obtener información general de la aplicación, y IOptions<RabbitMQOptions> para obtener la configuración de RabbitMQ.
  2. Publicar eventos: Al recibir un evento, obtiene el canal de publicación a través del ChannelProvider, declara el exchange si no existe, serializa el evento a JSON, y lo publica en el exchange con propiedades que incluyen AppId, Type, Timestamp, MessageId y CorrelationId.
  3. Suscribirse a eventos: Al suscribirse a un evento, obtiene el canal de consumo a través del ChannelProvider, declara la cola (utilizando QueueNameAttribute en el event handler) y su dead-letter queue (DLQ) , realiza el binding y comienza a consumir los mensajes, que son procesados por el event handler correspondiente.
  4. Manejo de dead-letter queue (DLQ): Configura un DLQ para cada queue, de forma que los mensajes que no puedan ser procesados después de múltiples intentos se envían a esta DLQ, evitando que se pierdan.
  5. Manejo de errores: Captura excepciones durante el procesamiento de eventos y envía un BasicNack a RabbitMQ para que el mensaje sea enviado a la dead-letter queue.
  6. Deserialización: Deserializa los mensajes que recibe del bus a su tipo de evento correspondiente.
  7. Implementación de tags: Permite establecer y obtener el tag de un consumidor específico para poder cancelar suscripciones.

Las dependencias del servicio son:

  • ILogger<RabbitPubSubService>: Para el registro de eventos.
  • IServiceProvider: Para resolver los handlers de eventos.
  • IDomainEventResolver: Para obtener el nombre del exchange a partir del tipo de evento.
  • IChannelProvider: Para gestionar los canales y exchanges de RabbitMQ.
  • IOptions<CoreOptions>: Para acceder a la configuración central de la aplicación.
  • IOptions<RabbitMQOptions>: Para obtener la configuración de RabbitMQ.

La clase RabbitMQOptions contendría al menos la siguiente propiedad:

  • QueueArguments (Dictionary<string, object>): Argumentos para las queues.

Ejemplo de Uso


En este ejemplo, se muestra cómo la librería CodeDesignPlus.Net.PubSub hace uso de la interfaz IMessage para publicar eventos de dominio. La clase PubSubService implementa la interfaz IPubSub y se encarga de publicar eventos de dominio de forma asíncrona. Al recibir un evento, verifica si la configuración UseQueue está activada y, si es así, encola el evento utilizando IEventQueue. De lo contrario, publica el evento directamente utilizando IMessage. La clase PubSubService recibe las dependencias necesarias en su constructor, como IMessage, IOptions<PubSubOptions>, IServiceProvider y ILogger<PubSubService>, y las utiliza para realizar la publicación de eventos.

namespace CodeDesignPlus.Net.PubSub.Services;
/// <summary>
/// Service responsible for publishing domain events.
/// </summary>
public class PubSubService : IPubSub
{
private readonly IMessage message;
private readonly IOptions<PubSubOptions> options;
private readonly IServiceProvider serviceProvider;
private readonly ILogger<PubSubService> logger;
/// <summary>
/// Initializes a new instance of the <see cref="PubSubService"/> class.
/// </summary>
/// <param name="message">The message service used for publishing events.</param>
/// <param name="options">The options for configuring the PubSub service.</param>
/// <param name="serviceProvider">The service provider for resolving dependencies.</param>
/// <param name="logger">The logger for logging information.</param>
/// <exception cref="ArgumentNullException">Thrown when any of the parameters are null.</exception>
public PubSubService(IMessage message, IOptions<PubSubOptions> options, IServiceProvider serviceProvider, ILogger<PubSubService> logger)
{
ArgumentNullException.ThrowIfNull(message);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(logger);
this.message = message;
this.options = options;
this.serviceProvider = serviceProvider;
this.logger = logger;
this.logger.LogDebug("PubSubService initialized.");
}
/// <summary>
/// Publishes a single domain event asynchronously.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
{
if (this.options.Value.UseQueue)
{
this.logger.LogDebug("UseQueue is true, enqueuing event of type {Name}.", @event.GetType().Name);
var eventQueueService = this.serviceProvider.GetRequiredService<IEventQueue>();
return eventQueueService.EnqueueAsync(@event, cancellationToken);
}
this.logger.LogDebug("UseQueue is false, publishing event of type {Name}.", @event.GetType().Name);
return this.message.PublishAsync(@event, cancellationToken);
}
/// <summary>
/// Publishes a list of domain events asynchronously.
/// </summary>
/// <param name="event">The list of domain events to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
{
var tasks = @event.Select(@event => this.PublishAsync(@event, cancellationToken));
return Task.WhenAll(tasks);
}
}

Conclusiones


RabbitPubSubService ofrece una solución completa y robusta para la implementación de un patrón publish/subscribe utilizando RabbitMQ como broker de mensajes en una aplicación .NET. La gestión de queues, exchanges y dlq, combinada con una abstracción para la publicación y suscripción a eventos, hace que sea sencillo desacoplar componentes y construir aplicaciones escalables y robustas. Es importante usar correctamente el scope al momento de usar el manejador de los eventos.

Referencias Adicionales



Espero que esta documentación te sea de utilidad. Si tienes alguna pregunta o necesitas aclarar algo, no dudes en consultarme.