-
Notifications
You must be signed in to change notification settings - Fork 55
Preserve late events after continue-as-new #711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
|
|
||
| int newGuidCounter; | ||
| object? customStatus; | ||
| bool preserveUnprocessedEventsOnContinueAsNew; | ||
| TaskOrchestrationEntityContext? entityFeature; | ||
|
|
||
| /// <summary> | ||
|
|
@@ -349,24 +350,31 @@ | |
| { | ||
| Check.NotNull(options); | ||
|
|
||
| if (!string.IsNullOrWhiteSpace(options.NewVersion)) | ||
| this.preserveUnprocessedEventsOnContinueAsNew = options.PreserveUnprocessedEvents; | ||
|
|
||
| try | ||
| { | ||
| this.innerContext.ContinueAsNew(options.NewVersion, options.NewInput); | ||
| if (!string.IsNullOrWhiteSpace(options.NewVersion)) | ||
| { | ||
| this.innerContext.ContinueAsNew(options.NewVersion, options.NewInput); | ||
| } | ||
| else | ||
| { | ||
| this.innerContext.ContinueAsNew(options.NewInput); | ||
| } | ||
| } | ||
| else | ||
| catch | ||
| { | ||
| this.innerContext.ContinueAsNew(options.NewInput); | ||
| this.preserveUnprocessedEventsOnContinueAsNew = false; | ||
| throw; | ||
| } | ||
|
|
||
| if (options.PreserveUnprocessedEvents) | ||
| { | ||
| // Send all the buffered external events to ourself. | ||
| OrchestrationInstance instance = new() { InstanceId = this.InstanceId }; | ||
| foreach ((string eventName, string eventPayload) in this.externalEventBuffer.TakeAll()) | ||
| { | ||
| #pragma warning disable CS0618 // Type or member is obsolete -- 'internal' usage. | ||
| this.innerContext.SendEvent(instance, eventName, new RawInput(eventPayload)); | ||
| #pragma warning restore CS0618 // Type or member is obsolete | ||
| this.ForwardRawExternalEvent(eventName, eventPayload); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -477,17 +485,35 @@ | |
| } | ||
| else | ||
| { | ||
| // The orchestrator isn't waiting for this event (yet?). Save it in case | ||
| // the orchestrator wants it later. | ||
| this.externalEventBuffer.Add(eventName, rawEventPayload); | ||
| if (this.preserveUnprocessedEventsOnContinueAsNew) | ||
| { | ||
| // ContinueAsNew has already been scheduled with event preservation enabled. | ||
| // Forward late-arriving events directly to the next execution instead of buffering | ||
| // them on the current wrapper instance, which is about to be discarded. | ||
| this.ForwardRawExternalEvent(eventName, rawEventPayload); | ||
| } | ||
| else | ||
| { | ||
| // The orchestrator isn't waiting for this event (yet?). Save it in case | ||
| // the orchestrator wants it later. | ||
| this.externalEventBuffer.Add(eventName, rawEventPayload); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| void ForwardRawExternalEvent(string eventName, string rawEventPayload) | ||
| { | ||
| OrchestrationInstance instance = new() { InstanceId = this.InstanceId }; | ||
| #pragma warning disable CS0618 // Type or member is obsolete -- 'internal' usage. | ||
| this.innerContext.SendEvent(instance, eventName, new RawInput(rawEventPayload)); | ||
| #pragma warning restore CS0618 // Type or member is obsolete | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the serialized custom status. | ||
| /// </summary> | ||
| /// <returns>The custom status serialized to a string, or <c>null</c> if there is not custom status.</returns> | ||
| internal string? GetSerializedCustomStatus() | ||
|
Check warning on line 516 in src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
|
||
| { | ||
| return this.DataConverter.Serialize(this.customStatus); | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.