Skip to content

EventStore PubSub

El IEventStorePubSub es una interfaz que hereda de IMessage que se encuentra en CodeDesignPlus.Net.PubSub.Abstractions . Esta interfaz proporciona un conjunto de métodos para publicar y suscribirse a eventos del Event Store.

La interfaz IMessage encapsula las operaciones fundamentales de mensajería, como el envío y recepción de mensajes asíncronos. Esta interfaz proporciona una base sólida para implementar servicios de publicación/suscripción robustos y escalables.

namespace CodeDesignPlus.Net.PubSub.Abstractions;
/// <summary>
/// Interface for managing the publishing and subscribing of domain events.
/// </summary>
public interface IMessage
{
/// <summary>
/// Publishes a domain event asynchronously.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken);
/// <summary>
/// Publishes a list of domain events asynchronously.
/// </summary>
/// <param name="event">The list of domain events to publish.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous publish operation.</returns>
Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken);
/// <summary>
/// Subscribes to a domain event asynchronously.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event to subscribe to.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler (callback).</typeparam>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous subscribe operation.</returns>
Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>;
/// <summary>
/// Unsubscribes from a domain event asynchronously.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event to unsubscribe from.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler (callback).</typeparam>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous unsubscribe operation.</returns>
Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>;
}

La libreria CodeDesignPlus.Net.EventStore.PubSub, es un marco robusto diseñado para implementar patrones de publicación-suscripción en la gestión de eventos dentro de aplicaciones .NET Core para interactuar con el Event Store. De igual forma, existen otros marcos como Kafka, RabbitMQ, Redis, que tienen sus propias paquetes implementando la interfaz IMessage para la publicación y suscripción de eventos y de esta forma interactuar de la mejor forma con la infraestructura de eventos que se tenga en la organización.

Funcionalidades


  1. Publicar Evento:

    • Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken): Publica un evento en el Event Store.
  2. Publicar Eventos:

    • Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken): Publica una lista de eventos en el Event Store.
  3. Suscribirse a Evento:

    • Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken): Suscribe un método a un evento específico.
  4. Desuscribirse de Evento:

    • Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken): Desuscribe un método de un evento específico.

Características


  • Asincronía: Los métodos de publicación y suscripción son asíncronos, lo que permite una mejor gestión de recursos y evita bloqueos en el hilo principal.
  • Manejo de Eventos: Proporciona métodos para publicar y suscribirse a eventos, lo que permite a los desarrolladores reaccionar adecuadamente a cambios en el estado de la conexión.
  • Abstracción: Al ser una interfaz, permite la implementación de diferentes estrategias de publicación/suscripción y facilita la prueba unitaria mediante el uso de mocks.
  • Escalabilidad: Permite publicar y suscribirse a eventos de forma escalable, lo que facilita la construcción de sistemas robustos y desacoplados basados en eventos.
  • Desacoplamiento: Proporciona una comunicación eficiente y desacoplada entre los componentes de la aplicación, lo que facilita la integración con otras bibliotecas y servicios.
  • Reutilización: Permite reutilizar la lógica de publicación/suscripción de eventos en diferentes partes de la aplicación, lo que facilita la implementación de patrones de diseño como Event Sourcing y CQRS.

Implementacaión


La interfaz IEventStorePubSub se implementa en la clase EventStorePubSubService, que proporciona la lógica para publicar y suscribirse a eventos del Event Store. La clase EventStorePubSubService se encuentra en el espacio de nombres CodeDesignPlus.Net.EventStore.PubSub.Services.

using CodeDesignPlus.Net.Core.Abstractions.Options;
using CodeDesignPlus.Net.Exceptions;
namespace CodeDesignPlus.Net.EventStore.PubSub.Services;
/// <summary>
/// Provides Pub/Sub services for interacting with EventStore.
/// </summary>
public class EventStorePubSubService : IEventStorePubSub
{
private readonly IEventStoreFactory eventStoreFactory;
private readonly IServiceProvider serviceProvider;
private readonly ILogger<EventStorePubSubService> logger;
private readonly CoreOptions options;
private readonly PersistentSubscriptionSettings settings;
private readonly IDomainEventResolver domainEventResolverService;
/// <summary>
/// Initializes a new instance of the <see cref="EventStorePubSubService"/> class.
/// </summary>
/// <param name="eventStoreFactory">The factory to create EventStore connections.</param>
/// <param name="serviceProvider">The service provider for resolving dependencies.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="coreOptions">The EventStore Pub/Sub options.</param>
/// <param name="domainEventResolverService">The service to resolve domain events.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="eventStoreFactory"/>, <paramref name="serviceProvider"/>, <paramref name="logger"/>, <paramref name="coreOptions"/>, or <paramref name="domainEventResolverService"/> is null.
/// </exception>
public EventStorePubSubService(
IEventStoreFactory eventStoreFactory,
IServiceProvider serviceProvider,
ILogger<EventStorePubSubService> logger,
IOptions<CoreOptions> coreOptions,
IDomainEventResolver domainEventResolverService)
{
ArgumentNullException.ThrowIfNull(eventStoreFactory);
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(logger);
ArgumentNullException.ThrowIfNull(coreOptions);
ArgumentNullException.ThrowIfNull(domainEventResolverService);
this.eventStoreFactory = eventStoreFactory;
this.serviceProvider = serviceProvider;
this.logger = logger;
this.domainEventResolverService = domainEventResolverService;
this.options = coreOptions.Value;
this.settings = PersistentSubscriptionSettings
.Create()
.StartFromCurrent();
this.logger.LogInformation("EventStorePubSubService initialized.");
}
/// <summary>
/// Publishes a domain event to the EventStore.
/// </summary>
/// <param name="event">The domain event to publish.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public async Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
{
var connection = await this.eventStoreFactory.CreateAsync(EventStoreFactoryConst.Core, cancellationToken).ConfigureAwait(false);
var stream = this.domainEventResolverService.GetKeyDomainEvent(@event.GetType());
var eventData = new EventData(
@event.EventId,
stream,
true,
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(@event)),
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(@event.Metadata)));
await connection.AppendToStreamAsync(stream, ExpectedVersion.Any, eventData).ConfigureAwait(false);
}
/// <summary>
/// Publishes a list of domain events to the EventStore.
/// </summary>
/// <param name="event">The list of domain events to publish.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
{
var tasks = @event.Select(x => this.PublishAsync(x, cancellationToken));
return Task.WhenAll(tasks);
}
/// <summary>
/// Subscribes to a domain event in the EventStore.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public async Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
var connection = await this.eventStoreFactory.CreateAsync(EventStoreFactoryConst.Core, cancellationToken).ConfigureAwait(false);
var (user, pass) = this.eventStoreFactory.GetCredentials(EventStoreFactoryConst.Core);
var stream = this.domainEventResolverService.GetKeyDomainEvent<TEvent>();
var userCredentials = new UserCredentials(user, pass);
try
{
await connection.CreatePersistentSubscriptionAsync(
stream,
options.AppName,
this.settings,
userCredentials
).ConfigureAwait(false);
}
catch (Exception e)
{
this.logger.LogWarning(e, "{Message}", e.Message);
}
await connection.ConnectToPersistentSubscriptionAsync(
stream,
options.AppName,
(_, evt) => EventAppearedAsync<TEvent, TEventHandler>(evt, cancellationToken).ConfigureAwait(false),
(sub, reason, exception) => this.logger.LogDebug("Subscription dropped: {Reason}", reason)
).ConfigureAwait(false);
this.logger.LogInformation("Subscription to {Stream} created.", stream);
}
/// <summary>
/// Handles the event when it appears in the EventStore.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="event">The resolved event.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
private Task EventAppearedAsync<TEvent, TEventHandler>(ResolvedEvent @event, CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
try
{
using var scope = serviceProvider.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<IEventContext>();
var domainEvent = JsonSerializer.Deserialize<TEvent>(Encoding.UTF8.GetString(@event.Event.Data));
context.SetCurrentDomainEvent(domainEvent);
var eventHandler = scope.ServiceProvider.GetRequiredService<TEventHandler>();
return eventHandler.HandleAsync(@domainEvent, cancellationToken);
}
catch (CodeDesignPlusException ex)
{
logger.LogWarning(ex, "Warning processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
}
return Task.CompletedTask;
}
/// <summary>
/// Unsubscribes from a domain event in the EventStore.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
return Task.CompletedTask;
}
}

Ejemplo


Una vez que se ha implementado la interfaz IEventStorePubSub, se puede utilizar para publicar y suscribirse a eventos del Event Store.

  1. Agregar las propiedades de configuración al archivo appsettings.json:

    {
    "EventStore": {
    "Servers": {
    "Core": {
    "ConnectionString": "tcp://localhost:1113?tls=false",
    "User": "admin",
    "Password": "12345678"
    }
    }
    },
    "EventStorePubSub": {
    "Enabled": true,
    "Group": "Group-Microservices",
    }
    }
  2. Registrar la interfaz IEventStorePubSub en el contenedor de dependencias:

    public class Startup
    {
    public void ConfigureServices(IServiceCollection services)
    {
    services.AddSingleton<IMessage, EventStorePubSubService>();
    services.AddSingleton<IEventStorePubSub, EventStorePubSubService>();
    services.AddLogging();
    }
    }
  3. Publicar un evento en una clase de servicio:

    public class EventStoreService
    {
    private readonly IPubSub pubSub;
    public EventStoreService(IPubSub pubSub)
    {
    this.pubSub = pubSub;
    }
    public async Task PublishEventAsync(IDomainEvent @event)
    {
    await this.pubSub.PublishAsync(@event, CancellationToken.None);
    }
    }

Conclusiones


El IEventStorePubSub es una interfaz que proporciona métodos para publicar y suscribirse a eventos del Event Store. Al implementar esta interfaz en una clase de servicio, los desarrolladores pueden interactuar con el Event Store de forma eficiente y desacoplada, lo que facilita la construcción de sistemas escalables y robustos basados en eventos. Además, al ser una interfaz, permite la implementación de diferentes estrategias de publicación/suscripción y facilita la prueba unitaria mediante el uso de mocks. Por lo tanto, el IEventStorePubSub es una herramienta poderosa para gestionar eventos en aplicaciones .NET Core y construir sistemas desacoplados y escalables basados en eventos.