Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<NoWarn>1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618;VSTHRD200</NoWarn>
<ImplicitUsings>true</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>5.26.0</Version>
<Version>5.27.0</Version>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
29 changes: 15 additions & 14 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,25 @@
<PackageVersion Include="Grpc.Core" Version="2.46.6" />
<PackageVersion Include="Grpc.Tools" Version="2.72.0" />
<PackageVersion Include="HtmlTags" Version="9.0.0" />
<PackageVersion Include="JasperFx" Version="1.22.0" />
<PackageVersion Include="JasperFx.Events" Version="1.24.1" />
<PackageVersion Include="JasperFx" Version="1.23.0" />
<PackageVersion Include="JasperFx.Events" Version="1.25.0" />
<PackageVersion Include="JasperFx.RuntimeCompiler" Version="4.4.0" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="15.0.1" />
<PackageVersion Include="Marten" Version="8.26.1" />
<PackageVersion Include="Marten" Version="8.28.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="6.1.3" />
<PackageVersion Include="Polecat" Version="1.4.0" />
<PackageVersion Include="Polecat" Version="1.6.1" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.46.1" />
<PackageVersion Include="Marten.AspNetCore" Version="8.26.1" />
<PackageVersion Include="Marten.AspNetCore" Version="8.28.0" />
<PackageVersion Include="MemoryPack" Version="1.21.3" />
<PackageVersion Include="MessagePack" Version="3.1.3" />
<PackageVersion Include="Meziantou.Extensions.Logging.Xunit" Version="1.0.15" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.6" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR" Version="1.1.0" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="9.0.8" />
<PackageVersion Include="Microsoft.Azure.SignalR" Version="1.32.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.3.4" />
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.11.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.14.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.14.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.NetAnalyzers" Version="5.0.3" />
<PackageVersion Include="Microsoft.Extensions.ApiDescription.Server" Version="9.0.5" />
<PackageVersion Include="Microsoft.FeatureManagement" Version="3.2.0" />
Expand Down Expand Up @@ -89,13 +90,13 @@
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.5" />
<PackageVersion Include="System.Net.NameResolution" Version="4.3.0" />
<PackageVersion Include="System.Threading.Tasks.Dataflow" Version="9.0.5" />
<PackageVersion Include="Weasel.Core" Version="8.10.2" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="8.10.2" />
<PackageVersion Include="Weasel.MySql" Version="8.10.2" />
<PackageVersion Include="Weasel.Oracle" Version="8.10.2" />
<PackageVersion Include="Weasel.Postgresql" Version="8.10.2" />
<PackageVersion Include="Weasel.SqlServer" Version="8.10.2" />
<PackageVersion Include="Weasel.Sqlite" Version="8.10.2" />
<PackageVersion Include="Weasel.Core" Version="8.11.2" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="8.11.2" />
<PackageVersion Include="Weasel.MySql" Version="8.11.2" />
<PackageVersion Include="Weasel.Oracle" Version="8.11.2" />
<PackageVersion Include="Weasel.Postgresql" Version="8.11.2" />
<PackageVersion Include="Weasel.SqlServer" Version="8.11.2" />
<PackageVersion Include="Weasel.Sqlite" Version="8.11.2" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.assemblyfixture" Version="2.2.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
Expand Down
28 changes: 27 additions & 1 deletion build/CITargets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,32 @@ bool IsToolAvailable(string toolName)
WaitForLocalStackToBeReady();
if (services.Contains("asb-emulator"))
WaitForAzureServiceBusEmulatorToBeReady();
if (services.Contains("kafka"))
WaitForKafkaToBeReady();
}

void WaitForKafkaToBeReady()
{
var attempt = 0;
while (attempt < 30)
{
try
{
using var tcpClient = new System.Net.Sockets.TcpClient();
tcpClient.Connect("localhost", 9092);
Log.Information("Kafka is up and ready!");
return;
}
catch (Exception)
{
// ignore connection errors
}

Thread.Sleep(2000);
attempt++;
}

Log.Warning("Kafka did not become ready after 60 seconds");
}

void WaitForSqlServerToBeReady()
Expand Down Expand Up @@ -316,7 +342,7 @@ void BuildTestProjectsWithFramework(string frameworkOverride, params AbsolutePat
var tests = RootDirectory / "src" / "Transports" / "Kafka" / "Wolverine.Kafka.Tests" / "Wolverine.Kafka.Tests.csproj";

BuildTestProjects(tests);
StartDockerServices("postgresql");
StartDockerServices("kafka", "postgresql");

RunSingleProjectOneClassAtATime(tests);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<ItemGroup>
<ProjectReference Include="..\..\Persistence\Wolverine.Marten\Wolverine.Marten.csproj" />
<ProjectReference Include="..\Wolverine.Http\Wolverine.Http.csproj" />
<ProjectReference Include="../../../../marten/src/Marten.AspNetCore/Marten.AspNetCore.csproj" />
<PackageReference Include="Marten.AspNetCore" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="MySqlConnector" />
<ProjectReference Include="..\..\..\..\..\weasel\src\Weasel.MySql\Weasel.MySql.csproj" />
<PackageReference Include="Weasel.MySql" />
</ItemGroup>

<Import Project="../../../../Analysis.Build.props" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="Oracle.ManagedDataAccess.Core" />
<ProjectReference Include="..\..\..\..\..\weasel\src\Weasel.Oracle\Weasel.Oracle.csproj" />
<PackageReference Include="Weasel.Oracle" />
</ItemGroup>

<Import Project="../../../../Analysis.Build.props" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<PackageReference Include="Microsoft.EntityFrameworkCore"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.Design"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational"/>
<ProjectReference Include="..\..\..\..\weasel\src\Weasel.EntityFrameworkCore\Weasel.EntityFrameworkCore.csproj" />
<PackageReference Include="Weasel.EntityFrameworkCore" />
<ProjectReference Include="..\Wolverine.RDBMS\Wolverine.RDBMS.csproj"/>
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<ProjectReference Include="..\Wolverine.Postgresql\Wolverine.Postgresql.csproj" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../../../../marten/src/Marten/Marten.csproj" />
<PackageReference Include="Marten" />
</ItemGroup>
<Import Project="../../../Analysis.Build.props" />
</Project>
2 changes: 1 addition & 1 deletion src/Persistence/Wolverine.Polecat/Wolverine.Polecat.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<ProjectReference Include="..\Wolverine.SqlServer\Wolverine.SqlServer.csproj" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../../../../polecat/src/Polecat/Polecat.csproj" />
<PackageReference Include="Polecat" />
</ItemGroup>
<Import Project="../../../Analysis.Build.props" />
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\weasel\src\Weasel.Postgresql\Weasel.Postgresql.csproj" />
<PackageReference Include="Weasel.Postgresql" />
</ItemGroup>

<Import Project="../../../Analysis.Build.props" />
Expand Down
2 changes: 1 addition & 1 deletion src/Persistence/Wolverine.RDBMS/Wolverine.RDBMS.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\weasel\src\Weasel.Core\Weasel.Core.csproj" />
<PackageReference Include="Weasel.Core" />
</ItemGroup>

<Import Project="../../../Analysis.Build.props" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<EmbeddedResource Include="Schema/*.sql" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\weasel\src\Weasel.SqlServer\Weasel.SqlServer.csproj" />
<PackageReference Include="Weasel.SqlServer" />
</ItemGroup>

<Import Project="../../../Analysis.Build.props" />
Expand Down
2 changes: 1 addition & 1 deletion src/Persistence/Wolverine.Sqlite/Wolverine.Sqlite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\weasel\src\Weasel.Sqlite\Weasel.Sqlite.csproj" />
<PackageReference Include="Weasel.Sqlite" />
</ItemGroup>

<Import Project="../../../Analysis.Build.props" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ private KafkaTopicGroup BuildGroup(params string[] topics)
public void specification_uniform_sets_config_on_group()
{
var group = BuildGroup("topic-a", "topic-b");
new KafkaTopicGroupListenerConfiguration(group)
var config = new KafkaTopicGroupListenerConfiguration(group)
.Specification(spec => spec.NumPartitions = 12);
((IDelayedEndpointConfiguration)config).Apply();

var capturedSpec = new TopicSpecification { Name = "topic-a" };
group.SpecificationConfig.ShouldNotBeNull();
Expand All @@ -31,8 +32,9 @@ public void specification_uniform_sets_config_on_group()
public void specification_per_topic_receives_topic_name()
{
var group = BuildGroup("topic-a", "topic-b");
new KafkaTopicGroupListenerConfiguration(group)
var config = new KafkaTopicGroupListenerConfiguration(group)
.Specification((topicName, spec) => spec.NumPartitions = topicName == "topic-a" ? 6 : 24);
((IDelayedEndpointConfiguration)config).Apply();

group.SpecificationConfig.ShouldNotBeNull();

Expand All @@ -51,8 +53,9 @@ public void topic_creation_sets_func_on_group()
var group = BuildGroup("topic-a", "topic-b");
Func<Confluent.Kafka.IAdminClient, string, Task> func = (_, _) => Task.CompletedTask;

new KafkaTopicGroupListenerConfiguration(group)
var config = new KafkaTopicGroupListenerConfiguration(group)
.TopicCreation(func);
((IDelayedEndpointConfiguration)config).Apply();

group.CreateTopicFunc.ShouldNotBeNull();
group.CreateTopicFunc.ShouldBeSameAs(func);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ public async Task do_not_go_into_infinite_loop_with_garbage_data()
});
producer.Flush();

await Task.Delay(2.Minutes());
// Wait long enough to detect any infinite retry loop, but not so long
// it needlessly inflates CI run time. 30 seconds is sufficient — a tight
// retry loop would exhaust resources well before then.
await Task.Delay(30.Seconds());
}

public async Task DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ namespace Wolverine.Redis.Tests;

public class rate_limiting_end_to_end(ITestOutputHelper output) : IAsyncLifetime
{
private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 1000.Milliseconds());
private readonly RateLimit _limit = new(1, 2.Seconds());
private IHost _host = null!;
private RedisRateLimitTracker _tracker = null!;
public async Task InitializeAsync()
{
private readonly ConditionPoller _poller = new(output, maxRetries: 20, retryDelay: 1000.Milliseconds());
private readonly RateLimit _limit = new(1, 2.Seconds());
private IHost _host = null!;
private RedisRateLimitTracker _tracker = null!;

public async Task InitializeAsync()
{
var streamKey = $"rate-limit-{Guid.NewGuid():N}";
var groupName = $"rate-limit-group-{Guid.NewGuid():N}";
var groupName = $"rate-limit-group-{Guid.NewGuid():N}";

_tracker = new RedisRateLimitTracker(output);
var endpointUri = new Uri($"redis://stream/0/{streamKey}");

Expand All @@ -30,32 +30,37 @@ public async Task InitializeAsync()
opts.ApplicationAssembly = typeof(rate_limiting_end_to_end).Assembly;
opts.Services.AddSingleton(_tracker);

// Fast polling so scheduled messages are picked up quickly after the
// listener restarts following the rate-limit pause.
opts.Durability.ScheduledJobFirstExecution = 100.Milliseconds();
opts.Durability.ScheduledJobPollingTime = 200.Milliseconds();

opts.UseRedisTransport(RedisContainerFixture.ConnectionString).AutoProvision();
opts.PublishAllMessages().ToRedisStream(streamKey);
opts.ListenToRedisStream(streamKey, groupName)
.UseDurableInbox()
.StartFromBeginning();

opts.RateLimitEndpoint(endpointUri, _limit);
}).StartAsync();
}
public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task rate_limited_messages_are_delayed_with_native_scheduling()
{
RedisRateLimitedMessage[] messages = [
new RedisRateLimitedMessage("A"),
new RedisRateLimitedMessage("B")
];
var bus = _host.MessageBus();
output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[0]);
];
var bus = _host.MessageBus();

output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[0]);
await bus.PublishAsync(messages[0]);
output.WriteLine("{0} Sending message: {1}", DateTime.UtcNow, messages[1]);
await bus.PublishAsync(messages[1]);
Expand All @@ -68,7 +73,7 @@ await _poller.WaitForAsync("handled at least 2 messages",

var buckets = handled.Select(x => RateLimitBucket.For(_limit, x.TimeStamp)).ToArray();
buckets[0].WindowStart.ShouldBeLessThan(buckets[1].WindowStart);
}
}
}

public record RedisRateLimitedMessage(string Id);
Expand All @@ -86,7 +91,7 @@ public Task Handle(RedisRateLimitedMessage message, CancellationToken cancellati

public class RedisRateLimitTracker(ITestOutputHelper output)
{
private readonly ITestOutputHelper output = output;
private readonly ITestOutputHelper output = output;
private readonly ConcurrentQueue<(DateTime, RedisRateLimitedMessage)> _handledMessages = [];

public IReadOnlyCollection<(DateTime TimeStamp, RedisRateLimitedMessage Message)> HandledMessages => _handledMessages;
Expand Down
30 changes: 28 additions & 2 deletions src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,20 @@ public void LatchReceiver()
}

public async ValueTask StopAndDrainAsync()
{
await StopAndDrainCoreAsync(latchBeforeDrain: true);
}

/// <summary>
/// Shared implementation for stop-and-drain. When <paramref name="latchBeforeDrain"/> is
/// <c>true</c> (normal shutdown), the receiver is latched before <see cref="IReceiver.DrainAsync"/>
/// so that the drain knows it is safe to wait for any in-flight messages to complete.
/// When <c>false</c> (pause triggered from within the handler pipeline, e.g. rate limiting),
/// the receiver is <em>not</em> pre-latched, so <see cref="IReceiver.DrainAsync"/> sees
/// <c>_latched == false</c> and returns immediately — avoiding a deadlock caused by the
/// current message's execute frame being on the call stack.
/// </summary>
private async ValueTask StopAndDrainCoreAsync(bool latchBeforeDrain)
{
if (Status == ListeningStatus.Stopped || Status == ListeningStatus.GloballyLatched)
{
Expand All @@ -189,7 +203,15 @@ public async ValueTask StopAndDrainAsync()

await listener.StopAsync();

LatchReceiver();
// When called during normal shutdown, latch BEFORE drain so DrainAsync knows
// it can safely wait for in-flight messages to complete.
// When called from within the handler pipeline (e.g. PauseListenerContinuation),
// do NOT latch here: DrainAsync will see _latched==false and return immediately,
// preventing a deadlock with the current message's execute frame.
if (latchBeforeDrain)
{
LatchReceiver();
}

Listener = null;
_receiver = null;
Expand Down Expand Up @@ -301,7 +323,11 @@ public async ValueTask PauseAsync(TimeSpan pauseTime)
{
using var activity = WolverineTracing.ActivitySource.StartActivity(WolverineTracing.PausingListener);
activity?.SetTag(WolverineTracing.EndpointAddress, Uri);
await StopAndDrainAsync();
// Do NOT pre-latch the receiver here. PauseAsync may be called from within the
// handler pipeline (e.g. via RateLimitContinuation → PauseListenerContinuation).
// Pre-latching causes DrainAsync to wait for the ActionBlock to drain, which
// deadlocks because the current message's execute frame is still on the call stack.
await StopAndDrainCoreAsync(latchBeforeDrain: false);
}
catch (Exception e)
{
Expand Down
6 changes: 3 additions & 3 deletions src/Wolverine/Wolverine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
<PackageReference Include="System.Threading.Tasks.Dataflow" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\jasperfx\src\JasperFx\JasperFx.csproj" />
<ProjectReference Include="..\..\..\jasperfx\src\JasperFx.Events\JasperFx.Events.csproj" />
<ProjectReference Include="..\..\..\jasperfx\src\JasperFx.RuntimeCompiler\JasperFx.RuntimeCompiler.csproj" />
<PackageReference Include="JasperFx" />
<PackageReference Include="JasperFx.Events" />
<PackageReference Include="JasperFx.RuntimeCompiler" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading