Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,31 +132,46 @@ public Task<IEventStream<T>> FetchForWriting<T>(string key, long expectedVersion

public async Task<IEventStream<T>> FetchForExclusiveWriting<T>(Guid id) where T : class
{
await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false);

// Enlist synchronously BEFORE the first await so the item is in _items
// by the time control returns to the caller. A subsequent Execute() is
// then guaranteed to see and process the item.
//
// Previously, `await Parent.BeginTransactionAsync(...)` ran first. Under
// concurrency BeginTransactionAsync does not complete synchronously
// (AutoClosingLifetime.StartAsync performs a real socket round-trip in
// NpgsqlConnection.OpenAsync), so the method yielded before AddItem ran.
// The codegen pattern `var t = batch.Events.FetchForExclusiveWriting(id);
// await batch.Execute(ct); var s = await t;` then called Execute with an
// empty _items list, returned immediately, and the item.Result was never
// populated — causing the awaiter on `t` to wedge forever.
_documentTypes.Add(typeof(IEvent));
var plan = Parent.Events.As<EventStore>().FindFetchPlan<T, Guid>();
if (plan.Lifecycle != ProjectionLifecycle.Live)
{
_documentTypes.Add(typeof(T));
}
var handler = plan.BuildQueryHandler(Parent, id, true);
var resultTask = AddItem(handler);

return await AddItem(handler).ConfigureAwait(false);
await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false);
return await resultTask.ConfigureAwait(false);
}

public async Task<IEventStream<T>> FetchForExclusiveWriting<T>(string key) where T : class
{
await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false);

// See the Guid overload above for the explanation — enlist synchronously
// before the first await to avoid the async-vs-sync-enlistment race.
_documentTypes.Add(typeof(IEvent));
var plan = Parent.Events.As<EventStore>().FindFetchPlan<T, string>();
if (plan.Lifecycle != ProjectionLifecycle.Live)
{
_documentTypes.Add(typeof(T));
}
var handler = plan.BuildQueryHandler(Parent, key, true);
return await AddItem(handler).ConfigureAwait(false);
var resultTask = AddItem(handler);

await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false);
return await resultTask.ConfigureAwait(false);
}

public Task<T?> FetchLatest<T>(Guid id) where T : class
Expand Down
Loading