Skip to content

IActivityService

La interfaz IActivityService define el contrato para la gestión de actividades dentro del sistema PubSub. Esta interfaz proporciona métodos para iniciar, agregar, obtener, eliminar e inyectar actividades en eventos de dominio.

namespace CodeDesignPlus.Net.PubSub.Abstractions;
/// <summary>
/// Interface for managing activities within the PubSub system.
/// </summary>
public interface IActivityService
{
/// <summary>
/// Gets the activity source used for creating activities.
/// </summary>
ActivitySource ActivitySource { get; }
/// <summary>
/// Starts a new activity with the specified name and kind.
/// </summary>
/// <param name="name">The name of the activity.</param>
/// <param name="kind">The kind of activity.</param>
/// <param name="propagationContext">Optional propagation context for the activity.</param>
/// <returns>The started activity.</returns>
Activity StartActivity(string name, ActivityKind kind, PropagationContext? propagationContext = null);
/// <summary>
/// Adds an activity to the service with the specified ID.
/// </summary>
/// <param name="id">The ID of the activity.</param>
/// <param name="activity">The activity to add.</param>
/// <returns>True if the activity was added successfully; otherwise, false.</returns>
bool AddActivity(int id, Activity activity);
/// <summary>
/// Tries to get an activity with the specified ID.
/// </summary>
/// <param name="id">The ID of the activity.</param>
/// <param name="activity">When this method returns, contains the activity associated with the specified ID, if the ID is found; otherwise, null.</param>
/// <returns>True if the activity was found; otherwise, false.</returns>
bool TryGetActivity(int id, out Activity activity);
/// <summary>
/// Removes an activity with the specified ID.
/// </summary>
/// <param name="id">The ID of the activity.</param>
/// <param name="activity">When this method returns, contains the removed activity, if the activity was removed; otherwise, null.</param>
/// <returns>True if the activity was removed successfully; otherwise, false.</returns>
bool RemoveActivity(int id, out Activity activity);
/// <summary>
/// Injects the specified activity into the given domain event.
/// </summary>
/// <typeparam name="TDomainEvent">The type of the domain event.</typeparam>
/// <param name="activity">The activity to inject.</param>
/// <param name="domainEvent">The domain event to inject the activity into.</param>
void Inject<TDomainEvent>(Activity activity, TDomainEvent domainEvent) where TDomainEvent : IDomainEvent;
/// <summary>
/// Extracts the propagation context from the given domain event.
/// </summary>
/// <typeparam name="TDomainEvent">The type of the domain event.</typeparam>
/// <param name="domainEvent">The domain event to extract the propagation context from.</param>
/// <returns>The extracted propagation context.</returns>
PropagationContext Extract<TDomainEvent>(TDomainEvent domainEvent) where TDomainEvent : IDomainEvent;
}

Propiedades

La interfaz IActivityService define las siguientes propiedades para la gestión de actividades dentro del sistema PubSub.

ActivitySource

La propiedad ActivitySource se utiliza para obtener el origen de la actividad que se utiliza para crear actividades dentro del sistema PubSub.

Métodos


La interfaz IActivityService define los siguientes métodos para la gestión de actividades dentro del sistema PubSub.

StartActivity

Type: StartActivity(string name, ActivityKind kind, PropagationContext? propagationContext = null)

El método StartActivity se utiliza para iniciar una nueva actividad con el nombre y el tipo especificados. Este método devuelve la actividad iniciada.

AddActivity

Type: AddActivity(int id, Activity activity)

El método AddActivity se utiliza para agregar una actividad al servicio con el ID especificado. Este método devuelve true si la actividad se agregó correctamente; de lo contrario, devuelve false.

TryGetActivity

Type: TryGetActivity(int id, out Activity activity)

El método TryGetActivity se utiliza para intentar obtener una actividad con el ID especificado. Este método devuelve true si la actividad se encontró; de lo contrario, devuelve false.

RemoveActivity

Type: RemoveActivity(int id, out Activity activity)

El método RemoveActivity se utiliza para eliminar una actividad con el ID especificado. Este método devuelve true si la actividad se eliminó correctamente; de lo contrario, devuelve false.

Inject

Type: Inject<TDomainEvent>(Activity activity, TDomainEvent domainEvent) where TDomainEvent : IDomainEvent

El método Inject se utiliza para inyectar la actividad especificada en el evento de dominio dado. Este método inyecta el contexto de trazado en las propiedades básicas del evento de dominio.

Extract

Type: Extract<TDomainEvent>(TDomainEvent domainEvent) where TDomainEvent : IDomainEvent

El método Extract se utiliza para extraer el contexto de propagación del evento de dominio dado. Este método extrae el contexto de trazado de las propiedades básicas del evento de dominio.

Implementación


La implementación de la interfaz IActivityService se muestra a continuación. Esta implementación proporciona métodos para iniciar, agregar, obtener, eliminar e inyectar actividades en eventos de dominio.

namespace CodeDesignPlus.Net.PubSub.Diagnostics;
/// <summary>
/// Provides services for managing activity sources and context propagation.
/// </summary>
/// <remarks>
/// https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/examples/MicroserviceExample/Utils/Messaging/MessageReceiver.cs
/// </remarks>
public class ActivitySourceService : IActivityService
{
/// <summary>
/// The activity source used for creating activities.
/// </summary>
public readonly ActivitySource ActivitySource;
private readonly ConcurrentDictionary<int, Activity> activityMap = new();
private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator;
/// <summary>
/// Initializes a new instance of the <see cref="ActivitySourceService"/> class.
/// </summary>
public ActivitySourceService()
{
var assemblyName = typeof(ActivitySourceService).Assembly.GetName();
this.ActivitySource = new(assemblyName.Name, assemblyName.Version.ToString());
}
/// <summary>
/// Gets the activity source.
/// </summary>
ActivitySource IActivityService.ActivitySource => this.ActivitySource;
/// <summary>
/// Adds an activity to the activity map.
/// </summary>
/// <param name="id">The identifier of the activity.</param>
/// <param name="activity">The activity to add.</param>
/// <returns>True if the activity was added successfully; otherwise, false.</returns>
public bool AddActivity(int id, Activity activity)
{
return this.activityMap.TryAdd(id, activity);
}
/// <summary>
/// Removes an activity from the activity map.
/// </summary>
/// <param name="id">The identifier of the activity.</param>
/// <param name="activity">The removed activity.</param>
/// <returns>True if the activity was removed successfully; otherwise, false.</returns>
public bool RemoveActivity(int id, out Activity activity)
{
return this.activityMap.TryRemove(id, out activity);
}
/// <summary>
/// Starts a new activity.
/// </summary>
/// <param name="name">The name of the activity.</param>
/// <param name="kind">The kind of the activity.</param>
/// <param name="propagationContext">The propagation context.</param>
/// <returns>The started activity.</returns>
public Activity StartActivity(string name, ActivityKind kind, PropagationContext? propagationContext = null)
{
if (propagationContext.HasValue)
return this.ActivitySource.StartActivity(name, kind, propagationContext.Value.ActivityContext);
return this.ActivitySource.StartActivity(name, kind);
}
/// <summary>
/// Tries to get an activity from the activity map.
/// </summary>
/// <param name="id">The identifier of the activity.</param>
/// <param name="activity">The retrieved activity.</param>
/// <returns>True if the activity was found; otherwise, false.</returns>
public bool TryGetActivity(int id, out Activity activity)
{
return this.activityMap.TryGetValue(id, out activity);
}
/// <summary>
/// Injects the trace context into the domain event.
/// </summary>
/// <typeparam name="TDomainEvent">The type of the domain event.</typeparam>
/// <param name="activity">The activity containing the trace context.</param>
/// <param name="domainEvent">The domain event.</param>
public void Inject<TDomainEvent>(Activity activity, TDomainEvent domainEvent) where TDomainEvent : IDomainEvent
{
ActivityContext contextToInject = default;
if (activity != null)
{
contextToInject = activity.Context;
}
else if (Activity.Current != null)
{
contextToInject = Activity.Current.Context;
}
this.propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), domainEvent, InjectTraceContextIntoBasicProperties);
}
/// <summary>
/// Extracts the trace context from the domain event.
/// </summary>
/// <typeparam name="TDomainEvent">The type of the domain event.</typeparam>
/// <param name="domainEvent">The domain event.</param>
/// <returns>The extracted propagation context.</returns>
public PropagationContext Extract<TDomainEvent>(TDomainEvent domainEvent) where TDomainEvent : IDomainEvent
{
var parentContext = this.propagator.Extract(default, domainEvent, ExtractTraceContextFromBasicProperties);
Baggage.Current = parentContext.Baggage;
return parentContext;
}
/// <summary>
/// Extracts the trace context from the basic properties of the domain event.
/// </summary>
/// <typeparam name="TDomainEvent">The type of the domain event.</typeparam>
/// <param name="event">The domain event.</param>
/// <param name="key">The key of the trace context.</param>
/// <returns>The extracted trace context values.</returns>
internal static IEnumerable<string> ExtractTraceContextFromBasicProperties<TDomainEvent>(TDomainEvent @event, string key) where TDomainEvent : IDomainEvent
{
if (@event.Metadata.TryGetValue(key, out var value))
{
return new[] { value.ToString() };
}
return Array.Empty<string>();
}
/// <summary>
/// Injects the trace context into the basic properties of the domain event.
/// </summary>
/// <typeparam name="TDomainEvent">The type of the domain event.</typeparam>
/// <param name="domainEvent">The domain event.</param>
/// <param name="key">The key of the trace context.</param>
/// <param name="value">The value of the trace context.</param>
internal static void InjectTraceContextIntoBasicProperties<TDomainEvent>(TDomainEvent domainEvent, string key, string value) where TDomainEvent : IDomainEvent
{
Console.WriteLine($"Injecting {key}: {value}");
domainEvent.Metadata.Add(key, value);
}
}

Ejemplo de Uso


La implementación de la interfaz IActivityService se utiliza en el servicio EventQueueService para gestionar actividades y la propagación de contexto dentro del sistema PubSub. A continuación se muestra la implementación del servicio EventQueueService que utiliza la interfaz IActivityService para gestionar actividades y la propagación de contexto.

namespace CodeDesignPlus.Net.PubSub.Services;
/// <summary>
/// Provides a service to manage the event queue.
/// </summary>
public class EventQueueService : IEventQueue
{
private readonly ConcurrentQueue<IDomainEvent> queue = new();
private readonly ILogger<EventQueueService> logger;
private readonly IMessage message;
private readonly PubSubOptions options;
private readonly IActivityService activityService;
/// <summary>
/// Initializes a new instance of the <see cref="EventQueueService"/> class.
/// </summary>
/// <param name="logger">The logger to manage the logs.</param>
/// <param name="options">The options for the PubSub service.</param>
/// <param name="message">The message service to publish events.</param>
/// <param name="activityService">The activity service to manage activities (optional).</param>
public EventQueueService(ILogger<EventQueueService> logger, IOptions<PubSubOptions> options, IMessage message, IActivityService activityService = null)
{
ArgumentNullException.ThrowIfNull(logger);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(message);
this.logger = logger;
this.message = message;
this.options = options.Value;
this.activityService = activityService;
this.logger.LogDebug("EventQueueService initialized.");
}
/// <summary>
/// Enqueues an event to the queue.
/// </summary>
/// <param name="event">The event to enqueue.</param>
/// <param name="cancellationToken">A cancellation token used to propagate notifications that operations should be canceled.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public Task EnqueueAsync(IDomainEvent @event, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(@event);
var exist = this.queue.Any(x => x.Equals(@event));
if (!exist)
{
var activity = this.activityService?.StartActivity("EventQueueService.EnqueueAsync", ActivityKind.Internal);
this.activityService?.Inject(activity, @event);
activity?.AddTag("event.type", @event.GetType().Name);
activity?.AddTag("event.id", @event.EventId.ToString());
activity?.AddTag("event.aggregate_id", @event.AggregateId.ToString());
this.queue.Enqueue(@event);
this.logger.LogDebug("Event of type {Name} enqueued.", @event.GetType().Name);
activity?.SetStatus(ActivityStatusCode.Ok);
activity?.Stop();
}
else
{
this.logger.LogWarning("Event of type {Name} was already in the queue. Skipping.", @event.GetType().Name);
}
return Task.CompletedTask;
}
/// <summary>
/// Dequeues events from the queue and processes them.
/// </summary>
/// <param name="token">A cancellation token used to propagate notifications that operations should be canceled.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public async Task DequeueAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
if (this.queue.TryDequeue(out IDomainEvent @event))
{
this.logger.LogDebug("Dequeueing event of type {TEvent}.", @event.GetType().Name);
var parentContext = this.activityService?.Extract(@event);
var activity = this.activityService?.StartActivity("EventQueueService.DequeueAsync", ActivityKind.Internal, parentContext);
activity?.AddTag("event.type", @event.GetType().Name);
activity?.AddTag("event.id", @event.EventId.ToString());
activity?.AddTag("event.aggregate_id", @event.AggregateId.ToString());
await this.message.PublishAsync(@event, token).ConfigureAwait(false);
activity?.SetStatus(ActivityStatusCode.Ok);
activity?.Stop();
}
else
{
this.logger.LogDebug("No events in the queue. Waiting...");
await Task.Delay(TimeSpan.FromSeconds(this.options.SecondsWaitQueue), CancellationToken.None);
}
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error processing event.");
}
}
this.logger.LogWarning("DequeueAsync stopped due to cancellation token.");
}
}

Conclusiones


La interfaz IActivityService y su implementación ActivitySourceService proporcionan un conjunto robusto de herramientas para gestionar actividades y la propagación de contexto dentro del sistema PubSub. Estas herramientas permiten a los desarrolladores iniciar, agregar, obtener, eliminar e inyectar actividades en eventos de dominio, facilitando la creación de sistemas de eventos escalables y trazables.