-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IWF-132: Use firstRunId instead workflowId when naming WaitForStateExecutionCompletion workflow #447
Conversation
…ecutionCompletion workflow
service/api/service.go
Outdated
} | ||
|
||
if response.FirstRunId == "" { | ||
parentWfId = req.WorkflowId // Cadence |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we know cadence doesn't have this, we can avoid calling the DescribeWorkflowExecution
API for cadence.
Although this API is really fast (it's backed by caching on cadence/temporal) but still nice to do so if this is not too much. (we do have cadence users in OSS, some ppl just perfer using cadence)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added GetBackendType method to UnifiedClient
service/api/service.go
Outdated
} | ||
|
||
options := uclient.StartWorkflowOptions{ | ||
ID: workflowId, | ||
// TODO: Switch currentWorkflowId to _ (after renaming) after a short amount of time to ensure backward compatibility |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohh interesting! I was thinking of making two deployment -- first deploy the workflowImpl code (which signalWIthStart both), after some time, deploy the api service.
But what you put here made me feel like it's nice to use a config?
Potentially using the Api ApiConfig
yaml:"api"`` in https://github.com/indeedeng/iwf/blob/main/config/config.go
waitForStateCompletionMigration:
SignalWithStartOn: both/new/old
WaitForOn:old/new
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In workflowImpl, we have access to ApiConfig via env like this: https://github.com/indeedeng/iwf/blob/main/service/interpreter/continueAsNewer.go#L57
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea. I'll implement it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
service/api/service.go
Outdated
if response.FirstRunId == "" { | ||
parentWfId = req.WorkflowId // Cadence | ||
} else { | ||
parentWfId = response.FirstRunId // Temporal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very nit: parentId? it's not workflowId Lol.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 done
service/interpreter/workflowImpl.go
Outdated
|
||
if response.FirstRunId != "" { // Temporal | ||
// Start WaitForStateCompletionWorkflow with a new name to ensure smooth transition | ||
err = unifiedClient.SignalWithStartWaitForStateCompletionWorkflow( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we can refactor the code to a common method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
f7df0d5
to
dbcf2a4
Compare
service/interpreter/workflowImpl.go
Outdated
if state.WaitForKey != nil { | ||
return service.IwfSystemConstPrefix + parentId + "_" + state.StateId + "_" + *state.WaitForKey | ||
} else { | ||
return service.IwfSystemConstPrefix + parentId + "_" + *executionContext.StateExecutionId | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: it would be nice to move this to a common place to share with api service: GetWorkflowIdForWaitForStateExecution(parentId, stateExeId, waitForKey, stateId)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
service/interpreter/workflowImpl.go
Outdated
// panic will let the workflow task will retry until the signal is sent | ||
panic(fmt.Errorf("failed to signal on completion %w", err)) | ||
} | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this func doesn't need to return anything(error).
It's a bit tricky here that we need to expand on later about "non determinism and how workflow task is executed in Cadence/Temporal".
For here, we intentionally swallow the error of "alreadyStarted", and panic on any other errors. By panicing, it will just retry automatically.
We don't want to proceed to other states for any other errors. The proceed to the state is the feature for workflow state failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think this fails one of integration tests and I'm looking into it atm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed the issue
6cc92ab
to
3afb3e5
Compare
service/interpreter/workflowImpl.go
Outdated
if provider.GetBackendType() == service.BackendTypeCadence || | ||
(provider.GetBackendType() == service.BackendTypeTemporal && (signalWithStartOn == "old" || signalWithStartOn == "both")) { | ||
workflowId := utils.GetWorkflowIdForWaitForStateExecution(provider.GetWorkflowInfo(ctx).FirstRunID, *executionContext.StateExecutionId, *state.WaitForKey, state.StateId) | ||
|
||
err = signalWithStart(unifiedClient, workflowId) | ||
if err != nil && !unifiedClient.IsWorkflowAlreadyStartedError(err) { | ||
// WorkflowAlreadyStartedError is returned when the started workflow is closed and the signal is not sent | ||
// panic will let the workflow task will retry until the signal is sent | ||
panic(fmt.Errorf("failed to signal on completion %w", err)) | ||
} | ||
} | ||
|
||
err := unifiedClient.SignalWithStartWaitForStateCompletionWorkflow( | ||
context.Background(), | ||
uclient.StartWorkflowOptions{ | ||
ID: workflowId, | ||
TaskQueue: env.GetTaskQueue(), | ||
WorkflowExecutionTimeout: 60 * time.Second, // timeout doesn't matter here as it will complete immediate with the signal | ||
}, | ||
iwfidl.StateCompletionOutput{}) | ||
if err != nil && !unifiedClient.IsWorkflowAlreadyStartedError(err) { | ||
// WorkflowAlreadyStartedError is returned when the started workflow is closed and the signal is not sent | ||
// panic will let the workflow task will retry until the signal is sent | ||
panic(fmt.Errorf("failed to signal on completion %w", err)) | ||
// signalWithStart with new workflowId (containing firstRunId) | ||
if provider.GetBackendType() == service.BackendTypeTemporal && (signalWithStartOn == "both" || signalWithStartOn == "new") { | ||
workflowId := utils.GetWorkflowIdForWaitForStateExecution(provider.GetWorkflowInfo(ctx).FirstRunID, *executionContext.StateExecutionId, *state.WaitForKey, state.StateId) | ||
|
||
// Start WaitForStateCompletionWorkflow with a new name to ensure smooth transition | ||
err = signalWithStart(unifiedClient, workflowId) | ||
if err != nil && !unifiedClient.IsWorkflowAlreadyStartedError(err) { | ||
// WorkflowAlreadyStartedError is returned when the started workflow is closed and the signal is not sent | ||
// panic will let the workflow task will retry until the signal is sent | ||
panic(fmt.Errorf("failed to signal on completion %w", err)) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored this from two nested if statements into two separate ifs with more complicated expressions
service/api/service.go
Outdated
parentId = req.WorkflowId | ||
} | ||
|
||
workflowId = utils.GetWorkflowIdForWaitForStateExecution(parentId, *req.StateExecutionId, *req.WaitForKey, *req.StateId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*req.WaitForKey
will assume that field is always present (otherwise it will be a panic -- nil pointer error). I think only StateExecutionId is required. So we probably need to pass pointer into the method and let the method do a nil check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
service/interpreter/workflowImpl.go
Outdated
if err != nil && !unifiedClient.IsWorkflowAlreadyStartedError(err) { | ||
// WorkflowAlreadyStartedError is returned when the started workflow is closed and the signal is not sent | ||
// panic will let the workflow task will retry until the signal is sent | ||
panic(fmt.Errorf("failed to signal on completion %w", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: it will look slightly better to move this error handling into the signalWithStart method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
config/config.go
Outdated
func GetSignalWithStartOnWithDefault(config Config) string { | ||
if config.Api.WaitForStateCompletionMigration.SignalWithStartOn != "" { | ||
return config.Api.WaitForStateCompletionMigration.SignalWithStartOn | ||
} | ||
return "old" | ||
} | ||
|
||
func GetWaitForOnWithDefault(config Config) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I actually should have done it this using struct member method instead:
func (c Config) GetSignalWithStartOnWithDefault() string{
// the same code here
}
And to use it:
config.GetSignalWithStartOnWithDefault()
This is very common pattern in Golang, I don't know why I didn't do that way 😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
a3f4e52
to
15def77
Compare
config/config.go
Outdated
@@ -31,6 +31,15 @@ type ( | |||
// omitRpcInputOutputInHistory is the flag to omit rpc input/output in history | |||
// the input/output is only for debugging purpose but could be too expensive to store | |||
OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"` | |||
// WaitForStateCompletionMigration is used to control naming of the continuedAsNew workflows |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: not continuedAsNew
Description
Checklist
Related Issue
Closes #404