diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4bc26af..35ab006 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: true matrix: - go: ['1.23'] + go: ['1.24'] os: ['ubuntu-latest'] runs-on: ${{ matrix.os }} diff --git a/go.mod b/go.mod index 63fc489..d3229e5 100644 --- a/go.mod +++ b/go.mod @@ -1,37 +1,35 @@ module goa.design/pulse -go 1.22.7 - -toolchain go1.23.1 +go 1.24.0 require ( github.com/gorilla/websocket v1.5.3 github.com/oklog/ulid/v2 v2.1.0 - github.com/redis/go-redis/v9 v9.7.0 + github.com/redis/go-redis/v9 v9.7.1 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.59.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 - goa.design/clue v1.0.7 - goa.design/goa/v3 v3.19.1 + goa.design/clue v1.1.0 + goa.design/goa/v3 v3.20.0 goa.design/model v1.9.8 - google.golang.org/grpc v1.69.4 + google.golang.org/grpc v1.71.0 ) require ( - github.com/aws/smithy-go v1.22.1 // indirect + github.com/aws/smithy-go v1.22.3 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/go-chi/chi/v5 v5.2.0 // indirect + github.com/go-chi/chi/v5 v5.2.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect @@ -44,16 +42,16 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect - golang.org/x/mod v0.22.0 // indirect - golang.org/x/net v0.34.0 // indirect - golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.29.0 // indirect - golang.org/x/term v0.28.0 // indirect - golang.org/x/text v0.21.0 // indirect - golang.org/x/tools v0.29.0 // indirect - google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect - google.golang.org/protobuf v1.36.3 // indirect + golang.org/x/mod v0.23.0 // indirect + golang.org/x/net v0.36.0 // indirect + golang.org/x/sync v0.11.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/term v0.29.0 // indirect + golang.org/x/text v0.22.0 // indirect + golang.org/x/tools v0.30.0 // indirect + google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9388357..643ff69 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= -github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k= +github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -16,8 +16,8 @@ github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598 h1:MGKhKyiYrvMDZs github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598/go.mod h1:0FpDmbrt36utu8jEmeU05dPC9AB5tsLYVVi+ZHfyuwI= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0= -github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8= +github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -25,14 +25,14 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -48,8 +48,8 @@ github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNs github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= -github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/redis/go-redis/v9 v9.7.1 h1:4LhKRCIduqXqtvCUlaq9c8bdHOkICjDMrr1+Zb3osAc= +github.com/redis/go-redis/v9 v9.7.1/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= @@ -72,10 +72,10 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0u go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 h1:BEj3SPM81McUZHYjRS5pEgNgnmzGJ5tRpU5krWnV8Bs= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0/go.mod h1:9cKLGBDzI/F3NoHLQGm4ZrYdIHsvGt6ej6hUowxY0J4= -go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.31.0 h1:HZgBIps9wH0RDrwjrmNa3DVbNRW60HEhdzqZFyAp3fI= -go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.31.0/go.mod h1:RDRhvt6TDG0eIXmonAx5bd9IcwpqCkziwkOClzWKwAQ= -go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0 h1:UGZ1QwZWY67Z6BmckTU+9Rxn04m2bD3gD6Mk0OIOCPk= -go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0/go.mod h1:fcwWuDuaObkkChiDlhEpSq9+X1C0omv+s5mBtToAQ64= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.32.0 h1:SZmDnHcgp3zwlPBS2JX2urGYe/jBKEIT6ZedHRUyCz8= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.32.0/go.mod h1:fdWW0HtZJ7+jNpTKUR0GpMEDP69nR8YBJQxNiVCE3jk= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= @@ -88,36 +88,36 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -goa.design/clue v1.0.7 h1:Z0qhUTvMMo2C7bxn9X7Wt4DXahGMdYuIg7pr3F+iLOs= -goa.design/clue v1.0.7/go.mod h1:z9vhVyNCV02Aggr20KilzR/QQigD/wuz+0uGvWr4MYk= -goa.design/goa/v3 v3.19.1 h1:jpV3LEy7YANzPMwm++Lu17RoThRJgXrPxdEM0A1nlOE= -goa.design/goa/v3 v3.19.1/go.mod h1:astNE9ube0YCxqq7DQkt1MtLxB/b3kRPEFkEZovcO2I= +goa.design/clue v1.1.0 h1:HAgmiLSDB++SX4Shv8bYLewRmD8MtztSLiUHFSoo0Qg= +goa.design/clue v1.1.0/go.mod h1:wlamhUMR5f2EIxVK2O/Fp0QwBK9VzijEsLi0VPxRPUE= +goa.design/goa/v3 v3.20.0 h1:mYYNqCBg9SSxe2jxvPJFOPmJqqKkSAUSU84jpczky3s= +goa.design/goa/v3 v3.20.0/go.mod h1:g8sT4ioTaRt8BZKwZ1YOQe7UgWqkZMx+q6NWgQfzLUU= goa.design/model v1.9.8 h1:SGf+q+hYO1rh/Jvq7T0ZbfBcANzi3Lc3RHKWBDZCWCE= goa.design/model v1.9.8/go.mod h1:RqPSTbZV49gD3+IBsT9/zf+EPxt4zuDPuT/6r857H3w= -golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= -golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= -golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= -golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= -google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f h1:387Y+JbxF52bmesc8kq1NyYIp33dnxCw6eiA7JMsTmw= -google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:0joYwWwLQh18AOj8zMYeZLjzuqcYTU3/nC5JdCvC3JI= -google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA= -google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= -google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= -google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= -google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM= +golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= +golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU= +golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= +google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb h1:ITgPrl429bc6+2ZraNSzMDk3I95nmQln2fuPstKwFDE= +google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:sAo5UzpjUwgFBCzupwhcLcxHVDK7vG5IqI30YnwX2eE= +google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb h1:p31xT4yrYrSM/G4Sn2+TNUkVhFCbG9y8itM2S6Th950= +google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:jbe3Bkdp+Dh2IrslsFCklNhweNTBgSYanP1UXhJDhKg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pool/node.go b/pool/node.go index 2a6cdda..8b2c447 100644 --- a/pool/node.go +++ b/pool/node.go @@ -91,10 +91,6 @@ const ( evDispatchReturn string = "d" ) -const ( - maxRequeuingRetries = 3 // Maximum number of times to retry requeuing jobs -) - // pendingEventTTL is the TTL for pending events. var pendingEventTTL = 2 * time.Minute @@ -506,7 +502,7 @@ func (node *Node) Shutdown(ctx context.Context) error { } // Signal all nodes to shutdown. - if _, err := node.nodeShutdownMap.SetAndWait(ctx, "shutdown", node.ID); err != nil { + if _, err := node.nodeShutdownMap.Set(ctx, "shutdown", node.ID); err != nil { node.logger.Error(fmt.Errorf("Shutdown: failed to set shutdown status in shutdown map: %w", err)) } <-node.closed // Wait for this node to be closed @@ -603,7 +599,7 @@ func (node *Node) stopAllJobs(ctx context.Context) { pulse.Go(node.logger, func() { defer wg.Done() for _, job := range worker.Jobs() { - if err := worker.stopJob(ctx, job.Key, false); err != nil { + if err := worker.stopJob(ctx, job.Key); err != nil { node.logger.Error(fmt.Errorf("Close: failed to stop job %q for worker %q: %w", job.Key, worker.ID, err)) } total.Add(1) @@ -990,6 +986,7 @@ func (node *Node) cleanupWorker(ctx context.Context, workerID string) { payload, ok := node.JobPayload(key) if !ok { node.logger.Error(fmt.Errorf("requeueWorkerJobs: failed to get job payload"), "job", key, "worker", workerID) + requeued++ // We will never be able to requeue this job continue } job := &Job{Key: key, Payload: []byte(payload), CreatedAt: time.Now(), NodeID: node.ID} @@ -1017,12 +1014,21 @@ func (node *Node) processInactiveJobs(ctx context.Context) { ticker := time.NewTicker(node.ackGracePeriod) // Run at ackGracePeriod frequency since pending jobs expire after 2*ackGracePeriod defer ticker.Stop() + payloadCleanupTicker, err := node.NewTicker(ctx, "jobPayloadCleanup", node.workerTTL) + if err != nil { + node.logger.Error(fmt.Errorf("processInactiveJobs: failed to create payload cleanup ticker: %w", err)) + return + } + defer payloadCleanupTicker.Stop() + for { select { case <-node.stop: return case <-ticker.C: node.cleanupStalePendingJobs(ctx) + case <-payloadCleanupTicker.C: + node.cleanupOrphanedJobPayloads(ctx) } } } @@ -1044,6 +1050,29 @@ func (node *Node) cleanupStalePendingJobs(ctx context.Context) { } } +// cleanupOrphanedJobPayloads checks for and removes entries in the job payload map +// that don't have a corresponding entry in the job map. +func (node *Node) cleanupOrphanedJobPayloads(ctx context.Context) { + // Get all existing job keys from the job map + existingJobs := make(map[string]struct{}) + for _, jobs := range node.jobMap.Map() { + for _, key := range strings.Split(jobs, ",") { + existingJobs[key] = struct{}{} + } + } + + // Check each payload entry + for key := range node.jobPayloadMap.Map() { + if _, exists := existingJobs[key]; !exists { + if _, err := node.jobPayloadMap.Delete(ctx, key); err != nil { + node.logger.Error(fmt.Errorf("cleanupOrphanedJobPayloads: failed to delete orphaned payload for job %q: %w", key, err)) + continue + } + node.logger.Info("cleanupOrphanedJobPayloads: removed orphaned payload", "key", key) + } + } +} + // acquireCleanupLock tries to acquire the cleanup lock for a worker. // It returns true if the lock was acquired, false if another node holds the lock. // It will clear any stale or invalid locks it finds. @@ -1197,6 +1226,12 @@ func (node *Node) removeWorkerFromMaps(ctx context.Context, id string) { if _, err := node.workerCleanupMap.Delete(ctx, id); err != nil { node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove cleanup timestamp: %w", err), "worker", id) } + jobKeys, _ := node.jobMap.GetValues(id) + for _, key := range jobKeys { + if _, err := node.jobPayloadMap.Delete(ctx, key); err != nil { + node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove job %s from payload map: %w", key, err)) + } + } if _, err := node.jobMap.Delete(ctx, id); err != nil { node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove worker %s from jobs map: %w", id, err)) } diff --git a/pool/node_test.go b/pool/node_test.go index 071f10d..159e243 100644 --- a/pool/node_test.go +++ b/pool/node_test.go @@ -944,6 +944,144 @@ func TestStaleWorkerCleanupAfterJobRequeue(t *testing.T) { }, max, delay, "Stale worker was not properly cleaned up") } +func TestRemoveWorkerCleansJobPayloads(t *testing.T) { + testName := strings.Replace(t.Name(), "/", "_", -1) + ctx := ptesting.NewTestContext(t) + rdb := ptesting.NewRedisClient(t) + node := newTestNode(t, ctx, rdb, testName) + defer ptesting.CleanupRedis(t, rdb, true, testName) + + // Create a worker and dispatch jobs to it + worker := newTestWorker(t, ctx, node) + jobs := []struct { + key string + payload []byte + }{ + {key: "job1", payload: []byte("payload1")}, + {key: "job2", payload: []byte("payload2")}, + } + + // Dispatch jobs + for _, job := range jobs { + assert.NoError(t, node.DispatchJob(ctx, job.key, job.payload), "Failed to dispatch job") + } + + // Verify jobs are received and payloads are stored + require.Eventually(t, func() bool { + return len(worker.Jobs()) == len(jobs) + }, max, delay, "Worker did not receive all jobs") + + // Verify payloads are in the map + for _, job := range jobs { + payload, ok := node.JobPayload(job.key) + assert.True(t, ok, "Job payload not found for key %s", job.key) + assert.Equal(t, job.payload, payload, "Incorrect payload for job %s", job.key) + } + + // Remove the worker maps + node.removeWorkerFromMaps(ctx, worker.ID) + + // Verify job payloads are removed + assert.Eventually(t, func() bool { + for _, job := range jobs { + if _, ok := node.JobPayload(job.key); ok { + return false + } + } + return true + }, max, delay, "Job payloads were not cleaned up after worker removal") + + // Shutdown node + assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node") +} + +func TestCleanupOrphanedJobPayloads(t *testing.T) { + type jobInfo struct { + key string + payload []byte + } + + tests := []struct { + name string + setupJobs []jobInfo + deletedJobs []string + }{ + { + name: "no orphaned payloads", + setupJobs: []jobInfo{ + {key: "job1", payload: []byte("payload1")}, + {key: "job2", payload: []byte("payload2")}, + {key: "job3", payload: []byte("payload3")}, + }, + }, + { + name: "some orphaned payloads", + setupJobs: []jobInfo{ + {key: "job1", payload: []byte("payload1")}, + {key: "job2", payload: []byte("payload2")}, + {key: "job3", payload: []byte("payload3")}, + }, + deletedJobs: []string{"job1", "job3"}, + }, + { + name: "all orphaned payloads", + setupJobs: []jobInfo{ + {key: "job1", payload: []byte("payload1")}, + {key: "job2", payload: []byte("payload2")}, + }, + deletedJobs: []string{"job1", "job2"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testName := strings.Replace(t.Name(), "/", "_", -1) + ctx := ptesting.NewTestContext(t) + rdb := ptesting.NewRedisClient(t) + node := newTestNode(t, ctx, rdb, testName) + worker := newTestWorker(t, ctx, node) + defer ptesting.CleanupRedis(t, rdb, true, testName) + + // Dispatch jobs to the worker + for _, job := range tt.setupJobs { + require.NoError(t, node.DispatchJob(ctx, job.key, job.payload)) + } + + // Wait for jobs to be assigned + require.Eventually(t, func() bool { + jobs, _ := node.jobMap.GetValues(worker.ID) + return len(jobs) == len(tt.setupJobs) + }, max, delay, "Jobs were not assigned, got %d jobs in jobMap, expected %d", node.jobMap.Len(), len(tt.setupJobs)) + + // Delete some job keys + for _, key := range tt.deletedJobs { + _, _, err := node.jobMap.RemoveValues(ctx, worker.ID, key) + assert.NoError(t, err) + } + + assert.Eventually(t, func() bool { + jobs, _ := node.jobMap.GetValues(worker.ID) + return len(jobs) == len(tt.setupJobs)-len(tt.deletedJobs) + }, max, delay, "Job keys were not deleted") + + // Run cleanup + node.cleanupOrphanedJobPayloads(ctx) + + // Verify payloads + assert.Eventually(t, func() bool { + for _, key := range tt.deletedJobs { + if _, ok := node.JobPayload(key); ok { + return false + } + } + return true + }, max, delay, fmt.Sprintf("Payload cleanup did not complete as expected, expected %d payloads, got %d", len(tt.setupJobs)-len(tt.deletedJobs), len(node.jobPayloadMap.Map()))) + + assert.NoError(t, node.Shutdown(ctx)) + }) + } +} + type mockAcker struct { XAckFunc func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd } diff --git a/pool/ticker.go b/pool/ticker.go index 5859c2c..c7f511d 100644 --- a/pool/ticker.go +++ b/pool/ticker.go @@ -80,7 +80,9 @@ func (t *Ticker) Stop() { if _, err := t.tickerMap.Delete(context.Background(), t.name); err != nil { t.logger.Error(err, "msg", "failed to delete ticker") } - t.tickerMap.Unsubscribe(t.mapch) + if t.mapch != nil { + t.tickerMap.Unsubscribe(t.mapch) + } t.mapch = nil t.lock.Unlock() t.wg.Wait() @@ -90,6 +92,10 @@ func (t *Ticker) Stop() { func (t *Ticker) handleEvents() { defer t.wg.Done() t.lock.Lock() + if t.mapch == nil { + t.lock.Unlock() + return + } ch := t.mapch t.lock.Unlock() for { @@ -103,7 +109,9 @@ func (t *Ticker) handleEvents() { if !ok { t.logger.Info("stopped remotely") t.lock.Lock() - t.tickerMap.Unsubscribe(t.mapch) + if t.mapch != nil { + t.tickerMap.Unsubscribe(t.mapch) + } t.mapch = nil t.lock.Unlock() return diff --git a/pool/worker.go b/pool/worker.go index a434a1c..c428c36 100644 --- a/pool/worker.go +++ b/pool/worker.go @@ -196,7 +196,7 @@ func (w *Worker) handleEvents(ctx context.Context, c <-chan *streaming.Event) { err = w.startJob(ctx, unmarshalJob(payload)) case evStopJob: w.logger.Debug("handleEvents: received stop job", "event", ev.EventName, "id", ev.ID) - err = w.stopJob(ctx, unmarshalJobKey(payload), false) + err = w.stopJob(ctx, unmarshalJobKey(payload)) case evNotify: w.logger.Debug("handleEvents: received notify", "event", ev.EventName, "id", ev.ID) key, payload := unmarshalNotification(payload) @@ -266,7 +266,7 @@ func (w *Worker) startJob(ctx context.Context, job *Job) error { } // stopJob stops a job. -func (w *Worker) stopJob(ctx context.Context, key string, forRequeue bool) error { +func (w *Worker) stopJob(ctx context.Context, key string) error { if _, ok := w.jobs.Load(key); !ok { return fmt.Errorf("job %s not found in local worker", key) } @@ -278,12 +278,10 @@ func (w *Worker) stopJob(ctx context.Context, key string, forRequeue bool) error if _, _, err := w.jobsMap.RemoveValues(ctx, w.ID, key); err != nil { w.logger.Error(fmt.Errorf("stop job: failed to remove job %q from jobs map: %w", key, err)) } - if !forRequeue { - if _, err := w.jobPayloadsMap.Delete(ctx, key); err != nil { - w.logger.Error(fmt.Errorf("stop job: failed to remove job payload %q from job payloads map: %w", key, err)) - } + if _, err := w.jobPayloadsMap.Delete(ctx, key); err != nil { + w.logger.Error(fmt.Errorf("stop job: failed to remove job payload %q from job payloads map: %w", key, err)) } - w.logger.Info("stopped job", "job", key, "for_requeue", forRequeue) + w.logger.Info("stopped job", "job", key) return nil } @@ -482,7 +480,7 @@ func (w *Worker) requeueJob(ctx context.Context, job *Job) error { return fmt.Errorf("requeueJob: failed to add job to pool stream: %w", err) } w.node.pendingJobChannels.Store(eventID, nil) - if err := w.stopJob(ctx, job.Key, true); err != nil { + if err := w.stopJob(ctx, job.Key); err != nil { return fmt.Errorf("failed to stop job: %w", err) } return nil