-
Notifications
You must be signed in to change notification settings - Fork 56
Fix GrpcChannel handle leak in AzureManaged backend #625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
882ea20
387c960
9ef84a9
19f521f
9b67032
f307902
95bc2d0
a585f2c
b6630f2
5ec4cd9
609df5d
d5d1151
9a005e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,7 +1,10 @@ | ||||||
| // Copyright (c) Microsoft Corporation. | ||||||
| // Licensed under the MIT License. | ||||||
|
|
||||||
| using System.Collections.Concurrent; | ||||||
| using System.Linq; | ||||||
| using Azure.Core; | ||||||
| using Grpc.Net.Client; | ||||||
| using Microsoft.DurableTask.Client.Grpc; | ||||||
| using Microsoft.Extensions.DependencyInjection; | ||||||
| using Microsoft.Extensions.DependencyInjection.Extensions; | ||||||
|
|
@@ -99,11 +102,23 @@ static void ConfigureSchedulerOptions( | |||||
| /// <summary> | ||||||
| /// Configuration class that sets up gRPC channels for client options | ||||||
| /// using the provided Durable Task Scheduler options. | ||||||
| /// Channels are cached per configuration key and disposed when the service provider is disposed. | ||||||
| /// </summary> | ||||||
| /// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param> | ||||||
| class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions) : | ||||||
| IConfigureNamedOptions<GrpcDurableTaskClientOptions> | ||||||
| sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskClientOptions>, IAsyncDisposable | ||||||
| { | ||||||
| readonly IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions; | ||||||
| readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new(); | ||||||
| int disposed; | ||||||
|
||||||
| int disposed; | |
| volatile int disposed; |
Copilot
AI
Jan 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
disposed is written via Interlocked.Exchange in DisposeAsync, but read here via a plain field read. For cross-thread visibility, the read should use Volatile.Read(ref this.disposed) (or make the field volatile) to ensure Configure reliably observes disposal under concurrency.
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CreateChannel() behavior depends on more than endpoint/task hub (e.g., ResourceId, Credential, AllowInsecureCredentials, RetryOptions). If any of those values change while EndpointAddress/TaskHubName stay the same (e.g., via options reload), the cached channel will be reused with stale settings. Consider including the channel-affecting fields in the cache key or enforcing immutability for them.
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channel cache key is built by concatenating strings with ':' delimiters, but EndpointAddress commonly contains ':' (e.g., "https://" or ports). This can create ambiguous keys and potential collisions. Prefer a composite key type (e.g., ValueTuple/record struct) rather than a delimited string.
| string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; | |
| string cacheKey = string.Concat(optionsName, "\u001F", source.EndpointAddress, "\u001F", source.TaskHubName); |
Copilot
AI
Jan 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cache key only includes the credential type (FullName), but CreateChannel() captures and uses the TokenCredential instance (e.g., for AccessTokenCache and auth headers). If the credential instance changes while keeping the same type (e.g., two ManagedIdentityCredential instances with different client IDs, or config reload), this will incorrectly reuse a channel created with the old credential. Consider keying on the credential instance identity (or a stable credential-specific identifier) rather than only its type, and also normalize EndpointAddress the same way CreateChannel() does (scheme/no-scheme) to avoid duplicate channels for the same effective endpoint.
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DisposeAsync throws an AggregateException when any channel shutdown/dispose fails. Throwing from ServiceProvider disposal can surface as app shutdown failures and is difficult for callers to handle. Consider making this best-effort (swallow/log disposal errors) instead of throwing.
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,7 +1,10 @@ | ||||||
| // Copyright (c) Microsoft Corporation. | ||||||
| // Licensed under the MIT License. | ||||||
|
|
||||||
| using System.Collections.Concurrent; | ||||||
| using System.Linq; | ||||||
| using Azure.Core; | ||||||
| using Grpc.Net.Client; | ||||||
| using Microsoft.DurableTask.Worker.Grpc; | ||||||
| using Microsoft.DurableTask.Worker.Grpc.Internal; | ||||||
| using Microsoft.Extensions.DependencyInjection; | ||||||
|
|
@@ -101,11 +104,23 @@ static void ConfigureSchedulerOptions( | |||||
| /// <summary> | ||||||
| /// Configuration class that sets up gRPC channels for worker options | ||||||
| /// using the provided Durable Task Scheduler options. | ||||||
| /// Channels are cached per configuration key and disposed when the service provider is disposed. | ||||||
| /// </summary> | ||||||
| /// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param> | ||||||
| class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions) : | ||||||
| IConfigureNamedOptions<GrpcDurableTaskWorkerOptions> | ||||||
| sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>, IAsyncDisposable | ||||||
| { | ||||||
| readonly IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions; | ||||||
| readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new(); | ||||||
| int disposed; | ||||||
|
Comment on lines
+109
to
+113
|
||||||
| int disposed; | |
| volatile int disposed; |
Copilot
AI
Jan 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
disposed is written via Interlocked.Exchange in DisposeAsync, but read here via a plain field read. Use Volatile.Read(ref this.disposed) (or make the field volatile) so concurrent calls to Configure reliably observe disposal.
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CreateChannel() behavior depends on more than endpoint/task hub (e.g., ResourceId, Credential, AllowInsecureCredentials, and WorkerId via the call credentials interceptor). If any of these values change while EndpointAddress/TaskHubName stay the same (e.g., via options reload), the cached channel will be reused with stale settings. Consider including these fields in the cache key or enforcing immutability for them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot open a new pull request to apply changes based on this feedback
Copilot
AI
Jan 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cache key uses source.EndpointAddress, but DurableTaskSchedulerWorkerOptions.CreateChannel() normalizes endpoints by adding a scheme when missing. As a result, equivalent endpoints (with vs without https://) will not share a cache entry and will create multiple channels. Consider normalizing the endpoint string for the cache key the same way CreateChannel() does (or use the channel target/normalized endpoint).
Copilot
AI
Jan 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cache key only includes the credential type (FullName), but CreateChannel() uses the TokenCredential instance (token cache + auth headers). If the credential instance changes while keeping the same type, this will incorrectly reuse a channel created with the old credential. Consider keying on the credential instance identity (or a stable credential-specific identifier) rather than only its type, and normalize EndpointAddress the same way CreateChannel() does to avoid duplicate channels for equivalent endpoints.
Copilot
AI
Jan 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DisposeAsync throws an AggregateException when any channel shutdown/dispose fails. Throwing from ServiceProvider disposal can surface as app shutdown failures and is difficult for callers to handle. Consider making this best-effort (swallow/log disposal errors) instead of throwing.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| using Azure.Identity; | ||
| using FluentAssertions; | ||
| using Grpc.Core; | ||
| using Grpc.Net.Client; | ||
| using Microsoft.DurableTask.Client.Grpc; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Options; | ||
|
|
@@ -280,4 +281,120 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo | |
| clientOptions.RetryOptions.RetryableStatusCodes.Should().Contain(StatusCode.Unknown); | ||
| } | ||
| } | ||
|
|
||
| [Fact] | ||
| public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() | ||
| { | ||
| // Arrange | ||
| ServiceCollection services = new ServiceCollection(); | ||
| Mock<IDurableTaskClientBuilder> mockBuilder = new Mock<IDurableTaskClientBuilder>(); | ||
| mockBuilder.Setup(b => b.Services).Returns(services); | ||
| DefaultAzureCredential credential = new DefaultAzureCredential(); | ||
|
|
||
| // Act | ||
| mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); | ||
| using ServiceProvider provider = services.BuildServiceProvider(); | ||
|
|
||
| // Resolve options multiple times to trigger channel configuration | ||
| IOptionsFactory<GrpcDurableTaskClientOptions> optionsFactory = provider.GetRequiredService<IOptionsFactory<GrpcDurableTaskClientOptions>>(); | ||
| GrpcDurableTaskClientOptions options1 = optionsFactory.Create(Options.DefaultName); | ||
| GrpcDurableTaskClientOptions options2 = optionsFactory.Create(Options.DefaultName); | ||
|
|
||
| // Assert | ||
| options1.Channel.Should().NotBeNull(); | ||
| options2.Channel.Should().NotBeNull(); | ||
| options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); | ||
| } | ||
|
|
||
| [Fact] | ||
| public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() | ||
| { | ||
| // Arrange | ||
| ServiceCollection services = new ServiceCollection(); | ||
| Mock<IDurableTaskClientBuilder> mockBuilder1 = new Mock<IDurableTaskClientBuilder>(); | ||
| Mock<IDurableTaskClientBuilder> mockBuilder2 = new Mock<IDurableTaskClientBuilder>(); | ||
| mockBuilder1.Setup(b => b.Services).Returns(services); | ||
| mockBuilder1.Setup(b => b.Name).Returns("client1"); | ||
| mockBuilder2.Setup(b => b.Services).Returns(services); | ||
| mockBuilder2.Setup(b => b.Name).Returns("client2"); | ||
| DefaultAzureCredential credential = new DefaultAzureCredential(); | ||
|
|
||
| // Act - configure two different named clients with different endpoints | ||
| mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); | ||
| mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); | ||
| ServiceProvider provider = services.BuildServiceProvider(); | ||
|
|
||
|
Comment on lines
+322
to
+326
|
||
| // Resolve options for both named clients | ||
| IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>(); | ||
| GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); | ||
| GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); | ||
|
|
||
| // Assert | ||
| options1.Channel.Should().NotBeNull(); | ||
| options2.Channel.Should().NotBeNull(); | ||
| options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() | ||
| { | ||
| // Arrange | ||
| ServiceCollection services = new ServiceCollection(); | ||
| Mock<IDurableTaskClientBuilder> mockBuilder = new Mock<IDurableTaskClientBuilder>(); | ||
| mockBuilder.Setup(b => b.Services).Returns(services); | ||
| DefaultAzureCredential credential = new DefaultAzureCredential(); | ||
|
|
||
| // Act | ||
| mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); | ||
| ServiceProvider provider = services.BuildServiceProvider(); | ||
|
|
||
| // Resolve options to trigger channel creation | ||
| IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>(); | ||
| GrpcDurableTaskClientOptions options = optionsMonitor.Get(Options.DefaultName); | ||
| options.Channel.Should().NotBeNull(); | ||
| GrpcChannel channel = options.Channel!; | ||
|
|
||
| // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels | ||
| await provider.DisposeAsync(); | ||
|
|
||
| // Assert - verify the channel was disposed by checking it throws ObjectDisposedException | ||
| Action action = () => channel.CreateCallInvoker(); | ||
| action.Should().Throw<ObjectDisposedException>("channel should be disposed after provider disposal"); | ||
|
|
||
| // Also verify that creating a new provider and getting options still works | ||
| ServiceCollection services2 = new ServiceCollection(); | ||
| Mock<IDurableTaskClientBuilder> mockBuilder2 = new Mock<IDurableTaskClientBuilder>(); | ||
| mockBuilder2.Setup(b => b.Services).Returns(services2); | ||
| mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); | ||
| await using ServiceProvider provider2 = services2.BuildServiceProvider(); | ||
|
|
||
| IOptionsMonitor<GrpcDurableTaskClientOptions> newOptionsMonitor = provider2.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>(); | ||
| GrpcDurableTaskClientOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); | ||
| newOptions.Channel.Should().NotBeNull(); | ||
| newOptions.Channel.Should().NotBeSameAs(channel, "new provider should create a new channel"); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisposedException() | ||
| { | ||
| // Arrange | ||
| ServiceCollection services = new ServiceCollection(); | ||
| Mock<IDurableTaskClientBuilder> mockBuilder = new Mock<IDurableTaskClientBuilder>(); | ||
| mockBuilder.Setup(b => b.Services).Returns(services); | ||
| DefaultAzureCredential credential = new DefaultAzureCredential(); | ||
|
|
||
| // Act | ||
| mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); | ||
| ServiceProvider provider = services.BuildServiceProvider(); | ||
|
|
||
| // Resolve options monitor before disposal | ||
| IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>(); | ||
|
|
||
| // Dispose the service provider | ||
| await provider.DisposeAsync(); | ||
|
|
||
| // Assert - attempting to get options after disposal should throw | ||
| Action action = () => optionsMonitor.Get(Options.DefaultName); | ||
| action.Should().Throw<ObjectDisposedException>("configuring options after disposal should throw"); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR description mentions IDisposable and a volatile disposed flag, but this implementation is IAsyncDisposable-only and the disposed field isn’t volatile (and is read without Volatile.Read). Either update the PR description or adjust the implementation to match (e.g., implement IDisposable and use volatile/Volatile.Read for the disposed check).