Skip to content

Connection

Este documento describe el servicio IRabbitConnection y RabbitConnection, que proporcionan una abstracción para manejar la conexión a un servidor RabbitMQ. El servicio RabbitConnection implementa la interfaz IRabbitConnection y se encarga de crear, gestionar y liberar la conexión a RabbitMQ, incluyendo el manejo de reintentos en caso de fallo inicial.

using CodeDesignPlus.Net.RabbitMQ.Abstractions.Options;
namespace CodeDesignPlus.Net.RabbitMQ.Abstractions;
/// <summary>
/// Represents a RabbitMQ connection.
/// </summary>
public interface IRabbitConnection : IDisposable
{
/// <summary>
/// Gets the RabbitMQ connection.
/// </summary>
IConnection Connection { get; }
/// <summary>
/// Connects to the RabbitMQ server.
/// </summary>
/// <param name="appName">The name of the application.</param>
/// <param name="settings">The options for configuring the RabbitMQ connection.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="settings"/> or <paramref name="appName"/> is null.</exception>
Task ConnectAsync(RabbitMQOptions settings, string appName);
}

¿Cómo Funciona?


El servicio RabbitConnection funciona como un singleton que mantiene una conexión activa a RabbitMQ. Al instanciarse, lee la configuración necesaria (host, puerto, usuario, contraseña, etc.) desde la configuración de la aplicación. Intenta conectarse al servidor RabbitMQ, utilizando un mecanismo de reintentos configurable. Una vez establecida la conexión, la mantiene activa y la pone a disposición a través de la propiedad Connection. Si la conexión falla después de un número máximo de intentos, lanza una excepción. Además, se asegura de liberar los recursos de la conexión al ser descartado, implementando el patrón IDisposable.

Propiedades


La interfaz IRabbitConnection define una única propiedad y un método que deben ser implementados por cualquier clase que desee proporcionar una conexión a RabbitMQ de forma abstraída.

Connection

Propiedad de solo lectura que proporciona acceso a la conexión activa a RabbitMQ. Es de tipo IConnection de la biblioteca RabbitMQ .NET, que permite interactuar con el servidor RabbitMQ a través de canales.

Métodos


RabbitConnection implementa la interfaz IRabbitConnection, que define los siguientes miembros:

Dispose

Este método, implementado de la interfaz IDisposable, libera los recursos utilizados por la conexión RabbitMQ, asegurando que la conexión se cierre correctamente al finalizar su uso, lo que evita fugas de recursos.

Implementación


namespace CodeDesignPlus.Net.RabbitMQ.Services
{
/// <summary>
/// Manages the RabbitMQ connection.
/// </summary>
/// <param name="logger">The logger for logging messages.</param>
public class RabbitConnection(ILogger<RabbitConnection> logger) : IRabbitConnection
{
/// <summary>
/// Gets the RabbitMQ connection.
/// </summary>
public IConnection Connection { get; private set; }
private bool disposed = false;
/// <summary>
/// Connects to the RabbitMQ server.
/// </summary>
/// <param name="appName">The name of the application.</param>
/// <param name="settings">The options for configuring the RabbitMQ connection.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="settings"/> or <paramref name="appName"/> is null.</exception>
public async Task ConnectAsync(RabbitMQOptions settings, string appName)
{
ArgumentNullException.ThrowIfNull(appName);
ArgumentNullException.ThrowIfNull(settings);
var factory = new ConnectionFactory
{
HostName = settings.Host,
Port = settings.Port,
UserName = settings.UserName,
Password = settings.Password
};
var isConnected = false;
var retryCount = 0;
var errors = new List<string>();
while (!isConnected && retryCount < settings.MaxRetry)
{
try
{
this.Connection = await factory.CreateConnectionAsync(appName);
isConnected = this.Connection.IsOpen;
}
catch (Exception ex)
{
retryCount++;
logger.LogError(ex, "Error connecting. Attempt {RetryCount} of {MaxRetries}.", retryCount, settings.MaxRetry);
errors.Add(ex.Message);
if (retryCount < settings.MaxRetry)
Thread.Sleep(settings.RetryInterval);
}
}
if (!isConnected)
throw new Exceptions.RabbitMQException("Failed to connect to the RabbitMQ server", errors);
logger.LogInformation("RabbitMQ Connection established successfully.");
}
/// <summary>
/// Releases the unmanaged resources used by the <see cref="RabbitConnection"/> and optionally releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
this.Connection.Dispose();
disposed = true;
}
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Finalizes an instance of the <see cref="RabbitConnection"/> class.
/// </summary>
~RabbitConnection()
{
Dispose(false);
}
}
}

El servicio RabbitConnection se implementa como una clase que hereda de IRabbitConnection y se encarga de:

  1. Recibir configuración: Lee la configuración de RabbitMQ desde las opciones de la aplicación (IOptions<RabbitMQOptions>).
  2. Crear la conexión: Utiliza la clase ConnectionFactory de la biblioteca RabbitMQ .NET para crear una conexión.
  3. Manejo de reintentos: Implementa un mecanismo de reintentos con un número máximo de intentos y un intervalo entre ellos, para manejar posibles fallos de conexión al inicio.
  4. Implementación de IDisposable: Se asegura de cerrar y liberar correctamente los recursos de la conexión.

Las dependencias del servicio son:

  • IOptions<RabbitMQOptions>: Para la configuración específica de RabbitMQ.
  • IOptions<CoreOptions>: Para la configuración general de la aplicación.
  • ILogger<RabbitConnection>: Para registrar los eventos relacionados con la conexión.

La clase RabbitMQOptions contendría al menos las siguientes propiedades:

  • Host (string): Dirección del servidor RabbitMQ.
  • Port (int): Puerto del servidor RabbitMQ.
  • UserName (string): Nombre de usuario para la autenticación.
  • Password (string): Contraseña para la autenticación.
  • MaxRetry (int): Número máximo de intentos de conexión.
  • RetryInterval (int): Intervalo en milisegundos entre intentos de conexión.

Y CoreOptions al menos la siguiente:

  • AppName (string): Nombre de la aplicación, usado para la configuración de la conexión.

Ejemplo de Uso


En este ejemplo, se inyecta IRabbitConnection en el constructor de ChannelProvider para obtener la conexión a RabbitMQ. Luego, se utilizan los métodos de ChannelProvider para declarar intercambios y obtener canales para publicar y consumir eventos de dominio. La clase ChannelProvider se encarga de gestionar los canales y los intercambios asociados a los eventos de dominio, utilizando la conexión a RabbitMQ proporcionada por IRabbitConnection.

namespace CodeDesignPlus.Net.RabbitMQ.Services;
/// <summary>
/// Provides methods to manage RabbitMQ channels.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="ChannelProvider"/> class.
/// </remarks>
/// <param name="connection">The RabbitMQ connection.</param>
/// <param name="domainEventResolver">The domain event resolver service.</param>
/// <param name="options">The core options.</param>
public class ChannelProvider(IRabbitConnection connection, IDomainEventResolver domainEventResolver, IOptions<CoreOptions> options) : IChannelProvider
{
private readonly ConcurrentDictionary<string, ChannelModel> channels = new();
private readonly ConcurrentDictionary<string, string> exchanges = new();
/// <summary>
/// Declares an exchange for the specified domain event type.
/// </summary>
/// <param name="domainEventType">The type of the domain event.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>The name of the declared exchange.</returns>
public async Task<string> ExchangeDeclareAsync(Type domainEventType, CancellationToken cancellationToken)
{
var key = domainEventType.Name;
if (exchanges.TryGetValue(key, out var exchange))
return exchange;
var exchangeName = domainEventResolver.GetKeyDomainEvent(domainEventType);
var channel = await GetChannelPublishAsync(domainEventType, cancellationToken);
await channel.ExchangeDeclareAsync(exchange: exchangeName, durable: true, type: ExchangeType.Fanout, arguments: new Dictionary<string, object>
{
{ "x-cdp-bussiness", options.Value.Business },
{ "x-cdp-microservice", options.Value.AppName }
}, cancellationToken: cancellationToken);
exchanges.TryAdd(key, exchangeName);
return exchangeName;
}
/// <summary>
/// Declares an exchange for the specified domain event.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <param name="domainEvent">The domain event instance.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>The name of the declared exchange.</returns>
public Task<string> ExchangeDeclareAsync<TEvent>(TEvent domainEvent, CancellationToken cancellationToken) where TEvent : IDomainEvent
{
return ExchangeDeclareAsync(domainEvent.GetType(), cancellationToken);
}
/// <summary>
/// Gets the channel for publishing the specified domain event type.
/// </summary>
/// <param name="domainEventType">The type of the domain event.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>The channel for publishing.</returns>
public Task<IChannel> GetChannelPublishAsync(Type domainEventType, CancellationToken cancellationToken)
{
var key = domainEventType.Name;
return GetChannelAsync(key);
}
/// <summary>
/// Gets the channel for publishing the specified domain event.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <param name="domainEvent">The domain event instance.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>The channel for publishing.</returns>
public Task<IChannel> GetChannelPublishAsync<TEvent>(TEvent domainEvent, CancellationToken cancellationToken) where TEvent : IDomainEvent
{
return GetChannelPublishAsync(domainEvent.GetType(), cancellationToken);
}
/// <summary>
/// Gets the channel for consuming the specified domain event with the specified event handler.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>The channel for consuming.</returns>
public Task<IChannel> GetChannelConsumerAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
var key = typeof(TEventHandler).Name;
return GetChannelAsync(key);
}
/// <summary>
/// Sets the consumer tag for the specified event and event handler.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <param name="consumerTag">The consumer tag.</param>
public void SetConsumerTag<TEvent, TEventHandler>(string consumerTag)
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
var key = typeof(TEventHandler).Name;
if (channels.TryGetValue(key, out var channel))
channel.ConsumerTag = consumerTag;
}
/// <summary>
/// Gets the consumer tag for the specified event and event handler.
/// </summary>
/// <typeparam name="TEvent">The type of the domain event.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <returns>The consumer tag.</returns>
public string GetConsumerTag<TEvent, TEventHandler>()
where TEvent : IDomainEvent
where TEventHandler : IEventHandler<TEvent>
{
var key = typeof(TEventHandler).Name;
if (channels.TryGetValue(key, out var channel))
return channel.ConsumerTag;
return default;
}
/// <summary>
/// Gets the channel for the specified key.
/// </summary>
/// <param name="key">The key for the channel.</param>
/// <returns>The channel.</returns>
private async Task<IChannel> GetChannelAsync(string key)
{
if (channels.TryGetValue(key, out var channel))
return channel.Channel;
channel = ChannelModel.Create(key, await connection.Connection.CreateChannelAsync());
channels.TryAdd(key, channel);
return channel.Channel;
}
}

Conclusiones


El servicio IRabbitConnection y su implementación RabbitConnection proporcionan una forma centralizada, confiable y fácil de usar para gestionar la conexión a RabbitMQ en una aplicación .NET. Permite la configuración de las opciones de conexión, el manejo de reintentos y la liberación adecuada de recursos. Es recomendable usar la inyección de dependencias para obtener la instancia de IRabbitConnection y no instanciar directamente RabbitConnection.

Referencias Adicionales