Kafka PubSub
This document describes the IKafkaPubSub and KafkaPubSub services, which provide an abstraction for communicating through a Kafka message broker. The KafkaPubSub service implements the IKafkaPubSub interface and offers methods for publishing and subscribing to domain events over Kafka, including the configuration and management of producers and consumers.
namespace CodeDesignPlus.Net.Kafka.Abstractions;

/// <summary>
/// Represents an interface for a Kafka Event Bus, extending the generic PubSub interface.
/// </summary>
/// <remarks>
/// This interface provides a contract for interacting with a Kafka-based event bus for publishing and subscribing to events.
/// Implementations of this interface should handle the underlying details of communication with Kafka brokers.
/// </remarks>
public interface IKafkaPubSub : IMessage
{
}

namespace CodeDesignPlus.Net.PubSub.Abstractions;

/// <summary>
/// Interface for managing the publishing and subscribing of domain events.
/// </summary>
public interface IMessage
{
    /// <summary>
    /// Publishes a domain event asynchronously.
    /// </summary>
    /// <param name="event">The domain event to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken);

    /// <summary>
    /// Publishes a list of domain events asynchronously.
    /// </summary>
    /// <param name="event">The list of domain events to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken);

    /// <summary>
    /// Subscribes to a domain event asynchronously.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event to subscribe to.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler (callback).</typeparam>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous subscribe operation.</returns>
    Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>;

    /// <summary>
    /// Unsubscribes from a domain event asynchronously.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event to unsubscribe from.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler (callback).</typeparam>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous unsubscribe operation.</returns>
    Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>;
}
How It Works
KafkaPubSub acts as an abstraction layer over Kafka that publishes and subscribes to domain events asynchronously. When an event is published, it is serialized and sent to the corresponding Kafka topic. When subscribing to an event, KafkaPubSub configures a consumer that reads messages from the topic, deserializes them, and delivers them to the registered event handler. The service also waits for the topic to exist before consuming and logs errors raised while messages are processed.
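The flow described above can be sketched without a broker. The following is a minimal in-memory stand-in, not the library's implementation: InMemoryBus, OrderCreated, and the topic name "orders.created" are hypothetical names invented for illustration, and delivery is synchronous where Kafka would deliver through a consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

var bus = new InMemoryBus();

// Subscribing registers a callback that deserializes the payload and hands it to a handler.
bus.Subscribe<OrderCreated>("orders.created", e => Console.WriteLine($"Handled order {e.OrderId}"));

// Publishing serializes the event and routes it by topic name.
bus.Publish("orders.created", new OrderCreated(Guid.NewGuid(), 42)); // prints "Handled order 42"

// Hypothetical event type used only in this sketch.
public record OrderCreated(Guid EventId, int OrderId);

// Hypothetical in-memory stand-in for the broker: topic name -> callbacks receiving raw JSON.
public sealed class InMemoryBus
{
    private readonly Dictionary<string, List<Action<string>>> subscribers = new();

    public void Publish<TEvent>(string topic, TEvent @event)
    {
        var payload = JsonSerializer.Serialize(@event);

        if (!subscribers.TryGetValue(topic, out var callbacks))
            return; // nobody is listening on this topic

        foreach (var callback in callbacks)
            callback(payload);
    }

    public void Subscribe<TEvent>(string topic, Action<TEvent> handler)
    {
        if (!subscribers.TryGetValue(topic, out var callbacks))
        {
            callbacks = new List<Action<string>>();
            subscribers[topic] = callbacks;
        }

        // Deserialize on the consuming side, as a Kafka consumer would.
        callbacks.Add(json => handler(JsonSerializer.Deserialize<TEvent>(json)!));
    }
}
```

The sketch keeps the same separation of concerns as the service: the publisher only knows a topic and a payload, and the subscriber only knows a topic and a handler.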
Methods
KafkaPubSub implements the IKafkaPubSub interface, which defines the following methods:
PublishAsync
Type: Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
Publishes a domain event to a Kafka topic. It takes the event to publish and a CancellationToken that allows the operation to be cancelled. The method resolves the topic from the event type, builds a Kafka message keyed by the event ID with additional headers (OccurredAt, EventId, and every non-null metadata entry), and sends the message with a Kafka producer.
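As a rough illustration of the header-building step, the sketch below encodes the same three kinds of values as UTF-8 bytes. It is an approximation under stated assumptions: HeaderSketch is a hypothetical helper, a plain Dictionary<string, byte[]> stands in for Kafka's Headers type, OccurredAt is assumed to be a DateTimeOffset, and the round-trip "O" format is a choice made here (the implementation calls ToString() without a format).

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text;

var occurredAt = new DateTimeOffset(2024, 1, 15, 10, 30, 0, TimeSpan.Zero);
var eventId = Guid.Parse("3f2504e0-4f89-11d3-9a0c-0305e82c3301");
var metadata = new Dictionary<string, object?> { ["TenantId"] = "acme", ["Ignored"] = null };

var headers = HeaderSketch.Build(occurredAt, eventId, metadata);

foreach (var (key, value) in headers)
    Console.WriteLine($"{key} = {Encoding.UTF8.GetString(value)}");

// Hypothetical helper; a Dictionary stands in for Confluent.Kafka's Headers collection.
public static class HeaderSketch
{
    public static Dictionary<string, byte[]> Build(
        DateTimeOffset occurredAt,
        Guid eventId,
        IReadOnlyDictionary<string, object?> metadata)
    {
        var headers = new Dictionary<string, byte[]>
        {
            // Assumption: "O" keeps the timestamp culture-independent and round-trippable.
            ["OccurredAt"] = Encoding.UTF8.GetBytes(occurredAt.ToString("O")),
            ["EventId"] = Encoding.UTF8.GetBytes(eventId.ToString()),
        };

        // Only metadata entries with a non-null value become headers.
        foreach (var item in metadata)
        {
            if (item.Value is null)
                continue;

            headers[item.Key] = Encoding.UTF8.GetBytes(item.Value.ToString() ?? string.Empty);
        }

        return headers;
    }
}
```

Because headers are raw bytes, a consumer has to decode them with the same encoding and parse them with the same format the producer used.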
PublishAsync (overload)
Type: Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
Publishes a list of domain events to Kafka asynchronously. It takes a list of events and a CancellationToken, calls PublishAsync for each event in the list, and returns a task that completes once every individual publish has completed (Task.WhenAll).
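The fan-out pattern used by this overload can be shown in isolation. In the sketch below, PublishOneAsync and PublishManyAsync are hypothetical stand-ins that record strings instead of sending domain events to Kafka; only the Task.WhenAll composition mirrors the service.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var published = new ConcurrentBag<string>();

// Hypothetical single-event publisher standing in for PublishAsync(IDomainEvent, ...).
async Task PublishOneAsync(string @event, CancellationToken ct)
{
    await Task.Delay(10, ct); // simulate the round trip to the broker
    published.Add(@event);
}

// Batch overload: start one publish per event and await them together.
Task PublishManyAsync(IReadOnlyList<string> events, CancellationToken ct) =>
    Task.WhenAll(events.Select(e => PublishOneAsync(e, ct)));

await PublishManyAsync(new[] { "a", "b", "c" }, CancellationToken.None);

Console.WriteLine(published.Count); // prints 3
```

Since the publishes run concurrently, their completion order is not guaranteed. Awaiting each publish in a loop is an alternative when ordering between events matters.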
SubscribeAsync
Type: Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
Subscribes to a Kafka topic for a specific event type. It receives a CancellationToken, and the event type and its handler type are supplied as generic arguments. The method resolves the topic name from the event type and waits for the topic to exist. If the topic is still missing after KafkaOptions.MaxAttempts checks, it logs a warning and continues. It then starts a Kafka consumer that reads messages from the topic and passes them to the corresponding event handler. The consume loop runs until the token is cancelled, so the returned task does not complete before then.
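The wait-for-topic step is a bounded polling loop. The sketch below shows that technique with a hypothetical TopicWaiter helper whose topicExists delegate replaces the admin-client metadata lookup. It deliberately differs from the service in two ways that should be read as assumptions of this example: it returns a bool so the caller can tell whether the topic was found, and it honors the cancellation token during the delay.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var checks = 0;

// Pretend the topic shows up on the third metadata check.
var found = await TopicWaiter.WaitAsync(
    topicExists: () => ++checks >= 3,
    maxAttempts: 5,
    delay: TimeSpan.FromMilliseconds(10),
    cancellationToken: CancellationToken.None);

Console.WriteLine($"found={found}, checks={checks}"); // prints "found=True, checks=3"

// Hypothetical helper: polls until the topic exists or the attempt budget is spent.
public static class TopicWaiter
{
    public static async Task<bool> WaitAsync(
        Func<bool> topicExists,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (topicExists())
                return true;

            // No point in sleeping after the final failed check.
            if (attempt < maxAttempts)
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
        }

        return false;
    }
}
```

Returning a result lets the caller decide whether to subscribe anyway, retry later, or fail fast when the topic never appears.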
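UnsubscribeAsync
Type: Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
Cancels the subscription of an event and its handler to a Kafka topic. It receives the CancellationToken, and the event type and its handler type are supplied as generic arguments. The implementation resolves an IConsumer<string, TEvent> from the service provider and calls Unsubscribe() on it.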
Implementation
using CodeDesignPlus.Net.Core.Abstractions.Options;
using CodeDesignPlus.Net.Exceptions;

namespace CodeDesignPlus.Net.Kafka.Services;

/// <summary>
/// KafkaPubSub service for publishing and subscribing to Kafka topics.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="KafkaPubSub"/> class.
/// </remarks>
/// <param name="logger">The logger instance.</param>
/// <param name="domainEventResolver">The domain event resolver service.</param>
/// <param name="kafkaOptions">The Kafka options.</param>
/// <param name="serviceProvider">The service provider.</param>
/// <param name="coreOptions">The core options.</param>
public class KafkaPubSub(ILogger<KafkaPubSub> logger, IDomainEventResolver domainEventResolver, IOptions<KafkaOptions> kafkaOptions, IServiceProvider serviceProvider, IOptions<CoreOptions> coreOptions) : IKafkaPubSub
{
    /// <summary>
    /// Publishes an event to Kafka.
    /// </summary>
    /// <param name="event">The event to publish.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    /// <exception cref="ArgumentNullException">Thrown when the event is null.</exception>
    public async Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        var type = @event.GetType();

        logger.LogInformation("Starting to publish event to Kafka. Event type: {EventType}", type.Name);

        var topic = domainEventResolver.GetKeyDomainEvent(type);

        var headers = new Headers
        {
            { "OccurredAt", Encoding.UTF8.GetBytes(@event.OccurredAt.ToString()) },
            { "EventId", Encoding.UTF8.GetBytes(@event.EventId.ToString()) },
        };

        foreach (var item in @event.Metadata.Where(x => x.Value != null))
        {
            headers.Add(item.Key, Encoding.UTF8.GetBytes(item.Value.ToString()));
        }

        var message = new Message<string, IDomainEvent>
        {
            Key = @event.EventId.ToString(),
            Value = @event,
            Headers = headers
        };

        var producer = serviceProvider.GetRequiredService<IProducer<string, IDomainEvent>>();

        await producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false);

        logger.LogInformation("Event published to Kafka successfully. Event type: {EventType}", @event.GetType().Name);
    }

    /// <summary>
    /// Publishes a list of events to Kafka.
    /// </summary>
    /// <param name="event">The list of events to publish.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    /// <exception cref="ArgumentNullException">Thrown when the events list is null.</exception>
    public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
    {
        var tasks = @event.Select(@event => this.PublishAsync(@event, cancellationToken));

        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Subscribes to a Kafka topic for a specific event type and event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous subscribe operation.</returns>
    /// <exception cref="ArgumentNullException">Thrown when the event type or event handler is null.</exception>
    public async Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        var topic = domainEventResolver.GetKeyDomainEvent<TEvent>();

        logger.LogInformation("{EventType} | Subscribing to Kafka topic {Topic} ", typeof(TEvent).Name, topic);

        await WaitTopicCreatedAsync<TEvent>(topic, cancellationToken).ConfigureAwait(false);

        await SubscribeTopicAsync<TEvent, TEventHandler>(topic, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Waits for a Kafka topic to be created.
    /// </summary>
    /// <typeparam name="TEvent">The type of the event.</typeparam>
    /// <param name="topic">The Kafka topic.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous wait operation.</returns>
    /// <exception cref="ArgumentNullException">Thrown when the topic is null.</exception>
    internal async Task WaitTopicCreatedAsync<TEvent>(string topic, CancellationToken cancellationToken)
        where TEvent : IDomainEvent
    {
        using var adminClient = new AdminClientBuilder(kafkaOptions.Value.AdminClientConfig).Build();

        var attempt = 0;

        while (!cancellationToken.IsCancellationRequested)
        {
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
            bool topicExists = metadata.Topics.Exists(t => t.Topic == topic);

            if (topicExists)
            {
                break;
            }

            attempt++;
            if (attempt >= kafkaOptions.Value.MaxAttempts)
            {
                logger.LogWarning("{EventType} | The topic {Topic} does not exist after {MaxAttempts} attempts. Exiting.", typeof(TEvent).Name, topic, kafkaOptions.Value.MaxAttempts);

                return;
            }

            logger.LogInformation("{EventType} | The topic {Topic} does not exist, waiting for it to be created.", typeof(TEvent).Name, topic);

            await Task.Delay(1000, CancellationToken.None).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Subscribes to a Kafka topic for a specific event type and event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="topic">The Kafka topic.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous subscribe operation.</returns>
    /// <exception cref="ArgumentNullException">Thrown when the topic is null.</exception>
    internal async Task SubscribeTopicAsync<TEvent, TEventHandler>(string topic, CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        using var consumer = GetConsumer<TEvent>().Build();

        cancellationToken.Register(consumer.Close);

        consumer.Subscribe(topic);

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                logger.LogInformation("{EventType} | Listener the event {Topic}", typeof(TEvent).Name, topic);

                using var scope = serviceProvider.CreateScope();

                var context = scope.ServiceProvider.GetRequiredService<IEventContext>();

                var value = consumer.Consume(cancellationToken);

                context.SetCurrentDomainEvent(value.Message.Value);

                var eventHandler = scope.ServiceProvider.GetRequiredService<TEventHandler>();

                await eventHandler.HandleAsync(value.Message.Value, cancellationToken).ConfigureAwait(false);

                logger.LogInformation("{EventType} | End Listener the event {Topic}", typeof(TEvent).Name, topic);
            }
            catch (CodeDesignPlusException ex)
            {
                logger.LogWarning(ex, "Warning processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "{EventType} | An error occurred while consuming a Kafka message for event topic: {Topic} | {Message}", typeof(TEvent).Name, topic, ex.Message);
            }
        }
    }

    /// <summary>
    /// Gets a Kafka consumer for a specific event type.
    /// </summary>
    /// <typeparam name="TEvent">The type of the event.</typeparam>
    /// <returns>A consumer builder for the specified event type.</returns>
    internal ConsumerBuilder<string, TEvent> GetConsumer<TEvent>()
        where TEvent : IDomainEvent
    {
        var consumerConfig = kafkaOptions.Value.ConsumerConfig;

        consumerConfig.GroupId = coreOptions.Value.AppName;

        var consumerBuilder = new ConsumerBuilder<string, TEvent>(consumerConfig);

        consumerBuilder.SetValueDeserializer(new JsonSystemTextSerializer<TEvent>());
        return consumerBuilder;
    }

    /// <summary>
    /// Unsubscribes from a Kafka topic for a specific event type and event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task that represents the asynchronous unsubscribe operation.</returns>
    /// <exception cref="ArgumentNullException">Thrown when the event type or event handler is null.</exception>
    public Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        logger.LogInformation("Unsubscribing from event {EventType} for handler {EventHandlerType}", typeof(TEvent).Name, typeof(TEventHandler).Name);

        var consumer = serviceProvider.GetRequiredService<IConsumer<string, TEvent>>();
        consumer.Unsubscribe();

        return Task.CompletedTask;
    }
}
The KafkaPubSub service is a class that implements IKafkaPubSub and is responsible for:
- Receiving dependencies: it receives an ILogger<KafkaPubSub> for logging, an IDomainEventResolver to obtain the topic name, an IOptions<KafkaOptions> with the Kafka configuration, an IServiceProvider to resolve dependencies (producers and consumers), and an IOptions<CoreOptions> whose AppName is used as the consumer group ID.
- Publishing events: when it receives an event, it resolves the topic associated with the event type, builds a message with metadata headers, and uses a Kafka producer (IProducer<string, IDomainEvent>) to send the message to the topic.
- Subscribing to events: when subscribing, it resolves the topic from the event type, checks that the topic exists, and builds a Kafka consumer (ConsumerBuilder<string, TEvent>) that reads messages from the topic and delivers them to the corresponding event handler.
- Topic handling: before starting the consumer, it waits until the topic exists or until the maximum number of attempts is reached.
- Deserialization: it uses a System.Text.Json-based deserializer (JsonSystemTextSerializer<TEvent>) for message values.
- Error handling: it catches exceptions raised while processing messages and logs them. A CodeDesignPlusException is logged as a warning and any other exception as an error, and the consume loop continues in both cases.
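The error-handling behavior in the last item, where a failing handler is logged and the loop moves on, can be sketched with an in-memory queue. Everything here is a stand-in chosen for illustration: the queue replaces the Kafka consumer, HandleAsync is a hypothetical handler, and Console.WriteLine replaces the logger.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the topic: the second message makes the handler fail.
var messages = new Queue<string>(new[] { "ok-1", "boom", "ok-2" });
var handled = new List<string>();
var failures = 0;

using var cts = new CancellationTokenSource();

while (!cts.IsCancellationRequested)
{
    try
    {
        // A real consumer would block here waiting for the next message.
        if (!messages.TryDequeue(out var message))
        {
            cts.Cancel(); // nothing left to read in this sketch, so stop the loop
            continue;
        }

        await HandleAsync(message, cts.Token);
        handled.Add(message);
    }
    catch (InvalidOperationException ex)
    {
        // Log and keep consuming instead of letting one bad message end the loop.
        failures++;
        Console.WriteLine($"Handler failed, continuing: {ex.Message}");
    }
}

Console.WriteLine($"handled={handled.Count}, failures={failures}"); // prints "handled=2, failures=1"

// Hypothetical handler that rejects one specific payload.
static Task HandleAsync(string message, CancellationToken ct)
{
    if (message == "boom")
        throw new InvalidOperationException("cannot process " + message);

    return Task.CompletedTask;
}
```

Catching inside the loop body is what keeps the subscription alive. What happens to the failed message afterwards (retry, dead-letter topic, or skip) is a separate decision that this sketch does not model.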
The service's dependencies are:
- ILogger<KafkaPubSub>: for logging.
- IDomainEventResolver: to obtain the topic name from the event type.
- IOptions<KafkaOptions>: for the Kafka configuration.
- IServiceProvider: to resolve dependencies such as the producer and the consumer.
- IOptions<CoreOptions>: supplies AppName, which is assigned to the consumer's GroupId.
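Based on the implementation above, the KafkaOptions class contains at least the following properties:
- AdminClientConfig (Dictionary<string, string>): configuration for the admin client, passed to AdminClientBuilder.
- ConsumerConfig: configuration for the consumer. Because the implementation assigns consumerConfig.GroupId, this has to be a type that exposes a GroupId property (such as Confluent.Kafka's ConsumerConfig) rather than a plain Dictionary<string, string>.
- MaxAttempts (int): maximum number of times the topic's existence is checked before the wait gives up.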
Usage Example
This example shows how the CodeDesignPlus.Net.PubSub library uses the IMessage interface to publish domain events asynchronously. The PubSubService class implements the IPubSub interface and uses dependency injection to obtain an IMessage instance and the other services it needs. The PublishAsync method checks whether events should be enqueued or published directly. If a queue is used (PubSubOptions.UseQueue is true), it resolves an IEventQueue and enqueues the event; otherwise, it publishes the event directly through IMessage.
namespace CodeDesignPlus.Net.PubSub.Services;

/// <summary>
/// Service responsible for publishing domain events.
/// </summary>
public class PubSubService : IPubSub
{
    private readonly IMessage message;
    private readonly IOptions<PubSubOptions> options;
    private readonly IServiceProvider serviceProvider;
    private readonly ILogger<PubSubService> logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="PubSubService"/> class.
    /// </summary>
    /// <param name="message">The message service used for publishing events.</param>
    /// <param name="options">The options for configuring the PubSub service.</param>
    /// <param name="serviceProvider">The service provider for resolving dependencies.</param>
    /// <param name="logger">The logger for logging information.</param>
    /// <exception cref="ArgumentNullException">Thrown when any of the parameters are null.</exception>
    public PubSubService(IMessage message, IOptions<PubSubOptions> options, IServiceProvider serviceProvider, ILogger<PubSubService> logger)
    {
        ArgumentNullException.ThrowIfNull(message);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(logger);

        this.message = message;
        this.options = options;
        this.serviceProvider = serviceProvider;
        this.logger = logger;

        this.logger.LogDebug("PubSubService initialized.");
    }

    /// <summary>
    /// Publishes a single domain event asynchronously.
    /// </summary>
    /// <param name="event">The domain event to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        if (this.options.Value.UseQueue)
        {
            this.logger.LogDebug("UseQueue is true, enqueuing event of type {Name}.", @event.GetType().Name);

            var eventQueueService = this.serviceProvider.GetRequiredService<IEventQueue>();

            return eventQueueService.EnqueueAsync(@event, cancellationToken);
        }

        this.logger.LogDebug("UseQueue is false, publishing event of type {Name}.", @event.GetType().Name);

        return this.message.PublishAsync(@event, cancellationToken);
    }

    /// <summary>
    /// Publishes a list of domain events asynchronously.
    /// </summary>
    /// <param name="event">The list of domain events to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
    {
        var tasks = @event.Select(@event => this.PublishAsync(@event, cancellationToken));

        return Task.WhenAll(tasks);
    }
}
Conclusions
KafkaPubSub provides an abstraction for asynchronous communication over Kafka in .NET applications. By wrapping publication and subscription together with the handling of topics, producers, and consumers, it simplifies the development of event-driven distributed systems. For the service to work, KafkaOptions must be configured correctly, and the producers and consumers it resolves from the service provider must be registered using the corresponding builders.