Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System.Threading;
using Marten.Testing.Harness;
using System.Threading.Tasks;
using Marten.Services;
using Xunit;

namespace CoreTests.Bugs;

public class Bug_3911_use_sticky_connections_when_otel_connection_tracking_is_on : BugIntegrationContext
{
[Fact]
public async Task do_not_blow_up()
{
StoreOptions(opts => opts.OpenTelemetry.TrackConnections = TrackLevel.Verbose);

using var conn = theSession.Connection;
}

[Fact]
public async Task do_not_blow_up_starting_an_async_transaction()
{
StoreOptions(opts => opts.OpenTelemetry.TrackConnections = TrackLevel.Verbose);

await theSession.BeginTransactionAsync(CancellationToken.None);

}
}
57 changes: 43 additions & 14 deletions src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ImTools;
using JasperFx;
using JasperFx.Descriptors;
using Marten.Events.Operations;
Expand All @@ -17,14 +18,14 @@
namespace Marten.Internal.Sessions;

internal class EventTracingConnectionLifetime:
IConnectionLifetime
IConnectionLifetime, ITransactionStarter
{
private const string MartenCommandExecutionStarted = "marten.command.execution.started";
private const string MartenBatchExecutionStarted = "marten.batch.execution.started";
private const string MartenBatchPagesExecutionStarted = "marten.batch.pages.execution.started";
private readonly IConnectionLifetime _innerConnectionLifetime;
private readonly OpenTelemetryOptions _telemetryOptions;
private readonly Activity? _databaseActivity;
private readonly string _tenantId;

public EventTracingConnectionLifetime(IConnectionLifetime innerConnectionLifetime, string tenantId,
OpenTelemetryOptions telemetryOptions)
Expand All @@ -38,24 +39,36 @@ public EventTracingConnectionLifetime(IConnectionLifetime innerConnectionLifetim

Logger = innerConnectionLifetime.Logger;
CommandTimeout = innerConnectionLifetime.CommandTimeout;
_innerConnectionLifetime = innerConnectionLifetime;
InnerConnectionLifetime = innerConnectionLifetime;
_telemetryOptions = telemetryOptions;

var currentActivity = Activity.Current ?? null;
var tags = new ActivityTagsCollection(new[] { new KeyValuePair<string, object?>(OtelConstants.TenantId, tenantId) });
var tags = new ActivityTagsCollection([new KeyValuePair<string, object?>(OtelConstants.TenantId, tenantId)]);
_databaseActivity = MartenTracing.StartConnectionActivity(currentActivity, tags);

_tenantId = tenantId;
}

public EventTracingConnectionLifetime(OpenTelemetryOptions telemetryOptions, Activity? databaseActivity, IConnectionLifetime innerConnectionLifetime, IMartenSessionLogger logger)
{
_telemetryOptions = telemetryOptions;
_databaseActivity = databaseActivity;
InnerConnectionLifetime = innerConnectionLifetime;
Logger = logger;
}

public IConnectionLifetime InnerConnectionLifetime { get; }

public ValueTask DisposeAsync()
{
_databaseActivity?.Stop();
return _innerConnectionLifetime.DisposeAsync();
return InnerConnectionLifetime.DisposeAsync();
}

public void Dispose()
{
_databaseActivity?.Stop();
_innerConnectionLifetime.Dispose();
InnerConnectionLifetime.Dispose();
}

public IMartenSessionLogger Logger { get; set; }
Expand All @@ -66,7 +79,7 @@ public int Execute(NpgsqlCommand cmd)

try
{
return _innerConnectionLifetime.Execute(cmd);
return InnerConnectionLifetime.Execute(cmd);
}
catch (Exception e)
{
Expand All @@ -82,7 +95,7 @@ public int Execute(NpgsqlCommand cmd)

try
{
return await _innerConnectionLifetime.ExecuteAsync(command, token).ConfigureAwait(false);
return await InnerConnectionLifetime.ExecuteAsync(command, token).ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -98,7 +111,7 @@ public DbDataReader ExecuteReader(NpgsqlCommand command)

try
{
return _innerConnectionLifetime.ExecuteReader(command);
return InnerConnectionLifetime.ExecuteReader(command);
}
catch (Exception e)
{
Expand All @@ -114,7 +127,7 @@ public async Task<DbDataReader> ExecuteReaderAsync(NpgsqlCommand command, Cancel

try
{
return await _innerConnectionLifetime.ExecuteReaderAsync(command, token).ConfigureAwait(false);
return await InnerConnectionLifetime.ExecuteReaderAsync(command, token).ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -130,7 +143,7 @@ public DbDataReader ExecuteReader(NpgsqlBatch batch)

try
{
return _innerConnectionLifetime.ExecuteReader(batch);
return InnerConnectionLifetime.ExecuteReader(batch);
}
catch (Exception e)
{
Expand All @@ -146,7 +159,7 @@ public async Task<DbDataReader> ExecuteReaderAsync(NpgsqlBatch batch, Cancellati

try
{
return await _innerConnectionLifetime.ExecuteReaderAsync(batch, token).ConfigureAwait(false);
return await InnerConnectionLifetime.ExecuteReaderAsync(batch, token).ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -162,7 +175,7 @@ public void ExecuteBatchPages(IReadOnlyList<OperationPage> pages, List<Exception

try
{
_innerConnectionLifetime.ExecuteBatchPages(pages, exceptions);
InnerConnectionLifetime.ExecuteBatchPages(pages, exceptions);
writeVerboseEvents(pages);
}
catch (AggregateException e)
Expand All @@ -185,7 +198,7 @@ public async Task ExecuteBatchPagesAsync(IReadOnlyList<OperationPage> pages, Lis

try
{
await _innerConnectionLifetime.ExecuteBatchPagesAsync(pages, exceptions, token).ConfigureAwait(false);
await InnerConnectionLifetime.ExecuteBatchPagesAsync(pages, exceptions, token).ConfigureAwait(false);

writeVerboseEvents(pages);
}
Expand Down Expand Up @@ -223,4 +236,20 @@ private void writeVerboseEvents(IReadOnlyList<OperationPage> pages)
}
}
}

public IAlwaysConnectedLifetime Start()
{
if (InnerConnectionLifetime is ITransactionStarter starter) return starter.Start();

throw new InvalidOperationException(
$"The inner connection lifetime {InnerConnectionLifetime} does not implement {nameof(ITransactionStarter)}");
}

public Task<IAlwaysConnectedLifetime> StartAsync(CancellationToken token)
{
if (InnerConnectionLifetime is ITransactionStarter starter) return starter.StartAsync(token);

throw new InvalidOperationException(
$"The inner connection lifetime {InnerConnectionLifetime} does not implement {nameof(ITransactionStarter)}");
}
}
Loading