Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
3264056
fix: improve large payload error handling in PayloadInterceptor
YunchuWang Mar 30, 2026
b9c7fbf
fix: catch permanent payload failures in standalone worker to prevent…
YunchuWang Mar 31, 2026
61238e7
fix: handle all permanent blob storage failures and fix entity comple…
YunchuWang Mar 31, 2026
8e52f33
fix: revert entity failureDetails approach — sidecar converts it to a…
YunchuWang Mar 31, 2026
5b6433f
fix: revert error message change and remove entity catch
YunchuWang Mar 31, 2026
b84df5c
refactor: rename LargePayloadStorageOptions properties for config parity
YunchuWang Mar 31, 2026
67b64e8
fix: add OrchestrationTraceContext and fix indentation in FailedPreco…
YunchuWang Mar 31, 2026
0615e29
fix: use generic wording in error messages without config property names
YunchuWang Mar 31, 2026
b0b8141
fix: update threshold error message wording
YunchuWang Mar 31, 2026
7dcb94b
test: add unit and integration tests for large payload error handling
YunchuWang Mar 31, 2026
95f45f9
fix: catch InvalidOperationException from interceptor in activity/orc…
YunchuWang Mar 31, 2026
bdbb019
refactor: introduce PayloadStorageException, remove dead code, fix Ma…
YunchuWang Mar 31, 2026
3efc192
refactor: remove redundant completionToken parameter from CompleteOrc…
YunchuWang Mar 31, 2026
6b618e8
Merge branch 'main' into users/wangbill/fix-large-payload-stuck
YunchuWang Apr 1, 2026
19ccc90
refactor: move PayloadStorageException to AzureBlobPayloads extension
YunchuWang Apr 1, 2026
e366d75
Merge branch 'main' into users/wangbill/fix-large-payload-stuck
YunchuWang Apr 1, 2026
0b403be
fix: address PR review comments
YunchuWang Apr 1, 2026
a4b06b2
fix: use fully-qualified type name for PayloadStorageException matching
YunchuWang Apr 1, 2026
99cd979
fix: skip ValidateActionsSize when payload externalization is enabled
YunchuWang Apr 1, 2026
7aaf127
test: add Scenario 4 (13MB activity I/O) to sample and increase MaxPa…
YunchuWang Apr 1, 2026
2644b1b
test: add integration test for ValidateActionsSize bypass with LP ena…
YunchuWang Apr 1, 2026
aad7e3f
fix: only set ChunkIndex when multiple chunks needed (match AzureMana…
YunchuWang Apr 1, 2026
eaeb306
chore: remove accidentally committed Azurite data files
YunchuWang Apr 1, 2026
29f3e68
test: add comprehensive 13MB scenarios (5-9) to sample
YunchuWang Apr 1, 2026
7c941a5
chore: remove Azurite data files and add to gitignore
YunchuWang Apr 1, 2026
69f682c
fix: remove redundant completion message from scenario output
YunchuWang Apr 1, 2026
ad64ad0
docs: add comprehensive scenario documentation header to LargePayload…
YunchuWang Apr 1, 2026
fa4639a
fix: resolve StyleCop SA1507 (double blank line) and SA1623 (doc summ…
YunchuWang Apr 1, 2026
eaf5630
style: remove pervasive double/triple blank lines in LargePayloadCons…
YunchuWang Apr 1, 2026
4213b0d
refactor: move permanent failure handling into interceptor per cgillu…
YunchuWang Apr 1, 2026
0901c9e
Update test/Grpc.IntegrationTests/LargePayloadTests.cs
YunchuWang Apr 1, 2026
5f1321b
fix: expand permanent failure detection to include 4xx RequestFailedE…
YunchuWang Apr 1, 2026
15d9f12
Update src/Extensions/AzureBlobPayloads/Interceptors/AzureBlobPayload…
YunchuWang Apr 2, 2026
7d97431
fix: update test comments to match interceptor-based failure handling
YunchuWang Apr 2, 2026
d242c26
Update test/Grpc.IntegrationTests/LargePayloadTests.cs
YunchuWang Apr 2, 2026
134da64
fix: exclude 408/429 from permanent failure detection, fix ThreeLarge…
YunchuWang Apr 2, 2026
f04c61b
fix: use ex.StackTrace instead of ex.ToString() in FailureDetails to …
YunchuWang Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 116 additions & 8 deletions samples/LargePayloadConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
builder.Services.AddExternalizedPayloadStore(opts =>
{
// Keep threshold small to force externalization for demo purposes
opts.ExternalizeThresholdBytes = 1024; // 1KB
opts.ThresholdBytes = 1024; // 1KB

opts.MaxPayloadBytes = 50 * 1024; // 50KB cap for overflow testing
opts.ConnectionString = builder.Configuration.GetValue<string>("DURABLETASK_STORAGE") ?? "UseDevelopmentStorage=true";
opts.ContainerName = builder.Configuration.GetValue<string>("DURABLETASK_PAYLOAD_CONTAINER") ?? "payloads";
});
Expand Down Expand Up @@ -76,15 +78,15 @@
(ctx, _) => ctx.Entities.CallEntityAsync<int>(
new EntityInstanceId(nameof(EchoLengthEntity), "1"),
operationName: "EchoLength",
input: new string('E', 700 * 1024)));
input: new string('E', 40 * 1024)));
tasks.AddEntity<EchoLengthEntity>(nameof(EchoLengthEntity));

tasks.AddOrchestratorFunc<object?, int>(
"LargeEntityOperationOutput",
async (ctx, _) => (await ctx.Entities.CallEntityAsync<string>(
new EntityInstanceId(nameof(LargeResultEntity), "1"),
operationName: "Produce",
input: 850 * 1024)).Length);
input: 40 * 1024)).Length);
tasks.AddEntity<LargeResultEntity>(nameof(LargeResultEntity));

tasks.AddOrchestratorFunc<object?, object?>(
Expand All @@ -94,10 +96,36 @@
await ctx.Entities.CallEntityAsync(
new EntityInstanceId(nameof(StateEntity), "1"),
operationName: "Set",
input: new string('S', 900 * 1024));
input: new string('S', 40 * 1024));
return null;
});
tasks.AddEntity<StateEntity>(nameof(StateEntity));



// Overflow: activity output > MaxPayloadBytes

tasks.AddOrchestratorFunc<object?, string>("ActivityProducesOversized", async (ctx, _) =>

{

return await ctx.CallActivityAsync<string>("ProduceOversized");

});

tasks.AddActivityFunc<string>("ProduceOversized", (ctx) => Task.FromResult(new string('O', 60 * 1024)));



// Overflow: orchestration output > MaxPayloadBytes

tasks.AddOrchestratorFunc<object?, string>("OrchestrationProducesOversized", (ctx, _) =>

{

return Task.FromResult(new string('P', 60 * 1024));

});
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
});

// Use shared store (no duplication of options)
Expand All @@ -112,7 +140,7 @@ await ctx.Entities.CallEntityAsync(
await using DurableTaskClient client = host.Services.GetRequiredService<DurableTaskClient>();

// Option A: Directly pass an oversized input to orchestration to trigger externalization
string largeInput = new string('B', 1024 * 1024); // 1MB
string largeInput = new string('B', 40 * 1024); // 40KB
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("LargeInputEcho", largeInput);
Console.WriteLine($"Started orchestration with direct large input. Instance: {instanceId}");

Expand All @@ -136,7 +164,7 @@ await ctx.Entities.CallEntityAsync(
// Run entity samples
Console.WriteLine();
Console.WriteLine("Running LargeEntityOperationInput...");
string largeEntityInput = new string('E', 700 * 1024); // 700KB
string largeEntityInput = new string('E', 40 * 1024); // 40KB
string entityInputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationInput");
OrchestrationMetadata entityInputResult = await client.WaitForInstanceCompletionAsync(entityInputInstance, getInputsAndOutputs: true, cts.Token);
int entityInputLength = entityInputResult.ReadOutputAs<int>();
Expand All @@ -145,7 +173,7 @@ await ctx.Entities.CallEntityAsync(

Console.WriteLine();
Console.WriteLine("Running LargeEntityOperationOutput...");
int largeEntityOutputLength = 850 * 1024; // 850KB
int largeEntityOutputLength = 40 * 1024; // 40KB
string entityOutputInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityOperationOutput");
OrchestrationMetadata entityOutputResult = await client.WaitForInstanceCompletionAsync(entityOutputInstance, getInputsAndOutputs: true, cts.Token);
int entityOutputLength = entityOutputResult.ReadOutputAs<int>();
Expand All @@ -154,7 +182,7 @@ await ctx.Entities.CallEntityAsync(

Console.WriteLine();
Console.WriteLine("Running LargeEntityState and querying state...");
string largeEntityState = new string('S', 900 * 1024); // 900KB
string largeEntityState = new string('S', 40 * 1024); // 40KB
string entityStateInstance = await client.ScheduleNewOrchestrationInstanceAsync("LargeEntityState");
OrchestrationMetadata entityStateOrch = await client.WaitForInstanceCompletionAsync(entityStateInstance, getInputsAndOutputs: true, cts.Token);
Console.WriteLine($"Status: {entityStateOrch.RuntimeStatus}");
Expand All @@ -163,6 +191,86 @@ await ctx.Entities.CallEntityAsync(
Console.WriteLine($"State length: {stateLength}");
Console.WriteLine($"Deserialized state equals original: {state?.State == largeEntityState}");



// ==================== Overflow Scenarios ====================

Console.WriteLine();

Console.WriteLine("=== Overflow Scenarios (MaxPayloadBytes=50KB) ===");



// Scenario 1: Client input > cap -> PayloadStorageException

Console.WriteLine("[Scenario 1] Client oversized input");

try

{

string tooLarge = new string('Z', 60 * 1024);

await client.ScheduleNewOrchestrationInstanceAsync("LargeInputEcho", tooLarge);

Console.WriteLine("ERROR: Expected PayloadStorageException!");

}

catch (PayloadStorageException ex)

{

Console.WriteLine("PASS: " + ex.GetType().Name + ": " + ex.Message);

}

catch (Exception ex)

{

Console.WriteLine("FAIL: " + ex.GetType().Name + ": " + ex.Message);

}



// Scenario 2: Activity output > cap -> orchestration fails

Console.WriteLine("[Scenario 2] Activity oversized output");

string actOvfId = await client.ScheduleNewOrchestrationInstanceAsync("ActivityProducesOversized");

OrchestrationMetadata actOvfResult = await client.WaitForInstanceCompletionAsync(actOvfId, getInputsAndOutputs: true, cts.Token);

Console.WriteLine(" Status: " + actOvfResult.RuntimeStatus);

Console.WriteLine(actOvfResult.RuntimeStatus == OrchestrationRuntimeStatus.Failed ? "PASS" : "FAIL: expected Failed");

if (actOvfResult.FailureDetails != null) Console.WriteLine(" Error: " + actOvfResult.FailureDetails.ErrorMessage);



// Scenario 3: Orchestration output > cap -> fails

Console.WriteLine("[Scenario 3] Orchestration oversized output");

string orchOvfId = await client.ScheduleNewOrchestrationInstanceAsync("OrchestrationProducesOversized");

OrchestrationMetadata orchOvfResult = await client.WaitForInstanceCompletionAsync(orchOvfId, getInputsAndOutputs: true, cts.Token);

Console.WriteLine(" Status: " + orchOvfResult.RuntimeStatus);

Console.WriteLine(orchOvfResult.RuntimeStatus == OrchestrationRuntimeStatus.Failed ? "PASS" : "FAIL: expected Failed");

if (orchOvfResult.FailureDetails != null) Console.WriteLine(" Error: " + orchOvfResult.FailureDetails.ErrorMessage);



Console.WriteLine("=== All scenarios complete ===");



public class EchoLengthEntity : TaskEntity<int>
{
public int EchoLength(string input)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask;

/// <summary>
/// Exception thrown when a payload storage operation fails permanently.
/// </summary>
public sealed class PayloadStorageException : InvalidOperationException
{
/// <summary>
/// Initializes a new instance of the <see cref="PayloadStorageException" /> class.
/// </summary>
/// <param name="message">The error message.</param>
public PayloadStorageException(string message)
: base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="PayloadStorageException" /> class.
/// </summary>
/// <param name="message">The error message.</param>
/// <param name="innerException">The inner exception.</param>
public PayloadStorageException(string message, Exception innerException)
: base(message, innerException)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Status GetStatus()
return new Status(StatusCode.Unknown, string.Empty);
}


Comment thread
YunchuWang marked this conversation as resolved.
Outdated
Metadata GetTrailers()
{
return startCallTask.Status == TaskStatus.RanToCompletion ? startCallTask.Result.GetTrailers() : [];
Comment thread
YunchuWang marked this conversation as resolved.
Expand Down Expand Up @@ -174,17 +175,17 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
}

int size = Encoding.UTF8.GetByteCount(value);
if (size < this.options.ExternalizeThresholdBytes)
if (size < this.options.ThresholdBytes)
{
return value;
}

// Enforce a hard cap to prevent unbounded payload sizes
if (size > this.options.MaxExternalizedPayloadBytes)
if (size > this.options.MaxPayloadBytes)
{
throw new InvalidOperationException(
$"Payload size {size / 1024} kb exceeds the configured maximum of {this.options.MaxExternalizedPayloadBytes / 1024} kb. " +
"Consider reducing the payload or increase MaxExternalizedPayloadBytes setting.");
throw new PayloadStorageException(
$"Payload size {size / 1024} KB exceeds the configured maximum of {this.options.MaxPayloadBytes / 1024} KB. " +
"Reduce the payload size or increase the max payload size limit.");
Comment thread
YunchuWang marked this conversation as resolved.
Comment thread
YunchuWang marked this conversation as resolved.
}
Comment thread
YunchuWang marked this conversation as resolved.

return await this.payloadStore.UploadAsync(value!, cancellation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace Microsoft.DurableTask;
/// </summary>
public sealed class LargePayloadStorageOptions
{
int externalizeThresholdBytes = 900_000;
int thresholdBytes = 900_000;
Comment thread
YunchuWang marked this conversation as resolved.

/// <summary>
/// Initializes a new instance of the <see cref="LargePayloadStorageOptions"/> class.
Expand Down Expand Up @@ -63,20 +63,20 @@ public LargePayloadStorageOptions(Uri accountUri, TokenCredential credential)
/// Gets or sets the threshold in bytes at which payloads are externalized. Default is 900_000 bytes.
/// Value must not exceed 1 MiB (1,048,576 bytes).
/// </summary>
public int ExternalizeThresholdBytes
public int ThresholdBytes
{
get => this.externalizeThresholdBytes;
get => this.thresholdBytes;
set
{
const int OneMiB = 1 * 1024 * 1024;
if (value > OneMiB)
{
throw new ArgumentOutOfRangeException(
nameof(this.ExternalizeThresholdBytes),
$"ExternalizeThresholdBytes cannot exceed 1 MiB ({OneMiB} bytes).");
nameof(this.ThresholdBytes),
$"Payload storage threshold cannot exceed 1 MiB ({OneMiB} bytes).");
}

this.externalizeThresholdBytes = value;
this.thresholdBytes = value;
}
Comment thread
YunchuWang marked this conversation as resolved.
}
Comment thread
YunchuWang marked this conversation as resolved.

Expand All @@ -85,7 +85,7 @@ public int ExternalizeThresholdBytes
/// Defaults to 10MB. Requests exceeding this limit will fail fast
/// with a clear error to prevent unbounded payload growth and excessive storage/network usage.
/// </summary>
public int MaxExternalizedPayloadBytes { get; set; } = 10 * 1024 * 1024;
public int MaxPayloadBytes { get; set; } = 10 * 1024 * 1024;
Comment thread
YunchuWang marked this conversation as resolved.

/// <summary>
/// Gets or sets the Azure Storage connection string to the customer's storage account.
Expand Down Expand Up @@ -114,5 +114,5 @@ public int ExternalizeThresholdBytes
/// Gets or sets a value indicating whether payloads should be gzip-compressed when stored.
/// Defaults to true for reduced storage and bandwidth.
/// </summary>
public bool CompressPayloads { get; set; } = true;
public bool CompressionEnabled { get; set; } = true;
Comment thread
YunchuWang marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public BlobPayloadStore(LargePayloadStorageOptions options)
Mode = RetryMode.Exponential,
MaxRetries = MaxRetryAttempts,
Delay = TimeSpan.FromMilliseconds(BaseDelayMs),
MaxDelay = TimeSpan.FromSeconds(MaxDelayMs),
MaxDelay = TimeSpan.FromMilliseconds(MaxDelayMs),
NetworkTimeout = TimeSpan.FromMinutes(NetworkTimeoutMinutes),
},
};
Expand All @@ -79,7 +79,7 @@ public override async Task<string> UploadAsync(string payLoad, CancellationToken
// Ensure container exists (idempotent)
await this.containerClient.CreateIfNotExistsAsync(PublicAccessType.None, default, default, cancellationToken);

if (this.options.CompressPayloads)
if (this.options.CompressionEnabled)
{
BlobOpenWriteOptions writeOptions = new()
{
Expand Down Expand Up @@ -138,7 +138,7 @@ public override async Task<string> DownloadAsync(string token, CancellationToken
}
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound)
{
throw new InvalidOperationException(
throw new PayloadStorageException(
$"The blob '{name}' was not found in container '{container}'. " +
"The payload may have been deleted or the container was never created.",
ex);
Expand Down
Loading
Loading