Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5f58004
Harden gRPC worker and client against silent disconnects
Copilot Apr 21, 2026
71bca4c
Address CodeQL feedback: drop ReferenceEquals on struct, narrow gener…
Copilot Apr 21, 2026
6533658
Sort new log event additions by EventId in Worker and Client Logs.cs
Copilot Apr 21, 2026
665ba0e
Address Copilot review: clamp backoff cap, cover SilentDisconnectTime…
Copilot Apr 21, 2026
4d1fbbe
Reconnect after graceful stream-end (GOAWAY) and thread-safe state swap
Copilot Apr 21, 2026
e0fc2e8
Fix critical bugs surfaced by code review
Apr 21, 2026
5dfc5b1
Address remaining audit nits
Apr 21, 2026
f71d510
Add M1: regression test for silent-disconnect classification
Apr 21, 2026
cec5f4a
Materialize new channel before swapping cache entry
Apr 21, 2026
ab763db
Address Copilot review feedback on cec5f4a
Apr 21, 2026
955ea63
Address Copilot review feedback on ab763db
Apr 21, 2026
5186ae1
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 22, 2026
7df10dc
Address PR review follow-up on gRPC resilience
Apr 22, 2026
add1a39
Address PR feedback for gRPC recreation
Apr 22, 2026
4bc4a08
Stabilize WorkItemStreamConsumer timing test
Apr 22, 2026
aba04b7
Clamp worker hello deadline to UTC max
Apr 23, 2026
5594c7b
Simplify worker recreate flow
Apr 23, 2026
a754b55
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 23, 2026
c0dbb3d
Fix worker recreate ownership
Apr 23, 2026
c08076e
Simplify worker channel cache
Apr 23, 2026
fd9381d
Fix worker recreate disposal timing
Apr 23, 2026
dc243e1
Fix continue-as-new event carryover
Apr 23, 2026
18e23a3
Add worker disconnect coverage tests
Apr 23, 2026
dd9ad77
Fix fatal deferred-dispose filters
Apr 23, 2026
4d30b9b
Fix client recreate dispose race
Apr 23, 2026
caf307b
Ignore local git worktrees
Apr 23, 2026
c85234d
Move wrapper changes to separate PR
Apr 23, 2026
f246e90
Keep worktree ignore local
Apr 23, 2026
d291efd
Address latest PR feedback on reconnect cleanup
Apr 23, 2026
729654a
Merge branch 'main' into fix/grpc-resilience-channel-recreate
berndverst Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Azure.Core;
using Grpc.Net.Client;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.DurableTask.Client.Grpc.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -157,6 +158,83 @@ public void Configure(string? name, GrpcDurableTaskClientOptions options)
options.Channel = this.channels.GetOrAdd(
cacheKey,
_ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
options.SetChannelRecreator((oldChannel, ct) => this.RecreateChannelAsync(cacheKey, source, oldChannel, ct));
}

/// <summary>
/// Replaces the cached channel for <paramref name="cacheKey"/> with a freshly created one and schedules
/// disposal of the previous channel after a grace period so in-flight RPCs from peer clients can drain.
/// Returns the currently cached channel instead if a peer client has already replaced it.
/// </summary>
/// <remarks>
/// The replacement channel is created <em>before</em> the cache entry is swapped. If channel creation
/// throws, the cache keeps its previous entry and the old channel is not scheduled for disposal, so a
/// failed recreate attempt cannot leave a permanently faulted <see cref="Lazy{T}"/> in the cache.
/// </remarks>
/// <param name="cacheKey">Key of the cache entry to replace.</param>
/// <param name="source">Options used to create the replacement channel.</param>
/// <param name="oldChannel">The channel the caller observed as unhealthy.</param>
/// <param name="cancellation">Checked once on entry; a cancelled token aborts the attempt.</param>
/// <returns>The channel the caller should use from now on.</returns>
/// <exception cref="OperationCanceledException"><paramref name="cancellation"/> was already cancelled.</exception>
/// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
/// <exception cref="InvalidOperationException">No cache entry could be obtained for the key.</exception>
async Task<GrpcChannel> RecreateChannelAsync(
    string cacheKey,
    DurableTaskSchedulerClientOptions source,
    GrpcChannel oldChannel,
    CancellationToken cancellation)
{
    cancellation.ThrowIfCancellationRequested();

    if (this.disposed == 1)
    {
        throw new ObjectDisposedException(nameof(ConfigureGrpcChannel));
    }

    if (!this.channels.TryGetValue(cacheKey, out Lazy<GrpcChannel>? currentLazy))
    {
        // No entry yet: materialize the channel first so a creation failure never reaches the cache.
        GrpcChannel fresh = source.CreateChannel();
        Lazy<GrpcChannel> created = new(() => fresh);
        _ = created.Value;
        if (this.channels.TryAdd(cacheKey, created))
        {
            return fresh;
        }

        // A peer added an entry first; our channel was never handed out, so release it.
        _ = ScheduleDeferredDisposeAsync(fresh);
        this.channels.TryGetValue(cacheKey, out currentLazy);
    }

    if (currentLazy is null)
    {
        throw new InvalidOperationException("Failed to obtain a cached gRPC channel after recreation attempt.");
    }

    if (currentLazy.IsValueCreated && !ReferenceEquals(currentLazy.Value, oldChannel))
    {
        // A peer client already swapped in a new channel; reuse it.
        return currentLazy.Value;
    }

    // Create the replacement before touching the cache. If this throws, the existing entry stays in
    // place and the old channel is left alone.
    GrpcChannel newChannel = source.CreateChannel();
    Lazy<GrpcChannel> newLazy = new(() => newChannel);
    _ = newLazy.Value;

    if (!this.channels.TryUpdate(cacheKey, newLazy, currentLazy))
    {
        // Lost the swap race. Prefer whatever the winner cached and release our unused channel.
        if (this.channels.TryGetValue(cacheKey, out Lazy<GrpcChannel>? winner) && winner is not null)
        {
            _ = ScheduleDeferredDisposeAsync(newChannel);
            return winner.Value;
        }

        // The entry vanished entirely; fall back to the channel we just created.
        return newChannel;
    }

    if (currentLazy.IsValueCreated)
    {
        // Only a materialized channel needs disposal; an unevaluated Lazy never opened one.
        _ = ScheduleDeferredDisposeAsync(currentLazy.Value);
    }

    await Task.Yield();
    return newChannel;
}

/// <summary>
/// Waits a fixed 30-second grace period and then disposes <paramref name="channel"/>.
/// Callers discard the returned task, so every non-fatal failure is handled here: cancellation and
/// already-disposed errors are ignored, anything else is written to <see cref="Trace"/>.
/// </summary>
/// <param name="channel">The channel to dispose once the grace period has elapsed.</param>
/// <returns>A task that completes when disposal has finished or failed.</returns>
static async Task ScheduleDeferredDisposeAsync(GrpcChannel channel)
{
    TimeSpan gracePeriod = TimeSpan.FromSeconds(30);

    try
    {
        await Task.Delay(gracePeriod).ConfigureAwait(false);
        await DisposeChannelAsync(channel).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Expected during shutdown; nothing to report.
    }
    catch (ObjectDisposedException)
    {
        // The channel was already disposed elsewhere; nothing to report.
    }
    catch (Exception ex) when (!(ex is OutOfMemoryException
        || ex is StackOverflowException
        || ex is ThreadAbortException))
    {
        Trace.TraceError(
            "Unexpected exception while deferred-disposing gRPC channel in DurableTaskSchedulerClientExtensions.ScheduleDeferredDisposeAsync: {0}",
            ex);
    }
}

/// <inheritdoc/>
Expand Down
270 changes: 270 additions & 0 deletions src/Client/Grpc/ChannelRecreatingCallInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics;
using Microsoft.Extensions.Logging;

namespace Microsoft.DurableTask.Client.Grpc;

/// <summary>
/// A <see cref="CallInvoker"/> wrapper that observes RPC outcomes and triggers a fire-and-forget channel
/// recreation after a configurable number of consecutive transport failures
/// (<see cref="StatusCode.Unavailable"/>, or <see cref="StatusCode.DeadlineExceeded"/> on RPCs that are
/// not long-poll waits). This guards against half-open HTTP/2 connections that can otherwise wedge
/// an entire client process for the lifetime of the gRPC channel.
/// </summary>
/// <remarks>
/// <para>The wrapper holds an immutable <see cref="TransportState"/> (channel + invoker pair) and swaps
/// the entire pair atomically on recreate to avoid torn state. Streaming RPCs are forwarded without
/// outcome observation; only unary RPC outcomes count toward the failure threshold.</para>
/// <para>The triggering RPC still surfaces its original failure to the caller; subsequent RPCs benefit
/// from the recreated channel.</para>
/// </remarks>
sealed class ChannelRecreatingCallInvoker : CallInvoker, IAsyncDisposable
{
/// <summary>
/// Full method names for which <see cref="StatusCode.DeadlineExceeded"/> is an expected outcome
/// (long-poll-style waits). Failures with that status on these methods are NOT counted toward the
/// recreate threshold. Lookup is ordinal and therefore case-sensitive.
/// </summary>
static readonly HashSet<string> DeadlineExceededAllowedMethods = new(
    new[]
    {
        "/TaskHubSidecarService/WaitForInstanceCompletion",
        "/TaskHubSidecarService/WaitForInstanceStart",
    },
    StringComparer.Ordinal);

// Callback invoked with the current channel and a cancellation token; returns the channel to use
// from now on (which may be the same instance, in which case the state is left untouched).
readonly Func<GrpcChannel, CancellationToken, Task<GrpcChannel>> recreator;

// Number of consecutive counted failures at which a recreate is considered. Values <= 0 disable it.
readonly int failureThreshold;

// Minimum time that must elapse since the last recreate attempt before another one is started.
readonly TimeSpan minRecreateInterval;

// When false, DisposeAsync is a no-op and the channel is left for its owner to dispose.
readonly bool ownsChannel;

// Receives the recreate lifecycle events (starting, succeeded, failed).
readonly ILogger logger;

// Current channel + invoker pair; replaced as a whole when a recreate yields a different channel.
TransportState state;

// Running count of counted failures; reset to 0 on any successful unary RPC and after a recreate.
int consecutiveFailures;

// 0/1 single-flight flag, taken via Interlocked.CompareExchange before a recreate task is started.
int recreateInFlight;

// Stopwatch timestamp of the most recent recreate attempt (successful or not); seeded in the constructor.
long lastRecreateTicks;
Comment thread
berndverst marked this conversation as resolved.
Outdated

/// <summary>
/// Initializes a new instance of the <see cref="ChannelRecreatingCallInvoker"/> class wrapping
/// <paramref name="initialChannel"/>.
/// </summary>
/// <param name="initialChannel">The channel used until the first successful recreate.</param>
/// <param name="recreator">Callback that produces a replacement for a given channel.</param>
/// <param name="failureThreshold">Consecutive counted failures that trigger a recreate; values &lt;= 0 disable it.</param>
/// <param name="minRecreateInterval">Minimum time between recreate attempts.</param>
/// <param name="ownsChannel">Whether <see cref="DisposeAsync"/> shuts down the current channel.</param>
/// <param name="logger">Logger for recreate lifecycle events.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="initialChannel"/>, <paramref name="recreator"/> or <paramref name="logger"/> is null.
/// </exception>
public ChannelRecreatingCallInvoker(
    GrpcChannel initialChannel,
    Func<GrpcChannel, CancellationToken, Task<GrpcChannel>> recreator,
    int failureThreshold,
    TimeSpan minRecreateInterval,
    bool ownsChannel,
    ILogger logger)
{
    // Fail fast: a null recreator or logger would otherwise only surface later, on a background
    // recreate task where the failure is easy to miss.
    if (initialChannel is null)
    {
        throw new ArgumentNullException(nameof(initialChannel));
    }

    this.recreator = recreator ?? throw new ArgumentNullException(nameof(recreator));
    this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
    this.failureThreshold = failureThreshold;
    this.minRecreateInterval = minRecreateInterval;
    this.ownsChannel = ownsChannel;
    this.state = new TransportState(initialChannel, initialChannel.CreateCallInvoker());

    // Seed lastRecreateTicks so cooldown does not block the very first recreate attempt.
    this.lastRecreateTicks = Stopwatch.GetTimestamp() - StopwatchTicksFor(minRecreateInterval);
}

/// <summary>
/// Forwards a blocking unary call to the current invoker and records its outcome: a normal return
/// counts as a success, an <see cref="RpcException"/> is classified by status and then rethrown.
/// </summary>
public override TResponse BlockingUnaryCall<TRequest, TResponse>(
    Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
{
    // Read the state once so the call runs entirely against one invoker.
    CallInvoker invoker = this.state.Invoker;
    TResponse result;
    try
    {
        result = invoker.BlockingUnaryCall(method, host, options, request);
    }
    catch (RpcException rpcError)
    {
        this.RecordFailure(rpcError.StatusCode, method.FullName);
        throw;
    }

    this.RecordSuccess();
    return result;
}

/// <summary>
/// Starts an asynchronous unary call on the current invoker and attaches an observer to its
/// response task so the eventual outcome is recorded. The call object is returned unchanged.
/// </summary>
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
    Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
{
    CallInvoker invoker = this.state.Invoker;
    AsyncUnaryCall<TResponse> pending = invoker.AsyncUnaryCall(method, host, options, request);

    this.ObserveOutcome(pending.ResponseAsync, method.FullName);
    return pending;
}

/// <summary>
/// Forwards a server-streaming call to the current invoker without observing its outcome.
/// </summary>
/// <remarks>
/// The streaming methods used by the DurableTask client are bounded snapshots (e.g.
/// StreamInstanceHistory) where errors surface as exceptions to user code, so global failure
/// counting on these would create false positives.
/// </remarks>
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
    Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
{
    CallInvoker invoker = this.state.Invoker;
    return invoker.AsyncServerStreamingCall(method, host, options, request);
}

/// <summary>
/// Forwards a client-streaming call to the current invoker; the outcome is not observed and does
/// not affect the failure counter.
/// </summary>
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
    Method<TRequest, TResponse> method, string? host, CallOptions options)
{
    CallInvoker invoker = this.state.Invoker;
    return invoker.AsyncClientStreamingCall(method, host, options);
}

/// <summary>
/// Forwards a duplex-streaming call to the current invoker; the outcome is not observed and does
/// not affect the failure counter.
/// </summary>
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
    Method<TRequest, TResponse> method, string? host, CallOptions options)
{
    CallInvoker invoker = this.state.Invoker;
    return invoker.AsyncDuplexStreamingCall(method, host, options);
}

/// <summary>
/// Shuts down and disposes the current channel when this invoker owns it; otherwise does nothing.
/// </summary>
/// <remarks>
/// Shutdown is best-effort: <see cref="OperationCanceledException"/> and
/// <see cref="ObjectDisposedException"/> from the shutdown call are swallowed. The channel is
/// disposed on every target framework and regardless of whether shutdown succeeded.
/// </remarks>
/// <returns>A task that completes when the channel has been shut down and disposed.</returns>
public async ValueTask DisposeAsync()
{
    if (!this.ownsChannel)
    {
        return;
    }

    GrpcChannel channel = this.state.Channel;
    try
    {
        await channel.ShutdownAsync().ConfigureAwait(false);
    }
    catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException)
    {
        // Best-effort disposal.
    }
    finally
    {
        // Always release the channel, even if shutdown failed.
        channel.Dispose();
    }
}

/// <summary>
/// Converts a <see cref="TimeSpan"/> into a number of <see cref="Stopwatch"/> ticks, saturating to
/// the range [0, <see cref="long.MaxValue"/>].
/// </summary>
/// <remarks>
/// Very large intervals (for example <see cref="TimeSpan.MaxValue"/>) multiplied by
/// <see cref="Stopwatch.Frequency"/> exceed the range of <see cref="long"/>; converting such a
/// double directly is not a well-defined operation, so the result is clamped instead. Negative
/// intervals are treated as zero (no cooldown).
/// </remarks>
/// <param name="ts">The interval to convert.</param>
/// <returns>The interval expressed in <see cref="Stopwatch"/> ticks.</returns>
static long StopwatchTicksFor(TimeSpan ts)
{
    double ticks = ts.TotalSeconds * Stopwatch.Frequency;
    if (ticks <= 0)
    {
        return 0;
    }

    // (double)long.MaxValue rounds up to 2^63, so ">=" catches every value the cast cannot represent.
    return ticks >= long.MaxValue ? long.MaxValue : (long)ticks;
}

/// <summary>
/// Attaches a continuation to <paramref name="responseAsync"/> that records the RPC outcome:
/// successful completion resets the failure counter, a fault whose inner exception is an
/// <see cref="RpcException"/> is classified by status. Cancellation and other faults are ignored.
/// </summary>
/// <param name="responseAsync">The response task of the unary call being observed.</param>
/// <param name="methodFullName">Full gRPC method name, used when classifying failures.</param>
void ObserveOutcome<TResponse>(Task<TResponse> responseAsync, string methodFullName)
{
    // Use ContinueWith with TaskScheduler.Default so we don't capture sync context. The invoker and
    // method name travel through the state argument so the lambda captures nothing.
    _ = responseAsync.ContinueWith(
        static (completed, boxed) =>
        {
            var (owner, method) = ((ChannelRecreatingCallInvoker, string))boxed!;

            if (completed.Status != TaskStatus.RanToCompletion)
            {
                if (completed.Exception?.InnerException is RpcException rpcError)
                {
                    owner.RecordFailure(rpcError.StatusCode, method);
                }

                return;
            }

            owner.RecordSuccess();
        },
        (this, methodFullName),
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}

/// <summary>
/// Resets the consecutive-failure counter after a successful unary RPC.
/// </summary>
void RecordSuccess() => Volatile.Write(ref this.consecutiveFailures, 0);

/// <summary>
/// Classifies a failed RPC and, when it counts as a transport failure, bumps the consecutive-failure
/// counter and asks for a channel recreate once the threshold is reached.
/// </summary>
/// <remarks>
/// <see cref="StatusCode.Unavailable"/> always counts. <see cref="StatusCode.DeadlineExceeded"/>
/// counts unless the method is listed in <see cref="DeadlineExceededAllowedMethods"/>. Every other
/// status is ignored and leaves the counter untouched.
/// </remarks>
/// <param name="status">Status code of the failed RPC.</param>
/// <param name="methodFullName">Full gRPC method name of the failed RPC.</param>
void RecordFailure(StatusCode status, string methodFullName)
{
    bool isTransportFailure;
    if (status == StatusCode.Unavailable)
    {
        isTransportFailure = true;
    }
    else if (status == StatusCode.DeadlineExceeded)
    {
        isTransportFailure = !DeadlineExceededAllowedMethods.Contains(methodFullName);
    }
    else
    {
        isTransportFailure = false;
    }

    if (isTransportFailure)
    {
        int streak = Interlocked.Increment(ref this.consecutiveFailures);

        // A non-positive threshold disables recreation entirely.
        if (this.failureThreshold > 0 && streak >= this.failureThreshold)
        {
            this.MaybeTriggerRecreate(streak);
        }
    }
}

/// <summary>
/// Starts a background channel recreate unless the cooldown has not yet elapsed or another recreate
/// is already in flight.
/// </summary>
/// <remarks>
/// The single-flight flag taken here is normally released by <see cref="RecreateAsync"/>. If the
/// hand-off to the background task fails (for example the logger throws), the flag is released
/// here so that later failures can still trigger a recreate.
/// </remarks>
/// <param name="observedCount">The consecutive-failure count that triggered this attempt.</param>
void MaybeTriggerRecreate(int observedCount)
{
    long cooldownTicks = StopwatchTicksFor(this.minRecreateInterval);
    long elapsedTicks = Stopwatch.GetTimestamp() - Volatile.Read(ref this.lastRecreateTicks);
    if (elapsedTicks < cooldownTicks)
    {
        return;
    }

    // Single-flight guard: only one recreate task in flight at a time.
    if (Interlocked.CompareExchange(ref this.recreateInFlight, 1, 0) != 0)
    {
        return;
    }

    bool handedOff = false;
    try
    {
        // Re-check elapsed under the guard: a recreate may have completed between the first check
        // and winning the CAS, in which case the cooldown has just restarted.
        elapsedTicks = Stopwatch.GetTimestamp() - Volatile.Read(ref this.lastRecreateTicks);
        if (elapsedTicks < cooldownTicks)
        {
            return;
        }

        this.logger.RecreatingChannel(observedCount);
        _ = Task.Run(() => this.RecreateAsync(observedCount));
        handedOff = true;
    }
    finally
    {
        // Once handed off, RecreateAsync owns the flag and clears it in its own finally block.
        if (!handedOff)
        {
            Interlocked.Exchange(ref this.recreateInFlight, 0);
        }
    }
}

/// <summary>
/// Runs one recreate attempt: asks the recreator for a replacement channel (bounded by a 30-second
/// timeout token), swaps the channel/invoker pair if a different channel came back, and records the
/// attempt time. Always clears the single-flight flag when done.
/// </summary>
/// <param name="observedCount">The consecutive-failure count that triggered this attempt.</param>
/// <returns>A task that completes when the attempt has finished, successfully or not.</returns>
async Task RecreateAsync(int observedCount)
{
    try
    {
        TransportState before = this.state;
        using CancellationTokenSource timeout = new(TimeSpan.FromSeconds(30));
        GrpcChannel replacement = await this.recreator(before.Channel, timeout.Token).ConfigureAwait(false);

        bool channelChanged = !ReferenceEquals(replacement, before.Channel);
        if (channelChanged)
        {
            this.state = new TransportState(replacement, replacement.CreateCallInvoker());
            this.logger.ChannelRecreated(GetEndpointDescription(replacement));
        }

        // Successful recreate (even if a peer beat us to it) → reset the failure counter.
        Volatile.Write(ref this.consecutiveFailures, 0);
        Volatile.Write(ref this.lastRecreateTicks, Stopwatch.GetTimestamp());
    }
    catch (Exception ex) when (!(ex is OutOfMemoryException
        || ex is StackOverflowException
        || ex is ThreadAbortException))
    {
        this.logger.ChannelRecreateFailed(ex);

        // Update lastRecreateTicks even on failure so the cooldown applies to failed attempts too.
        Volatile.Write(ref this.lastRecreateTicks, Stopwatch.GetTimestamp());
    }
    finally
    {
        Interlocked.Exchange(ref this.recreateInFlight, 0);
    }
}

/// <summary>
/// Returns the channel's target for logging, or "(unknown)" when no target is available.
/// </summary>
/// <param name="channel">The channel to describe.</param>
/// <returns>A description of the channel's endpoint.</returns>
static string GetEndpointDescription(GrpcChannel channel) => channel.Target ?? "(unknown)";

/// <summary>
/// Immutable pairing of a channel with the call invoker created from it. The two values are only
/// ever replaced together, by swapping in a new instance.
/// </summary>
sealed class TransportState
{
    /// <summary>
    /// Initializes a new instance of the <see cref="TransportState"/> class.
    /// </summary>
    /// <param name="channel">The gRPC channel.</param>
    /// <param name="invoker">The call invoker to use with <paramref name="channel"/>.</param>
    public TransportState(GrpcChannel channel, CallInvoker invoker)
    {
        this.Invoker = invoker;
        this.Channel = channel;
    }

    /// <summary>Gets the call invoker RPCs are forwarded to.</summary>
    public CallInvoker Invoker { get; }

    /// <summary>Gets the channel paired with <see cref="Invoker"/>.</summary>
    public GrpcChannel Channel { get; }
}
}
Loading
Loading