From c3b7de2e08f742e9868859c3858e7184855575b9 Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Tue, 17 Jun 2025 13:17:25 +0200 Subject: [PATCH 01/11] add support of GetVersionOption, ExecuteWithVersion and ExecuteWithMinVersion --- internal/interceptors.go | 6 +- internal/internal_event_handlers.go | 44 ++++++++++-- internal/internal_worker_base.go | 2 +- internal/internal_workflow_testsuite.go | 2 +- internal/workflow.go | 95 +++++++++++++++++++++++-- 5 files changed, 132 insertions(+), 17 deletions(-) diff --git a/internal/interceptors.go b/internal/interceptors.go index d28dd2388..c49a46d2a 100644 --- a/internal/interceptors.go +++ b/internal/interceptors.go @@ -63,7 +63,7 @@ type WorkflowInterceptor interface { GetSignalChannel(ctx Context, signalName string) Channel SideEffect(ctx Context, f func(ctx Context) interface{}) Value MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value - GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version + GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version SetQueryHandler(ctx Context, queryType string, handler interface{}) error IsReplaying(ctx Context) bool HasLastCompletionResult(ctx Context) bool @@ -158,8 +158,8 @@ func (t *WorkflowInterceptorBase) MutableSideEffect(ctx Context, id string, f fu } // GetVersion forwards to t.Next -func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version { - return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported) +func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version { + return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported, opts...) } // SetQueryHandler forwards to t.Next diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index b419d6931..926a5875e 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -602,20 +602,48 @@ func validateVersion(changeID string, version, minSupported, maxSupported Versio } } -func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) Version { +func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version { + // Check if the changeID already has a version assigned + // If it does, validate the version against the min and max supported versions if version, ok := wc.changeVersions[changeID]; ok { validateVersion(changeID, version, minSupported, maxSupported) return version } + // Apply options to determine which version to use + options := &getVersionOptions{} + for _, opt := range opts { + opt(options) + } + var version Version - if wc.isReplay { - // GetVersion for changeID is called first time in replay mode, use DefaultVersion + switch { + + // GetVersion for changeID is called first time in replay mode, use DefaultVersion + case wc.isReplay: version = DefaultVersion - } else { - // GetVersion for changeID is called first time (non-replay mode), generate a marker decision for it. - // Also upsert search attributes to enable ability to search by changeVersion. + + // If ExecuteWithVersion option is used, use the custom version provided + case options.customVersion != nil: + version = *options.customVersion + + // If ExecuteWithMinVersion option is set, use the minimum supported version + case options.useMinVersion: + version = minSupported + + // Otherwise, use the maximum supported version + default: version = maxSupported + } + + // Validate the version against the min and max supported versions + // ensuring it is within the acceptable range + validateVersion(changeID, version, minSupported, maxSupported) + + // If the version is not the DefaultVersion, and it's not a replay, record it and update search attributes + // Keeping the DefaultVersion as a special case where no version marker is recorded + if !wc.isReplay && version != DefaultVersion { + // Record the version marker and update search attributes wc.decisionsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter()) err := wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions)) if err != nil { @@ -623,8 +651,10 @@ func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, max } } - validateVersion(changeID, version, minSupported, maxSupported) + // Store the version in the changeVersions + // ensuring that it can be retrieved later wc.changeVersions[changeID] = version + return version } diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index b4bfb0ad6..642302dbd 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -76,7 +76,7 @@ type ( localActivityClient workflowTimerClient SideEffect(f func() ([]byte, error), callback resultHandler) - GetVersion(changeID string, minSupported, maxSupported Version) Version + GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version WorkflowInfo() *WorkflowInfo Complete(result []byte, err error) RegisterCancelHandler(handler func()) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index f41c7bb83..122be55de 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -1928,7 +1928,7 @@ func (env *testWorkflowEnvironmentImpl) SideEffect(f func() ([]byte, error), cal callback(f()) } -func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) (retVersion Version) { +func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) (retVersion Version) { if mockVersion, ok := env.getMockedVersion(changeID, changeID, minSupported, maxSupported); ok { // GetVersion for changeID is mocked env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, mockVersion, env.changeVersions)) diff --git a/internal/workflow.go b/internal/workflow.go index 0545f808b..82cef95af 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1565,11 +1565,96 @@ const DefaultVersion Version = -1 // CadenceChangeVersion is used as search attributes key to find workflows with specific change version. const CadenceChangeVersion = "CadenceChangeVersion" +// GetVersionOption represents a function that configures GetVersion behavior +type GetVersionOption func(*getVersionOptions) + +type getVersionOptions struct { + // customVersion is used to force GetVersion to return a specific version + // instead of maxSupported version. Set up via ExecuteWithVersion option. + customVersion *Version + + // useMinVersion is used to force GetVersion to return minSupported version + // instead of maxSupported version. Set up via ExecuteWithMinVersion option. + useMinVersion bool +} + +// ExecuteWithVersion returns a GetVersionOption that forces a specific version to be returned +// when executed for the first time, instead of returning maxSupported version. +// +// This option can be used when you want to separate the versioning of the workflow code and +// activation of the new logic in the workflow code, to ensure that your changes can be safely rolled back +// if needed. For example, initially a workflow has the following code: +// +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// +// It should be updated to: +// +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// +// Step 1 +// To roll out your changes safely, both versions of your workflow code should be compatible with each other. +// To achieve that, you can use GetVersion with ExecuteWithVersion option. +// When GetVersion is executed for the first time, it will return DefaultVersion instead of maxSupported version: +// +// v := GetVersion(ctx, "fooChange", DefaultVersion, 1, ExecuteWithVersion(DefaultVersion)) +// if v == DefaultVersion { +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// } else { +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// } +// +// At this step, the previous version of the code supports only DefaultVersion, however new version of the code +// supports both DefaultVersion and 1. At the same time, the new version of the code is not yet activated, +// so the workflow started on the new version of the code will still execute foo activity - previous version of the code. +// This makes it possible to safely roll back your changes if needed, as the previous code supports DefaultVersion. +// +// Step 2 +// When the previous version of the code is no longer running, there is no need to start new workflow executions +// with DefaultVersion anymore, and you can the maxSupported version to activate the new code. To achieve that you can +// remove the usage of ExecuteWithVersion option. When GetVersion is executed for the first time, it will return maxSupported version: +// +// v := GetVersion(ctx, "fooChange", DefaultVersion, 1) +// if v == DefaultVersion { +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// } else { +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// } +// +// At this step, the previous and old versions of the code support both versions DefaultVersion and 1, +// however the new version of the code is activated, so the workflow started on the new version of the code +// will execute bar activity - new version of the code. This makes it possible to safely roll back your changes if needed, +// because both versions of the code support both versions DefaultVersion and 1. +// +// Step 3 +// When there are no running previous version of the code and there are no workflow executions +// running DefaultVersion the correspondent branch can be removed: +// +// GetVersion(ctx, "fooChange", 1, 1) +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// +// ExecuteWithVersion option is useful when you want to ensure that your changes can be safely rolled back if needed, as +// both versions of the workflow code are compatible with each other. +func ExecuteWithVersion(version Version) GetVersionOption { + return func(o *getVersionOptions) { + o.customVersion = &version + } +} + +// ExecuteWithMinVersion returns a GetVersionOption that makes GetVersion return minSupported version +// when executed for the first time, instead of returning maxSupported version. +// To see how this option can be used, see the ExecuteWithVersion option +func ExecuteWithMinVersion() GetVersionOption { + return func(o *getVersionOptions) { + o.useMinVersion = true + } +} + // GetVersion is used to safely perform backwards incompatible changes to workflow definitions. // It is not allowed to update workflow code while there are workflows running as it is going to break // determinism. The solution is to have both old code that is used to replay existing workflows // as well as the new one that is used when it is executed for the first time. -// GetVersion returns maxSupported version when is executed for the first time. This version is recorded into the +// GetVersion returns maxSupported version (to return another version, check GetVersionOption), +// when is executed for the first time. This version is recorded into the // workflow history as a marker event. Even if maxSupported version is changed the version that was recorded is // returned on replay. DefaultVersion constant contains version of code that wasn't versioned before. // For example initially workflow has the following code: @@ -1630,13 +1715,13 @@ const CadenceChangeVersion = "CadenceChangeVersion" // } else { // err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil) // } -func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version { +func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version { i := getWorkflowInterceptor(ctx) - return i.GetVersion(ctx, changeID, minSupported, maxSupported) + return i.GetVersion(ctx, changeID, minSupported, maxSupported, opts...) } -func (wc *workflowEnvironmentInterceptor) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version { - return wc.env.GetVersion(changeID, minSupported, maxSupported) +func (wc *workflowEnvironmentInterceptor) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version { + return wc.env.GetVersion(changeID, minSupported, maxSupported, opts...) } // SetQueryHandler sets the query handler to handle workflow query. The queryType specify which query type this handler From 3f1e0274e529d0c30ac6604ebaf2435f7912553f Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Tue, 17 Jun 2025 13:19:43 +0200 Subject: [PATCH 02/11] fix comment --- internal/internal_event_handlers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 926a5875e..859d34b1e 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -605,6 +605,7 @@ func validateVersion(changeID string, version, minSupported, maxSupported Versio func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version { // Check if the changeID already has a version assigned // If it does, validate the version against the min and max supported versions + // ensuring it is within the acceptable range if version, ok := wc.changeVersions[changeID]; ok { validateVersion(changeID, version, minSupported, maxSupported) return version From 1ac0235e9e25207fa3c8b528e97868a16d95500a Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Tue, 17 Jun 2025 15:04:06 +0200 Subject: [PATCH 03/11] add tests --- internal/internal_event_handlers_test.go | 62 +++++++++++++++++++++--- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index 4ba96b2a2..4ee23dff2 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -754,6 +754,22 @@ func TestGetVersion(t *testing.T) { res := weh.GetVersion("test", 1, 3) assert.Equal(t, Version(2), res) }) + t.Run("version exists, ExecuteWithVersion is used", func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + weh.changeVersions = map[string]Version{ + "test": 2, + } + res := weh.GetVersion("test", 1, 3, ExecuteWithVersion(3)) + assert.Equal(t, Version(2), res) + }) + t.Run("version exists, ExecuteWithMinVersion is used", func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + weh.changeVersions = map[string]Version{ + "test": 2, + } + res := weh.GetVersion("test", 1, 3, ExecuteWithMinVersion()) + assert.Equal(t, Version(2), res) + }) t.Run("version doesn't exist in replay", func(t *testing.T) { weh := testWorkflowExecutionEventHandler(t, newRegistry()) weh.isReplay = true @@ -770,6 +786,38 @@ func TestGetVersion(t *testing.T) { assert.Equal(t, Version(3), weh.changeVersions["test"]) assert.Equal(t, []byte(`["test-3"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated") }) + t.Run("version doesn't exist, ExecuteWithVersion is used", func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(2)) + assert.Equal(t, Version(2), res) + require.Contains(t, weh.changeVersions, "test") + assert.Equal(t, Version(2), weh.changeVersions["test"]) + assert.Equal(t, []byte(`["test-2"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated") + }) + t.Run("version doesn't exist, ExecuteWithVersion is used, DefaultVersion is used", func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(DefaultVersion)) + assert.Equal(t, DefaultVersion, res) + require.Contains(t, weh.changeVersions, "test") + assert.Equal(t, DefaultVersion, weh.changeVersions["test"]) + require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated") + }) + t.Run("version doesn't exist, ExecuteWithMinVersion is used, min is non DefaultVersion", func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + res := weh.GetVersion("test", 1, 3, ExecuteWithMinVersion()) + assert.Equal(t, Version(1), res) + require.Contains(t, weh.changeVersions, "test") + assert.Equal(t, Version(1), weh.changeVersions["test"]) + assert.Equal(t, []byte(`["test-1"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated") + }) + t.Run("version doesn't exist, ExecuteWithMinVersion is used, DefaultVersion is used", func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithMinVersion()) + assert.Equal(t, DefaultVersion, res) + require.Contains(t, weh.changeVersions, "test") + assert.Equal(t, DefaultVersion, weh.changeVersions["test"]) + require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated") + }) } func TestMutableSideEffect(t *testing.T) { @@ -982,6 +1030,13 @@ func TestWorkflowExecutionEnvironment_NewTimer_immediate_calls(t *testing.T) { } func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workflowExecutionEventHandlerImpl { + var testWorkflowInfo = &WorkflowInfo{ + WorkflowType: WorkflowType{ + Name: "test", + Path: "", + }, + } + return newWorkflowExecutionEventHandler( testWorkflowInfo, func(result []byte, err error) {}, @@ -996,13 +1051,6 @@ func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workfl ).(*workflowExecutionEventHandlerImpl) } -var testWorkflowInfo = &WorkflowInfo{ - WorkflowType: WorkflowType{ - Name: "test", - Path: "", - }, -} - func getSerializedDetails[T, V any](t *testing.T, id T, data V) []byte { converter := defaultDataConverter{} res, err := converter.ToData(id, data) From 47a2d2a8e22843b5fdbe139c03f4393eb51fc8fb Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Tue, 17 Jun 2025 15:08:54 +0200 Subject: [PATCH 04/11] add tests --- internal/internal_event_handlers_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index 4ee23dff2..0b87fc801 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -818,6 +818,23 @@ func TestGetVersion(t *testing.T) { assert.Equal(t, DefaultVersion, weh.changeVersions["test"]) require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated") }) + + t.Run("version doesn't exist, ExecuteWithVersion is used, version > maximum version", func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + assert.PanicsWithValue(t, `Workflow code is too old to support version 10 for "test" changeID. The maximum supported version is 3`, func() { + weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(10)) + }) + + require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated") + }) + t.Run("version doesn't exist, ExecuteWithVersion is used, version < minimum version", func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + assert.PanicsWithValue(t, `Workflow code removed support of version 0. for "test" changeID. The oldest supported version is 1`, func() { + weh.GetVersion("test", 1, 3, ExecuteWithVersion(0)) + }) + + require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated") + }) } func TestMutableSideEffect(t *testing.T) { From ca2e6e2811083bd9b18b5e9b309ea2bffd402a15 Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Thu, 19 Jun 2025 13:21:53 +0200 Subject: [PATCH 05/11] add public GetVersion --- internal/internal_event_handlers.go | 8 +-- internal/workflow.go | 23 +++++---- workflow/workflow.go | 78 ++++++++++++++++++++++++++++- 3 files changed, 92 insertions(+), 17 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 859d34b1e..36bfc0d18 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -612,7 +612,7 @@ func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, max } // Apply options to determine which version to use - options := &getVersionOptions{} + options := &GetVersionOptions{} for _, opt := range opts { opt(options) } @@ -625,11 +625,11 @@ func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, max version = DefaultVersion // If ExecuteWithVersion option is used, use the custom version provided - case options.customVersion != nil: - version = *options.customVersion + case options.CustomVersion != nil: + version = *options.CustomVersion // If ExecuteWithMinVersion option is set, use the minimum supported version - case options.useMinVersion: + case options.UseMinVersion: version = minSupported // Otherwise, use the maximum supported version diff --git a/internal/workflow.go b/internal/workflow.go index 82cef95af..00e94aa57 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1565,17 +1565,18 @@ const DefaultVersion Version = -1 // CadenceChangeVersion is used as search attributes key to find workflows with specific change version. const CadenceChangeVersion = "CadenceChangeVersion" -// GetVersionOption represents a function that configures GetVersion behavior -type GetVersionOption func(*getVersionOptions) +// GetVersionOption configures GetVersion behavior +type GetVersionOption func(*GetVersionOptions) -type getVersionOptions struct { - // customVersion is used to force GetVersion to return a specific version +// GetVersionOptions contains options for GetVersion +type GetVersionOptions struct { + // CustomVersion is used to force GetVersion to return a specific version // instead of maxSupported version. Set up via ExecuteWithVersion option. - customVersion *Version + CustomVersion *Version - // useMinVersion is used to force GetVersion to return minSupported version + // UseMinVersion is used to force GetVersion to return minSupported version // instead of maxSupported version. Set up via ExecuteWithMinVersion option. - useMinVersion bool + UseMinVersion bool } // ExecuteWithVersion returns a GetVersionOption that forces a specific version to be returned @@ -1635,8 +1636,8 @@ type getVersionOptions struct { // ExecuteWithVersion option is useful when you want to ensure that your changes can be safely rolled back if needed, as // both versions of the workflow code are compatible with each other. func ExecuteWithVersion(version Version) GetVersionOption { - return func(o *getVersionOptions) { - o.customVersion = &version + return func(o *GetVersionOptions) { + o.CustomVersion = &version } } @@ -1644,8 +1645,8 @@ func ExecuteWithVersion(version Version) GetVersionOption { // when executed for the first time, instead of returning maxSupported version. // To see how this option can be used, see the ExecuteWithVersion option func ExecuteWithMinVersion() GetVersionOption { - return func(o *getVersionOptions) { - o.useMinVersion = true + return func(o *GetVersionOptions) { + o.UseMinVersion = true } } diff --git a/workflow/workflow.go b/workflow/workflow.go index 9d4118133..bf9c5030e 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -45,6 +45,9 @@ type ( // Version represents a change version. See GetVersion call. Version = internal.Version + // GetVersionOption configures GetVersion behaviour + GetVersionOption = internal.GetVersionOption + // ChildWorkflowOptions stores all child workflow specific parameters that will be stored inside of a Context. ChildWorkflowOptions = internal.ChildWorkflowOptions @@ -353,6 +356,77 @@ func MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, // DefaultVersion is a version returned by GetVersion for code that wasn't versioned before const DefaultVersion Version = internal.DefaultVersion +// ExecuteWithVersion returns a GetVersionOption that forces a specific version to be returned +// when executed for the first time, instead of returning maxSupported version. +// +// This option can be used when you want to separate the versioning of the workflow code and +// activation of the new logic in the workflow code, to ensure that your changes can be safely rolled back +// if needed. For example, initially a workflow has the following code: +// +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// +// It should be updated to: +// +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// +// Step 1 +// To roll out your changes safely, both versions of your workflow code should be compatible with each other. +// To achieve that, you can use GetVersion with ExecuteWithVersion option. +// When GetVersion is executed for the first time, it will return DefaultVersion instead of maxSupported version: +// +// v := GetVersion(ctx, "fooChange", DefaultVersion, 1, ExecuteWithVersion(DefaultVersion)) +// if v == DefaultVersion { +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// } else { +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// } +// +// At this step, the previous version of the code supports only DefaultVersion, however new version of the code +// supports both DefaultVersion and 1. At the same time, the new version of the code is not yet activated, +// so the workflow started on the new version of the code will still execute foo activity - previous version of the code. +// This makes it possible to safely roll back your changes if needed, as the previous code supports DefaultVersion. +// +// Step 2 +// When the previous version of the code is no longer running, there is no need to start new workflow executions +// with DefaultVersion anymore, and you can the maxSupported version to activate the new code. To achieve that you can +// remove the usage of ExecuteWithVersion option. When GetVersion is executed for the first time, it will return maxSupported version: +// +// v := GetVersion(ctx, "fooChange", DefaultVersion, 1) +// if v == DefaultVersion { +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// } else { +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// } +// +// At this step, the previous and old versions of the code support both versions DefaultVersion and 1, +// however the new version of the code is activated, so the workflow started on the new version of the code +// will execute bar activity - new version of the code. This makes it possible to safely roll back your changes if needed, +// because both versions of the code support both versions DefaultVersion and 1. +// +// Step 3 +// When there are no running previous version of the code and there are no workflow executions +// running DefaultVersion the correspondent branch can be removed: +// +// GetVersion(ctx, "fooChange", 1, 1) +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// +// ExecuteWithVersion option is useful when you want to ensure that your changes can be safely rolled back if needed, as +// both versions of the workflow code are compatible with each other. +func ExecuteWithVersion(version Version) GetVersionOption { + return func(o *internal.GetVersionOptions) { + o.CustomVersion = &version + } +} + +// ExecuteWithMinVersion returns a GetVersionOption that makes GetVersion return minSupported version +// when executed for the first time, instead of returning maxSupported version. +// To see how this option can be used, see the ExecuteWithVersion option +func ExecuteWithMinVersion() GetVersionOption { + return func(o *internal.GetVersionOptions) { + o.UseMinVersion = true + } +} + // GetVersion is used to safely perform backwards incompatible changes to workflow definitions. // It is not allowed to update workflow code while there are workflows running as it is going to break // determinism. The solution is to have both old code that is used to replay existing workflows @@ -418,8 +492,8 @@ const DefaultVersion Version = internal.DefaultVersion // } else { // err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil) // } -func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version { - return internal.GetVersion(ctx, changeID, minSupported, maxSupported) +func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version { + return internal.GetVersion(ctx, changeID, minSupported, maxSupported, opts...) } // SetQueryHandler sets the query handler to handle workflow query. The queryType specify which query type this handler From d5eae9bf44e981bb43cdefb4133c97586a1a93db Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Thu, 19 Jun 2025 13:22:09 +0200 Subject: [PATCH 06/11] add TestVersionedWorkflows case --- test/replaytests/replay_test.go | 101 ++++++++++ test/replaytests/versioned_workflow.go | 117 ++++++++++++ test/replaytests/versioned_workflow_bar.json | 190 +++++++++++++++++++ test/replaytests/versioned_workflow_foo.json | 163 ++++++++++++++++ 4 files changed, 571 insertions(+) create mode 100644 test/replaytests/versioned_workflow.go create mode 100644 test/replaytests/versioned_workflow_bar.json create mode 100644 test/replaytests/versioned_workflow_foo.json diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 5522ac951..9b7eb27bb 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -191,3 +191,104 @@ func TestContinueAsNew(t *testing.T) { err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "continue_as_new.json") assert.ErrorContains(t, err, "missing replay decision for WorkflowExecutionContinuedAsNew") } + +// TestSafeDeploymentVersionedWorkflow verifies that versioned workflows can be executed +// safely across different versions without causing non-deterministic errors. +// There are 2 workflow executions: +// +// * VersionedWorkflowFoo - which is the first version of the workflow which version of change id is DefaultVersion +// - This workflow is supposed to execute FooActivity +// +// * VersionedWorkflowBar - which is the second version of the workflow which version of change id is 1 +// - This workflow is supposed to execute BarActivity +// +// There are 4 versions of the workflow: +// +// * VersionedWorkflowV1 - which supports only DefaultVersion and executes FooActivity +// - This workflow is able to replay the history of only of VersionedWorkflowFoo +// +// * VersionedWorkflowV2 - which supports DefaultVersion and Version 1, and can execute FooActivity or BarActivity +// - This workflow is able to replay the history of both of VersionedWorkflowFoo and VersionedWorkflowBar +// - A first execution of this workflow will should execute FooActivity, because of usage workflow.ExecuteWithMinVersion(), +// but the test can't check it due to Replay +// +// * VersionedWorkflowV3 - which supports DefaultVersion and Version 1, and can execute FooActivity or BarActivity +// - This workflow is able to replay the history of both of VersionedWorkflowFoo and VersionedWorkflowBar +// - A first execution of this workflow will should execute BarActivity, but the test can't check it due to Replay +// +// * VersionedWorkflowV4 - which supports Version 1, and can execute BarActivity +// - This workflow is able to replay the history only of VersionedWorkflowBar +// +// So the test focusing workflows supports forward and backward compatibility of the workflows +func TestVersionedWorkflows(t *testing.T) { + const ( + versionedWorkflowFooHistoryFile = "versioned_workflow_foo.json" + versionedWorkflowBarHistoryFile = "versioned_workflow_bar.json" + ) + + t.Run("VersionedWorkflowV1", func(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflowWithOptions(VersionedWorkflowV1, workflow.RegisterOptions{Name: versionedWorkflowName}) + replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName}) + + t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) + require.NoError(t, err, "Failed to replay VersionedWorkflowFoo history") + }) + + t.Run("fail to replay with VersionedWorkflowBar", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile) + require.Error(t, err, "Expected to fail replaying VersionedWorkflowBar history") + }) + }) + + t.Run("VersionedWorkflowV2", func(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflowWithOptions(VersionedWorkflowV2, workflow.RegisterOptions{Name: versionedWorkflowName}) + replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName}) + replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName}) + + t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) + require.NoError(t, err, "Failed to replay VersionedWorkflowFoo history") + }) + + t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile) + require.NoError(t, err, "Failed to replay VersionedWorkflowBar history") + }) + }) + + t.Run("VersionedWorkflowV3", func(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflowWithOptions(VersionedWorkflowV3, workflow.RegisterOptions{Name: versionedWorkflowName}) + replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName}) + replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName}) + + t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) + require.NoError(t, err, "Failed to replay VersionedWorkflowFoo history") + }) + + t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile) + require.NoError(t, err, "Failed to replay VersionedWorkflowBar history") + }) + }) + + t.Run("VersionedWorkflowV4", func(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflowWithOptions(VersionedWorkflowV4, workflow.RegisterOptions{Name: versionedWorkflowName}) + replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName}) + + t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) + require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history") + }) + + t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile) + require.NoError(t, err, "Failed to replay VersionedWorkflowBar history") + }) + }) +} diff --git a/test/replaytests/versioned_workflow.go b/test/replaytests/versioned_workflow.go new file mode 100644 index 000000000..dde29207e --- /dev/null +++ b/test/replaytests/versioned_workflow.go @@ -0,0 +1,117 @@ +package replaytests + +import ( + "go.uber.org/cadence/workflow" + "time" +) + +const ( + // testChangeID is a constant used to identify the version change in the workflow. + testChangeID = "test-change" + + // fooActivityName and barActivityName are the names of the activities used in the workflows. + fooActivityName = "FooActivity" + barActivityName = "BarActivity" + + // versionedWorkflowName is the name of the versioned workflow. + versionedWorkflowName = "VersionedWorkflow" +) + +var activityOptions = workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, +} + +// VersionedWorkflowV1 is the first version of the workflow, and it supports only DefaultVersion. +// It supports workflow executions started by this version VersionedWorkflowV1 +// and VersionedWorkflowV2, as all of them will have the change ID set to DefaultVersion. +func VersionedWorkflowV1(ctx workflow.Context, _ string) (string, error) { + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + var result string + err := workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result) + if err != nil { + return "", err + } + + return result, nil +} + +// VersionedWorkflowV2 is the second version of the workflow. It supports DefaultVersion and Version 1. +// All workflows started by this version will have the change ID set to DefaultVersion. +// It supports workflow executions started by VersionedWorkflowV1 and VersionedWorkflowV2, +// as all of them will have the change ID set to DefaultVersion. +// It also supports workflow executions started by VersionedWorkflowV3 and VersionedWorkflowV4 +// because the code supports execution of Version 1 of the workflow. +func VersionedWorkflowV2(ctx workflow.Context, _ string) (string, error) { + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + var result string + var err error + + version := workflow.GetVersion(ctx, testChangeID, workflow.DefaultVersion, 1, workflow.ExecuteWithMinVersion()) + if version == workflow.DefaultVersion { + err = workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result) + } else { + err = workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result) + } + if err != nil { + return "", err + } + + return result, nil +} + +// VersionedWorkflowV3 is the third version of the workflow. It supports DefaultVersion and Version 1 as well. +// However, all workflows started by this version will have the change ID set to Version 1. +// It supports workflow executions started by VersionedWorkflowV1 and VersionedWorkflowV2, +// as all of them will have the change ID set to DefaultVersion, and it supports them. +// It also supports workflow executions started by VersionedWorkflowV3 and VersionedWorkflowV4, +// because the code supports execution of Version 1 of the workflow. +func VersionedWorkflowV3(ctx workflow.Context, _ string) (string, error) { + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + var result string + var err error + + version := workflow.GetVersion(ctx, testChangeID, workflow.DefaultVersion, 1) + if version == workflow.DefaultVersion { + err = workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result) + } else { + err = workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result) + } + if err != nil { + return "", err + } + + return result, nil +} + +// VersionedWorkflowV4 is the fourth version of the workflow. It supports only Version 1. +// All workflows started by this version will have the change ID set to Version 1. +// It supports workflow executions started by VersionedWorkflowV3 and VersionedWorkflowV4, +// as all of them will have the change ID set to Version 1. +func VersionedWorkflowV4(ctx workflow.Context, _ string) (string, error) { + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + var result string + + workflow.GetVersion(ctx, testChangeID, 1, 1) + err := workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result) + if err != nil { + return "", err + } + + return result, nil +} + +// FooActivity returns "foo" as a result of the activity execution. +func FooActivity(ctx workflow.Context, _ string) (string, error) { + return "foo", nil +} + +// BarActivity returns "bar" as a result of the activity execution. +func BarActivity(ctx workflow.Context, _ string) (string, error) { + return "bar", nil +} diff --git a/test/replaytests/versioned_workflow_bar.json b/test/replaytests/versioned_workflow_bar.json new file mode 100644 index 000000000..c089ef029 --- /dev/null +++ b/test/replaytests/versioned_workflow_bar.json @@ -0,0 +1,190 @@ +[ + { + "eventId": 1, + "timestamp": 1750328857588586000, + "eventType": "WorkflowExecutionStarted", + "version": 1, + "taskId": 2097310, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "VersionedWorkflow" + }, + "taskList": { + "name": "helloWorldGroup" + }, + "input": "IkNhZGVuY2UiCg==", + "executionStartToCloseTimeoutSeconds": 60, + "taskStartToCloseTimeoutSeconds": 60, + "originalExecutionRunId": "0b70dc24-593e-44b3-9dc9-8be64e871742", + "identity": "90645@seva-NXG577QJ3G@@594a218f-225b-4bfe-8095-77e0ff639238", + "firstExecutionRunId": "0b70dc24-593e-44b3-9dc9-8be64e871742", + "firstDecisionTaskBackoffSeconds": 0, + "header": {}, + "PartitionConfig": null, + "requestId": "ba1cc767-df2b-46e5-ae85-d4c6ce9e4eff" + } + }, + { + "eventId": 2, + "timestamp": 1750328857588605000, + "eventType": "DecisionTaskScheduled", + "version": 1, + "taskId": 2097311, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "helloWorldGroup" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 3, + "timestamp": 1750328857604778000, + "eventType": "DecisionTaskStarted", + "version": 1, + "taskId": 2097316, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 2, + "identity": "90644@seva-NXG577QJ3G@helloWorldGroup@087c6301-76ea-4a0b-b55c-7d955dfc2c8f", + "requestId": "e6822a8e-596c-42c8-9ed0-bcd9710a2492" + } + }, + { + "eventId": 4, + "timestamp": 1750328857616359000, + "eventType": "DecisionTaskCompleted", + "version": 1, + "taskId": 2097319, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 2, + "startedEventId": 3, + "identity": "90644@seva-NXG577QJ3G@helloWorldGroup@087c6301-76ea-4a0b-b55c-7d955dfc2c8f", + "binaryChecksum": "10cedbfb9259c86c1afaa39ca7299c7e" + } + }, + { + "eventId": 5, + "timestamp": 1750328857616394000, + "eventType": "MarkerRecorded", + "version": 1, + "taskId": 2097320, + "markerRecordedEventAttributes": { + "markerName": "Version", + "details": "InRlc3QtY2hhbmdlIgoxCg==", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 6, + "timestamp": 1750328857616410000, + "eventType": "UpsertWorkflowSearchAttributes", + "version": 1, + "taskId": 2097321, + "upsertWorkflowSearchAttributesEventAttributes": { + "decisionTaskCompletedEventId": 4, + "searchAttributes": { + "indexedFields": { + "CadenceChangeVersion": "WyJ0ZXN0LWNoYW5nZS0xIl0=" + } + } + } + }, + { + "eventId": 7, + "timestamp": 1750328857616426000, + "eventType": "ActivityTaskScheduled", + "version": 1, + "taskId": 2097322, + "activityTaskScheduledEventAttributes": { + "activityId": "0", + "activityType": { + "name": "BarActivity" + }, + "taskList": { + "name": "helloWorldGroup" + }, + "input": "ImRhdGEiCg==", + "scheduleToCloseTimeoutSeconds": 60, + "scheduleToStartTimeoutSeconds": 60, + "startToCloseTimeoutSeconds": 60, + "heartbeatTimeoutSeconds": 20, + "decisionTaskCompletedEventId": 4, + "header": {} + } + }, + { + "eventId": 8, + "timestamp": 1750328857616439000, + "eventType": "ActivityTaskStarted", + "version": 1, + "taskId": 2097323, + "activityTaskStartedEventAttributes": { + "scheduledEventId": 7, + "identity": "90644@seva-NXG577QJ3G@helloWorldGroup@087c6301-76ea-4a0b-b55c-7d955dfc2c8f", + "requestId": "250957bb-ecf7-42d2-a305-f4287b373e7b", + "lastFailureReason": "" + } + }, + { + "eventId": 9, + "timestamp": 1750328857628447000, + "eventType": "ActivityTaskCompleted", + "version": 1, + "taskId": 2097327, + "activityTaskCompletedEventAttributes": { + "result": "ImJhciIK", + "scheduledEventId": 7, + "startedEventId": 8, + "identity": "90644@seva-NXG577QJ3G@helloWorldGroup@087c6301-76ea-4a0b-b55c-7d955dfc2c8f" + } + }, + { + "eventId": 10, + "timestamp": 1750328857628453000, + "eventType": "DecisionTaskScheduled", + "version": 1, + "taskId": 2097329, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "seva-NXG577QJ3G:6f461e2d-872c-47ef-945f-1feead5ded84" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 11, + "timestamp": 1750328857642536000, + "eventType": "DecisionTaskStarted", + "version": 1, + "taskId": 2097333, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 10, + "identity": "90644@seva-NXG577QJ3G@helloWorldGroup@087c6301-76ea-4a0b-b55c-7d955dfc2c8f", + "requestId": "4d10c8c1-1780-40b0-8388-c742725c4f90" + } + }, + { + "eventId": 12, + "timestamp": 1750328857656495000, + "eventType": "DecisionTaskCompleted", + "version": 1, + "taskId": 2097336, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 10, + "startedEventId": 11, + "identity": "90644@seva-NXG577QJ3G@helloWorldGroup@087c6301-76ea-4a0b-b55c-7d955dfc2c8f", + "binaryChecksum": "10cedbfb9259c86c1afaa39ca7299c7e" + } + }, + { + "eventId": 13, + "timestamp": 1750328857656517000, + "eventType": "WorkflowExecutionCompleted", + "version": 1, + "taskId": 2097337, + "workflowExecutionCompletedEventAttributes": { + "result": "ImJhciIK", + "decisionTaskCompletedEventId": 12 + } + } +] \ No newline at end of file diff --git a/test/replaytests/versioned_workflow_foo.json b/test/replaytests/versioned_workflow_foo.json new file mode 100644 index 000000000..d71e54a61 --- /dev/null +++ b/test/replaytests/versioned_workflow_foo.json @@ -0,0 +1,163 @@ +[ + { + "eventId": 1, + "timestamp": 1750328817805228000, + "eventType": "WorkflowExecutionStarted", + "version": 1, + "taskId": 2097180, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "VersionedWorkflow" + }, + "taskList": { + "name": "helloWorldGroup" + }, + "input": "IkNhZGVuY2UiCg==", + "executionStartToCloseTimeoutSeconds": 60, + "taskStartToCloseTimeoutSeconds": 60, + "originalExecutionRunId": "5e7665f5-9b14-4801-a0ba-8bbf800d8118", + "identity": "90385@seva-NXG577QJ3G@@8adf222d-05d5-4800-9b4a-cf8e117625ce", + "firstExecutionRunId": "5e7665f5-9b14-4801-a0ba-8bbf800d8118", + "firstDecisionTaskBackoffSeconds": 0, + "header": {}, + "PartitionConfig": null, + "requestId": "c8aaaeb7-7477-4fa3-92bd-8fc9b683b3d2" + } + }, + { + "eventId": 2, + "timestamp": 1750328817805247000, + "eventType": "DecisionTaskScheduled", + "version": 1, + "taskId": 2097181, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "helloWorldGroup" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 3, + "timestamp": 1750328817825101000, + "eventType": "DecisionTaskStarted", + "version": 1, + "taskId": 2097186, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 2, + "identity": "90365@seva-NXG577QJ3G@helloWorldGroup@ae4320fa-15ba-4ed6-97c3-facf0f33ca0c", + "requestId": "cbced42e-e53c-458e-bcde-3cd5b49ab725" + } + }, + { + "eventId": 4, + "timestamp": 1750328817836846000, + "eventType": "DecisionTaskCompleted", + "version": 1, + "taskId": 2097189, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 2, + "startedEventId": 3, + "identity": "90365@seva-NXG577QJ3G@helloWorldGroup@ae4320fa-15ba-4ed6-97c3-facf0f33ca0c", + "binaryChecksum": "a5cbe519e9c5b75a2bc2d1140c0cf0b4" + } + }, + { + "eventId": 5, + "timestamp": 1750328817836895000, + "eventType": "ActivityTaskScheduled", + "version": 1, + "taskId": 2097190, + "activityTaskScheduledEventAttributes": { + "activityId": "0", + "activityType": { + "name": "FooActivity" + }, + "taskList": { + "name": "helloWorldGroup" + }, + "input": "ImRhdGEiCg==", + "scheduleToCloseTimeoutSeconds": 60, + "scheduleToStartTimeoutSeconds": 60, + "startToCloseTimeoutSeconds": 60, + "heartbeatTimeoutSeconds": 20, + "decisionTaskCompletedEventId": 4, + "header": {} + } + }, + { + "eventId": 6, + "timestamp": 1750328817836910000, + "eventType": "ActivityTaskStarted", + "version": 1, + "taskId": 2097191, + "activityTaskStartedEventAttributes": { + "scheduledEventId": 5, + "identity": "90365@seva-NXG577QJ3G@helloWorldGroup@ae4320fa-15ba-4ed6-97c3-facf0f33ca0c", + "requestId": "1f6e8fb9-fd71-40af-8ea1-676b6d4f1d6b", + "lastFailureReason": "" + } + }, + { + "eventId": 7, + "timestamp": 1750328817847384000, + "eventType": "ActivityTaskCompleted", + "version": 1, + "taskId": 2097194, + "activityTaskCompletedEventAttributes": { + "result": "ImZvbyIK", + "scheduledEventId": 5, + "startedEventId": 6, + "identity": "90365@seva-NXG577QJ3G@helloWorldGroup@ae4320fa-15ba-4ed6-97c3-facf0f33ca0c" + } + }, + { + "eventId": 8, + "timestamp": 1750328817847391000, + "eventType": "DecisionTaskScheduled", + "version": 1, + "taskId": 2097196, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "seva-NXG577QJ3G:469cc23c-07d7-40b4-b409-c2a2ac12c1e1" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 9, + "timestamp": 1750328817861606000, + "eventType": "DecisionTaskStarted", + "version": 1, + "taskId": 2097200, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 8, + "identity": "90365@seva-NXG577QJ3G@helloWorldGroup@ae4320fa-15ba-4ed6-97c3-facf0f33ca0c", + "requestId": "cef65775-3bf7-4427-9298-f9b16695568e" + } + }, + { + "eventId": 10, + "timestamp": 1750328817874304000, + "eventType": "DecisionTaskCompleted", + "version": 1, + "taskId": 2097203, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 8, + "startedEventId": 9, + "identity": "90365@seva-NXG577QJ3G@helloWorldGroup@ae4320fa-15ba-4ed6-97c3-facf0f33ca0c", + "binaryChecksum": "a5cbe519e9c5b75a2bc2d1140c0cf0b4" + } + }, + { + "eventId": 11, + "timestamp": 1750328817874332000, + "eventType": "WorkflowExecutionCompleted", + "version": 1, + "taskId": 2097204, + "workflowExecutionCompletedEventAttributes": { + "result": "ImZvbyIK", + "decisionTaskCompletedEventId": 10 + } + } +] \ No newline at end of file From d847af8f3fc0b075f3eb25095f7461efc2df864c Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Thu, 19 Jun 2025 14:37:06 +0200 Subject: [PATCH 07/11] add integration test for versioned workflow --- test/integration_test.go | 128 ++++++++++++++++++------- test/replaytests/replay_test.go | 14 +-- test/replaytests/versioned_workflow.go | 65 +++++++++---- 3 files changed, 145 insertions(+), 62 deletions(-) diff --git a/test/integration_test.go b/test/integration_test.go index d201cd0fa..2a7bb8ad2 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -25,6 +25,7 @@ import ( "context" "errors" "fmt" + "go.uber.org/cadence/test/replaytests" "net" "strings" "sync" @@ -173,6 +174,7 @@ func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) { ts.worker = worker.New(ts.rpcClient.Interface, domainName, ts.taskListName, options) ts.registerWorkflowsAndActivities(ts.worker) + ts.beforeVersionedWorkflowTest(testName, ts.worker) ts.Nil(ts.worker.Start()) } @@ -182,7 +184,7 @@ func (ts *IntegrationTestSuite) TearDownTest() { func (ts *IntegrationTestSuite) TestBasic() { var expected []string - err := ts.executeWorkflow("test-basic", ts.workflows.Basic, &expected) + _, err := ts.executeWorkflow("test-basic", ts.workflows.Basic, &expected) ts.NoError(err) ts.EqualValues(expected, ts.activities.invoked()) ts.Equal([]string{"ExecuteWorkflow begin", "ExecuteActivity", "ExecuteActivity", "ExecuteWorkflow end"}, @@ -191,27 +193,27 @@ func (ts *IntegrationTestSuite) TestBasic() { func (ts *IntegrationTestSuite) TestActivityRetryOnError() { var expected []string - err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected) + _, err := ts.executeWorkflow("test-activity-retry-on-error", ts.workflows.ActivityRetryOnError, &expected) ts.NoError(err) ts.EqualValues(expected, ts.activities.invoked()) } func (ts *IntegrationTestSuite) TestActivityRetryOnTimeoutStableError() { var expected []string - err := ts.executeWorkflow("test-activity-retry-on-timeout-stable-error", ts.workflows.RetryTimeoutStableErrorWorkflow, &expected) + _, err := ts.executeWorkflow("test-activity-retry-on-timeout-stable-error", ts.workflows.RetryTimeoutStableErrorWorkflow, &expected) ts.Nil(err) } func (ts *IntegrationTestSuite) TestActivityRetryOptionsChange() { var expected []string - err := ts.executeWorkflow("test-activity-retry-options-change", ts.workflows.ActivityRetryOptionsChange, &expected) + _, err := ts.executeWorkflow("test-activity-retry-options-change", ts.workflows.ActivityRetryOptionsChange, &expected) ts.NoError(err) ts.EqualValues(expected, ts.activities.invoked()) } func (ts *IntegrationTestSuite) TestActivityRetryOnStartToCloseTimeout() { var expected []string - err := ts.executeWorkflow( + _, err := ts.executeWorkflow( "test-activity-retry-on-start2close-timeout", ts.workflows.ActivityRetryOnTimeout, &expected, @@ -223,21 +225,21 @@ func (ts *IntegrationTestSuite) TestActivityRetryOnStartToCloseTimeout() { func (ts *IntegrationTestSuite) TestActivityRetryOnHBTimeout() { var expected []string - err := ts.executeWorkflow("test-activity-retry-on-hbtimeout", ts.workflows.ActivityRetryOnHBTimeout, &expected) + _, err := ts.executeWorkflow("test-activity-retry-on-hbtimeout", ts.workflows.ActivityRetryOnHBTimeout, &expected) ts.NoError(err) ts.EqualValues(expected, ts.activities.invoked()) } func (ts *IntegrationTestSuite) TestActivityAutoHeartbeat() { var expected []string - err := ts.executeWorkflow("test-activity-auto-heartbeat", ts.workflows.ActivityAutoHeartbeat, &expected) + _, err := ts.executeWorkflow("test-activity-auto-heartbeat", ts.workflows.ActivityAutoHeartbeat, &expected) ts.NoError(err) ts.EqualValues(expected, ts.activities.invoked()) } func (ts *IntegrationTestSuite) TestContinueAsNew() { var result int - err := ts.executeWorkflow("test-continueasnew", ts.workflows.ContinueAsNew, &result, 4, ts.taskListName) + _, err := ts.executeWorkflow("test-continueasnew", ts.workflows.ContinueAsNew, &result, 4, ts.taskListName) ts.NoError(err) ts.Equal(999, result) } @@ -251,7 +253,7 @@ func (ts *IntegrationTestSuite) TestContinueAsNewCarryOver() { startOptions.SearchAttributes = map[string]interface{}{ "CustomKeywordField": "searchAttr", } - err := ts.executeWorkflowWithOption(startOptions, ts.workflows.ContinueAsNewWithOptions, &result, 4, ts.taskListName) + _, err := ts.executeWorkflowWithOption(startOptions, ts.workflows.ContinueAsNewWithOptions, &result, 4, ts.taskListName) ts.NoError(err) ts.Equal("memoVal,searchAttr", result) } @@ -320,7 +322,7 @@ func (ts *IntegrationTestSuite) TestConsistentQuery() { func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() { var result string - err := ts.executeWorkflow( + _, err := ts.executeWorkflow( "test-workflowidreuse-reject-duplicate", ts.workflows.IDReusePolicy, &result, @@ -338,7 +340,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() { func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly1() { var result string - err := ts.executeWorkflow( + _, err := ts.executeWorkflow( "test-workflowidreuse-reject-duplicate-failed-only1", ts.workflows.IDReusePolicy, &result, @@ -356,7 +358,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly1() { func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly2() { var result string - err := ts.executeWorkflow( + _, err := ts.executeWorkflow( "test-workflowidreuse-reject-duplicate-failed-only2", ts.workflows.IDReusePolicy, &result, @@ -371,7 +373,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly2() { func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicate() { var result string - err := ts.executeWorkflow( + _, err := ts.executeWorkflow( "test-workflowidreuse-allow-duplicate", ts.workflows.IDReusePolicy, &result, @@ -387,7 +389,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicate() { func (ts *IntegrationTestSuite) TestWorkflowIDReuseErrorViaStartWorkflow() { duplicatedWID := "test-workflowidreuse-duplicate-start-error" // setup: run any workflow once to consume the ID - err := ts.executeWorkflow( + _, err := ts.executeWorkflow( duplicatedWID, ts.workflows.SimplestWorkflow, nil, @@ -407,14 +409,14 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseErrorViaStartWorkflow() { } func (ts *IntegrationTestSuite) TestChildWFRetryOnError() { - err := ts.executeWorkflow("test-childwf-retry-on-error", ts.workflows.ChildWorkflowRetryOnError, nil) + _, err := ts.executeWorkflow("test-childwf-retry-on-error", ts.workflows.ChildWorkflowRetryOnError, nil) ts.Error(err) ts.Truef(client.IsWorkflowError(err), "child error should be a workflow error: %#v", err) ts.EqualValues([]string{"toUpper", "toUpper", "toUpper"}, ts.activities.invoked()) } func (ts *IntegrationTestSuite) TestChildWFRetryOnTimeout() { - err := ts.executeWorkflow("test-childwf-retry-on-timeout", ts.workflows.ChildWorkflowRetryOnTimeout, nil) + _, err := ts.executeWorkflow("test-childwf-retry-on-timeout", ts.workflows.ChildWorkflowRetryOnTimeout, nil) ts.Error(err) ts.Truef(client.IsWorkflowError(err), "child-timeout error should be a workflow error: %#v", err) ts.EqualValues([]string{"sleep", "sleep", "sleep"}, ts.activities.invoked()) @@ -422,7 +424,7 @@ func (ts *IntegrationTestSuite) TestChildWFRetryOnTimeout() { func (ts *IntegrationTestSuite) TestChildWFWithMemoAndSearchAttributes() { var result string - err := ts.executeWorkflow("test-childwf-success-memo-searchAttr", ts.workflows.ChildWorkflowSuccess, &result) + _, err := ts.executeWorkflow("test-childwf-success-memo-searchAttr", ts.workflows.ChildWorkflowSuccess, &result) ts.NoError(err) ts.EqualValues([]string{"getMemoAndSearchAttr"}, ts.activities.invoked()) ts.Equal("memoVal, searchAttrVal", result) @@ -432,7 +434,7 @@ func (ts *IntegrationTestSuite) TestChildWFWithMemoAndSearchAttributes() { func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyTerminate() { var childWorkflowID string - err := ts.executeWorkflow("test-childwf-parent-close-policy", ts.workflows.ChildWorkflowSuccessWithParentClosePolicyTerminate, &childWorkflowID) + _, err := ts.executeWorkflow("test-childwf-parent-close-policy", ts.workflows.ChildWorkflowSuccessWithParentClosePolicyTerminate, &childWorkflowID) ts.NoError(err) // Need to wait for child workflow to finish as well otherwise test becomes flaky ts.waitForWorkflowFinish(childWorkflowID, "") @@ -443,7 +445,7 @@ func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyTerminate() { func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyAbandon() { var childWorkflowID string - err := ts.executeWorkflow("test-childwf-parent-close-policy", ts.workflows.ChildWorkflowSuccessWithParentClosePolicyAbandon, &childWorkflowID) + _, err := ts.executeWorkflow("test-childwf-parent-close-policy", ts.workflows.ChildWorkflowSuccessWithParentClosePolicyAbandon, &childWorkflowID) ts.NoError(err) resp, err := ts.libClient.DescribeWorkflowExecution(context.Background(), childWorkflowID, "") ts.NoError(err) @@ -452,7 +454,7 @@ func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyAbandon() { func (ts *IntegrationTestSuite) TestChildWFCancel() { var childWorkflowID string - err := ts.executeWorkflow("test-childwf-cancel", ts.workflows.ChildWorkflowCancel, &childWorkflowID) + _, err := ts.executeWorkflow("test-childwf-cancel", ts.workflows.ChildWorkflowCancel, &childWorkflowID) ts.NoError(err) resp, err := ts.libClient.DescribeWorkflowExecution(context.Background(), childWorkflowID, "") ts.NoError(err) @@ -468,14 +470,14 @@ func (ts *IntegrationTestSuite) TestActivityCancelUsingReplay() { func (ts *IntegrationTestSuite) TestActivityCancelRepro() { var expected []string - err := ts.executeWorkflow("test-activity-cancel-sm", ts.workflows.ActivityCancelRepro, &expected) + _, err := ts.executeWorkflow("test-activity-cancel-sm", ts.workflows.ActivityCancelRepro, &expected) ts.NoError(err) ts.EqualValues(expected, ts.activities.invoked()) } func (ts *IntegrationTestSuite) TestWorkflowWithLocalActivityCtxPropagation() { var expected string - err := ts.executeWorkflow("test-wf-local-activity-ctx-prop", ts.workflows.WorkflowWithLocalActivityCtxPropagation, &expected) + _, err := ts.executeWorkflow("test-wf-local-activity-ctx-prop", ts.workflows.WorkflowWithLocalActivityCtxPropagation, &expected) ts.NoError(err) ts.EqualValues(expected, "test-data-in-contexttest-data-in-context") } @@ -497,12 +499,12 @@ func (ts *IntegrationTestSuite) TestLargeQueryResultError() { } func (ts *IntegrationTestSuite) TestInspectActivityInfo() { - err := ts.executeWorkflow("test-activity-info", ts.workflows.InspectActivityInfo, nil) + _, err := ts.executeWorkflow("test-activity-info", ts.workflows.InspectActivityInfo, nil) ts.Nil(err) } func (ts *IntegrationTestSuite) TestInspectLocalActivityInfo() { - err := ts.executeWorkflow("test-local-activity-info", ts.workflows.InspectLocalActivityInfo, nil) + _, err := ts.executeWorkflow("test-local-activity-info", ts.workflows.InspectLocalActivityInfo, nil) ts.Nil(err) } @@ -523,7 +525,7 @@ func (ts *IntegrationTestSuite) TestDomainUpdate() { } func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowFailPolicy() { - err := ts.executeWorkflow("test-nondeterminism-failpolicy", ts.workflows.NonDeterminismSimulatorWorkflow, nil) + _, err := ts.executeWorkflow("test-nondeterminism-failpolicy", ts.workflows.NonDeterminismSimulatorWorkflow, nil) var customErr *internal.CustomError ok := errors.As(err, &customErr) ts.Truef(ok, "expected CustomError but got %T", err) @@ -551,11 +553,71 @@ func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowQuery() { func (ts *IntegrationTestSuite) TestOverrideSpanContext() { var result map[string]string - err := ts.executeWorkflow("test-override-span-context", ts.workflows.OverrideSpanContext, &result) + _, err := ts.executeWorkflow("test-override-span-context", ts.workflows.OverrideSpanContext, &result) ts.NoError(err) ts.Equal("some-value", result["mockpfx-baggage-some-key"]) } +// beforeVersionedWorkflowTest registers appropriate versioned workflow and activity to emulate the versioned workflow test. +func (ts *IntegrationTestSuite) beforeVersionedWorkflowTest(testName string, w worker.Worker) { + switch testName { + case "TestVersionedWorkflowV1": + replaytests.SetupWorkerForVersionedWorkflowV1(w) + case "TestVersionedWorkflowV2": + replaytests.SetupWorkerForVersionedWorkflowV2(w) + case "TestVersionedWorkflowV3": + replaytests.SetupWorkerForVersionedWorkflowV3(w) + case "TestVersionedWorkflowV4": + replaytests.SetupWorkerForVersionedWorkflowV4(w) + } +} + +// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV1 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, but not on VersionedWorkflowV4. +func (ts *IntegrationTestSuite) TestVersionedWorkflowV1() { + execution, err := ts.executeWorkflow("test-versioned-workflow-v1", replaytests.VersionedWorkflowName, nil, "arg") + ts.NoError(err) + + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer withVersionedWorkflowV3") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4") +} + +// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV2 can be replayed on worker with VersionedWorkflowV1 and VersionedWorkflowV3, but not on VersionedWorkflowV4. +func (ts *IntegrationTestSuite) TestVersionedWorkflowV2() { + execution, err := ts.executeWorkflow("test-versioned-workflow-v2", replaytests.VersionedWorkflowName, nil, "arg") + ts.NoError(err) + + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Failed to replay on the replayer with VersionedWorkflowV1") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer withVersionedWorkflowV3") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4") +} + +// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV3 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV4, but not on VersionedWorkflowV1. +func (ts *IntegrationTestSuite) TestVersionedWorkflowV3() { + execution, err := ts.executeWorkflow("test-versioned-workflow-v3", replaytests.VersionedWorkflowName, nil, "arg") + ts.NoError(err) + + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Failed to replay on the replayer with VersionedWorkflowV4") +} + +// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV4 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, but not on VersionedWorkflowV1. +func (ts *IntegrationTestSuite) TestVersionedWorkflowV4() { + execution, err := ts.executeWorkflow("test-versioned-workflow-v4", replaytests.VersionedWorkflowName, nil, "arg") + ts.NoError(err) + + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3") +} + +func (ts *IntegrationTestSuite) replayVersionedWorkflow(setupWorkerFunc func(w worker.Registry), execution *workflow.Execution) error { + replayer := worker.NewWorkflowReplayer() + setupWorkerFunc(replayer) + return replayer.ReplayWorkflowExecution(context.Background(), ts.rpcClient, zaptest.NewLogger(ts.T()), domainName, *execution) +} + func (ts *IntegrationTestSuite) registerDomain() { ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() @@ -574,12 +636,12 @@ func (ts *IntegrationTestSuite) registerDomain() { time.Sleep(domainCacheRefreshInterval) // wait for domain cache refresh on cadence-server // bellow is used to guarantee domain is ready var dummyReturn string - err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn) + _, err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn) numOfRetry := 20 for err != nil && numOfRetry >= 0 { if _, ok := err.(*shared.EntityNotExistsError); ok { time.Sleep(domainCacheRefreshInterval) - err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn) + _, err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn) } else { break } @@ -588,21 +650,19 @@ func (ts *IntegrationTestSuite) registerDomain() { } // executeWorkflow executes a given workflow and waits for the result -func (ts *IntegrationTestSuite) executeWorkflow( - wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { +func (ts *IntegrationTestSuite) executeWorkflow(wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}) (*workflow.Execution, error) { options := ts.startWorkflowOptions(wfID) return ts.executeWorkflowWithOption(options, wfFunc, retValPtr, args...) } -func (ts *IntegrationTestSuite) executeWorkflowWithOption( - options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { +func (ts *IntegrationTestSuite) executeWorkflowWithOption(options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) (*workflow.Execution, error) { ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() span := ts.workflows.tracer.StartSpan("test-workflow") defer span.Finish() execution, err := ts.libClient.StartWorkflow(ctx, options, wfFunc, args...) if err != nil { - return err + return nil, err } run := ts.libClient.GetWorkflow(ctx, execution.ID, execution.RunID) err = run.Get(ctx, retValPtr) @@ -617,7 +677,7 @@ func (ts *IntegrationTestSuite) executeWorkflowWithOption( logger.Info(event.String()) } } - return err + return execution, err } func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWorkflowOptions { diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 9b7eb27bb..6eb2cb4b6 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -228,8 +228,7 @@ func TestVersionedWorkflows(t *testing.T) { t.Run("VersionedWorkflowV1", func(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(VersionedWorkflowV1, workflow.RegisterOptions{Name: versionedWorkflowName}) - replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName}) + SetupWorkerForVersionedWorkflowV1(replayer) t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) { err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) @@ -244,9 +243,7 @@ func TestVersionedWorkflows(t *testing.T) { t.Run("VersionedWorkflowV2", func(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(VersionedWorkflowV2, workflow.RegisterOptions{Name: versionedWorkflowName}) - replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName}) - replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName}) + SetupWorkerForVersionedWorkflowV2(replayer) t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) { err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) @@ -261,9 +258,7 @@ func TestVersionedWorkflows(t *testing.T) { t.Run("VersionedWorkflowV3", func(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(VersionedWorkflowV3, workflow.RegisterOptions{Name: versionedWorkflowName}) - replayer.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: fooActivityName}) - replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName}) + SetupWorkerForVersionedWorkflowV3(replayer) t.Run("successfully replayed with VersionedWorkflowFoo", func(t *testing.T) { err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) @@ -278,8 +273,7 @@ func TestVersionedWorkflows(t *testing.T) { t.Run("VersionedWorkflowV4", func(t *testing.T) { replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(VersionedWorkflowV4, workflow.RegisterOptions{Name: versionedWorkflowName}) - replayer.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: barActivityName}) + SetupWorkerForVersionedWorkflowV4(replayer) t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) { err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) diff --git a/test/replaytests/versioned_workflow.go b/test/replaytests/versioned_workflow.go index dde29207e..b0fb1dc7d 100644 --- a/test/replaytests/versioned_workflow.go +++ b/test/replaytests/versioned_workflow.go @@ -1,20 +1,23 @@ package replaytests import ( + "context" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/worker" "go.uber.org/cadence/workflow" "time" ) const ( - // testChangeID is a constant used to identify the version change in the workflow. - testChangeID = "test-change" + // TestChangeID is a constant used to identify the version change in the workflow. + TestChangeID = "test-change" - // fooActivityName and barActivityName are the names of the activities used in the workflows. - fooActivityName = "FooActivity" - barActivityName = "BarActivity" + // FooActivityName and BarActivityName are the names of the activities used in the workflows. + FooActivityName = "FooActivity" + BarActivityName = "BarActivity" - // versionedWorkflowName is the name of the versioned workflow. - versionedWorkflowName = "VersionedWorkflow" + // VersionedWorkflowName is the name of the versioned workflow. + VersionedWorkflowName = "VersionedWorkflow" ) var activityOptions = workflow.ActivityOptions{ @@ -30,7 +33,7 @@ func VersionedWorkflowV1(ctx workflow.Context, _ string) (string, error) { ctx = workflow.WithActivityOptions(ctx, activityOptions) var result string - err := workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result) + err := workflow.ExecuteActivity(ctx, FooActivityName, "data").Get(ctx, &result) if err != nil { return "", err } @@ -50,11 +53,11 @@ func VersionedWorkflowV2(ctx workflow.Context, _ string) (string, error) { var result string var err error - version := workflow.GetVersion(ctx, testChangeID, workflow.DefaultVersion, 1, workflow.ExecuteWithMinVersion()) + version := workflow.GetVersion(ctx, TestChangeID, workflow.DefaultVersion, 1, workflow.ExecuteWithMinVersion()) if version == workflow.DefaultVersion { - err = workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result) + err = workflow.ExecuteActivity(ctx, FooActivityName, "data").Get(ctx, &result) } else { - err = workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result) + err = workflow.ExecuteActivity(ctx, BarActivityName, "data").Get(ctx, &result) } if err != nil { return "", err @@ -75,11 +78,11 @@ func VersionedWorkflowV3(ctx workflow.Context, _ string) (string, error) { var result string var err error - version := workflow.GetVersion(ctx, testChangeID, workflow.DefaultVersion, 1) + version := workflow.GetVersion(ctx, TestChangeID, workflow.DefaultVersion, 1) if version == workflow.DefaultVersion { - err = workflow.ExecuteActivity(ctx, fooActivityName, "").Get(ctx, &result) + err = workflow.ExecuteActivity(ctx, FooActivityName, "data").Get(ctx, &result) } else { - err = workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result) + err = workflow.ExecuteActivity(ctx, BarActivityName, "data").Get(ctx, &result) } if err != nil { return "", err @@ -97,8 +100,8 @@ func VersionedWorkflowV4(ctx workflow.Context, _ string) (string, error) { var result string - workflow.GetVersion(ctx, testChangeID, 1, 1) - err := workflow.ExecuteActivity(ctx, barActivityName, "").Get(ctx, &result) + workflow.GetVersion(ctx, TestChangeID, 1, 1) + err := workflow.ExecuteActivity(ctx, BarActivityName, "data").Get(ctx, &result) if err != nil { return "", err } @@ -107,11 +110,37 @@ func VersionedWorkflowV4(ctx workflow.Context, _ string) (string, error) { } // FooActivity returns "foo" as a result of the activity execution. -func FooActivity(ctx workflow.Context, _ string) (string, error) { +func FooActivity(ctx context.Context, _ string) (string, error) { return "foo", nil } // BarActivity returns "bar" as a result of the activity execution. -func BarActivity(ctx workflow.Context, _ string) (string, error) { +func BarActivity(ctx context.Context, _ string) (string, error) { return "bar", nil } + +// SetupWorkerForVersionedWorkflowV1 registers VersionedWorkflowV1 and FooActivity +func SetupWorkerForVersionedWorkflowV1(w worker.Registry) { + w.RegisterWorkflowWithOptions(VersionedWorkflowV1, workflow.RegisterOptions{Name: VersionedWorkflowName}) + w.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: FooActivityName}) +} + +// SetupWorkerForVersionedWorkflowV2 registers VersionedWorkflowV2, FooActivity, and BarActivity +func SetupWorkerForVersionedWorkflowV2(w worker.Registry) { + w.RegisterWorkflowWithOptions(VersionedWorkflowV2, workflow.RegisterOptions{Name: VersionedWorkflowName}) + w.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: FooActivityName}) + w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName}) +} + +// SetupWorkerForVersionedWorkflowV3 registers VersionedWorkflowV3, FooActivity, and BarActivity +func SetupWorkerForVersionedWorkflowV3(w worker.Registry) { + w.RegisterWorkflowWithOptions(VersionedWorkflowV3, workflow.RegisterOptions{Name: VersionedWorkflowName}) + w.RegisterActivityWithOptions(FooActivity, activity.RegisterOptions{Name: FooActivityName}) + w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName}) +} + +// SetupWorkerForVersionedWorkflowV4 registers VersionedWorkflowV4 and BarActivity +func SetupWorkerForVersionedWorkflowV4(w worker.Registry) { + w.RegisterWorkflowWithOptions(VersionedWorkflowV4, workflow.RegisterOptions{Name: VersionedWorkflowName}) + w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName}) +} From 0f89a3c3586c755cf9b0826a45cbe48ee102f957 Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Thu, 19 Jun 2025 14:37:58 +0200 Subject: [PATCH 08/11] fix fmt --- test/integration_test.go | 3 ++- test/replaytests/versioned_workflow.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/test/integration_test.go b/test/integration_test.go index 2a7bb8ad2..aa449e840 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -25,13 +25,14 @@ import ( "context" "errors" "fmt" - "go.uber.org/cadence/test/replaytests" "net" "strings" "sync" "testing" "time" + "go.uber.org/cadence/test/replaytests" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" diff --git a/test/replaytests/versioned_workflow.go b/test/replaytests/versioned_workflow.go index b0fb1dc7d..75b60bc4c 100644 --- a/test/replaytests/versioned_workflow.go +++ b/test/replaytests/versioned_workflow.go @@ -2,10 +2,11 @@ package replaytests import ( "context" + "time" + "go.uber.org/cadence/activity" "go.uber.org/cadence/worker" "go.uber.org/cadence/workflow" - "time" ) const ( From 7ad84319b539b01a38c33a5b6142304048529ce5 Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Fri, 20 Jun 2025 14:06:05 +0200 Subject: [PATCH 09/11] add V5 and V6 versions --- test/integration_test.go | 60 ++++++++++++++++++++--- test/replaytests/replay_test.go | 39 +++++++++++++++ test/replaytests/versioned_workflow.go | 66 ++++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 6 deletions(-) diff --git a/test/integration_test.go b/test/integration_test.go index aa449e840..866bd9a65 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -570,30 +570,44 @@ func (ts *IntegrationTestSuite) beforeVersionedWorkflowTest(testName string, w w replaytests.SetupWorkerForVersionedWorkflowV3(w) case "TestVersionedWorkflowV4": replaytests.SetupWorkerForVersionedWorkflowV4(w) + case "TestVersionedWorkflowV5": + replaytests.SetupWorkerForVersionedWorkflowV5(w) + case "TestVersionedWorkflowV6": + replaytests.SetupWorkerForVersionedWorkflowV6(w) } } -// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV1 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, but not on VersionedWorkflowV4. +// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV1 +// can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, +// but not on VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6. func (ts *IntegrationTestSuite) TestVersionedWorkflowV1() { execution, err := ts.executeWorkflow("test-versioned-workflow-v1", replaytests.VersionedWorkflowName, nil, "arg") ts.NoError(err) ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2") - ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer withVersionedWorkflowV3") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3") ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Expected to fail replaying the replayer with VersionedWorkflowV5") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Expected to fail replaying the replayer with VersionedWorkflowV6") } -// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV2 can be replayed on worker with VersionedWorkflowV1 and VersionedWorkflowV3, but not on VersionedWorkflowV4. +// TestVersionedWorkflowV2 tests that a workflow started on the worker with VersionedWorkflowV2 +// can be replayed on worker with VersionedWorkflowV1 and VersionedWorkflowV3, +// but not on VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6. func (ts *IntegrationTestSuite) TestVersionedWorkflowV2() { execution, err := ts.executeWorkflow("test-versioned-workflow-v2", replaytests.VersionedWorkflowName, nil, "arg") ts.NoError(err) ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Failed to replay on the replayer with VersionedWorkflowV1") - ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer withVersionedWorkflowV3") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3") ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Expected to fail replaying the replayer with VersionedWorkflowV5") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Expected to fail replaying the replayer with VersionedWorkflowV6") } -// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV3 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV4, but not on VersionedWorkflowV1. +// TestVersionedWorkflowV3 tests that a workflow started on the worker with VersionedWorkflowV3 +// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6 +// but not on VersionedWorkflowV1 func (ts *IntegrationTestSuite) TestVersionedWorkflowV3() { execution, err := ts.executeWorkflow("test-versioned-workflow-v3", replaytests.VersionedWorkflowName, nil, "arg") ts.NoError(err) @@ -601,9 +615,13 @@ func (ts *IntegrationTestSuite) TestVersionedWorkflowV3() { ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1") ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2") ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Failed to replay on the replayer with VersionedWorkflowV4") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6") } -// TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV4 can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, but not on VersionedWorkflowV1. +// TestVersionedWorkflowV4 tests that a workflow started on the worker with VersionedWorkflowV4 +// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV5, VersionedWorkflowV6 +// but not on VersionedWorkflowV1 func (ts *IntegrationTestSuite) TestVersionedWorkflowV4() { execution, err := ts.executeWorkflow("test-versioned-workflow-v4", replaytests.VersionedWorkflowName, nil, "arg") ts.NoError(err) @@ -611,6 +629,36 @@ func (ts *IntegrationTestSuite) TestVersionedWorkflowV4() { ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1") ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2") ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6") +} + +// TestVersionedWorkflowV5 tests that a workflow started on the worker with VersionedWorkflowV5 +// can be replayed on worker with VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV4, VersionedWorkflowV6, +// but not on VersionedWorkflowV1. +func (ts *IntegrationTestSuite) TestVersionedWorkflowV5() { + execution, err := ts.executeWorkflow("test-versioned-workflow-v5", replaytests.VersionedWorkflowName, nil, "arg") + ts.NoError(err) + + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Failed to replay on the replayer with VersionedWorkflowV2") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Failed to replay on the replayer with VersionedWorkflowV3") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Failed to replay on the replayer with VersionedWorkflowV4") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV6, execution), "Failed to replay on the replayer with VersionedWorkflowV6") +} + +// TestVersionedWorkflowV6 tests that a workflow started on the worker with VersionedWorkflowV6 +// can be replayed on worker with VersionedWorkflowV5 +// but not on VersionedWorkflowV1, VersionedWorkflowV2, VersionedWorkflowV3, VersionedWorkflowV4. +func (ts *IntegrationTestSuite) TestVersionedWorkflowV6() { + execution, err := ts.executeWorkflow("test-versioned-workflow-v6", replaytests.VersionedWorkflowName, nil, "arg") + ts.NoError(err) + + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV1, execution), "Expected to fail replaying the replayer with VersionedWorkflowV1") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV2, execution), "Expected to fail replaying the replayer with VersionedWorkflowV2") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV3, execution), "Expected to fail replaying the replayer with VersionedWorkflowV3") + ts.Error(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV4, execution), "Expected to fail replaying the replayer with VersionedWorkflowV4") + ts.NoError(ts.replayVersionedWorkflow(replaytests.SetupWorkerForVersionedWorkflowV5, execution), "Failed to replay on the replayer with VersionedWorkflowV5") } func (ts *IntegrationTestSuite) replayVersionedWorkflow(setupWorkerFunc func(w worker.Registry), execution *workflow.Execution) error { diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 6eb2cb4b6..3b584218e 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -219,6 +219,15 @@ func TestContinueAsNew(t *testing.T) { // * VersionedWorkflowV4 - which supports Version 1, and can execute BarActivity // - This workflow is able to replay the history only of VersionedWorkflowBar // +// * VersionedWorkflowV5 - which supports Version 1 and 2, and can execute BarActivity and BazActivity +// - This workflow is able to replay the history only of VersionedWorkflowBar +// - A first execution of this workflow will should execute BarActivity, because of usage workflow.ExecuteWithMinVersion(), +// but the test can't check it due to Replay +// +// * VersionedWorkflowV6 - which supports Version 1 and 2, and can execute BarActivity and BazActivity +// - This workflow is able to replay the history only of VersionedWorkflowBar +// - A first execution of this workflow will should execute BazActivity, but the test can't check it due to Replay +// // So the test focusing workflows supports forward and backward compatibility of the workflows func TestVersionedWorkflows(t *testing.T) { const ( @@ -285,4 +294,34 @@ func TestVersionedWorkflows(t *testing.T) { require.NoError(t, err, "Failed to replay VersionedWorkflowBar history") }) }) + + t.Run("VersionedWorkflowV5", func(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + SetupWorkerForVersionedWorkflowV5(replayer) + + t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) + require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history") + }) + + t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile) + require.NoError(t, err, "Failed to replay VersionedWorkflowBar history") + }) + }) + + t.Run("VersionedWorkflowV6", func(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + SetupWorkerForVersionedWorkflowV6(replayer) + + t.Run("fail to replay with VersionedWorkflowFoo", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowFooHistoryFile) + require.Error(t, err, "Expected to fail replaying VersionedWorkflowFoo history") + }) + + t.Run("successfully replayed with VersionedWorkflowBar", func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), versionedWorkflowBarHistoryFile) + require.NoError(t, err, "Failed to replay VersionedWorkflowBar history") + }) + }) } diff --git a/test/replaytests/versioned_workflow.go b/test/replaytests/versioned_workflow.go index 75b60bc4c..515c39c07 100644 --- a/test/replaytests/versioned_workflow.go +++ b/test/replaytests/versioned_workflow.go @@ -16,6 +16,7 @@ const ( // FooActivityName and BarActivityName are the names of the activities used in the workflows. FooActivityName = "FooActivity" BarActivityName = "BarActivity" + BazActivityName = "BazActivity" // VersionedWorkflowName is the name of the versioned workflow. VersionedWorkflowName = "VersionedWorkflow" @@ -110,6 +111,52 @@ func VersionedWorkflowV4(ctx workflow.Context, _ string) (string, error) { return result, nil } +// VersionedWorkflowV5 is the fifth version of the workflow. It supports Version 1 and 2. +// All workflows started by this version will have the change ID set to Version 1. +// It supports workflow executions started by VersionedWorkflowV3, VersionedWorkflowV4, +// VersionedWorkflowV5, VersionedWorkflowV6 +func VersionedWorkflowV5(ctx workflow.Context, _ string) (string, error) { + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + var result string + var err error + + version := workflow.GetVersion(ctx, TestChangeID, 1, 2, workflow.ExecuteWithVersion(1)) + if version == 1 { + err = workflow.ExecuteActivity(ctx, BarActivityName, "data").Get(ctx, &result) + } else { + err = workflow.ExecuteActivity(ctx, BazActivityName, "data").Get(ctx, &result) + } + if err != nil { + return "", err + } + + return result, nil +} + +// VersionedWorkflowV6 is the sixth version of the workflow. It supports Version 1 and 2. +// All workflows started by this version will have the change ID set to Version 2. +// It supports workflow executions started by VersionedWorkflowV3, VersionedWorkflowV4, +// VersionedWorkflowV5, VersionedWorkflowV6 +func VersionedWorkflowV6(ctx workflow.Context, _ string) (string, error) { + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + var result string + var err error + + version := workflow.GetVersion(ctx, TestChangeID, 1, 2) + if version == 1 { + err = workflow.ExecuteActivity(ctx, BarActivityName, "data").Get(ctx, &result) + } else { + err = workflow.ExecuteActivity(ctx, BazActivityName, "data").Get(ctx, &result) + } + if err != nil { + return "", err + } + + return result, nil +} + // FooActivity returns "foo" as a result of the activity execution. func FooActivity(ctx context.Context, _ string) (string, error) { return "foo", nil @@ -120,6 +167,11 @@ func BarActivity(ctx context.Context, _ string) (string, error) { return "bar", nil } +// BazActivity returns "baz" as a result of the activity execution. +func BazActivity(ctx context.Context, _ string) (string, error) { + return "baz", nil +} + // SetupWorkerForVersionedWorkflowV1 registers VersionedWorkflowV1 and FooActivity func SetupWorkerForVersionedWorkflowV1(w worker.Registry) { w.RegisterWorkflowWithOptions(VersionedWorkflowV1, workflow.RegisterOptions{Name: VersionedWorkflowName}) @@ -145,3 +197,17 @@ func SetupWorkerForVersionedWorkflowV4(w worker.Registry) { w.RegisterWorkflowWithOptions(VersionedWorkflowV4, workflow.RegisterOptions{Name: VersionedWorkflowName}) w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName}) } + +// SetupWorkerForVersionedWorkflowV5 registers VersionedWorkflowV6, BarActivity and BazActivity +func SetupWorkerForVersionedWorkflowV5(w worker.Registry) { + w.RegisterWorkflowWithOptions(VersionedWorkflowV5, workflow.RegisterOptions{Name: VersionedWorkflowName}) + w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName}) + w.RegisterActivityWithOptions(BazActivity, activity.RegisterOptions{Name: BazActivityName}) +} + +// SetupWorkerForVersionedWorkflowV6 registers VersionedWorkflowV6, BarActivity and BazActivity +func SetupWorkerForVersionedWorkflowV6(w worker.Registry) { + w.RegisterWorkflowWithOptions(VersionedWorkflowV6, workflow.RegisterOptions{Name: VersionedWorkflowName}) + w.RegisterActivityWithOptions(BarActivity, activity.RegisterOptions{Name: BarActivityName}) + w.RegisterActivityWithOptions(BazActivity, activity.RegisterOptions{Name: BazActivityName}) +} From 5168891594a4be36decfc15e93fd225f5273764e Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Fri, 20 Jun 2025 14:30:46 +0200 Subject: [PATCH 10/11] add support of ExecuteWithMinVersion and ExecuteWithVersion to testWorkflowEnvironmentImpl --- internal/internal_event_handlers.go | 1 - internal/internal_workflow_testsuite.go | 40 ++++++++- internal/internal_workflow_testsuite_test.go | 86 ++++++++++++++++++++ 3 files changed, 123 insertions(+), 4 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 36bfc0d18..fec4d7e28 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -655,7 +655,6 @@ func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, max // Store the version in the changeVersions // ensuring that it can be retrieved later wc.changeVersions[changeID] = version - return version } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 122be55de..a46b2b961 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -1947,9 +1947,43 @@ func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported validateVersion(changeID, version, minSupported, maxSupported) return version } - env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, maxSupported, env.changeVersions)) - env.changeVersions[changeID] = maxSupported - return maxSupported + + // Apply options to determine which version to use + options := &GetVersionOptions{} + for _, opt := range opts { + opt(options) + } + + // Determine the version to use based on the options provided + var version Version + switch { + // If ExecuteWithVersion option is used, use the custom version provided + case options.CustomVersion != nil: + version = *options.CustomVersion + + // If ExecuteWithMinVersion option is set, use the minimum supported version + case options.UseMinVersion: + version = minSupported + + // Otherwise, use the maximum supported version + default: + version = maxSupported + } + + // Validate the version against the min and max supported versions + // ensuring it is within the acceptable range + validateVersion(changeID, version, minSupported, maxSupported) + + // If the version is not the DefaultVersion, update search attributes + // Keeping the DefaultVersion as a special case where no version marker is recorded + if version != DefaultVersion { + env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, env.changeVersions)) + } + + // Store the version in the changeVersions + // ensuring that it can be retrieved later + env.changeVersions[changeID] = version + return version } func (env *testWorkflowEnvironmentImpl) getMockedVersion(mockedChangeID, changeID string, minSupported, maxSupported Version) (Version, bool) { diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index c5a7b0aaf..9d11a1903 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -1274,6 +1274,92 @@ func (s *WorkflowTestSuiteUnitTest) Test_GetVersion() { env.AssertExpectations(s.T()) } +func (s *WorkflowTestSuiteUnitTest) Test_GetVersion_ExecuteWithMinVersion() { + oldActivity := func(ctx context.Context, msg string) (string, error) { + return "hello" + "_" + msg, nil + } + newActivity := func(ctx context.Context, msg string) (string, error) { + return "hello" + "_" + msg, nil + } + workflowFn := func(ctx Context) error { + ctx = WithActivityOptions(ctx, s.activityOptions) + var f Future + v := GetVersion(ctx, "test_change_id", DefaultVersion, 2, ExecuteWithMinVersion()) + if v == DefaultVersion { + f = ExecuteActivity(ctx, oldActivity, "ols_msg") + } else { + f = ExecuteActivity(ctx, newActivity, "new_msg") + } + err := f.Get(ctx, nil) // wait for result + if err != nil { + return err + } + + // test no search attributes + wfInfo := GetWorkflowInfo(ctx) + s.Nil(wfInfo.SearchAttributes) + return err + } + + env := s.NewTestWorkflowEnvironment() + env.RegisterWorkflow(workflowFn) + env.RegisterActivity(oldActivity) + env.RegisterActivity(newActivity) + env.OnActivity(oldActivity, mock.Anything, "ols_msg").Return("hello_ols_msg", nil).Once() + env.ExecuteWorkflow(workflowFn) + + s.True(env.IsWorkflowCompleted()) + s.Nil(env.GetWorkflowError()) + env.AssertExpectations(s.T()) +} + +func (s *WorkflowTestSuiteUnitTest) Test_GetVersion_ExecuteWithVersion() { + oldActivity := func(ctx context.Context, msg string) (string, error) { + return "hello" + "_" + msg, nil + } + newActivity := func(ctx context.Context, msg string) (string, error) { + return "hello" + "_" + msg, nil + } + workflowFn := func(ctx Context) error { + ctx = WithActivityOptions(ctx, s.activityOptions) + var f Future + v := GetVersion(ctx, "test_change_id", DefaultVersion, 2, ExecuteWithVersion(1)) + if v == DefaultVersion { + f = ExecuteActivity(ctx, oldActivity, "ols_msg") + } else { + f = ExecuteActivity(ctx, newActivity, "new_msg") + } + err := f.Get(ctx, nil) // wait for result + if err != nil { + return err + } + + // test searchable change version + wfInfo := GetWorkflowInfo(ctx) + s.NotNil(wfInfo.SearchAttributes) + changeVersionsBytes, ok := wfInfo.SearchAttributes.IndexedFields[CadenceChangeVersion] + s.True(ok) + var changeVersions []string + err = json.Unmarshal(changeVersionsBytes, &changeVersions) + s.NoError(err) + s.Equal(1, len(changeVersions)) + s.Equal("test_change_id-1", changeVersions[0]) + + return err + } + + env := s.NewTestWorkflowEnvironment() + env.RegisterWorkflow(workflowFn) + env.RegisterActivity(oldActivity) + env.RegisterActivity(newActivity) + env.OnActivity(newActivity, mock.Anything, "new_msg").Return("hello new_mock_msg", nil).Once() + env.ExecuteWorkflow(workflowFn) + + s.True(env.IsWorkflowCompleted()) + s.Nil(env.GetWorkflowError()) + env.AssertExpectations(s.T()) +} + func (s *WorkflowTestSuiteUnitTest) Test_MockGetVersion() { oldActivity := func(ctx context.Context, msg string) (string, error) { return "hello" + "_" + msg, nil From 542876aa98ed575c26ddea890c82b77abd372243 Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Fri, 20 Jun 2025 14:40:06 +0200 Subject: [PATCH 11/11] add tests in Test_MockGetVersion --- internal/internal_workflow_testsuite.go | 2 +- internal/internal_workflow_testsuite_test.go | 136 +++++++++++++++++++ internal/workflow_testsuite.go | 4 +- 3 files changed, 139 insertions(+), 3 deletions(-) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index a46b2b961..10ef3e24b 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -1975,7 +1975,7 @@ func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported validateVersion(changeID, version, minSupported, maxSupported) // If the version is not the DefaultVersion, update search attributes - // Keeping the DefaultVersion as a special case where no version marker is recorded + // Keeping the DefaultVersion as a special case where no search attributes are updated if version != DefaultVersion { env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, env.changeVersions)) } diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 9d11a1903..af0ec0e4a 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -1360,6 +1360,142 @@ func (s *WorkflowTestSuiteUnitTest) Test_GetVersion_ExecuteWithVersion() { env.AssertExpectations(s.T()) } +func (s *WorkflowTestSuiteUnitTest) Test_MockGetVersion_ExecuteWithVersion() { + oldActivity := func(ctx context.Context, msg string) (string, error) { + return "hello" + "_" + msg, nil + } + newActivity := func(ctx context.Context, msg string) (string, error) { + return "hello" + "_" + msg, nil + } + workflowFn := func(ctx Context) (string, error) { + ctx = WithActivityOptions(ctx, s.activityOptions) + var f Future + v1 := GetVersion(ctx, "change_1", DefaultVersion, 2, ExecuteWithVersion(1)) + if v1 == DefaultVersion { + f = ExecuteActivity(ctx, oldActivity, "old1") + } else { + f = ExecuteActivity(ctx, newActivity, "new1") + } + var ret1 string + err := f.Get(ctx, &ret1) // wait for result + if err != nil { + return "", err + } + + v2 := GetVersion(ctx, "change_2", DefaultVersion, 2) + if v2 == DefaultVersion { + f = ExecuteActivity(ctx, oldActivity, "old2") + } else { + f = ExecuteActivity(ctx, newActivity, "new2") + } + var ret2 string + err = f.Get(ctx, &ret2) // wait for result + if err != nil { + return "", err + } + + // test searchable change version + wfInfo := GetWorkflowInfo(ctx) + s.NotNil(wfInfo.SearchAttributes) + changeVersionsBytes, ok := wfInfo.SearchAttributes.IndexedFields[CadenceChangeVersion] + s.True(ok) + var changeVersions []string + err = json.Unmarshal(changeVersionsBytes, &changeVersions) + s.NoError(err) + s.Equal(2, len(changeVersions)) + s.Equal("change_2-2", changeVersions[0]) + s.Equal("change_1--1", changeVersions[1]) + + return ret1 + ret2, err + } + + env := s.NewTestWorkflowEnvironment() + env.RegisterWorkflow(workflowFn) + env.RegisterActivity(oldActivity) + env.RegisterActivity(newActivity) + + env.OnGetVersion("change_1", DefaultVersion, 2).Return(func(string, Version, Version) Version { + return DefaultVersion + }) + env.OnGetVersion(mock.Anything, DefaultVersion, 2).Return(Version(2)) + env.ExecuteWorkflow(workflowFn) + + s.True(env.IsWorkflowCompleted()) + s.Nil(env.GetWorkflowError()) + var ret string + s.NoError(env.GetWorkflowResult(&ret)) + s.Equal("hello_old1hello_new2", ret) + env.AssertExpectations(s.T()) +} + +func (s *WorkflowTestSuiteUnitTest) Test_MockGetVersion_ExecuteWithMinVersion() { + oldActivity := func(ctx context.Context, msg string) (string, error) { + return "hello" + "_" + msg, nil + } + newActivity := func(ctx context.Context, msg string) (string, error) { + return "hello" + "_" + msg, nil + } + workflowFn := func(ctx Context) (string, error) { + ctx = WithActivityOptions(ctx, s.activityOptions) + var f Future + v1 := GetVersion(ctx, "change_1", DefaultVersion, 2) + if v1 == DefaultVersion { + f = ExecuteActivity(ctx, oldActivity, "old1") + } else { + f = ExecuteActivity(ctx, newActivity, "new1") + } + var ret1 string + err := f.Get(ctx, &ret1) // wait for result + if err != nil { + return "", err + } + + v2 := GetVersion(ctx, "change_2", DefaultVersion, 2, ExecuteWithMinVersion()) + if v2 == DefaultVersion { + f = ExecuteActivity(ctx, oldActivity, "old2") + } else { + f = ExecuteActivity(ctx, newActivity, "new2") + } + var ret2 string + err = f.Get(ctx, &ret2) // wait for result + if err != nil { + return "", err + } + + // test searchable change version + wfInfo := GetWorkflowInfo(ctx) + s.NotNil(wfInfo.SearchAttributes) + changeVersionsBytes, ok := wfInfo.SearchAttributes.IndexedFields[CadenceChangeVersion] + s.True(ok) + var changeVersions []string + err = json.Unmarshal(changeVersionsBytes, &changeVersions) + s.NoError(err) + s.Equal(2, len(changeVersions)) + s.Equal("change_2-2", changeVersions[0]) + s.Equal("change_1--1", changeVersions[1]) + + return ret1 + ret2, err + } + + env := s.NewTestWorkflowEnvironment() + env.RegisterWorkflow(workflowFn) + env.RegisterActivity(oldActivity) + env.RegisterActivity(newActivity) + + env.OnGetVersion("change_1", DefaultVersion, 2).Return(func(string, Version, Version) Version { + return DefaultVersion + }) + env.OnGetVersion(mock.Anything, DefaultVersion, 2).Return(Version(2)) + env.ExecuteWorkflow(workflowFn) + + s.True(env.IsWorkflowCompleted()) + s.Nil(env.GetWorkflowError()) + var ret string + s.NoError(env.GetWorkflowResult(&ret)) + s.Equal("hello_old1hello_new2", ret) + env.AssertExpectations(s.T()) +} + func (s *WorkflowTestSuiteUnitTest) Test_MockGetVersion() { oldActivity := func(ctx context.Context, msg string) (string, error) { return "hello" + "_" + msg, nil diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index a11404ecf..12412ba90 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -365,8 +365,8 @@ func (t *TestWorkflowEnvironment) OnRequestCancelExternalWorkflow(domainName, wo } // OnGetVersion setup a mock for workflow.GetVersion() call. By default, if mock is not setup, the GetVersion call from -// workflow code will always return the maxSupported version. Make it not possible to test old version branch. With this -// mock support, it is possible to test code branch for different versions. +// workflow code will always return the maxSupported version or version specified by GetVersionOption. +// Make it not possible to test old version branch. With this mock support, it is possible to test code branch for different versions. // // Note: mock can be setup for a specific changeID. Or if mock.Anything is used as changeID then all calls to GetVersion // will be mocked. Mock for a specific changeID has higher priority over mock.Anything.