diff --git a/src/CoreTests/Bugs/Bug_3911_use_sticky_connections_when_otel_connection_tracking_is_on.cs b/src/CoreTests/Bugs/Bug_3911_use_sticky_connections_when_otel_connection_tracking_is_on.cs new file mode 100644 index 0000000000..bf3a2b22b8 --- /dev/null +++ b/src/CoreTests/Bugs/Bug_3911_use_sticky_connections_when_otel_connection_tracking_is_on.cs @@ -0,0 +1,27 @@ +using System.Threading; +using Marten.Testing.Harness; +using System.Threading.Tasks; +using Marten.Services; +using Xunit; + +namespace CoreTests.Bugs; + +public class Bug_3911_use_sticky_connections_when_otel_connection_tracking_is_on : BugIntegrationContext +{ + [Fact] + public async Task do_not_blow_up() + { + StoreOptions(opts => opts.OpenTelemetry.TrackConnections = TrackLevel.Verbose); + + using var conn = theSession.Connection; + } + + [Fact] + public async Task do_not_blow_up_starting_an_async_transaction() + { + StoreOptions(opts => opts.OpenTelemetry.TrackConnections = TrackLevel.Verbose); + + await theSession.BeginTransactionAsync(CancellationToken.None); + + } +} diff --git a/src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs b/src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs index b9abeb7942..b695ccafd5 100644 --- a/src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs +++ b/src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs @@ -6,6 +6,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using ImTools; using JasperFx; using JasperFx.Descriptors; using Marten.Events.Operations; @@ -17,14 +18,14 @@ namespace Marten.Internal.Sessions; internal class EventTracingConnectionLifetime: - IConnectionLifetime + IConnectionLifetime, ITransactionStarter { private const string MartenCommandExecutionStarted = "marten.command.execution.started"; private const string MartenBatchExecutionStarted = "marten.batch.execution.started"; private const string MartenBatchPagesExecutionStarted = "marten.batch.pages.execution.started"; - private readonly IConnectionLifetime _innerConnectionLifetime; private readonly OpenTelemetryOptions _telemetryOptions; private readonly Activity? _databaseActivity; + private readonly string _tenantId; public EventTracingConnectionLifetime(IConnectionLifetime innerConnectionLifetime, string tenantId, OpenTelemetryOptions telemetryOptions) @@ -38,24 +39,36 @@ public EventTracingConnectionLifetime(IConnectionLifetime innerConnectionLifetim Logger = innerConnectionLifetime.Logger; CommandTimeout = innerConnectionLifetime.CommandTimeout; - _innerConnectionLifetime = innerConnectionLifetime; + InnerConnectionLifetime = innerConnectionLifetime; _telemetryOptions = telemetryOptions; var currentActivity = Activity.Current ?? null; - var tags = new ActivityTagsCollection(new[] { new KeyValuePair(OtelConstants.TenantId, tenantId) }); + var tags = new ActivityTagsCollection([new KeyValuePair(OtelConstants.TenantId, tenantId)]); _databaseActivity = MartenTracing.StartConnectionActivity(currentActivity, tags); + + _tenantId = tenantId; } + public EventTracingConnectionLifetime(OpenTelemetryOptions telemetryOptions, Activity? databaseActivity, IConnectionLifetime innerConnectionLifetime, IMartenSessionLogger logger) + { + _telemetryOptions = telemetryOptions; + _databaseActivity = databaseActivity; + InnerConnectionLifetime = innerConnectionLifetime; + Logger = logger; + } + + public IConnectionLifetime InnerConnectionLifetime { get; } + public ValueTask DisposeAsync() { _databaseActivity?.Stop(); - return _innerConnectionLifetime.DisposeAsync(); + return InnerConnectionLifetime.DisposeAsync(); } public void Dispose() { _databaseActivity?.Stop(); - _innerConnectionLifetime.Dispose(); + InnerConnectionLifetime.Dispose(); } public IMartenSessionLogger Logger { get; set; } @@ -66,7 +79,7 @@ public int Execute(NpgsqlCommand cmd) try { - return _innerConnectionLifetime.Execute(cmd); + return InnerConnectionLifetime.Execute(cmd); } catch (Exception e) { @@ -82,7 +95,7 @@ public int Execute(NpgsqlCommand cmd) try { - return await _innerConnectionLifetime.ExecuteAsync(command, token).ConfigureAwait(false); + return await InnerConnectionLifetime.ExecuteAsync(command, token).ConfigureAwait(false); } catch (Exception e) { @@ -98,7 +111,7 @@ public DbDataReader ExecuteReader(NpgsqlCommand command) try { - return _innerConnectionLifetime.ExecuteReader(command); + return InnerConnectionLifetime.ExecuteReader(command); } catch (Exception e) { @@ -114,7 +127,7 @@ public async Task ExecuteReaderAsync(NpgsqlCommand command, Cancel try { - return await _innerConnectionLifetime.ExecuteReaderAsync(command, token).ConfigureAwait(false); + return await InnerConnectionLifetime.ExecuteReaderAsync(command, token).ConfigureAwait(false); } catch (Exception e) { @@ -130,7 +143,7 @@ public DbDataReader ExecuteReader(NpgsqlBatch batch) try { - return _innerConnectionLifetime.ExecuteReader(batch); + return InnerConnectionLifetime.ExecuteReader(batch); } catch (Exception e) { @@ -146,7 +159,7 @@ public async Task ExecuteReaderAsync(NpgsqlBatch batch, Cancellati try { - return await _innerConnectionLifetime.ExecuteReaderAsync(batch, token).ConfigureAwait(false); + return await InnerConnectionLifetime.ExecuteReaderAsync(batch, token).ConfigureAwait(false); } catch (Exception e) { @@ -162,7 +175,7 @@ public void ExecuteBatchPages(IReadOnlyList pages, List pages, Lis try { - await _innerConnectionLifetime.ExecuteBatchPagesAsync(pages, exceptions, token).ConfigureAwait(false); + await InnerConnectionLifetime.ExecuteBatchPagesAsync(pages, exceptions, token).ConfigureAwait(false); writeVerboseEvents(pages); } @@ -223,4 +236,20 @@ private void writeVerboseEvents(IReadOnlyList pages) } } } + + public IAlwaysConnectedLifetime Start() + { + if (InnerConnectionLifetime is ITransactionStarter starter) return starter.Start(); + + throw new InvalidOperationException( + $"The inner connection lifetime {InnerConnectionLifetime} does not implement {nameof(ITransactionStarter)}"); + } + + public Task StartAsync(CancellationToken token) + { + if (InnerConnectionLifetime is ITransactionStarter starter) return starter.StartAsync(token); + + throw new InvalidOperationException( + $"The inner connection lifetime {InnerConnectionLifetime} does not implement {nameof(ITransactionStarter)}"); + } }