IEventQueue
IEventQueue
es una interfaz que define los métodos necesarios para gestionar una cola de eventos de dominio. Esta interfaz permite agregar eventos a la cola y procesarlos de manera asíncrona.
namespace CodeDesignPlus.Net.PubSub.Abstractions;
/// <summary>/// Interface for managing a queue of domain events./// </summary>public interface IEventQueue{ /// <summary> /// Adds an event to the end of the concurrent queue. /// </summary> /// <param name="event">The domain event to add to the queue. Can be null for reference types.</param> /// <param name="cancellationToken">The cancellation token to observe while waiting for the task to complete.</param> /// <returns>A task that represents the asynchronous enqueue operation.</returns> Task EnqueueAsync(IDomainEvent @event, CancellationToken cancellationToken);
/// <summary> /// Tries to remove and return the event at the beginning of the concurrent queue. /// </summary> /// <param name="token">The cancellation token to observe while waiting for the task to complete.</param> /// <returns>A task that represents the asynchronous dequeue operation.</returns> Task DequeueAsync(CancellationToken token);}
Métodos
Los métodos que se pueden utilizar con la interfaz IEventQueue
son los siguientes:
EnqueueAsync
Type: Task EnqueueAsync(IDomainEvent @event, CancellationToken cancellationToken)
Agrega un evento al final de la cola concurrente.
DequeueAsync
Type: Task DequeueAsync(CancellationToken token)
Intenta eliminar y devolver el evento al principio de la cola concurrente.
Implementación
La implementación de la interfaz IEventQueue
se muestra a continuación. Esta implementación es responsable de almacenar los eventos en una cola en memoria antes de ser procesados.
namespace CodeDesignPlus.Net.PubSub.Services;
/// <summary>/// Provides a service to manage the event queue./// </summary>public class EventQueueService : IEventQueue{ private readonly ConcurrentQueue<IDomainEvent> queue = new(); private readonly ILogger<EventQueueService> logger; private readonly IMessage message; private readonly PubSubOptions options; private readonly IActivityService activityService;
/// <summary> /// Initializes a new instance of the <see cref="EventQueueService"/> class. /// </summary> /// <param name="logger">The logger to manage the logs.</param> /// <param name="options">The options for the PubSub service.</param> /// <param name="message">The message service to publish events.</param> /// <param name="activityService">The activity service to manage activities (optional).</param> public EventQueueService(ILogger<EventQueueService> logger, IOptions<PubSubOptions> options, IMessage message, IActivityService activityService = null) { ArgumentNullException.ThrowIfNull(logger); ArgumentNullException.ThrowIfNull(options); ArgumentNullException.ThrowIfNull(message);
this.logger = logger; this.message = message; this.options = options.Value; this.activityService = activityService;
this.logger.LogDebug("EventQueueService initialized."); }
/// <summary> /// Enqueues an event to the queue. /// </summary> /// <param name="event">The event to enqueue.</param> /// <param name="cancellationToken">A cancellation token used to propagate notifications that operations should be canceled.</param> /// <returns>A task representing the asynchronous operation.</returns> public Task EnqueueAsync(IDomainEvent @event, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(@event);
var exist = this.queue.Any(x => x.Equals(@event));
if (!exist) { var activity = this.activityService?.StartActivity("EventQueueService.EnqueueAsync", ActivityKind.Internal);
this.activityService?.Inject(activity, @event);
activity?.AddTag("event.type", @event.GetType().Name); activity?.AddTag("event.id", @event.EventId.ToString()); activity?.AddTag("event.aggregate_id", @event.AggregateId.ToString());
this.queue.Enqueue(@event);
this.logger.LogDebug("Event of type {Name} enqueued.", @event.GetType().Name);
activity?.SetStatus(ActivityStatusCode.Ok); activity?.Stop(); } else { this.logger.LogWarning("Event of type {Name} was already in the queue. Skipping.", @event.GetType().Name); }
return Task.CompletedTask; }
/// <summary> /// Dequeues events from the queue and processes them. /// </summary> /// <param name="token">A cancellation token used to propagate notifications that operations should be canceled.</param> /// <returns>A task representing the asynchronous operation.</returns> public async Task DequeueAsync(CancellationToken token) { while (!token.IsCancellationRequested) { try { if (this.queue.TryDequeue(out IDomainEvent @event)) { this.logger.LogDebug("Dequeueing event of type {TEvent}.", @event.GetType().Name);
var parentContext = this.activityService?.Extract(@event);
var activity = this.activityService?.StartActivity("EventQueueService.DequeueAsync", ActivityKind.Internal, parentContext);
activity?.AddTag("event.type", @event.GetType().Name); activity?.AddTag("event.id", @event.EventId.ToString()); activity?.AddTag("event.aggregate_id", @event.AggregateId.ToString());
await this.message.PublishAsync(@event, token).ConfigureAwait(false);
activity?.SetStatus(ActivityStatusCode.Ok);
activity?.Stop(); } else { this.logger.LogDebug("No events in the queue. Waiting...");
await Task.Delay(TimeSpan.FromSeconds(this.options.SecondsWaitQueue), CancellationToken.None); } } catch (Exception ex) { this.logger.LogError(ex, "Error processing event."); } }
this.logger.LogWarning("DequeueAsync stopped due to cancellation token."); }}
Ejemplo de Uso
Para comprender el funcionamiento y el uso de la interfaz IEventQueue
, se muestra la clase PubSubService
que implementa la interfaz IPubSub
y utiliza el servicio IEventQueue
para encolar eventos de dominio cuando UseQueue
es true
.
namespace CodeDesignPlus.Net.PubSub.Services;
/// <summary>/// Service responsible for publishing domain events./// </summary>public class PubSubService : IPubSub{ private readonly IMessage message; private readonly IOptions<PubSubOptions> options; private readonly IServiceProvider serviceProvider; private readonly ILogger<PubSubService> logger;
/// <summary> /// Initializes a new instance of the <see cref="PubSubService"/> class. /// </summary> /// <param name="message">The message service used for publishing events.</param> /// <param name="options">The options for configuring the PubSub service.</param> /// <param name="serviceProvider">The service provider for resolving dependencies.</param> /// <param name="logger">The logger for logging information.</param> /// <exception cref="ArgumentNullException">Thrown when any of the parameters are null.</exception> public PubSubService(IMessage message, IOptions<PubSubOptions> options, IServiceProvider serviceProvider, ILogger<PubSubService> logger) { ArgumentNullException.ThrowIfNull(message); ArgumentNullException.ThrowIfNull(options); ArgumentNullException.ThrowIfNull(serviceProvider); ArgumentNullException.ThrowIfNull(logger);
this.message = message; this.options = options; this.serviceProvider = serviceProvider; this.logger = logger;
this.logger.LogDebug("PubSubService initialized."); }
/// <summary> /// Publishes a single domain event asynchronously. /// </summary> /// <param name="event">The domain event to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken) { if (this.options.Value.UseQueue) { this.logger.LogDebug("UseQueue is true, enqueuing event of type {Name}.", @event.GetType().Name);
var eventQueueService = this.serviceProvider.GetRequiredService<IEventQueue>();
return eventQueueService.EnqueueAsync(@event, cancellationToken); }
this.logger.LogDebug("UseQueue is false, publishing event of type {Name}.", @event.GetType().Name);
return this.message.PublishAsync(@event, cancellationToken); }
/// <summary> /// Publishes a list of domain events asynchronously. /// </summary> /// <param name="event">The list of domain events to publish.</param> /// <param name="cancellationToken">A token to monitor for cancellation requests.</param> /// <returns>A task that represents the asynchronous publish operation.</returns> public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken) { var tasks = @event.Select(@event => this.PublishAsync(@event, cancellationToken));
return Task.WhenAll(tasks); }}