Skip to content

Overview

Apache Kafka es un sistema de mensajería distribuida que permite la publicación y suscripción de mensajes en tiempo real. CodeDesignPlus.Net.Kafka es una librería que facilita la implementación de productores y consumidores de mensajes en .NET Core.

Propósito y alcance


CodeDesignPlus.Net.Kafka es una librería diseñada para gestionar y administrar la conexión con Apache Kafka en aplicaciones .NET Core. Proporciona una interfaz simple y flexible para publicar y suscribir mensajes en Kafka, permitiendo a los desarrolladores construir sistemas basados en eventos de forma sencilla y eficiente. Esta librería se integra con CodeDesignPlus.Net.PubSub para facilitar la implementación de sistemas distribuidos y escalables en .NET Core.

Principales características


  • Implementación de la interfaz IMessage de CodeDesignPlus.Net.PubSub.
  • Integración simple con Apache Kafka: Configuración fácil de productores y consumidores.
  • Manejo de mensajes resiliente: Soporte para reintentos y control de errores.
  • Configuración flexible: Permite personalizar detalles como serialización, particiones y tiempos de espera.
  • Monitoreo y registros: Compatible con sistemas de logging para facilitar la depuración y el monitoreo.
  • Soporte para DDD y arquitectura hexagonal: Encaja perfectamente con patrones modernos de desarrollo.

Casos de uso típicos


  • Procesamiento de eventos: Implementación de sistemas basados en eventos utilizando Apache Kafka como broker de mensajería.
  • Integraciones asincrónicas: Comunicación entre microservicios mediante publicación y suscripción a eventos.
  • Sistemas distribuidos: Coordinación de datos y tareas entre aplicaciones distribuidas.

Componentes principales


  • KafkaPubSub: Implementación de la interfaz IMessage de CodeDesignPlus.Net.PubSub para la publicación y suscripción de mensajes en Apache Kafka.
  • KafkaOptions: Clase de configuración para personalizar la conexión y el comportamiento de los productores y consumidores.
  • Directorysrc
    • DirectoryCodeDesignPlus.Net.Kafka
      • DirectoryExceptions
        • KafkaException.cs
      • DirectoryExtensions
        • ServiceCollectionExtensions.cs
      • DirectoryOptions
        • KafkaOptions.cs
      • DirectorySerializer
        • JsonSystemTextSerializer.cs
      • DirectoryServices
        • KafkaPubSub.cs
    • DirectoryCodeDesignPlus.Net.Kafka.Abstractions
      • IKafkaPubSub.cs

Primeros pasos


En esta sección, aprenderás a instalar y configurar CodeDesignPlus.Net.Kafka en tu proyecto .NET Core, así como a utilizar sus principales características para publicar y consumir mensajes en Apache Kafka.

Requisitos previos

  • .NET 8 o superior.
  • Conocimientos básicos de patrones de diseño y sistemas distribuidos.
  • Conocimientos sobre la arquitectura orientada a eventos (Event-Driven).
  • Inyección de dependencias en aplicaciones .NET.

Instalación

Para instalar CodeDesignPlus.Net.Kafka, agrega el paquete NuGet a tu proyecto:

Terminal window
dotnet add package CodeDesignPlus.Net.Kafka

Configuración básica

  1. Asignar las opciones de configuración en el appsettings.json:

    {
    "Logging": {
    "LogLevel": {
    "Default": "Information",
    "Microsoft.Hosting.Lifetime": "Information"
    }
    },
    "Core": {
    "Business": "CodeDesignPlus",
    "AppName": "sample-kafka-producer",
    "Version": "v1",
    "Description": "Sample of CodeDesignPlus.Net.Core",
    "Contact": {
    "Name": "CodeDesignPlus",
    "Email": "custom@outlook.com"
    }
    },
    "Kafka": {
    "Enable": true,
    "UseQueue": false,
    "BootstrapServers": "localhost:9093",
    "Acks": "All",
    "BatchSize": 16384,
    "LingerMs": 5,
    "CompressionType": "Snappy",
    "NameMicroservice": "MyMicroservice",
    "SecurityProtocol": "Plaintext",
    "MaxAttempts": 60
    }
    }
  2. Registra los servicios el contenedor de dependencias de tu aplicación:

    // ...
    serviceCollection.AddKafka(configuration);

Ejemplo rápido


El proyecto de ejemplo CodeDesignPlus.Net.Kafka.Sample contiene dos ejemplos, uno para productores y otro para consumidores, que ilustran cómo utilizar CodeDesignPlus.Net.Kafka en una aplicación .NET Core.

El proyecto de ejemplo CodeDesignPlus.Net.Kafka.Producer.Sample contiene un ejemplo de cómo publicar mensajes en Apache Kafka.

// See https://aka.ms/new-console-template for more information
using CodeDesignPlus.Net.Kafka.Extensions;
using CodeDesignPlus.Net.Kafka.Producer.Sample;
using CodeDesignPlus.Net.PubSub.Abstractions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.Build();
var serviceCollection = new ServiceCollection();
serviceCollection.AddLogging();
serviceCollection.AddKafka(configuration);
var serviceProvider = serviceCollection.BuildServiceProvider();
var pubSub = serviceProvider.GetRequiredService<IPubSub>();
var userCreatedDomainEvent = new UserCreatedDomainEvent(Guid.NewGuid(), "John Doe", "john.doe@codedesignplus.com");
await pubSub.PublishAsync(userCreatedDomainEvent, CancellationToken.None);
Console.WriteLine("Message published successfully");
Console.ReadLine();
  1. Configuración de Kafka en appsettings.json:

    {
    "Logging": {
    "LogLevel": {
    "Default": "Information",
    "Microsoft.Hosting.Lifetime": "Information"
    }
    },
    "Core": {
    "Business": "CodeDesignPlus",
    "AppName": "sample-kafka-producer",
    "Version": "v1",
    "Description": "Sample of CodeDesignPlus.Net.Core",
    "Contact": {
    "Name": "CodeDesignPlus",
    "Email": "custom@outlook.com"
    }
    },
    "Kafka": {
    "Enable": true,
    "UseQueue": false,
    "BootstrapServers": "localhost:9093",
    "Acks": "All",
    "BatchSize": 16384,
    "LingerMs": 5,
    "CompressionType": "Snappy",
    "NameMicroservice": "MyMicroservice",
    "SecurityProtocol": "Plaintext",
    "MaxAttempts": 60
    }
    }
  2. Definir la entidad de usuario:

    Hemos creado la entidad UserEntity, la cual estará vinculada al mensaje que se publicará en Kafka. Esto proporcionará al consumidor el contexto necesario sobre la entidad que ha experimentado una modificación.

    using System;
    using CodeDesignPlus.Net.Core.Abstractions;
    namespace CodeDesignPlus.Net.Kafka.Producer.Sample;
    public class UserEntity : IEntity
    {
    public Guid Id { get; set; }
    public bool IsActive { get; set; }
    public Instant CreatedAt { get; set; }
    public Guid CreatedBy { get; set; }
    public Instant? UpdatedAt { get; set; }
    public Guid? UpdatedBy { get; set; }
    public required string Name { get; set; }
    public required string Email { get; set; }
    public string? Password { get; set; }
    }
  3. Creamos los eventos de dominio:

    Hemos creado el evento de dominio UserCreatedDomainEvent, el cual se publicará en Kafka cuando se cree un nuevo usuario.

    using System;
    using CodeDesignPlus.Net.Core.Abstractions;
    using CodeDesignPlus.Net.Core.Abstractions.Attributes;
    namespace CodeDesignPlus.Net.Kafka.Producer.Sample;
    [EventKey<UserEntity>(1, "created")]
    public class UserCreatedDomainEvent(
    Guid aggregateId,
    string name,
    string email,
    string? password = null,
    Guid? eventId = null,
    Instant? occurredAt = null,
    Dictionary<string, object>? metadata = null
    ) : DomainEvent(aggregateId, eventId, occurredAt, metadata)
    {
    public string Name { get; } = name;
    public string Email { get; } = email;
    public string? Password { get; set; } = password;
    public static UserCreatedDomainEvent Create(Guid aggregateId, string name, string email, string? password = null)
    {
    return new UserCreatedDomainEvent(aggregateId, name, email, password);
    }
    }
  4. Registraremos los servicios necesarios en el contenedor de dependencias.

    serviceCollection.AddKafka(configuration);
  5. Obtenemos la instancia de IPubSub:

    var producer = serviceProvider.GetRequiredService<IPubSub>();
  6. Creamos y publicamos el evento UserCreatedDomainEvent:

    var userCreatedDomainEvent = new UserCreatedDomainEvent(Guid.NewGuid(), "John Doe", "john.doe@codedesignplus.com");
    await producer.PublishAsync(userCreatedDomainEvent, CancellationToken.None);

Métodos de extensión


CodeDesignPlus.Net.Kafka proporciona una serie de métodos de extensión para facilitar la configuración y el uso de los productores y consumidores de mensajes en Apache Kafka.

ServiceCollectionExtensions

ServiceCollectionExtensions contiene los métodos de extensión para registrar los servicios necesarios en el contenedor de dependencias.

Opciones de configuración


CodeDesignPlus.Net.Kafka utiliza la clase KafkaOptions para personalizar la configuración de los productores y consumidores de mensajes en Apache Kafka.

Servicios


La librería CodeDesignPlus.Net.Kafka proporciona una serie de servicios para facilitar la producción y el consumo de mensajes en Apache Kafka.

KafkaPubSub

KafkaPubSub es una implementación de la interfaz IMessage de CodeDesignPlus.Net.PubSub que permite publicar y suscribir mensajes en Apache Kafka.

Conclusiones


CodeDesignPlus.Net.Kafka facilita la implementación de sistemas basados en Apache Kafka en aplicaciones .NET, ofreciendo una API robusta y flexible para manejar tareas comunes y complejas en mensajería distribuida. Esto permite a los desarrolladores enfocarse en la lógica del negocio mientras aprovechan las capacidades de Kafka para construir soluciones escalables y resilientes.

Recursos adicionales