Skip to content

Overview

Apache Kafka es un sistema de mensajería distribuida que permite la publicación y suscripción de mensajes en tiempo real. CodeDesignPlus.Net.Kafka es una librería que facilita la implementación de productores y consumidores de mensajes en .NET Core.


CodeDesignPlus.Net.Kafka es una librería diseñada para gestionar y administrar la conexión con Apache Kafka en aplicaciones .NET Core. Proporciona una interfaz simple y flexible para publicar y suscribir mensajes en Kafka, permitiendo a los desarrolladores construir sistemas basados en eventos de forma sencilla y eficiente. Esta librería se integra con CodeDesignPlus.Net.PubSub para facilitar la implementación de sistemas distribuidos y escalables en .NET Core.


  • Implementación de la interfaz IMessage de CodeDesignPlus.Net.PubSub.
  • Integración simple con Apache Kafka: Configuración fácil de productores y consumidores.
  • Manejo de mensajes resiliente: Soporte para reintentos y control de errores.
  • Configuración flexible: Permite personalizar detalles como serialización, particiones y tiempos de espera.
  • Monitoreo y registros: Compatible con sistemas de logging para facilitar la depuración y el monitoreo.
  • Soporte para DDD y arquitectura hexagonal: Encaja perfectamente con patrones modernos de desarrollo.

  • Procesamiento de eventos: Implementación de sistemas basados en eventos utilizando Apache Kafka como broker de mensajería.
  • Integraciones asincrónicas: Comunicación entre microservicios mediante publicación y suscripción a eventos.
  • Sistemas distribuidos: Coordinación de datos y tareas entre aplicaciones distribuidas.

  • KafkaPubSub: Implementación de la interfaz IMessage de CodeDesignPlus.Net.PubSub para la publicación y suscripción de mensajes en Apache Kafka.
  • KafkaOptions: Clase de configuración para personalizar la conexión y el comportamiento de los productores y consumidores.
  • Directorysrc
    • DirectoryCodeDesignPlus.Net.Kafka
      • DirectoryExceptions
        • KafkaException.cs
      • DirectoryExtensions
        • ServiceCollectionExtensions.cs
      • DirectoryOptions
        • KafkaOptions.cs
      • DirectorySerializer
        • JsonSystemTextSerializer.cs
      • DirectoryServices
        • KafkaPubSub.cs
    • DirectoryCodeDesignPlus.Net.Kafka.Abstractions
      • IKafkaPubSub.cs

En esta sección, aprenderás a instalar y configurar CodeDesignPlus.Net.Kafka en tu proyecto .NET Core, así como a utilizar sus principales características para publicar y consumir mensajes en Apache Kafka.

  • .NET 8 o superior.
  • Conocimientos básicos de patrones de diseño y sistemas distribuidos.
  • Conocimientos sobre la arquitectura orientada a eventos (Event-Driven).
  • Inyección de dependencias en aplicaciones .NET.

Para instalar CodeDesignPlus.Net.Kafka, agrega el paquete NuGet a tu proyecto:

Terminal window
dotnet add package CodeDesignPlus.Net.Kafka
  1. Asignar las opciones de configuración en el appsettings.json:

    {
    "Logging": {
    "LogLevel": {
    "Default": "Information",
    "Microsoft.Hosting.Lifetime": "Information"
    }
    },
    "Core": {
    "Business": "CodeDesignPlus",
    "AppName": "sample-kafka-producer",
    "Version": "v1",
    "Description": "Sample of CodeDesignPlus.Net.Core",
    "Contact": {
    "Name": "CodeDesignPlus",
    "Email": "custom@outlook.com"
    }
    },
    "Kafka": {
    "Enable": true,
    "UseQueue": false,
    "BootstrapServers": "localhost:9093",
    "Acks": "All",
    "BatchSize": 16384,
    "LingerMs": 5,
    "CompressionType": "Snappy",
    "NameMicroservice": "MyMicroservice",
    "SecurityProtocol": "Plaintext",
    "MaxAttempts": 60
    }
    }
  2. Registra los servicios el contenedor de dependencias de tu aplicación:

    // ...
    serviceCollection.AddKafka(configuration);

El proyecto de ejemplo CodeDesignPlus.Net.Kafka.Sample contiene dos ejemplos, uno para productores y otro para consumidores, que ilustran cómo utilizar CodeDesignPlus.Net.Kafka en una aplicación .NET Core.

El proyecto de ejemplo CodeDesignPlus.Net.Kafka.Producer.Sample contiene un ejemplo de cómo publicar mensajes en Apache Kafka.

// See https://aka.ms/new-console-template for more information
using CodeDesignPlus.Net.Kafka.Extensions;
using CodeDesignPlus.Net.Kafka.Producer.Sample;
using CodeDesignPlus.Net.PubSub.Abstractions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.Build();
var serviceCollection = new ServiceCollection();
serviceCollection.AddLogging();
serviceCollection.AddKafka(configuration);
var serviceProvider = serviceCollection.BuildServiceProvider();
var pubSub = serviceProvider.GetRequiredService<IPubSub>();
var userCreatedDomainEvent = new UserCreatedDomainEvent(Guid.NewGuid(), "John Doe", "john.doe@codedesignplus.com");
await pubSub.PublishAsync(userCreatedDomainEvent, CancellationToken.None);
Console.WriteLine("Message published successfully");
Console.ReadLine();
  1. Configuración de Kafka en appsettings.json:

    {
    "Logging": {
    "LogLevel": {
    "Default": "Information",
    "Microsoft.Hosting.Lifetime": "Information"
    }
    },
    "Core": {
    "Business": "CodeDesignPlus",
    "AppName": "sample-kafka-producer",
    "Version": "v1",
    "Description": "Sample of CodeDesignPlus.Net.Core",
    "Contact": {
    "Name": "CodeDesignPlus",
    "Email": "custom@outlook.com"
    }
    },
    "Kafka": {
    "Enable": true,
    "UseQueue": false,
    "BootstrapServers": "localhost:9093",
    "Acks": "All",
    "BatchSize": 16384,
    "LingerMs": 5,
    "CompressionType": "Snappy",
    "NameMicroservice": "MyMicroservice",
    "SecurityProtocol": "Plaintext",
    "MaxAttempts": 60
    }
    }
  2. Definir la entidad de usuario:

    Hemos creado la entidad UserEntity, la cual estará vinculada al mensaje que se publicará en Kafka. Esto proporcionará al consumidor el contexto necesario sobre la entidad que ha experimentado una modificación.

    using System;
    using CodeDesignPlus.Net.Core.Abstractions;
    namespace CodeDesignPlus.Net.Kafka.Producer.Sample;
    public class UserEntity : IEntity
    {
    public Guid Id { get; set; }
    public bool IsActive { get; set; }
    public Instant CreatedAt { get; set; }
    public Guid CreatedBy { get; set; }
    public Instant? UpdatedAt { get; set; }
    public Guid? UpdatedBy { get; set; }
    public required string Name { get; set; }
    public required string Email { get; set; }
    public string? Password { get; set; }
    }
  3. Creamos los eventos de dominio:

    Hemos creado el evento de dominio UserCreatedDomainEvent, el cual se publicará en Kafka cuando se cree un nuevo usuario.

    using System;
    using CodeDesignPlus.Net.Core.Abstractions;
    using CodeDesignPlus.Net.Core.Abstractions.Attributes;
    namespace CodeDesignPlus.Net.Kafka.Producer.Sample;
    [EventKey<UserEntity>(1, "created")]
    public class UserCreatedDomainEvent(
    Guid aggregateId,
    string name,
    string email,
    string? password = null,
    Guid? eventId = null,
    Instant? occurredAt = null,
    Dictionary<string, object>? metadata = null
    ) : DomainEvent(aggregateId, eventId, occurredAt, metadata)
    {
    public string Name { get; } = name;
    public string Email { get; } = email;
    public string? Password { get; set; } = password;
    public static UserCreatedDomainEvent Create(Guid aggregateId, string name, string email, string? password = null)
    {
    return new UserCreatedDomainEvent(aggregateId, name, email, password);
    }
    }
  4. Registraremos los servicios necesarios en el contenedor de dependencias.

    serviceCollection.AddKafka(configuration);
  5. Obtenemos la instancia de IPubSub:

    var producer = serviceProvider.GetRequiredService<IPubSub>();
  6. Creamos y publicamos el evento UserCreatedDomainEvent:

    var userCreatedDomainEvent = new UserCreatedDomainEvent(Guid.NewGuid(), "John Doe", "john.doe@codedesignplus.com");
    await producer.PublishAsync(userCreatedDomainEvent, CancellationToken.None);

CodeDesignPlus.Net.Kafka proporciona una serie de métodos de extensión para facilitar la configuración y el uso de los productores y consumidores de mensajes en Apache Kafka.

ServiceCollectionExtensions contiene los métodos de extensión para registrar los servicios necesarios en el contenedor de dependencias.


CodeDesignPlus.Net.Kafka utiliza la clase KafkaOptions para personalizar la configuración de los productores y consumidores de mensajes en Apache Kafka.


La librería CodeDesignPlus.Net.Kafka proporciona una serie de servicios para facilitar la producción y el consumo de mensajes en Apache Kafka.

KafkaPubSub es una implementación de la interfaz IMessage de CodeDesignPlus.Net.PubSub que permite publicar y suscribir mensajes en Apache Kafka.


CodeDesignPlus.Net.Kafka facilita la implementación de sistemas basados en Apache Kafka en aplicaciones .NET, ofreciendo una API robusta y flexible para manejar tareas comunes y complejas en mensajería distribuida. Esto permite a los desarrolladores enfocarse en la lógica del negocio mientras aprovechan las capacidades de Kafka para construir soluciones escalables y resilientes.