Skip to content

Commit edd3014

Browse files
Add InMemoryServiceBusTopicSubscriptionAssertions.ReceiveMessageFromSessionAsync (#53)
1 parent bb9d4ca commit edd3014

File tree

6 files changed

+187
-23
lines changed

6 files changed

+187
-23
lines changed

Directory.Build.props

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,25 +47,4 @@
4747
<PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="All" />
4848
</ItemGroup>
4949

50-
<!-- Tests -->
51-
52-
<ItemGroup Condition="'$(IsTestProject)' == 'true'">
53-
54-
<PackageReference Include="Microsoft.NET.Test.Sdk" />
55-
<PackageReference Include="MSTest.TestAdapter" />
56-
<PackageReference Include="MSTest.TestFramework" />
57-
<PackageReference Include="coverlet.msbuild">
58-
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
59-
<PrivateAssets>all</PrivateAssets>
60-
</PackageReference>
61-
<PackageReference Include="coverlet.collector">
62-
<PrivateAssets>all</PrivateAssets>
63-
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
64-
</PackageReference>
65-
<PackageReference Include="FluentAssertions" />
66-
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" />
67-
<Using Include="Microsoft.VisualStudio.TestTools.UnitTesting" />
68-
<Using Include="FluentAssertions" />
69-
</ItemGroup>
70-
7150
</Project>

docs/service-bus.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ There are following assertions available for in-memory service bus types:
259259
### `InMemoryServiceBusTopicSubscription`
260260

261261
- `.Should().BeEmptyAsync()`
262+
- `.Should().ReceiveMessageFromSessionAsync(string? sessionId = null, TimeSpan? maxWaitTime = null)`
262263

263264
## Hooks
264265

src/Spotflow.InMemory.Azure.ServiceBus.FluentAssertions/InMemoryServiceBusTopicSubscriptionAssertions.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Azure.Messaging.ServiceBus;
2+
13
using FluentAssertions.Primitives;
24

35
using Spotflow.InMemory.Azure.ServiceBus.FluentAssertions.Internal;
@@ -16,4 +18,51 @@ public async Task BeEmptyAsync(TimeSpan? maxWaitTime = null, string? because = n
1618

1719
await ServiceBusAssertionHelpers.EntityShouldBeEmptyAsync(entity, () => Subject.MessageCount, maxWaitTime, because, becauseArgs);
1820
}
21+
22+
/// <summary>
23+
/// Accepts a session and receives a message from the session.
24+
/// </summary>
25+
/// <param name="sessionId">Session ID to accept. If null, the next available session will be accepted.</param>
26+
/// <param name="maxWaitTime">Maximum wait time for receiving the message. Default is 8 seconds.</param>
27+
/// <param name="cancellationToken">Cancellation token to cancel the operation.</param>
28+
public async Task<ServiceBusReceivedMessage> ReceiveMessageFromSessionAsync(
29+
string? sessionId = null,
30+
TimeSpan? maxWaitTime = null,
31+
TimeProvider? timeProvider = null,
32+
CancellationToken cancellationToken = default)
33+
{
34+
await Task.Yield();
35+
36+
maxWaitTime ??= TimeSpan.FromSeconds(8);
37+
38+
using var timeCts = new CancellationTokenSource(maxWaitTime.Value, timeProvider ?? TimeProvider.System);
39+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeCts.Token);
40+
41+
var (topicName, subscriptionName) = (Subject.TopicName, Subject.SubscriptionName);
42+
43+
await using var receiverClient = InMemoryServiceBusClient.FromNamespace(Subject.Namespace);
44+
45+
try
46+
{
47+
ServiceBusSessionReceiver session;
48+
49+
if (sessionId is null)
50+
{
51+
session = await receiverClient.AcceptNextSessionAsync(topicName, subscriptionName, cancellationToken: cts.Token);
52+
}
53+
else
54+
{
55+
session = await receiverClient.AcceptSessionAsync(topicName, subscriptionName, sessionId, cancellationToken: cts.Token);
56+
}
57+
58+
await using (session)
59+
{
60+
return await session.ReceiveMessageAsync(cancellationToken: cts.Token);
61+
}
62+
}
63+
catch (OperationCanceledException ex) when (cts.IsCancellationRequested)
64+
{
65+
throw new OperationCanceledException("No session message received soon enough.", ex, cts.Token);
66+
}
67+
}
1968
}

src/Spotflow.InMemory.Azure.ServiceBus/Resources/InMemoryServiceBusTopic.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ internal override bool TryAddMessages(IReadOnlyList<ServiceBusMessage> messages,
6060

6161
public InMemoryServiceBusSubscription? FindSubscription(string subscriptionName)
6262
{
63+
ArgumentException.ThrowIfNullOrWhiteSpace(subscriptionName);
64+
6365
if (!_subscriptions.TryGetValue(subscriptionName, out var subscription))
6466
{
6567
return null;
@@ -68,8 +70,24 @@ internal override bool TryAddMessages(IReadOnlyList<ServiceBusMessage> messages,
6870
return subscription;
6971
}
7072

73+
public InMemoryServiceBusSubscription GetSubscription(string subscriptionName)
74+
{
75+
ArgumentException.ThrowIfNullOrWhiteSpace(subscriptionName);
76+
77+
var sub = FindSubscription(subscriptionName);
78+
79+
if (sub is null)
80+
{
81+
throw new InvalidOperationException($"Subscription '{subscriptionName}' not found in topic '{Namespace.Name}/{TopicName}'");
82+
}
83+
84+
return sub;
85+
}
86+
7187
public InMemoryServiceBusSubscription AddSubscription(string subscriptionName, InMemoryServiceBusSubscriptionOptions? options = null)
7288
{
89+
ArgumentException.ThrowIfNullOrWhiteSpace(subscriptionName);
90+
7391
var subscription = new InMemoryServiceBusSubscription(subscriptionName, options ?? new(), this);
7492

7593
if (!_subscriptions.TryAdd(subscriptionName, subscription))

tests/Tests/ServiceBus/FluentAssertionsTests.cs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
using FluentAssertions.Execution;
44

5+
using Microsoft.Extensions.Time.Testing;
6+
57
using Spotflow.InMemory.Azure.ServiceBus;
68
using Spotflow.InMemory.Azure.ServiceBus.FluentAssertions;
79

@@ -98,4 +100,101 @@ public async Task Non_Empty_Subscription_Should_Not_Be_Empty_And_Then_Become_Emp
98100
await subscription.Should().BeEmptyAsync(maxWaitTime: TimeSpan.FromMilliseconds(100));
99101
}
100102

103+
[TestMethod]
104+
public async Task Message_From_Next_Session_Should_Be_Received()
105+
{
106+
var provider = new InMemoryServiceBusProvider();
107+
108+
var topic = provider.AddNamespace().AddTopic("test-topic");
109+
var subscription = topic.AddSubscription("test", new() { EnableSessions = true });
110+
111+
var task = subscription.Should().ReceiveMessageFromSessionAsync();
112+
113+
await using var client = InMemoryServiceBusClient.FromNamespace(topic.Namespace);
114+
115+
await using var producerClient = client.CreateSender("test-topic");
116+
117+
var message = new ServiceBusMessage(BinaryData.FromString("Hello, world!"))
118+
{
119+
SessionId = "session-1"
120+
};
121+
122+
task.IsCompleted.Should().BeFalse();
123+
124+
await producerClient.SendMessageAsync(message);
125+
126+
var receivedMessage = await task;
127+
128+
receivedMessage.Should().NotBeNull();
129+
receivedMessage.SessionId.Should().Be("session-1");
130+
receivedMessage.Body.ToString().Should().Be("Hello, world!");
131+
}
132+
133+
[TestMethod]
134+
public async Task Message_From_A_Specific_Session_Should_Be_Received()
135+
{
136+
var provider = new InMemoryServiceBusProvider();
137+
138+
var topic = provider.AddNamespace().AddTopic("test-topic");
139+
var subscription = topic.AddSubscription("test", new() { EnableSessions = true });
140+
141+
var task = subscription.Should().ReceiveMessageFromSessionAsync(sessionId: "session-1");
142+
143+
await using var client = InMemoryServiceBusClient.FromNamespace(topic.Namespace);
144+
145+
await using var producerClient = client.CreateSender("test-topic");
146+
147+
var message = new ServiceBusMessage(BinaryData.FromString("Hello, world!"))
148+
{
149+
SessionId = "session-1"
150+
};
151+
task.IsCompleted.Should().BeFalse();
152+
153+
await producerClient.SendMessageAsync(message);
154+
155+
var receivedMessage = await task;
156+
157+
receivedMessage.Should().NotBeNull();
158+
receivedMessage.SessionId.Should().Be("session-1");
159+
receivedMessage.Body.ToString().Should().Be("Hello, world!");
160+
}
161+
162+
[TestMethod]
163+
public async Task Message_From_Non_Existent_Session_Should_Throw()
164+
{
165+
var provider = new InMemoryServiceBusProvider();
166+
var topic = provider.AddNamespace().AddTopic("test-topic");
167+
var subscription = topic.AddSubscription("test", new() { EnableSessions = true });
168+
169+
await using var client = InMemoryServiceBusClient.FromNamespace(topic.Namespace);
170+
await using var producerClient = client.CreateSender("test-topic");
171+
172+
var message = new ServiceBusMessage(BinaryData.FromString("Hello, world from session 1."))
173+
{
174+
SessionId = "session-1"
175+
};
176+
177+
await producerClient.SendMessageAsync(message);
178+
179+
var timeProvider = new FakeTimeProvider();
180+
181+
var task = subscription.Should().ReceiveMessageFromSessionAsync(sessionId: "session-2", maxWaitTime: TimeSpan.FromSeconds(16), timeProvider: timeProvider);
182+
183+
task.IsCompleted.Should().BeFalse();
184+
185+
timeProvider.Advance(TimeSpan.FromSeconds(14));
186+
187+
task.IsCompleted.Should().BeFalse();
188+
189+
while (!task.IsCompleted) // Periodically advance the time to make sure the CTS registrations are triggered no matter the execution order of underlying sync primitives.
190+
{
191+
timeProvider.Advance(TimeSpan.FromSeconds(1));
192+
await Task.Delay(10);
193+
}
194+
195+
var act = () => task;
196+
197+
await act.Should().ThrowAsync<OperationCanceledException>().WithMessage("No session message received soon enough.");
198+
}
199+
101200
}

tests/Tests/Tests.csproj

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,26 @@
1414
<PackageReference Include="Azure.ResourceManager.EventHubs" />
1515
<PackageReference Include="Microsoft.Extensions.Logging" />
1616
<PackageReference Include="Microsoft.Extensions.Logging.Console" />
17-
</ItemGroup>
18-
17+
</ItemGroup>
18+
19+
<ItemGroup>
20+
<PackageReference Include="Microsoft.NET.Test.Sdk" />
21+
<PackageReference Include="MSTest.TestAdapter" />
22+
<PackageReference Include="MSTest.TestFramework" />
23+
<PackageReference Include="coverlet.msbuild">
24+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
25+
<PrivateAssets>all</PrivateAssets>
26+
</PackageReference>
27+
<PackageReference Include="coverlet.collector">
28+
<PrivateAssets>all</PrivateAssets>
29+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
30+
</PackageReference>
31+
<PackageReference Include="FluentAssertions" />
32+
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" />
33+
<Using Include="Microsoft.VisualStudio.TestTools.UnitTesting" />
34+
<Using Include="FluentAssertions" />
35+
</ItemGroup>
36+
1937
<ItemGroup>
2038
<ProjectReference Include="..\..\src\Spotflow.InMemory.Azure.EventHubs\Spotflow.InMemory.Azure.EventHubs.csproj" />
2139
<ProjectReference Include="..\..\src\Spotflow.InMemory.Azure.KeyVault\Spotflow.InMemory.Azure.KeyVault.csproj" />

0 commit comments

Comments
 (0)