Kafka
El KafkaContainer
es un componente de la librería CodeDesignPlus.Net.xUnit que proporciona un contenedor Docker para ejecutar Apache Kafka, un sistema de mensajería distribuida. Este componente facilita la creación de pruebas unitarias que dependen de Kafka, ofreciendo un entorno de pruebas aislado y reproducible.
¿Cómo Funciona?
El KafkaContainer
utiliza Docker Compose
para definir y gestionar el contenedor Kafka, junto con un contenedor Zookeeper que requiere Kafka. Al inicializar una instancia de esta clase, se genera un archivo docker-compose.yml
temporal con la configuración necesaria para ejecutar los contenedores. Luego, los contenedores se inician y se detienen automáticamente al final de la ejecución de las pruebas.
El proceso general de funcionamiento es el siguiente:
- Configuración del Contenedor: El método
Build()
define la configuración específica del contenedor Kafka y Zookeeper, incluyendo la ruta al archivodocker-compose.yml
, opciones de reinicio forzado, eliminación de contenedores huérfanos, parada al finalizar y un nombre de servicio alternativo generado dinámicamente para evitar conflictos. Tambien se genera un puerto aleatorio para el broker de Kafka, y se establece la variable de entornoKAFKA_HOST_PORT
con este valor. - Creación del Contenedor: Se crea una instancia de
DockerComposeCompositeService
usando la configuración definida. - Inicio y Detención del Contenedor: Los contenedores se inician cuando se crea una instancia de la clase
KafkaContainer
y se detienen automáticamente cuando la instancia se elimina, asegurando la limpieza del entorno.
Container
KafkaContainer
hereda de la clase DockerCompose
, la cual proporciona la lógica base para interactuar con Docker Compose. La clase utiliza la clase DockerComposeCompositeService
de la librería Testcontainers
para iniciar y detener contenedores docker. La configuración del contenedor se define en el método Build()
que crea un archivo docker-compose.yml
con los parámetros necesarios para la ejecución del contenedor Kafka y Zookeeper. El archivo docker-compose.yml
se encuentra dentro del directorio Containers/KafkaContainer
.
namespace CodeDesignPlus.Net.xUnit.Containers.KafkaContainer;
/// <summary>/// Represents a Docker container for Kafka, managed using Docker Compose./// </summary>public class KafkaContainer : DockerCompose{ /// <summary> /// Gets the broker list for the Kafka container. /// </summary> public string BrokerList { get; private set; }
/// <summary> /// Builds the Docker Compose service configuration for the Kafka container. /// </summary> /// <returns>An <see cref="ICompositeService"/> representing the Docker Compose service.</returns> protected override ICompositeService Build() { // Define the path to the Docker Compose file. var file = Path.Combine(Directory.GetCurrentDirectory(), "Containers", "KafkaContainer", (TemplateString)"docker-compose.yml");
// Generate a random port for the Kafka broker. var random = new Random(); var hostPort = random.Next(29000, 29999);
// Set the broker list for the Kafka container. this.BrokerList = $"localhost:{hostPort}";
// Configure the Docker Compose settings. var compose = new DockerComposeConfig { ComposeFilePath = new[] { file }, ForceRecreate = true, RemoveOrphans = true, StopOnDispose = true, AlternativeServiceName = "kafka_" + Guid.NewGuid().ToString("N"), EnvironmentNameValue = new Dictionary<string, string> { { "KAFKA_PORT" , random.Next(9000, 9999).ToString() }, { "KAFKA_HOST_PORT", hostPort.ToString() } } };
// Disable port retrieval and set the container name. this.EnableGetPort = false; this.ContainerName = $"{compose.AlternativeServiceName}-kafka";
// Create and return the Docker Compose service. return new DockerComposeCompositeService(DockerHost, compose); }}
La clase KafkaCollectionFixture
se utiliza como un fixture de xUnit, que proporciona una forma de compartir el contenedor entre las pruebas de una colección.
namespace CodeDesignPlus.Net.xUnit.Containers.KafkaContainer;
/// <summary>/// Provides a fixture for managing a Kafka container during xUnit tests./// </summary>public sealed class KafkaCollectionFixture : IDisposable{ /// <summary> /// The name of the collection for the Kafka tests. /// </summary> public const string Collection = "Kafka Collection";
/// <summary> /// Gets the Kafka container instance. /// </summary> public KafkaContainer Container { get; }
/// <summary> /// Initializes a new instance of the <see cref="KafkaCollectionFixture"/> class. /// </summary> public KafkaCollectionFixture() { this.Container = new KafkaContainer();
// Wait for the Kafka container to be fully initialized. Thread.Sleep(10000); }
/// <summary> /// Disposes the Kafka container instance. /// </summary> public void Dispose() { this.Container.StopInstance(); }}
Ejemplo de Uso
Este ejemplo muestra cómo usar KafkaContainer para realizar pruebas de integración con Apache Kafka. Se instancia un productor y un consumidor Kafka, se envía un mensaje a un topic específico y luego se consume ese mensaje. La prueba verifica que el mensaje consumido coincida con el mensaje enviado, además, demuestra la configuración del productor y el consumidor utilizando la lista de brokers proporcionada por KafkaContainer.
using CodeDesignPlus.Net.xUnit.Containers.KafkaContainer;using Confluent.Kafka;using Confluent.Kafka.Admin;
namespace CodeDesignPlus.Net.xUnit.Test;
[Collection(KafkaCollectionFixture.Collection)]public class KafkaContainerTest(KafkaCollectionFixture kafkaCollectionFixture){ [Fact] public async Task Initialize_Publish_Consumer() { // Arrange var producerConfig = new ProducerConfig { BootstrapServers = kafkaCollectionFixture.Container.BrokerList, SecurityProtocol = SecurityProtocol.Plaintext }; var consumerConfig = new ConsumerConfig { BootstrapServers = kafkaCollectionFixture.Container.BrokerList, GroupId = "test-group", AutoOffsetReset = AutoOffsetReset.Earliest };
var topic = "test-topic"; var testMessage = "test-message";
// Act string? consumedMessage = null;
// Produce the message using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build()) { await producer.ProduceAsync(topic, new Message<Null, string> { Value = testMessage }); producer.Flush(TimeSpan.FromSeconds(10)); }
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build()) { consumer.Subscribe(topic); var consumeResult = consumer.Consume(); if (consumeResult?.Message?.Value != null) { consumedMessage = consumeResult.Message.Value; } }
// Assert Assert.Equal(testMessage, consumedMessage); }}
using CodeDesignPlus.Net.xUnit.Containers.KafkaContainer;
namespace CodeDesignPlus.Net.xUnit.Test.Definitions;
[CollectionDefinition(KafkaCollectionFixture.Collection)]public class KafkaCollectionDefinition : ICollectionFixture<KafkaCollectionFixture>{}
Conclusiones
- El
KafkaContainer
facilita la creación de pruebas unitarias que requieren una instancia de Apache Kafka. - Utiliza Docker Compose para la gestión del contenedor, proporcionando una forma sencilla de definir la configuración.
- La clase
KafkaCollectionFixture
permite compartir un contenedor entre todas las pruebas de una misma colección. - El uso del
KafkaContainer
mejora la reproducibilidad y el aislamiento de las pruebas unitarias. - El contenedor Kafka se inicia automaticamente al comienzo de cada prueba y se detiene una vez termina, esto permite un ambiente de pruebas limpio.