diff --git a/.github/workflows/pr_validation.yml b/.github/workflows/pr_validation.yml index cb9d20e8..c67efdbb 100644 --- a/.github/workflows/pr_validation.yml +++ b/.github/workflows/pr_validation.yml @@ -47,7 +47,7 @@ jobs: - name: "dotnet test" shell: bash - run: dotnet test -c Release + run: dotnet test -c Release - name: "dotnet pack" run: dotnet pack -c Release -o ./bin/nuget diff --git a/Akka.Hosting.sln b/Akka.Hosting.sln index e6ad9bf7..69c1f7fb 100644 --- a/Akka.Hosting.sln +++ b/Akka.Hosting.sln @@ -44,6 +44,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Remote.Hosting.Tests", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Hosting.LoggingDemo", "src\Examples\Akka.Hosting.LoggingDemo\Akka.Hosting.LoggingDemo.csproj", "{298D7727-FDC6-49B2-9030-CC7F0E09B0B8}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Hosting.OpenTelemetry.Demo", "src\Examples\Akka.Hosting.OpenTelemetry.Demo\Akka.Hosting.OpenTelemetry.Demo.csproj", "{6BCB9636-DAD2-4364-8A83-3D2888FE5AB2}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Hosting.OpenTelemetry.AppHost", "src\Examples\Akka.Hosting.OpenTelemetry.AppHost\Akka.Hosting.OpenTelemetry.AppHost.csproj", "{CC22F505-B648-420E-BC8A-0FCE46082986}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -110,6 +114,14 @@ Global {298D7727-FDC6-49B2-9030-CC7F0E09B0B8}.Debug|Any CPU.Build.0 = Debug|Any CPU {298D7727-FDC6-49B2-9030-CC7F0E09B0B8}.Release|Any CPU.ActiveCfg = Release|Any CPU {298D7727-FDC6-49B2-9030-CC7F0E09B0B8}.Release|Any CPU.Build.0 = Release|Any CPU + {6BCB9636-DAD2-4364-8A83-3D2888FE5AB2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6BCB9636-DAD2-4364-8A83-3D2888FE5AB2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6BCB9636-DAD2-4364-8A83-3D2888FE5AB2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6BCB9636-DAD2-4364-8A83-3D2888FE5AB2}.Release|Any CPU.Build.0 = Release|Any CPU + {CC22F505-B648-420E-BC8A-0FCE46082986}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CC22F505-B648-420E-BC8A-0FCE46082986}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CC22F505-B648-420E-BC8A-0FCE46082986}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CC22F505-B648-420E-BC8A-0FCE46082986}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -118,6 +130,8 @@ Global {5F6A7BE8-6906-46CE-BA1C-72EA11EFA33B} = {EFA970FF-6BCC-4C38-84D8-324D40F2BF03} {4F79325B-9EE7-4501-800F-7A1F8DFBCC80} = {EFA970FF-6BCC-4C38-84D8-324D40F2BF03} {298D7727-FDC6-49B2-9030-CC7F0E09B0B8} = {EFA970FF-6BCC-4C38-84D8-324D40F2BF03} + {6BCB9636-DAD2-4364-8A83-3D2888FE5AB2} = {EFA970FF-6BCC-4C38-84D8-324D40F2BF03} + {CC22F505-B648-420E-BC8A-0FCE46082986} = {EFA970FF-6BCC-4C38-84D8-324D40F2BF03} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B99E6BB8-642A-4A68-86DF-69567CBA700A} diff --git a/Directory.Build.props b/Directory.Build.props index aeeacccf..24b68fd0 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -3,12 +3,16 @@ Copyright © 2013-$([System.DateTime]::Now.Year) Akka.NET Team Akka.NET Team 1.5.59 - **Bug Fixes** + **New Features** +* [Add OpenTelemetry trace correlation support for LoggerFactoryLogger](https://github.com/akkadotnet/Akka.Hosting/issues/700) - enables proper trace correlation for logs emitted from actor code. When using Akka.NET 1.5.59+, `LogEvent.ActivityContext` captures trace context at log creation time and flows it through to OpenTelemetry `LogRecord`s via the new `AkkaTraceContextProcessor`. + +**Bug Fixes** * [Fix semantic logging not capturing named placeholders as structured properties](https://github.com/akkadotnet/Akka.Hosting/pull/702) - resolved [issue #701](https://github.com/akkadotnet/Akka.Hosting/issues/701) where named placeholders like `{Event}` in log messages were not captured as searchable structured properties. Made all `LoggerConfigBuilder` properties optional and refactored message formatting code. * [Fix TestKit startup timeout race condition](https://github.com/akkadotnet/Akka.Hosting/pull/705) - resolved race condition in `TestKit.InitializeAsync()` where `CancellationTokenSource.Register()` threw exceptions on the timer thread, causing unhandled exceptions that crashed the test host process. Also increased default startup timeout from 10s to 30s for CI environments. **Updates** -* [Bump Akka version from 1.5.58 to 1.5.59](https://github.com/akkadotnet/akka.net/releases/tag/1.5.59) +* [Bump Akka version from 1.5.58 to 1.5.59](https://github.com/akkadotnet/akka.net/releases/tag/1.5.59) +* Added OpenTelemetry dependency (1.9.0+) for trace correlation support akkalogo.png https://github.com/akkadotnet/Akka.Hosting @@ -31,8 +35,8 @@ 6.0.3 3.1.5 1.5.59 - [6.0.0,) - [6.0.10,) + [8.0.0,) + [8.0.5,) @@ -50,4 +54,4 @@ true snupkg - \ No newline at end of file + diff --git a/README.md b/README.md index a4e16c6d..e1e8902b 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ See the ["Introduction to Akka.Hosting - HOCON-less, "Pit of Success" Akka.NET R * [Microsoft.Extensions.Logging.ILoggerFactory Logging Support](#microsoftextensionsloggingiloggerfactory-logging-support) * [Serilog Support](#serilog-support) * [Microsoft.Extensions.Logging Log Event Filtering](#microsoftextensionslogging-log-event-filtering) +- [OpenTelemetry Trace Correlation](#opentelemetry-trace-correlation) - [Microsoft.Extensions.Diagnostics.HealthChecks Integration](#healthchecks) * [Dependency Injected Health Checks](#healthcheck-di) * [Built-in HealthChecks](#healthcheck-builtin) @@ -629,6 +630,46 @@ To set up the `Microsoft.Extensions.Logging` log filtering, you will need to edi [Back to top](#akkahosting) + +# OpenTelemetry Trace Correlation + +Akka.NET processes log events asynchronously, which means `Activity.Current` does not flow across actor mailbox boundaries. To preserve trace correlation, Akka.Hosting captures the `ActivityContext` at log creation time and includes it in the log state. The `AkkaTraceContextProcessor` then applies that context to OpenTelemetry `LogRecord`s so exporters can correlate logs with traces. + +Minimal setup: + +```csharp +using Akka.Hosting; +using Akka.Hosting.Logging; +using OpenTelemetry.Logs; +using OpenTelemetry.Resources; + +builder.Logging.AddOpenTelemetry(options => +{ + options.SetResourceBuilder(ResourceBuilder.CreateDefault() + .AddService("my-service")); + + // Register before exporters + options.AddAkkaTraceCorrelation(); + + // Add OTLP exporter if you have not configured it elsewhere. + // Your mileage may vary; use the OpenTelemetry configuration that fits your app. + options.AddOtlpExporter(); +}); + +builder.Services.AddAkka("MySystem", configBuilder => +{ + configBuilder.ConfigureLoggers(setup => + { + setup.ClearLoggers(); + setup.AddLoggerFactory(); + }); +}); +``` + +See the demo in `src/Examples/Akka.Hosting.OpenTelemetry.AppHost` and `src/Examples/Akka.Hosting.OpenTelemetry.Demo` for a working Aspire setup. + +[Back to top](#akkahosting) + ## Filtering Logs In Akka.NET diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 498e3831..126d451e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,4 +1,7 @@ -#### 1.5.59 January 26th 2026 #### +#### 1.5.59 January 2026 #### + +**New Features** +* [Add OpenTelemetry trace correlation support for LoggerFactoryLogger](https://github.com/akkadotnet/Akka.Hosting/issues/700) - enables proper trace correlation for logs emitted from actor code. Solves the problem that `Activity.Current` doesn't flow across actor mailbox boundaries because it uses `AsyncLocal`. When using Akka.NET 1.5.59+, `LogEvent.ActivityContext` captures trace context at log creation time and flows it through to OpenTelemetry `LogRecord`s via the new `AkkaTraceContextProcessor`. Register with `options.AddAkkaTraceCorrelation()` in your OpenTelemetry logging configuration. **Bug Fixes** * [Fix semantic logging not capturing named placeholders as structured properties](https://github.com/akkadotnet/Akka.Hosting/pull/702) - resolved [issue #701](https://github.com/akkadotnet/Akka.Hosting/issues/701) where named placeholders like `{Event}` in log messages were not captured as searchable structured properties. Made all `LoggerConfigBuilder` properties optional and refactored message formatting code. @@ -6,6 +9,7 @@ **Updates** * [Bump Akka version from 1.5.58 to 1.5.59](https://github.com/akkadotnet/akka.net/releases/tag/1.5.59) +* Added `OpenTelemetry` package dependency (1.9.0+) for trace correlation support #### 1.5.58 January 9th 2026 #### @@ -79,4 +83,4 @@ * [Added dependency-injected health checks](https://github.com/akkadotnet/Akka.Hosting/pull/659) - `WithHealthCheck()` generic methods for DI-resolved health checks **Updates** -* [Bump Akka version from 1.5.50 to 1.5.51](https://github.com/akkadotnet/akka.net/releases/tag/1.5.51) \ No newline at end of file +* [Bump Akka version from 1.5.50 to 1.5.51](https://github.com/akkadotnet/akka.net/releases/tag/1.5.51) diff --git a/src/Akka.Cluster.Hosting.Tests/ClusterOptionsSpec.cs b/src/Akka.Cluster.Hosting.Tests/ClusterOptionsSpec.cs index c3bf2bc4..fd4db96d 100644 --- a/src/Akka.Cluster.Hosting.Tests/ClusterOptionsSpec.cs +++ b/src/Akka.Cluster.Hosting.Tests/ClusterOptionsSpec.cs @@ -144,7 +144,7 @@ public void ClusterOptionsConfigurationTest() using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)); var jsonConfig = new ConfigurationBuilder().AddJsonStream(stream).Build(); - var clusterOptions = jsonConfig.GetSection("Akka:ClusterOptions").Get(); + var clusterOptions = jsonConfig.GetSection("Akka:ClusterOptions").Get()!; clusterOptions.SplitBrainResolver = jsonConfig.GetSection("Akka:KeepMajorityOption").Get(); var builder = new AkkaConfigurationBuilder(new ServiceCollection(), "") diff --git a/src/Akka.Cluster.Hosting.Tests/XUnitLogger.cs b/src/Akka.Cluster.Hosting.Tests/XUnitLogger.cs index b2cb0f1d..e1dc4763 100644 --- a/src/Akka.Cluster.Hosting.Tests/XUnitLogger.cs +++ b/src/Akka.Cluster.Hosting.Tests/XUnitLogger.cs @@ -58,7 +58,7 @@ public bool IsEnabled(LogLevel logLevel) }; } - public IDisposable BeginScope(TState state) + public IDisposable? BeginScope(TState state) where TState : notnull { throw new NotImplementedException(); } diff --git a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt index 34c8bd1f..e54f8bb0 100644 --- a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt +++ b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCore.verified.txt @@ -99,6 +99,10 @@ namespace Akka.Hosting public static Akka.Hosting.AkkaConfigurationBuilder WithHealthCheck(this Akka.Hosting.AkkaConfigurationBuilder builder, string name, Akka.Hosting.IAkkaHealthCheck healthCheck, Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus? failureStatus = default, System.Collections.Generic.IEnumerable? tags = null, System.TimeSpan? timeout = default) { } public static Akka.Hosting.AkkaConfigurationBuilder WithHealthCheck(this Akka.Hosting.AkkaConfigurationBuilder builder, string name, System.Func> healthCheck, Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus? failureStatus = default, System.Collections.Generic.IEnumerable? tags = null, System.TimeSpan? timeout = default) { } } + public static class AkkaOpenTelemetryExtensions + { + public static OpenTelemetry.Logs.OpenTelemetryLoggerOptions AddAkkaTraceCorrelation(this OpenTelemetry.Logs.OpenTelemetryLoggerOptions options) { } + } public class DeadLetterOptions { public DeadLetterOptions() { } @@ -251,6 +255,11 @@ namespace Akka.Hosting.HealthChecks } namespace Akka.Hosting.Logging { + public sealed class AkkaTraceContextProcessor : OpenTelemetry.BaseProcessor + { + public AkkaTraceContextProcessor() { } + public override void OnEnd(OpenTelemetry.Logs.LogRecord data) { } + } public class LoggerFactoryLogger : Akka.Actor.ActorBase, Akka.Dispatch.IRequiresMessageQueue { protected readonly Akka.Event.ILoggingAdapter InternalLogger; @@ -264,4 +273,4 @@ namespace Akka.Hosting.Logging public LoggerFactorySetup(Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) { } public Microsoft.Extensions.Logging.ILoggerFactory LoggerFactory { get; } } -} \ No newline at end of file +} diff --git a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt index f6f9cf28..db03077d 100644 --- a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt +++ b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveTestKit.verified.txt @@ -9,7 +9,8 @@ namespace Akka.Hosting.TestKit.Internals public class XUnitLogger : Microsoft.Extensions.Logging.ILogger { public XUnitLogger(string category, Xunit.Abstractions.ITestOutputHelper helper, Microsoft.Extensions.Logging.LogLevel logLevel) { } - public System.IDisposable BeginScope(TState state) { } + public System.IDisposable? BeginScope(TState state) + where TState : notnull { } public bool IsEnabled(Microsoft.Extensions.Logging.LogLevel logLevel) { } public void Log(Microsoft.Extensions.Logging.LogLevel logLevel, Microsoft.Extensions.Logging.EventId eventId, TState state, System.Exception? exception, System.Func formatter) { } } diff --git a/src/Akka.Hosting.TestKit.Tests/Akka.Hosting.TestKit.Tests.csproj b/src/Akka.Hosting.TestKit.Tests/Akka.Hosting.TestKit.Tests.csproj index a22c86ae..d3a479da 100644 --- a/src/Akka.Hosting.TestKit.Tests/Akka.Hosting.TestKit.Tests.csproj +++ b/src/Akka.Hosting.TestKit.Tests/Akka.Hosting.TestKit.Tests.csproj @@ -9,7 +9,7 @@ - + diff --git a/src/Akka.Hosting.TestKit/Akka.Hosting.TestKit.csproj b/src/Akka.Hosting.TestKit/Akka.Hosting.TestKit.csproj index df752999..b697283c 100644 --- a/src/Akka.Hosting.TestKit/Akka.Hosting.TestKit.csproj +++ b/src/Akka.Hosting.TestKit/Akka.Hosting.TestKit.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/Akka.Hosting.TestKit/Internals/XUnitLogger.cs b/src/Akka.Hosting.TestKit/Internals/XUnitLogger.cs index eb04b2e4..8dc90a7d 100644 --- a/src/Akka.Hosting.TestKit/Internals/XUnitLogger.cs +++ b/src/Akka.Hosting.TestKit/Internals/XUnitLogger.cs @@ -68,7 +68,7 @@ public bool IsEnabled(LogLevel logLevel) }; } - public IDisposable BeginScope(TState state) + public IDisposable? BeginScope(TState state) where TState : notnull { return NullScope.Instance; } diff --git a/src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj b/src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj index bdd3853a..eaaf90af 100644 --- a/src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj +++ b/src/Akka.Hosting.Tests/Akka.Hosting.Tests.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/Akka.Hosting.Tests/Logging/AkkaLogStateSpecs.cs b/src/Akka.Hosting.Tests/Logging/AkkaLogStateSpecs.cs new file mode 100644 index 00000000..82698992 --- /dev/null +++ b/src/Akka.Hosting.Tests/Logging/AkkaLogStateSpecs.cs @@ -0,0 +1,261 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using Akka.Hosting.Logging; +using FluentAssertions; +using Xunit; + +namespace Akka.Hosting.Tests.Logging; + +public class AkkaLogStateSpecs +{ + [Fact] + public void AkkaLogState_WithSemanticProperties_ShouldYieldAllProperties() + { + // Arrange + var traceId = ActivityTraceId.CreateRandom(); + var spanId = ActivitySpanId.CreateRandom(); + var activityContext = new ActivityContext(traceId, spanId, ActivityTraceFlags.Recorded); + + var semanticProperties = new Dictionary + { + { "UserId", 123 }, + { "Action", "Login" } + }; + + var actorPath = "akka://test/user/myactor"; + var timestamp = DateTimeOffset.UtcNow; + var threadId = 42; + var logSource = "MyActor"; + var template = "User {UserId} performed {Action}"; + var formattedMessage = "User 123 performed Login"; + + // Act + var state = new AkkaLogState( + activityContext, + semanticProperties, + actorPath, + timestamp, + threadId, + logSource, + template, + formattedMessage); + + var items = state.ToList(); + + // Assert + // Should have: 3 trace context + 2 semantic props + 4 Akka metadata + 1 OriginalFormat = 10 + items.Should().HaveCount(10); + + // Verify trace context (should be ActivityTraceId/ActivitySpanId, not strings) + items.Should().Contain(kvp => kvp.Key == AkkaLogState.TraceIdKey && kvp.Value is ActivityTraceId); + items.Should().Contain(kvp => kvp.Key == AkkaLogState.SpanIdKey && kvp.Value is ActivitySpanId); + items.Should().Contain(kvp => kvp.Key == AkkaLogState.TraceFlagsKey && (int)kvp.Value! == (int)ActivityTraceFlags.Recorded); + + // Verify semantic properties + items.Should().Contain(kvp => kvp.Key == "UserId" && (int)kvp.Value! == 123); + items.Should().Contain(kvp => kvp.Key == "Action" && (string)kvp.Value! == "Login"); + + // Verify Akka metadata + items.Should().Contain(kvp => kvp.Key == "ActorPath" && (string)kvp.Value! == actorPath); + items.Should().Contain(kvp => kvp.Key == "Timestamp" && (DateTimeOffset)kvp.Value! == timestamp); + items.Should().Contain(kvp => kvp.Key == "Thread" && (int)kvp.Value! == threadId); + items.Should().Contain(kvp => kvp.Key == "LogSource" && (string)kvp.Value! == logSource); + + // Verify OriginalFormat + items.Should().Contain(kvp => kvp.Key == "{OriginalFormat}" && (string)kvp.Value! == template); + } + + [Fact] + public void AkkaLogState_WithoutTraceContext_ShouldOmitTraceProperties() + { + // Arrange + var activityContext = default(ActivityContext); // No trace context + + var semanticProperties = new Dictionary + { + { "Message", "Hello" } + }; + + // Act + var state = new AkkaLogState( + activityContext, + semanticProperties, + "akka://test/user/actor", + DateTimeOffset.UtcNow, + 1, + "Source", + "Template", + "Formatted"); + + var items = state.ToList(); + + // Assert + // Should have: 0 trace context + 1 semantic prop + 4 Akka metadata + 1 OriginalFormat = 6 + items.Should().HaveCount(6); + + // Should NOT contain trace context keys + items.Should().NotContain(kvp => kvp.Key == AkkaLogState.TraceIdKey); + items.Should().NotContain(kvp => kvp.Key == AkkaLogState.SpanIdKey); + items.Should().NotContain(kvp => kvp.Key == AkkaLogState.TraceFlagsKey); + } + + [Fact] + public void AkkaLogState_NonStructuredMessage_ShouldYieldMinimalProperties() + { + // Arrange + var traceId = ActivityTraceId.CreateRandom(); + var spanId = ActivitySpanId.CreateRandom(); + var activityContext = new ActivityContext(traceId, spanId, ActivityTraceFlags.None); + var message = "Plain text message"; + + // Act + var state = new AkkaLogState(activityContext, message); + var items = state.ToList(); + + // Assert + // Should have: 3 trace context + 1 OriginalFormat = 4 + items.Should().HaveCount(4); + + // Verify trace context is present + items.Should().Contain(kvp => kvp.Key == AkkaLogState.TraceIdKey); + items.Should().Contain(kvp => kvp.Key == AkkaLogState.SpanIdKey); + items.Should().Contain(kvp => kvp.Key == AkkaLogState.TraceFlagsKey); + + // Verify OriginalFormat + items.Should().Contain(kvp => kvp.Key == "{OriginalFormat}" && (string)kvp.Value! == message); + + // Should NOT contain Akka metadata (ActorPath, etc.) + items.Should().NotContain(kvp => kvp.Key == "ActorPath"); + } + + [Fact] + public void AkkaLogState_NonStructuredMessage_WithoutTraceContext_ShouldYieldOnlyOriginalFormat() + { + // Arrange + var activityContext = default(ActivityContext); + var message = "Plain text message"; + + // Act + var state = new AkkaLogState(activityContext, message); + var items = state.ToList(); + + // Assert + // Should have: 0 trace context + 1 OriginalFormat = 1 + items.Should().HaveCount(1); + items.Single().Key.Should().Be("{OriginalFormat}"); + items.Single().Value.Should().Be(message); + } + + [Fact] + public void AkkaLogState_ToString_ShouldReturnFormattedMessage() + { + // Arrange + var semanticProperties = new Dictionary { { "Name", "World" } }; + var formattedMessage = "Hello World!"; + + var state = new AkkaLogState( + default, + semanticProperties, + "path", + DateTimeOffset.UtcNow, + 1, + "source", + "Hello {Name}!", + formattedMessage); + + // Act & Assert + state.ToString().Should().Be(formattedMessage); + } + + [Fact] + public void AkkaLogState_ShouldBeEnumerableMultipleTimes() + { + // Arrange + var semanticProperties = new Dictionary { { "Key", "Value" } }; + var state = new AkkaLogState( + default, + semanticProperties, + "path", + DateTimeOffset.UtcNow, + 1, + "source", + "template", + "formatted"); + + // Act - enumerate multiple times + var firstPass = state.ToList(); + var secondPass = state.ToList(); + + // Assert + firstPass.Should().BeEquivalentTo(secondPass); + } + + [Fact] + public void AkkaLogState_TraceContext_ShouldStoreStructsDirectly() + { + // Arrange - this test verifies we're not allocating strings for TraceId/SpanId + var traceId = ActivityTraceId.CreateRandom(); + var spanId = ActivitySpanId.CreateRandom(); + var activityContext = new ActivityContext(traceId, spanId, ActivityTraceFlags.Recorded); + + var state = new AkkaLogState( + activityContext, + new Dictionary(), + "path", + DateTimeOffset.UtcNow, + 1, + "source", + "template", + "formatted"); + + // Act + var items = state.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + + // Assert - values should be the actual struct types, not strings + items[AkkaLogState.TraceIdKey].Should().BeOfType(); + items[AkkaLogState.SpanIdKey].Should().BeOfType(); + items[AkkaLogState.TraceFlagsKey].Should().BeOfType(); + + // Verify the actual values match + ((ActivityTraceId)items[AkkaLogState.TraceIdKey]!).Should().Be(traceId); + ((ActivitySpanId)items[AkkaLogState.SpanIdKey]!).Should().Be(spanId); + ((int)items[AkkaLogState.TraceFlagsKey]!).Should().Be((int)ActivityTraceFlags.Recorded); + } + + [Fact] + public void AkkaLogState_EmptySemanticProperties_ShouldStillYieldAkkaMetadata() + { + // Arrange + var emptyProperties = new Dictionary(); + + var state = new AkkaLogState( + default, + emptyProperties, + "akka://test/user/actor", + DateTimeOffset.UtcNow, + 99, + "TestSource", + "template", + "formatted"); + + // Act + var items = state.ToList(); + + // Assert + // Should have: 0 trace context + 0 semantic props + 4 Akka metadata + 1 OriginalFormat = 5 + items.Should().HaveCount(5); + items.Should().Contain(kvp => kvp.Key == "ActorPath"); + items.Should().Contain(kvp => kvp.Key == "Timestamp"); + items.Should().Contain(kvp => kvp.Key == "Thread" && (int)kvp.Value! == 99); + items.Should().Contain(kvp => kvp.Key == "LogSource" && (string)kvp.Value! == "TestSource"); + items.Should().Contain(kvp => kvp.Key == "{OriginalFormat}"); + } +} diff --git a/src/Akka.Hosting.Tests/Logging/AkkaTraceContextProcessorSpecs.cs b/src/Akka.Hosting.Tests/Logging/AkkaTraceContextProcessorSpecs.cs new file mode 100644 index 00000000..ee4a4792 --- /dev/null +++ b/src/Akka.Hosting.Tests/Logging/AkkaTraceContextProcessorSpecs.cs @@ -0,0 +1,326 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using Akka.Hosting.Logging; +using FluentAssertions; +using OpenTelemetry.Logs; +using Xunit; + +namespace Akka.Hosting.Tests.Logging; + +public class AkkaTraceContextProcessorSpecs +{ + [Fact] + public void Processor_ShouldExtractTraceContext_FromActivityTraceIdAndSpanId() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + var traceId = ActivityTraceId.CreateRandom(); + var spanId = ActivitySpanId.CreateRandom(); + var traceFlags = ActivityTraceFlags.Recorded; + + var attributes = new List> + { + new(AkkaLogState.TraceIdKey, traceId), + new(AkkaLogState.SpanIdKey, spanId), + new(AkkaLogState.TraceFlagsKey, (int)traceFlags) + }; + + var logRecord = CreateLogRecord(attributes); + + // Act + processor.OnEnd(logRecord); + + // Assert + logRecord.TraceId.Should().Be(traceId); + logRecord.SpanId.Should().Be(spanId); + logRecord.TraceFlags.Should().Be(traceFlags); + } + + [Fact] + public void Processor_ShouldExtractTraceContext_FromStringRepresentation() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + var traceId = ActivityTraceId.CreateRandom(); + var spanId = ActivitySpanId.CreateRandom(); + + // Use string representation (backwards compatibility path) + var attributes = new List> + { + new(AkkaLogState.TraceIdKey, traceId.ToString()), + new(AkkaLogState.SpanIdKey, spanId.ToString()), + new(AkkaLogState.TraceFlagsKey, (int)ActivityTraceFlags.None) + }; + + var logRecord = CreateLogRecord(attributes); + + // Act + processor.OnEnd(logRecord); + + // Assert + logRecord.TraceId.Should().Be(traceId); + logRecord.SpanId.Should().Be(spanId); + } + + [Fact] + public void Processor_ShouldSkip_WhenTraceIdAlreadySet() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + var existingTraceId = ActivityTraceId.CreateRandom(); + var existingSpanId = ActivitySpanId.CreateRandom(); + var newTraceId = ActivityTraceId.CreateRandom(); + var newSpanId = ActivitySpanId.CreateRandom(); + + var attributes = new List> + { + new(AkkaLogState.TraceIdKey, newTraceId), + new(AkkaLogState.SpanIdKey, newSpanId), + new(AkkaLogState.TraceFlagsKey, (int)ActivityTraceFlags.Recorded) + }; + + var logRecord = CreateLogRecordWithExistingTrace(attributes, existingTraceId, existingSpanId); + + // Act + processor.OnEnd(logRecord); + + // Assert - should keep existing trace context + logRecord.TraceId.Should().Be(existingTraceId); + logRecord.SpanId.Should().Be(existingSpanId); + } + + [Fact] + public void Processor_ShouldSkip_WhenNoAttributes() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + var logRecord = CreateLogRecord(null); + + // Act + processor.OnEnd(logRecord); + + // Assert - should remain default + logRecord.TraceId.Should().Be(default(ActivityTraceId)); + logRecord.SpanId.Should().Be(default(ActivitySpanId)); + } + + [Fact] + public void Processor_ShouldSkip_WhenMissingTraceId() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + var spanId = ActivitySpanId.CreateRandom(); + + var attributes = new List> + { + // Missing TraceId + new(AkkaLogState.SpanIdKey, spanId), + new(AkkaLogState.TraceFlagsKey, 0) + }; + + var logRecord = CreateLogRecord(attributes); + + // Act + processor.OnEnd(logRecord); + + // Assert - should not set partial trace context + logRecord.TraceId.Should().Be(default(ActivityTraceId)); + logRecord.SpanId.Should().Be(default(ActivitySpanId)); + } + + [Fact] + public void Processor_ShouldSkip_WhenMissingSpanId() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + var traceId = ActivityTraceId.CreateRandom(); + + var attributes = new List> + { + new(AkkaLogState.TraceIdKey, traceId), + // Missing SpanId + new(AkkaLogState.TraceFlagsKey, 0) + }; + + var logRecord = CreateLogRecord(attributes); + + // Act + processor.OnEnd(logRecord); + + // Assert - should not set partial trace context + logRecord.TraceId.Should().Be(default(ActivityTraceId)); + logRecord.SpanId.Should().Be(default(ActivitySpanId)); + } + + [Fact] + public void Processor_ShouldHandle_InvalidStringTraceId() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + var spanId = ActivitySpanId.CreateRandom(); + + var attributes = new List> + { + new(AkkaLogState.TraceIdKey, "invalid-trace-id"), // Invalid format + new(AkkaLogState.SpanIdKey, spanId), + new(AkkaLogState.TraceFlagsKey, 0) + }; + + var logRecord = CreateLogRecord(attributes); + + // Act + processor.OnEnd(logRecord); + + // Assert - should not crash, should not set invalid trace context + logRecord.TraceId.Should().Be(default(ActivityTraceId)); + } + + [Fact] + public void Processor_ShouldHandle_UnexpectedValueTypes() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + + var attributes = new List> + { + new(AkkaLogState.TraceIdKey, 12345), // Wrong type (int instead of ActivityTraceId or string) + new(AkkaLogState.SpanIdKey, new object()), // Wrong type + new(AkkaLogState.TraceFlagsKey, "not-an-int") + }; + + var logRecord = CreateLogRecord(attributes); + + // Act - should not throw + var act = () => processor.OnEnd(logRecord); + + // Assert + act.Should().NotThrow(); + logRecord.TraceId.Should().Be(default(ActivityTraceId)); + } + + [Fact] + public void Processor_ShouldDefaultTraceFlags_WhenNotProvided() + { + // Arrange + var processor = new AkkaTraceContextProcessor(); + var traceId = ActivityTraceId.CreateRandom(); + var spanId = ActivitySpanId.CreateRandom(); + + var attributes = new List> + { + new(AkkaLogState.TraceIdKey, traceId), + new(AkkaLogState.SpanIdKey, spanId) + // No TraceFlags + }; + + var logRecord = CreateLogRecord(attributes); + + // Act + processor.OnEnd(logRecord); + + // Assert + logRecord.TraceId.Should().Be(traceId); + logRecord.SpanId.Should().Be(spanId); + logRecord.TraceFlags.Should().Be(ActivityTraceFlags.None); + } + + [Fact] + public void Processor_ShouldWork_WithAkkaLogState() + { + // Arrange - integration test with actual AkkaLogState + var processor = new AkkaTraceContextProcessor(); + var traceId = ActivityTraceId.CreateRandom(); + var spanId = ActivitySpanId.CreateRandom(); + var activityContext = new ActivityContext(traceId, spanId, ActivityTraceFlags.Recorded); + + var state = new AkkaLogState( + activityContext, + new Dictionary { { "Key", "Value" } }, + "akka://test/user/actor", + DateTimeOffset.UtcNow, + 1, + "TestSource", + "Template with {Key}", + "Template with Value"); + + // Convert state to attributes list (simulating what MEL does) + var attributes = new List>(); + foreach (var kvp in state) + { + attributes.Add(kvp); + } + + var logRecord = CreateLogRecord(attributes); + + // Act + processor.OnEnd(logRecord); + + // Assert + logRecord.TraceId.Should().Be(traceId); + logRecord.SpanId.Should().Be(spanId); + logRecord.TraceFlags.Should().Be(ActivityTraceFlags.Recorded); + } + + /// + /// Creates a LogRecord instance using UnsafeAccessor to call the internal constructor. + /// + [UnsafeAccessor(UnsafeAccessorKind.Constructor)] + private static extern LogRecord CreateLogRecordInstance(); + + /// + /// Sets the Attributes property on a LogRecord using UnsafeAccessor. + /// + [UnsafeAccessor(UnsafeAccessorKind.Method, Name = "set_Attributes")] + private static extern void SetAttributes(LogRecord record, IReadOnlyList>? value); + + /// + /// Sets the TraceId property on a LogRecord using UnsafeAccessor. + /// + [UnsafeAccessor(UnsafeAccessorKind.Method, Name = "set_TraceId")] + private static extern void SetTraceId(LogRecord record, ActivityTraceId value); + + /// + /// Sets the SpanId property on a LogRecord using UnsafeAccessor. + /// + [UnsafeAccessor(UnsafeAccessorKind.Method, Name = "set_SpanId")] + private static extern void SetSpanId(LogRecord record, ActivitySpanId value); + + /// + /// Creates a LogRecord with the specified attributes. + /// + private static LogRecord CreateLogRecord(IReadOnlyList>? attributes) + { + var logRecord = CreateLogRecordInstance(); + + if (attributes != null) + { + SetAttributes(logRecord, attributes); + } + + return logRecord; + } + + /// + /// Creates a LogRecord with existing trace context set. + /// + private static LogRecord CreateLogRecordWithExistingTrace( + IReadOnlyList>? attributes, + ActivityTraceId traceId, + ActivitySpanId spanId) + { + var logRecord = CreateLogRecord(attributes); + + SetTraceId(logRecord, traceId); + SetSpanId(logRecord, spanId); + + return logRecord; + } +} diff --git a/src/Akka.Hosting.Tests/Logging/Issue701SemanticLoggingRegressionSpecs.cs b/src/Akka.Hosting.Tests/Logging/Issue701SemanticLoggingRegressionSpecs.cs index f2922056..ebdc829a 100644 --- a/src/Akka.Hosting.Tests/Logging/Issue701SemanticLoggingRegressionSpecs.cs +++ b/src/Akka.Hosting.Tests/Logging/Issue701SemanticLoggingRegressionSpecs.cs @@ -61,7 +61,7 @@ private void AssertMetadata(BugReproLogEntry entry, string? format = null) entry.State.Should().ContainKey("{OriginalFormat}"); entry.State["ActorPath"].Should().BeOfType(); - entry.State["Timestamp"].Should().BeOfType(); + entry.State["Timestamp"].Should().BeOfType(); entry.State["Thread"].Should().BeOfType(); entry.State["LogSource"].Should().BeOfType(); entry.State["{OriginalFormat}"].Should().BeOfType(); @@ -277,7 +277,7 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, public bool IsEnabled(LogLevel logLevel) => true; - public IDisposable BeginScope(TState state) => EmptyDisposable.Instance; + public IDisposable BeginScope(TState state) where TState : notnull => EmptyDisposable.Instance; } public class BugReproLogEntry diff --git a/src/Akka.Hosting.Tests/Logging/SemanticLoggingSpecs.cs b/src/Akka.Hosting.Tests/Logging/SemanticLoggingSpecs.cs index cc240984..f4544c44 100644 --- a/src/Akka.Hosting.Tests/Logging/SemanticLoggingSpecs.cs +++ b/src/Akka.Hosting.Tests/Logging/SemanticLoggingSpecs.cs @@ -377,7 +377,7 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, public bool IsEnabled(LogLevel logLevel) => true; - public IDisposable BeginScope(TState state) => EmptyDisposable.Instance; + public IDisposable? BeginScope(TState state) where TState : notnull => EmptyDisposable.Instance; } public class LogEntry diff --git a/src/Akka.Hosting.Tests/Logging/TestLogger.cs b/src/Akka.Hosting.Tests/Logging/TestLogger.cs index dcf3b53d..6345e67c 100644 --- a/src/Akka.Hosting.Tests/Logging/TestLogger.cs +++ b/src/Akka.Hosting.Tests/Logging/TestLogger.cs @@ -76,7 +76,7 @@ public bool IsEnabled(LogLevel logLevel) return true; } - public IDisposable BeginScope(TState state) + public IDisposable? BeginScope(TState state) where TState : notnull { return EmptyDisposable.Instance; } diff --git a/src/Akka.Hosting.Tests/XUnitLogger.cs b/src/Akka.Hosting.Tests/XUnitLogger.cs index f7283aed..eb389143 100644 --- a/src/Akka.Hosting.Tests/XUnitLogger.cs +++ b/src/Akka.Hosting.Tests/XUnitLogger.cs @@ -58,7 +58,7 @@ public bool IsEnabled(LogLevel logLevel) }; } - public IDisposable BeginScope(TState state) + public IDisposable? BeginScope(TState state) where TState : notnull { throw new NotImplementedException(); } diff --git a/src/Akka.Hosting/Akka.Hosting.csproj b/src/Akka.Hosting/Akka.Hosting.csproj index d21640e2..af8459bd 100644 --- a/src/Akka.Hosting/Akka.Hosting.csproj +++ b/src/Akka.Hosting/Akka.Hosting.csproj @@ -12,5 +12,6 @@ + diff --git a/src/Akka.Hosting/AkkaOpenTelemetryExtensions.cs b/src/Akka.Hosting/AkkaOpenTelemetryExtensions.cs new file mode 100644 index 00000000..9563a946 --- /dev/null +++ b/src/Akka.Hosting/AkkaOpenTelemetryExtensions.cs @@ -0,0 +1,55 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Hosting.Logging; +using OpenTelemetry.Logs; + +namespace Akka.Hosting +{ + /// + /// Extension methods for integrating Akka.NET logging with OpenTelemetry. + /// + public static class AkkaOpenTelemetryExtensions + { + /// + /// Adds the Akka.NET trace correlation processor to the OpenTelemetry logging pipeline. + /// + /// The OpenTelemetry logger options. + /// The options instance for chaining. + /// + /// + /// This processor extracts trace context (TraceId, SpanId, TraceFlags) from Akka.NET + /// log events and applies them to OpenTelemetry + /// instances. This enables proper trace correlation for logs emitted from actor code, + /// solving the problem that doesn't + /// flow across actor mailbox boundaries. + /// + /// + /// Important: This processor should be registered before any exporters + /// to ensure the trace context is applied before logs are exported. + /// + /// + /// + /// builder.Logging.AddOpenTelemetry(options => + /// { + /// options.SetResourceBuilder(ResourceBuilder.CreateDefault() + /// .AddService("my-service")); + /// + /// // Register Akka trace correlation FIRST (before exporters) + /// options.AddAkkaTraceCorrelation(); + /// + /// options.AddOtlpExporter(); + /// }); + /// + /// + /// + public static OpenTelemetryLoggerOptions AddAkkaTraceCorrelation(this OpenTelemetryLoggerOptions options) + { + options.AddProcessor(new AkkaTraceContextProcessor()); + return options; + } + } +} diff --git a/src/Akka.Hosting/Logging/AkkaLogState.cs b/src/Akka.Hosting/Logging/AkkaLogState.cs new file mode 100644 index 00000000..a934595c --- /dev/null +++ b/src/Akka.Hosting/Logging/AkkaLogState.cs @@ -0,0 +1,151 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Akka.Hosting.Logging +{ + /// + /// Log state struct that carries both structured log properties and OpenTelemetry trace context. + /// Implements for compatibility with Microsoft.Extensions.Logging. + /// + /// + /// + /// This struct is designed for minimal allocations - it holds references to existing data + /// and yields values lazily via the enumerator rather than copying into a new collection. + /// + /// + /// This struct is used to propagate trace context (TraceId, SpanId, TraceFlags) from the + /// originating actor thread to the logging infrastructure, solving the problem that + /// doesn't flow across actor mailbox boundaries because + /// it uses . + /// + /// + internal readonly struct AkkaLogState : IEnumerable> + { + /// + /// Key for the trace ID in the log state dictionary. + /// + public const string TraceIdKey = "Akka.TraceId"; + + /// + /// Key for the span ID in the log state dictionary. + /// + public const string SpanIdKey = "Akka.SpanId"; + + /// + /// Key for the trace flags in the log state dictionary. + /// + public const string TraceFlagsKey = "Akka.TraceFlags"; + + private readonly ActivityContext _activityContext; + private readonly IReadOnlyDictionary? _semanticProperties; + private readonly string _actorPath; + private readonly DateTimeOffset _timestamp; + private readonly int _threadId; + private readonly string _logSource; + private readonly string _template; + private readonly string _formattedMessage; + private readonly bool _hasSemanticProperties; + + /// + /// Creates an with trace context, semantic properties, and Akka metadata. + /// + /// The activity context containing trace correlation IDs. + /// Structured properties from the log message template (referenced, not copied). + /// The path of the actor that generated the log. + /// The timestamp when the log was created. + /// The managed thread ID of the originating thread. + /// The source of the log event. + /// The message template string. + /// The pre-formatted message for display. + public AkkaLogState( + ActivityContext activityContext, + IReadOnlyDictionary semanticProperties, + string actorPath, + DateTimeOffset timestamp, + int threadId, + string logSource, + string template, + string formattedMessage) + { + _activityContext = activityContext; + _semanticProperties = semanticProperties; + _actorPath = actorPath; + _timestamp = timestamp; + _threadId = threadId; + _logSource = logSource; + _template = template; + _formattedMessage = formattedMessage; + _hasSemanticProperties = true; + } + + /// + /// Creates an with trace context for non-structured messages. + /// + /// The activity context containing trace correlation IDs. + /// The formatted log message. + public AkkaLogState(ActivityContext activityContext, string formattedMessage) + { + _activityContext = activityContext; + _formattedMessage = formattedMessage; + _semanticProperties = null; + _actorPath = string.Empty; + _timestamp = default; + _threadId = 0; + _logSource = string.Empty; + _template = formattedMessage; + _hasSemanticProperties = false; + } + + /// + public IEnumerator> GetEnumerator() + { + // Yield trace context if present (store structs directly, no ToString() allocation) + if (_activityContext.TraceId != default) + { + yield return new KeyValuePair(TraceIdKey, _activityContext.TraceId); + yield return new KeyValuePair(SpanIdKey, _activityContext.SpanId); + yield return new KeyValuePair(TraceFlagsKey, (int)_activityContext.TraceFlags); + } + + if (_hasSemanticProperties) + { + // Yield semantic properties (referenced, not copied) + if (_semanticProperties != null) + { + foreach (var prop in _semanticProperties) + { + yield return new KeyValuePair(prop.Key, prop.Value); + } + } + + // Yield Akka metadata + yield return new KeyValuePair("ActorPath", _actorPath); + yield return new KeyValuePair("Timestamp", _timestamp); + yield return new KeyValuePair("Thread", _threadId); + yield return new KeyValuePair("LogSource", _logSource); + + // Yield OriginalFormat for MEL convention + yield return new KeyValuePair("{OriginalFormat}", _template); + } + else + { + // For non-structured messages, just yield OriginalFormat + yield return new KeyValuePair("{OriginalFormat}", _template); + } + } + + /// + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// + public override string ToString() => _formattedMessage; + } +} diff --git a/src/Akka.Hosting/Logging/AkkaTraceContextProcessor.cs b/src/Akka.Hosting/Logging/AkkaTraceContextProcessor.cs new file mode 100644 index 00000000..275f94c4 --- /dev/null +++ b/src/Akka.Hosting/Logging/AkkaTraceContextProcessor.cs @@ -0,0 +1,168 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Diagnostics; +using OpenTelemetry; +using OpenTelemetry.Logs; + +namespace Akka.Hosting.Logging +{ + /// + /// OpenTelemetry log processor that extracts trace context from Akka.NET log events + /// and applies it to the . + /// + /// + /// + /// This processor solves the problem that doesn't flow + /// across actor mailbox boundaries because it uses . + /// + /// + /// When emits logs, it includes the original + /// captured at log creation time as attributes + /// (Akka.TraceId, Akka.SpanId, Akka.TraceFlags). This processor extracts those + /// attributes and sets them on the so that the log + /// is properly correlated with the originating trace. + /// + /// + public sealed class AkkaTraceContextProcessor : BaseProcessor + { + /// + public override void OnEnd(LogRecord data) + { + // Skip if TraceId is already set (e.g., from Activity.Current that happened to be present) + if (data.TraceId != default) + { + return; + } + + // Try to extract trace context from Akka log state attributes + var attributes = data.Attributes; + if (attributes == null) + { + return; + } + + ActivityTraceId? traceId = null; + ActivitySpanId? spanId = null; + ActivityTraceFlags traceFlags = ActivityTraceFlags.None; + + foreach (var attr in attributes) + { + switch (attr.Key) + { + case AkkaLogState.TraceIdKey: + traceId = ExtractTraceId(attr.Value); + break; + + case AkkaLogState.SpanIdKey: + spanId = ExtractSpanId(attr.Value); + break; + + case AkkaLogState.TraceFlagsKey when attr.Value is int flagsInt: + traceFlags = (ActivityTraceFlags)flagsInt; + break; + } + } + + // Only set trace context if we found both TraceId and SpanId + if (traceId.HasValue && spanId.HasValue) + { + SetTraceContext(data, traceId.Value, spanId.Value, traceFlags); + } + } + + private static ActivityTraceId? ExtractTraceId(object? value) + { + // Handle ActivityTraceId directly (no allocation path) + if (value is ActivityTraceId traceId) + { + return traceId; + } + + // Fallback: handle string representation (for backwards compatibility) + if (value is string traceIdStr) + { + return TryParseTraceId(traceIdStr); + } + + return null; + } + + private static ActivitySpanId? ExtractSpanId(object? value) + { + // Handle ActivitySpanId directly (no allocation path) + if (value is ActivitySpanId spanId) + { + return spanId; + } + + // Fallback: handle string representation (for backwards compatibility) + if (value is string spanIdStr) + { + return TryParseSpanId(spanIdStr); + } + + return null; + } + + private static void SetTraceContext(LogRecord record, ActivityTraceId traceId, ActivitySpanId spanId, ActivityTraceFlags traceFlags) + { + // LogRecord has internal setters, so we need to use reflection + try + { + var recordType = typeof(LogRecord); + + var traceIdProp = recordType.GetProperty("TraceId"); + var spanIdProp = recordType.GetProperty("SpanId"); + var traceFlagsProp = recordType.GetProperty("TraceFlags"); + + traceIdProp?.SetValue(record, traceId); + spanIdProp?.SetValue(record, spanId); + traceFlagsProp?.SetValue(record, traceFlags); + } + catch + { + // Silently ignore reflection failures + // The trace context attributes are still present in the log state + } + } + + private static ActivityTraceId? TryParseTraceId(string traceIdStr) + { + try + { + // ActivityTraceId expects a 32 character hex string + if (traceIdStr.Length == 32) + { + return ActivityTraceId.CreateFromString(traceIdStr.AsSpan()); + } + } + catch + { + // Ignore parse failures + } + return null; + } + + private static ActivitySpanId? TryParseSpanId(string spanIdStr) + { + try + { + // ActivitySpanId expects a 16 character hex string + if (spanIdStr.Length == 16) + { + return ActivitySpanId.CreateFromString(spanIdStr.AsSpan()); + } + } + catch + { + // Ignore parse failures + } + return null; + } + } +} diff --git a/src/Akka.Hosting/Logging/LoggerFactoryLogger.cs b/src/Akka.Hosting/Logging/LoggerFactoryLogger.cs index f007f18e..4debd06f 100644 --- a/src/Akka.Hosting/Logging/LoggerFactoryLogger.cs +++ b/src/Akka.Hosting/Logging/LoggerFactoryLogger.cs @@ -64,46 +64,61 @@ protected virtual void Log(LogEvent log, ActorPath path) { var logLevel = GetLogLevel(log.LogLevel()); - // Create state dictionary with structured properties from the log message - var state = new Dictionary(); + // Capture ActivityContext (Akka.NET 1.5.59+) for trace correlation + var activityContext = log.ActivityContext ?? default; // Use semantic logging to extract structured properties if (log.TryGetProperties(out var properties) && properties is not null) { - // Add structured properties from the message template - foreach (var prop in properties) - { - state[prop.Key] = prop.Value; - } - } - - // Add Akka metadata properties - state["ActorPath"] = path.ToString(); - state["Timestamp"] = log.Timestamp; - state["Thread"] = log.Thread.ManagedThreadId; - state["LogSource"] = log.LogSource; + var formattedMessage = SafeFormat(log); - // Add {OriginalFormat} key per MEL convention for structured logging - // This allows MEL sinks to recognize and preserve the message template - state["{OriginalFormat}"] = log.GetTemplate(); + // Include trace context and structured properties in AkkaLogState + var state = new AkkaLogState( + activityContext, + properties, + path.ToString(), + log.Timestamp, + log.Thread.ManagedThreadId, + log.LogSource, + log.GetTemplate(), + formattedMessage); - // Log with structured state - // Use LogMessage.ToString() which applies the correct formatter (SemanticLogMessageFormatter) - // instead of string.Format() which only works with positional {0} placeholders - _akkaLogger.Log(logLevel, new EventId(), state, log.Cause, (s, ex) => + _akkaLogger.Log(logLevel, new EventId(), state, log.Cause, + (s, ex) => formattedMessage); + } + else { - try + var formattedMessage = SafeFormat(log); + + if (log.ActivityContext.HasValue) { - return log.ToString(); + // Preserve trace context even for non-structured logs + var state = new AkkaLogState(activityContext, formattedMessage); + _akkaLogger.Log(logLevel, new EventId(), state, log.Cause, + (s, ex) => formattedMessage); } - catch + else { - if(log.Message is LogMessage msg) - return $"Received a malformed formatted message. Log level: [{log.LogLevel()}], Template: [{msg.Format}], args: [{string.Join(",", msg.Unformatted())}]"; - - return $"Received a malformed formatted message. Log level: [{log.LogLevel()}], Message: [{log.Message}]"; + // Fallback for non-structured messages without trace context + _akkaLogger.Log(logLevel, new EventId(), log, log.Cause, + (@event, exception) => formattedMessage); } - }); + } + } + + private static string SafeFormat(LogEvent log) + { + try + { + return log.ToString(); + } + catch + { + if (log.Message is LogMessage msg) + return $"Received a malformed formatted message. Log level: [{log.LogLevel()}], Template: [{msg.Format}], args: [{string.Join(",", msg.Unformatted())}]"; + + return $"Received a malformed formatted message. Log level: [{log.LogLevel()}], Message: [{log.Message}]"; + } } private static LogLevel GetLogLevel(Event.LogLevel level) @@ -118,4 +133,4 @@ private static LogLevel GetLogLevel(Event.LogLevel level) }; } } -} \ No newline at end of file +} diff --git a/src/Akka.Remote.Hosting.Tests/RemoteConfigurationSpecs.cs b/src/Akka.Remote.Hosting.Tests/RemoteConfigurationSpecs.cs index 29b2436c..0c8e6789 100644 --- a/src/Akka.Remote.Hosting.Tests/RemoteConfigurationSpecs.cs +++ b/src/Akka.Remote.Hosting.Tests/RemoteConfigurationSpecs.cs @@ -651,7 +651,7 @@ public async Task ClusterOptionsConfigurationTest() // arrange using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)); var jsonConfig = new ConfigurationBuilder().AddJsonStream(stream).Build(); - var remoteOptions = jsonConfig.GetSection("Akka:RemoteOptions").Get(); + var remoteOptions = jsonConfig.GetSection("Akka:RemoteOptions").Get()!; using var host = new HostBuilder().ConfigureServices(services => { diff --git a/src/Examples/Akka.Hosting.OpenTelemetry.AppHost/Akka.Hosting.OpenTelemetry.AppHost.csproj b/src/Examples/Akka.Hosting.OpenTelemetry.AppHost/Akka.Hosting.OpenTelemetry.AppHost.csproj new file mode 100644 index 00000000..0d9f9cee --- /dev/null +++ b/src/Examples/Akka.Hosting.OpenTelemetry.AppHost/Akka.Hosting.OpenTelemetry.AppHost.csproj @@ -0,0 +1,22 @@ + + + + + + Exe + net8.0 + enable + enable + false + 3f42e154-c98e-4c42-9f2b-8f8f8f8f8f8f + + + + + + + + + + + diff --git a/src/Examples/Akka.Hosting.OpenTelemetry.AppHost/Program.cs b/src/Examples/Akka.Hosting.OpenTelemetry.AppHost/Program.cs new file mode 100644 index 00000000..0dc72532 --- /dev/null +++ b/src/Examples/Akka.Hosting.OpenTelemetry.AppHost/Program.cs @@ -0,0 +1,12 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +var builder = DistributedApplication.CreateBuilder(args); + +// Add the Akka.NET demo service +builder.AddProject("demo"); + +builder.Build().Run(); diff --git a/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Actors/TraceDemoActor.cs b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Actors/TraceDemoActor.cs new file mode 100644 index 00000000..9548b8a4 --- /dev/null +++ b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Actors/TraceDemoActor.cs @@ -0,0 +1,65 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Event; + +namespace Akka.Hosting.OpenTelemetry.Demo.Actors; + +/// +/// Messages for the trace demo actor. +/// +public sealed record ProcessRequest(string RequestId, string Data); +public sealed record ProcessResponse(string RequestId, string Result); + +/// +/// Actor that demonstrates OpenTelemetry trace correlation. +/// Logs are emitted with the trace context from the originating request, +/// even though Activity.Current doesn't flow across mailbox boundaries. +/// +public sealed class TraceDemoActor : ReceiveActor +{ + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public TraceDemoActor() + { + Receive(HandleProcessRequest); + } + + private void HandleProcessRequest(ProcessRequest request) + { + // This log will include the TraceId and SpanId from the originating request + // even though we're on a different thread after crossing the mailbox boundary + _log.Info("Processing request {RequestId} with data: {Data}", request.RequestId, request.Data); + + // Simulate some processing + var result = $"Processed: {request.Data.ToUpperInvariant()}"; + + _log.Info("Completed processing request {RequestId}, result: {Result}", request.RequestId, result); + + Sender.Tell(new ProcessResponse(request.RequestId, result)); + } +} + +/// +/// Actor that forwards messages to demonstrate trace context flowing through actor chains. +/// +public sealed class ForwarderActor : ReceiveActor +{ + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly IActorRef _target; + + public ForwarderActor(IActorRef target) + { + _target = target; + + Receive(request => + { + _log.Info("Forwarding request {RequestId} to processor", request.RequestId); + _target.Forward(request); + }); + } +} diff --git a/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Akka.Hosting.OpenTelemetry.Demo.csproj b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Akka.Hosting.OpenTelemetry.Demo.csproj new file mode 100644 index 00000000..69d13994 --- /dev/null +++ b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Akka.Hosting.OpenTelemetry.Demo.csproj @@ -0,0 +1,22 @@ + + + + Exe + net8.0 + enable + enable + false + + + + + + + + + + + + + + diff --git a/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Extensions.cs b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Extensions.cs new file mode 100644 index 00000000..a05c1ddb --- /dev/null +++ b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Extensions.cs @@ -0,0 +1,37 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using OpenTelemetry.Trace; + +namespace Akka.Hosting.OpenTelemetry.Demo; + +/// +/// Extension methods for configuring Aspire service defaults. +/// +public static class Extensions +{ + /// + /// Adds Aspire service defaults including OpenTelemetry and health checks. + /// + public static IHostApplicationBuilder AddServiceDefaults(this IHostApplicationBuilder builder) + { + // Add service discovery + builder.Services.AddServiceDiscovery(); + + // Configure OpenTelemetry tracing + builder.Services.AddOpenTelemetry() + .WithTracing(tracing => + { + tracing.AddSource("Akka.Hosting.OpenTelemetry.Demo"); + tracing.AddOtlpExporter(); + }); + + // Add health checks + builder.Services.AddHealthChecks(); + + return builder; + } +} diff --git a/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Program.cs b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Program.cs new file mode 100644 index 00000000..8eae3073 --- /dev/null +++ b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/Program.cs @@ -0,0 +1,62 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Diagnostics; +using Akka.Actor; +using Akka.Hosting; +using Akka.Hosting.Logging; +using Akka.Hosting.OpenTelemetry.Demo; +using Akka.Hosting.OpenTelemetry.Demo.Actors; +using OpenTelemetry.Logs; +using OpenTelemetry.Resources; + +var builder = Host.CreateApplicationBuilder(args); + +// Add Aspire service defaults (service discovery + tracing) +builder.AddServiceDefaults(); + +// Configure OpenTelemetry logging with Akka trace correlation +builder.Logging.AddOpenTelemetry(options => +{ + options.SetResourceBuilder(ResourceBuilder.CreateDefault() + .AddService("akka-otel-demo")); + + // CRITICAL: Register Akka trace correlation processor FIRST (before exporters) + // This extracts trace context from Akka log attributes and applies it to LogRecords + options.AddAkkaTraceCorrelation(); + + // Include formatted message for easier debugging + options.IncludeFormattedMessage = true; + options.IncludeScopes = true; + + options.AddOtlpExporter(); +}); + +// Configure Akka.NET with LoggerFactoryLogger +builder.Services.AddAkka("TraceDemo", configBuilder => +{ + configBuilder.ConfigureLoggers(setup => + { + setup.ClearLoggers(); + setup.AddLoggerFactory(); + }); + + // Register actors + configBuilder.WithActors((system, registry, resolver) => + { + var processor = system.ActorOf(Props.Create(), "processor"); + var forwarder = system.ActorOf(Props.Create(() => new ForwarderActor(processor)), "forwarder"); + + registry.Register(processor); + registry.Register(forwarder); + }); +}); + +// Add background service to demonstrate trace correlation +builder.Services.AddHostedService(); + +var app = builder.Build(); +app.Run(); diff --git a/src/Examples/Akka.Hosting.OpenTelemetry.Demo/TraceDemoService.cs b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/TraceDemoService.cs new file mode 100644 index 00000000..d9da9626 --- /dev/null +++ b/src/Examples/Akka.Hosting.OpenTelemetry.Demo/TraceDemoService.cs @@ -0,0 +1,101 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Diagnostics; +using Akka.Actor; +using Akka.Hosting; +using Akka.Hosting.OpenTelemetry.Demo.Actors; + +namespace Akka.Hosting.OpenTelemetry.Demo; + +/// +/// Background service that demonstrates OpenTelemetry trace correlation with Akka.NET actors. +/// Creates traced operations and sends messages to actors, verifying that logs from actors +/// are correlated with the originating trace. +/// +public sealed class TraceDemoService : BackgroundService +{ + private static readonly ActivitySource ActivitySource = new("Akka.Hosting.OpenTelemetry.Demo"); + + private readonly ILogger _logger; + private readonly IRequiredActor _processorActor; + private readonly IRequiredActor _forwarderActor; + private int _requestCounter; + + public TraceDemoService( + ILogger logger, + IRequiredActor processorActor, + IRequiredActor forwarderActor) + { + _logger = logger; + _processorActor = processorActor; + _forwarderActor = forwarderActor; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // Wait a moment for the actor system to fully initialize + await Task.Delay(2000, stoppingToken); + + _logger.LogInformation("Starting trace correlation demo..."); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + // Create a traced operation + using var activity = ActivitySource.StartActivity("ProcessMessage"); + + var requestId = $"REQ-{++_requestCounter:D4}"; + var data = $"Sample data for request {requestId}"; + + _logger.LogInformation("Sending request {RequestId} to actor within trace {TraceId}", + requestId, activity?.TraceId.ToString() ?? "none"); + + // Send directly to processor + var processor = await _processorActor.GetAsync(stoppingToken); + var response = await processor.Ask( + new ProcessRequest(requestId, data), + TimeSpan.FromSeconds(5), + stoppingToken); + + _logger.LogInformation("Received response for {RequestId}: {Result}", + response.RequestId, response.Result); + + // Now test forwarding through multiple actors + using var forwardActivity = ActivitySource.StartActivity("ForwardMessage"); + + var forwardRequestId = $"FWD-{_requestCounter:D4}"; + var forwardData = $"Forwarded data for request {forwardRequestId}"; + + _logger.LogInformation("Sending forwarded request {RequestId} within trace {TraceId}", + forwardRequestId, forwardActivity?.TraceId.ToString() ?? "none"); + + var forwarder = await _forwarderActor.GetAsync(stoppingToken); + var forwardResponse = await forwarder.Ask( + new ProcessRequest(forwardRequestId, forwardData), + TimeSpan.FromSeconds(5), + stoppingToken); + + _logger.LogInformation("Received forwarded response for {RequestId}: {Result}", + forwardResponse.RequestId, forwardResponse.Result); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during trace demo iteration"); + } + + // Wait before next iteration + await Task.Delay(5000, stoppingToken); + } + + _logger.LogInformation("Trace correlation demo stopped."); + } +}