From de99207eb1d060a108d9cfe6730a75382a2f206f Mon Sep 17 00:00:00 2001 From: steve-ziegler Date: Fri, 29 May 2026 22:22:56 -0400 Subject: [PATCH] fix(batched-query): enlist FetchForExclusiveWriting item before awaiting BeginTransactionAsync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `BatchedQuery.FetchForExclusiveWriting` (both Guid and string-key overloads) had `await Parent.BeginTransactionAsync(...)` as its first statement, before calling `AddItem(handler)` to enlist the query item. Under concurrency, `BeginTransactionAsync` does not complete synchronously — `AutoClosingLifetime.StartAsync` performs a real `NpgsqlConnection.OpenAsync` socket round-trip — so the method yielded before `AddItem` ran. Wolverine HTTP's `[Aggregate(LoadStyle = Exclusive)]` codegen calls the method without immediately awaiting: var task = batch.Events.FetchForExclusiveWriting(id); await batch.Execute(ct); var stream = await task; Under the race, `Execute` ran with `_items.Count == 0`, returned immediately, and `task.Result` was never going to be populated (`item.Result` is set only by `Execute`). The `await` on `task` wedged forever. The non-exclusive `FetchForWriting` overloads in this same file don't have the bug because they're synchronous — `AddItem` always runs before the caller gets the Task back. Fix: enlist via `AddItem` synchronously before any `await`, so the item is in `_items` by the time control returns to the caller. Any subsequent `Execute()` is then guaranteed to see and process it. A 20-parallel-loop reproducer against a Wolverine HTTP `[Aggregate(LoadStyle = Exclusive)]` endpoint hung past 60s before this fix; passes in 5s with the fix. Full reproducer + diagnostic trace in the linked issue. --- .../BatchQuerying/BatchedQuery.Events.cs | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs b/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs index e69cf9005d..190dad56b7 100644 --- a/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs +++ b/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs @@ -132,8 +132,18 @@ public Task> FetchForWriting(string key, long expectedVersion public async Task> FetchForExclusiveWriting(Guid id) where T : class { - await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false); - + // Enlist synchronously BEFORE the first await so the item is in _items + // by the time control returns to the caller. A subsequent Execute() is + // then guaranteed to see and process the item. + // + // Previously, `await Parent.BeginTransactionAsync(...)` ran first. Under + // concurrency BeginTransactionAsync does not complete synchronously + // (AutoClosingLifetime.StartAsync performs a real socket round-trip in + // NpgsqlConnection.OpenAsync), so the method yielded before AddItem ran. + // The codegen pattern `var t = batch.Events.FetchForExclusiveWriting(id); + // await batch.Execute(ct); var s = await t;` then called Execute with an + // empty _items list, returned immediately, and the item.Result was never + // populated — causing the awaiter on `t` to wedge forever. _documentTypes.Add(typeof(IEvent)); var plan = Parent.Events.As().FindFetchPlan(); if (plan.Lifecycle != ProjectionLifecycle.Live) @@ -141,14 +151,16 @@ public async Task> FetchForExclusiveWriting(Guid id) where T _documentTypes.Add(typeof(T)); } var handler = plan.BuildQueryHandler(Parent, id, true); + var resultTask = AddItem(handler); - return await AddItem(handler).ConfigureAwait(false); + await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false); + return await resultTask.ConfigureAwait(false); } public async Task> FetchForExclusiveWriting(string key) where T : class { - await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false); - + // See the Guid overload above for the explanation — enlist synchronously + // before the first await to avoid the async-vs-sync-enlistment race. _documentTypes.Add(typeof(IEvent)); var plan = Parent.Events.As().FindFetchPlan(); if (plan.Lifecycle != ProjectionLifecycle.Live) @@ -156,7 +168,10 @@ public async Task> FetchForExclusiveWriting(string key) where _documentTypes.Add(typeof(T)); } var handler = plan.BuildQueryHandler(Parent, key, true); - return await AddItem(handler).ConfigureAwait(false); + var resultTask = AddItem(handler); + + await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false); + return await resultTask.ConfigureAwait(false); } public Task FetchLatest(Guid id) where T : class