diff --git a/src/CommandLineRunner/Program.cs b/src/CommandLineRunner/Program.cs index 52df0472e7..18bddcbaf9 100644 --- a/src/CommandLineRunner/Program.cs +++ b/src/CommandLineRunner/Program.cs @@ -43,7 +43,7 @@ public static IHostBuilder CreateHostBuilder(string[] args) { opts.Connection(ConnectionSource.ConnectionString); opts.RegisterDocumentType(); - opts.GeneratedCodeMode = TypeLoadMode.Static; + opts.GeneratedCodeMode = TypeLoadMode.Dynamic; // If you use compiled queries, you will need to register the // compiled query types with Marten ahead of time @@ -52,7 +52,7 @@ public static IHostBuilder CreateHostBuilder(string[] args) services.AddMarten(opts => { - opts.GeneratedCodeMode = TypeLoadMode.Static; + opts.GeneratedCodeMode = TypeLoadMode.Dynamic; opts.AutoCreateSchemaObjects = AutoCreate.All; opts.DatabaseSchemaName = "cli"; opts.DisableNpgsqlLogging = true; diff --git a/src/Marten/DocumentStore.EventStore.cs b/src/Marten/DocumentStore.EventStore.cs index be892e9886..e8acf45749 100644 --- a/src/Marten/DocumentStore.EventStore.cs +++ b/src/Marten/DocumentStore.EventStore.cs @@ -192,8 +192,12 @@ public async ValueTask> Sta { sessionOptions.Tracking = DocumentTracking.IdentityOnly; } + else + { + sessionOptions.Tracking = DocumentTracking.None; + } - var session = (DocumentSessionBase)IdentitySession(sessionOptions); + var session = (DocumentSessionBase)OpenSession(sessionOptions); var batch = new ProjectionUpdateBatch(Options.Projections, session, ShardExecutionMode.Rebuild, token) { ShouldApplyListeners = mode == ShardExecutionMode.Continuous && range.Events.Any() diff --git a/src/Marten/Events/Daemon/Internals/EventLoader.cs b/src/Marten/Events/Daemon/Internals/EventLoader.cs index ef1d847865..db4363da89 100644 --- a/src/Marten/Events/Daemon/Internals/EventLoader.cs +++ b/src/Marten/Events/Daemon/Internals/EventLoader.cs @@ -84,50 +84,57 @@ public async Task LoadAsync(EventRequest request, var runtime = request.Runtime; await using var reader = await session.ExecuteReaderAsync(_command, token).ConfigureAwait(false); - while (await reader.ReadAsync(token).ConfigureAwait(false)) + try { - try + while (await reader.ReadAsync(token).ConfigureAwait(false)) { - // as a decorator - var @event = await _storage.ResolveAsync(reader, token).ConfigureAwait(false); - - if (!await reader.IsDBNullAsync(_aggregateIndex, token).ConfigureAwait(false)) + try { - @event.AggregateTypeName = - await reader.GetFieldValueAsync(_aggregateIndex, token).ConfigureAwait(false); - } + // as a decorator + var @event = await _storage.ResolveAsync(reader, token).ConfigureAwait(false); - page.Add(@event); - } - catch (UnknownEventTypeException e) - { - if (request.ErrorOptions.SkipUnknownEvents) - { - runtime.Logger.EventUnknown(e.EventTypeName); - skippedEvents++; - } - else - { - // Let any other exception throw - throw; + if (!await reader.IsDBNullAsync(_aggregateIndex, token).ConfigureAwait(false)) + { + @event.AggregateTypeName = + await reader.GetFieldValueAsync(_aggregateIndex, token).ConfigureAwait(false); + } + + page.Add(@event); } - } - catch (EventDeserializationFailureException e) - { - if (request.ErrorOptions.SkipSerializationErrors) + catch (UnknownEventTypeException e) { - runtime.Logger.EventDeserializationException(e.InnerException!.GetType().Name!, e.Sequence); - runtime.Logger.EventDeserializationExceptionDebug(e); - await runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(request.Name)).ConfigureAwait(false); - skippedEvents++; + if (request.ErrorOptions.SkipUnknownEvents) + { + runtime.Logger.EventUnknown(e.EventTypeName); + skippedEvents++; + } + else + { + // Let any other exception throw + throw; + } } - else + catch (EventDeserializationFailureException e) { - // Let any other exception throw - throw; + if (request.ErrorOptions.SkipSerializationErrors) + { + runtime.Logger.EventDeserializationException(e.InnerException!.GetType().Name!, e.Sequence); + runtime.Logger.EventDeserializationExceptionDebug(e); + await runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(request.Name)).ConfigureAwait(false); + skippedEvents++; + } + else + { + // Let any other exception throw + throw; + } } } } + finally + { + await reader.CloseAsync().ConfigureAwait(false); + } page.CalculateCeiling(_batchSize, request.HighWater, skippedEvents); diff --git a/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs b/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs index 4eb5d15118..cb185d9ca4 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs @@ -18,7 +18,7 @@ internal class ProjectionDocumentSession: DocumentSessionBase public ProjectionDocumentSession(DocumentStore store, ISessionWorkTracker workTracker, - SessionOptions sessionOptions, ShardExecutionMode mode): base(store, sessionOptions, new TransactionalConnection(sessionOptions), workTracker) + SessionOptions sessionOptions, ShardExecutionMode mode): base(store, sessionOptions, new AutoClosingLifetime(sessionOptions, store.Options), workTracker) { Mode = mode; } diff --git a/src/Marten/Internal/Sessions/QuerySession.Execution.cs b/src/Marten/Internal/Sessions/QuerySession.Execution.cs index 2b06dca52e..bc0a402bcb 100644 --- a/src/Marten/Internal/Sessions/QuerySession.Execution.cs +++ b/src/Marten/Internal/Sessions/QuerySession.Execution.cs @@ -52,25 +52,46 @@ public Task ExecuteReaderAsync(NpgsqlBatch batch, CancellationToke internal async Task LoadOneAsync(NpgsqlCommand command, ISelector selector, CancellationToken token) { await using var reader = await ExecuteReaderAsync(command, token).ConfigureAwait(false); - if (!await reader.ReadAsync(token).ConfigureAwait(false)) + try { - return default; - } + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + return default; + } - return await selector.ResolveAsync(reader, token).ConfigureAwait(false); + return await selector.ResolveAsync(reader, token).ConfigureAwait(false); + } + finally + { + await reader.CloseAsync().ConfigureAwait(false); + } } internal async Task StreamOne(NpgsqlCommand command, Stream stream, CancellationToken token) { await using var reader = (NpgsqlDataReader)await ExecuteReaderAsync(command, token).ConfigureAwait(false); - return await reader.StreamOne(stream, token).ConfigureAwait(false) == 1; + try + { + return await reader.StreamOne(stream, token).ConfigureAwait(false) == 1; + } + finally + { + await reader.CloseAsync().ConfigureAwait(false); + } } internal async Task StreamMany(NpgsqlCommand command, Stream stream, CancellationToken token) { await using var reader = (NpgsqlDataReader)await ExecuteReaderAsync(command, token).ConfigureAwait(false); - return await reader.StreamMany(stream, token).ConfigureAwait(false); + try + { + return await reader.StreamMany(stream, token).ConfigureAwait(false); + } + finally + { + await reader.CloseAsync().ConfigureAwait(false); + } } public async Task ExecuteHandlerAsync(IQueryHandler handler, CancellationToken token) @@ -78,7 +99,14 @@ public async Task ExecuteHandlerAsync(IQueryHandler handler, Cancellati var cmd = this.BuildCommand(handler); await using var reader = await ExecuteReaderAsync(cmd, token).ConfigureAwait(false); - return await handler.HandleAsync(reader, this, token).ConfigureAwait(false); + try + { + return await handler.HandleAsync(reader, this, token).ConfigureAwait(false); + } + finally + { + await reader.CloseAsync().ConfigureAwait(false); + } } [Obsolete(QuerySession.SynchronousRemoval)] diff --git a/src/Marten/Internal/Storage/IdentityMapDocumentStorage.cs b/src/Marten/Internal/Storage/IdentityMapDocumentStorage.cs index 3fcb368b3b..fab357c509 100644 --- a/src/Marten/Internal/Storage/IdentityMapDocumentStorage.cs +++ b/src/Marten/Internal/Storage/IdentityMapDocumentStorage.cs @@ -138,7 +138,8 @@ public sealed override async Task> LoadManyAsync(TId[] ids, IMa var list = preselectLoadedDocuments(ids, session, out var command); var selector = (ISelector)BuildSelector(session); - await using (var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false)) + await using var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false); + try { while (await reader.ReadAsync(token).ConfigureAwait(false)) { @@ -146,6 +147,10 @@ public sealed override async Task> LoadManyAsync(TId[] ids, IMa list.Add(document); } } + finally + { + await reader.CloseAsync().ConfigureAwait(false); + } return list; } diff --git a/src/Marten/Internal/Storage/LightweightDocumentStorage.cs b/src/Marten/Internal/Storage/LightweightDocumentStorage.cs index 5f922fbebc..deaef8eec2 100644 --- a/src/Marten/Internal/Storage/LightweightDocumentStorage.cs +++ b/src/Marten/Internal/Storage/LightweightDocumentStorage.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Marten.Events.Daemon.Internals; using Marten.Internal.CodeGeneration; using Marten.Linq.Selectors; using Marten.Schema; @@ -64,10 +65,17 @@ public sealed override async Task> LoadManyAsync(TId[] ids, IMa var selector = (ISelector)BuildSelector(session); await using var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false); - while (await reader.ReadAsync(token).ConfigureAwait(false)) + try { - var document = await selector.ResolveAsync(reader, token).ConfigureAwait(false); - list.Add(document); + while (await reader.ReadAsync(token).ConfigureAwait(false)) + { + var document = await selector.ResolveAsync(reader, token).ConfigureAwait(false); + list.Add(document); + } + } + finally + { + await reader.CloseAsync().ConfigureAwait(false); } return list; diff --git a/src/Marten/Internal/Storage/QueryOnlyDocumentStorage.cs b/src/Marten/Internal/Storage/QueryOnlyDocumentStorage.cs index 1d404e7fac..c0679f8979 100644 --- a/src/Marten/Internal/Storage/QueryOnlyDocumentStorage.cs +++ b/src/Marten/Internal/Storage/QueryOnlyDocumentStorage.cs @@ -49,7 +49,8 @@ public sealed override async Task> LoadManyAsync(TId[] ids, IMa var command = BuildLoadManyCommand(ids, session.TenantId); var selector = (ISelector)BuildSelector(session); - await using (var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false)) + await using var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false); + try { while (await reader.ReadAsync(token).ConfigureAwait(false)) { @@ -57,6 +58,10 @@ public sealed override async Task> LoadManyAsync(TId[] ids, IMa list.Add(document); } } + finally + { + await reader.CloseAsync().ConfigureAwait(false); + } return list; } diff --git a/src/Marten/Services/Diagnostics.cs b/src/Marten/Services/Diagnostics.cs index 49f5572b9f..48f53c22e4 100644 --- a/src/Marten/Services/Diagnostics.cs +++ b/src/Marten/Services/Diagnostics.cs @@ -56,7 +56,14 @@ public NpgsqlCommand PreviewCommand(ICompiledQuery await using var conn = _store.Tenancy.Default.Database.CreateConnection(); await conn.OpenAsync(token).ConfigureAwait(false); - return await conn.ExplainQueryAsync(_store.Serializer, cmd, token: token).ConfigureAwait(false); + try + { + return await conn.ExplainQueryAsync(_store.Serializer, cmd, token: token).ConfigureAwait(false); + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } } /// diff --git a/src/Marten/Storage/MartenDatabase.EventStorage.cs b/src/Marten/Storage/MartenDatabase.EventStorage.cs index 7d37dc7a94..a10ad3d0ff 100644 --- a/src/Marten/Storage/MartenDatabase.EventStorage.cs +++ b/src/Marten/Storage/MartenDatabase.EventStorage.cs @@ -58,11 +58,18 @@ public async Task FetchHighestEventSequenceNumber(CancellationToken token await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); await using var conn = CreateConnection(); await conn.OpenAsync(token).ConfigureAwait(false); - var highest = (long)await conn - .CreateCommand($"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;") - .ExecuteScalarAsync(token).ConfigureAwait(false)!; + try + { + var highest = (long)await conn + .CreateCommand($"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;") + .ExecuteScalarAsync(token).ConfigureAwait(false)!; - return highest; + return highest; + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } } @@ -93,28 +100,35 @@ public async Task FetchEventStoreStatistics( await conn.OpenAsync(token).ConfigureAwait(false); - await using var reader = await conn.CreateCommand(sql).ExecuteReaderAsync(token).ConfigureAwait(false); - - if (await reader.ReadAsync(token).ConfigureAwait(false)) + try { - statistics.EventCount = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); - } + await using var reader = await conn.CreateCommand(sql).ExecuteReaderAsync(token).ConfigureAwait(false); - await reader.NextResultAsync(token).ConfigureAwait(false); + if (await reader.ReadAsync(token).ConfigureAwait(false)) + { + statistics.EventCount = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + } - if (await reader.ReadAsync(token).ConfigureAwait(false)) - { - statistics.StreamCount = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); - } + await reader.NextResultAsync(token).ConfigureAwait(false); + + if (await reader.ReadAsync(token).ConfigureAwait(false)) + { + statistics.StreamCount = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + } + + await reader.NextResultAsync(token).ConfigureAwait(false); - await reader.NextResultAsync(token).ConfigureAwait(false); + if (await reader.ReadAsync(token).ConfigureAwait(false)) + { + statistics.EventSequenceNumber = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + } - if (await reader.ReadAsync(token).ConfigureAwait(false)) + return statistics; + } + finally { - statistics.EventSequenceNumber = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + await conn.CloseAsync().ConfigureAwait(false); } - - return statistics; } @@ -137,13 +151,20 @@ public async Task> AllProjectionProgress( new ShardStateSelector(Options.EventGraph)); await using var conn = CreateConnection(); - await conn.OpenAsync(token).ConfigureAwait(false); + try + { + await conn.OpenAsync(token).ConfigureAwait(false); - var builder = new CommandBuilder(); - handler.ConfigureCommand(builder, null); + var builder = new CommandBuilder(); + handler.ConfigureCommand(builder, null); - await using var reader = await conn.ExecuteReaderAsync(builder, token).ConfigureAwait(false); - return await handler.HandleAsync(reader, null, token).ConfigureAwait(false); + await using var reader = await conn.ExecuteReaderAsync(builder, token).ConfigureAwait(false); + return await handler.HandleAsync(reader, null, token).ConfigureAwait(false); + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } } public async Task> FetchProjectionProgressFor(ShardName[] names, CancellationToken token = default) @@ -155,13 +176,20 @@ public async Task> FetchProjectionProgressFor(ShardNam new ShardStateSelector(Options.EventGraph)); await using var conn = CreateConnection(); - await conn.OpenAsync(token).ConfigureAwait(false); + try + { + await conn.OpenAsync(token).ConfigureAwait(false); - var builder = new CommandBuilder(); - handler.ConfigureCommand(builder, null); + var builder = new CommandBuilder(); + handler.ConfigureCommand(builder, null); - await using var reader = await conn.ExecuteReaderAsync(builder, token).ConfigureAwait(false); - return await handler.HandleAsync(reader, null, token).ConfigureAwait(false); + await using var reader = await conn.ExecuteReaderAsync(builder, token).ConfigureAwait(false); + return await handler.HandleAsync(reader, null, token).ConfigureAwait(false); + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } } Task IEventDatabase.WaitForNonStaleProjectionDataAsync(TimeSpan timeout) @@ -191,13 +219,20 @@ public async Task ProjectionProgressFor(ShardName name, await using var conn = CreateConnection(); await conn.OpenAsync(token).ConfigureAwait(false); - var builder = new CommandBuilder(); - handler.ConfigureCommand(builder, null); + try + { + var builder = new CommandBuilder(); + handler.ConfigureCommand(builder, null); - await using var reader = await conn.ExecuteReaderAsync(builder, token).ConfigureAwait(false); - var state = await handler.HandleAsync(reader, null, token).ConfigureAwait(false); + await using var reader = await conn.ExecuteReaderAsync(builder, token).ConfigureAwait(false); + var state = await handler.HandleAsync(reader, null, token).ConfigureAwait(false); - return state?.Sequence ?? 0; + return state?.Sequence ?? 0; + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } } public Uri DatabaseUri => Describe().DatabaseUri(); diff --git a/src/Marten/Storage/MartenDatabase.Execution.cs b/src/Marten/Storage/MartenDatabase.Execution.cs index a228308260..0ff18bb654 100644 --- a/src/Marten/Storage/MartenDatabase.Execution.cs +++ b/src/Marten/Storage/MartenDatabase.Execution.cs @@ -25,6 +25,10 @@ public async Task ExecuteAsync(CancellationToken cancellation) Console.WriteLine(e); throw; } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } await conn.OpenAsync(cancellation).ConfigureAwait(false); await using var reader = await command.ExecuteReaderAsync(cancellation).ConfigureAwait(false); @@ -33,16 +37,11 @@ public async Task ExecuteAsync(CancellationToken cancellation) { return await Handler.HandleAsync(reader, cancellation).ConfigureAwait(false); } - catch (InvalidOperationException e) - { - throw; - } finally { try { await reader.CloseAsync().ConfigureAwait(false); - await reader.DisposeAsync().ConfigureAwait(false); await conn.CloseAsync().ConfigureAwait(false); } catch (Exception) @@ -64,9 +63,16 @@ public async Task SingleCommit(DbCommand command, CancellationToken cancellation await using var conn = CreateConnection(); await conn.OpenAsync(cancellation).ConfigureAwait(false); - command.Connection = conn; + try + { + command.Connection = conn; - await command.ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); + await command.ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } } public NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite)