diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index 8d23a395f..30923f4f9 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -81,7 +81,7 @@ require ( go.opentelemetry.io/otel/metric v1.40.0 // indirect go.opentelemetry.io/otel/sdk v1.40.0 // indirect go.opentelemetry.io/otel/trace v1.40.0 // indirect - go.temporal.io/api v1.62.5 // indirect + go.temporal.io/api v1.62.6 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index 362921b37..91dba3a03 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -233,8 +233,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1 h1:T go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1/go.mod h1:riqUmAOJFDFuIAzZu/3V6cOrTyfWzpgNJnG5UwrapCk= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1 h1:z/oMlrCv3Kopwh/dtdRagJy+qsRRPA86/Ux3g7+zFXM= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1/go.mod h1:C7EHYSIiaALi9RnNORCVaPCQDuJgJEn/XxkctaTez1E= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/contrib/envconfig/go.mod b/contrib/envconfig/go.mod index 4102550eb..e54724dfd 100644 --- a/contrib/envconfig/go.mod +++ b/contrib/envconfig/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.5 // indirect + go.temporal.io/api v1.62.6 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect diff --git a/contrib/envconfig/go.sum b/contrib/envconfig/go.sum index b447e9495..9063e33ce 100644 --- a/contrib/envconfig/go.sum +++ b/contrib/envconfig/go.sum @@ -37,8 +37,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index 67b93849b..e7df81e55 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -32,7 +32,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.40.0 go.opentelemetry.io/otel/sdk/metric v1.40.0 - go.temporal.io/api v1.62.5 // indirect + go.temporal.io/api v1.62.6 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/text v0.24.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index e8ea4bff6..64e6d2e37 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -54,8 +54,8 @@ go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4A go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index d8567c4ce..3a6659f03 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.5 // indirect + go.temporal.io/api v1.62.6 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index c1ff05fbb..615bfddb3 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -40,8 +40,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/sysinfo/go.mod b/contrib/sysinfo/go.mod index 5525c778b..72d2be00c 100644 --- a/contrib/sysinfo/go.mod +++ b/contrib/sysinfo/go.mod @@ -35,7 +35,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.temporal.io/api v1.62.5 // indirect + go.temporal.io/api v1.62.6 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/contrib/sysinfo/go.sum b/contrib/sysinfo/go.sum index 3e90b0cfd..a37ee668e 100644 --- a/contrib/sysinfo/go.sum +++ b/contrib/sysinfo/go.sum @@ -71,8 +71,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index 93599e009..023ff3108 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -23,7 +23,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/twmb/murmur3 v1.1.5 // indirect - go.temporal.io/api v1.62.5 // indirect + go.temporal.io/api v1.62.6 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index 8938dfe79..18b2e4030 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -119,8 +119,8 @@ github.com/uber-go/tally/v4 v4.1.1/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/go.mod b/go.mod index 2b47cf567..d5cedb999 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/nexus-rpc/sdk-go v0.6.0 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.5 + go.temporal.io/api v1.62.6 golang.org/x/sync v0.13.0 golang.org/x/sys v0.32.0 golang.org/x/time v0.3.0 diff --git a/go.sum b/go.sum index fd9193a8e..b607568d1 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index e0b3f9b7f..c8db972d6 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -24,7 +24,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.10.0 // indirect - go.temporal.io/api v1.62.5 // indirect + go.temporal.io/api v1.62.6 // indirect golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.23.0 // indirect golang.org/x/net v0.39.0 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index c25120d3b..334c3c231 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -55,8 +55,8 @@ go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiy go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/grpc_dialer.go b/internal/grpc_dialer.go index 517bc9aaa..2c63107f1 100644 --- a/internal/grpc_dialer.go +++ b/internal/grpc_dialer.go @@ -7,6 +7,7 @@ import ( "time" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" + "go.temporal.io/api/proxy" "go.temporal.io/api/serviceerror" "go.temporal.io/sdk/internal/common/metrics" "go.temporal.io/sdk/internal/common/retry" @@ -17,6 +18,7 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) type ( @@ -157,17 +159,21 @@ func requiredInterceptors( interceptors = append(interceptors, interceptor) } } - // Add namespace provider interceptor - interceptors = append(interceptors, namespaceProviderInterceptor()) + // Add temporal header interceptor (namespace + resource ID) + interceptors = append(interceptors, temporalHeaderInterceptor()) return interceptors } -func namespaceProviderInterceptor() grpc.UnaryClientInterceptor { +func temporalHeaderInterceptor() grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if nsReq, ok := req.(interface{ GetNamespace() string }); ok { - // Only add namespace if it doesn't already exist - if md, _ := metadata.FromOutgoingContext(ctx); len(md.Get(temporalNamespaceHeaderKey)) == 0 { - ctx = metadata.AppendToOutgoingContext(ctx, temporalNamespaceHeaderKey, nsReq.GetNamespace()) + var extractOpts proxy.ExtractHeadersOptions + extractOpts.Request, _ = req.(proto.Message) + extractOpts.ExistingMetadata, _ = metadata.FromOutgoingContext(ctx) + if extractOpts.Request != nil { + if headers, err := proxy.ExtractTemporalRequestHeaders(ctx, extractOpts); err != nil { + return err + } else if len(headers) > 0 { + ctx = metadata.AppendToOutgoingContext(ctx, headers...) } } return invoker(ctx, method, req, reply, cc, opts...) diff --git a/internal/grpc_dialer_test.go b/internal/grpc_dialer_test.go index 1ae2c572e..a63d50a00 100644 --- a/internal/grpc_dialer_test.go +++ b/internal/grpc_dialer_test.go @@ -547,7 +547,7 @@ func TestExistingContextMetadataJoinedWithSDKHeaders(t *testing.T) { ) } -func TestNamespaceInterceptor(t *testing.T) { +func TestTemporalHeaderInterceptor(t *testing.T) { srv, err := startTestGRPCServer() require.NoError(t, err) defer srv.Stop() @@ -566,12 +566,18 @@ func TestNamespaceInterceptor(t *testing.T) { metadata.ValueFromIncomingContext(srv.getSystemInfoRequestContext, temporalNamespaceHeaderKey), ) // Verify namespace header is set on a request that does have namespace on the request - require.NoError(t, client.SignalWorkflow(context.Background(), "workflowid", "runid", "signalname", nil)) + require.NoError(t, client.SignalWorkflow(context.Background(), "test-workflow-id", "runid", "signalname", nil)) require.Equal( t, []string{"test-namespace"}, metadata.ValueFromIncomingContext(srv.lastSignalWorkflowExecutionContext, temporalNamespaceHeaderKey), ) + // Verify resource-id header is also set + require.Equal( + t, + []string{"workflow:test-workflow-id"}, + metadata.ValueFromIncomingContext(srv.lastSignalWorkflowExecutionContext, "temporal-resource-id"), + ) } func TestCredentialsMTLS(t *testing.T) { diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 44cfbf3b4..2862c61ba 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1972,6 +1972,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow( BinaryChecksum: wth.workerBuildID, QueryResults: queryResults, Namespace: wth.namespace, + ResourceId: fmt.Sprintf("workflow:%s", task.WorkflowExecution.GetWorkflowId()), MeteringMetadata: &commonpb.MeteringMetadata{NonfirstLocalActivityExecutionAttempts: nonfirstLAAttempts}, SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{ LangUsedFlags: langUsedFlags, @@ -2321,7 +2322,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice metricsHandler.Counter(metrics.UnregisteredActivityInvocationCounter).Inc(1) return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, NewActivityNotRegisteredError(activityType, ath.getRegisteredActivityNames()), - dataConverter, failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions), nil + dataConverter, failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions, + t.WorkflowExecution.GetWorkflowId(), t.ActivityId), nil } // panic handler @@ -2339,7 +2341,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice metricsHandler.Counter(metrics.ActivityTaskErrorCounter).Inc(1) panicErr := newPanicError(p, st) result = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr, - dataConverter, failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions) + dataConverter, failureConverter, ath.namespace, false, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions, + t.WorkflowExecution.GetWorkflowId(), t.ActivityId) } }() @@ -2386,7 +2389,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice ) } return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, output, err, - dataConverter, failureConverter, ath.namespace, isActivityCanceled, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions), nil + dataConverter, failureConverter, ath.namespace, isActivityCanceled, ath.versionStamp, ath.deployment, ath.workerDeploymentOptions, + t.WorkflowExecution.GetWorkflowId(), t.ActivityId), nil } func (ath *activityTaskHandlerImpl) getActivity(name string) activity { @@ -2446,15 +2450,24 @@ func createNewCommandWithMetadata(commandType enumspb.CommandType, metadata *sdk } } +func getActivityResourceIdFromCtx(ctx context.Context) string { + env := getActivityEnvironmentFromCtx(ctx) + if env == nil { + return "" + } + return getActivityResourceId(env.workflowExecution.ID, env.activityID) +} + func recordActivityHeartbeat(ctx context.Context, service workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, identity string, taskToken []byte, details *commonpb.Payloads, ) error { namespace := getNamespaceFromActivityCtx(ctx) request := &workflowservice.RecordActivityTaskHeartbeatRequest{ - TaskToken: taskToken, - Details: details, - Identity: identity, - Namespace: namespace, + TaskToken: taskToken, + Details: details, + Identity: identity, + Namespace: namespace, + ResourceId: getActivityResourceIdFromCtx(ctx), } var heartbeatResponse *workflowservice.RecordActivityTaskHeartbeatResponse @@ -2486,6 +2499,7 @@ func recordActivityHeartbeatByID(ctx context.Context, service workflowservice.Wo ActivityId: activityID, Details: details, Identity: identity, + ResourceId: getActivityResourceId(workflowID, activityID), } var heartbeatResponse *workflowservice.RecordActivityTaskHeartbeatByIdResponse diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index bc148edf9..490689af2 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -464,7 +464,7 @@ func (wtp *workflowTaskProcessor) processWorkflowTask(task *workflowTask) (retEr if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task); err != nil { // Submit an explicit WFT failure so the server records the error immediately // rather than waiting for the task to time out. - failReq := wtp.errorToFailWorkflowTask(task.task.TaskToken, err) + failReq := wtp.errorToFailWorkflowTask(task.task.TaskToken, task.task.WorkflowExecution.GetWorkflowId(), err) if _, submitErr := wtp.sendTaskCompletedRequest(&workflowTaskCompletion{rawRequest: failReq}, task.task); submitErr != nil { wtp.logger.Warn("Failed to submit WFT failure after inbound visitor error.", tagError, submitErr) } @@ -570,7 +570,7 @@ func (wtp *workflowTaskProcessor) RespondTaskCompletedWithMetrics( tagAttempt, task.Attempt, tagError, taskErr) emitFailMetric = true - failWorkflowTask := wtp.errorToFailWorkflowTask(task.TaskToken, taskErr) + failWorkflowTask := wtp.errorToFailWorkflowTask(task.TaskToken, task.WorkflowExecution.GetWorkflowId(), taskErr) failureReason = "WorkflowError" if failWorkflowTask.Cause == enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR { failureReason = "NonDeterminismError" @@ -584,7 +584,7 @@ func (wtp *workflowTaskProcessor) RespondTaskCompletedWithMetrics( // The outbound visitor failed (e.g. storage driver error or panic). We // cannot send the original response, so fall back to an explicit WFT // failure so the server records the error immediately. - failReq := wtp.errorToFailWorkflowTask(task.TaskToken, err) + failReq := wtp.errorToFailWorkflowTask(task.TaskToken, task.WorkflowExecution.GetWorkflowId(), err) var submitErr error response, submitErr = wtp.sendTaskCompletedRequest(&workflowTaskCompletion{rawRequest: failReq}, task) if submitErr != nil { @@ -726,7 +726,7 @@ func (wtp *workflowTaskProcessor) reportGrpcMessageTooLarge( switch taskCompletion.rawRequest.(type) { case *workflowservice.RespondWorkflowTaskCompletedRequest, *workflowservice.RespondWorkflowTaskFailedRequest: emitFailMetric = true - request := wtp.errorToFailWorkflowTask(task.TaskToken, sendErr) + request := wtp.errorToFailWorkflowTask(task.TaskToken, task.WorkflowExecution.GetWorkflowId(), sendErr) request.Cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE if err = visitProtoPayloads(ctx, wtp.outboundPayloadVisitor, request); err != nil { wtp.logger.Error("Failed to visit payloads for GRPC message too large failure response.", tagError, err) @@ -754,7 +754,7 @@ func (wtp *workflowTaskProcessor) reportGrpcMessageTooLarge( return } -func (wtp *workflowTaskProcessor) errorToFailWorkflowTask(taskToken []byte, err error) *workflowservice.RespondWorkflowTaskFailedRequest { +func (wtp *workflowTaskProcessor) errorToFailWorkflowTask(taskToken []byte, workflowId string, err error) *workflowservice.RespondWorkflowTaskFailedRequest { cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE // If it was a panic due to a bad state machine or if it was a history // mismatch error, mark as non-deterministic @@ -768,10 +768,10 @@ func (wtp *workflowTaskProcessor) errorToFailWorkflowTask(taskToken []byte, err cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR } - return wtp.errorToFailWorkflowTaskWithCause(taskToken, err, cause) + return wtp.errorToFailWorkflowTaskWithCause(taskToken, workflowId, err, cause) } -func (wtp *workflowTaskProcessor) errorToFailWorkflowTaskWithCause(taskToken []byte, err error, cause enumspb.WorkflowTaskFailedCause) *workflowservice.RespondWorkflowTaskFailedRequest { +func (wtp *workflowTaskProcessor) errorToFailWorkflowTaskWithCause(taskToken []byte, workflowId string, err error, cause enumspb.WorkflowTaskFailedCause) *workflowservice.RespondWorkflowTaskFailedRequest { builtRequest := &workflowservice.RespondWorkflowTaskFailedRequest{ TaskToken: taskToken, Cause: cause, @@ -779,6 +779,7 @@ func (wtp *workflowTaskProcessor) errorToFailWorkflowTaskWithCause(taskToken []b Identity: wtp.identity, BinaryChecksum: wtp.workerBuildID, Namespace: wtp.namespace, + ResourceId: fmt.Sprintf("workflow:%s", workflowId), WorkerVersion: &commonpb.WorkerVersionStamp{ BuildId: wtp.workerBuildID, UseVersioning: wtp.useBuildIDVersioning, @@ -1507,6 +1508,16 @@ func reportActivityCompleteByID( return reportErr } +func getActivityResourceId(workflowId, activityId string) string { + if workflowId != "" { + return fmt.Sprintf("workflow:%s", workflowId) + } + if activityId != "" { + return fmt.Sprintf("activity:%s", activityId) + } + return "" +} + func convertActivityResultToRespondRequest( identity string, taskToken []byte, @@ -1519,6 +1530,8 @@ func convertActivityResultToRespondRequest( versionStamp *commonpb.WorkerVersionStamp, deployment *deploymentpb.Deployment, workerDeploymentOptions *deploymentpb.WorkerDeploymentOptions, + workflowId string, + activityId string, ) interface{} { if err == ErrActivityResultPending { // activity result is pending and will be completed asynchronously. @@ -1532,6 +1545,7 @@ func convertActivityResultToRespondRequest( Result: result, Identity: identity, Namespace: namespace, + ResourceId: getActivityResourceId(workflowId, activityId), WorkerVersion: versionStamp, Deployment: deployment, DeploymentOptions: workerDeploymentOptions, @@ -1547,6 +1561,7 @@ func convertActivityResultToRespondRequest( Details: convertErrDetailsToPayloads(canceledErr.details, dataConverter), Identity: identity, Namespace: namespace, + ResourceId: getActivityResourceId(workflowId, activityId), WorkerVersion: versionStamp, Deployment: deployment, DeploymentOptions: workerDeploymentOptions, @@ -1557,6 +1572,7 @@ func convertActivityResultToRespondRequest( TaskToken: taskToken, Identity: identity, Namespace: namespace, + ResourceId: getActivityResourceId(workflowId, activityId), WorkerVersion: versionStamp, Deployment: deployment, DeploymentOptions: workerDeploymentOptions, @@ -1575,6 +1591,7 @@ func convertActivityResultToRespondRequest( Failure: failureConverter.ErrorToFailure(err), Identity: identity, Namespace: namespace, + ResourceId: getActivityResourceId(workflowId, activityId), WorkerVersion: versionStamp, Deployment: deployment, DeploymentOptions: workerDeploymentOptions, @@ -1607,6 +1624,7 @@ func convertActivityResultToRespondRequestByID( ActivityId: activityID, Result: result, Identity: identity, + ResourceId: getActivityResourceId(workflowID, activityID), } } @@ -1621,6 +1639,7 @@ func convertActivityResultToRespondRequestByID( ActivityId: activityID, Details: convertErrDetailsToPayloads(canceledErr.details, dataConverter), Identity: identity, + ResourceId: getActivityResourceId(workflowID, activityID), } } if errors.Is(err, context.Canceled) { @@ -1630,6 +1649,7 @@ func convertActivityResultToRespondRequestByID( RunId: runID, ActivityId: activityID, Identity: identity, + ResourceId: getActivityResourceId(workflowID, activityID), } } } @@ -1647,6 +1667,7 @@ func convertActivityResultToRespondRequestByID( ActivityId: activityID, Failure: failureConverter.ErrorToFailure(err), Identity: identity, + ResourceId: getActivityResourceId(workflowID, activityID), } } diff --git a/internal/internal_worker_heartbeat.go b/internal/internal_worker_heartbeat.go index 86c0c35fd..125051b35 100644 --- a/internal/internal_worker_heartbeat.go +++ b/internal/internal_worker_heartbeat.go @@ -168,6 +168,7 @@ func (hw *sharedNamespaceWorker) sendHeartbeats() error { _, err := hw.client.recordWorkerHeartbeat(hw.heartbeatCtx, &workflowservice.RecordWorkerHeartbeatRequest{ Namespace: hw.namespace, WorkerHeartbeat: heartbeats, + ResourceId: fmt.Sprintf("worker:%s", hw.client.workerGroupingKey), }) if err != nil { diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 76fce23bd..6a5940d1c 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -512,7 +512,8 @@ func (wc *WorkflowClient) CompleteActivityWithOptions(ctx context.Context, opts // We do allow canceled error to be passed here cancelAllowed := true request := convertActivityResultToRespondRequest(wc.identity, opts.TaskToken, - data, opts.Err, dataConverter, failureConverter, wc.namespace, cancelAllowed, nil, nil, nil) + data, opts.Err, dataConverter, failureConverter, wc.namespace, cancelAllowed, nil, nil, nil, + opts.WorkflowID, "") return reportActivityComplete(ctx, wc.workflowService, request, wc.metricsHandler) } @@ -2198,6 +2199,7 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow( startOp, updateOp, }, + ResourceId: fmt.Sprintf("workflow:%s", startRequest.WorkflowId), } if err := visitProtoPayloads(ctx, w.client.outboundPayloadVisitor, &multiRequest); err != nil { diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 67b7093b2..97c1de260 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -1262,7 +1262,8 @@ func (env *testWorkflowEnvironmentImpl) CompleteActivity(taskToken []byte, resul // We do allow canceled error to be passed here cancelAllowed := true request := convertActivityResultToRespondRequest("test-identity", taskToken, data, err, - activityHandle.dataConverter, activityHandle.failureConverter, defaultTestNamespace, cancelAllowed, nil, nil, nil) + activityHandle.dataConverter, activityHandle.failureConverter, defaultTestNamespace, cancelAllowed, nil, nil, nil, + "", "") env.handleActivityResult(activityHandle, request, activityHandle.dataConverter) }, false /* do not auto schedule workflow task, because activity might be still pending */) diff --git a/internal/resource_id_test.go b/internal/resource_id_test.go new file mode 100644 index 000000000..2c5b0b515 --- /dev/null +++ b/internal/resource_id_test.go @@ -0,0 +1,825 @@ +package internal + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + workerpb "go.temporal.io/api/worker/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/api/workflowservicemock/v1" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/internal/common/metrics" +) + +// TestResourceIDImplementation tests that our actual implementation code +// correctly populates resource_id fields in various request types +func TestResourceIDImplementation(t *testing.T) { + t.Run("ActivityHeartbeat", func(t *testing.T) { + testActivityHeartbeatResourceID(t) + }) + t.Run("WorkflowTaskRequests", func(t *testing.T) { + testWorkflowTaskResourceID(t) + }) + t.Run("ActivityTaskRequests", func(t *testing.T) { + testActivityTaskRequestsResourceID(t) + }) + t.Run("BatchOperationRequest", func(t *testing.T) { + testExecuteMultiOperationResourceID(t) + }) + t.Run("WorkerHeartbeatRequest", func(t *testing.T) { + testRecordWorkerHeartbeatResourceID(t) + }) +} + +func TestGetActivityResourceId_BothEmpty(t *testing.T) { + result := getActivityResourceId("", "") + assert.Empty(t, result, "Expected empty resource ID when both workflowId and activityId are empty") +} + +// Test activity heartbeat resource_id population using actual SDK code path +func testActivityHeartbeatResourceID(t *testing.T) { + testCases := []struct { + name string + createActivityEnv func(ServiceInvoker) *activityEnvironment + expectedResourceID string + }{ + { + name: "WithWorkflowExecution", + createActivityEnv: func(invoker ServiceInvoker) *activityEnvironment { + return &activityEnvironment{ + serviceInvoker: invoker, + workflowExecution: WorkflowExecution{ + ID: "test-workflow-123", + RunID: "test-run-456", + }, + logger: getLogger(), + } + }, + expectedResourceID: "workflow:test-workflow-123", + }, + { + name: "StandaloneActivity", + createActivityEnv: func(invoker ServiceInvoker) *activityEnvironment { + return &activityEnvironment{ + serviceInvoker: invoker, + activityID: "standalone-activity-999", + logger: getLogger(), + } + }, + expectedResourceID: "activity:standalone-activity-999", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Test setup inside the loop for clean isolation + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) + + var capturedRequest *workflowservice.RecordActivityTaskHeartbeatRequest + service.EXPECT(). + RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, req *workflowservice.RecordActivityTaskHeartbeatRequest, opts ...interface{}) { + capturedRequest = req + }). + Return(&workflowservice.RecordActivityTaskHeartbeatResponse{}, nil). + Times(1) + + ctx, cancel := context.WithCancelCause(context.Background()) + defer cancel(nil) + invoker := newServiceInvoker([]byte("test-token"), "test-identity", service, metrics.NopHandler, cancel, + 1*time.Second, make(chan struct{}), "test-namespace", &atomic.Bool{}) + + // Only the activity environment creation varies + activityCtx, _ := newActivityContext(ctx, nil, tc.createActivityEnv(invoker)) + + // Call the actual SDK function + RecordActivityHeartbeat(activityCtx, "test-heartbeat-details") + + // Validate the captured request + require.NotNil(t, capturedRequest) + assert.Equal(t, tc.expectedResourceID, capturedRequest.ResourceId) + }) + } +} + +// Test workflow task request resource_id population using actual SDK code paths +func testWorkflowTaskResourceID(t *testing.T) { + t.Run("WorkflowTaskCompleted", func(t *testing.T) { + testWorkflowTaskCompletedResourceID(t) + }) + t.Run("WorkflowTaskFailed", func(t *testing.T) { + testWorkflowTaskFailedResourceID(t) + }) +} + +// Test RespondWorkflowTaskCompletedRequest resource_id field (resource_id = 18) +// Expected: Workflow ID from original task +func testWorkflowTaskCompletedResourceID(t *testing.T) { + testCases := []struct { + name string + workflowID string + runID string + expectedResourceID string + }{ + { + name: "StandardWorkflowTask", + workflowID: "test-workflow-completed-123", + runID: "test-run-completed-456", + expectedResourceID: "workflow:test-workflow-completed-123", + }, + { + name: "DifferentWorkflowID", + workflowID: "another-workflow-789", + runID: "another-run-999", + expectedResourceID: "workflow:another-workflow-789", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) + + var capturedRequest *workflowservice.RespondWorkflowTaskCompletedRequest + service.EXPECT(). + RespondWorkflowTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, req *workflowservice.RespondWorkflowTaskCompletedRequest, opts ...interface{}) { + capturedRequest = req + }). + Return(&workflowservice.RespondWorkflowTaskCompletedResponse{}, nil). + Times(1) + + // Create a simple workflow task with basic history + task := createTestWorkflowTask(tc.workflowID, tc.runID) + + // Create workflow task handler + params := workerExecutionParameters{ + Namespace: "test-namespace", + Identity: "test-identity", + cache: NewWorkerCache(), + } + ensureRequiredParams(¶ms) + registry := newRegistry() + registry.RegisterWorkflowWithOptions( + helloWorldWorkflowFunc, + RegisterWorkflowOptions{Name: "HelloWorld_Workflow"}, + ) + + handler := newWorkflowTaskHandler(params, nil, registry) + + // Process the workflow task through the poller to trigger the completion + poller := newWorkflowTaskProcessor(handler, handler, service, params, uuid.NewString()) + + wt := &workflowTask{task: task} + err := poller.processWorkflowTask(wt) + + // We expect the task to complete successfully since workflow is registered + require.NoError(t, err) + + // Validate the captured request + require.NotNil(t, capturedRequest) + assert.Equal(t, tc.expectedResourceID, capturedRequest.ResourceId) + }) + } +} + +// Test RespondWorkflowTaskFailedRequest resource_id field (resource_id = 11) +// Expected: Workflow ID from original task +func testWorkflowTaskFailedResourceID(t *testing.T) { + testCases := []struct { + name string + workflowID string + runID string + expectedResourceID string + failureCause enumspb.WorkflowTaskFailedCause + }{ + { + name: "UnknownWorkflowType", + workflowID: "test-workflow-failed-123", + runID: "test-run-failed-456", + expectedResourceID: "workflow:test-workflow-failed-123", + failureCause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, + }, + { + name: "NonDeterministicError", + workflowID: "non-deterministic-workflow-789", + runID: "non-deterministic-run-999", + expectedResourceID: "workflow:non-deterministic-workflow-789", + failureCause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) + + var capturedRequest *workflowservice.RespondWorkflowTaskFailedRequest + service.EXPECT(). + RespondWorkflowTaskFailed(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, req *workflowservice.RespondWorkflowTaskFailedRequest, opts ...interface{}) { + capturedRequest = req + }). + Return(&workflowservice.RespondWorkflowTaskFailedResponse{}, nil). + Times(1) + + // Create a workflow task that will fail (unregistered workflow type) + task := createTestWorkflowTaskWithType(tc.workflowID, tc.runID, "UnregisteredWorkflow") + + // Create workflow task handler without registering the workflow + params := workerExecutionParameters{ + Namespace: "test-namespace", + Identity: "test-identity", + cache: NewWorkerCache(), + } + ensureRequiredParams(¶ms) + registry := newRegistry() // Empty registry to cause failure + + handler := newWorkflowTaskHandler(params, nil, registry) + + // Process the workflow task through the poller to trigger the failure + poller := newWorkflowTaskProcessor(handler, handler, service, params, uuid.NewString()) + + wt := &workflowTask{task: task} + err := poller.processWorkflowTask(wt) + + // Task processing succeeds but should trigger failure request due to unregistered workflow + require.NoError(t, err) + + // Validate the captured request + require.NotNil(t, capturedRequest) + assert.Equal(t, tc.expectedResourceID, capturedRequest.ResourceId) + }) + } +} + +// Test activity task request resource_id population using actual SDK code paths +func testActivityTaskRequestsResourceID(t *testing.T) { + t.Run("ActivityTaskHeartbeatById", func(t *testing.T) { + testActivityTaskHeartbeatByIdResourceID(t) + }) + t.Run("ConvertActivityResultValidation", func(t *testing.T) { + testConvertActivityResultValidation(t) + }) + t.Run("ConvertActivityResultByIDValidation", func(t *testing.T) { + testConvertActivityResultByIDValidation(t) + }) +} + +// Test convertActivityResultToRespondRequest validation +// This tests all 4 activity task requests that use this function: +// - RespondActivityTaskCompletedRequest (resource_id = 8) +// - RespondActivityTaskFailedRequest (resource_id = 9) +// - RespondActivityTaskCanceledRequest (resource_id = 8) +// - RecordActivityTaskHeartbeatRequest (resource_id = 5) +func testConvertActivityResultValidation(t *testing.T) { + testCases := []struct { + name string + workflowID string + activityID string + expectedResourceID string + testType string + simulateError error + description string + }{ + { + name: "CompletedWithWorkflow", + workflowID: "test-workflow-completed-123", + activityID: "test-activity-456", + expectedResourceID: "workflow:test-workflow-completed-123", + testType: "completed", + simulateError: nil, + description: "RespondActivityTaskCompletedRequest should use workflow ID when present", + }, + { + name: "CompletedStandalone", + workflowID: "", + activityID: "standalone-activity-completed-789", + expectedResourceID: "activity:standalone-activity-completed-789", + testType: "completed", + simulateError: nil, + description: "RespondActivityTaskCompletedRequest should use activity ID for standalone activities", + }, + { + name: "FailedWithWorkflow", + workflowID: "test-workflow-failed-123", + activityID: "test-activity-failed-456", + expectedResourceID: "workflow:test-workflow-failed-123", + testType: "failed", + simulateError: errors.New("activity failed"), + description: "RespondActivityTaskFailedRequest should use workflow ID when present", + }, + { + name: "FailedStandalone", + workflowID: "", + activityID: "standalone-activity-failed-789", + expectedResourceID: "activity:standalone-activity-failed-789", + testType: "failed", + simulateError: errors.New("standalone activity failed"), + description: "RespondActivityTaskFailedRequest should use activity ID for standalone activities", + }, + { + name: "CanceledWithWorkflow", + workflowID: "test-workflow-canceled-123", + activityID: "test-activity-canceled-456", + expectedResourceID: "workflow:test-workflow-canceled-123", + testType: "canceled", + simulateError: NewCanceledError(), + description: "RespondActivityTaskCanceledRequest should use workflow ID when present", + }, + { + name: "CanceledStandalone", + workflowID: "", + activityID: "standalone-activity-canceled-789", + expectedResourceID: "activity:standalone-activity-canceled-789", + testType: "canceled", + simulateError: NewCanceledError(), + description: "RespondActivityTaskCanceledRequest should use activity ID for standalone activities", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Call the conversion function directly to test resource ID population + result := convertActivityResultToRespondRequest( + "test-identity", + []byte("test-task-token"), + nil, // result payloads + tc.simulateError, + converter.GetDefaultDataConverter(), // data converter + GetDefaultFailureConverter(), // failure converter + "test-namespace", + true, // cancel allowed + nil, // version stamp + nil, // deployment + nil, // worker deployment options + tc.workflowID, + tc.activityID, + ) + + // Validate the result based on the test type + switch tc.testType { + case "completed": + if tc.simulateError == nil { + completedRequest, ok := result.(*workflowservice.RespondActivityTaskCompletedRequest) + require.True(t, ok, "Result should be a RespondActivityTaskCompletedRequest") + assert.Equal(t, tc.expectedResourceID, completedRequest.ResourceId, tc.description) + } + case "failed": + if tc.simulateError != nil && !errors.Is(tc.simulateError, context.Canceled) { + var canceledErr *CanceledError + if !errors.As(tc.simulateError, &canceledErr) { + failedRequest, ok := result.(*workflowservice.RespondActivityTaskFailedRequest) + require.True(t, ok, "Result should be a RespondActivityTaskFailedRequest") + assert.Equal(t, tc.expectedResourceID, failedRequest.ResourceId, tc.description) + } + } + case "canceled": + var canceledErr *CanceledError + if errors.As(tc.simulateError, &canceledErr) { + canceledRequest, ok := result.(*workflowservice.RespondActivityTaskCanceledRequest) + require.True(t, ok, "Result should be a RespondActivityTaskCanceledRequest") + assert.Equal(t, tc.expectedResourceID, canceledRequest.ResourceId, tc.description) + } + } + }) + } +} + +// Test convertActivityResultToRespondRequestByID validation +// This tests all 4 ByID activity task requests that use this function: +// - RespondActivityTaskCompletedByIdRequest (resource_id = 7) +// - RespondActivityTaskFailedByIdRequest (resource_id = 8) +// - RespondActivityTaskCanceledByIdRequest (resource_id = 8) +// - RecordActivityTaskHeartbeatByIdRequest (resource_id = 7) +func testConvertActivityResultByIDValidation(t *testing.T) { + testCases := []struct { + name string + workflowID string + activityID string + expectedResourceID string + testType string + simulateError error + description string + }{ + { + name: "CompletedByIDWithWorkflow", + workflowID: "test-workflow-completed-by-id-123", + activityID: "test-activity-by-id-456", + expectedResourceID: "workflow:test-workflow-completed-by-id-123", + testType: "completed", + simulateError: nil, + description: "RespondActivityTaskCompletedByIdRequest should use workflow ID when present", + }, + { + name: "CompletedByIDStandalone", + workflowID: "", + activityID: "standalone-activity-completed-by-id-789", + expectedResourceID: "activity:standalone-activity-completed-by-id-789", + testType: "completed", + simulateError: nil, + description: "RespondActivityTaskCompletedByIdRequest should use activity ID for standalone activities", + }, + { + name: "FailedByIDWithWorkflow", + workflowID: "test-workflow-failed-by-id-123", + activityID: "test-activity-failed-by-id-456", + expectedResourceID: "workflow:test-workflow-failed-by-id-123", + testType: "failed", + simulateError: errors.New("activity failed by ID"), + description: "RespondActivityTaskFailedByIdRequest should use workflow ID when present", + }, + { + name: "FailedByIDStandalone", + workflowID: "", + activityID: "standalone-activity-failed-by-id-789", + expectedResourceID: "activity:standalone-activity-failed-by-id-789", + testType: "failed", + simulateError: errors.New("standalone activity failed by ID"), + description: "RespondActivityTaskFailedByIdRequest should use activity ID for standalone activities", + }, + { + name: "CanceledByIDWithWorkflow", + workflowID: "test-workflow-canceled-by-id-123", + activityID: "test-activity-canceled-by-id-456", + expectedResourceID: "workflow:test-workflow-canceled-by-id-123", + testType: "canceled", + simulateError: NewCanceledError(), + description: "RespondActivityTaskCanceledByIdRequest should use workflow ID when present", + }, + { + name: "CanceledByIDStandalone", + workflowID: "", + activityID: "standalone-activity-canceled-by-id-789", + expectedResourceID: "activity:standalone-activity-canceled-by-id-789", + testType: "canceled", + simulateError: NewCanceledError(), + description: "RespondActivityTaskCanceledByIdRequest should use activity ID for standalone activities", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Call the conversion function directly to test resource ID population + result := convertActivityResultToRespondRequestByID( + "test-identity", + "test-namespace", + tc.workflowID, + "test-run-id", + tc.activityID, + nil, // result payloads + tc.simulateError, + converter.GetDefaultDataConverter(), // data converter + GetDefaultFailureConverter(), // failure converter + true, // cancel allowed + ) + + // Validate the result based on the test type + switch tc.testType { + case "completed": + if tc.simulateError == nil { + completedRequest, ok := result.(*workflowservice.RespondActivityTaskCompletedByIdRequest) + require.True(t, ok, "Result should be a RespondActivityTaskCompletedByIdRequest") + assert.Equal(t, tc.expectedResourceID, completedRequest.ResourceId, tc.description) + } + case "failed": + if tc.simulateError != nil && !errors.Is(tc.simulateError, context.Canceled) { + var canceledErr *CanceledError + if !errors.As(tc.simulateError, &canceledErr) { + failedRequest, ok := result.(*workflowservice.RespondActivityTaskFailedByIdRequest) + require.True(t, ok, "Result should be a RespondActivityTaskFailedByIdRequest") + assert.Equal(t, tc.expectedResourceID, failedRequest.ResourceId, tc.description) + } + } + case "canceled": + var canceledErr *CanceledError + if errors.As(tc.simulateError, &canceledErr) { + canceledRequest, ok := result.(*workflowservice.RespondActivityTaskCanceledByIdRequest) + require.True(t, ok, "Result should be a RespondActivityTaskCanceledByIdRequest") + assert.Equal(t, tc.expectedResourceID, canceledRequest.ResourceId, tc.description) + } + } + }) + } +} + +// Helper functions for creating test workflow tasks + +func createTestWorkflowTask(workflowID, runID string) *workflowservice.PollWorkflowTaskQueueResponse { + return createTestWorkflowTaskWithType(workflowID, runID, "HelloWorld_Workflow") +} + +func createTestWorkflowTaskWithType(workflowID, runID, workflowType string) *workflowservice.PollWorkflowTaskQueueResponse { + events := []*historypb.HistoryEvent{ + { + EventId: 1, + EventTime: nil, + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{ + WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, + }, + }, + }, + { + EventId: 2, + EventTime: nil, + EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, + Attributes: &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{ + WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{ + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"}, + }, + }, + }, + { + EventId: 3, + EventTime: nil, + EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, + Attributes: &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ + WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{ + Identity: "test-identity", + }, + }, + }, + } + + return &workflowservice.PollWorkflowTaskQueueResponse{ + TaskToken: []byte("test-task-token"), + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + History: &historypb.History{Events: events}, + Attempt: 1, + } +} + +// Test RecordActivityTaskHeartbeatByIdRequest resource_id field (resource_id = 7) +// Expected: Workflow ID or activity ID for standalone activities +func testActivityTaskHeartbeatByIdResourceID(t *testing.T) { + testCases := []struct { + name string + workflowID string + activityID string + expectedResourceID string + description string + }{ + { + name: "WithWorkflowExecution", + workflowID: "test-workflow-heartbeat-by-id-123", + activityID: "test-activity-heartbeat-by-id-456", + expectedResourceID: "workflow:test-workflow-heartbeat-by-id-123", + description: "Should use workflow ID when present", + }, + { + name: "StandaloneActivity", + workflowID: "", + activityID: "standalone-activity-heartbeat-by-id-789", + expectedResourceID: "activity:standalone-activity-heartbeat-by-id-789", + description: "Should use activity ID for standalone activities", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) + + // Mock GetSystemInfo which is called during client initialization + service.EXPECT(). + GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.GetSystemInfoResponse{}, nil). + AnyTimes() + + var capturedRequest *workflowservice.RecordActivityTaskHeartbeatByIdRequest + service.EXPECT(). + RecordActivityTaskHeartbeatById(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, req *workflowservice.RecordActivityTaskHeartbeatByIdRequest, opts ...interface{}) { + capturedRequest = req + }). + Return(&workflowservice.RecordActivityTaskHeartbeatByIdResponse{}, nil). + Times(1) + + // Create workflow client and call heartbeat by ID + client := NewServiceClient(service, nil, ClientOptions{Namespace: "test-namespace"}) + + ctx := context.Background() + err := client.RecordActivityHeartbeatByID(ctx, "test-namespace", tc.workflowID, "", tc.activityID, "heartbeat-details") + + // Validate call succeeded + require.NoError(t, err) + + // Validate the captured request + require.NotNil(t, capturedRequest, "Request should have been captured") + assert.Equal(t, tc.expectedResourceID, capturedRequest.ResourceId, tc.description) + }) + } +} + +// Test ExecuteMultiOperationRequest resource_id field (resource_id = 3) +// Expected: Should match operations[0].start_workflow.workflow_id +func testExecuteMultiOperationResourceID(t *testing.T) { + testCases := []struct { + name string + workflowID string + expectedResourceID string + }{ + { + name: "StartUpdateWorkflow", + workflowID: "test-workflow-batch-123", + expectedResourceID: "workflow:test-workflow-batch-123", + }, + { + name: "MultiOpWithDifferentID", + workflowID: "batch-operation-workflow-456", + expectedResourceID: "workflow:batch-operation-workflow-456", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) + + // Mock GetSystemInfo which gets called during client operations + service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes() + + var capturedRequest *workflowservice.ExecuteMultiOperationRequest + service.EXPECT(). + ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, req *workflowservice.ExecuteMultiOperationRequest, opts ...interface{}) { + capturedRequest = req + }). + Return(&workflowservice.ExecuteMultiOperationResponse{ + Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{ + { + Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{ + StartWorkflow: &workflowservice.StartWorkflowExecutionResponse{ + RunId: "test-run-id", + }, + }, + }, + { + Response: &workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow{ + UpdateWorkflow: &workflowservice.UpdateWorkflowExecutionResponse{ + Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED, + }, + }, + }, + }, + }, nil). + Times(1) + + // Create a client to trigger ExecuteMultiOperation via UpdateWithStartWorkflow + client := NewServiceClient(service, nil, ClientOptions{ + Namespace: "test-namespace", + }) + + // Create start operation with workflow options + startOp := client.NewWithStartWorkflowOperation( + StartWorkflowOptions{ + ID: tc.workflowID, + TaskQueue: "test-task-queue", + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + }, + "TestWorkflow", + ) + + // Execute the update with start workflow operation + ctx := context.Background() + _, err := client.UpdateWithStartWorkflow(ctx, UpdateWithStartWorkflowOptions{ + StartWorkflowOperation: startOp, + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "TestUpdate", + Args: []any{}, + WaitForStage: WorkflowUpdateStageAccepted, + }, + }) + + // The operation should succeed + require.NoError(t, err) + + // Validate the captured request + require.NotNil(t, capturedRequest, "ExecuteMultiOperationRequest should have been captured") + assert.Equal(t, tc.expectedResourceID, capturedRequest.ResourceId, + "ResourceId should match the workflow ID from the start operation") + + // Additional validation: ensure we have the expected operations + require.Len(t, capturedRequest.Operations, 2, "Should have start and update operations") + + // Verify the first operation is a start workflow operation with the expected workflow ID + startWorkflowOp := capturedRequest.Operations[0].GetStartWorkflow() + require.NotNil(t, startWorkflowOp, "First operation should be StartWorkflow") + assert.Equal(t, tc.workflowID, startWorkflowOp.WorkflowId, + "Start workflow operation should have the expected workflow ID") + }) + } +} + +// Test RecordWorkerHeartbeatRequest resource_id field (resource_id = 4) +// Expected: Contains the worker grouping key +func testRecordWorkerHeartbeatResourceID(t *testing.T) { + testCases := []struct { + name string + groupingKey string + expectedResourceID string + }{ + { + name: "WorkerHeartbeat", + groupingKey: "test-worker-grouping-key-123", + expectedResourceID: "worker:test-worker-grouping-key-123", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) + + // Mock GetSystemInfo which gets called during client operations + service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes() + + var capturedRequest *workflowservice.RecordWorkerHeartbeatRequest + service.EXPECT(). + RecordWorkerHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, req *workflowservice.RecordWorkerHeartbeatRequest, opts ...interface{}) { + capturedRequest = req + }). + Return(&workflowservice.RecordWorkerHeartbeatResponse{}, nil). + Times(1) + + // Create a client with a specific workerGroupingKey + wfClient := NewServiceClient(service, nil, ClientOptions{ + Namespace: "test-namespace", + }) + + // Set the workerGroupingKey to our test value + wfClient.workerGroupingKey = tc.groupingKey + + // Create a sharedNamespaceWorker and call sendHeartbeats + heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) + defer heartbeatCancel() + + hw := &sharedNamespaceWorker{ + client: wfClient, + namespace: "test-namespace", + heartbeatCtx: heartbeatCtx, + heartbeatCancel: heartbeatCancel, + callbacks: map[string]func() *workerpb.WorkerHeartbeat{ + "test-worker": func() *workerpb.WorkerHeartbeat { + return &workerpb.WorkerHeartbeat{ + WorkerIdentity: "test-worker-identity", + } + }, + }, + } + + // Call sendHeartbeats which should construct and send the request + err := hw.sendHeartbeats() + + // The operation should succeed + require.NoError(t, err) + + // Validate the captured request + require.NotNil(t, capturedRequest, "RecordWorkerHeartbeatRequest should have been captured") + assert.Equal(t, tc.expectedResourceID, capturedRequest.ResourceId, + "ResourceId should match the worker grouping key") + + // Additional validation + assert.Equal(t, "test-namespace", capturedRequest.Namespace, + "Namespace should be set correctly") + require.Len(t, capturedRequest.WorkerHeartbeat, 1, + "Should have one worker heartbeat") + assert.Equal(t, "test-worker-identity", capturedRequest.WorkerHeartbeat[0].WorkerIdentity, + "Worker identity should be set correctly") + }) + } +} diff --git a/test/go.mod b/test/go.mod index c992986d8..396eee9e8 100644 --- a/test/go.mod +++ b/test/go.mod @@ -13,7 +13,7 @@ require ( go.opentelemetry.io/otel v1.40.0 go.opentelemetry.io/otel/sdk v1.40.0 go.opentelemetry.io/otel/trace v1.40.0 - go.temporal.io/api v1.62.5 + go.temporal.io/api v1.62.6 go.temporal.io/sdk v1.29.1 go.temporal.io/sdk/contrib/opentelemetry v0.0.0-00010101000000-000000000000 go.temporal.io/sdk/contrib/opentracing v0.0.0-00010101000000-000000000000 diff --git a/test/go.sum b/test/go.sum index 4729e0af1..7b0001f06 100644 --- a/test/go.sum +++ b/test/go.sum @@ -174,8 +174,8 @@ go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4A go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.6 h1:JLH8y9URdY9WbdvwMXfGknlhohoPBrXOc9AbQkPInOc= +go.temporal.io/api v1.62.6/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=