Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5f58004
Harden gRPC worker and client against silent disconnects
Copilot Apr 21, 2026
71bca4c
Address CodeQL feedback: drop ReferenceEquals on struct, narrow gener…
Copilot Apr 21, 2026
6533658
Sort new log event additions by EventId in Worker and Client Logs.cs
Copilot Apr 21, 2026
665ba0e
Address Copilot review: clamp backoff cap, cover SilentDisconnectTime…
Copilot Apr 21, 2026
4d1fbbe
Reconnect after graceful stream-end (GOAWAY) and thread-safe state swap
Copilot Apr 21, 2026
e0fc2e8
Fix critical bugs surfaced by code review
Apr 21, 2026
5dfc5b1
Address remaining audit nits
Apr 21, 2026
f71d510
Add M1: regression test for silent-disconnect classification
Apr 21, 2026
cec5f4a
Materialize new channel before swapping cache entry
Apr 21, 2026
ab763db
Address Copilot review feedback on cec5f4a
Apr 21, 2026
955ea63
Address Copilot review feedback on ab763db
Apr 21, 2026
5186ae1
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 22, 2026
7df10dc
Address PR review follow-up on gRPC resilience
Apr 22, 2026
add1a39
Address PR feedback for gRPC recreation
Apr 22, 2026
4bc4a08
Stabilize WorkItemStreamConsumer timing test
Apr 22, 2026
aba04b7
Clamp worker hello deadline to UTC max
Apr 23, 2026
5594c7b
Simplify worker recreate flow
Apr 23, 2026
a754b55
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 23, 2026
c0dbb3d
Fix worker recreate ownership
Apr 23, 2026
c08076e
Simplify worker channel cache
Apr 23, 2026
fd9381d
Fix worker recreate disposal timing
Apr 23, 2026
dc243e1
Fix continue-as-new event carryover
Apr 23, 2026
18e23a3
Add worker disconnect coverage tests
Apr 23, 2026
dd9ad77
Fix fatal deferred-dispose filters
Apr 23, 2026
4d30b9b
Fix client recreate dispose race
Apr 23, 2026
caf307b
Ignore local git worktrees
Apr 23, 2026
c85234d
Move wrapper changes to separate PR
Apr 23, 2026
f246e90
Keep worktree ignore local
Apr 23, 2026
d291efd
Address latest PR feedback on reconnect cleanup
Apr 23, 2026
729654a
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.DurableTask.Client.Grpc.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -156,7 +158,92 @@ public void Configure(string? name, GrpcDurableTaskClientOptions options)
string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{retryOptionsKey}";
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
_ => new Lazy<GrpcChannel>(source.CreateChannel, LazyThreadSafetyMode.PublicationOnly)).Value;
options.SetChannelRecreator((oldChannel, ct) => this.RecreateChannelAsync(cacheKey, source, oldChannel, ct));
}

/// <summary>
/// Atomically swaps the cached channel for the given key with a freshly created one and schedules
/// graceful disposal of the old channel after a grace period so any in-flight RPCs from peer
/// clients can drain. Returns the currently cached channel if a peer client has already recreated it.
/// </summary>
Task<GrpcChannel> RecreateChannelAsync(
Comment thread
berndverst marked this conversation as resolved.
Outdated
string cacheKey,
DurableTaskSchedulerClientOptions source,
GrpcChannel oldChannel,
CancellationToken cancellation)
{
cancellation.ThrowIfCancellationRequested();

if (this.disposed == 1)
{
throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
}

if (!this.channels.TryGetValue(cacheKey, out Lazy<GrpcChannel>? currentLazy))
{
// PublicationOnly avoids permanently caching a transient CreateChannel exception.
Lazy<GrpcChannel> created = new(source.CreateChannel, LazyThreadSafetyMode.PublicationOnly);
if (this.channels.TryAdd(cacheKey, created))
{
return Task.FromResult(created.Value);
}

this.channels.TryGetValue(cacheKey, out currentLazy);
}

if (currentLazy is null)
{
throw new InvalidOperationException("Failed to obtain a cached gRPC channel after recreation attempt.");
}

if (currentLazy.IsValueCreated && !ReferenceEquals(currentLazy.Value, oldChannel))
{
// A peer client already swapped in a new channel; reuse it.
return Task.FromResult(currentLazy.Value);
}

// Materialize the new channel BEFORE swapping the dictionary so a CreateChannel failure
// leaves the existing entry intact. If we swapped a not-yet-materialized Lazy and then
// CreateChannel threw, the dictionary would point to a permanently-failing Lazy and the
// old channel would have already been queued for disposal — an unrecoverable state.
GrpcChannel newChannel = source.CreateChannel();
Lazy<GrpcChannel> newLazy = new(newChannel);
Comment thread
berndverst marked this conversation as resolved.
if (!this.channels.TryUpdate(cacheKey, newLazy, currentLazy))
{
// Lost the race; whoever won has the freshest entry. Dispose the channel we just
// created so it doesn't leak.
this.channels.TryGetValue(cacheKey, out Lazy<GrpcChannel>? winner);
_ = ScheduleDeferredDisposeAsync(newChannel);
return Task.FromResult(winner?.Value ?? newChannel);
}

if (currentLazy.IsValueCreated)
{
_ = ScheduleDeferredDisposeAsync(currentLazy.Value);
}

return Task.FromResult(newChannel);
}

static async Task ScheduleDeferredDisposeAsync(GrpcChannel channel)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
await DisposeChannelAsync(channel).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not OutOfMemoryException
and not StackOverflowException
and not ThreadAbortException)
{
Comment thread
berndverst marked this conversation as resolved.
if (ex is not OperationCanceledException and not ObjectDisposedException)
{
Trace.TraceError(
"Unexpected exception while deferred-disposing gRPC channel in DurableTaskSchedulerClientExtensions.ScheduleDeferredDisposeAsync: {0}",
ex);
}
}
}

/// <inheritdoc/>
Expand Down
Loading
Loading