Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,16 @@ private static void AddRabbitMQ(

IConnectionFactory CreateConnectionFactory(IServiceProvider sp)
{
var connectionString = settings.ConnectionString;
// ensure the log forwarder is initialized
sp.GetRequiredService<RabbitMQEventSourceLogForwarder>().Start();

var factory = new ConnectionFactory();

var configurationOptionsSection = configSection.GetSection("ConnectionFactory");
configurationOptionsSection.Bind(factory);

// the connection string from settings should win over the one from the ConnectionFactory section
var connectionString = settings.ConnectionString;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this a bug before?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this was just me being nit-picky. This variable isn't used until here, so I moved it since it seemed out of place.

if (!string.IsNullOrEmpty(connectionString))
{
factory.Uri = new(connectionString);
Expand All @@ -105,6 +107,8 @@ IConnectionFactory CreateConnectionFactory(IServiceProvider sp)
builder.Services.AddKeyedSingleton<IConnection>(serviceKey, (sp, key) => CreateConnection(sp.GetRequiredKeyedService<IConnectionFactory>(key), settings.MaxConnectRetryCount));
}

builder.Services.AddSingleton<RabbitMQEventSourceLogForwarder>();

if (settings.Tracing)
{
// Note that RabbitMQ.Client v6.6 doesn't have built-in support for tracing. See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1261
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
{
"definitions": {
"logLevel": {
"properties": {
"RabbitMQ.Client": {
"$ref": "#/definitions/logLevelThreshold"
}
}
}
},
"properties": {
"Aspire": {
"type": "object",
Expand Down
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit message made my day 🤣

Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using Microsoft.Extensions.Logging;

namespace Aspire.RabbitMQ.Client;

internal sealed class RabbitMQEventSourceLogForwarder : IDisposable
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
private static readonly Func<ErrorEventSourceEvent, Exception?, string> s_formatErrorEvent = FormatErrorEvent;
private static readonly Func<EventSourceEvent, Exception?, string> s_formatEvent = FormatEvent;

private readonly ILogger _logger;
private RabbitMQEventSourceListener? _listener;

public RabbitMQEventSourceLogForwarder(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger("RabbitMQ.Client");
}

/// <summary>
/// Initiates the log forwarding from the RabbitMQ event sources to a provided <see cref="ILoggerFactory"/>, call <see cref="Dispose"/> to stop forwarding.
/// </summary>
public void Start()
{
_listener ??= new RabbitMQEventSourceListener(LogEvent, EventLevel.Verbose);
}

private void LogEvent(EventWrittenEventArgs eventData)
{
var level = MapLevel(eventData.Level);
var eventId = new EventId(eventData.EventId, eventData.EventName);

// Special case the Error event so the Exception Details are written correctly
if (eventData.EventId == 3 &&
eventData.EventName == "Error" &&
eventData.PayloadNames?.Count == 2 &&
eventData.Payload?.Count == 2 &&
eventData.PayloadNames[0] == "message" &&
eventData.PayloadNames[1] == "ex")
{
_logger.Log(level, eventId, new ErrorEventSourceEvent(eventData), null, s_formatErrorEvent);
}
else
{
Debug.Assert(
(eventData.EventId == 1 && eventData.EventName == "Info") ||
(eventData.EventId == 2 && eventData.EventName == "Warn"));

_logger.Log(level, eventId, new EventSourceEvent(eventData), null, s_formatEvent);
}
}

private static string FormatErrorEvent(ErrorEventSourceEvent eventSourceEvent, Exception? ex) =>
eventSourceEvent.EventData.Payload?[0]?.ToString() ?? "<empty>";

private static string FormatEvent(EventSourceEvent eventSourceEvent, Exception? ex) =>
eventSourceEvent.EventData.Payload?[0]?.ToString() ?? "<empty>";

public void Dispose() => _listener?.Dispose();

private static LogLevel MapLevel(EventLevel level) => level switch
{
EventLevel.Critical => LogLevel.Critical,
EventLevel.Error => LogLevel.Error,
EventLevel.Informational => LogLevel.Information,
EventLevel.Verbose => LogLevel.Debug,
EventLevel.Warning => LogLevel.Warning,
EventLevel.LogAlways => LogLevel.Information,
_ => throw new ArgumentOutOfRangeException(nameof(level), level, null),
};

private readonly struct EventSourceEvent : IReadOnlyList<KeyValuePair<string, object?>>
{
public EventWrittenEventArgs EventData { get; }

public EventSourceEvent(EventWrittenEventArgs eventData)
{
// only Info and Warn events are expected, which always have 'message' as the only payload
Debug.Assert(eventData.PayloadNames?.Count == 1 && eventData.PayloadNames[0] == "message");

EventData = eventData;
}

public IEnumerator<KeyValuePair<string, object?>> GetEnumerator()
{
for (var i = 0; i < Count; i++)
{
yield return this[i];
}
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public int Count => EventData.PayloadNames?.Count ?? 0;

public KeyValuePair<string, object?> this[int index] => new(EventData.PayloadNames![index], EventData.Payload![index]);
}

private readonly struct ErrorEventSourceEvent : IReadOnlyList<KeyValuePair<string, object?>>
{
public EventWrittenEventArgs EventData { get; }

public ErrorEventSourceEvent(EventWrittenEventArgs eventData)
{
EventData = eventData;
}

public IEnumerator<KeyValuePair<string, object?>> GetEnumerator()
{
for (var i = 0; i < Count; i++)
{
yield return this[i];
}
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public int Count => 5;

public KeyValuePair<string, object?> this[int index]
{
get
{
Debug.Assert(EventData.PayloadNames?.Count == 2 && EventData.Payload?.Count == 2);
Debug.Assert(EventData.PayloadNames[0] == "message");
Debug.Assert(EventData.PayloadNames[1] == "ex");

ArgumentOutOfRangeException.ThrowIfGreaterThanOrEqual(index, 5);

return index switch
{
0 => new(EventData.PayloadNames[0], EventData.Payload[0]),
< 5 => GetExData(EventData, index),
_ => throw new UnreachableException()
};

static KeyValuePair<string, object?> GetExData(EventWrittenEventArgs eventData, int index)
{
Debug.Assert(index >= 1 && index <= 4);
Debug.Assert(eventData.Payload?.Count == 2);
var exData = eventData.Payload[1] as IDictionary<string, object?>;
Debug.Assert(exData is not null && exData.Count == 4);

return index switch
{
1 => new("ex.Type", exData["Type"]),
2 => new("ex.Message", exData["Message"]),
3 => new("ex.StackTrace", exData["StackTrace"]),
4 => new("ex.InnerException", exData["InnerException"]),
_ => throw new UnreachableException()
};
}
}
}
}

/// <summary>
/// Implementation of <see cref="EventListener"/> that listens to events produced by the RabbitMQ.Client library.
/// </summary>
private sealed class RabbitMQEventSourceListener : EventListener
{
private readonly List<EventSource> _eventSources = new List<EventSource>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread safety?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think thread safety is a concern here since this List is only touched in between the base ctor running and this class's ctor running. Between that time, I don't think another thread can touch this object.

Note that this implementation was copied from the Azure SDK, and it hasn't changed in years. So I semi-trust it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure there's a bug here with _eventSources, but I'm also not sure what this accomplishes either. If I recall correctly, you're right that OnEventSourceCreated will be called from the base constructor, and so you do have to handle the case where the EventListener might not be fully initialized when events start to show up in OnEventWritten. That said, you already handle this on line 225 where you ensure that _log is non-null before calling Invoke. I think you can get rid of all of this _eventSources machinery.

I realize I'm late to the party here, but would be worth removing for simplicity, or if it doesn't work without this, identifying an issue that needs to be documented/fixed.


private readonly Action<EventWrittenEventArgs> _log;
private readonly EventLevel _level;

public RabbitMQEventSourceListener(Action<EventWrittenEventArgs> log, EventLevel level)
{
_log = log;
_level = level;

foreach (EventSource eventSource in _eventSources)
{
OnEventSourceCreated(eventSource);
}

_eventSources.Clear();
}

protected sealed override void OnEventSourceCreated(EventSource eventSource)
{
base.OnEventSourceCreated(eventSource);

if (_log == null)
{
_eventSources.Add(eventSource);
}
Comment on lines +194 to +197
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why store all event sources?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this implementation was copied from the Azure SDK.

Only the event sources that are added before the ctor fully finishes (i.e. _log == null) get added to the list. I think this is to handle the case where the base EventListener starts calling virtual methods on this object before the ctor has fully completed. This ensures those event sources are listened to.


if (eventSource.Name == "rabbitmq-dotnet-client" || eventSource.Name == "rabbitmq-client")
{
EnableEvents(eventSource, _level);
}
}

protected sealed override void OnEventWritten(EventWrittenEventArgs eventData)
{
// Workaround https://github.com/dotnet/corefx/issues/42600
if (eventData.EventId == -1)
{
return;
}

// There is a very tight race during the listener creation where EnableEvents was called
// and the thread producing events not observing the `_log` field assignment
_log?.Invoke(eventData);
}
}
}
2 changes: 1 addition & 1 deletion src/Components/Telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ Aspire.Npgsql.EntityFrameworkCore.PostgreSQL:

Aspire.RabbitMQ.Client:
- Log categories:
- TODO
- "RabbitMQ.Client"
- Activity source names:
- "Aspire.RabbitMQ.Client"
- Metric names:
Expand Down