Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ message CreateSubOrchestrationAction {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
TraceContext parentTraceContext = 5;
map<string, string> tags = 6;
}

message CreateTimerAction {
Expand Down Expand Up @@ -356,6 +357,13 @@ message OrchestratorResponse {

// Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
bool requiresHistory = 7;

// True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false).
bool isPartial = 8;

// Zero-based position of the current chunk within a chunked completion sequence.
// This field is omitted for non-chunked completions.
google.protobuf.Int32Value chunkIndex = 9;
}

message CreateInstanceRequest {
Expand Down
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-11-14 16:36:47 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/9f762f1301b91e3e7c736b9c5a29c2e09f2a850e/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2025-12-12 01:49:06 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/b03c06dea21952dcf2a86551fd761b6b78c64d15/protos/orchestrator_service.proto
2 changes: 1 addition & 1 deletion src/InProcessTestHost/Sidecar/Grpc/ProtobufUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public static OrchestratorAction ToOrchestratorAction(Proto.OrchestratorAction a
ParentTraceContext = a.CreateSubOrchestration.ParentTraceContext is not null
? new DistributedTraceContext(a.CreateSubOrchestration.ParentTraceContext.TraceParent, a.CreateSubOrchestration.ParentTraceContext.TraceState)
: null,
Tags = null, // TODO
Tags = a.CreateSubOrchestration.Tags,
Version = a.CreateSubOrchestration.Version,
};
case Proto.OrchestratorAction.OrchestratorActionTypeOneofCase.CreateTimer:
Expand Down
11 changes: 10 additions & 1 deletion src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,16 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
Name = subOrchestrationAction.Name,
Version = subOrchestrationAction.Version,
ParentTraceContext = CreateTraceContext(),
};
};

if (subOrchestrationAction.Tags != null)
{
foreach (KeyValuePair<string, string> tag in subOrchestrationAction.Tags)
{
protoAction.CreateSubOrchestration.Tags[tag.Key] = tag.Value;
}
}

break;
case OrchestratorActionType.CreateTimer:
var createTimerAction = (CreateTimerOrchestratorAction)action;
Expand Down
6 changes: 4 additions & 2 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
orchestratorName.Name,
version,
instanceId,
input),
input,
options?.Tags),
orchestratorName.Name,
handler,
default);
Expand All @@ -246,7 +247,8 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
orchestratorName.Name,
version,
instanceId,
input);
input,
options?.Tags);
}
}
catch (global::DurableTask.Core.Exceptions.SubOrchestrationFailedException e)
Expand Down
48 changes: 47 additions & 1 deletion test/Grpc.IntegrationTests/OrchestrationPatterns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async Task EmptyOrchestration()
}

[Fact]
public async Task ScheduleOrchesrationWithTags()
public async Task ScheduleOrchestrationWithTags()
{
TaskName orchestratorName = nameof(EmptyOrchestration);
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
Expand Down Expand Up @@ -67,6 +67,52 @@ public async Task ScheduleOrchesrationWithTags()
Assert.Equal("value2", metadata.Tags["tag2"]);
}

[Fact]
public async Task ScheduleSubOrchestrationWithTags()
{
TaskName orchestratorName = nameof(ScheduleSubOrchestrationWithTags);

// Schedule a new orchestration instance with tags
SubOrchestrationOptions subOrchestrationOptions = new()
{
InstanceId = "instance_id",
Tags = new Dictionary<string, string>
{
{ "tag1", "value1" },
{ "tag2", "value2" }
}
};

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc<int, int>(orchestratorName, async (ctx, input) =>
{
int result = 1;
if (input < 2)
{
// recursively call this same orchestrator
result += await ctx.CallSubOrchestratorAsync<int>(orchestratorName, input: input + 1, subOrchestrationOptions);
}

return result;
}));
});


await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: 1);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
subOrchestrationOptions.InstanceId, this.TimeoutToken);

Assert.NotNull(metadata);
Assert.Equal(subOrchestrationOptions.InstanceId, metadata.InstanceId);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
Assert.NotNull(metadata.Tags);
Assert.Equal(2, metadata.Tags.Count);
Assert.Equal("value1", metadata.Tags["tag1"]);
Assert.Equal("value2", metadata.Tags["tag2"]);
}

[Fact]
public async Task SingleTimer()
{
Expand Down
Loading