Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
da451a8
added the retry logic
Apr 20, 2026
1fc3df5
Merge branch 'main' into stevosyan/add-retry-to-complete-calls
Apr 27, 2026
4fc5200
Update src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
sophiatev Apr 27, 2026
61f8a21
Add tests for ExecuteWithRetryAsync retry logic
Copilot Apr 27, 2026
32cc282
removed redundant logs
Apr 27, 2026
d640ac6
fixed line endings
Apr 27, 2026
b18d970
Apply suggestion from @Copilot
sophiatev Apr 27, 2026
5a38c5f
returned the completion logs
Apr 27, 2026
9a57bf8
Merge branch 'stevosyan/add-retry-to-complete-calls' of https://githu…
Apr 27, 2026
614893e
Potential fix for pull request finding 'Local scope variable shadows …
sophiatev Apr 27, 2026
6826505
Update src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
sophiatev Apr 27, 2026
28b2839
fixed the line endings
Apr 27, 2026
55535e3
Add max-attempts exhaustion test for ExecuteWithRetryAsync
Copilot Apr 27, 2026
ba6c0c0
Assert status code in TransientGrpcRetry log test
Copilot Apr 27, 2026
e609227
simplied method extraction
Apr 27, 2026
9625812
Add status code assertion to MultipleTransientErrors log test
Copilot Apr 27, 2026
8ec4113
refactored so the retry also uses the shared backoff class
Apr 27, 2026
5d1b8e6
Merge branch 'stevosyan/add-retry-to-complete-calls' of https://githu…
Apr 27, 2026
08922fb
Trying to fix line endings
Apr 27, 2026
aa19300
reverting some unnecessary changes
Apr 27, 2026
3df94d7
missed the log changes
Apr 27, 2026
155b5ed
Potential fix for pull request finding 'Missed ternary opportunity'
sophiatev Apr 27, 2026
05a0958
Update src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs
sophiatev Apr 27, 2026
fc5c1ff
Fix thread-safety: create Random per ExecuteWithRetryAsync call, remo…
Copilot Apr 27, 2026
89957bb
fixed attempt logic
Apr 27, 2026
35051fa
fixing line endings
Apr 27, 2026
6d11687
reverting some more unnecessary changes
Apr 27, 2026
de9c357
fixing the failing tests
Apr 27, 2026
2eda8a9
fixed the log tests
Apr 27, 2026
d16cdc3
fixing the failing tests
Apr 27, 2026
1379dd4
test: add integration-level retry tests to RunBackgroundTaskLoggingTe…
Copilot Apr 27, 2026
cf46ceb
fixed the failing test
Apr 27, 2026
ed926d0
fixed another failing max attempt test
Apr 27, 2026
33700f2
test: add Non_Transient_Abandon_Orchestrator_Error_Is_Not_Retried int…
Copilot Apr 27, 2026
b3e93e3
updated the tests slightly
Apr 27, 2026
1142c65
removed try-catch, updated tests
Apr 27, 2026
c3c9571
remove json change
Apr 27, 2026
f9052dd
fix line endings, add a few more logs
Apr 28, 2026
81801a3
change order of a log and reconnect attempt
Apr 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 152 additions & 61 deletions src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -474,14 +474,17 @@ void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler, CancellationTok
{
try
{
this.Logger.AbandoningOrchestratorWorkItem(instanceId, workItem?.CompletionToken ?? string.Empty);
await this.client.AbandonTaskOrchestratorWorkItemAsync(
new P.AbandonOrchestrationTaskRequest
{
CompletionToken = workItem?.CompletionToken,
},
cancellationToken: cancellation);
this.Logger.AbandonedOrchestratorWorkItem(instanceId, workItem?.CompletionToken ?? string.Empty);
this.Logger.AbandoningOrchestratorWorkItem(instanceId, workItem.CompletionToken ?? string.Empty);
await this.ExecuteWithRetryAsync(
Comment thread
sophiatev marked this conversation as resolved.
Outdated
async () => await this.client.AbandonTaskOrchestratorWorkItemAsync(
new P.AbandonOrchestrationTaskRequest
{
CompletionToken = workItem.CompletionToken,
},
cancellationToken: cancellation),
nameof(this.client.AbandonTaskOrchestratorWorkItemAsync),
cancellation);
this.Logger.AbandonedOrchestratorWorkItem(instanceId, workItem.CompletionToken ?? string.Empty);
}
catch (Exception abandonException)
{
Expand All @@ -496,18 +499,21 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync(
instanceId,
workItem.ActivityRequest.Name,
workItem.ActivityRequest.TaskId,
workItem?.CompletionToken ?? string.Empty);
await this.client.AbandonTaskActivityWorkItemAsync(
new P.AbandonActivityTaskRequest
{
CompletionToken = workItem?.CompletionToken,
},
cancellationToken: cancellation);
this.Logger.AbandonedActivityWorkItem(
instanceId,
workItem.ActivityRequest.Name,
workItem.ActivityRequest.TaskId,
workItem?.CompletionToken ?? string.Empty);
workItem.CompletionToken ?? string.Empty);
await this.ExecuteWithRetryAsync(
async () => await this.client.AbandonTaskActivityWorkItemAsync(
new P.AbandonActivityTaskRequest
{
CompletionToken = workItem.CompletionToken,
},
cancellationToken: cancellation),
nameof(this.client.AbandonTaskActivityWorkItemAsync),
cancellation);
this.Logger.AbandonedActivityWorkItem(
instanceId,
workItem.ActivityRequest.Name,
workItem.ActivityRequest.TaskId,
workItem.CompletionToken ?? string.Empty);
}
catch (Exception abandonException)
{
Expand All @@ -520,16 +526,19 @@ await this.client.AbandonTaskActivityWorkItemAsync(
{
this.Logger.AbandoningEntityWorkItem(
workItem.EntityRequest.InstanceId,
workItem?.CompletionToken ?? string.Empty);
await this.client.AbandonTaskEntityWorkItemAsync(
new P.AbandonEntityTaskRequest
{
CompletionToken = workItem?.CompletionToken,
},
cancellationToken: cancellation);
this.Logger.AbandonedEntityWorkItem(
workItem.CompletionToken ?? string.Empty);
await this.ExecuteWithRetryAsync(
async () => await this.client.AbandonTaskEntityWorkItemAsync(
new P.AbandonEntityTaskRequest
{
CompletionToken = workItem?.CompletionToken,
Comment thread
sophiatev marked this conversation as resolved.
Outdated
},
cancellationToken: cancellation),
nameof(this.client.AbandonTaskEntityWorkItemAsync),
cancellation);
this.Logger.AbandonedEntityWorkItem(
workItem.EntityRequest.InstanceId,
workItem?.CompletionToken ?? string.Empty);
workItem.CompletionToken ?? string.Empty);
}
catch (Exception abandonException)
{
Expand All @@ -542,16 +551,19 @@ await this.client.AbandonTaskEntityWorkItemAsync(
{
this.Logger.AbandoningEntityWorkItem(
workItem.EntityRequestV2.InstanceId,
workItem?.CompletionToken ?? string.Empty);
await this.client.AbandonTaskEntityWorkItemAsync(
new P.AbandonEntityTaskRequest
{
CompletionToken = workItem?.CompletionToken,
},
cancellationToken: cancellation);
this.Logger.AbandonedEntityWorkItem(
workItem.CompletionToken ?? string.Empty);
await this.ExecuteWithRetryAsync(
async () => await this.client.AbandonTaskEntityWorkItemAsync(
new P.AbandonEntityTaskRequest
{
CompletionToken = workItem.CompletionToken,
},
cancellationToken: cancellation),
nameof(this.client.AbandonTaskEntityWorkItemAsync),
cancellation);
this.Logger.AbandonedEntityWorkItem(
workItem.EntityRequestV2.InstanceId,
workItem?.CompletionToken ?? string.Empty);
workItem.CompletionToken ?? string.Empty);
}
catch (Exception abandonException)
{
Expand Down Expand Up @@ -703,12 +715,15 @@ async Task OnRunOrchestratorAsync(
if (!filterPassed)
{
this.Logger.AbandoningOrchestrationDueToOrchestrationFilter(request.InstanceId, completionToken);
await this.client.AbandonTaskOrchestratorWorkItemAsync(
new P.AbandonOrchestrationTaskRequest
{
CompletionToken = completionToken,
},
cancellationToken: cancellationToken);
await this.ExecuteWithRetryAsync(
async () => await this.client.AbandonTaskOrchestratorWorkItemAsync(
new P.AbandonOrchestrationTaskRequest
{
CompletionToken = completionToken,
},
cancellationToken: cancellationToken),
nameof(this.client.AbandonTaskOrchestratorWorkItemAsync),
cancellationToken);

return;
}
Expand Down Expand Up @@ -804,12 +819,15 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync(
else
{
this.Logger.AbandoningOrchestrationDueToVersioning(request.InstanceId, completionToken);
await this.client.AbandonTaskOrchestratorWorkItemAsync(
new P.AbandonOrchestrationTaskRequest
{
CompletionToken = completionToken,
},
cancellationToken: cancellationToken);
await this.ExecuteWithRetryAsync(
async () => await this.client.AbandonTaskOrchestratorWorkItemAsync(
new P.AbandonOrchestrationTaskRequest
{
CompletionToken = completionToken,
},
cancellationToken: cancellationToken),
nameof(this.client.AbandonTaskOrchestratorWorkItemAsync),
cancellationToken);

return;
}
Expand Down Expand Up @@ -915,12 +933,15 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken,
if (this.worker.workerOptions.Versioning?.FailureStrategy == DurableTaskWorkerOptions.VersionFailureStrategy.Reject)
{
this.Logger.AbandoningActivityWorkItem(instance.InstanceId, request.Name, request.TaskId, completionToken);
await this.client.AbandonTaskActivityWorkItemAsync(
new P.AbandonActivityTaskRequest
{
CompletionToken = completionToken,
},
cancellationToken: cancellation);
await this.ExecuteWithRetryAsync(
async () => await this.client.AbandonTaskActivityWorkItemAsync(
new P.AbandonActivityTaskRequest
{
CompletionToken = completionToken,
},
cancellationToken: cancellation),
nameof(this.client.AbandonTaskActivityWorkItemAsync),
cancellation);
}

return;
Expand Down Expand Up @@ -954,7 +975,10 @@ await this.client.AbandonTaskActivityWorkItemAsync(
// Stop the trace activity here to avoid including the completion time in the latency calculation
traceActivity?.Stop();

await this.client.CompleteActivityTaskAsync(response, cancellationToken: cancellation);
await this.ExecuteWithRetryAsync(
async () => await this.client.CompleteActivityTaskAsync(response, cancellationToken: cancellation),
nameof(this.client.CompleteActivityTaskAsync),
cancellation);
}

async Task OnRunEntityBatchAsync(
Expand Down Expand Up @@ -1020,7 +1044,10 @@ async Task OnRunEntityBatchAsync(
completionToken,
operationInfos?.Take(batchResult.Results?.Count ?? 0));

await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation);
await this.ExecuteWithRetryAsync(
async () => await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation),
nameof(this.client.CompleteEntityTaskAsync),
cancellation);
}

/// <summary>
Expand Down Expand Up @@ -1082,7 +1109,10 @@ async Task CompleteOrchestratorTaskWithChunkingAsync(
},
};

await this.client.CompleteOrchestratorTaskAsync(failureResponse, cancellationToken: cancellationToken);
await this.ExecuteWithRetryAsync(
async () => await this.client.CompleteOrchestratorTaskAsync(failureResponse, cancellationToken: cancellationToken),
nameof(this.client.CompleteOrchestratorTaskAsync),
cancellationToken);
return;
}

Expand All @@ -1109,7 +1139,10 @@ static bool TryAddAction(
if (totalSize <= maxChunkBytes)
{
// Response fits in one chunk, send it directly (isPartial defaults to false)
await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken);
await this.ExecuteWithRetryAsync(
async () => await this.client.CompleteOrchestratorTaskAsync(response, cancellationToken: cancellationToken),
nameof(this.client.CompleteOrchestratorTaskAsync),
cancellationToken);
return;
}

Expand Down Expand Up @@ -1169,7 +1202,65 @@ static bool TryAddAction(
chunkIndex++;

// Send the chunk
await this.client.CompleteOrchestratorTaskAsync(chunkedResponse, cancellationToken: cancellationToken);
await this.ExecuteWithRetryAsync(
async () => await this.client.CompleteOrchestratorTaskAsync(chunkedResponse, cancellationToken: cancellationToken),
nameof(this.client.CompleteOrchestratorTaskAsync),
cancellationToken);
}
}

async Task ExecuteWithRetryAsync(
Func<Task> action,
string operationName,
CancellationToken cancellationToken)
{
const int maxAttempts = 10;
TimeSpan delay = TimeSpan.FromMilliseconds(200);

for (int attempt = 1; ; attempt++)
{
try
{
await action();
return;
}
catch (RpcException ex) when (
(ex.StatusCode == StatusCode.Unavailable ||
ex.StatusCode == StatusCode.Unknown ||
ex.StatusCode == StatusCode.DeadlineExceeded ||
ex.StatusCode == StatusCode.Internal) &&
attempt < maxAttempts)
Comment thread
sophiatev marked this conversation as resolved.
{
// Back off with jitter for transient transport errors
#if NET6_0_OR_GREATER
int jitterMs = Random.Shared.Next(0, (int)(delay.TotalMilliseconds * 0.2));
#else
int jitterMs = new Random().Next(0, (int)(delay.TotalMilliseconds * 0.2));
#endif
Comment thread
sophiatev marked this conversation as resolved.
Outdated
Comment thread
sophiatev marked this conversation as resolved.
Outdated
TimeSpan backoff = delay + TimeSpan.FromMilliseconds(jitterMs);

this.Logger.TransientGrpcRetry(
operationName,
attempt,
maxAttempts,
backoff.TotalMilliseconds,
(int)ex.StatusCode,
ex);

try
{
await Task.Delay(backoff, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// If shutting down during the retry delay, propagate the cancellation exception
throw;
}

// Exponential increase, capping at 15 seconds
delay = TimeSpan.FromMilliseconds(Math.Min(delay.TotalMilliseconds * 2, 15000));
continue;
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/Worker/Grpc/Logs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ static partial class Logs
public static partial void ReceivedHealthPing(this ILogger logger);

[LoggerMessage(EventId = 76, Level = LogLevel.Information, Message = "Work-item stream ended by the backend (graceful close). Will reconnect.")]
public static partial void StreamEndedByPeer(this ILogger logger);
public static partial void StreamEndedByPeer(this ILogger logger);

[LoggerMessage(EventId = 77, Level = LogLevel.Warning, Message = "Transient gRPC error for '{OperationName}'. Attempt {Attempt} of {MaxAttempts}. Retrying in {BackoffMs} ms. StatusCode={StatusCode}")]
public static partial void TransientGrpcRetry(this ILogger logger, string operationName, int attempt, int maxAttempts, double backoffMs, int statusCode, Exception exception);
Comment thread
sophiatev marked this conversation as resolved.
Comment thread
sophiatev marked this conversation as resolved.
}
}
Loading
Loading