Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
Expand Down Expand Up @@ -91,6 +92,10 @@ static void ConfigureSchedulerOptions(
options.EnableEntitySupport = true;
});

// Register the channel cache as a singleton to ensure channels are reused
// and properly disposed when the service provider is disposed.
builder.Services.TryAddSingleton<GrpcChannelCache>();

builder.Services.TryAddEnumerable(
ServiceDescriptor.Singleton<IConfigureOptions<GrpcDurableTaskClientOptions>, ConfigureGrpcChannel>());
builder.UseGrpc(_ => { });
Expand All @@ -101,7 +106,10 @@ static void ConfigureSchedulerOptions(
/// using the provided Durable Task Scheduler options.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions) :
/// <param name="channelCache">Cache for gRPC channels to ensure reuse and proper disposal.</param>
class ConfigureGrpcChannel(
IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions,
GrpcChannelCache channelCache) :
IConfigureNamedOptions<GrpcDurableTaskClientOptions>
{
/// <summary>
Expand All @@ -117,8 +125,14 @@ class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> sc
/// <param name="options">The options instance to configure.</param>
public void Configure(string? name, GrpcDurableTaskClientOptions options)
{
DurableTaskSchedulerClientOptions source = schedulerOptions.Get(name ?? Options.DefaultName);
options.Channel = source.CreateChannel();
string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerClientOptions source = schedulerOptions.Get(optionsName);

// Create a cache key based on the options name, endpoint, and task hub.
// This ensures channels are reused for the same configuration
Comment on lines +144 to +148
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateChannel() behavior depends on more than endpoint/task hub (e.g., ResourceId, Credential, AllowInsecureCredentials, RetryOptions). If any of those values change while EndpointAddress/TaskHubName stay the same (e.g., via options reload), the cached channel will be reused with stale settings. Consider including the channel-affecting fields in the cache key or enforcing immutability for them.

Copilot uses AI. Check for mistakes.
// but separate channels are created for different configurations.
string cacheKey = $"client:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel());
}
}
}
173 changes: 173 additions & 0 deletions src/Shared/AzureManaged/GrpcChannelCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using Grpc.Net.Client;

namespace Microsoft.DurableTask;

/// <summary>
/// Thread-safe cache of gRPC channels keyed by string. A channel is created at most once per key
/// and then reused; channels are shut down and disposed when they are replaced, removed, or when
/// the cache itself is disposed.
/// </summary>
sealed class GrpcChannelCache : IDisposable
{
    readonly ConcurrentDictionary<string, GrpcChannel> channels = new();

    // Serializes every mutation of the dictionary (create, replace, remove, dispose) so that a
    // given channel instance is handed to DisposeChannelAsync by at most one caller.
    readonly object syncLock = new();
    volatile bool disposed;

    /// <summary>
    /// Gets the cached gRPC channel for the specified key, creating and caching one via
    /// <paramref name="channelFactory"/> if no channel is cached yet.
    /// An already cached channel is returned as-is; this method never disposes a channel.
    /// </summary>
    /// <param name="key">The cache key (typically endpoint + taskhub combination).</param>
    /// <param name="channelFactory">Factory invoked (while holding the cache lock) to create the channel on a cache miss.</param>
    /// <returns>The cached or newly created gRPC channel.</returns>
    /// <exception cref="ObjectDisposedException">The cache has been disposed.</exception>
    public GrpcChannel GetOrCreate(string key, Func<GrpcChannel> channelFactory)
    {
        Check.NotNullOrEmpty(key);
        Check.NotNull(channelFactory);
        this.ThrowIfDisposed();

        // Fast path: lock-free read of an existing channel.
        if (this.channels.TryGetValue(key, out GrpcChannel? existingChannel))
        {
            return existingChannel;
        }

        // Slow path: create under the lock so the factory runs at most once per key.
        lock (this.syncLock)
        {
            this.ThrowIfDisposed();

            // Another thread may have created the channel while we waited for the lock.
            if (this.channels.TryGetValue(key, out existingChannel))
            {
                return existingChannel;
            }

            GrpcChannel newChannel = channelFactory();
            this.channels[key] = newChannel;
            return newChannel;
        }
    }

    /// <summary>
    /// Replaces an existing channel for the specified key with a new one,
    /// disposing the old channel if it exists and is a different instance.
    /// </summary>
    /// <param name="key">The cache key.</param>
    /// <param name="newChannel">The new channel to cache.</param>
    /// <exception cref="ObjectDisposedException">The cache has been disposed.</exception>
    public void Replace(string key, GrpcChannel newChannel)
    {
        Check.NotNullOrEmpty(key);
        Check.NotNull(newChannel);
        this.ThrowIfDisposed();

        GrpcChannel? oldChannel;

        lock (this.syncLock)
        {
            this.ThrowIfDisposed();

            // Re-caching the very same instance is a no-op; disposing it would break the caller.
            if (this.channels.TryGetValue(key, out oldChannel) && ReferenceEquals(oldChannel, newChannel))
            {
                return;
            }

            this.channels[key] = newChannel;
        }

        // Dispose the old channel outside the lock to avoid potential deadlocks.
        DisposeChannelAsync(oldChannel);
    }

    /// <summary>
    /// Removes and disposes a channel for the specified key.
    /// </summary>
    /// <param name="key">The cache key.</param>
    /// <returns>True if a channel was removed; otherwise, false.</returns>
    public bool TryRemove(string key)
    {
        Check.NotNullOrEmpty(key);

        GrpcChannel? removed;

        // Take the same lock as Replace/Dispose: without it, a concurrent Replace could read the
        // old channel, this method could remove and dispose it, and Replace would then schedule
        // a second shutdown/dispose of the same instance.
        lock (this.syncLock)
        {
            if (!this.channels.TryRemove(key, out removed))
            {
                return false;
            }
        }

        DisposeChannelAsync(removed);
        return true;
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        if (this.disposed)
        {
            return;
        }

        List<GrpcChannel> toDispose;

        lock (this.syncLock)
        {
            if (this.disposed)
            {
                return;
            }

            this.disposed = true;

            // Snapshot and clear under the lock; the actual teardown is scheduled outside it,
            // matching Replace and TryRemove.
            toDispose = new List<GrpcChannel>(this.channels.Values);
            this.channels.Clear();
        }

        foreach (GrpcChannel channel in toDispose)
        {
            DisposeChannelAsync(channel);
        }
    }

    /// <summary>
    /// Schedules a best-effort shutdown followed by disposal of the channel on a background task.
    /// Returns immediately; a null channel is ignored.
    /// </summary>
    /// <param name="channel">The channel to shut down and dispose, or null.</param>
    static void DisposeChannelAsync(GrpcChannel? channel)
    {
        if (channel is null)
        {
            return;
        }

        // ShutdownAsync is the graceful way to close a gRPC channel.
        // We fire-and-forget but ensure the channel is eventually disposed.
        _ = Task.Run(async () =>
        {
            try
            {
                await channel.ShutdownAsync().ConfigureAwait(false);
            }
            catch
            {
                // Deliberately ignored: shutdown is best-effort and the channel is disposed below regardless.
            }
            finally
            {
                channel.Dispose();
            }
        });
    }

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> if this cache has already been disposed.
    /// </summary>
    void ThrowIfDisposed()
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException(nameof(GrpcChannelCache));
        }
    }
}
20 changes: 17 additions & 3 deletions src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Worker.Grpc;
using Microsoft.DurableTask.Worker.Grpc.Internal;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -93,6 +94,10 @@ static void ConfigureSchedulerOptions(
options.EnableEntitySupport = true;
});

// Register the channel cache as a singleton to ensure channels are reused
// and properly disposed when the service provider is disposed.
builder.Services.TryAddSingleton<GrpcChannelCache>();

builder.Services.TryAddEnumerable(
ServiceDescriptor.Singleton<IConfigureOptions<GrpcDurableTaskWorkerOptions>, ConfigureGrpcChannel>());
builder.UseGrpc(_ => { });
Expand All @@ -103,7 +108,10 @@ static void ConfigureSchedulerOptions(
/// using the provided Durable Task Scheduler options.
/// </summary>
/// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions) :
/// <param name="channelCache">Cache for gRPC channels to ensure reuse and proper disposal.</param>
class ConfigureGrpcChannel(
IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions,
GrpcChannelCache channelCache) :
IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>
{
/// <summary>
Expand All @@ -119,8 +127,14 @@ class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> sc
/// <param name="options">The options instance to configure.</param>
public void Configure(string? name, GrpcDurableTaskWorkerOptions options)
{
DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName);
options.Channel = source.CreateChannel();
string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(optionsName);

// Create a cache key based on the options name, endpoint, and task hub.
// This ensures channels are reused for the same configuration
Comment on lines +146 to +150
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateChannel() behavior depends on more than endpoint/task hub (e.g., ResourceId, Credential, AllowInsecureCredentials, and WorkerId via the call credentials interceptor). If any of these values change while EndpointAddress/TaskHubName stay the same (e.g., via options reload), the cached channel will be reused with stale settings. Consider including these fields in the cache key or enforcing immutability for them.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot open a new pull request to apply changes based on this feedback

// but separate channels are created for different configurations.
string cacheKey = $"worker:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel());
options.ConfigureForAzureManaged();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,57 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo
clientOptions.RetryOptions.RetryableStatusCodes.Should().Contain(StatusCode.Unknown);
}
}

[Fact]
public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel()
{
    // Arrange
    ServiceCollection services = new ServiceCollection();
    Mock<IDurableTaskClientBuilder> mockBuilder = new Mock<IDurableTaskClientBuilder>();
    mockBuilder.Setup(b => b.Services).Returns(services);
    DefaultAzureCredential credential = new DefaultAzureCredential();

    // Act
    mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential);

    // Dispose the provider at the end of the test so the singleton channel cache
    // (and the gRPC channel it holds) does not leak into other tests.
    using ServiceProvider provider = services.BuildServiceProvider();

    // Resolve options multiple times to trigger channel configuration
    IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
    GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName);
    GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName);

    // Assert
    options1.Channel.Should().NotBeNull();
    options2.Channel.Should().NotBeNull();
    options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel");
}

[Fact]
public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels()
{
// Arrange
ServiceCollection services = new ServiceCollection();
Mock<IDurableTaskClientBuilder> mockBuilder1 = new Mock<IDurableTaskClientBuilder>();
Mock<IDurableTaskClientBuilder> mockBuilder2 = new Mock<IDurableTaskClientBuilder>();
mockBuilder1.Setup(b => b.Services).Returns(services);
mockBuilder1.Setup(b => b.Name).Returns("client1");
mockBuilder2.Setup(b => b.Services).Returns(services);
mockBuilder2.Setup(b => b.Name).Returns("client2");
DefaultAzureCredential credential = new DefaultAzureCredential();

// Act - configure two different named clients with different endpoints
mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential);
mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

Comment on lines +322 to +326
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses different endpoints for different named options, so it will pass even if the cache key accidentally ignores the options name. To validate name isolation in the cache key, use the same endpoint/task hub for both names and assert the channels differ; also dispose the ServiceProvider to avoid leaking channels.

Copilot uses AI. Check for mistakes.
// Resolve options for both named clients
IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1");
GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2");

// Assert
options1.Channel.Should().NotBeNull();
options2.Channel.Should().NotBeNull();
options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels");
}
}
Loading
Loading