Skip to content

IPubSub

CodeDesignPlus.Net.PubSub provee un servicio para publicar eventos de dominio de forma asíncrona. Este servicio permite a los desarrolladores publicar eventos de dominio de forma sencilla y eficiente. A continuación se muestra la interfaz y la implementación del servicio IPubSub.

namespace CodeDesignPlus.Net.PubSub.Abstractions;
/// <summary>
/// Interface for publishing domain events.
/// </summary>
public interface IPubSub
{
/// <summary>
/// Publishes a single domain event asynchronously.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken);
/// <summary>
/// Publishes a list of domain events asynchronously.
/// </summary>
/// <param name="event">The list of domain events to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken);
}

Métodos


Los métodos que se pueden utilizar con la interfaz IPubSub son los siguientes:

PublishAsync

Type: Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)

Publica un evento de dominio de forma asíncrona.

PublishAsync

Type: Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)

Publica una lista de eventos de dominio de forma asíncrona.

Implementación


La implementación del servicio IPubSub se muestra a continuación. Este servicio es responsable de publicar eventos de dominio de forma asíncrona utilizando un servicio de mensajería como RabbitMQ, Kafka o RedisPubSub que son los responsables de la implementación de la interfaz IMessage.

namespace CodeDesignPlus.Net.PubSub.Services;
/// <summary>
/// Service responsible for publishing domain events.
/// </summary>
public class PubSubService : IPubSub
{
private readonly IMessage message;
private readonly IOptions<PubSubOptions> options;
private readonly IServiceProvider serviceProvider;
private readonly ILogger<PubSubService> logger;
/// <summary>
/// Initializes a new instance of the <see cref="PubSubService"/> class.
/// </summary>
/// <param name="message">The message service used for publishing events.</param>
/// <param name="options">The options for configuring the PubSub service.</param>
/// <param name="serviceProvider">The service provider for resolving dependencies.</param>
/// <param name="logger">The logger for logging information.</param>
/// <exception cref="ArgumentNullException">Thrown when any of the parameters are null.</exception>
public PubSubService(IMessage message, IOptions<PubSubOptions> options, IServiceProvider serviceProvider, ILogger<PubSubService> logger)
{
ArgumentNullException.ThrowIfNull(message);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(logger);
this.message = message;
this.options = options;
this.serviceProvider = serviceProvider;
this.logger = logger;
this.logger.LogDebug("PubSubService initialized.");
}
/// <summary>
/// Publishes a single domain event asynchronously.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
{
if (this.options.Value.UseQueue)
{
this.logger.LogDebug("UseQueue is true, enqueuing event of type {Name}.", @event.GetType().Name);
var eventQueueService = this.serviceProvider.GetRequiredService<IEventQueue>();
return eventQueueService.EnqueueAsync(@event, cancellationToken);
}
this.logger.LogDebug("UseQueue is false, publishing event of type {Name}.", @event.GetType().Name);
return this.message.PublishAsync(@event, cancellationToken);
}
/// <summary>
/// Publishes a list of domain events asynchronously.
/// </summary>
/// <param name="event">The list of domain events to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
{
var tasks = @event.Select(@event => this.PublishAsync(@event, cancellationToken));
return Task.WhenAll(tasks);
}
}
  1. El Cliente usa el servicio IPubSub para emitir la publicación de un evento.

  2. PubSubService, que implementa IPubSub, encola el evento en EventQueueService si UseQueue es true.

  3. EventQueueService almacena el evento en una cola en memoria.

  4. EventQueueBackgroundService procesa los eventos en segundo plano y los publica usando el servicio IMessage, que debe ser implementado por el cliente como RabbitMQ, Kafka o RedisPubSub.

  5. El cliente del broker que implementa IMessage distribuye el evento a los consumidores de eventos.

Ejemplo de Uso


A continuación se muestra un ejemplo de cómo usar el servicio IPubSub para publicar un evento de dominio.

using CodeDesignPlus.Net.PubSub.Abstractions;
using System.Threading.Tasks;
public class Example
{
private readonly IPubSub pubSub;
public Example(IPubSub pubSub)
{
this.pubSub = pubSub;
}
public async Task PublishEventAsync()
{
var domainEvent = new DomainEvent();
await this.pubSub.PublishAsync(domainEvent, default);
}
}