Adds Workflow State Retention #95
base: main
Changes from all commits
89d1b86
9b0f015
845f133
e087d8d
c75f03d
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,220 @@ | ||||||
| # Workflow: History State Retention | ||||||
|
|
||||||
| * Author(s): @joshvanl | ||||||
|
|
||||||
| ## Overview | ||||||
|
|
||||||
| This proposal adds functionality to the workflow runtime that lets users delete old workflow state from the actor state store after a configured time. | ||||||
| An app-scoped configuration policy can set a TTL for all workflows in that app. | ||||||
| Each workflow instance may also be configured with its own TTL at workflow scheduling time. | ||||||
| The default remains that workflow state will _not_ be deleted from the actor state store, and will remain there indefinitely. | ||||||
|
|
||||||
| ## Background | ||||||
|
|
||||||
| Currently, to delete old workflow state from the actor state store, users must either call the Purge Workflow API or delete the state from the database directly, whether through database operations outside of Dapr or through a first-class TTL feature of that database. | ||||||
| Users typically want to delete old workflow state after some period of time from when the workflow has reached a terminal state. | ||||||
|
|
||||||
| https://github.com/dapr/dapr/issues/9020 | ||||||
|
|
||||||
| ## Design | ||||||
|
|
||||||
| A configuration spec will be added to the runtime workflow spec to set a retention policy for all workflows created from that app. | ||||||
| When scheduling a workflow, users will be able to configure a duration; once that duration has elapsed after the workflow reaches a terminal state, the workflow will be purged from the actor state store. | ||||||
| The duration only starts counting once the workflow has reached the TERMINATED, COMPLETED, or FAILED state. | ||||||
|
|
||||||
| If a retention policy is set at both the app level and the workflow level, the workflow level setting will take precedence. | ||||||
|
|
||||||
| Any duration may be given, e.g. days, weeks, or years. | ||||||
| A duration of `0` may also be given if the workflow actor state should be deleted immediately after the workflow reaches a terminal state. | ||||||
|
|
||||||
| ### Usage | ||||||
|
|
||||||
| #### Configuration | ||||||
|
|
||||||
| ```go | ||||||
| type WorkflowSpec struct { | ||||||
| // StateRetentionPolicy defines the retention configuration for workflow | ||||||
| // state once a workflow reaches a terminal state. If not set, workflow | ||||||
| // instances will not be automatically purged. | ||||||
| StateRetentionPolicy *WorkflowStateRetentionPolicy `json:"stateRetentionPolicy,omitempty" yaml:"stateRetentionPolicy,omitempty"` | ||||||
| } | ||||||
|
|
||||||
| // WorkflowStateRetentionPolicy defines the retention policy of workflow state | ||||||
| // for workflow instances once they reach a specific or any terminal state. | ||||||
| // If not set, workflow instances will not be automatically purged. If a | ||||||
| // specific and any terminal state are both set, the specific terminal state | ||||||
| // takes precedence. Accepts duration strings, e.g. "72h" or "30m", including | ||||||
| // immediate values "0s". | ||||||
| type WorkflowStateRetentionPolicy struct { | ||||||
| // AnyTerminal is the TTL for purging workflow instances that reach any | ||||||
| // terminal state. | ||||||
| AnyTerminal *time.Duration `json:"anyTerminal,omitempty" yaml:"anyTerminal,omitempty"` | ||||||
|
|
||||||
| // Completed is the TTL for purging workflow instances that reach the | ||||||
| // Completed terminal state. | ||||||
| Completed *time.Duration `json:"completed,omitempty" yaml:"completed,omitempty"` | ||||||
|
|
||||||
| // Failed is the TTL for purging workflow instances that reach the Failed | ||||||
| // terminal state. | ||||||
| Failed *time.Duration `json:"failed,omitempty" yaml:"failed,omitempty"` | ||||||
|
|
||||||
| // Terminated is the TTL for purging workflow instances that reach the | ||||||
| // Terminated terminal state. | ||||||
| Terminated *time.Duration `json:"terminated,omitempty" yaml:"terminated,omitempty"` | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| ```yaml | ||||||
| kind: Configuration | ||||||
| metadata: | ||||||
| name: wfpolicy | ||||||
| spec: | ||||||
| workflow: | ||||||
| stateRetentionPolicy: | ||||||
| anyTerminal: "5s" | ||||||
|
Review comment (Contributor): How does it work if all are set in this example, like
||||||
| completed: "0s" | ||||||
| failed: "999h" | ||||||
| terminated: "999h" | ||||||
| ``` | ||||||
|
|
||||||
| #### CLI | ||||||
|
|
||||||
| Users can give a Go-style duration string when running a workflow from the CLI. | ||||||
|
|
||||||
| ```bash | ||||||
| $ dapr run my-workflow --state-retention=120h | ||||||
| ``` | ||||||
|
|
||||||
| ```bash | ||||||
| $ dapr run my-workflow --state-retention=0s | ||||||
| ``` | ||||||
|
|
||||||
| The new retention reminders will be displayed like: | ||||||
|
|
||||||
| ```bash | ||||||
| $ dapr scheduler list | ||||||
| NAME BEGIN COUNT LAST TRIGGER | ||||||
| workflow-retention/my-workflow 96h 0 | ||||||
| ``` | ||||||
|
|
||||||
| A user is able to delete a retention reminder by either manually purging the workflow or deleting the reminder directly. | ||||||
|
|
||||||
| ```bash | ||||||
| $ dapr workflow purge my-workflow | ||||||
| $ dapr scheduler delete workflow-retention/my-workflow | ||||||
| ``` | ||||||
|
|
||||||
| #### Go | ||||||
|
|
||||||
| ```go | ||||||
| wf.ScheduleWorkflow(ctx, "my-workflow", workflow.WithStateRetention(time.Hour*24*5)) | ||||||
| wf.ScheduleWorkflow(ctx, "my-workflow", workflow.WithStateRetention(0)) | ||||||
| wf.ScheduleWorkflow(ctx, "my-workflow", workflow.WithStateRetentionCompleted(0)) | ||||||
| wf.ScheduleWorkflow(ctx, "my-workflow", workflow.WithStateRetentionFailed(time.Hour*24*5)) | ||||||
| ``` | ||||||
|
|
||||||
| #### Python | ||||||
|
|
||||||
| ```python | ||||||
| wfClient.schedule_new_workflow(workflow=my_workflow, state_retention=timedelta(days=5)) | ||||||
| wfClient.schedule_new_workflow(workflow=my_workflow, state_retention=timedelta(seconds=0)) | ||||||
| wfClient.schedule_new_workflow(workflow=my_workflow, state_retention_completed=timedelta(seconds=0)) | ||||||
| wfClient.schedule_new_workflow(workflow=my_workflow, state_retention_failed=timedelta(days=5)) | ||||||
| ``` | ||||||
|
|
||||||
| #### JavaScript | ||||||
|
|
||||||
| ```js | ||||||
| workflowClient.scheduleNewWorkflow({workflow: MyWorkflow, state_retention: Temporal.Duration.from({days: 5})}) | ||||||
| workflowClient.scheduleNewWorkflow({workflow: MyWorkflow, state_retention: Temporal.Duration.from({seconds: 0})}) | ||||||
| workflowClient.scheduleNewWorkflow({workflow: MyWorkflow, state_retention_completed: Temporal.Duration.from({seconds: 0})}) | ||||||
| workflowClient.scheduleNewWorkflow({workflow: MyWorkflow, state_retention_failed: Temporal.Duration.from({days: 5})}) | ||||||
| ``` | ||||||
|
|
||||||
| #### .NET | ||||||
|
|
||||||
| ```dotnet | ||||||
| workflowClient.ScheduleNewWorkflowAsync( | ||||||
| name: nameof(MyWorkflow), | ||||||
| stateRetention: TimeSpan.FromDays(5) | ||||||
| ); | ||||||
| workflowClient.ScheduleNewWorkflowAsync( | ||||||
| name: nameof(MyWorkflow), | ||||||
| stateRetention: TimeSpan.FromSeconds(0), | ||||||
| stateRetentionCompleted: TimeSpan.FromSeconds(0), | ||||||
| stateRetentionFailed: TimeSpan.FromDays(5) | ||||||
| ); | ||||||
| ``` | ||||||
|
|
||||||
| #### Java | ||||||
|
|
||||||
| ```java | ||||||
| opts.setStateRetention(Duration.ofDays(5)); | ||||||
| opts.setStateRetentionCompleted(Duration.ofSeconds(0)); | ||||||
| opts.setStateRetentionFailed(Duration.ofDays(5)); | ||||||
| workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, opts); | ||||||
| ``` | ||||||
|
|
||||||
| ### Runtime | ||||||
|
|
||||||
| #### protos | ||||||
|
|
||||||
| The following protos will be updated with the new retention policy message so it is piped from workflow creation to execution. | ||||||
|
|
||||||
| The new option will be added to `CreateInstanceRequest`, populated by the client. | ||||||
|
|
||||||
| ```proto | ||||||
| message CreateInstanceRequest { | ||||||
| string instanceId = 1; | ||||||
| string name = 2; | ||||||
| // ... | ||||||
| optional InstanceStateRetentionPolicy retentionPolicy = 10; // NEW | ||||||
| } | ||||||
|
|
||||||
| message InstanceStateRetentionPolicy { | ||||||
| optional google.protobuf.Duration allTerminal = 1; | ||||||
|
|
||||||
| optional google.protobuf.Duration completed = 2; | ||||||
| optional google.protobuf.Duration failed = 3; | ||||||
| optional google.protobuf.Duration terminated = 4; | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| `ExecutionStartedEvent` will contain the retention policy, which specifies how long after reaching a terminal state the workflow should be purged. | ||||||
| This field will be persisted in the history log. | ||||||
| This field will be populated by the durabletask backend executor, piping the field from `CreateInstanceRequest`. | ||||||
|
|
||||||
| ```proto | ||||||
| message ExecutionStartedEvent { | ||||||
| string name = 1; | ||||||
| // EXISTING | ||||||
| optional InstanceStateRetentionPolicy retentionPolicy = 10; // NEW | ||||||
| } | ||||||
| ``` | ||||||
|
|
||||||
| #### Actors | ||||||
|
|
||||||
| When a workflow reaches a terminal state, and after the orchestration actor has written the result to the actor state store, the actor will create an actor reminder if a state retention policy is present in the execution started event or in the app's workflow configuration. | ||||||
|
|
||||||
| This reminder will target a new workflow actor type, with the actor ID being the instance ID of the workflow. | ||||||
|
|
||||||
| The new actor type will follow convention and have the following form: | ||||||
|
|
||||||
| ``` | ||||||
| dapr.internal.<namespace>.<app-id>.retentioner | ||||||
| ``` | ||||||
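For illustration, the actor type name could be assembled as below. The helper name `retentionerActorType` and the example namespace and app ID are made up for this sketch.

```go
package main

import "fmt"

// retentionerActorType builds the actor type name in the form given above.
// The function name is hypothetical.
func retentionerActorType(namespace, appID string) string {
	return fmt.Sprintf("dapr.internal.%s.%s.retentioner", namespace, appID)
}

func main() {
	fmt.Println(retentionerActorType("default", "order-app"))
}
```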
|
|
||||||
| When the reminder fires, the new retentioner actor will be activated, call the purge API on the workflow orchestrator actor for the given instance ID, and then deactivate itself. | ||||||
| Along with the other workflow actor types, this type will be registered when the workflow worker client connects, and unregistered when it disconnects. | ||||||
|
|
||||||
| By using a new actor type, this feature is fully backwards compatible, as older clients will not register for this new retentioner actor type. | ||||||
|
|
||||||
| ``` | ||||||
| WORKFLOW COMPLETE -> orchestrator -> create retentioner reminder -...> execute retentioner reminder -> execute retentioner actor -> execute purge on orchestrator | ||||||
|
Review comment (Contributor): I think that's right?
||||||
| ``` | ||||||
|
|
||||||
| ## Alternatives | ||||||
|
|
||||||
| Another option is to use the actor TTL state store functionality to delete store keys based on individual key TTLs. | ||||||
| This is not appropriate, as it _must_ be the case that workflow data is only deleted from the state store once the workflow has reached a terminal state. | ||||||
| Not doing so would corrupt the workflow processing. | ||||||
| It is therefore necessary that the Purge API is used to delete the stored data, which itself processes the request inside the same workflow state machine. | ||||||

Would it make sense to support a different TTL per state? Users might not want to keep `COMPLETED` workflows for too long, but might want to keep `FAILED` ones for longer.

Yeah, that makes sense I think. The only thing is that it will balloon the options in SDKs a bit.

Should we support setting a global TTL in daprd, so clients have to do nothing?

So current thinking is having a `spec.workflow.purgeTTL.{defaultTerminal,completed,failed,terminated}` as time durations in the Dapr config.

Then the following optional proto message under the `CreateInstanceRequest` and `ExecutionStartedEvent` messages. The workflow runtime will pick the min duration which matches the terminal state of the workflow. Per-workflow requests with a matching state will have preference over the Dapr config. If a specific terminal state and `defaultTerminal` are both defined, the specific terminal state will take precedence.

wdyt?

Perfect, imo even just the Dapr configuration is sufficient, so no real need to modify the SDKs.

I like having both, but happy to see either! My feeling is the more we can let users control in terms of behaviour in code, the better.

I'd just call it `default` because TTL purging is always at terminal states, right?

Regarding adding a configuration for this, how would this handle changes in the configuration for in-flight workflows? Like if a workflow started when the TTL was set at 60s but the configuration changes to 30s, which TTL will the workflow experience? I would assume it's 60s because it's 'saved' in the workflow, but it might feel confusing for users if they change the configuration but still see workflows being purged after 60s...

In this case it would be the configuration at the time at which the workflow reaches that terminal state. It would be possible to see what the TTL is with `dapr scheduler list --filter workflow-retention`.