EventStore PubSub
IEventStorePubSub is an interface that inherits from IMessage, which is defined in CodeDesignPlus.Net.PubSub.Abstractions. It exposes the methods for publishing and subscribing to events in Event Store.
The IMessage interface encapsulates the fundamental messaging operations: publishing domain events (one at a time or as a list) and subscribing or unsubscribing event handlers, all asynchronously. It is the common base on which the publish/subscribe services are built.
namespace CodeDesignPlus.Net.PubSub.Abstractions;

/// <summary>
/// Interface for managing the publishing and subscribing of domain events.
/// </summary>
public interface IMessage
{
    /// <summary>
    /// Publishes a domain event asynchronously.
    /// </summary>
    /// <param name="event">The domain event to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken);

    /// <summary>
    /// Publishes a list of domain events asynchronously.
    /// </summary>
    /// <param name="event">The list of domain events to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken);

    /// <summary>
    /// Subscribes to a domain event asynchronously.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event to subscribe to.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler (callback).</typeparam>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous subscribe operation.</returns>
    Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>;

    /// <summary>
    /// Unsubscribes from a domain event asynchronously.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event to unsubscribe from.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler (callback).</typeparam>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous unsubscribe operation.</returns>
    Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>;
}
The IEventStorePubSub interface inherits from IMessage and declares no members of its own. It identifies the Event Store implementation of the publish/subscribe contract.
namespace CodeDesignPlus.Net.EventStore.PubSub.Abstractions;

/// <summary>
/// Represents the service that allows to publish and subscribe to events in EventStore.
/// </summary>
public interface IEventStorePubSub : IMessage
{
}
The CodeDesignPlus.Net.EventStore.PubSub library implements the publish-subscribe pattern for event handling in .NET Core applications that interact with Event Store. Other brokers, such as Kafka, RabbitMQ, and Redis, have their own packages that implement the IMessage interface, so an application can publish and subscribe to events through the same contract whichever event infrastructure the organization uses.
Functionality
- Publish Event: Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken): Publishes a single event to Event Store.
- Publish Events: Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken): Publishes a list of events to Event Store.
- Subscribe to Event: Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken): Subscribes an event handler to a specific event type.
- Unsubscribe from Event: Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken): Unsubscribes an event handler from a specific event type.
Characteristics
- Asynchrony: The publish and subscribe methods are asynchronous, which makes better use of resources and avoids blocking the calling thread.
- Event Handling: Provides methods for publishing and subscribing to events, so that components can react to domain events as they occur.
- Abstraction: Because it is an interface, it allows different publish/subscribe strategies to be implemented and makes unit testing easier through mocks.
- Scalability: Publishers and subscribers can be scaled independently, which helps in building robust, decoupled event-driven systems.
- Decoupling: Components communicate through events rather than direct calls, which simplifies integration with other libraries and services.
- Reuse: The same publish/subscribe logic can be reused across the application, which supports design patterns such as Event Sourcing and CQRS.
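The testing benefit mentioned under Abstraction can be illustrated with a hand-written fake. The sketch below is only an assumption-laden illustration: IDomainEvent and IMessage are trimmed stand-ins declared locally (the real types ship with the CodeDesignPlus packages and have more members), and OrderCreated, RecordingMessage, and OrderService are hypothetical names that do not come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Trimmed stand-ins for the library abstractions (assumptions for this sketch only).
public interface IDomainEvent
{
    Guid EventId { get; }
}

public interface IMessage
{
    Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken);
    Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken);
}

// Hypothetical domain event used only for illustration.
public sealed record OrderCreated(Guid EventId, string OrderNumber) : IDomainEvent;

// Fake that records published events in memory instead of contacting Event Store.
public sealed class RecordingMessage : IMessage
{
    public List<IDomainEvent> Published { get; } = new List<IDomainEvent>();

    public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        Published.Add(@event);
        return Task.CompletedTask;
    }

    public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        Published.AddRange(@event);
        return Task.CompletedTask;
    }
}

// Code under test depends on the abstraction, never on a concrete broker.
public sealed class OrderService
{
    private readonly IMessage message;

    public OrderService(IMessage message) => this.message = message;

    public Task CreateAsync(string orderNumber, CancellationToken cancellationToken) =>
        this.message.PublishAsync(new OrderCreated(Guid.NewGuid(), orderNumber), cancellationToken);
}
```

A test can then construct OrderService with a RecordingMessage, call CreateAsync, and inspect the Published list, without a running Event Store instance. A mocking library would serve the same purpose; the fake is used here only to keep the sketch free of external dependencies.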
Implementation
The IEventStorePubSub interface is implemented by the EventStorePubSubService class, which provides the logic for publishing and subscribing to events in Event Store. The class is located in the CodeDesignPlus.Net.EventStore.PubSub.Services namespace. Note that in the listing that follows, UnsubscribeAsync returns a completed task without performing any work.
using CodeDesignPlus.Net.Core.Abstractions.Options;
using CodeDesignPlus.Net.Exceptions;

namespace CodeDesignPlus.Net.EventStore.PubSub.Services;

/// <summary>
/// Provides Pub/Sub services for interacting with EventStore.
/// </summary>
public class EventStorePubSubService : IEventStorePubSub
{
    private readonly IEventStoreFactory eventStoreFactory;
    private readonly IServiceProvider serviceProvider;
    private readonly ILogger<EventStorePubSubService> logger;
    private readonly CoreOptions options;
    private readonly PersistentSubscriptionSettings settings;
    private readonly IDomainEventResolver domainEventResolverService;

    /// <summary>
    /// Initializes a new instance of the <see cref="EventStorePubSubService"/> class.
    /// </summary>
    /// <param name="eventStoreFactory">The factory to create EventStore connections.</param>
    /// <param name="serviceProvider">The service provider for resolving dependencies.</param>
    /// <param name="logger">The logger instance.</param>
    /// <param name="coreOptions">The EventStore Pub/Sub options.</param>
    /// <param name="domainEventResolverService">The service to resolve domain events.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="eventStoreFactory"/>, <paramref name="serviceProvider"/>, <paramref name="logger"/>, <paramref name="coreOptions"/>, or <paramref name="domainEventResolverService"/> is null.
    /// </exception>
    public EventStorePubSubService(
        IEventStoreFactory eventStoreFactory,
        IServiceProvider serviceProvider,
        ILogger<EventStorePubSubService> logger,
        IOptions<CoreOptions> coreOptions,
        IDomainEventResolver domainEventResolverService)
    {
        ArgumentNullException.ThrowIfNull(eventStoreFactory);
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(coreOptions);
        ArgumentNullException.ThrowIfNull(domainEventResolverService);

        this.eventStoreFactory = eventStoreFactory;
        this.serviceProvider = serviceProvider;
        this.logger = logger;
        this.domainEventResolverService = domainEventResolverService;
        this.options = coreOptions.Value;

        this.settings = PersistentSubscriptionSettings
            .Create()
            .StartFromCurrent();

        this.logger.LogInformation("EventStorePubSubService initialized.");
    }

    /// <summary>
    /// Publishes a domain event to the EventStore.
    /// </summary>
    /// <param name="event">The domain event to publish.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous operation.</returns>
    public async Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        var connection = await this.eventStoreFactory.CreateAsync(EventStoreFactoryConst.Core, cancellationToken).ConfigureAwait(false);

        var stream = this.domainEventResolverService.GetKeyDomainEvent(@event.GetType());

        var eventData = new EventData(
            @event.EventId,
            stream,
            true,
            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(@event)),
            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(@event.Metadata)));

        await connection.AppendToStreamAsync(stream, ExpectedVersion.Any, eventData).ConfigureAwait(false);
    }

    /// <summary>
    /// Publishes a list of domain events to the EventStore.
    /// </summary>
    /// <param name="event">The list of domain events to publish.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous operation.</returns>
    public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
    {
        var tasks = @event.Select(x => this.PublishAsync(x, cancellationToken));

        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Subscribes to a domain event in the EventStore.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous operation.</returns>
    public async Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        var connection = await this.eventStoreFactory.CreateAsync(EventStoreFactoryConst.Core, cancellationToken).ConfigureAwait(false);

        var (user, pass) = this.eventStoreFactory.GetCredentials(EventStoreFactoryConst.Core);

        var stream = this.domainEventResolverService.GetKeyDomainEvent<TEvent>();

        var userCredentials = new UserCredentials(user, pass);

        try
        {
            await connection.CreatePersistentSubscriptionAsync(
                stream,
                options.AppName,
                this.settings,
                userCredentials
            ).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            this.logger.LogWarning(e, "{Message}", e.Message);
        }

        await connection.ConnectToPersistentSubscriptionAsync(
            stream,
            options.AppName,
            (_, evt) => EventAppearedAsync<TEvent, TEventHandler>(evt, cancellationToken).ConfigureAwait(false),
            (sub, reason, exception) => this.logger.LogDebug("Subscription dropped: {Reason}", reason)
        ).ConfigureAwait(false);

        this.logger.LogInformation("Subscription to {Stream} created.", stream);
    }

    /// <summary>
    /// Handles the event when it appears in the EventStore.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="event">The resolved event.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous operation.</returns>
    private Task EventAppearedAsync<TEvent, TEventHandler>(ResolvedEvent @event, CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        try
        {
            using var scope = serviceProvider.CreateScope();

            var context = scope.ServiceProvider.GetRequiredService<IEventContext>();

            var domainEvent = JsonSerializer.Deserialize<TEvent>(Encoding.UTF8.GetString(@event.Event.Data));

            context.SetCurrentDomainEvent(domainEvent);

            var eventHandler = scope.ServiceProvider.GetRequiredService<TEventHandler>();

            return eventHandler.HandleAsync(@domainEvent, cancellationToken);
        }
        catch (CodeDesignPlusException ex)
        {
            logger.LogWarning(ex, "Warning processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
        }
        catch (Exception ex)
        {
            this.logger.LogError(ex, "Error processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Unsubscribes from a domain event in the EventStore.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous operation.</returns>
    public Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        return Task.CompletedTask;
    }
}
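The publish path in the listing turns an event into UTF-8 JSON bytes, and the subscribe path turns those bytes back into a typed event. The round trip can be sketched in isolation as follows. This is a minimal sketch under stated assumptions: it uses System.Text.Json (the listing does not show which JsonSerializer the library resolves to), and OrderCreated and EventPayload are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical event type used only for this illustration.
public sealed record OrderCreated(Guid EventId, string OrderNumber);

public static class EventPayload
{
    // Mirrors the publish path: object -> JSON text -> UTF-8 bytes.
    public static byte[] ToBytes<TEvent>(TEvent @event) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(@event));

    // Mirrors the subscribe path: UTF-8 bytes -> JSON text -> typed object.
    public static TEvent FromBytes<TEvent>(byte[] data) =>
        JsonSerializer.Deserialize<TEvent>(Encoding.UTF8.GetString(data));
}
```

Calling EventPayload.FromBytes<OrderCreated>(EventPayload.ToBytes(original)) should yield a record equal to the original, which is a cheap check to add for every event type that travels through the stream.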
Example
With an implementation of the IEventStorePubSub interface in place, it can be used to publish and subscribe to events in Event Store.
- Add the configuration properties to the appsettings.json file:

    {
      "EventStore": {
        "Servers": {
          "Core": {
            "ConnectionString": "tcp://localhost:1113?tls=false",
            "User": "admin",
            "Password": "12345678"
          }
        }
      },
      "EventStorePubSub": {
        "Enabled": true,
        "Group": "Group-Microservices"
      }
    }

- Register the IEventStorePubSub interface in the dependency container:

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSingleton<IMessage, EventStorePubSubService>();
            services.AddSingleton<IEventStorePubSub, EventStorePubSubService>();
            services.AddLogging();
        }
    }

- Publish an event from a service class:

    public class EventStoreService
    {
        // Inject the interface registered in the previous step.
        private readonly IEventStorePubSub pubSub;

        public EventStoreService(IEventStorePubSub pubSub)
        {
            this.pubSub = pubSub;
        }

        public async Task PublishEventAsync(IDomainEvent @event)
        {
            await this.pubSub.PublishAsync(@event, CancellationToken.None);
        }
    }
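The steps above cover publishing only. The subscribing side of the contract pairs an event type with a handler type, and the sketch below shows that pairing with an in-memory stand-in rather than Event Store. Everything in it is an assumption made for illustration: IDomainEvent, IEventHandler, and IMessage are trimmed local copies of the library abstractions, InMemoryMessage creates handlers with Activator instead of resolving them from a dependency injection scope as EventStorePubSubService does, and UserRegistered and UserRegisteredHandler are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Trimmed stand-ins for the library abstractions (assumptions for this sketch only).
public interface IDomainEvent
{
    Guid EventId { get; }
}

public interface IEventHandler<TEvent> where TEvent : IDomainEvent
{
    Task HandleAsync(TEvent data, CancellationToken token);
}

public interface IMessage
{
    Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken);

    Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>;
}

// Hypothetical event and handler used only for illustration.
public sealed record UserRegistered(Guid EventId, string Email) : IDomainEvent;

public sealed class UserRegisteredHandler : IEventHandler<UserRegistered>
{
    // Static so the demo can observe what the handler received.
    public static readonly List<string> Seen = new List<string>();

    public Task HandleAsync(UserRegistered data, CancellationToken token)
    {
        Seen.Add(data.Email);
        return Task.CompletedTask;
    }
}

// In-memory stand-in: dispatches each published event to the handlers subscribed to its type.
public sealed class InMemoryMessage : IMessage
{
    private readonly Dictionary<Type, List<Func<IDomainEvent, CancellationToken, Task>>> handlers =
        new Dictionary<Type, List<Func<IDomainEvent, CancellationToken, Task>>>();

    public async Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        if (!this.handlers.TryGetValue(@event.GetType(), out var list))
        {
            return; // Nobody subscribed to this event type.
        }

        foreach (var handle in list)
        {
            await handle(@event, cancellationToken).ConfigureAwait(false);
        }
    }

    public Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        if (!this.handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<IDomainEvent, CancellationToken, Task>>();
            this.handlers[typeof(TEvent)] = list;
        }

        // A new handler instance per event; requires a public parameterless constructor.
        list.Add((e, ct) => Activator.CreateInstance<TEventHandler>().HandleAsync((TEvent)e, ct));

        return Task.CompletedTask;
    }
}
```

In an application, the equivalent call against the real service would be SubscribeAsync<UserRegistered, UserRegisteredHandler>(cancellationToken) made once at startup, with the handler type registered in the dependency container so that it can be resolved when an event arrives.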
Conclusions
IEventStorePubSub is an interface that provides methods for publishing and subscribing to events in Event Store. By depending on this interface, application code interacts with Event Store in a decoupled way, which makes it easier to build scalable, robust event-driven systems. Because it is an interface, it also allows different publish/subscribe strategies to be implemented and simplifies unit testing through mocks. This makes IEventStorePubSub a practical tool for managing events in .NET Core applications.