Skip to content

IEventSourcingService

IEventSourcingService define la interfaz para interactuar con un almacén de eventos en un sistema de Event Sourcing. Este servicio proporciona métodos para persistir, recuperar y consultar eventos, así como para trabajar con snapshots de agregados. La interfaz es fundamental para la implementación de la capa de persistencia en una arquitectura de Event Sourcing, permitiendo la desacoplamiento entre el dominio y la tecnología de almacenamiento de eventos.

¿Cómo Funciona?

IEventSourcingService actúa como un intermediario entre la lógica de negocio y la base de datos de eventos. Los métodos definidos en esta interfaz permiten:

  1. Persistencia de Eventos: Agregar eventos al almacén de eventos (AppendEventAsync).
  2. Carga de Eventos: Recuperar eventos para un agregado específico (LoadEventsAsync).
  3. Gestión de Snapshots: Guardar y recuperar snapshots para optimizar la rehidratación de agregados (LoadSnapshotAsync, SaveSnapshotAsync).
  4. Consulta de Eventos: Buscar y contar eventos (SearchEventsAsync, CountEventsAsync).
  5. Gestión de Versiones: Recuperar la última versión de un agregado (GetVersionAsync).

El servicio abstrae la complejidad de la persistencia, permitiendo a los agregados y a la lógica de dominio concentrarse en su propia funcionalidad, mientras delegan las operaciones de almacenamiento y recuperación a la implementación del servicio.

Métodos


El servicio IEventSourcingService define los siguientes métodos para interactuar con el almacén de eventos:

CountEventsAsync

Type: CountEventsAsync(string category, Guid aggregateId, CancellationToken cancellationToken = default)

Realiza un conteo del número total de eventos persistidos en el almacén para una categoría específica y un identificador de agregado dado. Este método permite obtener la cantidad de eventos asociados a un determinado agregado en una categoría, siendo útil para diversas operaciones como la comprobación de la existencia de eventos o la gestión de la concurrencia. El conteo se realiza de forma asíncrona y puede ser cancelado mediante un CancellationToken.

AppendEventAsync

Type: AppendEventAsync<TDomainEvent>(string category, TDomainEvent @event, long? version = null, CancellationToken cancellationToken = default)

Añade un nuevo evento al almacén. Este método toma un objeto evento, que implementa la interfaz IDomainEvent, y lo persiste en el almacén. El evento se asocia a una categoría específica. Permite opcionalmente, especificar un número de versión para el evento con el propósito de implementar control de concurrencia optimista. Si se proporciona una versión, el almacenamiento fallará si la versión actual del agregado no coincide. La operación de añadir el evento es asíncrona y puede ser cancelada mediante un CancellationToken.

GetVersionAsync

Type: GetVersionAsync(string category, Guid aggregateId, CancellationToken cancellationToken = default)

Obtiene la versión actual de un agregado específico dentro de una categoría determinada en el almacén de eventos. La versión representa el número de eventos que se han almacenado para ese agregado hasta este momento. Esta información es crucial para implementar mecanismos de control de concurrencia optimista, asegurando que las modificaciones a un agregado se basen en su estado más reciente. La obtención de la versión se realiza de forma asíncrona y soporta la cancelación a través de un CancellationToken.

LoadEventsAsync

Type: LoadEventsAsync(string category, Guid aggregateId, CancellationToken cancellationToken = default)

Carga todos los eventos almacenados para una categoría y un identificador de agregado específicos. Este método recupera el historial completo de eventos asociados a un agregado, permitiendo la reconstrucción del estado del agregado en un punto específico en el tiempo. La carga de los eventos se realiza de forma asíncrona y se puede cancelar mediante un CancellationToken. Devuelve una colección de objetos que implementan la interfaz IDomainEvent, representando el historial del agregado.

LoadSnapshotAsync

Type: LoadSnapshotAsync<TAggregate>(string category, Guid aggregateId, CancellationToken cancellationToken = default)

Recupera un snapshot previamente guardado de un agregado desde el almacén. Un snapshot representa el estado del agregado en un momento dado, sirviendo como punto de partida para la reconstrucción del estado actual sin necesidad de reproducir toda la secuencia de eventos. Este método permite optimizar el proceso de carga de agregados, particularmente aquellos con un historial extenso de eventos. La operación de carga es asíncrona, admite cancelación por medio de un CancellationToken y devuelve un objeto del tipo TAggregate.

SaveSnapshotAsync

Type: SaveSnapshotAsync<TAggregate>(TAggregate aggregate, CancellationToken cancellationToken = default)

Guarda un snapshot del estado actual de un agregado en el almacén. Este método toma un objeto agregado y lo serializa para su almacenamiento, permitiendo la posterior restauración rápida del agregado sin necesidad de volver a aplicar todos sus eventos. La operación de almacenamiento del snapshot es asíncrona y puede ser cancelada a través de un CancellationToken. El agregado debe implementar la interfaz IAggregateRoot.

SearchEventsAsync

Type: SearchEventsAsync(string streamName, CancellationToken cancellationToken = default)

Realiza una búsqueda de eventos en el almacén basándose en el nombre del stream de eventos. Este método permite una búsqueda flexible de eventos relacionados a un stream específico, ofreciendo la capacidad de obtener información de grupos de eventos. La búsqueda se realiza de forma asíncrona y puede ser cancelada mediante un CancellationToken. Devuelve una colección de objetos que implementan la interfaz IDomainEvent.

SearchEventsAsync

Type: SearchEventsAsync<TDomainEvent>(CancellationToken cancellationToken = default)

Busca eventos en el almacén de eventos por tipo específico. Este método permite recuperar todos los eventos que coincidan con el tipo TDomainEvent proporcionado. Proporciona un método de consulta para encontrar eventos específicos sin conocer la categoría o el ID del agregado asociado. La búsqueda es asíncrona y permite cancelación a través de un CancellationToken. El resultado es una colección de eventos del tipo especificado.

SearchEventsAsync

Type: SearchEventsAsync<TDomainEvent>(string category, CancellationToken cancellationToken = default)

Busca eventos en el almacén por tipo específico y categoría. Este método permite recuperar todos los eventos de un tipo determinado (TDomainEvent) que estén asociados a una categoría específica. Facilita la búsqueda de eventos en un contexto específico de dominio. La operación de búsqueda se realiza de forma asíncrona y puede ser cancelada utilizando un CancellationToken. El método devuelve una colección de objetos del tipo TDomainEvent.

Implementación


La interfaz IEventSourcingService define el contrato, pero la implementación concreta variará según la tecnología de persistencia utilizada (por ejemplo, EventStoreDB). La implementación debe manejar:

  • Conexión al Almacén de Eventos: Establecer la conexión a la base de datos de eventos.
  • Persistencia de Eventos: Guardar los eventos de manera atómica y asegurar la integridad de los datos.
  • Recuperación de Eventos: Leer los eventos eficientemente para la rehidratación de agregados.
  • Gestión de Snapshots: Almacenar y recuperar snapshots de manera consistente.
  • Consulta de Eventos: Implementar mecanismos eficientes para buscar y contar eventos.

Ejemplo de Uso


Para utilizar el servicio IEventSourcingService, debe implementar una clase que lo contenga y proporcione la lógica necesaria para interactuar con el almacén de eventos. A continuación, se muestra un ejemplo de implementación de un servicio de Event Sourcing en memoria, en la que podemos observar cada uno de los métodos de la interfaz IEventSourcingService.

using System;
using CodeDesignPlus.Net.Core.Abstractions;
using CodeDesignPlus.Net.Event.Sourcing.Abstractions;
namespace CodeDesignPlus.Net.Event.Sourcing.Sample.Services;
public class InMemoryEventSourcingService : IEventSourcing
{
public Task AppendEventAsync<TDomainEvent>(string category, TDomainEvent @event, long? version = null, CancellationToken cancellationToken = default) where TDomainEvent : IDomainEvent
{
throw new NotImplementedException();
}
public Task<long> CountEventsAsync(string category, Guid aggregateId, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public Task<long> GetVersionAsync(string category, Guid aggregateId, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public Task<IEnumerable<IDomainEvent>> LoadEventsAsync(string category, Guid aggregateId, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public Task<TAggregate> LoadSnapshotAsync<TAggregate>(string category, Guid aggregateId, CancellationToken cancellationToken = default) where TAggregate : Abstractions.IAggregateRoot
{
throw new NotImplementedException();
}
public Task SaveSnapshotAsync<TAggregate>(TAggregate aggregate, CancellationToken cancellationToken = default) where TAggregate : Abstractions.IAggregateRoot
{
throw new NotImplementedException();
}
public Task<IEnumerable<IDomainEvent>> SearchEventsAsync(string streamName, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public Task<IEnumerable<TDomainEvent>> SearchEventsAsync<TDomainEvent>(CancellationToken cancellationToken = default) where TDomainEvent : IDomainEvent
{
throw new NotImplementedException();
}
public Task<IEnumerable<TDomainEvent>> SearchEventsAsync<TDomainEvent>(string category, CancellationToken cancellationToken = default) where TDomainEvent : IDomainEvent
{
throw new NotImplementedException();
}
}

Conclusiones


IEventSourcingService es una interfaz esencial en la arquitectura de Event Sourcing, que proporciona un contrato para interactuar con un almacén de eventos. Su abstracción permite flexibilidad en la implementación del almacén de eventos, mientras que sus métodos facilitan la gestión del ciclo de vida de los eventos y la rehidratación de agregados.

Referencias Adicionales