Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
3264056
fix: improve large payload error handling in PayloadInterceptor
YunchuWang Mar 30, 2026
b9c7fbf
fix: catch permanent payload failures in standalone worker to prevent…
YunchuWang Mar 31, 2026
61238e7
fix: handle all permanent blob storage failures and fix entity comple…
YunchuWang Mar 31, 2026
8e52f33
fix: revert entity failureDetails approach — sidecar converts it to a…
YunchuWang Mar 31, 2026
5b6433f
fix: revert error message change and remove entity catch
YunchuWang Mar 31, 2026
b84df5c
refactor: rename LargePayloadStorageOptions properties for config parity
YunchuWang Mar 31, 2026
67b64e8
fix: add OrchestrationTraceContext and fix indentation in FailedPreco…
YunchuWang Mar 31, 2026
0615e29
fix: use generic wording in error messages without config property names
YunchuWang Mar 31, 2026
b0b8141
fix: update threshold error message wording
YunchuWang Mar 31, 2026
7dcb94b
test: add unit and integration tests for large payload error handling
YunchuWang Mar 31, 2026
95f45f9
fix: catch InvalidOperationException from interceptor in activity/orc…
YunchuWang Mar 31, 2026
bdbb019
refactor: introduce PayloadStorageException, remove dead code, fix Ma…
YunchuWang Mar 31, 2026
3efc192
refactor: remove redundant completionToken parameter from CompleteOrc…
YunchuWang Mar 31, 2026
6b618e8
Merge branch 'main' into users/wangbill/fix-large-payload-stuck
YunchuWang Apr 1, 2026
19ccc90
refactor: move PayloadStorageException to AzureBlobPayloads extension
YunchuWang Apr 1, 2026
e366d75
Merge branch 'main' into users/wangbill/fix-large-payload-stuck
YunchuWang Apr 1, 2026
0b403be
fix: address PR review comments
YunchuWang Apr 1, 2026
a4b06b2
fix: use fully-qualified type name for PayloadStorageException matching
YunchuWang Apr 1, 2026
99cd979
fix: skip ValidateActionsSize when payload externalization is enabled
YunchuWang Apr 1, 2026
7aaf127
test: add Scenario 4 (13MB activity I/O) to sample and increase MaxPa…
YunchuWang Apr 1, 2026
2644b1b
test: add integration test for ValidateActionsSize bypass with LP ena…
YunchuWang Apr 1, 2026
aad7e3f
fix: only set ChunkIndex when multiple chunks needed (match AzureMana…
YunchuWang Apr 1, 2026
eaeb306
chore: remove accidentally committed Azurite data files
YunchuWang Apr 1, 2026
29f3e68
test: add comprehensive 13MB scenarios (5-9) to sample
YunchuWang Apr 1, 2026
7c941a5
chore: remove Azurite data files and add to gitignore
YunchuWang Apr 1, 2026
69f682c
fix: remove redundant completion message from scenario output
YunchuWang Apr 1, 2026
ad64ad0
docs: add comprehensive scenario documentation header to LargePayload…
YunchuWang Apr 1, 2026
fa4639a
fix: resolve StyleCop SA1507 (double blank line) and SA1623 (doc summ…
YunchuWang Apr 1, 2026
eaf5630
style: remove pervasive double/triple blank lines in LargePayloadCons…
YunchuWang Apr 1, 2026
4213b0d
refactor: move permanent failure handling into interceptor per cgillu…
YunchuWang Apr 1, 2026
0901c9e
Update test/Grpc.IntegrationTests/LargePayloadTests.cs
YunchuWang Apr 1, 2026
5f1321b
fix: expand permanent failure detection to include 4xx RequestFailedE…
YunchuWang Apr 1, 2026
15d9f12
Update src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayload…
YunchuWang Apr 2, 2026
7d97431
fix: update test comments to match interceptor-based failure handling
YunchuWang Apr 2, 2026
d242c26
Update test/Grpc.IntegrationTests/LargePayloadTests.cs
YunchuWang Apr 2, 2026
134da64
fix: exclude 408/429 from permanent failure detection, fix ThreeLarge…
YunchuWang Apr 2, 2026
f04c61b
fix: use ex.StackTrace instead of ex.ToString() in FailureDetails to …
YunchuWang Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ Status GetStatus()

if (startCallTask.IsFaulted)
{
// Distinguish permanent failures from transient ones so callers
// can avoid retrying errors that will never succeed.
Exception? inner = startCallTask.Exception?.InnerException ?? startCallTask.Exception;
if (IsPermanentFailure(inner))
{
return new Status(StatusCode.FailedPrecondition, inner?.Message ?? "Permanent failure");
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
}

return new Status(StatusCode.Internal, startCallTask.Exception?.Message ?? "Unknown error");
}

Expand All @@ -80,6 +88,28 @@ Status GetStatus()
return new Status(StatusCode.Unknown, string.Empty);
}

static bool IsPermanentFailure(Exception? ex)
{
// InvalidOperationException: payload too large, blob not found, decompression error, etc.
if (ex is InvalidOperationException or ArgumentException)
{
return true;
}

// Azure Storage errors that are permanent (auth/permission failures).
// Transient errors (429, 500, 503) are already retried by the Azure SDK
// client with exponential backoff before reaching here, so if they still
// fail, they may also be effectively permanent — but we keep them as
// Internal/retryable to give the upper layers a chance.
if (ex is Azure.RequestFailedException rfe &&
rfe.Status is 401 or 403 or 404)
{
return true;
}

return false;
}

Comment thread
YunchuWang marked this conversation as resolved.
Outdated
Metadata GetTrailers()
{
return startCallTask.Status == TaskStatus.RanToCompletion ? startCallTask.Result.GetTrailers() : [];
Comment thread
YunchuWang marked this conversation as resolved.
Expand Down Expand Up @@ -183,8 +213,11 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
if (size > this.options.MaxExternalizedPayloadBytes)
{
throw new InvalidOperationException(
$"Payload size {size / 1024} kb exceeds the configured maximum of {this.options.MaxExternalizedPayloadBytes / 1024} kb. " +
"Consider reducing the payload or increase MaxExternalizedPayloadBytes setting.");
$"Payload size {size / 1024} kb exceeds the configured maximum of " +
$"{this.options.MaxExternalizedPayloadBytes / 1024} kb. Consider reducing the payload size, " +
$"or increase the max payload size limit " +
$"(LargePayloadStorageOptions.MaxExternalizedPayloadBytes for standalone SDK, " +
$"or 'largePayloadStorageMaxPayloadBytes' in host.json extensions.durableTask section for Azure Functions).");
}
Comment thread
YunchuWang marked this conversation as resolved.

return await this.payloadStore.UploadAsync(value!, cancellation);
Expand Down
94 changes: 90 additions & 4 deletions src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,8 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync(
await this.CompleteOrchestratorTaskWithChunkingAsync(
response,
this.worker.grpcOptions.CompleteOrchestrationWorkItemChunkSizeInBytes,
cancellationToken);
cancellationToken,
completionToken);
}

async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, CancellationToken cancellation)
Expand Down Expand Up @@ -853,7 +854,33 @@ await this.client.AbandonTaskActivityWorkItemAsync(
// Stop the trace activity here to avoid including the completion time in the latency calculation
traceActivity?.Stop();

await this.client.CompleteActivityTaskAsync(response, cancellationToken: cancellation);
try
{
await this.client.CompleteActivityTaskAsync(response, cancellationToken: cancellation);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.FailedPrecondition)
{
// Permanent failure (e.g., payload too large for externalization).
// Complete the activity with a failure instead of letting the exception propagate
// to RunBackgroundTask, which would abandon and cause an infinite re-delivery loop.
this.Logger.UnexpectedError(ex, instance.InstanceId);

P.ActivityResponse failureResponse = new()
{
InstanceId = instance.InstanceId,
TaskId = request.TaskId,
FailureDetails = new P.TaskFailureDetails
{
ErrorType = typeof(InvalidOperationException).FullName,
ErrorMessage = ex.Status.Detail,
StackTrace = ex.ToString(),
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
IsNonRetriable = true,
},
CompletionToken = completionToken,
};

await this.client.CompleteActivityTaskAsync(failureResponse, cancellationToken: cancellation);
}
}

async Task OnRunEntityBatchAsync(
Expand Down Expand Up @@ -919,7 +946,31 @@ async Task OnRunEntityBatchAsync(
completionToken,
operationInfos?.Take(batchResult.Results?.Count ?? 0));

await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation);
try
{
await this.client.CompleteEntityTaskAsync(response, cancellationToken: cancellation);
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.FailedPrecondition)
{
// Permanent failure (e.g., payload too large for externalization).
// Complete with a batch-level failure using failureDetails so the entity
// framework sees the failure instead of an infinite abandon/re-deliver loop.
this.Logger.UnexpectedError(ex, batchRequest.InstanceId ?? string.Empty);

P.EntityBatchResult failureResponse = new()
{
CompletionToken = completionToken ?? response.CompletionToken,
FailureDetails = new P.TaskFailureDetails
{
ErrorType = typeof(InvalidOperationException).FullName,
ErrorMessage = ex.Status.Detail,
StackTrace = ex.ToString(),
IsNonRetriable = true,
},
};

await this.client.CompleteEntityTaskAsync(failureResponse, cancellationToken: cancellation);
}
}

/// <summary>
Expand All @@ -931,7 +982,8 @@ async Task OnRunEntityBatchAsync(
async Task CompleteOrchestratorTaskWithChunkingAsync(
P.OrchestratorResponse response,
int maxChunkBytes,
CancellationToken cancellationToken)
CancellationToken cancellationToken,
string? completionToken = null)
{
// Validate that no single action exceeds the maximum chunk size
static P.TaskFailureDetails? ValidateActionsSize(IEnumerable<P.OrchestratorAction> actions, int maxChunkBytes)
Expand Down Expand Up @@ -1002,6 +1054,8 @@ static bool TryAddAction(
}

// Check if the entire response fits in one chunk
try
{
int totalSize = response.CalculateSize();
if (totalSize <= maxChunkBytes)
{
Expand Down Expand Up @@ -1054,6 +1108,38 @@ static bool TryAddAction(
// Send the chunk
await this.client.CompleteOrchestratorTaskAsync(chunkedResponse, cancellationToken: cancellationToken);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.FailedPrecondition)
{
// Permanent failure (e.g., payload too large for externalization).
// Complete with a failure to prevent infinite re-delivery loop.
this.Logger.UnexpectedError(ex, response.InstanceId);

P.OrchestratorResponse failureResponse = new()
{
InstanceId = response.InstanceId,
CompletionToken = response.CompletionToken ?? completionToken,
Actions =
{
new P.OrchestratorAction
{
CompleteOrchestration = new P.CompleteOrchestrationAction
{
OrchestrationStatus = P.OrchestrationStatus.Failed,
FailureDetails = new P.TaskFailureDetails
{
ErrorType = typeof(InvalidOperationException).FullName,
ErrorMessage = ex.Status.Detail,
StackTrace = ex.ToString(),
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
IsNonRetriable = true,
},
},
},
},
};

await this.client.CompleteOrchestratorTaskAsync(failureResponse, cancellationToken: cancellationToken);
}
}
}
}
Loading