Skip to content
6 changes: 3 additions & 3 deletions internal/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type WorkflowInterceptor interface {
GetSignalChannel(ctx Context, signalName string) Channel
SideEffect(ctx Context, f func(ctx Context) interface{}) Value
MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version
SetQueryHandler(ctx Context, queryType string, handler interface{}) error
IsReplaying(ctx Context) bool
HasLastCompletionResult(ctx Context) bool
Expand Down Expand Up @@ -158,8 +158,8 @@ func (t *WorkflowInterceptorBase) MutableSideEffect(ctx Context, id string, f fu
}

// GetVersion forwards to t.Next
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version {
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported)
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported, opts...)
}

// SetQueryHandler forwards to t.Next
Expand Down
44 changes: 37 additions & 7 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,28 +602,58 @@ func validateVersion(changeID string, version, minSupported, maxSupported Versio
}
}

func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) Version {
func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
// Check if the changeID already has a version assigned
// If it does, validate the version against the min and max supported versions
// ensuring it is within the acceptable range
if version, ok := wc.changeVersions[changeID]; ok {
validateVersion(changeID, version, minSupported, maxSupported)
return version
}

// Apply options to determine which version to use
options := &GetVersionOptions{}
for _, opt := range opts {
opt(options)
}

var version Version
if wc.isReplay {
// GetVersion for changeID is called first time in replay mode, use DefaultVersion
switch {

// GetVersion for changeID is called first time in replay mode, use DefaultVersion
case wc.isReplay:
version = DefaultVersion
} else {
// GetVersion for changeID is called first time (non-replay mode), generate a marker decision for it.
// Also upsert search attributes to enable ability to search by changeVersion.

// If ExecuteWithVersion option is used, use the custom version provided
case options.CustomVersion != nil:
version = *options.CustomVersion

// If ExecuteWithMinVersion option is set, use the minimum supported version
case options.UseMinVersion:
version = minSupported

// Otherwise, use the maximum supported version
default:
version = maxSupported
}

// Validate the version against the min and max supported versions
// ensuring it is within the acceptable range
validateVersion(changeID, version, minSupported, maxSupported)

// If the version is not the DefaultVersion, and it's not a replay, record it and update search attributes
// Keeping the DefaultVersion as a special case where no version marker is recorded
if !wc.isReplay && version != DefaultVersion {
// Record the version marker and update search attributes
wc.decisionsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter())
err := wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions))
if err != nil {
wc.logger.Warn("UpsertSearchAttributes failed", zap.Error(err))
}
}

validateVersion(changeID, version, minSupported, maxSupported)
// Store the version in the changeVersions
// ensuring that it can be retrieved later
wc.changeVersions[changeID] = version
return version
}
Expand Down
79 changes: 72 additions & 7 deletions internal/internal_event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,22 @@ func TestGetVersion(t *testing.T) {
res := weh.GetVersion("test", 1, 3)
assert.Equal(t, Version(2), res)
})
t.Run("version exists, ExecuteWithVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
weh.changeVersions = map[string]Version{
"test": 2,
}
res := weh.GetVersion("test", 1, 3, ExecuteWithVersion(3))
assert.Equal(t, Version(2), res)
})
t.Run("version exists, ExecuteWithMinVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
weh.changeVersions = map[string]Version{
"test": 2,
}
res := weh.GetVersion("test", 1, 3, ExecuteWithMinVersion())
assert.Equal(t, Version(2), res)
})
t.Run("version doesn't exist in replay", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
weh.isReplay = true
Expand All @@ -770,6 +786,55 @@ func TestGetVersion(t *testing.T) {
assert.Equal(t, Version(3), weh.changeVersions["test"])
assert.Equal(t, []byte(`["test-3"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
})
t.Run("version doesn't exist, ExecuteWithVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(2))
assert.Equal(t, Version(2), res)
require.Contains(t, weh.changeVersions, "test")
assert.Equal(t, Version(2), weh.changeVersions["test"])
assert.Equal(t, []byte(`["test-2"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
})
t.Run("version doesn't exist, ExecuteWithVersion is used, DefaultVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(DefaultVersion))
assert.Equal(t, DefaultVersion, res)
require.Contains(t, weh.changeVersions, "test")
assert.Equal(t, DefaultVersion, weh.changeVersions["test"])
require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
})
t.Run("version doesn't exist, ExecuteWithMinVersion is used, min is non DefaultVersion", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
res := weh.GetVersion("test", 1, 3, ExecuteWithMinVersion())
assert.Equal(t, Version(1), res)
require.Contains(t, weh.changeVersions, "test")
assert.Equal(t, Version(1), weh.changeVersions["test"])
assert.Equal(t, []byte(`["test-1"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
})
t.Run("version doesn't exist, ExecuteWithMinVersion is used, DefaultVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithMinVersion())
assert.Equal(t, DefaultVersion, res)
require.Contains(t, weh.changeVersions, "test")
assert.Equal(t, DefaultVersion, weh.changeVersions["test"])
require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
})

t.Run("version doesn't exist, ExecuteWithVersion is used, version > maximum version", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
assert.PanicsWithValue(t, `Workflow code is too old to support version 10 for "test" changeID. The maximum supported version is 3`, func() {
weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(10))
})

require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
})
t.Run("version doesn't exist, ExecuteWithVersion is used, version < minimum version", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
assert.PanicsWithValue(t, `Workflow code removed support of version 0. for "test" changeID. The oldest supported version is 1`, func() {
weh.GetVersion("test", 1, 3, ExecuteWithVersion(0))
})

require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
})
}

func TestMutableSideEffect(t *testing.T) {
Expand Down Expand Up @@ -982,6 +1047,13 @@ func TestWorkflowExecutionEnvironment_NewTimer_immediate_calls(t *testing.T) {
}

func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workflowExecutionEventHandlerImpl {
var testWorkflowInfo = &WorkflowInfo{
WorkflowType: WorkflowType{
Name: "test",
Path: "",
},
}

return newWorkflowExecutionEventHandler(
testWorkflowInfo,
func(result []byte, err error) {},
Expand All @@ -996,13 +1068,6 @@ func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workfl
).(*workflowExecutionEventHandlerImpl)
}

var testWorkflowInfo = &WorkflowInfo{
WorkflowType: WorkflowType{
Name: "test",
Path: "",
},
}

func getSerializedDetails[T, V any](t *testing.T, id T, data V) []byte {
converter := defaultDataConverter{}
res, err := converter.ToData(id, data)
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type (
localActivityClient
workflowTimerClient
SideEffect(f func() ([]byte, error), callback resultHandler)
GetVersion(changeID string, minSupported, maxSupported Version) Version
GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version
WorkflowInfo() *WorkflowInfo
Complete(result []byte, err error)
RegisterCancelHandler(handler func())
Expand Down
42 changes: 38 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ func (env *testWorkflowEnvironmentImpl) SideEffect(f func() ([]byte, error), cal
callback(f())
}

func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) (retVersion Version) {
func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) (retVersion Version) {
if mockVersion, ok := env.getMockedVersion(changeID, changeID, minSupported, maxSupported); ok {
// GetVersion for changeID is mocked
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, mockVersion, env.changeVersions))
Expand All @@ -1947,9 +1947,43 @@ func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported
validateVersion(changeID, version, minSupported, maxSupported)
return version
}
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, maxSupported, env.changeVersions))
env.changeVersions[changeID] = maxSupported
return maxSupported

// Apply options to determine which version to use
options := &GetVersionOptions{}
for _, opt := range opts {
opt(options)
}

// Determine the version to use based on the options provided
var version Version
switch {
// If ExecuteWithVersion option is used, use the custom version provided
case options.CustomVersion != nil:
version = *options.CustomVersion

// If ExecuteWithMinVersion option is set, use the minimum supported version
case options.UseMinVersion:
version = minSupported

// Otherwise, use the maximum supported version
default:
version = maxSupported
}

// Validate the version against the min and max supported versions
// ensuring it is within the acceptable range
validateVersion(changeID, version, minSupported, maxSupported)

// If the version is not the DefaultVersion, update search attributes
// Keeping the DefaultVersion as a special case where no search attributes are updated
if version != DefaultVersion {
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, env.changeVersions))
}

// Store the version in the changeVersions
// ensuring that it can be retrieved later
env.changeVersions[changeID] = version
return version
}

func (env *testWorkflowEnvironmentImpl) getMockedVersion(mockedChangeID, changeID string, minSupported, maxSupported Version) (Version, bool) {
Expand Down
Loading
Loading