Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/CommandLineRunner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static IHostBuilder CreateHostBuilder(string[] args)
{
opts.Connection(ConnectionSource.ConnectionString);
opts.RegisterDocumentType<Target>();
opts.GeneratedCodeMode = TypeLoadMode.Static;
opts.GeneratedCodeMode = TypeLoadMode.Dynamic;

// If you use compiled queries, you will need to register the
// compiled query types with Marten ahead of time
Expand All @@ -52,7 +52,7 @@ public static IHostBuilder CreateHostBuilder(string[] args)

services.AddMarten(opts =>
{
opts.GeneratedCodeMode = TypeLoadMode.Static;
opts.GeneratedCodeMode = TypeLoadMode.Dynamic;
opts.AutoCreateSchemaObjects = AutoCreate.All;
opts.DatabaseSchemaName = "cli";
opts.DisableNpgsqlLogging = true;
Expand Down
6 changes: 5 additions & 1 deletion src/Marten/DocumentStore.EventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,12 @@ public async ValueTask<IProjectionBatch<IDocumentOperations, IQuerySession>> Sta
{
sessionOptions.Tracking = DocumentTracking.IdentityOnly;
}
else
{
sessionOptions.Tracking = DocumentTracking.None;
}

var session = (DocumentSessionBase)IdentitySession(sessionOptions);
var session = (DocumentSessionBase)OpenSession(sessionOptions);
var batch = new ProjectionUpdateBatch(Options.Projections, session, ShardExecutionMode.Rebuild, token)
{
ShouldApplyListeners = mode == ShardExecutionMode.Continuous && range.Events.Any()
Expand Down
73 changes: 40 additions & 33 deletions src/Marten/Events/Daemon/Internals/EventLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,50 +84,57 @@ public async Task<EventPage> LoadAsync(EventRequest request,
var runtime = request.Runtime;

await using var reader = await session.ExecuteReaderAsync(_command, token).ConfigureAwait(false);
while (await reader.ReadAsync(token).ConfigureAwait(false))
try
{
try
while (await reader.ReadAsync(token).ConfigureAwait(false))
{
// as a decorator
var @event = await _storage.ResolveAsync(reader, token).ConfigureAwait(false);

if (!await reader.IsDBNullAsync(_aggregateIndex, token).ConfigureAwait(false))
try
{
@event.AggregateTypeName =
await reader.GetFieldValueAsync<string>(_aggregateIndex, token).ConfigureAwait(false);
}
// as a decorator
var @event = await _storage.ResolveAsync(reader, token).ConfigureAwait(false);

page.Add(@event);
}
catch (UnknownEventTypeException e)
{
if (request.ErrorOptions.SkipUnknownEvents)
{
runtime.Logger.EventUnknown(e.EventTypeName);
skippedEvents++;
}
else
{
// Let any other exception throw
throw;
if (!await reader.IsDBNullAsync(_aggregateIndex, token).ConfigureAwait(false))
{
@event.AggregateTypeName =
await reader.GetFieldValueAsync<string>(_aggregateIndex, token).ConfigureAwait(false);
}

page.Add(@event);
}
}
catch (EventDeserializationFailureException e)
{
if (request.ErrorOptions.SkipSerializationErrors)
catch (UnknownEventTypeException e)
{
runtime.Logger.EventDeserializationException(e.InnerException!.GetType().Name!, e.Sequence);
runtime.Logger.EventDeserializationExceptionDebug(e);
await runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(request.Name)).ConfigureAwait(false);
skippedEvents++;
if (request.ErrorOptions.SkipUnknownEvents)
{
runtime.Logger.EventUnknown(e.EventTypeName);
skippedEvents++;
}
else
{
// Let any other exception throw
throw;
}
}
else
catch (EventDeserializationFailureException e)
{
// Let any other exception throw
throw;
if (request.ErrorOptions.SkipSerializationErrors)
{
runtime.Logger.EventDeserializationException(e.InnerException!.GetType().Name!, e.Sequence);
runtime.Logger.EventDeserializationExceptionDebug(e);
await runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(request.Name)).ConfigureAwait(false);
skippedEvents++;
}
else
{
// Let any other exception throw
throw;
}
}
}
}
finally
{
await reader.CloseAsync().ConfigureAwait(false);
}

page.CalculateCeiling(_batchSize, request.HighWater, skippedEvents);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal class ProjectionDocumentSession: DocumentSessionBase

public ProjectionDocumentSession(DocumentStore store,
ISessionWorkTracker workTracker,
SessionOptions sessionOptions, ShardExecutionMode mode): base(store, sessionOptions, new TransactionalConnection(sessionOptions), workTracker)
SessionOptions sessionOptions, ShardExecutionMode mode): base(store, sessionOptions, new AutoClosingLifetime(sessionOptions, store.Options), workTracker)
{
Mode = mode;
}
Expand Down
42 changes: 35 additions & 7 deletions src/Marten/Internal/Sessions/QuerySession.Execution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,61 @@ public Task<DbDataReader> ExecuteReaderAsync(NpgsqlBatch batch, CancellationToke
internal async Task<T?> LoadOneAsync<T>(NpgsqlCommand command, ISelector<T> selector, CancellationToken token)
{
await using var reader = await ExecuteReaderAsync(command, token).ConfigureAwait(false);
if (!await reader.ReadAsync(token).ConfigureAwait(false))
try
{
return default;
}
if (!await reader.ReadAsync(token).ConfigureAwait(false))
{
return default;
}

return await selector.ResolveAsync(reader, token).ConfigureAwait(false);
return await selector.ResolveAsync(reader, token).ConfigureAwait(false);
}
finally
{
await reader.CloseAsync().ConfigureAwait(false);
}
}

internal async Task<bool> StreamOne(NpgsqlCommand command, Stream stream, CancellationToken token)
{
await using var reader = (NpgsqlDataReader)await ExecuteReaderAsync(command, token).ConfigureAwait(false);
return await reader.StreamOne(stream, token).ConfigureAwait(false) == 1;
try
{
return await reader.StreamOne(stream, token).ConfigureAwait(false) == 1;
}
finally
{
await reader.CloseAsync().ConfigureAwait(false);
}
}

internal async Task<int> StreamMany(NpgsqlCommand command, Stream stream, CancellationToken token)
{
await using var reader = (NpgsqlDataReader)await ExecuteReaderAsync(command, token).ConfigureAwait(false);

return await reader.StreamMany(stream, token).ConfigureAwait(false);
try
{
return await reader.StreamMany(stream, token).ConfigureAwait(false);
}
finally
{
await reader.CloseAsync().ConfigureAwait(false);
}
}

public async Task<T> ExecuteHandlerAsync<T>(IQueryHandler<T> handler, CancellationToken token)
{
var cmd = this.BuildCommand(handler);

await using var reader = await ExecuteReaderAsync(cmd, token).ConfigureAwait(false);
return await handler.HandleAsync(reader, this, token).ConfigureAwait(false);
try
{
return await handler.HandleAsync(reader, this, token).ConfigureAwait(false);
}
finally
{
await reader.CloseAsync().ConfigureAwait(false);
}
}

[Obsolete(QuerySession.SynchronousRemoval)]
Expand Down
7 changes: 6 additions & 1 deletion src/Marten/Internal/Storage/IdentityMapDocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,19 @@ public sealed override async Task<IReadOnlyList<T>> LoadManyAsync(TId[] ids, IMa
var list = preselectLoadedDocuments(ids, session, out var command);
var selector = (ISelector<T>)BuildSelector(session);

await using (var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false))
await using var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false);
try
{
while (await reader.ReadAsync(token).ConfigureAwait(false))
{
var document = await selector.ResolveAsync(reader, token).ConfigureAwait(false);
list.Add(document);
}
}
finally
{
await reader.CloseAsync().ConfigureAwait(false);
}

return list;
}
Expand Down
14 changes: 11 additions & 3 deletions src/Marten/Internal/Storage/LightweightDocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Marten.Events.Daemon.Internals;
using Marten.Internal.CodeGeneration;
using Marten.Linq.Selectors;
using Marten.Schema;
Expand Down Expand Up @@ -64,10 +65,17 @@ public sealed override async Task<IReadOnlyList<T>> LoadManyAsync(TId[] ids, IMa
var selector = (ISelector<T>)BuildSelector(session);

await using var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false);
while (await reader.ReadAsync(token).ConfigureAwait(false))
try
{
var document = await selector.ResolveAsync(reader, token).ConfigureAwait(false);
list.Add(document);
while (await reader.ReadAsync(token).ConfigureAwait(false))
{
var document = await selector.ResolveAsync(reader, token).ConfigureAwait(false);
list.Add(document);
}
}
finally
{
await reader.CloseAsync().ConfigureAwait(false);
}

return list;
Expand Down
7 changes: 6 additions & 1 deletion src/Marten/Internal/Storage/QueryOnlyDocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,19 @@ public sealed override async Task<IReadOnlyList<T>> LoadManyAsync(TId[] ids, IMa
var command = BuildLoadManyCommand(ids, session.TenantId);
var selector = (ISelector<T>)BuildSelector(session);

await using (var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false))
await using var reader = await session.ExecuteReaderAsync(command, token).ConfigureAwait(false);
try
{
while (await reader.ReadAsync(token).ConfigureAwait(false))
{
var document = await selector.ResolveAsync(reader, token).ConfigureAwait(false);
list.Add(document);
}
}
finally
{
await reader.CloseAsync().ConfigureAwait(false);
}

return list;
}
Expand Down
9 changes: 8 additions & 1 deletion src/Marten/Services/Diagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,14 @@ public NpgsqlCommand PreviewCommand<TDoc, TReturn>(ICompiledQuery<TDoc, TReturn>

await using var conn = _store.Tenancy.Default.Database.CreateConnection();
await conn.OpenAsync(token).ConfigureAwait(false);
return await conn.ExplainQueryAsync(_store.Serializer, cmd, token: token).ConfigureAwait(false);
try
{
return await conn.ExplainQueryAsync(_store.Serializer, cmd, token: token).ConfigureAwait(false);
}
finally
{
await conn.CloseAsync().ConfigureAwait(false);
}
}

/// <summary>
Expand Down
Loading
Loading