Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.AzureServiceBus.Tests.Bugs;

public class Bug_2283_purge_session_subscription : IAsyncLifetime
{
private IHost _host;

public async Task InitializeAsync()
{
// This should not throw even though the subscription has sessions enabled
// and AutoPurgeOnStartup is set. Before the fix, PurgeAsync on a session-enabled
// subscription used CreateReceiver instead of AcceptNextSessionAsync, which fails.
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.AutoProvision()
.AutoPurgeOnStartup();

opts.PublishMessage<Bug2283Message>()
.ToAzureServiceBusTopic("bug2283")
.SendInline();

opts.ListenToAzureServiceBusSubscription("bug2283sub")
.FromTopic("bug2283")
.RequireSessions(1)
.ProcessInline();
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync();
}

[Fact]
public async Task can_start_with_auto_purge_and_session_enabled_subscription()
{
// If we got here, the host started successfully with AutoPurgeOnStartup
// and a session-enabled subscription without throwing.
// Now verify we can also send and receive messages through it.
Func<IMessageContext, Task> sendMany = async bus =>
{
await bus.SendAsync(new Bug2283Message("First"), new DeliveryOptions { GroupId = "session1" });
await bus.SendAsync(new Bug2283Message("Second"), new DeliveryOptions { GroupId = "session1" });
await bus.SendAsync(new Bug2283Message("Third"), new DeliveryOptions { GroupId = "session1" });
};

var session = await _host.TrackActivity()
.IncludeExternalTransports()
.Timeout(30.Seconds())
.ExecuteAndWaitAsync(sendMany);

session.Received.MessagesOf<Bug2283Message>().Select(x => x.Name)
.ShouldBe(["First", "Second", "Third"]);
}

[Fact]
public async Task can_purge_existing_messages_from_session_subscription()
{
// First, send some messages that will sit in the subscription
Func<IMessageContext, Task> sendMessages = async bus =>
{
await bus.SendAsync(new Bug2283Message("Pre1"), new DeliveryOptions { GroupId = "purge-test" });
await bus.SendAsync(new Bug2283Message("Pre2"), new DeliveryOptions { GroupId = "purge-test" });
};

await _host.TrackActivity()
.IncludeExternalTransports()
.Timeout(30.Seconds())
.ExecuteAndWaitAsync(sendMessages);

// Now start a second host with the same subscription + AutoPurgeOnStartup.
// This validates that purge works when there ARE messages in a session-enabled subscription.
using var host2 = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.AutoProvision()
.AutoPurgeOnStartup();

opts.PublishMessage<Bug2283Message>()
.ToAzureServiceBusTopic("bug2283")
.SendInline();

opts.ListenToAzureServiceBusSubscription("bug2283sub")
.FromTopic("bug2283")
.RequireSessions(1)
.ProcessInline();
}).StartAsync();

// Send new messages through host2 and verify only the new ones arrive
Func<IMessageContext, Task> sendNew = async bus =>
{
await bus.SendAsync(new Bug2283Message("New1"), new DeliveryOptions { GroupId = "purge-test-2" });
};

var session = await host2.TrackActivity()
.IncludeExternalTransports()
.Timeout(30.Seconds())
.ExecuteAndWaitAsync(sendNew);

var received = session.Received.MessagesOf<Bug2283Message>().Select(x => x.Name).ToArray();
received.ShouldContain("New1");

await host2.StopAsync();
}
}

public record Bug2283Message(string Name);

public static class Bug2283Handler
{
public static void Handle(Bug2283Message message)
{
// nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,29 +120,62 @@ internal async ValueTask SetupAsync(ServiceBusAdministrationClient client, ILogg

public async ValueTask PurgeAsync(ILogger logger)
{
try
await Parent.WithServiceBusClientAsync(async client =>
{
await Parent.WithServiceBusClientAsync(async client =>
try
{
var receiver = client.CreateReceiver(Topic.TopicName, SubscriptionName);

var stopwatch = new Stopwatch();
stopwatch.Start();
while (stopwatch.ElapsedMilliseconds < 2000)
if (Options.RequiresSession)
{
var messages = await receiver.ReceiveMessagesAsync(25, 1.Seconds());
if (!messages.Any())
{
return;
}

foreach (var message in messages) await receiver.CompleteMessageAsync(message);
await purgeWithSessions(client);
}
else
{
await purgeWithoutSessions(client);
}
});
}
catch (Exception e)
{
logger.LogDebug(e, "Error trying to purge Azure Service Bus subscription {SubscriptionName} for topic {TopicName}", SubscriptionName, Topic.TopicName);
}
});
}

private async Task purgeWithSessions(ServiceBusClient client)
{
var cancellation = new CancellationTokenSource();
cancellation.CancelAfter(2000);

var stopwatch = new Stopwatch();
stopwatch.Start();
while (stopwatch.ElapsedMilliseconds < 2000)
{
var session = await client.AcceptNextSessionAsync(Topic.TopicName, SubscriptionName, cancellationToken: cancellation.Token);

var messages = await session.ReceiveMessagesAsync(25, 1.Seconds(), cancellation.Token);
foreach (var message in messages) await session.CompleteMessageAsync(message, cancellation.Token);
while (messages.Any())
{
messages = await session.ReceiveMessagesAsync(25, 1.Seconds(), cancellation.Token);
foreach (var message in messages) await session.CompleteMessageAsync(message, cancellation.Token);
}
}
catch (Exception e)
}

private async Task purgeWithoutSessions(ServiceBusClient client)
{
var receiver = client.CreateReceiver(Topic.TopicName, SubscriptionName);

var stopwatch = new Stopwatch();
stopwatch.Start();
while (stopwatch.ElapsedMilliseconds < 2000)
{
logger.LogError(e, "Error trying to purge Azure Service Bus subscription {SubscriptionName} for topic {TopicName}", SubscriptionName, Topic.TopicName);
var messages = await receiver.ReceiveMessagesAsync(25, 1.Seconds());
if (!messages.Any())
{
return;
}

foreach (var message in messages) await receiver.CompleteMessageAsync(message);
}
}

Expand Down
Loading