Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void Configure(ISiloBuilder builder, string name, IConfigurationSection c
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.CreateMultiplexer = _ => Task.FromResult((Multiplexer: multiplexer, IsShared: true));
options.ConfigurationOptions = new ConfigurationOptions();
}
else
Expand Down Expand Up @@ -64,7 +64,7 @@ public void Configure(IClientBuilder builder, string name, IConfigurationSection
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.CreateMultiplexer = _ => Task.FromResult((Multiplexer: multiplexer, IsShared: true));
options.ConfigurationOptions = new ConfigurationOptions();
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ public class RedisClusteringOptions
public ConfigurationOptions ConfigurationOptions { get; set; }

/// <summary>
/// The delegate used to create a Redis connection multiplexer.
/// The delegate used to create a Redis connection multiplexer and indicate whether it is shared.
/// </summary>
public Func<RedisClusteringOptions, Task<IConnectionMultiplexer>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;
/// <remarks>
/// When <c>IsShared</c> is <see langword="true"/>, the provider will not dispose the returned multiplexer.
/// </remarks>
public Func<RedisClusteringOptions, Task<(IConnectionMultiplexer Multiplexer, bool IsShared)>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;

/// <summary>
/// The delegate used to create redis key for RedisMembershipTable.
Expand All @@ -39,9 +42,9 @@ public class RedisClusteringOptions
/// <summary>
/// The default multiplexer creation delegate.
/// </summary>
public static async Task<IConnectionMultiplexer> DefaultCreateMultiplexer(RedisClusteringOptions options)
public static async Task<(IConnectionMultiplexer Multiplexer, bool IsShared)> DefaultCreateMultiplexer(RedisClusteringOptions options)
{
return await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions);
return (await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions), false);
}

/// <summary>
Expand Down
40 changes: 37 additions & 3 deletions src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Orleans.Clustering.Redis
{
internal class RedisMembershipTable : IMembershipTable, IDisposable
internal class RedisMembershipTable : IMembershipTable, IDisposable, IAsyncDisposable
{
private const string TableVersionKey = "Version";
private static readonly TableVersion DefaultTableVersion = new TableVersion(0, "0");
Expand All @@ -21,6 +21,7 @@ internal class RedisMembershipTable : IMembershipTable, IDisposable
private readonly RedisKey _clusterKey;
private IConnectionMultiplexer _muxer = null!;
private IDatabase _db = null!;
private bool _muxerIsShared;

public RedisMembershipTable(IOptions<RedisClusteringOptions> redisOptions, IOptions<ClusterOptions> clusterOptions)
{
Expand All @@ -39,7 +40,7 @@ public async Task DeleteMembershipTableEntries(string clusterId)

public async Task InitializeMembershipTable(bool tryInitTableVersion)
{
_muxer = await _redisOptions.CreateMultiplexer(_redisOptions);
(_muxer, _muxerIsShared) = await _redisOptions.CreateMultiplexer(_redisOptions);
_db = _muxer.GetDatabase();

if (tryInitTableVersion)
Expand Down Expand Up @@ -215,7 +216,40 @@ public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)

public void Dispose()
{
_muxer?.Dispose();
var muxer = _muxer;
if (muxer is null)
{
return;
}

var muxerIsShared = _muxerIsShared;
_muxer = null!;
_db = null!;
_muxerIsShared = false;

if (!muxerIsShared)
{
muxer.Dispose();
}
}

public async ValueTask DisposeAsync()
{
var muxer = _muxer;
if (muxer is null)
{
return;
}

var muxerIsShared = _muxerIsShared;
_muxer = null!;
_db = null!;
_muxerIsShared = false;

if (!muxerIsShared)
{
await muxer.DisposeAsync().ConfigureAwait(false);
}
}

private enum UpsertResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void Configure(ISiloBuilder builder, string name, IConfigurationSection c
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.CreateMultiplexer = _ => Task.FromResult((Multiplexer: multiplexer, IsShared: true));
options.ConfigurationOptions = new ConfigurationOptions();
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ public class RedisGrainDirectoryOptions
public ConfigurationOptions ConfigurationOptions { get; set; }

/// <summary>
/// The delegate used to create a Redis connection multiplexer.
/// The delegate used to create a Redis connection multiplexer and indicate whether it is shared.
/// </summary>
public Func<RedisGrainDirectoryOptions, Task<IConnectionMultiplexer>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;
/// <remarks>
/// When <c>IsShared</c> is <see langword="true"/>, the provider will not dispose the returned multiplexer.
/// </remarks>
public Func<RedisGrainDirectoryOptions, Task<(IConnectionMultiplexer Multiplexer, bool IsShared)>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;

/// <summary>
/// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests).
Expand All @@ -32,7 +35,8 @@ public class RedisGrainDirectoryOptions
/// <summary>
/// The default multiplexer creation delegate.
/// </summary>
public static async Task<IConnectionMultiplexer> DefaultCreateMultiplexer(RedisGrainDirectoryOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions);
public static async Task<(IConnectionMultiplexer Multiplexer, bool IsShared)> DefaultCreateMultiplexer(RedisGrainDirectoryOptions options)
=> (Multiplexer: await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions), IsShared: false);
}

internal class RedactRedisConfigurationOptions : RedactAttribute
Expand Down
56 changes: 46 additions & 10 deletions src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

namespace Orleans.GrainDirectory.Redis
{
public partial class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant<ISiloLifecycle>
public partial class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant<ISiloLifecycle>, IDisposable, IAsyncDisposable
{
private readonly RedisGrainDirectoryOptions _directoryOptions;
private readonly ClusterOptions _clusterOptions;
Expand All @@ -25,6 +25,7 @@ public partial class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipan
// Both are initialized in the Initialize method.
private IConnectionMultiplexer _redis = null!;
private IDatabase _database = null!;
private bool _redisIsShared;

private bool _disposed;

Expand Down Expand Up @@ -172,12 +173,12 @@ public Task UnregisterSilos(List<SiloAddress> siloAddresses)

public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(nameof(RedisGrainDirectory), ServiceLifecycleStage.RuntimeInitialize, Initialize, Uninitialize);
lifecycle.Subscribe(nameof(RedisGrainDirectory), ServiceLifecycleStage.RuntimeInitialize, Initialize);
}

public async Task Initialize(CancellationToken ct = default)
{
_redis = await _directoryOptions.CreateMultiplexer(_directoryOptions);
(_redis, _redisIsShared) = await _directoryOptions.CreateMultiplexer(_directoryOptions);

// Configure logging
_redis.ConnectionRestored += LogConnectionRestored;
Expand All @@ -188,16 +189,51 @@ public async Task Initialize(CancellationToken ct = default)
_database = _redis.GetDatabase();
}

private async Task Uninitialize(CancellationToken arg)
public void Dispose()
{
if (_redis != null && _redis.IsConnected)
var redis = _redis;
if (redis is null)
{
Comment on lines +192 to 196

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 0d58300: _disposed is now set at the very start of Dispose(), before the early
eturn when _redis is null. So disposing before (or without) Initialize leaves the instance in a disposed state and later Lookup calls return null instead of throwing a NullReferenceException.

_disposed = true;
return;
}

var redisIsShared = _redisIsShared;
redis.ConnectionRestored -= LogConnectionRestored;
redis.ConnectionFailed -= LogConnectionFailed;
redis.ErrorMessage -= LogErrorMessage;
redis.InternalError -= LogInternalError;
_disposed = true;
_redis = null!;
_database = null!;
_redisIsShared = false;
Comment thread
ReubenBond marked this conversation as resolved.

if (!redisIsShared)
{
redis.Dispose();
}
}

await _redis.CloseAsync();
_redis.Dispose();
_redis = null!;
_database = null!;
public async ValueTask DisposeAsync()
{
var redis = _redis;
if (redis is null)
{
return;
}

var redisIsShared = _redisIsShared;
redis.ConnectionRestored -= LogConnectionRestored;
redis.ConnectionFailed -= LogConnectionFailed;
redis.ErrorMessage -= LogErrorMessage;
redis.InternalError -= LogInternalError;
_disposed = true;
_redis = null!;
_database = null!;
_redisIsShared = false;
Comment thread
ReubenBond marked this conversation as resolved.

if (!redisIsShared)
{
await redis.DisposeAsync().ConfigureAwait(false);
}
}
Comment on lines +216 to 238

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 0d58300: same change applied to DisposeAsync() — _disposed is set before the early return so the object behaves consistently as disposed even when initialization never completed.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void Configure(ISiloBuilder builder, string name, IConfigurationSection c
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.CreateMultiplexer = _ => Task.FromResult((Multiplexer: multiplexer, IsShared: true));
options.ConfigurationOptions = new ConfigurationOptions();
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class RedisStorageOptions : IStorageProviderSerializerOptions
public ConfigurationOptions? ConfigurationOptions { get; set; }

/// <summary>
/// The delegate used to create a Redis connection multiplexer.
/// The delegate used to create a Redis connection multiplexer and indicate whether it is shared.
/// </summary>
public Func<RedisStorageOptions, Task<IConnectionMultiplexer>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;
/// <remarks>
/// When <c>IsShared</c> is <see langword="true"/>, the provider will not dispose the returned multiplexer.
/// </remarks>
public Func<RedisStorageOptions, Task<(IConnectionMultiplexer Multiplexer, bool IsShared)>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;

/// <summary>
/// Entry expiry, null by default. A value should be set only for ephemeral environments, such as testing environments.
Expand All @@ -52,7 +55,8 @@ public class RedisStorageOptions : IStorageProviderSerializerOptions
/// <summary>
/// The default multiplexer creation delegate.
/// </summary>
public static async Task<IConnectionMultiplexer> DefaultCreateMultiplexer(RedisStorageOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions!);
public static async Task<(IConnectionMultiplexer Multiplexer, bool IsShared)> DefaultCreateMultiplexer(RedisStorageOptions options)
=> (Multiplexer: await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions!), IsShared: false);
}

/// <summary>
Expand Down
45 changes: 38 additions & 7 deletions src/Redis/Orleans.Persistence.Redis/Storage/RedisGrainStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Orleans.Persistence
/// <summary>
/// Redis-based grain storage provider
/// </summary>
public partial class RedisGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLifecycle>
public partial class RedisGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLifecycle>, IDisposable, IAsyncDisposable
{
private readonly string _serviceId;
private readonly RedisValue _ttl;
Expand All @@ -34,6 +34,7 @@ public partial class RedisGrainStorage : IGrainStorage, ILifecycleParticipant<IS
private readonly Func<string, GrainId, RedisKey> _getKeyFunc;
private IConnectionMultiplexer _connection;
private IDatabase _db;
private bool _connectionIsShared;

/// <summary>
/// Creates a new instance of the <see cref="RedisGrainStorage"/> type.
Expand Down Expand Up @@ -61,7 +62,7 @@ public RedisGrainStorage(
public void Participate(ISiloLifecycle lifecycle)
{
var name = OptionFormattingUtilities.Name<RedisGrainStorage>(_name);
lifecycle.Subscribe(name, _options.InitStage, Init, Close);
lifecycle.Subscribe(name, _options.InitStage, Init);
}

private async Task Init(CancellationToken cancellationToken)
Expand All @@ -72,7 +73,7 @@ private async Task Init(CancellationToken cancellationToken)
{
LogDebugInitializing(_name, _serviceId, _options.DeleteStateOnClear);

_connection = await _options.CreateMultiplexer(_options).ConfigureAwait(false);
(_connection, _connectionIsShared) = await _options.CreateMultiplexer(_options).ConfigureAwait(false);
_db = _connection.GetDatabase();

var elapsed = Stopwatch.GetElapsedTime(startTime);
Expand Down Expand Up @@ -252,12 +253,42 @@ public async Task ClearStateAsync<T>(string grainType, GrainId grainId, IGrainSt
}
}

private async Task Close(CancellationToken cancellationToken)
public void Dispose()
{
if (_connection is null) return;
var connection = _connection;
if (connection is null)
{
return;
}

var connectionIsShared = _connectionIsShared;
_connection = null;
_db = null;
_connectionIsShared = false;

if (!connectionIsShared)
{
connection.Dispose();
}
}

await _connection.CloseAsync().ConfigureAwait(false);
_connection.Dispose();
public async ValueTask DisposeAsync()
{
var connection = _connection;
if (connection is null)
{
return;
}

var connectionIsShared = _connectionIsShared;
_connection = null;
_db = null;
_connectionIsShared = false;

if (!connectionIsShared)
{
await connection.DisposeAsync().ConfigureAwait(false);
}
}

private T CreateInstance<T>() => _activatorProvider.GetActivator<T>().Create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void Configure(ISiloBuilder builder, string name, IConfigurationSection c
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.CreateMultiplexer = _ => Task.FromResult((Multiplexer: multiplexer, IsShared: true));
options.ConfigurationOptions = new ConfigurationOptions();
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ public class RedisReminderTableOptions
public ConfigurationOptions ConfigurationOptions { get; set; }

/// <summary>
/// The delegate used to create a Redis connection multiplexer.
/// The delegate used to create a Redis connection multiplexer and indicate whether it is shared.
/// </summary>
public Func<RedisReminderTableOptions, Task<IConnectionMultiplexer>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;
/// <remarks>
/// When <c>IsShared</c> is <see langword="true"/>, the provider will not dispose the returned multiplexer.
/// </remarks>
public Func<RedisReminderTableOptions, Task<(IConnectionMultiplexer Multiplexer, bool IsShared)>> CreateMultiplexer { get; set; } = DefaultCreateMultiplexer;

/// <summary>
/// Entry expiry, null by default. A value should be set ONLY for ephemeral environments (like in tests).
Expand All @@ -33,7 +36,8 @@ public class RedisReminderTableOptions
/// <summary>
/// The default multiplexer creation delegate.
/// </summary>
public static async Task<IConnectionMultiplexer> DefaultCreateMultiplexer(RedisReminderTableOptions options) => await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions);
public static async Task<(IConnectionMultiplexer Multiplexer, bool IsShared)> DefaultCreateMultiplexer(RedisReminderTableOptions options)
=> (Multiplexer: await ConnectionMultiplexer.ConnectAsync(options.ConfigurationOptions), IsShared: false);
}

internal class RedactRedisConfigurationOptions : RedactAttribute
Expand Down
Loading
Loading