Skip to content

IEventQueue

IEventQueue es una interfaz que define los métodos necesarios para gestionar una cola de eventos de dominio. Esta interfaz permite agregar eventos a la cola y procesarlos de manera asíncrona.

namespace CodeDesignPlus.Net.PubSub.Abstractions;
/// <summary>
/// Contract for a queue of domain events: producers add events with
/// <see cref="EnqueueAsync"/> and a consumer drains them with <see cref="DequeueAsync"/>.
/// </summary>
public interface IEventQueue
{
/// <summary>
/// Adds a domain event to the queue.
/// </summary>
/// <param name="event">
/// The domain event to add to the queue.
/// NOTE(review): the <c>EventQueueService</c> implementation rejects <c>null</c> with an
/// <c>ArgumentNullException</c>, so callers should not rely on passing <c>null</c>.
/// </param>
/// <param name="cancellationToken">A token that can be used to request cancellation of the operation.</param>
/// <returns>A task that represents the asynchronous enqueue operation.</returns>
Task EnqueueAsync(IDomainEvent @event, CancellationToken cancellationToken);
/// <summary>
/// Removes events from the queue and processes them.
/// </summary>
/// <remarks>
/// The returned task carries no result, so dequeued events are not handed back to the caller.
/// In the <c>EventQueueService</c> implementation this runs as a loop that publishes each
/// dequeued event and only finishes once <paramref name="token"/> is cancelled.
/// </remarks>
/// <param name="token">A token that can be used to request that processing stops.</param>
/// <returns>A task that represents the asynchronous dequeue operation.</returns>
Task DequeueAsync(CancellationToken token);
}

Métodos


Los métodos que se pueden utilizar con la interfaz IEventQueue son los siguientes:

EnqueueAsync

Type: Task EnqueueAsync(IDomainEvent @event, CancellationToken cancellationToken)

Agrega un evento al final de la cola concurrente.

DequeueAsync

Type: Task DequeueAsync(CancellationToken token)

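Extrae los eventos del principio de la cola concurrente y los procesa. El método no devuelve el evento (su tipo de retorno es Task): en la implementación EventQueueService cada evento extraído se publica mediante IMessage.PublishAsync, y el ciclo continúa hasta que se cancela el token.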

Implementación


La implementación de la interfaz IEventQueue se muestra a continuación. Esta implementación es responsable de almacenar los eventos en una cola en memoria antes de ser procesados.

namespace CodeDesignPlus.Net.PubSub.Services;

/// <summary>
/// In-memory implementation of <see cref="IEventQueue"/>: events are buffered in a
/// <see cref="ConcurrentQueue{T}"/> and later published through <see cref="IMessage"/>.
/// </summary>
public class EventQueueService : IEventQueue
{
    private readonly ConcurrentQueue<IDomainEvent> queue = new();
    private readonly ILogger<EventQueueService> logger;
    private readonly IMessage message;
    private readonly PubSubOptions options;
    private readonly IActivityService activityService;

    /// <summary>
    /// Initializes a new instance of the <see cref="EventQueueService"/> class.
    /// </summary>
    /// <param name="logger">The logger used for diagnostic output.</param>
    /// <param name="options">The PubSub options; <c>SecondsWaitQueue</c> controls the idle wait of <see cref="DequeueAsync"/>.</param>
    /// <param name="message">The message service used to publish dequeued events.</param>
    /// <param name="activityService">Optional activity service; when <c>null</c>, no activities are created.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="logger"/>, <paramref name="options"/> or <paramref name="message"/> is <c>null</c>.
    /// </exception>
    public EventQueueService(ILogger<EventQueueService> logger, IOptions<PubSubOptions> options, IMessage message, IActivityService activityService = null)
    {
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(message);

        this.logger = logger;
        this.message = message;
        this.options = options.Value;
        this.activityService = activityService;

        this.logger.LogDebug("EventQueueService initialized.");
    }

    /// <summary>
    /// Adds an event to the queue unless an equal event is already waiting in it.
    /// </summary>
    /// <param name="event">The event to enqueue.</param>
    /// <param name="cancellationToken">Not observed by this implementation; the enqueue completes synchronously.</param>
    /// <returns>An already-completed task.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="event"/> is <c>null</c>.</exception>
    public Task EnqueueAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(@event);

        // Duplicate detection relies on the event's own Equals. The check and the enqueue
        // below are two separate operations, so they are not atomic with respect to other callers.
        if (this.queue.Any(x => x.Equals(@event)))
        {
            this.logger.LogWarning("Event of type {Name} was already in the queue. Skipping.", @event.GetType().Name);
            return Task.CompletedTask;
        }

        var activity = this.activityService?.StartActivity("EventQueueService.EnqueueAsync", ActivityKind.Internal);
        try
        {
            // Store the trace context on the event so DequeueAsync can extract it later.
            this.activityService?.Inject(activity, @event);
            activity?.AddTag("event.type", @event.GetType().Name);
            activity?.AddTag("event.id", @event.EventId.ToString());
            activity?.AddTag("event.aggregate_id", @event.AggregateId.ToString());

            this.queue.Enqueue(@event);
            this.logger.LogDebug("Event of type {Name} enqueued.", @event.GetType().Name);

            activity?.SetStatus(ActivityStatusCode.Ok);
        }
        finally
        {
            // Always stop the activity, even when injecting or tagging throws.
            activity?.Stop();
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Continuously dequeues events and publishes them until <paramref name="token"/> is cancelled.
    /// </summary>
    /// <remarks>
    /// Failures while processing a single event are logged and the loop continues with the next event;
    /// the failed event is not put back into the queue.
    /// </remarks>
    /// <param name="token">A cancellation token that stops the processing loop.</param>
    /// <returns>A task that completes once cancellation has been requested and the loop has exited.</returns>
    public async Task DequeueAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                if (this.queue.TryDequeue(out IDomainEvent @event))
                {
                    await this.PublishDequeuedEventAsync(@event, token).ConfigureAwait(false);
                }
                else
                {
                    this.logger.LogDebug("No events in the queue. Waiting...");
                    await this.WaitForEventsAsync(token).ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Error processing event.");
            }
        }

        this.logger.LogWarning("DequeueAsync stopped due to cancellation token.");
    }

    /// <summary>
    /// Publishes one dequeued event inside a tracing activity that is always stopped.
    /// </summary>
    private async Task PublishDequeuedEventAsync(IDomainEvent @event, CancellationToken token)
    {
        this.logger.LogDebug("Dequeueing event of type {TEvent}.", @event.GetType().Name);

        // Continue the trace that EnqueueAsync injected into the event, when tracing is enabled.
        var parentContext = this.activityService?.Extract(@event);
        var activity = this.activityService?.StartActivity("EventQueueService.DequeueAsync", ActivityKind.Internal, parentContext);
        try
        {
            activity?.AddTag("event.type", @event.GetType().Name);
            activity?.AddTag("event.id", @event.EventId.ToString());
            activity?.AddTag("event.aggregate_id", @event.AggregateId.ToString());

            await this.message.PublishAsync(@event, token).ConfigureAwait(false);

            activity?.SetStatus(ActivityStatusCode.Ok);
        }
        catch
        {
            // Mark the span as failed; the caller's loop logs the exception.
            activity?.SetStatus(ActivityStatusCode.Error);
            throw;
        }
        finally
        {
            activity?.Stop();
        }
    }

    /// <summary>
    /// Waits for the configured idle interval, returning early when cancellation is requested.
    /// </summary>
    private async Task WaitForEventsAsync(CancellationToken token)
    {
        try
        {
            await Task.Delay(TimeSpan.FromSeconds(this.options.SecondsWaitQueue), token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation during the idle wait is the normal shutdown path, not an error;
            // the caller's loop condition observes the token and exits.
        }
    }
}

Ejemplo de Uso


Para comprender el funcionamiento y el uso de la interfaz IEventQueue, se muestra la clase PubSubService que implementa la interfaz IPubSub y utiliza el servicio IEventQueue para encolar eventos de dominio cuando UseQueue es true.

namespace CodeDesignPlus.Net.PubSub.Services;

/// <summary>
/// Publishes domain events either directly through <see cref="IMessage"/> or, when
/// <c>UseQueue</c> is enabled in <see cref="PubSubOptions"/>, by enqueuing them in an <see cref="IEventQueue"/>.
/// </summary>
public class PubSubService : IPubSub
{
    private readonly IMessage message;
    private readonly IOptions<PubSubOptions> options;
    private readonly IServiceProvider serviceProvider;
    private readonly ILogger<PubSubService> logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="PubSubService"/> class.
    /// </summary>
    /// <param name="message">The message service used for publishing events directly.</param>
    /// <param name="options">The options for configuring the PubSub service.</param>
    /// <param name="serviceProvider">The service provider used to resolve <see cref="IEventQueue"/> on demand.</param>
    /// <param name="logger">The logger used for diagnostic output.</param>
    /// <exception cref="ArgumentNullException">Thrown when any of the parameters are null.</exception>
    public PubSubService(IMessage message, IOptions<PubSubOptions> options, IServiceProvider serviceProvider, ILogger<PubSubService> logger)
    {
        ArgumentNullException.ThrowIfNull(message);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(logger);

        this.message = message;
        this.options = options;
        this.serviceProvider = serviceProvider;
        this.logger = logger;

        this.logger.LogDebug("PubSubService initialized.");
    }

    /// <summary>
    /// Publishes a single domain event, through the queue when <c>UseQueue</c> is true and directly otherwise.
    /// </summary>
    /// <param name="event">The domain event to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish (or enqueue) operation.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="event"/> is <c>null</c>.</exception>
    public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        // Validate before the log statements dereference the event.
        ArgumentNullException.ThrowIfNull(@event);

        if (this.options.Value.UseQueue)
        {
            this.logger.LogDebug("UseQueue is true, enqueuing event of type {Name}.", @event.GetType().Name);

            // The queue is resolved per call rather than injected through the constructor.
            var eventQueueService = this.serviceProvider.GetRequiredService<IEventQueue>();

            return eventQueueService.EnqueueAsync(@event, cancellationToken);
        }

        this.logger.LogDebug("UseQueue is false, publishing event of type {Name}.", @event.GetType().Name);

        return this.message.PublishAsync(@event, cancellationToken);
    }

    /// <summary>
    /// Publishes every domain event in the list using the single-event overload.
    /// </summary>
    /// <param name="event">The list of domain events to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that completes when all individual publish operations have completed.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="event"/> is <c>null</c>.</exception>
    public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(@event);

        // Distinct lambda parameter name: the method parameter is also called "event".
        var tasks = @event.Select(domainEvent => this.PublishAsync(domainEvent, cancellationToken));

        return Task.WhenAll(tasks);
    }
}