Skip to content

Commit 1ed4f41

Browse files
N-Olbertmichaelstaib
authored andcommitted
Fix: Allow reconnect on subscriptions with prefixed redis topic (#8731)
1 parent 8528caa commit 1ed4f41

File tree

3 files changed

+49
-8
lines changed

3 files changed

+49
-8
lines changed

src/HotChocolate/Core/src/Subscriptions/DefaultPubSub.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,7 @@ private async ValueTask<DefaultTopic<TMessage>> CreateTopicAsync<TMessage>(
170170
{
171171
var eventTopic = OnCreateTopic<TMessage>(formattedTopic, bufferCapacity, bufferFullMode);
172172

173-
eventTopic.Closed += (sender, __) =>
174-
{
175-
_topics.TryRemove(((DefaultTopic<TMessage>)sender!).Name, out _);
176-
};
173+
eventTopic.Closed += (_, _) => _topics.TryRemove(formattedTopic, out _);
177174

178175
DiagnosticEvents.Created(formattedTopic);
179176

src/HotChocolate/Core/test/Subscriptions.Redis.Tests/RedisIntegrationTests.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,27 @@ public async Task Unsubscribe_Should_RemoveChannel()
6363

6464
await result.DisposeAsync();
6565

66+
var activeChannelsAfterUnsubscribe =
67+
WaitForChannelRemoval(cts, activeChannelsAfterSubscribe.Length, GetActiveChannelsAsync);
68+
Assert.True(activeChannelsAfterSubscribe.Length > activeChannelsAfterUnsubscribe);
69+
}
70+
71+
public static int WaitForChannelRemoval(
72+
CancellationTokenSource cts,
73+
int currentlyActiveChannels,
74+
Func<Task<RedisResult[]>> getActiveChannelsAsync)
75+
{
76+
var activeChannelsAfterUnsubscribe = 0;
6677
var channelRemovedEvent = new ManualResetEventSlim(false);
6778

6879
_ = Task.Run(async () =>
6980
{
7081
while (!cts.Token.IsCancellationRequested)
7182
{
72-
var activeChannels = await GetActiveChannelsAsync();
73-
if (activeChannels.Length < activeChannelsAfterSubscribe.Length)
83+
var activeChannels = await getActiveChannelsAsync();
84+
if (activeChannels.Length < currentlyActiveChannels)
7485
{
86+
activeChannelsAfterUnsubscribe = activeChannels.Length;
7587
channelRemovedEvent.Set();
7688
break;
7789
}
@@ -81,6 +93,7 @@ public async Task Unsubscribe_Should_RemoveChannel()
8193
}, cts.Token);
8294

8395
channelRemovedEvent.Wait(cts.Token);
96+
return activeChannelsAfterUnsubscribe;
8497
}
8598

8699
private async Task<RedisResult[]> GetActiveChannelsAsync()

src/HotChocolate/Core/test/Subscriptions.Redis.Tests/RedisTopicPrefixIntegrationTests.cs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,44 @@ public async Task Subscribe_Should_Create_Channel_With_Prefix()
5555
cancellationToken: cts.Token);
5656

5757
var activeChannels = await GetActiveChannelsAsync();
58+
Assert.NotEmpty(activeChannels);
59+
}
60+
61+
[Fact]
62+
public async Task Unsubscribe_And_Reconnect_Topic()
63+
{
64+
using var cts = new CancellationTokenSource(Timeout);
65+
await using var services = CreateServer<Subscription>();
66+
67+
var result = await services.ExecuteRequestAsync(
68+
"subscription { onMessage }",
69+
cancellationToken: cts.Token);
70+
71+
var activeChannelsAfterSubscribe = await GetActiveChannelsAsync();
72+
Assert.NotEmpty(activeChannelsAfterSubscribe);
73+
74+
await result.DisposeAsync();
75+
76+
var activeChannelsAfterUnsubscribe = RedisIntegrationTests.WaitForChannelRemoval(
77+
cts,
78+
activeChannelsAfterSubscribe.Length,
79+
GetActiveChannelsAsync);
80+
Assert.True(activeChannelsAfterSubscribe.Length > activeChannelsAfterUnsubscribe);
81+
82+
// reconnect
83+
result = await services.ExecuteRequestAsync(
84+
"subscription { onMessage }",
85+
cancellationToken: cts.Token);
86+
activeChannelsAfterSubscribe = await GetActiveChannelsAsync();
87+
Assert.True(activeChannelsAfterSubscribe.Length > activeChannelsAfterUnsubscribe);
5888

59-
Assert.Contains(activeChannels, channel => channel.ToString()!.StartsWith(TopicPrefix));
89+
await result.DisposeAsync();
6090
}
6191

6292
private async Task<RedisResult[]> GetActiveChannelsAsync()
6393
{
64-
return (RedisResult[])(await redisResource.GetConnection().GetDatabase().ExecuteAsync("PUBSUB", "CHANNELS"))!;
94+
var channels = (RedisResult[])(await redisResource.GetConnection().GetDatabase().ExecuteAsync("PUBSUB", "CHANNELS"))!;
95+
return channels.Where(x => x.ToString()!.StartsWith(TopicPrefix)).ToArray();
6596
}
6697

6798
protected override void ConfigurePubSub(IRequestExecutorBuilder graphqlBuilder)

0 commit comments

Comments
 (0)