diff --git a/contrib/aws/lambdaworker/go.mod b/contrib/aws/lambdaworker/go.mod index 08309dc18..38a7af22c 100644 --- a/contrib/aws/lambdaworker/go.mod +++ b/contrib/aws/lambdaworker/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/aws/lambdaworker/go.sum b/contrib/aws/lambdaworker/go.sum index f4f460614..848b39d87 100644 --- a/contrib/aws/lambdaworker/go.sum +++ b/contrib/aws/lambdaworker/go.sum @@ -61,6 +61,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/aws/lambdaworker/otel/go.mod b/contrib/aws/lambdaworker/otel/go.mod index 4fc834740..a3ca5da3d 100644 --- a/contrib/aws/lambdaworker/otel/go.mod +++ b/contrib/aws/lambdaworker/otel/go.mod @@ -35,7 +35,7 @@ require ( go.opentelemetry.io/otel/metric v1.42.0 // indirect go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect golang.org/x/net v0.51.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect diff --git a/contrib/aws/lambdaworker/otel/go.sum b/contrib/aws/lambdaworker/otel/go.sum index 1248cddf5..211e8908d 100644 --- a/contrib/aws/lambdaworker/otel/go.sum +++ b/contrib/aws/lambdaworker/otel/go.sum @@ -70,6 +70,8 @@ go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjce go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/aws/s3driver/awssdkv2/go.mod b/contrib/aws/s3driver/awssdkv2/go.mod index 1e95b5995..b048d9455 100644 --- a/contrib/aws/s3driver/awssdkv2/go.mod +++ b/contrib/aws/s3driver/awssdkv2/go.mod @@ -8,7 +8,7 @@ require ( github.com/aws/smithy-go v1.24.2 github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.7 + go.temporal.io/api v1.62.8 go.temporal.io/sdk v1.25.1 go.temporal.io/sdk/contrib/aws/s3driver v0.0.0 ) diff --git a/contrib/aws/s3driver/awssdkv2/go.sum b/contrib/aws/s3driver/awssdkv2/go.sum index d4cea0fb6..428d727fa 100644 --- a/contrib/aws/s3driver/awssdkv2/go.sum +++ b/contrib/aws/s3driver/awssdkv2/go.sum @@ -82,6 +82,8 @@ go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d h1:Ns9kd1Rwzw7t0 go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d/go.mod h1:92Uoe3l++MlthCm+koNi0tcUCX3anayogF0Pa/sp24k= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/aws/s3driver/go.mod b/contrib/aws/s3driver/go.mod index 50f80a7b8..4dfbc68e4 100644 --- a/contrib/aws/s3driver/go.mod +++ b/contrib/aws/s3driver/go.mod @@ -4,7 +4,7 @@ go 1.24.0 require ( github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.7 + go.temporal.io/api v1.62.8 go.temporal.io/sdk v1.25.1 golang.org/x/sync v0.19.0 google.golang.org/protobuf v1.36.11 diff --git a/contrib/aws/s3driver/go.sum b/contrib/aws/s3driver/go.sum index 5b22527db..01f632096 100644 --- a/contrib/aws/s3driver/go.sum +++ b/contrib/aws/s3driver/go.sum @@ -44,6 +44,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index 7aa6f69bf..1282029e6 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -81,7 +81,7 @@ require ( go.opentelemetry.io/otel/metric v1.40.0 // indirect go.opentelemetry.io/otel/sdk v1.40.0 // indirect go.opentelemetry.io/otel/trace v1.40.0 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index f947e6eec..9197aa76f 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -235,6 +235,8 @@ go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1 h1:z/oMlrCv3Ko go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1/go.mod h1:C7EHYSIiaALi9RnNORCVaPCQDuJgJEn/XxkctaTez1E= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/contrib/envconfig/go.mod b/contrib/envconfig/go.mod index 8406a6a90..24ba78048 100644 --- a/contrib/envconfig/go.mod +++ b/contrib/envconfig/go.mod @@ -20,7 +20,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/envconfig/go.sum b/contrib/envconfig/go.sum index 2449f41cc..88fbac23b 100644 --- a/contrib/envconfig/go.sum +++ b/contrib/envconfig/go.sum @@ -59,6 +59,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index d73004604..d95e881ef 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -32,7 +32,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.40.0 go.opentelemetry.io/otel/sdk/metric v1.40.0 - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/text v0.33.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index 1e5b392ed..5324dfe09 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -58,6 +58,8 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index 4dc61aa05..ec90ca8e3 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -20,7 +20,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index 9bf54514b..7b2760ef5 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -62,6 +62,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/sysinfo/go.mod b/contrib/sysinfo/go.mod index add397a54..0a142a4c1 100644 --- a/contrib/sysinfo/go.mod +++ b/contrib/sysinfo/go.mod @@ -33,7 +33,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect diff --git a/contrib/sysinfo/go.sum b/contrib/sysinfo/go.sum index 395d61b7a..56283d2d6 100644 --- a/contrib/sysinfo/go.sum +++ b/contrib/sysinfo/go.sum @@ -93,6 +93,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index eeff2666d..5bc6993c0 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -21,7 +21,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/twmb/murmur3 v1.1.5 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index 2f9917acd..3d77a6cdc 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -141,6 +141,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/go.mod b/go.mod index 9f2602792..797db4309 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/nexus-rpc/sdk-go v0.6.0 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.7 + go.temporal.io/api v1.62.8 golang.org/x/sync v0.19.0 golang.org/x/sys v0.40.0 golang.org/x/time v0.3.0 diff --git a/go.sum b/go.sum index 88250bb40..a0fc0f04e 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index d7d4205d5..a23b45188 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -22,7 +22,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.10.0 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.8 // indirect golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.31.0 // indirect golang.org/x/net v0.49.0 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index eb114eacb..2b7882eb7 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -61,6 +61,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/internal_activity_client.go b/internal/internal_activity_client.go index 08f0858eb..c51cc258f 100644 --- a/internal/internal_activity_client.go +++ b/internal/internal_activity_client.go @@ -303,7 +303,7 @@ func (d *ClientActivityExecutionDescription) GetHeartbeatDetails(valuePtrs ...an if details == nil { return ErrNoData } - if err := visitProtoPayloads(context.Background(), d.inboundPayloadVisitor, details); err != nil { + if err := visitProtoPayloads(context.Background(), d.inboundPayloadVisitor, details, 0); err != nil { return err } return d.dataConverter.FromPayloads(details, valuePtrs...) @@ -316,7 +316,7 @@ func (d *ClientActivityExecutionDescription) GetLastFailure() error { if failure == nil { return nil } - if err := visitProtoPayloads(context.Background(), d.inboundPayloadVisitor, failure); err != nil { + if err := visitProtoPayloads(context.Background(), d.inboundPayloadVisitor, failure, 0); err != nil { return err } return d.failureConverter.FailureToError(failure) @@ -592,7 +592,7 @@ func (w *workflowClientInterceptor) ExecuteActivity( ActivityID: request.ActivityId, ActivityType: in.ActivityType, }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, request); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, request, 0); err != nil { return nil, err } @@ -686,7 +686,7 @@ func (w *workflowClientInterceptor) PollActivityResult( } } - if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, resp); err != nil { + if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, resp, 0); err != nil { return nil, err } diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index c2c33ef32..405ce1c74 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -136,7 +136,7 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche WorkflowID: action.GetStartWorkflow().GetWorkflowId(), WorkflowType: action.GetStartWorkflow().GetWorkflowType().GetName(), }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, startRequest); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, startRequest, 0); err != nil { return nil, err } @@ -307,7 +307,7 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc WorkflowID: newSchedulePB.GetAction().GetStartWorkflow().GetWorkflowId(), WorkflowType: newSchedulePB.GetAction().GetStartWorkflow().GetWorkflowType().GetName(), }) - if err := visitProtoPayloads(storeCtx, scheduleHandle.client.outboundPayloadVisitor, updateRequest); err != nil { + if err := visitProtoPayloads(storeCtx, scheduleHandle.client.outboundPayloadVisitor, updateRequest, 0); err != nil { return err } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 72a74ed93..964ce842f 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -41,8 +41,9 @@ const ( noRetryBackoff = time.Duration(-1) - defaultDefaultHeartbeatThrottleInterval = 30 * time.Second - defaultMaxHeartbeatThrottleInterval = 60 * time.Second + defaultDefaultHeartbeatThrottleInterval = 30 * time.Second + defaultMaxHeartbeatThrottleInterval = 60 * time.Second + defaultMaxConcurrentWorkflowTaskExternalStorageVisits = 3 ) var ( diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 29381dea2..4923a062d 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -123,7 +123,8 @@ type ( numNormalPollerMetric *numPollerMetric numStickyPollerMetric *numPollerMetric - inboundPayloadVisitor PayloadVisitor + inboundPayloadVisitor PayloadVisitor + payloadVisitorConcurrency int } // workflowTaskProcessor implements processing of a workflow task and can create @@ -152,8 +153,9 @@ type ( numNormalPollerMetric *numPollerMetric numStickyPollerMetric *numPollerMetric - inboundPayloadVisitor PayloadVisitor - outboundPayloadVisitor PayloadVisitor + inboundPayloadVisitor PayloadVisitor + outboundPayloadVisitor PayloadVisitor + payloadVisitorConcurrency int } // activityTaskPoller implements polling/processing a workflow task @@ -188,8 +190,9 @@ type ( // payload visitor to each page fetched, resolving external storage references // in paginated history events that were not part of the initial poll response. retrievingHistoryIterator struct { - inner HistoryIterator - inboundVisitor PayloadVisitor + inner HistoryIterator + inboundVisitor PayloadVisitor + payloadVisitorConcurrency int } localActivityTaskPoller struct { @@ -389,6 +392,7 @@ func newWorkflowTaskProcessor( numStickyPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeWorkflowStickyTask), inboundPayloadVisitor: params.inboundPayloadVisitor, outboundPayloadVisitor: params.outboundPayloadVisitor, + payloadVisitorConcurrency: params.payloadVisitorConcurrency, } } @@ -426,6 +430,7 @@ func (wtp *workflowTaskProcessor) createPoller(mode workflowTaskPollerMode) task numNormalPollerMetric: wtp.numNormalPollerMetric, numStickyPollerMetric: wtp.numStickyPollerMetric, inboundPayloadVisitor: wtp.inboundPayloadVisitor, + payloadVisitorConcurrency: wtp.payloadVisitorConcurrency, } } @@ -462,7 +467,7 @@ func (wtp *workflowTaskProcessor) processWorkflowTask(task *workflowTask) (retEr downloadPayloadMetrics := &workflowTaskStorageMetrics{logger: wtp.logger} ctx := context.WithValue(context.Background(), storageOperationCallbackContextKey, downloadPayloadMetrics) - if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task); err != nil { + if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task, wtp.payloadVisitorConcurrency); err != nil { // Submit an explicit WFT failure so the server records the error immediately // rather than waiting for the task to time out. failReq := wtp.errorToFailWorkflowTask(task.task.TaskToken, err) @@ -520,7 +525,7 @@ func (wtp *workflowTaskProcessor) processWorkflowTask(task *workflowTask) (retEr return nil, nil } task := wtp.toWorkflowTask(heartbeatResponse.WorkflowTask) - if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task); err != nil { + if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task, wtp.payloadVisitorConcurrency); err != nil { return nil, err } task.doneCh = doneCh @@ -558,7 +563,7 @@ func (wtp *workflowTaskProcessor) processWorkflowTask(task *workflowTask) (retEr // we are getting new workflow task, so reset the workflowTask and continue process the new one task = wtp.toWorkflowTask(response.WorkflowTask) - if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task); err != nil { + if err := visitProtoPayloads(ctx, wtp.inboundPayloadVisitor, task.task, wtp.payloadVisitorConcurrency); err != nil { return err } } @@ -652,7 +657,7 @@ func (wtp *workflowTaskProcessor) RespondTaskCompletedWithMetrics( RunID: task.WorkflowExecution.GetRunId(), WorkflowType: task.WorkflowType.GetName(), }) - if err = visitProtoPayloadsWithContextHook(ctx, wtp.outboundPayloadVisitor, taskCompletion.rawRequest, wtp.commandAwareContextHook(workflowInfo)); err != nil { + if err = visitProtoPayloadsWithContextHook(ctx, wtp.outboundPayloadVisitor, taskCompletion.rawRequest, wtp.payloadVisitorConcurrency, wtp.commandAwareContextHook(workflowInfo)); err != nil { // The outbound visitor failed (e.g. storage driver error or panic). We // cannot send the original response, so fall back to an explicit WFT // failure so the server records the error immediately. @@ -800,7 +805,7 @@ func (wtp *workflowTaskProcessor) reportGrpcMessageTooLarge( emitFailMetric = true request := wtp.errorToFailWorkflowTask(task.TaskToken, sendErr) request.Cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE - if err = visitProtoPayloads(ctx, wtp.outboundPayloadVisitor, request); err != nil { + if err = visitProtoPayloads(ctx, wtp.outboundPayloadVisitor, request, wtp.payloadVisitorConcurrency); err != nil { wtp.logger.Error("Failed to visit payloads for GRPC message too large failure response.", tagError, err) return } @@ -814,7 +819,7 @@ func (wtp *workflowTaskProcessor) reportGrpcMessageTooLarge( Failure: wtp.failureConverter.ErrorToFailure(sendErr), Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE, } - if err = visitProtoPayloads(ctx, wtp.outboundPayloadVisitor, request); err != nil { + if err = visitProtoPayloads(ctx, wtp.outboundPayloadVisitor, request, wtp.payloadVisitorConcurrency); err != nil { wtp.logger.Error("Failed to visit payloads for GRPC message too large query failure response.", tagError, err) return } @@ -1235,7 +1240,8 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *workflowservice.PollWork metricsHandler: wtp.metricsHandler, taskQueue: wtp.taskQueueName, }, - inboundVisitor: wtp.inboundPayloadVisitor, + inboundVisitor: wtp.inboundPayloadVisitor, + payloadVisitorConcurrency: wtp.payloadVisitorConcurrency, }, } } @@ -1253,7 +1259,8 @@ func (wtp *workflowTaskProcessor) toWorkflowTask(response *workflowservice.PollW metricsHandler: wtp.metricsHandler, taskQueue: wtp.taskQueueName, }, - inboundVisitor: wtp.inboundPayloadVisitor, + inboundVisitor: wtp.inboundPayloadVisitor, + payloadVisitorConcurrency: wtp.payloadVisitorConcurrency, }, } } @@ -1292,7 +1299,7 @@ func (r *retrievingHistoryIterator) GetNextPage() (*historypb.History, error) { if err != nil || history == nil { return history, err } - if err := visitProtoPayloads(context.Background(), r.inboundVisitor, history); err != nil { + if err := visitProtoPayloads(context.Background(), r.inboundVisitor, history, r.payloadVisitorConcurrency); err != nil { return nil, err } return history, nil @@ -1462,7 +1469,8 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { executionStartTime := time.Now() - if err := visitProtoPayloads(context.Background(), atp.inboundPayloadVisitor, activityTask.task); err != nil { + // Activity execution does not benefit from concurrent payload visiting. + if err := visitProtoPayloads(context.Background(), atp.inboundPayloadVisitor, activityTask.task, 0); err != nil { return err } @@ -1494,7 +1502,8 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { } } outboundCtx := context.WithValue(context.Background(), storageTargetContextKey, storageTarget) - if err := visitProtoPayloads(outboundCtx, atp.outboundPayloadVisitor, msg); err != nil { + // Activity execution does not benefit from concurrent payload visiting. + if err := visitProtoPayloads(outboundCtx, atp.outboundPayloadVisitor, msg, 0); err != nil { return err } } diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 985ffaf6b..9944def66 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -227,6 +227,8 @@ type ( inboundPayloadVisitor PayloadVisitor outboundPayloadVisitor PayloadVisitor + + payloadVisitorConcurrency int } // HistoryJSONOptions are options for HistoryFromJSON. @@ -1979,7 +1981,7 @@ func (aw *WorkflowReplayer) replayWorkflowHistoryRoot( // task handler. This mirrors what processWorkflowTask does for live workers. replayStorageCb := &replayStorageMetrics{logger: logger} inboundPayloadVisitorCtx := context.WithValue(context.Background(), storageOperationCallbackContextKey, replayStorageCb) - if err := visitProtoPayloads(inboundPayloadVisitorCtx, aw.inboundPayloadVisitor, task); err != nil { + if err := visitProtoPayloads(inboundPayloadVisitorCtx, aw.inboundPayloadVisitor, task, 0); err != nil { return err } @@ -2161,6 +2163,10 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke panic("cannot set both DeploymentOptions.DefaultVersioningBehavior if DeploymentOptions.UseBuildIDForVersioning is false") } + if options.MaxConcurrentWorkflowTaskExternalStorageVisits < 0 { + panic("MaxConcurrentWorkflowTaskExternalStorageVisits must not be negative") + } + // Need reference to result for fatal error handler var aw *AggregatedWorker fatalErrorCallback := func(err error) { @@ -2239,9 +2245,10 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke pollTimeTracker: &pollTimeTracker{}, workerInstanceKey: workerInstanceKey, workerPollCompleteOnShutdown: workerPollCompleteOnShutdown, - serverSupportsAutoscaling: &atomic.Bool{}, - inboundPayloadVisitor: client.inboundPayloadVisitor, - outboundPayloadVisitor: client.outboundPayloadVisitor, + serverSupportsAutoscaling: &atomic.Bool{}, + inboundPayloadVisitor: client.inboundPayloadVisitor, + outboundPayloadVisitor: client.outboundPayloadVisitor, + payloadVisitorConcurrency: options.MaxConcurrentWorkflowTaskExternalStorageVisits, } if options.MaxConcurrentWorkflowTaskPollers != 0 { @@ -2631,6 +2638,9 @@ func setWorkerOptionsDefaults(options *WorkerOptions) { if options.MaxHeartbeatThrottleInterval == 0 { options.MaxHeartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval } + if options.MaxConcurrentWorkflowTaskExternalStorageVisits == 0 { + options.MaxConcurrentWorkflowTaskExternalStorageVisits = defaultMaxConcurrentWorkflowTaskExternalStorageVisits + } if options.Tuner == nil { // Err cannot happen since these slot numbers are guaranteed valid options.Tuner, _ = NewFixedSizeTuner(FixedSizeTunerOptions{ diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index bbd452fc2..b43f17adf 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -2937,6 +2937,9 @@ func TestWorkerOptionInvalid(t *testing.T) { require.Panics(t, func() { NewAggregatedWorker(&WorkflowClient{}, "worker-options-tq", WorkerOptions{MaxConcurrentWorkflowTaskPollers: 1}) }) + require.Panics(t, func() { + NewAggregatedWorker(&WorkflowClient{}, "worker-options-tq", WorkerOptions{MaxConcurrentWorkflowTaskExternalStorageVisits: -1}) + }) } func TestWorkerOptionDefaults(t *testing.T) { @@ -2978,6 +2981,7 @@ func TestWorkerOptionDefaults(t *testing.T) { MetricsHandler: workflowWorker.executionParameters.MetricsHandler, Identity: workflowWorker.executionParameters.Identity, BackgroundContext: workflowWorker.executionParameters.BackgroundContext, + payloadVisitorConcurrency: 3, } assertWorkerExecutionParamsEqual(t, expected, workflowWorker.executionParameters) @@ -3006,17 +3010,18 @@ func TestWorkerOptionNonDefaults(t *testing.T) { } options := WorkerOptions{ - TaskQueueActivitiesPerSecond: 8888, - MaxConcurrentSessionExecutionSize: 3333, - MaxConcurrentWorkflowTaskExecutionSize: 2222, - MaxConcurrentActivityExecutionSize: 1111, - MaxConcurrentLocalActivityExecutionSize: 101, - MaxConcurrentWorkflowTaskPollers: 11, - MaxConcurrentActivityTaskPollers: 12, - WorkerLocalActivitiesPerSecond: 222, - WorkerActivitiesPerSecond: 99, - StickyScheduleToStartTimeout: 555 * time.Minute, - BackgroundActivityContext: context.Background(), + TaskQueueActivitiesPerSecond: 8888, + MaxConcurrentSessionExecutionSize: 3333, + MaxConcurrentWorkflowTaskExecutionSize: 2222, + MaxConcurrentActivityExecutionSize: 1111, + MaxConcurrentLocalActivityExecutionSize: 101, + MaxConcurrentWorkflowTaskPollers: 11, + MaxConcurrentActivityTaskPollers: 12, + WorkerLocalActivitiesPerSecond: 222, + WorkerActivitiesPerSecond: 99, + StickyScheduleToStartTimeout: 555 * time.Minute, + BackgroundActivityContext: context.Background(), + MaxConcurrentWorkflowTaskExternalStorageVisits: 7, } aggWorker := NewAggregatedWorker(client, taskQueue, options) @@ -3047,6 +3052,7 @@ func TestWorkerOptionNonDefaults(t *testing.T) { Logger: client.logger, MetricsHandler: client.metricsHandler, Identity: client.identity, + payloadVisitorConcurrency: options.MaxConcurrentWorkflowTaskExternalStorageVisits, } assertWorkerExecutionParamsEqual(t, expected, workflowWorker.executionParameters) @@ -3096,6 +3102,7 @@ func TestLocalActivityWorkerOnly(t *testing.T) { MetricsHandler: workflowWorker.executionParameters.MetricsHandler, Identity: workflowWorker.executionParameters.Identity, BackgroundContext: workflowWorker.executionParameters.BackgroundContext, + payloadVisitorConcurrency: 3, } assertWorkerExecutionParamsEqual(t, expected, workflowWorker.executionParameters) @@ -3118,6 +3125,7 @@ func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParam require.Equal(t, paramsA.ActivityTaskPollerBehavior, paramsB.ActivityTaskPollerBehavior) require.Equal(t, paramsA.WorkflowPanicPolicy, paramsB.WorkflowPanicPolicy) require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay) + require.Equal(t, paramsA.payloadVisitorConcurrency, paramsB.payloadVisitorConcurrency) } // Encode function args diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 0503319e8..44ca3ad01 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -521,7 +521,7 @@ func (wc *WorkflowClient) CompleteActivityWithOptions(ctx context.Context, opts WorkflowID: opts.WorkflowID, WorkflowType: opts.WorkflowType, }) - if err := visitProtoPayloads(storeCtx, wc.outboundPayloadVisitor, msg); err != nil { + if err := visitProtoPayloads(storeCtx, wc.outboundPayloadVisitor, msg, 0); err != nil { return err } } @@ -583,7 +583,7 @@ func (wc *WorkflowClient) CompleteActivityByIDWithOptions(ctx context.Context, o RunID: opts.RunID, WorkflowType: opts.WorkflowType, }) - if err := visitProtoPayloads(storeCtx, wc.outboundPayloadVisitor, msg); err != nil { + if err := visitProtoPayloads(storeCtx, wc.outboundPayloadVisitor, msg, 0); err != nil { return err } } @@ -639,7 +639,7 @@ func (wc *WorkflowClient) CompleteActivityByActivityIDWithOptions(ctx context.Co RunID: opts.ActivityRunID, ActivityType: opts.ActivityType, }) - if err := visitProtoPayloads(storeCtx, wc.outboundPayloadVisitor, msg); err != nil { + if err := visitProtoPayloads(storeCtx, wc.outboundPayloadVisitor, msg, 0); err != nil { return err } } @@ -1813,7 +1813,7 @@ func (workflowRun *workflowRunImpl) GetWithOptions( return err } - if err := visitProtoPayloads(ctx, workflowRun.inboundPayloadVisitor, closeEvent); err != nil { + if err := visitProtoPayloads(ctx, workflowRun.inboundPayloadVisitor, closeEvent, 0); err != nil { return err } @@ -2079,7 +2079,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( WorkflowID: startRequest.WorkflowId, WorkflowType: in.WorkflowType, }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, startRequest); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, startRequest, 0); err != nil { return nil, err } @@ -2244,7 +2244,7 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow( WorkflowID: startRequest.WorkflowId, WorkflowType: startRequest.WorkflowType.GetName(), }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, &multiRequest); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, &multiRequest, 0); err != nil { return nil, err } @@ -2345,7 +2345,7 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow( break } } - if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, updateResp); err != nil { + if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, updateResp, 0); err != nil { return nil, err } return updateResp, nil @@ -2394,7 +2394,7 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie WorkflowID: in.WorkflowID, RunID: in.RunID, }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, request); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, request, 0); err != nil { return err } @@ -2482,7 +2482,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow( WorkflowID: in.Options.ID, WorkflowType: in.WorkflowType, }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, signalWithStartRequest); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, signalWithStartRequest, 0); err != nil { return nil, err } @@ -2564,7 +2564,7 @@ func (w *workflowClientInterceptor) TerminateWorkflow(ctx context.Context, in *C WorkflowID: in.WorkflowID, RunID: in.RunID, }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, request); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, request, 0); err != nil { return err } @@ -2699,7 +2699,7 @@ func (w *workflowClientInterceptor) QueryWorkflow( WorkflowID: in.WorkflowID, RunID: in.RunID, }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, req); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, req, 0); err != nil { return nil, err } @@ -2711,7 +2711,7 @@ func (w *workflowClientInterceptor) QueryWorkflow( return nil, err } - if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, resp); err != nil { + if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, resp, 0); err != nil { return nil, err } @@ -2738,7 +2738,7 @@ func (w *workflowClientInterceptor) UpdateWorkflow( WorkflowID: in.WorkflowID, RunID: in.RunID, }) - if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, req); err != nil { + if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, req, 0); err != nil { return nil, err } @@ -2764,7 +2764,7 @@ func (w *workflowClientInterceptor) UpdateWorkflow( } } - if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, resp); err != nil { + if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, resp, 0); err != nil { return nil, err } @@ -2898,7 +2898,7 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate( return nil, err } - if err := visitProtoPayloads(parentCtx, w.client.inboundPayloadVisitor, resp); err != nil { + if err := visitProtoPayloads(parentCtx, w.client.inboundPayloadVisitor, resp, 0); err != nil { return nil, err } diff --git a/internal/payload_visitor.go b/internal/payload_visitor.go index 7baa69422..9e0c8b0c7 100644 --- a/internal/payload_visitor.go +++ b/internal/payload_visitor.go @@ -16,11 +16,11 @@ type PayloadVisitor interface { // attributes. If visitor is nil, msg is unchanged. An optional ContextHook may // be supplied to override the context for specific message subtrees during // traversal (see [proxy.VisitPayloadsOptions.ContextHook]). -func visitProtoPayloads(ctx context.Context, visitor PayloadVisitor, msg proto.Message) error { - return visitProtoPayloadsWithContextHook(ctx, visitor, msg, nil) +func visitProtoPayloads(ctx context.Context, visitor PayloadVisitor, msg proto.Message, concurrencyLimit int) error { + return visitProtoPayloadsWithContextHook(ctx, visitor, msg, concurrencyLimit, nil) } -func visitProtoPayloadsWithContextHook(ctx context.Context, visitor PayloadVisitor, msg proto.Message, hook func(context.Context, proto.Message) (context.Context, error)) error { +func visitProtoPayloadsWithContextHook(ctx context.Context, visitor PayloadVisitor, msg proto.Message, concurrencyLimit int, hook func(context.Context, proto.Message) (context.Context, error)) error { if visitor == nil { return nil } @@ -28,6 +28,7 @@ func visitProtoPayloadsWithContextHook(ctx context.Context, visitor PayloadVisit Visitor: visitor.Visit, SkipSearchAttributes: true, ContextHook: hook, + ConcurrencyLimit: concurrencyLimit, } return proxy.VisitPayloads(ctx, msg, opts) } diff --git a/internal/payload_visitor_test.go b/internal/payload_visitor_test.go new file mode 100644 index 000000000..433a19828 --- /dev/null +++ b/internal/payload_visitor_test.go @@ -0,0 +1,106 @@ +package internal + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + commandpb "go.temporal.io/api/command/v1" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/proxy" + workflowservice "go.temporal.io/api/workflowservice/v1" +) + +// visitorFunc is a PayloadVisitor backed by a plain function, used in tests. +type visitorFunc func(*proxy.VisitPayloadsContext, []*commonpb.Payload) ([]*commonpb.Payload, error) + +func (f visitorFunc) Visit(ctx *proxy.VisitPayloadsContext, p []*commonpb.Payload) ([]*commonpb.Payload, error) { + return f(ctx, p) +} + +// scheduleActivitiesRequest builds a RespondWorkflowTaskCompletedRequest with n +// ScheduleActivityTask commands, each carrying an Input Payloads field. +// proxy.VisitPayloads spawns one concurrent goroutine per *commonpb.Payloads +// field, giving us a controlled number of concurrent visitor calls. +func scheduleActivitiesRequest(n int) *workflowservice.RespondWorkflowTaskCompletedRequest { + commands := make([]*commandpb.Command, n) + for i := range n { + commands[i] = &commandpb.Command{ + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: fmt.Sprintf("activity-%d", i), + ActivityType: &commonpb.ActivityType{Name: "test-activity"}, + Input: &commonpb.Payloads{Payloads: []*commonpb.Payload{{Data: []byte("input")}}}, + }, + }, + } + } + return &workflowservice.RespondWorkflowTaskCompletedRequest{Commands: commands} +} + +// TestVisitProtoPayloads_ConcurrentVisitors verifies that when concurrencyLimit +// equals the number of payload groups, all visitor calls overlap. +func TestVisitProtoPayloads_ConcurrentVisitors(t *testing.T) { + const n = 4 + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + arrived := make(chan struct{}, n) + release := make(chan struct{}) + + visitor := visitorFunc(func(_ *proxy.VisitPayloadsContext, p []*commonpb.Payload) ([]*commonpb.Payload, error) { + select { + case arrived <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + select { + case <-release: + case <-ctx.Done(): + return nil, ctx.Err() + } + return p, nil + }) + + go func() { + for range n { + select { + case <-arrived: + case <-ctx.Done(): + return + } + } + close(release) + }() + + require.NoError(t, visitProtoPayloads(ctx, visitor, scheduleActivitiesRequest(n), n)) +} + +// TestVisitProtoPayloads_SequentialVisitors verifies that concurrencyLimit=1 +// prevents any overlap between visitor calls. +func TestVisitProtoPayloads_SequentialVisitors(t *testing.T) { + const n = 4 + + var current, peak int64 + visitor := visitorFunc(func(_ *proxy.VisitPayloadsContext, p []*commonpb.Payload) ([]*commonpb.Payload, error) { + cur := atomic.AddInt64(¤t, 1) + for { + old := atomic.LoadInt64(&peak) + if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) { + break + } + } + atomic.AddInt64(¤t, -1) + return p, nil + }) + + require.NoError(t, visitProtoPayloads(context.Background(), visitor, scheduleActivitiesRequest(n), 1)) + require.EqualValues(t, 1, peak, + "expected at most 1 concurrent visitor call with concurrencyLimit=1") +} diff --git a/internal/worker.go b/internal/worker.go index 367fdf9b0..2bf23d815 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -392,6 +392,21 @@ type ( // // NOTE: Experimental Plugins []WorkerPlugin + + // MaxConcurrentWorkflowTaskExternalStorageVisits sets how many external + // storage operations (reads or writes) may run in parallel when the worker + // processes a single workflow task. When a workflow task contains many large + // payloads that need to be fetched from or uploaded to external storage, + // raising this value can reduce latency by overlapping those calls. Lower + // values reduce pressure on the storage backend. + // A value of 0 uses the default. Set to 1 to disable parallelism. + // Please report any issues you encounter with this setting or if you feel the + // default should be changed. + // + // NOTE: Experimental + // + // default: 3 + MaxConcurrentWorkflowTaskExternalStorageVisits int } ) diff --git a/test/go.mod b/test/go.mod index 4cf7bbe2e..83936358a 100644 --- a/test/go.mod +++ b/test/go.mod @@ -13,7 +13,7 @@ require ( go.opentelemetry.io/otel v1.40.0 go.opentelemetry.io/otel/sdk v1.40.0 go.opentelemetry.io/otel/trace v1.40.0 - go.temporal.io/api v1.62.7 + go.temporal.io/api v1.62.8 go.temporal.io/sdk v1.29.1 go.temporal.io/sdk/contrib/opentelemetry v0.0.0-00010101000000-000000000000 go.temporal.io/sdk/contrib/opentracing v0.0.0-00010101000000-000000000000 diff --git a/test/go.sum b/test/go.sum index 6d569e2b1..fd29fe9fa 100644 --- a/test/go.sum +++ b/test/go.sum @@ -178,6 +178,8 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw= +go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/worker/worker_test.go b/worker/worker_test.go new file mode 100644 index 000000000..83b9f1ae1 --- /dev/null +++ b/worker/worker_test.go @@ -0,0 +1,19 @@ +package worker_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func TestNew_NegativeMaxConcurrentWorkflowTaskExternalStorageVisits(t *testing.T) { + c, err := client.NewLazyClient(client.Options{}) + require.NoError(t, err) + require.PanicsWithValue(t, "MaxConcurrentWorkflowTaskExternalStorageVisits must not be negative", func() { + worker.New(c, "my-task-queue", worker.Options{ + MaxConcurrentWorkflowTaskExternalStorageVisits: -1, + }) + }) +}