Skip to content

Kafka

El KafkaContainer es un componente de la librería CodeDesignPlus.Net.xUnit que proporciona un contenedor Docker para ejecutar Apache Kafka, un sistema de mensajería distribuida. Este componente facilita la creación de pruebas unitarias que dependen de Kafka, ofreciendo un entorno de pruebas aislado y reproducible.

¿Cómo Funciona?


El KafkaContainer utiliza Docker Compose para definir y gestionar el contenedor Kafka, junto con un contenedor Zookeeper que requiere Kafka. Al inicializar una instancia de esta clase, se genera un archivo docker-compose.yml temporal con la configuración necesaria para ejecutar los contenedores. Luego, los contenedores se inician y se detienen automáticamente al final de la ejecución de las pruebas.

El proceso general de funcionamiento es el siguiente:

  1. Configuración del Contenedor: El método Build() define la configuración específica del contenedor Kafka y Zookeeper, incluyendo la ruta al archivo docker-compose.yml, opciones de reinicio forzado, eliminación de contenedores huérfanos, parada al finalizar y un nombre de servicio alternativo generado dinámicamente para evitar conflictos. Tambien se genera un puerto aleatorio para el broker de Kafka, y se establece la variable de entorno KAFKA_HOST_PORT con este valor.
  2. Creación del Contenedor: Se crea una instancia de DockerComposeCompositeService usando la configuración definida.
  3. Inicio y Detención del Contenedor: Los contenedores se inician cuando se crea una instancia de la clase KafkaContainer y se detienen automáticamente cuando la instancia se elimina, asegurando la limpieza del entorno.

Container


KafkaContainer hereda de la clase DockerCompose, la cual proporciona la lógica base para interactuar con Docker Compose. La clase utiliza la clase DockerComposeCompositeService de la librería Testcontainers para iniciar y detener contenedores docker. La configuración del contenedor se define en el método Build() que crea un archivo docker-compose.yml con los parámetros necesarios para la ejecución del contenedor Kafka y Zookeeper. El archivo docker-compose.yml se encuentra dentro del directorio Containers/KafkaContainer.

namespace CodeDesignPlus.Net.xUnit.Containers.KafkaContainer;
/// <summary>
/// Represents a Docker container for Kafka, managed using Docker Compose.
/// </summary>
public class KafkaContainer : DockerCompose
{
/// <summary>
/// Gets the broker list for the Kafka container.
/// </summary>
public string BrokerList { get; private set; }
/// <summary>
/// Builds the Docker Compose service configuration for the Kafka container.
/// </summary>
/// <returns>An <see cref="ICompositeService"/> representing the Docker Compose service.</returns>
protected override ICompositeService Build()
{
// Define the path to the Docker Compose file.
var file = Path.Combine(Directory.GetCurrentDirectory(), "Containers", "KafkaContainer", (TemplateString)"docker-compose.yml");
// Generate a random port for the Kafka broker.
var random = new Random();
var hostPort = random.Next(29000, 29999);
// Set the broker list for the Kafka container.
this.BrokerList = $"localhost:{hostPort}";
// Configure the Docker Compose settings.
var compose = new DockerComposeConfig
{
ComposeFilePath = new[] { file },
ForceRecreate = true,
RemoveOrphans = true,
StopOnDispose = true,
AlternativeServiceName = "kafka_" + Guid.NewGuid().ToString("N"),
EnvironmentNameValue = new Dictionary<string, string>
{
{ "KAFKA_PORT" , random.Next(9000, 9999).ToString() },
{ "KAFKA_HOST_PORT", hostPort.ToString() }
}
};
// Disable port retrieval and set the container name.
this.EnableGetPort = false;
this.ContainerName = $"{compose.AlternativeServiceName}-kafka";
// Create and return the Docker Compose service.
return new DockerComposeCompositeService(DockerHost, compose);
}
}

La clase KafkaCollectionFixture se utiliza como un fixture de xUnit, que proporciona una forma de compartir el contenedor entre las pruebas de una colección.

namespace CodeDesignPlus.Net.xUnit.Containers.KafkaContainer;
/// <summary>
/// Provides a fixture for managing a Kafka container during xUnit tests.
/// </summary>
public sealed class KafkaCollectionFixture : IDisposable
{
/// <summary>
/// The name of the collection for the Kafka tests.
/// </summary>
public const string Collection = "Kafka Collection";
/// <summary>
/// Gets the Kafka container instance.
/// </summary>
public KafkaContainer Container { get; }
/// <summary>
/// Initializes a new instance of the <see cref="KafkaCollectionFixture"/> class.
/// </summary>
public KafkaCollectionFixture()
{
this.Container = new KafkaContainer();
// Wait for the Kafka container to be fully initialized.
Thread.Sleep(10000);
}
/// <summary>
/// Disposes the Kafka container instance.
/// </summary>
public void Dispose()
{
this.Container.StopInstance();
}
}

Ejemplo de Uso


Este ejemplo muestra cómo usar KafkaContainer para realizar pruebas de integración con Apache Kafka. Se instancia un productor y un consumidor Kafka, se envía un mensaje a un topic específico y luego se consume ese mensaje. La prueba verifica que el mensaje consumido coincida con el mensaje enviado, además, demuestra la configuración del productor y el consumidor utilizando la lista de brokers proporcionada por KafkaContainer.

using CodeDesignPlus.Net.xUnit.Containers.KafkaContainer;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
namespace CodeDesignPlus.Net.xUnit.Test;
[Collection(KafkaCollectionFixture.Collection)]
public class KafkaContainerTest(KafkaCollectionFixture kafkaCollectionFixture)
{
[Fact]
public async Task Initialize_Publish_Consumer()
{
// Arrange
var producerConfig = new ProducerConfig { BootstrapServers = kafkaCollectionFixture.Container.BrokerList, SecurityProtocol = SecurityProtocol.Plaintext };
var consumerConfig = new ConsumerConfig
{
BootstrapServers = kafkaCollectionFixture.Container.BrokerList,
GroupId = "test-group",
AutoOffsetReset = AutoOffsetReset.Earliest
};
var topic = "test-topic";
var testMessage = "test-message";
// Act
string? consumedMessage = null;
// Produce the message
using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
{
await producer.ProduceAsync(topic, new Message<Null, string> { Value = testMessage });
producer.Flush(TimeSpan.FromSeconds(10));
}
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
consumer.Subscribe(topic);
var consumeResult = consumer.Consume();
if (consumeResult?.Message?.Value != null)
{
consumedMessage = consumeResult.Message.Value;
}
}
// Assert
Assert.Equal(testMessage, consumedMessage);
}
}

Conclusiones


  • El KafkaContainer facilita la creación de pruebas unitarias que requieren una instancia de Apache Kafka.
  • Utiliza Docker Compose para la gestión del contenedor, proporcionando una forma sencilla de definir la configuración.
  • La clase KafkaCollectionFixture permite compartir un contenedor entre todas las pruebas de una misma colección.
  • El uso del KafkaContainer mejora la reproducibilidad y el aislamiento de las pruebas unitarias.
  • El contenedor Kafka se inicia automaticamente al comienzo de cada prueba y se detiene una vez termina, esto permite un ambiente de pruebas limpio.

Referencias Adicionales