Overview
Apache Kafka es un sistema de mensajería distribuida que permite la publicación y suscripción de mensajes en tiempo real. CodeDesignPlus.Net.Kafka es una librería que facilita la implementación de productores y consumidores de mensajes en .NET Core.
Propósito y alcance
CodeDesignPlus.Net.Kafka
es una librería diseñada para gestionar y administrar la conexión con Apache Kafka en aplicaciones .NET Core. Proporciona una interfaz simple y flexible para publicar y suscribir mensajes en Kafka, permitiendo a los desarrolladores construir sistemas basados en eventos de forma sencilla y eficiente. Esta librería se integra con CodeDesignPlus.Net.PubSub
para facilitar la implementación de sistemas distribuidos y escalables en .NET Core.
Principales características
- Implementación de la interfaz
IMessage
deCodeDesignPlus.Net.PubSub
. - Integración simple con Apache Kafka: Configuración fácil de productores y consumidores.
- Manejo de mensajes resiliente: Soporte para reintentos y control de errores.
- Configuración flexible: Permite personalizar detalles como serialización, particiones y tiempos de espera.
- Monitoreo y registros: Compatible con sistemas de logging para facilitar la depuración y el monitoreo.
- Soporte para DDD y arquitectura hexagonal: Encaja perfectamente con patrones modernos de desarrollo.
Casos de uso típicos
- Procesamiento de eventos: Implementación de sistemas basados en eventos utilizando Apache Kafka como broker de mensajería.
- Integraciones asincrónicas: Comunicación entre microservicios mediante publicación y suscripción a eventos.
- Sistemas distribuidos: Coordinación de datos y tareas entre aplicaciones distribuidas.
Componentes principales
KafkaPubSub
: Implementación de la interfazIMessage
deCodeDesignPlus.Net.PubSub
para la publicación y suscripción de mensajes en Apache Kafka.KafkaOptions
: Clase de configuración para personalizar la conexión y el comportamiento de los productores y consumidores.
Directorysrc
DirectoryCodeDesignPlus.Net.Kafka
DirectoryExceptions
- KafkaException.cs
DirectoryExtensions
- ServiceCollectionExtensions.cs
DirectoryOptions
- KafkaOptions.cs
DirectorySerializer
- JsonSystemTextSerializer.cs
DirectoryServices
- KafkaPubSub.cs
DirectoryCodeDesignPlus.Net.Kafka.Abstractions
- IKafkaPubSub.cs
Primeros pasos
En esta sección, aprenderás a instalar y configurar CodeDesignPlus.Net.Kafka
en tu proyecto .NET Core, así como a utilizar sus principales características para publicar y consumir mensajes en Apache Kafka.
Requisitos previos
- .NET 8 o superior.
- Conocimientos básicos de patrones de diseño y sistemas distribuidos.
- Conocimientos sobre la arquitectura orientada a eventos (Event-Driven).
- Inyección de dependencias en aplicaciones .NET.
Instalación
Para instalar CodeDesignPlus.Net.Kafka
, agrega el paquete NuGet a tu proyecto:
dotnet add package CodeDesignPlus.Net.Kafka
Install-Package CodeDesignPlus.Net.Kafka
<PackageReference Include="CodeDesignPlus.Net.Kafka" Version="1.0.0" />
Configuración básica
-
Asignar las opciones de configuración en el
appsettings.json
:{"Logging": {"LogLevel": {"Default": "Information","Microsoft.Hosting.Lifetime": "Information"}},"Core": {"Business": "CodeDesignPlus","AppName": "sample-kafka-producer","Version": "v1","Description": "Sample of CodeDesignPlus.Net.Core","Contact": {"Name": "CodeDesignPlus","Email": "custom@outlook.com"}},"Kafka": {"Enable": true,"UseQueue": false,"BootstrapServers": "localhost:9093","Acks": "All","BatchSize": 16384,"LingerMs": 5,"CompressionType": "Snappy","NameMicroservice": "MyMicroservice","SecurityProtocol": "Plaintext","MaxAttempts": 60}} -
Registra los servicios el contenedor de dependencias de tu aplicación:
// ...serviceCollection.AddKafka(configuration);
Ejemplo rápido
El proyecto de ejemplo CodeDesignPlus.Net.Kafka.Sample contiene dos ejemplos, uno para productores y otro para consumidores, que ilustran cómo utilizar CodeDesignPlus.Net.Kafka
en una aplicación .NET Core.
El proyecto de ejemplo CodeDesignPlus.Net.Kafka.Producer.Sample
contiene un ejemplo de cómo publicar mensajes en Apache Kafka.
// See https://aka.ms/new-console-template for more information
using CodeDesignPlus.Net.Kafka.Extensions;using CodeDesignPlus.Net.Kafka.Producer.Sample;using CodeDesignPlus.Net.PubSub.Abstractions;using Microsoft.Extensions.Configuration;using Microsoft.Extensions.DependencyInjection;
var configuration = new ConfigurationBuilder() .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) .Build();
var serviceCollection = new ServiceCollection();
serviceCollection.AddLogging();serviceCollection.AddKafka(configuration);
var serviceProvider = serviceCollection.BuildServiceProvider();
var pubSub = serviceProvider.GetRequiredService<IPubSub>();
var userCreatedDomainEvent = new UserCreatedDomainEvent(Guid.NewGuid(), "John Doe", "john.doe@codedesignplus.com");
await pubSub.PublishAsync(userCreatedDomainEvent, CancellationToken.None);
Console.WriteLine("Message published successfully");
Console.ReadLine();
-
Configuración de Kafka en
appsettings.json
:{"Logging": {"LogLevel": {"Default": "Information","Microsoft.Hosting.Lifetime": "Information"}},"Core": {"Business": "CodeDesignPlus","AppName": "sample-kafka-producer","Version": "v1","Description": "Sample of CodeDesignPlus.Net.Core","Contact": {"Name": "CodeDesignPlus","Email": "custom@outlook.com"}},"Kafka": {"Enable": true,"UseQueue": false,"BootstrapServers": "localhost:9093","Acks": "All","BatchSize": 16384,"LingerMs": 5,"CompressionType": "Snappy","NameMicroservice": "MyMicroservice","SecurityProtocol": "Plaintext","MaxAttempts": 60}} -
Definir la entidad de usuario:
Hemos creado la entidad
UserEntity
, la cual estará vinculada al mensaje que se publicará en Kafka. Esto proporcionará al consumidor el contexto necesario sobre la entidad que ha experimentado una modificación.using System;using CodeDesignPlus.Net.Core.Abstractions;namespace CodeDesignPlus.Net.Kafka.Producer.Sample;public class UserEntity : IEntity{public Guid Id { get; set; }public bool IsActive { get; set; }public Instant CreatedAt { get; set; }public Guid CreatedBy { get; set; }public Instant? UpdatedAt { get; set; }public Guid? UpdatedBy { get; set; }public required string Name { get; set; }public required string Email { get; set; }public string? Password { get; set; }} -
Creamos los eventos de dominio:
Hemos creado el evento de dominio
UserCreatedDomainEvent
, el cual se publicará en Kafka cuando se cree un nuevo usuario.using System;using CodeDesignPlus.Net.Core.Abstractions;using CodeDesignPlus.Net.Core.Abstractions.Attributes;namespace CodeDesignPlus.Net.Kafka.Producer.Sample;[EventKey<UserEntity>(1, "created")]public class UserCreatedDomainEvent(Guid aggregateId,string name,string email,string? password = null,Guid? eventId = null,Instant? occurredAt = null,Dictionary<string, object>? metadata = null) : DomainEvent(aggregateId, eventId, occurredAt, metadata){public string Name { get; } = name;public string Email { get; } = email;public string? Password { get; set; } = password;public static UserCreatedDomainEvent Create(Guid aggregateId, string name, string email, string? password = null){return new UserCreatedDomainEvent(aggregateId, name, email, password);}} -
Registraremos los servicios necesarios en el contenedor de dependencias.
serviceCollection.AddKafka(configuration); -
Obtenemos la instancia de
IPubSub
:var producer = serviceProvider.GetRequiredService<IPubSub>(); -
Creamos y publicamos el evento
UserCreatedDomainEvent
:var userCreatedDomainEvent = new UserCreatedDomainEvent(Guid.NewGuid(), "John Doe", "john.doe@codedesignplus.com");await producer.PublishAsync(userCreatedDomainEvent, CancellationToken.None);
El proyecto de ejemplo CodeDesignPlus.Net.Kafka.Consumer.Sample
contiene un ejemplo de cómo suscribirse a mensajes en Apache Kafka.
using CodeDesignPlus.Net.Kafka.Extensions;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddKafka(builder.Configuration);
var host = builder.Build();host.Run();
-
Configuración de Kafka en
appsettings.json
:{"Logging": {"LogLevel": {"Default": "Information","Microsoft.Hosting.Lifetime": "Information"}},"Core": {"Business": "CodeDesignPlus","AppName": "sample-kafka-consumer","Version": "v1","Description": "Sample of CodeDesignPlus.Net.Core","Contact": {"Name": "CodeDesignPlus","Email": "custom@outlook.com"}},"Kafka": {"Enable": true,"UseQueue": false,"BootstrapServers": "localhost:9093","Acks": "All","BatchSize": 16384,"LingerMs": 5,"CompressionType": "Snappy","SecurityProtocol": "Plaintext","MaxAttempts": 60}} -
Definir la entidad de usuario:
Hemos creado la entidad
UserEntity
, que almacenará la información del evento y contendrá solo los datos necesarios para proporcionar el contexto requerido por el consumidor (proyección).using System;using CodeDesignPlus.Net.Core.Abstractions;namespace CodeDesignPlus.Net.Kafka.Consumer.Sample;public class UserEntity : IEntity{public Guid Id { get; set; }public bool IsActive { get; set; }public Instant CreatedAt { get; set; }public Guid CreatedBy { get; set; }public Instant? UpdatedAt { get; set; }public Guid? UpdatedBy { get; set; }public required string Name { get; set; }public required string Email { get; set; }public string? Password { get; set; }} -
Creamos los eventos de dominio:
Hemos creado el evento de dominio
UserCreatedDomainEvent
, que representa la estructura del evento de dominio a ser consumido.using System;using CodeDesignPlus.Net.Core.Abstractions;using CodeDesignPlus.Net.Core.Abstractions.Attributes;namespace CodeDesignPlus.Net.Kafka.Consumer.Sample;[EventKey<UserEntity>(1, "created", "sample-kafka-producer")]public class UserCreatedEvent(Guid aggregateId,string name,string email,string? password = null,Guid? eventId = null,Instant? occurredAt = null,Dictionary<string, object>? metadata = null) : DomainEvent(aggregateId, eventId, occurredAt, metadata){public string Name { get; } = name;public string Email { get; } = email;public string? Password { get; set; } = password;public static UserCreatedEvent Create(Guid aggregateId, string name, string email, string? password = null){return new UserCreatedEvent(aggregateId, name, email, password);}} -
Creamos el event handler:
Hemos creado el event handler
UserCreatedDomainEventHandler
, el cual se encargará de procesar el evento de dominioUserCreatedDomainEvent
cuando se publique en Kafka.El event handler es registrado en el contenedor de dependencias automaticamente por la librería
CodeDesignPlus.Net.PubSub
.using CodeDesignPlus.Net.PubSub.Abstractions;using CodeDesignPlus.Net.Serializers;namespace CodeDesignPlus.Net.Kafka.Consumer.Sample;public class UserCreatedEventHandler(ILogger<UserCreatedEventHandler> logger) : IEventHandler<UserCreatedEvent>{public Task HandleAsync(UserCreatedEvent data, CancellationToken token){logger.LogDebug("Invoked Event: {Json}", JsonSerializer.Serialize(data));return Task.CompletedTask;}} -
Registraremos los servicios necesarios en el contenedor de dependencias.
builder.Services.AddKafka(builder.Configuration);
Métodos de extensión
CodeDesignPlus.Net.Kafka
proporciona una serie de métodos de extensión para facilitar la configuración y el uso de los productores y consumidores de mensajes en Apache Kafka.
ServiceCollectionExtensions
ServiceCollectionExtensions
contiene los métodos de extensión para registrar los servicios necesarios en el contenedor de dependencias.
Opciones de configuración
CodeDesignPlus.Net.Kafka
utiliza la clase KafkaOptions
para personalizar la configuración de los productores y consumidores de mensajes en Apache Kafka.
Servicios
La librería CodeDesignPlus.Net.Kafka
proporciona una serie de servicios para facilitar la producción y el consumo de mensajes en Apache Kafka.
KafkaPubSub
KafkaPubSub
es una implementación de la interfaz IMessage
de CodeDesignPlus.Net.PubSub
que permite publicar y suscribir mensajes en Apache Kafka.
Conclusiones
CodeDesignPlus.Net.Kafka
facilita la implementación de sistemas basados en Apache Kafka en aplicaciones .NET, ofreciendo una API robusta y flexible para manejar tareas comunes y complejas en mensajería distribuida. Esto permite a los desarrolladores enfocarse en la lógica del negocio mientras aprovechan las capacidades de Kafka para construir soluciones escalables y resilientes.