Skip to content

Redis PubSub

Este documento describe los servicios IRedisPubSub y RedisPubSubService, los cuales proporcionan una capa de abstracción para publicar y suscribirse a eventos de dominio utilizando Redis Pub/Sub. RedisPubSubService implementa la interfaz IRedisPubSub y ofrece métodos para la publicación y suscripción asíncrona de eventos, utilizando el servicio de Redis para el manejo de los canales.

namespace CodeDesignPlus.Net.Redis.PubSub.Abstractions;
/// <summary>
/// Defines the interface for Redis Pub/Sub services, extending the <see cref="IMessage"/> interface.
/// </summary>
public interface IRedisPubSub : IMessage
{
}

¿Cómo Funciona?


RedisPubSubService simplifica la comunicación asíncrona entre diferentes partes de una aplicación (o entre microservicios) a través de la publicación y suscripción de eventos de dominio mediante el mecanismo Pub/Sub de Redis. Cuando un evento se publica, el servicio lo serializa y lo envía al canal de Redis correspondiente. Los servicios que se han suscrito a ese canal, a través del servicio, reciben el mensaje y pueden procesarlo. La implementación se basa en un IRedis para interactuar con Redis y el IDomainEventResolver para resolver los canales de los eventos.

Métodos


RedisPubSubService implementa la interfaz IRedisPubSub, que define los siguientes métodos:

PublishAsync

Type: Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)

Publica un evento de dominio en el canal de Redis correspondiente de forma asíncrona. Recibe el evento a publicar y un CancellationToken. El método serializa el evento a JSON y lo envía al canal de Redis.

PublishAsync (sobrecarga)

Type: Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)

Publica una lista de eventos de dominio en Redis de forma asíncrona. Recibe la lista de eventos y un CancellationToken. Llama al método PublishAsync por cada evento en la lista.

SubscribeAsync

Type: Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)

Se suscribe a un canal de Redis para un tipo de evento específico. Recibe un CancellationToken y los tipos del evento y su manejador. El método crea una suscripción a un canal de Redis y pasa los mensajes al manejador de eventos correspondiente.

UnsubscribeAsync

Type: Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)

Cancela la suscripción de un evento y su manejador al canal de Redis. Recibe el CancellationToken, el tipo del evento y su manejador.

Implementación


using CodeDesignPlus.Net.Exceptions;
namespace CodeDesignPlus.Net.Redis.PubSub.Services;
/// <summary>
/// Provides Redis Pub/Sub services for publishing and subscribing to domain events.
/// </summary>
public class RedisPubSubService : IRedisPubSub
{
private readonly ILogger<RedisPubSubService> logger;
private readonly Redis.Abstractions.IRedis redisService;
private readonly IDomainEventResolver domainEventResolverService;
private readonly IServiceProvider serviceProvider;
/// <summary>
/// Initializes a new instance of the <see cref="RedisPubSubService"/> class.
/// </summary>
/// <param name="redisServiceFactory">The factory to create Redis services.</param>
/// <param name="serviceProvider">The service provider.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="domainEventResolverService">The domain event resolver service.</param>
/// <exception cref="ArgumentNullException">Thrown when any of the parameters are null.</exception>
public RedisPubSubService(
IRedisFactory redisServiceFactory,
IServiceProvider serviceProvider,
ILogger<RedisPubSubService> logger,
IDomainEventResolver domainEventResolverService)
{
ArgumentNullException.ThrowIfNull(redisServiceFactory);
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(logger);
ArgumentNullException.ThrowIfNull(domainEventResolverService);
this.redisService = redisServiceFactory.Create(FactoryConst.RedisPubSub);
this.domainEventResolverService = domainEventResolverService;
this.serviceProvider = serviceProvider;
this.logger = logger;
this.logger.LogInformation("RedisPubSubService initialized.");
}
/// <summary>
/// Publishes a domain event asynchronously.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
/// <exception cref="ArgumentNullException">Thrown when the event is null.</exception>
public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(@event);
this.logger.LogInformation("Publishing event: {TEvent}.", @event.GetType().Name);
return this.PrivatePublishAsync<long>(@event);
}
/// <summary>
/// Publishes a list of domain events asynchronously.
/// </summary>
/// <param name="event">The list of domain events to publish.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
{
var tasks = @event.Select(x => this.PublishAsync(x, cancellationToken));
return Task.WhenAll(tasks);
}
/// <summary>
/// Publishes a domain event to the Redis channel.
/// </summary>
/// <typeparam name="TResult">The result type.</typeparam>
/// <param name="event">The domain event to publish.</param>
/// <returns>A task that represents the asynchronous operation, containing the result.</returns>
private async Task<TResult> PrivatePublishAsync<TResult>(object @event)
{
var channel = this.domainEventResolverService.GetKeyDomainEvent(@event.GetType());
var message = JsonSerializer.Serialize(@event);
var notified = await this.redisService.Subscriber.PublishAsync(RedisChannel.Literal(channel), message);
this.logger.LogInformation("Event {TEvent} published with {Notified} notifications.", @event.GetType().Name, notified);
return (TResult)Convert.ChangeType(notified, typeof(TResult));
}
/// <summary>
/// Subscribes to a domain event asynchronously.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
var channel = this.domainEventResolverService.GetKeyDomainEvent<TEvent>();
this.logger.LogInformation("Subscribed to event: {TEvent}.", typeof(TEvent).Name);
return this.redisService.Subscriber.SubscribeAsync(RedisChannel.Literal(channel), (_, v) => this.ListenerEvent<TEvent, TEventHandler>(v, cancellationToken));
}
/// <summary>
/// Handles the received domain event.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="value">The received event value.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public void ListenerEvent<TEvent, TEventHandler>(RedisValue value, CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
try
{
using var scope = serviceProvider.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<IEventContext>();
var @event = JsonSerializer.Deserialize<TEvent>(value);
context.SetCurrentDomainEvent(@event);
var eventHandler = scope.ServiceProvider.GetRequiredService<TEventHandler>();
eventHandler.HandleAsync(@event, cancellationToken).ConfigureAwait(false);
}
catch (CodeDesignPlusException ex)
{
logger.LogWarning(ex, "Warning processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
}
}
/// <summary>
/// Unsubscribes from a domain event asynchronously.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
var channel = this.domainEventResolverService.GetKeyDomainEvent<TEvent>();
this.redisService.Subscriber.Unsubscribe(RedisChannel.Literal(channel));
this.logger.LogInformation("Unsubscribed from event: {TEvent}.", typeof(TEvent).Name);
return Task.CompletedTask;
}
}

El servicio RedisPubSubService se implementa como una clase que hereda de IRedisPubSub y se encarga de:

  1. Recibir dependencias: Recibe una instancia de ILogger<RedisPubSubService> para el registro de eventos, un IRedisFactory para la creación de servicios Redis, un IServiceProvider para la resolución de los handlers, y un IDomainEventResolver para la resolución de canales.
  2. Publicar eventos: Al recibir un evento, obtiene el nombre del canal asociado al tipo del evento, lo serializa a JSON, y utiliza el servicio IRedis para publicar el mensaje en el canal de Redis.
  3. Suscribirse a eventos: Al suscribirse a un evento, obtiene el nombre del canal a partir del tipo del evento y crea una suscripción al mismo. Cuando se recibe un mensaje, lo deserializa y lo pasa al manejador de eventos.
  4. Manejo de eventos: Utiliza System.Text.Json para serializar y deserializar los eventos.

Las dependencias del servicio son:

  • ILogger<RedisPubSubService>: Para el registro de eventos.
  • IRedisFactory: Para crear la conexión a Redis.
  • IServiceProvider: Para resolver los handlers de eventos.
  • IDomainEventResolver: Para obtener el canal de Redis asociado al tipo de evento.

Ejemplo de Uso


En este ejemplo, se muestra cómo la librería CodeDesignPlus.Net.PubSub hace uso de la interfaz IMessage para publicar eventos de dominio de forma asíncrona. La clase PubSubService implementa la interfaz IPubSub y utiliza la inyección de dependencias para obtener una instancia de IMessage y otros servicios necesarios. El método PublishAsync se encarga de publicar eventos de dominio, verificando si se debe usar una cola para encolar los eventos o si se deben publicar directamente. Si se usa una cola, se obtiene una instancia de IEventQueue y se encola el evento; de lo contrario, se publica directamente utilizando IMessage.

namespace CodeDesignPlus.Net.PubSub.Services;
/// <summary>
/// Service responsible for publishing domain events.
/// </summary>
public class PubSubService : IPubSub
{
private readonly IMessage message;
private readonly IOptions<PubSubOptions> options;
private readonly IServiceProvider serviceProvider;
private readonly ILogger<PubSubService> logger;
/// <summary>
/// Initializes a new instance of the <see cref="PubSubService"/> class.
/// </summary>
/// <param name="message">The message service used for publishing events.</param>
/// <param name="options">The options for configuring the PubSub service.</param>
/// <param name="serviceProvider">The service provider for resolving dependencies.</param>
/// <param name="logger">The logger for logging information.</param>
/// <exception cref="ArgumentNullException">Thrown when any of the parameters are null.</exception>
public PubSubService(IMessage message, IOptions<PubSubOptions> options, IServiceProvider serviceProvider, ILogger<PubSubService> logger)
{
ArgumentNullException.ThrowIfNull(message);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(logger);
this.message = message;
this.options = options;
this.serviceProvider = serviceProvider;
this.logger = logger;
this.logger.LogDebug("PubSubService initialized.");
}
/// <summary>
/// Publishes a single domain event asynchronously.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
{
if (this.options.Value.UseQueue)
{
this.logger.LogDebug("UseQueue is true, enqueuing event of type {Name}.", @event.GetType().Name);
var eventQueueService = this.serviceProvider.GetRequiredService<IEventQueue>();
return eventQueueService.EnqueueAsync(@event, cancellationToken);
}
this.logger.LogDebug("UseQueue is false, publishing event of type {Name}.", @event.GetType().Name);
return this.message.PublishAsync(@event, cancellationToken);
}
/// <summary>
/// Publishes a list of domain events asynchronously.
/// </summary>
/// <param name="event">The list of domain events to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
{
var tasks = @event.Select(@event => this.PublishAsync(@event, cancellationToken));
return Task.WhenAll(tasks);
}
}

Conclusiones


RedisPubSubService ofrece una solución para la comunicación asíncrona utilizando Redis Pub/Sub en aplicaciones .NET. Proporciona una abstracción para la publicación y suscripción de eventos, lo que simplifica el desarrollo de sistemas basados en eventos y reduce la dependencia directa de la implementación de Redis. Es necesario que el IRedis sea registrado en la aplicación, junto con el IRedisFactory, y se configure la sección “Redis” en el archivo de configuración.

Referencias Adicionales