Skip to content

Options

El patrón de opciones es una técnica común en aplicaciones .NET que permite centralizar la configuración en un solo lugar, facilitando la gestión y validación de las opciones de configuración. En este caso, la clase KafkaOptions proporciona la configuración necesaria para la interacción con Apache Kafka.

KafkaOptions


La clase KafkaOptions proporciona un conjunto de opciones que pueden configurarse en el archivo appsettings.json de tu aplicación. Estas opciones son esenciales para definir el comportamiento y la configuración de la conexión a Kafka, tanto para productores como consumidores y clientes administrativos.

namespace CodeDesignPlus.Net.Kafka.Options;
/// <summary>
/// Options for configuring Kafka.
/// </summary>
public class KafkaOptions : PubSubOptions
{
/// <summary>
/// The name of the section used in the appsettings.
/// </summary>
public static new readonly string Section = "Kafka";
/// <summary>
/// Gets or sets a value indicating whether Kafka is enabled.
/// </summary>
public bool Enable { get; set; }
/// <summary>
/// Gets or sets the bootstrap servers.
/// </summary>
public string BootstrapServers { get; set; }
/// <summary>
/// Gets or sets the acknowledgment level.
/// </summary>
public Acks Acks { get; set; } = Acks.All;
/// <summary>
/// Gets or sets the batch size.
/// </summary>
public int BatchSize { get; set; }
/// <summary>
/// Gets or sets the linger time in milliseconds.
/// </summary>
public int LingerMs { get; set; }
/// <summary>
/// Gets or sets the compression type.
/// </summary>
public CompressionType CompressionType { get; set; } = CompressionType.Snappy;
/// <summary>
/// Gets or sets the security protocol.
/// </summary>
public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.Plaintext;
/// <summary>
/// Gets the producer configuration.
/// </summary>
public ProducerConfig ProducerConfig => new()
{
BootstrapServers = this.BootstrapServers,
Acks = this.Acks,
BatchSize = this.BatchSize,
LingerMs = this.LingerMs,
CompressionType = this.CompressionType,
SecurityProtocol = this.SecurityProtocol
};
/// <summary>
/// Gets the consumer configuration.
/// </summary>
public ConsumerConfig ConsumerConfig => new()
{
BootstrapServers = this.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest
};
/// <summary>
/// Gets the admin client configuration.
/// </summary>
public AdminClientConfig AdminClientConfig => new()
{
BootstrapServers = this.BootstrapServers
};
/// <summary>
/// Gets or sets the maximum number of attempts.
/// </summary>
public int MaxAttempts { get; set; } = 60;
}

Propiedades

  • Section

    • Tipo: string
    • Descripción: El nombre de la sección en el archivo appsettings.json donde se encuentran estas opciones.
    • Valor edeterminado:** "Kafka"
  • Enable

    • Tipo: bool
    • Descripción: Indica si la funcionalidad de Kafka está habilitada.
    • Valor edeterminado:** false (heredado de PubSubOptions)
  • BootstrapServers

    • Tipo: string
    • Descripción: La lista de servidores de Kafka a los que se conectará.
    • Valor edeterminado:** null
  • Acks

    • Tipo: Acks (enum)
    • Descripción: Nivel de reconocimiento que debe recibir el productor de Kafka.
    • Valor edeterminado:** Acks.All
  • BatchSize

    • Tipo: int
    • Descripción: Tamaño del lote de mensajes que se enviarán a Kafka.
    • Valor edeterminado:** 0 (valor predeterminado implícito de int)
  • LingerMs

    • Tipo: int
    • Descripción: Tiempo máximo en milisegundos que el productor esperará para acumular mensajes antes de enviarlos.
    • Valor edeterminado:** 0 (valor predeterminado implícito de int)
  • CompressionType

    • Tipo: CompressionType (enum)
    • Descripción: Tipo de compresión que se utilizará para los mensajes enviados a Kafka.
    • Valor edeterminado:** CompressionType.Snappy
  • NameMicroservice

    • Tipo: string
    • Descripción: Nombre del microservicio actual. Este valor es requerido.
    • Valor edeterminado:** null
  • SecurityProtocol

    • Tipo: SecurityProtocol (enum)
    • Descripción: Protocolo de seguridad que se utilizará para la conexión con Kafka.
    • Valor edeterminado:** SecurityProtocol.Plaintext
  • ProducerConfig

    • Tipo: ProducerConfig
    • Descripción: Configuración del productor de Kafka, generada a partir de las opciones de KafkaOptions.
    • Valor edeterminado:** Objeto ProducerConfig generado dinámicamente.
  • ConsumerConfig

    • Tipo: ConsumerConfig
    • Descripción: Configuración del consumidor de Kafka, generada a partir de las opciones de KafkaOptions.
    • Valor edeterminado:** Objeto ConsumerConfig generado dinámicamente.
  • AdminClientConfig

    • Tipo: AdminClientConfig
    • Descripción: Configuración del cliente administrativo de Kafka, generada a partir de las opciones de KafkaOptions.
    • Valor edeterminado:** Objeto AdminClientConfig generado dinámicamente.
  • MaxAttempts

    • Tipo: int
    • Descripción: Máximo número de intentos para realizar una operación con Kafka (ej: reintento de envio).
    • Valor edeterminado:** 60

Ejemplo de Configuración


A continuación se muestra un ejemplo de archivo appsettings.json que configura las opciones requeridas:

{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"Core": {
"Business": "CodeDesignPlus",
"AppName": "sample-kafka-consumer",
"Version": "v1",
"Description": "Sample of CodeDesignPlus.Net.Core",
"Contact": {
"Name": "CodeDesignPlus",
"Email": "custom@outlook.com"
}
},
"Kafka": {
"Enable": true,
"UseQueue": false,
"BootstrapServers": "localhost:9093",
"Acks": "All",
"BatchSize": 16384,
"LingerMs": 5,
"CompressionType": "Snappy",
"SecurityProtocol": "Plaintext",
"MaxAttempts": 60
}
}

Referencias Adicionales