Channel Provider
This document describes the IChannelProvider and ChannelProvider services, which manage RabbitMQ channels and exchanges and provide an abstraction layer that simplifies interaction with the messaging infrastructure. ChannelProvider implements the IChannelProvider interface and handles the creation, reuse, and configuration of channels and exchanges.
namespace CodeDesignPlus.Net.RabbitMQ.Abstractions;

/// <summary>
/// Provides methods to manage RabbitMQ channels and exchanges.
/// </summary>
public interface IChannelProvider
{
    /// <summary>
    /// Gets the channel for publishing the specified domain event type.
    /// </summary>
    /// <param name="domainEventType">The type of the domain event.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The channel for publishing.</returns>
    Task<IChannel> GetChannelPublishAsync(Type domainEventType, CancellationToken cancellationToken);

    /// <summary>
    /// Gets the channel for publishing the specified domain event.
    /// </summary>
    /// <typeparam name="TDomainEvent">The type of the domain event.</typeparam>
    /// <param name="domainEvent">The domain event instance.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The channel for publishing.</returns>
    Task<IChannel> GetChannelPublishAsync<TDomainEvent>(TDomainEvent domainEvent, CancellationToken cancellationToken)
        where TDomainEvent : IDomainEvent;

    /// <summary>
    /// Declares an exchange for the specified domain event type.
    /// </summary>
    /// <param name="domainEventType">The type of the domain event.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The name of the declared exchange.</returns>
    Task<string> ExchangeDeclareAsync(Type domainEventType, CancellationToken cancellationToken);

    /// <summary>
    /// Declares an exchange for the specified domain event.
    /// </summary>
    /// <typeparam name="TDomainEvent">The type of the domain event.</typeparam>
    /// <param name="domainEvent">The domain event instance.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The name of the declared exchange.</returns>
    Task<string> ExchangeDeclareAsync<TDomainEvent>(TDomainEvent domainEvent, CancellationToken cancellationToken)
        where TDomainEvent : IDomainEvent;

    /// <summary>
    /// Gets the channel for consuming the specified domain event with the specified event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The channel for consuming.</returns>
    Task<IChannel> GetChannelConsumerAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>;

    /// <summary>
    /// Sets the consumer tag for the specified event and event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="consumerTag">The consumer tag.</param>
    void SetConsumerTag<TEvent, TEventHandler>(string consumerTag)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>;

    /// <summary>
    /// Gets the consumer tag for the specified event and event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <returns>The consumer tag.</returns>
    string GetConsumerTag<TEvent, TEventHandler>()
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>;
}
How Does It Work?
ChannelProvider acts as a manager of RabbitMQ channels and exchanges. Internally it keeps a registry of the channels and exchanges it has already created, so resources are not created unnecessarily. Both registries are concurrent dictionaries, which keeps reads and writes thread-safe under high concurrency. When a channel is requested for publishing or consuming, ChannelProvider checks whether one already exists for that event type or handler; it reuses the channel if so and creates a new one otherwise. Exchanges are declared per event type in the same way. Each exchange is declared with arguments (x-cdp-bussiness and x-cdp-microservice) that carry the business and microservice information.
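The lookup-then-create flow described above can be sketched in isolation. This is a minimal illustration, not the library's code: FakeChannel and ChannelRegistry are hypothetical names introduced here so the example compiles without a RabbitMQ connection, and the Created counter exists only to make reuse observable.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for a RabbitMQ channel; not part of the library.
public sealed class FakeChannel
{
    public string Key { get; }
    public FakeChannel(string key) => Key = key;
}

// Illustrative registry that mirrors the reuse-or-create flow.
public sealed class ChannelRegistry
{
    private readonly ConcurrentDictionary<string, FakeChannel> channels = new();

    // Number of channels actually created (not thread-safe; for illustration only).
    public int Created { get; private set; }

    public FakeChannel GetChannel(Type eventType)
    {
        // The registry key is the short type name, as in ChannelProvider.
        var key = eventType.Name;

        // Reuse the channel when one is already registered for this key.
        if (channels.TryGetValue(key, out var channel))
            return channel;

        // Otherwise create one and register it for later requests.
        channel = new FakeChannel(key);
        Created++;
        channels.TryAdd(key, channel);

        return channel;
    }
}
```

Calling GetChannel(typeof(string)) twice returns the same FakeChannel instance and creates only one channel. Because the key is the short type name, two event types with the same name in different namespaces would share an entry.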
Methods
ChannelProvider implements the IChannelProvider interface, which defines the following methods:
GetChannelPublishAsync
Type: Task<IChannel> GetChannelPublishAsync(Type domainEventType, CancellationToken cancellationToken)
Gets a RabbitMQ channel for publishing a specific domain event. It takes the domain event type as an argument. If a channel is already associated with that type, it is reused; otherwise a new channel is created.
GetChannelPublishAsync
Type: Task<IChannel> GetChannelPublishAsync<TDomainEvent>(TDomainEvent domainEvent, CancellationToken cancellationToken) where TDomainEvent : IDomainEvent
Gets a RabbitMQ channel for publishing a specific domain event. It takes an instance of the domain event and derives the event type from the instance. If a channel is already associated with that type, it is reused; otherwise a new channel is created.
ExchangeDeclareAsync
Type: Task<string> ExchangeDeclareAsync(Type domainEventType, CancellationToken cancellationToken)
Declares a RabbitMQ exchange for the specified domain event type and returns its name. If an exchange has already been declared for that type, the stored name is returned; otherwise a new durable fanout exchange is declared and recorded for future requests.
ExchangeDeclareAsync
Type: Task<string> ExchangeDeclareAsync<TDomainEvent>(TDomainEvent domainEvent, CancellationToken cancellationToken) where TDomainEvent : IDomainEvent
Declares a RabbitMQ exchange for a specific domain event and returns its name. It takes an instance of the domain event and derives the event type from the instance. If an exchange has already been declared for that type, the stored name is returned; otherwise a new durable fanout exchange is declared and recorded for future requests.
GetChannelConsumerAsync
Type: Task<IChannel> GetChannelConsumerAsync<TEvent, TEventHandler>(CancellationToken cancellationToken) where TEvent : IDomainEvent where TEventHandler : IEventHandler<TEvent>
Gets a RabbitMQ channel for consuming a specific domain event, identified by the generic event type and its handler type. If a channel is already associated with that handler, it is reused; otherwise a new channel is created.
SetConsumerTag
Type: void SetConsumerTag<TEvent, TEventHandler>(string consumerTag) where TEvent : IDomainEvent where TEventHandler : IEventHandler<TEvent>
Sets the tag of the consumer associated with an event type and handler. The tag identifies that particular consumer within a channel. It is stored only if a channel already exists for the handler.
GetConsumerTag
Type: string GetConsumerTag<TEvent, TEventHandler>() where TEvent : IDomainEvent where TEventHandler : IEventHandler<TEvent>
Gets the tag of the consumer associated with an event type and handler, or the default value (null) if no channel is registered for the handler.
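The instance-based overloads derive the event type from the object at run time. The sketch below shows what that means in practice. IDemoEvent, OrderCreated, and KeyResolver are hypothetical names introduced for illustration; they stand in for IDomainEvent, a concrete event, and the key lookup.

```csharp
using System;

// Hypothetical marker interface and event, standing in for IDomainEvent and a real event.
public interface IDemoEvent { }
public sealed class OrderCreated : IDemoEvent { }

public static class KeyResolver
{
    // Key derived from an explicit Type, as in the Type-based overloads.
    public static string FromType(Type eventType) => eventType.Name;

    // Key derived from the instance's runtime type, as in the generic overloads.
    // GetType() returns the concrete type even when TEvent is inferred as the interface.
    public static string FromInstance<TEvent>(TEvent domainEvent) where TEvent : IDemoEvent
        => FromType(domainEvent.GetType());
}
```

With IDemoEvent e = new OrderCreated(), KeyResolver.FromInstance(e) resolves to "OrderCreated" rather than "IDemoEvent", so a variable typed as the interface still maps to the concrete event's channel and exchange.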
Implementation
namespace CodeDesignPlus.Net.RabbitMQ.Services;

/// <summary>
/// Provides methods to manage RabbitMQ channels.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="ChannelProvider"/> class.
/// </remarks>
/// <param name="connection">The RabbitMQ connection.</param>
/// <param name="domainEventResolver">The domain event resolver service.</param>
/// <param name="options">The core options.</param>
public class ChannelProvider(IRabbitConnection connection, IDomainEventResolver domainEventResolver, IOptions<CoreOptions> options) : IChannelProvider
{
    private readonly ConcurrentDictionary<string, ChannelModel> channels = new();
    private readonly ConcurrentDictionary<string, string> exchanges = new();

    /// <summary>
    /// Declares an exchange for the specified domain event type.
    /// </summary>
    /// <param name="domainEventType">The type of the domain event.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The name of the declared exchange.</returns>
    public async Task<string> ExchangeDeclareAsync(Type domainEventType, CancellationToken cancellationToken)
    {
        var key = domainEventType.Name;

        if (exchanges.TryGetValue(key, out var exchange))
            return exchange;

        var exchangeName = domainEventResolver.GetKeyDomainEvent(domainEventType);

        var channel = await GetChannelPublishAsync(domainEventType, cancellationToken);

        await channel.ExchangeDeclareAsync(
            exchange: exchangeName,
            durable: true,
            type: ExchangeType.Fanout,
            arguments: new Dictionary<string, object>
            {
                { "x-cdp-bussiness", options.Value.Business },
                { "x-cdp-microservice", options.Value.AppName }
            },
            cancellationToken: cancellationToken);

        exchanges.TryAdd(key, exchangeName);

        return exchangeName;
    }

    /// <summary>
    /// Declares an exchange for the specified domain event.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <param name="domainEvent">The domain event instance.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The name of the declared exchange.</returns>
    public Task<string> ExchangeDeclareAsync<TEvent>(TEvent domainEvent, CancellationToken cancellationToken)
        where TEvent : IDomainEvent
    {
        return ExchangeDeclareAsync(domainEvent.GetType(), cancellationToken);
    }

    /// <summary>
    /// Gets the channel for publishing the specified domain event type.
    /// </summary>
    /// <param name="domainEventType">The type of the domain event.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The channel for publishing.</returns>
    public Task<IChannel> GetChannelPublishAsync(Type domainEventType, CancellationToken cancellationToken)
    {
        var key = domainEventType.Name;

        return GetChannelAsync(key);
    }

    /// <summary>
    /// Gets the channel for publishing the specified domain event.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <param name="domainEvent">The domain event instance.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The channel for publishing.</returns>
    public Task<IChannel> GetChannelPublishAsync<TEvent>(TEvent domainEvent, CancellationToken cancellationToken)
        where TEvent : IDomainEvent
    {
        return GetChannelPublishAsync(domainEvent.GetType(), cancellationToken);
    }

    /// <summary>
    /// Gets the channel for consuming the specified domain event with the specified event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>The channel for consuming.</returns>
    public Task<IChannel> GetChannelConsumerAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        var key = typeof(TEventHandler).Name;

        return GetChannelAsync(key);
    }

    /// <summary>
    /// Sets the consumer tag for the specified event and event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="consumerTag">The consumer tag.</param>
    public void SetConsumerTag<TEvent, TEventHandler>(string consumerTag)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        var key = typeof(TEventHandler).Name;

        if (channels.TryGetValue(key, out var channel))
            channel.ConsumerTag = consumerTag;
    }

    /// <summary>
    /// Gets the consumer tag for the specified event and event handler.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <returns>The consumer tag.</returns>
    public string GetConsumerTag<TEvent, TEventHandler>()
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        var key = typeof(TEventHandler).Name;

        if (channels.TryGetValue(key, out var channel))
            return channel.ConsumerTag;

        return default;
    }

    /// <summary>
    /// Gets the channel for the specified key.
    /// </summary>
    /// <param name="key">The key for the channel.</param>
    /// <returns>The channel.</returns>
    private async Task<IChannel> GetChannelAsync(string key)
    {
        if (channels.TryGetValue(key, out var channel))
            return channel.Channel;

        channel = ChannelModel.Create(key, await connection.Connection.CreateChannelAsync());

        channels.TryAdd(key, channel);

        return channel.Channel;
    }
}
The ChannelProvider service is a class that implements IChannelProvider and is responsible for:
- Receiving dependencies: it receives the RabbitMQ connection (IRabbitConnection), the domain event resolver service (IDomainEventResolver), and the application options (IOptions<CoreOptions>).
- Channel management: it uses a ConcurrentDictionary<string, ChannelModel> to store created channels so they can be reused.
- Exchange management: it uses a ConcurrentDictionary<string, string> to store the names of declared exchanges so they can be reused.
- Exchange creation: when an exchange is requested, it checks whether one has already been declared. If not, it declares a new fanout exchange and stores its name.
- Channel creation: when a channel is requested, it checks whether one already exists. If not, it creates a new one from the RabbitMQ connection and stores it.
- Consumer tags: it allows the tag of a specific consumer to be set and retrieved.
- Concurrency handling: it uses concurrent dictionaries so that channels and exchanges can be stored and looked up safely under high concurrency.
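One design point worth noting: a TryGetValue followed by an awaited creation and a TryAdd is not a single atomic step, so two callers that miss the cache at the same moment could each create a resource for the same key. The sketch below shows one common way to guarantee a single creation per key. It is an alternative pattern under stated assumptions, not how ChannelProvider is implemented, and SingleCreationCache is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: caches the creation task itself so each key is created once.
public sealed class SingleCreationCache<TValue>
{
    private readonly ConcurrentDictionary<string, Lazy<Task<TValue>>> entries = new();

    public Task<TValue> GetOrCreateAsync(string key, Func<Task<TValue>> factory)
    {
        // GetOrAdd may build several Lazy wrappers under contention, but only one
        // is stored, and only the stored wrapper ever runs the factory.
        var lazy = entries.GetOrAdd(
            key,
            _ => new Lazy<Task<TValue>>(factory, LazyThreadSafetyMode.ExecutionAndPublication));

        return lazy.Value;
    }
}
```

A trade-off of this pattern is that a faulted creation task stays cached, so a real implementation would need to evict failed entries before retrying.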
The service's dependencies are:
- IRabbitConnection: provides the connection to RabbitMQ.
- IDomainEventResolver: resolves the exchange name from the domain event.
- IOptions<CoreOptions>: provides general application information, such as the business name and the microservice name.
The ChannelModel class is an internal class that holds the IChannel and a ConsumerTag.
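The source does not list ChannelModel, so the following is only a guess at its shape, inferred from how ChannelProvider uses it (a static Create method, a Channel property, and a settable ConsumerTag). DemoChannelModel, IDemoChannel, DemoChannel, and the Key property are hypothetical names; IDemoChannel stands in for the RabbitMQ client's IChannel so the sketch compiles on its own.

```csharp
using System;

// Hypothetical stand-in for RabbitMQ.Client.IChannel.
public interface IDemoChannel { }
public sealed class DemoChannel : IDemoChannel { }

// Assumed shape, inferred only from how ChannelProvider uses ChannelModel.
public sealed class DemoChannelModel
{
    private DemoChannelModel(string key, IDemoChannel channel)
    {
        Key = key;
        Channel = channel;
    }

    // Registry key (assumption: the model keeps the key it was created with).
    public string Key { get; }

    // The channel that is reused for this key.
    public IDemoChannel Channel { get; }

    // Assigned once a consumer is registered; stays null for publish-only channels.
    public string? ConsumerTag { get; set; }

    public static DemoChannelModel Create(string key, IDemoChannel channel)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(channel);

        return new DemoChannelModel(key, channel);
    }
}
```

Keeping the consumer tag next to the channel lets a later unsubscribe find both with a single lookup by handler name.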
Usage Example
This example shows how the ChannelProvider methods can be used to publish and consume domain events in RabbitMQ. The RabbitPubSubService class publishes domain events and subscribes to them, using ChannelProvider to obtain the channels and exchanges it needs. RabbitPubSubService obtains its collaborators, such as IDomainEventResolver and IOptions<CoreOptions>, through dependency injection. The example also shows how the exchanges and queues for domain events are configured in RabbitMQ, and how consumers process the events they receive.
using CodeDesignPlus.Net.Exceptions;

namespace CodeDesignPlus.Net.RabbitMQ.Services;

/// <summary>
/// Service to publish and subscribe to domain events using RabbitMQ.
/// </summary>
public class RabbitPubSubService : IRabbitPubSub
{
    private readonly ILogger<RabbitPubSubService> logger;
    private readonly IServiceProvider serviceProvider;
    private readonly IDomainEventResolver domainEventResolverService;
    private readonly CoreOptions coreOptions;
    private readonly IChannelProvider channelProvider;
    private readonly Dictionary<string, object> argumentsQueue;

    /// <summary>
    /// Initializes a new instance of the <see cref="RabbitPubSubService"/> class.
    /// </summary>
    /// <param name="logger">The logger instance.</param>
    /// <param name="serviceProvider">The service provider.</param>
    /// <param name="domainEventResolverService">The domain event resolver service.</param>
    /// <param name="channelProvider">The channel provider.</param>
    /// <param name="coreOptions">The core options.</param>
    /// <param name="rabbitMQOptions">The RabbitMQ options.</param>
    public RabbitPubSubService(ILogger<RabbitPubSubService> logger, IServiceProvider serviceProvider, IDomainEventResolver domainEventResolverService, IChannelProvider channelProvider, IOptions<CoreOptions> coreOptions, IOptions<RabbitMQOptions> rabbitMQOptions)
    {
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(domainEventResolverService);
        ArgumentNullException.ThrowIfNull(coreOptions);
        ArgumentNullException.ThrowIfNull(rabbitMQOptions);
        ArgumentNullException.ThrowIfNull(channelProvider);

        this.logger = logger;
        this.serviceProvider = serviceProvider;
        this.domainEventResolverService = domainEventResolverService;
        this.coreOptions = coreOptions.Value;
        this.channelProvider = channelProvider;

        this.argumentsQueue = rabbitMQOptions.Value.QueueArguments.GetArguments();

        this.logger.LogInformation("RabbitPubSubService initialized.");
    }

    /// <summary>
    /// Publishes a domain event asynchronously.
    /// </summary>
    /// <param name="event">The domain event to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(@event);

        this.logger.LogInformation("Publishing event: {TEvent}.", @event.GetType().Name);

        return this.PrivatePublishAsync(@event, cancellationToken);
    }

    /// <summary>
    /// Publishes a list of domain events asynchronously.
    /// </summary>
    /// <param name="event">The list of domain events to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    public Task PublishAsync(IReadOnlyList<IDomainEvent> @event, CancellationToken cancellationToken)
    {
        var tasks = @event.Select(x => PublishAsync(x, cancellationToken));

        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Publishes a domain event to RabbitMQ.
    /// </summary>
    /// <param name="event">The domain event to publish.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous publish operation.</returns>
    private async Task PrivatePublishAsync(IDomainEvent @event, CancellationToken cancellationToken)
    {
        var channel = await this.channelProvider.GetChannelPublishAsync(@event.GetType(), cancellationToken);

        var exchangeName = await this.channelProvider.ExchangeDeclareAsync(@event.GetType(), cancellationToken);

        var message = JsonSerializer.Serialize(@event);

        var body = Encoding.UTF8.GetBytes(message);

        var properties = new BasicProperties
        {
            Persistent = true,
            AppId = coreOptions.AppName,
            Type = @event.GetType().Name,
            Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
            MessageId = @event.EventId.ToString(),
            CorrelationId = Guid.NewGuid().ToString(),
            ContentEncoding = "utf-8",
            ContentType = "application/json"
        };

        await channel.BasicPublishAsync(
            exchange: exchangeName,
            routingKey: string.Empty,
            mandatory: true,
            basicProperties: properties,
            body: body
        );

        this.logger.LogInformation("Event {TEvent} published ", @event.GetType().Name);
    }

    /// <summary>
    /// Subscribes to a domain event asynchronously.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous subscribe operation.</returns>
    public async Task SubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        var channel = await this.channelProvider.GetChannelConsumerAsync<TEvent, TEventHandler>(cancellationToken);

        var queueNameAttribute = typeof(TEventHandler).GetCustomAttribute<QueueNameAttribute>();
        var queueName = queueNameAttribute.GetQueueName(coreOptions.AppName, coreOptions.Business, coreOptions.Version);

        var exchangeName = this.domainEventResolverService.GetKeyDomainEvent<TEvent>();

        await ConfigQueueAsync(channel, queueName, exchangeName);
        await ConfigQueueDlxAsync(channel, queueName, exchangeName);

        this.logger.LogInformation("Subscribed to event: {TEvent}.", typeof(TEvent).Name);

        var eventConsumer = new AsyncEventingBasicConsumer(channel);

        eventConsumer.ReceivedAsync += async (_, ea) => await RecivedEvent<TEvent, TEventHandler>(channel, ea, cancellationToken);

        var consumerTag = await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: eventConsumer, cancellationToken: cancellationToken);

        this.channelProvider.SetConsumerTag<TEvent, TEventHandler>(consumerTag);
    }

    /// <summary>
    /// Processes the received event.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="channel">The RabbitMQ channel.</param>
    /// <param name="eventArguments">The event arguments.</param>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous operation.</returns>
    public async Task RecivedEvent<TEvent, TEventHandler>(IChannel channel, BasicDeliverEventArgs eventArguments, CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        try
        {
            this.logger.LogDebug("Processing event: {TEvent}.", typeof(TEvent).Name);

            using var scope = this.serviceProvider.CreateScope();

            var context = scope.ServiceProvider.GetRequiredService<IEventContext>();

            var body = eventArguments.Body.ToArray();

            var message = Encoding.UTF8.GetString(body);

            var @event = JsonSerializer.Deserialize<TEvent>(message);

            context.SetCurrentDomainEvent(@event);

            var eventHandler = scope.ServiceProvider.GetRequiredService<TEventHandler>();

            await eventHandler.HandleAsync(@event, cancellationToken).ConfigureAwait(false);

            await channel.BasicAckAsync(deliveryTag: eventArguments.DeliveryTag, multiple: false, cancellationToken: cancellationToken);
        }
        catch (CodeDesignPlusException ex)
        {
            this.logger.LogWarning(ex, "Warning processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);

            // Nack only on failure: a delivery that was already acked must not be nacked as well.
            await channel.BasicNackAsync(deliveryTag: eventArguments.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
        }
        catch (Exception ex)
        {
            this.logger.LogError(ex, "Error processing event: {TEvent} | {Message}.", typeof(TEvent).Name, ex.Message);

            // Reject without requeue so the message is routed to the dead-letter exchange.
            await channel.BasicNackAsync(deliveryTag: eventArguments.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
        }
    }

    /// <summary>
    /// Configures the RabbitMQ queue.
    /// </summary>
    /// <param name="channel">The RabbitMQ channel.</param>
    /// <param name="queue">The queue name.</param>
    /// <param name="exchangeName">The exchange name.</param>
    private async Task ConfigQueueAsync(IChannel channel, string queue, string exchangeName)
    {
        argumentsQueue["x-dead-letter-exchange"] = GetExchangeNameDlx(exchangeName);
        await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout, durable: true);
        await channel.QueueDeclareAsync(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: argumentsQueue);
        await channel.QueueBindAsync(queue: queue, exchange: exchangeName, routingKey: string.Empty);
    }

    /// <summary>
    /// Configures the dead-letter exchange (DLX) queue.
    /// </summary>
    /// <param name="channel">The RabbitMQ channel.</param>
    /// <param name="queue">The queue name.</param>
    /// <param name="exchangeName">The exchange name.</param>
    private static async Task ConfigQueueDlxAsync(IChannel channel, string queue, string exchangeName)
    {
        exchangeName = GetExchangeNameDlx(exchangeName);
        queue = GetQueueNameDlx(queue);

        await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout, durable: true);
        await channel.QueueDeclareAsync(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
        await channel.QueueBindAsync(queue: queue, exchange: exchangeName, routingKey: string.Empty);
    }

    /// <summary>
    /// Gets the dead-letter exchange (DLX) name.
    /// </summary>
    /// <param name="exchangeName">The original exchange name.</param>
    /// <returns>The DLX name.</returns>
    public static string GetExchangeNameDlx(string exchangeName) => $"{exchangeName}.dlx";

    /// <summary>
    /// Gets the dead-letter queue (DLQ) name.
    /// </summary>
    /// <param name="queueName">The original queue name.</param>
    /// <returns>The DLQ name.</returns>
    public static string GetQueueNameDlx(string queueName) => $"{queueName}.dlx";

    /// <summary>
    /// Unsubscribes from a domain event asynchronously.
    /// </summary>
    /// <typeparam name="TEvent">The type of the domain event.</typeparam>
    /// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
    /// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
    /// <returns>A task that represents the asynchronous unsubscribe operation.</returns>
    public async Task UnsubscribeAsync<TEvent, TEventHandler>(CancellationToken cancellationToken)
        where TEvent : IDomainEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        var consumerTag = this.channelProvider.GetConsumerTag<TEvent, TEventHandler>();

        if (!string.IsNullOrEmpty(consumerTag))
        {
            var channel = await this.channelProvider.GetChannelConsumerAsync<TEvent, TEventHandler>(cancellationToken);
            await channel.BasicCancelAsync(consumerTag, cancellationToken: cancellationToken);
            logger.LogInformation("Unsubscribed from event: {TEvent}.", typeof(TEvent).Name);
        }
    }
}
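The publish and consume paths in the listing mirror each other: the event is serialized to JSON and encoded as UTF-8 bytes on the way out, then decoded and deserialized on the way in. The sketch below isolates that round trip. It assumes System.Text.Json, whereas the JsonSerializer used by the library may be a different type; OrderPaid and EventCodec are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical event type used only for this sketch.
public sealed record OrderPaid(Guid EventId, decimal Amount);

public static class EventCodec
{
    // Mirrors the publish side: serialize to JSON, then encode as UTF-8 bytes.
    public static byte[] Encode<TEvent>(TEvent domainEvent)
        => Encoding.UTF8.GetBytes(JsonSerializer.Serialize(domainEvent));

    // Mirrors the consume side: decode the bytes, then deserialize to the event type.
    public static TEvent? Decode<TEvent>(ReadOnlyMemory<byte> body)
        => JsonSerializer.Deserialize<TEvent>(Encoding.UTF8.GetString(body.Span));
}
```

Decoding the bytes produced by Encode for an OrderPaid value yields a record equal to the original. With System.Text.Json, the generic Serialize call writes the properties of the compile-time type, so a value typed as an interface should be serialized with its runtime type if all of its properties are needed.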
Conclusions
ChannelProvider offers an efficient and safe way to manage RabbitMQ channels and exchanges in a .NET application. Centralized management of channels and exchanges, combined with resource reuse, improves performance and simplifies the code that interacts with RabbitMQ. Prefer the generic overloads of ExchangeDeclareAsync and GetChannelPublishAsync whenever possible to avoid errors.