From bda164fd3882aa8ec27e867521c3f75d46759e56 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 31 Mar 2026 11:48:10 -0500 Subject: [PATCH 1/6] Fix Kafka CI: start Kafka container, fix unit test apply() timing, reduce infinite-loop test delay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CITargets.cs: Add 'kafka' to StartDockerServices in CIKafka target — was only starting postgresql, so Kafka tests had no broker in CI. Added WaitForKafkaToBeReady() that polls port 9092 until the broker accepts connections (up to 60 seconds, 30 attempts × 2s). - KafkaTransportTests.cs: Fix KafkaTopicGroupConfigurationTests — the Specification() and TopicCreation() methods use the DelayedEndpointConfiguration pattern, storing actions that are applied lazily at startup. Tests were checking group.SpecificationConfig/CreateTopicFunc immediately after calling the fluent methods, before Apply() was ever called. Added ((IDelayedEndpointConfiguration)config).Apply() before each assertion. - publish_and_receive_raw_json.cs: Reduce the do_not_go_into_infinite_loop_with_garbage_data test delay from 2 minutes to 30 seconds. The test has no assertions after the delay — it only checks the process doesn't crash. 30 seconds is sufficient to observe a tight retry loop, and saves ~90 seconds per CI run. Co-Authored-By: Claude Sonnet 4.6 --- build/CITargets.cs | 28 ++++++++++++++++++- .../KafkaTransportTests.cs | 9 ++++-- .../publish_and_receive_raw_json.cs | 5 +++- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/build/CITargets.cs b/build/CITargets.cs index 349f5bf12..3541b89f4 100644 --- a/build/CITargets.cs +++ b/build/CITargets.cs @@ -51,6 +51,32 @@ bool IsToolAvailable(string toolName) WaitForLocalStackToBeReady(); if (services.Contains("asb-emulator")) WaitForAzureServiceBusEmulatorToBeReady(); + if (services.Contains("kafka")) + WaitForKafkaToBeReady(); + } + + void WaitForKafkaToBeReady() + { + var attempt = 0; + while (attempt < 30) + { + try + { + using var tcpClient = new System.Net.Sockets.TcpClient(); + tcpClient.Connect("localhost", 9092); + Log.Information("Kafka is up and ready!"); + return; + } + catch (Exception) + { + // ignore connection errors + } + + Thread.Sleep(2000); + attempt++; + } + + Log.Warning("Kafka did not become ready after 60 seconds"); } void WaitForSqlServerToBeReady() @@ -316,7 +342,7 @@ void BuildTestProjectsWithFramework(string frameworkOverride, params AbsolutePat var tests = RootDirectory / "src" / "Transports" / "Kafka" / "Wolverine.Kafka.Tests" / "Wolverine.Kafka.Tests.csproj"; BuildTestProjects(tests); - StartDockerServices("postgresql"); + StartDockerServices("kafka", "postgresql"); RunSingleProjectOneClassAtATime(tests); }); diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs index 7a2b59224..7a2b7e40b 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs @@ -18,8 +18,9 @@ private KafkaTopicGroup BuildGroup(params string[] topics) public void specification_uniform_sets_config_on_group() { var group = BuildGroup("topic-a", "topic-b"); - new KafkaTopicGroupListenerConfiguration(group) + var config = new KafkaTopicGroupListenerConfiguration(group) .Specification(spec => spec.NumPartitions = 12); + ((IDelayedEndpointConfiguration)config).Apply(); var capturedSpec = new TopicSpecification { Name = "topic-a" }; group.SpecificationConfig.ShouldNotBeNull(); @@ -31,8 +32,9 @@ public void specification_uniform_sets_config_on_group() public void specification_per_topic_receives_topic_name() { var group = BuildGroup("topic-a", "topic-b"); - new KafkaTopicGroupListenerConfiguration(group) + var config = new KafkaTopicGroupListenerConfiguration(group) .Specification((topicName, spec) => spec.NumPartitions = topicName == "topic-a" ? 6 : 24); + ((IDelayedEndpointConfiguration)config).Apply(); group.SpecificationConfig.ShouldNotBeNull(); @@ -51,8 +53,9 @@ public void topic_creation_sets_func_on_group() var group = BuildGroup("topic-a", "topic-b"); Func func = (_, _) => Task.CompletedTask; - new KafkaTopicGroupListenerConfiguration(group) + var config = new KafkaTopicGroupListenerConfiguration(group) .TopicCreation(func); + ((IDelayedEndpointConfiguration)config).Apply(); group.CreateTopicFunc.ShouldNotBeNull(); group.CreateTopicFunc.ShouldBeSameAs(func); diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs index 01307ed32..52a98e3f7 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs @@ -87,7 +87,10 @@ public async Task do_not_go_into_infinite_loop_with_garbage_data() }); producer.Flush(); - await Task.Delay(2.Minutes()); + // Wait long enough to detect any infinite retry loop, but not so long + // it needlessly inflates CI run time. 30 seconds is sufficient — a tight + // retry loop would exhaust resources well before then. + await Task.Delay(30.Seconds()); } public async Task DisposeAsync() From 8ec3bbe83fadad2ec37c924d38c8f74b39a2b7e6 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 31 Mar 2026 13:08:43 -0500 Subject: [PATCH 2/6] Fix rate-limit pause deadlock: don't pre-latch receiver when pausing from handler pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When PauseListenerContinuation fires from within the handler pipeline (e.g. rate limiting), ListeningAgent.PauseAsync called StopAndDrainAsync which always called LatchReceiver() before receiver.DrainAsync(). This caused DurableReceiver/BufferedReceiver/ InlineReceiver's DrainAsync to see _latched=true (waitForCompletion=true) and wait up to DrainTimeout (30s) for the ActionBlock/in-flight count to drain — which never happened because the current message's execute frame was still on the call stack. The result was a 30-second stall on every rate-limit event, causing the rate-limiting test to time out. Fix: split StopAndDrainAsync into a shared StopAndDrainCoreAsync(bool latchBeforeDrain). - Normal shutdown (StopAndDrainAsync) continues to pre-latch so DrainAsync can safely wait for in-flight messages to complete. - PauseAsync now calls StopAndDrainCoreAsync(latchBeforeDrain: false) so DrainAsync sees _latched==false and returns immediately, avoiding the deadlock. Also add fast durability polling settings to the rate-limiting integration test so the scheduled message is picked up quickly after the listener restarts. Co-Authored-By: Claude Sonnet 4.6 --- .../rate_limiting_end_to_end.cs | 53 ++++++++++--------- src/Wolverine/Transports/ListeningAgent.cs | 30 ++++++++++- 2 files changed, 57 insertions(+), 26 deletions(-) diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs b/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs index 69e73bfdf..1d3f1f141 100644 --- a/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs +++ b/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs @@ -11,16 +11,16 @@ namespace Wolverine.Redis.Tests; public class rate_limiting_end_to_end(ITestOutputHelper output) : IAsyncLifetime { - private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 1000.Milliseconds()); - private readonly RateLimit _limit = new(1, 2.Seconds()); - private IHost _host = null!; - private RedisRateLimitTracker _tracker = null!; - - public async Task InitializeAsync() - { + private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 1000.Milliseconds()); + private readonly RateLimit _limit = new(1, 2.Seconds()); + private IHost _host = null!; + private RedisRateLimitTracker _tracker = null!; + + public async Task InitializeAsync() + { var streamKey = $"rate-limit-{Guid.NewGuid():N}"; - var groupName = $"rate-limit-group-{Guid.NewGuid():N}"; - + var groupName = $"rate-limit-group-{Guid.NewGuid():N}"; + _tracker = new RedisRateLimitTracker(output); var endpointUri = new Uri($"redis://stream/0/{streamKey}"); @@ -30,6 +30,11 @@ public async Task InitializeAsync() opts.ApplicationAssembly = typeof(rate_limiting_end_to_end).Assembly; opts.Services.AddSingleton(_tracker); + // Fast polling so scheduled messages are picked up quickly after the + // listener restarts following the rate-limit pause. + opts.Durability.ScheduledJobFirstExecution = 100.Milliseconds(); + opts.Durability.ScheduledJobPollingTime = 200.Milliseconds(); + opts.UseRedisTransport(RedisContainerFixture.ConnectionString).AutoProvision(); opts.PublishAllMessages().ToRedisStream(streamKey); opts.ListenToRedisStream(streamKey, groupName) @@ -37,25 +42,25 @@ public async Task InitializeAsync() .StartFromBeginning(); opts.RateLimitEndpoint(endpointUri, _limit); - }).StartAsync(); - } - - public async Task DisposeAsync() - { - await _host.StopAsync(); - _host.Dispose(); - } - + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + [Fact] public async Task rate_limited_messages_are_delayed_with_native_scheduling() { RedisRateLimitedMessage[] messages = [ new RedisRateLimitedMessage("A"), new RedisRateLimitedMessage("B") - ]; - var bus = _host.MessageBus(); - - output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[0]); + ]; + var bus = _host.MessageBus(); + + output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[0]); await bus.PublishAsync(messages[0]); output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[1]); await bus.PublishAsync(messages[1]); @@ -68,7 +73,7 @@ await _poller.WaitForAsync("handled at least 2 messages", var buckets = handled.Select(x => RateLimitBucket.For(_limit, x.TimeStamp)).ToArray(); buckets[0].WindowStart.ShouldBeLessThan(buckets[1].WindowStart); - } + } } public record RedisRateLimitedMessage(string Id); @@ -86,7 +91,7 @@ public Task Handle(RedisRateLimitedMessage message, CancellationToken cancellati public class RedisRateLimitTracker(ITestOutputHelper output) { - private readonly ITestOutputHelper output = output; + private readonly ITestOutputHelper output = output; private readonly ConcurrentQueue<(DateTime, RedisRateLimitedMessage)> _handledMessages = []; public IReadOnlyCollection<(DateTime TimeStamp, RedisRateLimitedMessage Message)> HandledMessages => _handledMessages; diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 5d2b35736..61e8252b0 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -169,6 +169,20 @@ public void LatchReceiver() } public async ValueTask StopAndDrainAsync() + { + await StopAndDrainCoreAsync(latchBeforeDrain: true); + } + + /// + /// Shared implementation for stop-and-drain. When is + /// true (normal shutdown), the receiver is latched before + /// so that the drain knows it is safe to wait for any in-flight messages to complete. + /// When false (pause triggered from within the handler pipeline, e.g. rate limiting), + /// the receiver is not pre-latched, so sees + /// _latched == false and returns immediately — avoiding a deadlock caused by the + /// current message's execute frame being on the call stack. + /// + private async ValueTask StopAndDrainCoreAsync(bool latchBeforeDrain) { if (Status == ListeningStatus.Stopped || Status == ListeningStatus.GloballyLatched) { @@ -189,7 +203,15 @@ public async ValueTask StopAndDrainAsync() await listener.StopAsync(); - LatchReceiver(); + // When called during normal shutdown, latch BEFORE drain so DrainAsync knows + // it can safely wait for in-flight messages to complete. + // When called from within the handler pipeline (e.g. PauseListenerContinuation), + // do NOT latch here: DrainAsync will see _latched==false and return immediately, + // preventing a deadlock with the current message's execute frame. + if (latchBeforeDrain) + { + LatchReceiver(); + } Listener = null; _receiver = null; @@ -301,7 +323,11 @@ public async ValueTask PauseAsync(TimeSpan pauseTime) { using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener); activity?.SetTag(WolverineTracing.EndpointAddress, Uri); - await StopAndDrainAsync(); + // Do NOT pre-latch the receiver here. PauseAsync may be called from within the + // handler pipeline (e.g. via RateLimitContinuation → PauseListenerContinuation). + // Pre-latching causes DrainAsync to wait for the ActionBlock to drain, which + // deadlocks because the current message's execute frame is still on the call stack. + await StopAndDrainCoreAsync(latchBeforeDrain: false); } catch (Exception e) { From 9954d4df94edb138fed46f116221a95f122e850a Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 31 Mar 2026 16:40:46 -0500 Subject: [PATCH 3/6] Bump version to 5.27.0 and update JasperFx, Weasel, Marten, Polecat packages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - JasperFx: 1.22.0 → 1.23.0 - JasperFx.Events: 1.24.1 → 1.25.0 - All Weasel.*: 8.10.2 → 8.11.2 - Marten + Marten.AspNetCore: 8.26.1 → 8.28.0 - Polecat: 1.4.0 → 1.6.1 - Version: 5.26.0 → 5.27.0 Co-Authored-By: Claude Sonnet 4.6 --- Directory.Build.props | 2 +- Directory.Packages.props | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index f9a057101..a07b3a572 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -11,7 +11,7 @@ 1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618;VSTHRD200 true enable - 5.26.0 + 5.27.0 $(PackageProjectUrl) true true diff --git a/Directory.Packages.props b/Directory.Packages.props index ab2abddb2..958f9fadc 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -21,15 +21,15 @@ - - + + - + - + - + @@ -87,13 +87,13 @@ - - - - - - - + + + + + + + From 78375f246d9212c6ec10a584891bb199f0a6dff4 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 31 Mar 2026 17:05:24 -0500 Subject: [PATCH 4/6] Fix NU1107 Microsoft.CodeAnalysis.Common version conflict Pin Microsoft.CodeAnalysis.Common to 4.14.0 in Directory.Packages.props to resolve the NuGet conflict between Microsoft.CodeAnalysis.Workspaces.MSBuild (which requires 4.14.0) and Microsoft.EntityFrameworkCore.Design (which pulls in 4.8.0 transitively). This was causing all 20 CI workflows to fail. Co-Authored-By: Claude Sonnet 4.6 --- Directory.Packages.props | 1 + 1 file changed, 1 insertion(+) diff --git a/Directory.Packages.props b/Directory.Packages.props index 958f9fadc..410814b24 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -37,6 +37,7 @@ + From dae08d36a09bba36d540ba2b9fe82d4a65251f75 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 31 Mar 2026 18:05:49 -0500 Subject: [PATCH 5/6] Fix all external project references reintroduced by merge The merge brought back local project references to external repos (jasperfx, weasel, marten). Replace all with proper NuGet package references to match the existing Directory.Packages.props versions. Also add Microsoft.CodeAnalysis.CSharp and Microsoft.CodeAnalysis.Analyzers to support the new Wolverine.SourceGeneration Roslyn analyzer project. Co-Authored-By: Claude Sonnet 4.6 --- Directory.Packages.props | 2 ++ src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj | 2 +- .../MySql/Wolverine.MySql/Wolverine.MySql.csproj | 2 +- .../Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj | 2 +- .../Wolverine.EntityFrameworkCore.csproj | 2 +- src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj | 2 +- .../Wolverine.Postgresql/Wolverine.Postgresql.csproj | 2 +- src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj | 2 +- .../Wolverine.SqlServer/Wolverine.SqlServer.csproj | 2 +- src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj | 2 +- src/Wolverine/Wolverine.csproj | 6 +++--- 11 files changed, 14 insertions(+), 12 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index e8b7a4cd3..74927b9f3 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -37,7 +37,9 @@ + + diff --git a/src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj b/src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj index 4051eb8ba..0e492335b 100644 --- a/src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj +++ b/src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/Persistence/MySql/Wolverine.MySql/Wolverine.MySql.csproj b/src/Persistence/MySql/Wolverine.MySql/Wolverine.MySql.csproj index 85a8aa60f..737a3d449 100644 --- a/src/Persistence/MySql/Wolverine.MySql/Wolverine.MySql.csproj +++ b/src/Persistence/MySql/Wolverine.MySql/Wolverine.MySql.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/Persistence/Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj b/src/Persistence/Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj index 699c3386a..171ceec48 100644 --- a/src/Persistence/Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj +++ b/src/Persistence/Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Wolverine.EntityFrameworkCore.csproj b/src/Persistence/Wolverine.EntityFrameworkCore/Wolverine.EntityFrameworkCore.csproj index e4e78cfbe..043a41392 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/Wolverine.EntityFrameworkCore.csproj +++ b/src/Persistence/Wolverine.EntityFrameworkCore/Wolverine.EntityFrameworkCore.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj index 9f62070a1..eeacb1aad 100644 --- a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj +++ b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj b/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj index fd868f21f..11b341f44 100644 --- a/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj +++ b/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj b/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj index ea65cd880..1b9054da1 100644 --- a/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj +++ b/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj b/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj index 68d611f32..f4ca5ac90 100644 --- a/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj +++ b/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj b/src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj index dc2c66565..f93e4591c 100644 --- a/src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj +++ b/src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Wolverine/Wolverine.csproj b/src/Wolverine/Wolverine.csproj index d115d361b..04fe9c7e0 100644 --- a/src/Wolverine/Wolverine.csproj +++ b/src/Wolverine/Wolverine.csproj @@ -19,9 +19,9 @@ - - - + + + From f578b7e950e2b36b9bb955c34aeb8dc56c9b3a60 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 31 Mar 2026 18:09:17 -0500 Subject: [PATCH 6/6] Fix Polecat external project reference reintroduced by merge Replace local project reference to ../../../../polecat/src/Polecat/Polecat.csproj with the NuGet package reference, matching the existing Polecat 1.6.1 version in Directory.Packages.props. This was causing CS0246 errors for all Polecat types in CI where the external repo is not present. Co-Authored-By: Claude Sonnet 4.6 --- src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj b/src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj index 46b7fd066..9d63c8c5d 100644 --- a/src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj +++ b/src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj @@ -14,7 +14,7 @@ - +