-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Closed as not planned
Description
Version: 2.6.111 (Latest Stable)
I've been having an issue with UnsubscribeAsync(Channel, Handler) where some object are being held by ConnectionMultiplexer.Subscribtion even after Unsubscribe has been called.
ConnectionMultiplexer.GetCounters().Subscription.Subscriptions also never returns to Zero after all my objects are unsubscribed.
This issue only seems to occur when there is a lot of concurrency.
I was able to reproduce it consistently with this small example:
using System.Collections.Concurrent;
using StackExchange.Redis;
public static class Program
{
public static async Task Main(string[] args)
{
ConnectionMultiplexer pubsub = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
{
const int ELEMENTS = 1000;
//--------------------
// Create Subscribers
List<Subscriber> subscribers = new List<Subscriber>(ELEMENTS);
for (int i = 0; i < ELEMENTS; i++)
subscribers.Add(new Subscriber(pubsub.GetSubscriber()));
//--------------------
// Initialize Subscribers
List<Func<Task>> initializes = new List<Func<Task>>(ELEMENTS);
for (int i = 0; i < ELEMENTS; i++)
{
int index = i;
initializes.Add(async () => { await subscribers[index].Initialize(); });
}
await Task.WhenAll(initializes.AsParallel().Select(async task => await task()));
//--------------------
// Publish Data
await pubsub.GetSubscriber().PublishAsync(Subscriber.CHANNEL, "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789");
await Task.Delay(1000);
//--------------------
// Shutdown Subscribers
List<Func<Task>> shutdowns = new List<Func<Task>>(ELEMENTS);
for (int i = 0; i < ELEMENTS; i++)
{
int index = i;
shutdowns.Add(async () => { await subscribers[index].Shutdown(); });
}
await Task.WhenAll(shutdowns.AsParallel().Select(async task => await task()));
}
//--------------------
// Random Delay + GC.Collect
await Task.Delay(1000);
GC.Collect();
await Task.Delay(1000);
//--------------------
// Issue => Subscription leaked Subs != Refs
Console.WriteLine($"Subs: {pubsub.GetCounters().Subscription.Subscriptions}");
Console.WriteLine($"Refs: {Subscriber.REF_COUNT}");
//If you were to take a heap snapshot here, you'd see a random number of Subscribers being held by ConnectionMultiplexer.Subscription
}
public class Subscriber
{
public const string CHANNEL = "CHANNEL:TEST";
public static int REF_COUNT = 0;
public Subscriber(ISubscriber sub)
{
_sub = sub;
}
public async Task Initialize()
{
await _sub.SubscribeAsync(CHANNEL, OnMessage);
Interlocked.Increment(ref REF_COUNT);
}
public async Task Shutdown()
{
await _sub.UnsubscribeAsync(CHANNEL, OnMessage);
Interlocked.Decrement(ref REF_COUNT);
}
private void OnMessage(RedisChannel channel, RedisValue value)
{
_pending.Enqueue(value);
}
private ISubscriber _sub;
private ConcurrentQueue<string> _pending = new ConcurrentQueue<string>();
}
}
Here's an example of it running with Watch + Heap Snapshot:
Here's another Heap Snapshot of objects being held by the Subscription after they Unsubscribed and GC was manually called:
Metadata
Metadata
Assignees
Labels
No labels

