diff --git a/Directory.Build.props b/Directory.Build.props index f9a057101..a07b3a572 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -11,7 +11,7 @@ 1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618;VSTHRD200 true enable - 5.26.0 + 5.27.0 $(PackageProjectUrl) true true diff --git a/Directory.Packages.props b/Directory.Packages.props index fa192536d..74927b9f3 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -21,15 +21,15 @@ - - + + - + - + - + @@ -37,8 +37,9 @@ - - + + + @@ -89,13 +90,13 @@ - - - - - - - + + + + + + + diff --git a/build/CITargets.cs b/build/CITargets.cs index 349f5bf12..3541b89f4 100644 --- a/build/CITargets.cs +++ b/build/CITargets.cs @@ -51,6 +51,32 @@ bool IsToolAvailable(string toolName) WaitForLocalStackToBeReady(); if (services.Contains("asb-emulator")) WaitForAzureServiceBusEmulatorToBeReady(); + if (services.Contains("kafka")) + WaitForKafkaToBeReady(); + } + + void WaitForKafkaToBeReady() + { + var attempt = 0; + while (attempt < 30) + { + try + { + using var tcpClient = new System.Net.Sockets.TcpClient(); + tcpClient.Connect("localhost", 9092); + Log.Information("Kafka is up and ready!"); + return; + } + catch (Exception) + { + // ignore connection errors + } + + Thread.Sleep(2000); + attempt++; + } + + Log.Warning("Kafka did not become ready after 60 seconds"); } void WaitForSqlServerToBeReady() @@ -316,7 +342,7 @@ void BuildTestProjectsWithFramework(string frameworkOverride, params AbsolutePat var tests = RootDirectory / "src" / "Transports" / "Kafka" / "Wolverine.Kafka.Tests" / "Wolverine.Kafka.Tests.csproj"; BuildTestProjects(tests); - StartDockerServices("postgresql"); + StartDockerServices("kafka", "postgresql"); RunSingleProjectOneClassAtATime(tests); }); diff --git a/src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj b/src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj index 4051eb8ba..0e492335b 100644 --- a/src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj +++ b/src/Http/Wolverine.Http.Marten/Wolverine.Http.Marten.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/Persistence/MySql/Wolverine.MySql/Wolverine.MySql.csproj b/src/Persistence/MySql/Wolverine.MySql/Wolverine.MySql.csproj index 85a8aa60f..737a3d449 100644 --- a/src/Persistence/MySql/Wolverine.MySql/Wolverine.MySql.csproj +++ b/src/Persistence/MySql/Wolverine.MySql/Wolverine.MySql.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/Persistence/Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj b/src/Persistence/Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj index 699c3386a..171ceec48 100644 --- a/src/Persistence/Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj +++ b/src/Persistence/Oracle/Wolverine.Oracle/Wolverine.Oracle.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Wolverine.EntityFrameworkCore.csproj b/src/Persistence/Wolverine.EntityFrameworkCore/Wolverine.EntityFrameworkCore.csproj index e4e78cfbe..043a41392 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/Wolverine.EntityFrameworkCore.csproj +++ b/src/Persistence/Wolverine.EntityFrameworkCore/Wolverine.EntityFrameworkCore.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj index 9f62070a1..eeacb1aad 100644 --- a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj +++ b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj b/src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj index 46b7fd066..9d63c8c5d 100644 --- a/src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj +++ b/src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj b/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj index fd868f21f..11b341f44 100644 --- a/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj +++ b/src/Persistence/Wolverine.Postgresql/Wolverine.Postgresql.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj b/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj index ea65cd880..1b9054da1 100644 --- a/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj +++ b/src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj b/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj index 68d611f32..f4ca5ac90 100644 --- a/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj +++ b/src/Persistence/Wolverine.SqlServer/Wolverine.SqlServer.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj b/src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj index dc2c66565..f93e4591c 100644 --- a/src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj +++ b/src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs index 7a2b59224..7a2b7e40b 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/KafkaTransportTests.cs @@ -18,8 +18,9 @@ private KafkaTopicGroup BuildGroup(params string[] topics) public void specification_uniform_sets_config_on_group() { var group = BuildGroup("topic-a", "topic-b"); - new KafkaTopicGroupListenerConfiguration(group) + var config = new KafkaTopicGroupListenerConfiguration(group) .Specification(spec => spec.NumPartitions = 12); + ((IDelayedEndpointConfiguration)config).Apply(); var capturedSpec = new TopicSpecification { Name = "topic-a" }; group.SpecificationConfig.ShouldNotBeNull(); @@ -31,8 +32,9 @@ public void specification_uniform_sets_config_on_group() public void specification_per_topic_receives_topic_name() { var group = BuildGroup("topic-a", "topic-b"); - new KafkaTopicGroupListenerConfiguration(group) + var config = new KafkaTopicGroupListenerConfiguration(group) .Specification((topicName, spec) => spec.NumPartitions = topicName == "topic-a" ? 6 : 24); + ((IDelayedEndpointConfiguration)config).Apply(); group.SpecificationConfig.ShouldNotBeNull(); @@ -51,8 +53,9 @@ public void topic_creation_sets_func_on_group() var group = BuildGroup("topic-a", "topic-b"); Func func = (_, _) => Task.CompletedTask; - new KafkaTopicGroupListenerConfiguration(group) + var config = new KafkaTopicGroupListenerConfiguration(group) .TopicCreation(func); + ((IDelayedEndpointConfiguration)config).Apply(); group.CreateTopicFunc.ShouldNotBeNull(); group.CreateTopicFunc.ShouldBeSameAs(func); diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs index 01307ed32..52a98e3f7 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/publish_and_receive_raw_json.cs @@ -87,7 +87,10 @@ public async Task do_not_go_into_infinite_loop_with_garbage_data() }); producer.Flush(); - await Task.Delay(2.Minutes()); + // Wait long enough to detect any infinite retry loop, but not so long + // it needlessly inflates CI run time. 30 seconds is sufficient — a tight + // retry loop would exhaust resources well before then. + await Task.Delay(30.Seconds()); } public async Task DisposeAsync() diff --git a/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs b/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs index 69e73bfdf..1d3f1f141 100644 --- a/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs +++ b/src/Transports/Redis/Wolverine.Redis.Tests/rate_limiting_end_to_end.cs @@ -11,16 +11,16 @@ namespace Wolverine.Redis.Tests; public class rate_limiting_end_to_end(ITestOutputHelper output) : IAsyncLifetime { - private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 1000.Milliseconds()); - private readonly RateLimit _limit = new(1, 2.Seconds()); - private IHost _host = null!; - private RedisRateLimitTracker _tracker = null!; - - public async Task InitializeAsync() - { + private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 1000.Milliseconds()); + private readonly RateLimit _limit = new(1, 2.Seconds()); + private IHost _host = null!; + private RedisRateLimitTracker _tracker = null!; + + public async Task InitializeAsync() + { var streamKey = $"rate-limit-{Guid.NewGuid():N}"; - var groupName = $"rate-limit-group-{Guid.NewGuid():N}"; - + var groupName = $"rate-limit-group-{Guid.NewGuid():N}"; + _tracker = new RedisRateLimitTracker(output); var endpointUri = new Uri($"redis://stream/0/{streamKey}"); @@ -30,6 +30,11 @@ public async Task InitializeAsync() opts.ApplicationAssembly = typeof(rate_limiting_end_to_end).Assembly; opts.Services.AddSingleton(_tracker); + // Fast polling so scheduled messages are picked up quickly after the + // listener restarts following the rate-limit pause. + opts.Durability.ScheduledJobFirstExecution = 100.Milliseconds(); + opts.Durability.ScheduledJobPollingTime = 200.Milliseconds(); + opts.UseRedisTransport(RedisContainerFixture.ConnectionString).AutoProvision(); opts.PublishAllMessages().ToRedisStream(streamKey); opts.ListenToRedisStream(streamKey, groupName) @@ -37,25 +42,25 @@ public async Task InitializeAsync() .StartFromBeginning(); opts.RateLimitEndpoint(endpointUri, _limit); - }).StartAsync(); - } - - public async Task DisposeAsync() - { - await _host.StopAsync(); - _host.Dispose(); - } - + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + [Fact] public async Task rate_limited_messages_are_delayed_with_native_scheduling() { RedisRateLimitedMessage[] messages = [ new RedisRateLimitedMessage("A"), new RedisRateLimitedMessage("B") - ]; - var bus = _host.MessageBus(); - - output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[0]); + ]; + var bus = _host.MessageBus(); + + output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[0]); await bus.PublishAsync(messages[0]); output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[1]); await bus.PublishAsync(messages[1]); @@ -68,7 +73,7 @@ await _poller.WaitForAsync("handled at least 2 messages", var buckets = handled.Select(x => RateLimitBucket.For(_limit, x.TimeStamp)).ToArray(); buckets[0].WindowStart.ShouldBeLessThan(buckets[1].WindowStart); - } + } } public record RedisRateLimitedMessage(string Id); @@ -86,7 +91,7 @@ public Task Handle(RedisRateLimitedMessage message, CancellationToken cancellati public class RedisRateLimitTracker(ITestOutputHelper output) { - private readonly ITestOutputHelper output = output; + private readonly ITestOutputHelper output = output; private readonly ConcurrentQueue<(DateTime, RedisRateLimitedMessage)> _handledMessages = []; public IReadOnlyCollection<(DateTime TimeStamp, RedisRateLimitedMessage Message)> HandledMessages => _handledMessages; diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 5d2b35736..61e8252b0 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -169,6 +169,20 @@ public void LatchReceiver() } public async ValueTask StopAndDrainAsync() + { + await StopAndDrainCoreAsync(latchBeforeDrain: true); + } + + /// + /// Shared implementation for stop-and-drain. When is + /// true (normal shutdown), the receiver is latched before + /// so that the drain knows it is safe to wait for any in-flight messages to complete. + /// When false (pause triggered from within the handler pipeline, e.g. rate limiting), + /// the receiver is not pre-latched, so sees + /// _latched == false and returns immediately — avoiding a deadlock caused by the + /// current message's execute frame being on the call stack. + /// + private async ValueTask StopAndDrainCoreAsync(bool latchBeforeDrain) { if (Status == ListeningStatus.Stopped || Status == ListeningStatus.GloballyLatched) { @@ -189,7 +203,15 @@ public async ValueTask StopAndDrainAsync() await listener.StopAsync(); - LatchReceiver(); + // When called during normal shutdown, latch BEFORE drain so DrainAsync knows + // it can safely wait for in-flight messages to complete. + // When called from within the handler pipeline (e.g. PauseListenerContinuation), + // do NOT latch here: DrainAsync will see _latched==false and return immediately, + // preventing a deadlock with the current message's execute frame. + if (latchBeforeDrain) + { + LatchReceiver(); + } Listener = null; _receiver = null; @@ -301,7 +323,11 @@ public async ValueTask PauseAsync(TimeSpan pauseTime) { using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener); activity?.SetTag(WolverineTracing.EndpointAddress, Uri); - await StopAndDrainAsync(); + // Do NOT pre-latch the receiver here. PauseAsync may be called from within the + // handler pipeline (e.g. via RateLimitContinuation → PauseListenerContinuation). + // Pre-latching causes DrainAsync to wait for the ActionBlock to drain, which + // deadlocks because the current message's execute frame is still on the call stack. + await StopAndDrainCoreAsync(latchBeforeDrain: false); } catch (Exception e) { diff --git a/src/Wolverine/Wolverine.csproj b/src/Wolverine/Wolverine.csproj index d115d361b..04fe9c7e0 100644 --- a/src/Wolverine/Wolverine.csproj +++ b/src/Wolverine/Wolverine.csproj @@ -19,9 +19,9 @@ - - - + + +