From ca1e63fab7c8163fc012eaa9b2ae3b7e79db5493 Mon Sep 17 00:00:00 2001 From: endigma Date: Thu, 14 Aug 2025 11:30:42 +0100 Subject: [PATCH 01/16] chore: use wip go-tools 0f9f721b --- router-tests/go.mod | 2 +- router-tests/go.sum | 4 ++-- router/go.mod | 2 +- router/go.sum | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/router-tests/go.mod b/router-tests/go.mod index 08ab9fbaae..a3fe95d987 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20250820135159-bf8852195d3f github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 go.opentelemetry.io/otel/sdk/metric v1.36.0 diff --git a/router-tests/go.sum b/router-tests/go.sum index 4a69658e75..1ce235f7fe 100644 --- a/router-tests/go.sum +++ b/router-tests/go.sum @@ -352,8 +352,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301 h1:EzfKHQoTjFDDcgaECCCR2aTePqMu9QBmPbyhqIYOhV0= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301/go.mod h1:wxI0Nak5dI5RvJuzGyiEK4nZj0O9X+Aw6U0tC1wPKq0= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220 h1:+imPYcv+XExZ+ofX5jCxtaA7upeys7uWA7RsTZiTTWE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378 h1:RE9BBQl+cBiv387C3LVfFpsuK3U93aLxKGVdsx9A7dE= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= diff --git a/router/go.mod b/router/go.mod index 402ab2d326..31c38a7c8f 100644 --- a/router/go.mod +++ b/router/go.mod @@ -31,7 +31,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.16.1 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378 // Do not upgrade, it renames attributes we rely on go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/contrib/propagators/b3 v1.23.0 diff --git a/router/go.sum b/router/go.sum index 0f23a8fce4..4648000fa8 100644 --- a/router/go.sum +++ b/router/go.sum @@ -317,8 +317,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTBjW+SZK4mhxTTBVpxcqeBgWF1Rfmltbfk= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220 h1:+imPYcv+XExZ+ofX5jCxtaA7upeys7uWA7RsTZiTTWE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378 h1:RE9BBQl+cBiv387C3LVfFpsuK3U93aLxKGVdsx9A7dE= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= From c53c95c2b5ea15de7dbbd6f1fd66841f5203b023 Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 12 Aug 2025 15:16:29 +0100 Subject: [PATCH 02/16] feat: heartbeat for SSE, heartbeat per writer --- router/core/flushwriter.go | 23 ++++++++++++++++++++++- router/core/websocket.go | 5 +++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/router/core/flushwriter.go b/router/core/flushwriter.go index 5e507babf9..fd59bc284f 100644 --- a/router/core/flushwriter.go +++ b/router/core/flushwriter.go @@ -77,6 +77,27 @@ func (f *HttpFlushWriter) Write(p []byte) (n int, err error) { return f.buf.Write(p) } +func (f *HttpFlushWriter) Heartbeat() error { + if err := f.ctx.Err(); err != nil { + return err + } + + var heartbeat []byte + if f.sse { + heartbeat = []byte(":\n\n") + } else if f.multipart { + heartbeat = []byte("{}") + } + + if _, err := f.writer.Write(heartbeat); err != nil { + return err + } + + f.flusher.Flush() + + return nil +} + func (f *HttpFlushWriter) Close(_ resolve.SubscriptionCloseKind) { if f.ctx.Err() != nil { return @@ -159,7 +180,7 @@ func GetSubscriptionResponseWriter(ctx *resolve.Context, r *http.Request, w http flushWriter.ctx, flushWriter.cancel = context.WithCancel(ctx.Context()) ctx = ctx.WithContext(flushWriter.ctx) - if wgParams.UseMultipart { + if wgParams.UseMultipart || wgParams.UseSse { ctx.ExecutionOptions.SendHeartbeat = true } diff --git a/router/core/websocket.go b/router/core/websocket.go index d1942e97c9..c0dcc78518 100644 --- a/router/core/websocket.go +++ b/router/core/websocket.go @@ -627,6 +627,11 @@ func (rw *websocketResponseWriter) Complete() { } } +// Heartbeat is a no-op function for WebSocket subscriptions. +func (rw *websocketResponseWriter) Heartbeat() error { + return nil +} + func (rw *websocketResponseWriter) Close(kind resolve.SubscriptionCloseKind) { err := rw.protocol.Close(kind.WSCode, kind.Reason) if err != nil { From 77c72fbcc5b8279ebc60f0b2f35d73baf2c56c9b Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 12 Aug 2025 15:18:51 +0100 Subject: [PATCH 03/16] Use full write/flush methods for multipart --- router/core/flushwriter.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/router/core/flushwriter.go b/router/core/flushwriter.go index fd59bc284f..ee8594d374 100644 --- a/router/core/flushwriter.go +++ b/router/core/flushwriter.go @@ -85,16 +85,22 @@ func (f *HttpFlushWriter) Heartbeat() error { var heartbeat []byte if f.sse { heartbeat = []byte(":\n\n") + + if _, err := f.writer.Write(heartbeat); err != nil { + return err + } + + f.flusher.Flush() } else if f.multipart { - heartbeat = []byte("{}") - } + if _, err := f.Write([]byte("{}")); err != nil { + return err + } - if _, err := f.writer.Write(heartbeat); err != nil { - return err + if err := f.Flush(); err != nil { + return err + } } - f.flusher.Flush() - return nil } From 27b078d698defcc6074d1f270734c068d2e0772a Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 18 Aug 2025 09:59:18 +0100 Subject: [PATCH 04/16] chore: use wip go-tools 097167a6 --- router-tests/go.mod | 2 +- router-tests/go.sum | 4 ++-- router/go.mod | 2 +- router/go.sum | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/router-tests/go.mod b/router-tests/go.mod index a3fe95d987..320f0b17e0 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20250820135159-bf8852195d3f github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 go.opentelemetry.io/otel/sdk/metric v1.36.0 diff --git a/router-tests/go.sum b/router-tests/go.sum index 1ce235f7fe..4e89bf35fe 100644 --- a/router-tests/go.sum +++ b/router-tests/go.sum @@ -352,8 +352,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301 h1:EzfKHQoTjFDDcgaECCCR2aTePqMu9QBmPbyhqIYOhV0= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301/go.mod h1:wxI0Nak5dI5RvJuzGyiEK4nZj0O9X+Aw6U0tC1wPKq0= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378 h1:RE9BBQl+cBiv387C3LVfFpsuK3U93aLxKGVdsx9A7dE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91 h1:rQGEbuuUJclGoWpH0Duo6Bg4sRCRZQuo9rVWg6Kglck= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= diff --git a/router/go.mod b/router/go.mod index 31c38a7c8f..70859446fc 100644 --- a/router/go.mod +++ b/router/go.mod @@ -31,7 +31,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.16.1 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91 // Do not upgrade, it renames attributes we rely on go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/contrib/propagators/b3 v1.23.0 diff --git a/router/go.sum b/router/go.sum index 4648000fa8..6bd09c4463 100644 --- a/router/go.sum +++ b/router/go.sum @@ -317,8 +317,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTBjW+SZK4mhxTTBVpxcqeBgWF1Rfmltbfk= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378 h1:RE9BBQl+cBiv387C3LVfFpsuK3U93aLxKGVdsx9A7dE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91 h1:rQGEbuuUJclGoWpH0Duo6Bg4sRCRZQuo9rVWg6Kglck= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= From 1dce0b6980fcb6ec600252d028de5c48a8f033aa Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 18 Aug 2025 09:59:46 +0100 Subject: [PATCH 05/16] Rename MultipartSubHeartbeatInterval to SubHeartbeatInterval --- router/core/executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/core/executor.go b/router/core/executor.go index 560f40840e..2de976a667 100644 --- a/router/core/executor.go +++ b/router/core/executor.go @@ -83,7 +83,7 @@ func (b *ExecutorConfigurationBuilder) Build(ctx context.Context, opts *Executor AllowedSubgraphErrorFields: opts.RouterEngineConfig.SubgraphErrorPropagation.AllowedFields, AllowAllErrorExtensionFields: opts.RouterEngineConfig.SubgraphErrorPropagation.AllowAllExtensionFields, MaxRecyclableParserSize: opts.RouterEngineConfig.Execution.ResolverMaxRecyclableParserSize, - MultipartSubHeartbeatInterval: opts.HeartbeatInterval, + SubHeartbeatInterval: opts.HeartbeatInterval, MaxSubscriptionFetchTimeout: opts.RouterEngineConfig.Execution.SubscriptionFetchTimeout, } From c812aaddd792d18828e5abe65c404bf425b532c7 Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 18 Aug 2025 10:44:30 +0100 Subject: [PATCH 06/16] chore: use wip go-tools 5a1ebf7a --- router-tests/go.mod | 2 +- router-tests/go.sum | 4 ++-- router/go.mod | 2 +- router/go.sum | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/router-tests/go.mod b/router-tests/go.mod index 320f0b17e0..a5bf0b8927 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20250820135159-bf8852195d3f github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 go.opentelemetry.io/otel/sdk/metric v1.36.0 diff --git a/router-tests/go.sum b/router-tests/go.sum index 4e89bf35fe..59917f3bb8 100644 --- a/router-tests/go.sum +++ b/router-tests/go.sum @@ -352,8 +352,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301 h1:EzfKHQoTjFDDcgaECCCR2aTePqMu9QBmPbyhqIYOhV0= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301/go.mod h1:wxI0Nak5dI5RvJuzGyiEK4nZj0O9X+Aw6U0tC1wPKq0= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91 h1:rQGEbuuUJclGoWpH0Duo6Bg4sRCRZQuo9rVWg6Kglck= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc h1:fEnGbif0OXwaudsfCT4jfdpc/z2yzS08xcJhXd6BYTQ= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= diff --git a/router/go.mod b/router/go.mod index 70859446fc..9526675817 100644 --- a/router/go.mod +++ b/router/go.mod @@ -31,7 +31,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.16.1 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc // Do not upgrade, it renames attributes we rely on go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/contrib/propagators/b3 v1.23.0 diff --git a/router/go.sum b/router/go.sum index 6bd09c4463..cefd374ca0 100644 --- a/router/go.sum +++ b/router/go.sum @@ -317,8 +317,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTBjW+SZK4mhxTTBVpxcqeBgWF1Rfmltbfk= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91 h1:rQGEbuuUJclGoWpH0Duo6Bg4sRCRZQuo9rVWg6Kglck= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814164921-097167a60d91/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc h1:fEnGbif0OXwaudsfCT4jfdpc/z2yzS08xcJhXd6BYTQ= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= From 3420a7c816e5c6266a1d098919bceee16ca1310d Mon Sep 17 00:00:00 2001 From: endigma Date: Fri, 22 Aug 2025 10:10:51 +0100 Subject: [PATCH 07/16] chore: use wip go-tools 8c167f36 --- router-tests/go.mod | 2 +- router-tests/go.sum | 4 ++-- router/core/executor.go | 2 +- router/go.mod | 2 +- router/go.sum | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/router-tests/go.mod b/router-tests/go.mod index a5bf0b8927..a6ca8f4afe 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20250820135159-bf8852195d3f github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 go.opentelemetry.io/otel/sdk/metric v1.36.0 diff --git a/router-tests/go.sum b/router-tests/go.sum index 59917f3bb8..ecf032bcd5 100644 --- a/router-tests/go.sum +++ b/router-tests/go.sum @@ -352,8 +352,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301 h1:EzfKHQoTjFDDcgaECCCR2aTePqMu9QBmPbyhqIYOhV0= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301/go.mod h1:wxI0Nak5dI5RvJuzGyiEK4nZj0O9X+Aw6U0tC1wPKq0= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc h1:fEnGbif0OXwaudsfCT4jfdpc/z2yzS08xcJhXd6BYTQ= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118 h1:m/IJgwL2oqDg2COD74BrJt9mVrko8DE1j0U1KYcgwqs= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= diff --git a/router/core/executor.go b/router/core/executor.go index 2de976a667..2a8384bf99 100644 --- a/router/core/executor.go +++ b/router/core/executor.go @@ -83,7 +83,7 @@ func (b *ExecutorConfigurationBuilder) Build(ctx context.Context, opts *Executor AllowedSubgraphErrorFields: opts.RouterEngineConfig.SubgraphErrorPropagation.AllowedFields, AllowAllErrorExtensionFields: opts.RouterEngineConfig.SubgraphErrorPropagation.AllowAllExtensionFields, MaxRecyclableParserSize: opts.RouterEngineConfig.Execution.ResolverMaxRecyclableParserSize, - SubHeartbeatInterval: opts.HeartbeatInterval, + SubscriptionHeartbeatInterval: opts.HeartbeatInterval, MaxSubscriptionFetchTimeout: opts.RouterEngineConfig.Execution.SubscriptionFetchTimeout, } diff --git a/router/go.mod b/router/go.mod index 9526675817..f692e3a8c6 100644 --- a/router/go.mod +++ b/router/go.mod @@ -31,7 +31,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.16.1 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118 // Do not upgrade, it renames attributes we rely on go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/contrib/propagators/b3 v1.23.0 diff --git a/router/go.sum b/router/go.sum index cefd374ca0..99d9f36ba2 100644 --- a/router/go.sum +++ b/router/go.sum @@ -317,8 +317,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTBjW+SZK4mhxTTBVpxcqeBgWF1Rfmltbfk= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc h1:fEnGbif0OXwaudsfCT4jfdpc/z2yzS08xcJhXd6BYTQ= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250818093450-5a1ebf7aa4cc/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118 h1:m/IJgwL2oqDg2COD74BrJt9mVrko8DE1j0U1KYcgwqs= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= From a781bbe4e356954f0c05e7f777867b147be64b90 Mon Sep 17 00:00:00 2001 From: endigma Date: Fri, 22 Aug 2025 10:18:31 +0100 Subject: [PATCH 08/16] chore: use wip go-tools fadc6bc8 --- router-tests/go.mod | 2 +- router-tests/go.sum | 4 ++-- router/go.mod | 2 +- router/go.sum | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/router-tests/go.mod b/router-tests/go.mod index a6ca8f4afe..dba9735ccb 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20250820135159-bf8852195d3f github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 go.opentelemetry.io/otel/sdk/metric v1.36.0 diff --git a/router-tests/go.sum b/router-tests/go.sum index ecf032bcd5..92d7df2663 100644 --- a/router-tests/go.sum +++ b/router-tests/go.sum @@ -352,8 +352,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301 h1:EzfKHQoTjFDDcgaECCCR2aTePqMu9QBmPbyhqIYOhV0= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301/go.mod h1:wxI0Nak5dI5RvJuzGyiEK4nZj0O9X+Aw6U0tC1wPKq0= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118 h1:m/IJgwL2oqDg2COD74BrJt9mVrko8DE1j0U1KYcgwqs= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1 h1:n2kO8PPlnGBKujGdFN4eN5OdpXJ5EfFeh0NlpVZaAos= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= diff --git a/router/go.mod b/router/go.mod index f692e3a8c6..d2a2fe4c4d 100644 --- a/router/go.mod +++ b/router/go.mod @@ -31,7 +31,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.16.1 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1 // Do not upgrade, it renames attributes we rely on go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/contrib/propagators/b3 v1.23.0 diff --git a/router/go.sum b/router/go.sum index 99d9f36ba2..254a54a927 100644 --- a/router/go.sum +++ b/router/go.sum @@ -317,8 +317,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTBjW+SZK4mhxTTBVpxcqeBgWF1Rfmltbfk= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118 h1:m/IJgwL2oqDg2COD74BrJt9mVrko8DE1j0U1KYcgwqs= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822090944-8c167f368118/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1 h1:n2kO8PPlnGBKujGdFN4eN5OdpXJ5EfFeh0NlpVZaAos= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= From da99da47f35efc83a2062f890a5f4d20a471d514 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 27 Aug 2025 14:51:23 +0100 Subject: [PATCH 09/16] Rename all mentions of multipart in heartbeat interval settings --- router-tests/events/kafka_events_test.go | 4 ++-- router-tests/events/nats_events_test.go | 4 ++-- router-tests/events/redis_events_test.go | 4 ++-- router/core/flushwriter.go | 2 +- router/core/graph_server.go | 2 +- router/core/router.go | 6 +++--- router/core/router_config.go | 24 ++++++++++++------------ 7 files changed, 23 insertions(+), 23 deletions(-) diff --git a/router-tests/events/kafka_events_test.go b/router-tests/events/kafka_events_test.go index dbc17f870a..40e83d202f 100644 --- a/router-tests/events/kafka_events_test.go +++ b/router-tests/events/kafka_events_test.go @@ -417,7 +417,7 @@ func TestKafkaEvents(t *testing.T) { t.Run("multipart", func(t *testing.T) { t.Parallel() - multipartHeartbeatInterval := time.Second * 5 + subscriptionHeartbeatInterval := time.Second * 5 t.Run("subscribe sync", func(t *testing.T) { t.Parallel() @@ -428,7 +428,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, RouterOptions: []core.Option{ - core.WithMultipartHeartbeatInterval(multipartHeartbeatInterval), + core.WithSubscriptionHeartbeatInterval(interval time.Duration)tHeartbeatInterval(multipartHeartbeatInterval), }, }, func(t *testing.T, xEnv *testenv.Environment) { EnsureTopicExists(t, xEnv, topics...) diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index d3235643b4..9e1558db24 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -324,7 +324,7 @@ func TestNatsEvents(t *testing.T) { testenv.Run(t, &testenv.Config{ RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, RouterOptions: []core.Option{ - core.WithMultipartHeartbeatInterval(heartbeatInterval), + core.WithSubscriptionHeartbeatInterval(heartbeatInterval), }, EnableNats: true, TLSConfig: &core.TlsConfig{ @@ -378,7 +378,7 @@ func TestNatsEvents(t *testing.T) { EnableNats: true, TLSConfig: nil, // Force Http/1 RouterOptions: []core.Option{ - core.WithMultipartHeartbeatInterval(heartbeatInterval), + core.WithSubscriptionHeartbeatInterval(heartbeatInterval), }, }, func(t *testing.T, xEnv *testenv.Environment) { diff --git a/router-tests/events/redis_events_test.go b/router-tests/events/redis_events_test.go index 1c287f7d6f..f6c9e54d13 100644 --- a/router-tests/events/redis_events_test.go +++ b/router-tests/events/redis_events_test.go @@ -478,7 +478,7 @@ func TestRedisEvents(t *testing.T) { t.Run("multipart", func(t *testing.T) { t.Parallel() - multipartHeartbeatInterval := time.Second * 5 + subscriptionHeartbeatInterval := time.Second * 5 t.Run("subscribe sync", func(t *testing.T) { t.Parallel() @@ -489,7 +489,7 @@ func TestRedisEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsRedisJSONTemplate, EnableRedis: true, RouterOptions: []core.Option{ - core.WithMultipartHeartbeatInterval(multipartHeartbeatInterval), + core.WithSubscriptionHeartbeatInterval(subscriptionHeartbeatInterval), }, }, func(t *testing.T, xEnv *testenv.Environment) { subscribePayload := []byte(`{"query":"subscription { employeeUpdates { id details { forename surname } }}"}`) diff --git a/router/core/flushwriter.go b/router/core/flushwriter.go index ee8594d374..2b7132ce9d 100644 --- a/router/core/flushwriter.go +++ b/router/core/flushwriter.go @@ -84,7 +84,7 @@ func (f *HttpFlushWriter) Heartbeat() error { var heartbeat []byte if f.sse { - heartbeat = []byte(":\n\n") + heartbeat = []byte(":heartbeat\n\n") if _, err := f.writer.Write(heartbeat); err != nil { return err diff --git a/router/core/graph_server.go b/router/core/graph_server.go index da70696ca8..380b8f0573 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -1203,7 +1203,7 @@ func (s *graphServer) buildGraphMux( Reporter: s.engineStats, ApolloCompatibilityFlags: s.apolloCompatibilityFlags, ApolloRouterCompatibilityFlags: s.apolloRouterCompatibilityFlags, - HeartbeatInterval: s.multipartHeartbeatInterval, + HeartbeatInterval: s.subscriptionHeartbeatInterval, PluginsEnabled: s.plugins.Enabled, InstanceData: s.instanceData, }, diff --git a/router/core/router.go b/router/core/router.go index 7a5c3f6102..e5388aeeb7 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1541,10 +1541,10 @@ func WithCors(corsOpts *cors.Config) Option { } } -// WithMultipartHeartbeatInterval sets the interval for the engine to send heartbeats for multipart subscriptions. -func WithMultipartHeartbeatInterval(interval time.Duration) Option { +// WithSubscriptionHeartbeatInterval sets the interval for the engine to send heartbeats for multipart subscriptions. +func WithSubscriptionHeartbeatInterval(interval time.Duration) Option { return func(r *Router) { - r.multipartHeartbeatInterval = interval + r.subscriptionHeartbeatInterval = interval } } diff --git a/router/core/router_config.go b/router/core/router_config.go index 0b70b1a72c..2a2a78924d 100644 --- a/router/core/router_config.go +++ b/router/core/router_config.go @@ -106,18 +106,18 @@ type Config struct { // should be removed once the users have migrated to the new overrides config overrideRoutingURLConfiguration config.OverrideRoutingURLConfiguration // the new overrides config - overrides config.OverridesConfiguration - authorization *config.AuthorizationConfiguration - rateLimit *config.RateLimitConfiguration - webSocketConfiguration *config.WebSocketConfiguration - subgraphErrorPropagation config.SubgraphErrorPropagationConfiguration - clientHeader config.ClientHeader - cacheWarmup *config.CacheWarmupConfiguration - multipartHeartbeatInterval time.Duration - hostName string - mcp config.MCPConfiguration - plugins config.PluginsConfiguration - tracingAttributes []config.CustomAttribute + overrides config.OverridesConfiguration + authorization *config.AuthorizationConfiguration + rateLimit *config.RateLimitConfiguration + webSocketConfiguration *config.WebSocketConfiguration + subgraphErrorPropagation config.SubgraphErrorPropagationConfiguration + clientHeader config.ClientHeader + cacheWarmup *config.CacheWarmupConfiguration + subscriptionHeartbeatInterval time.Duration + hostName string + mcp config.MCPConfiguration + plugins config.PluginsConfiguration + tracingAttributes []config.CustomAttribute } // Usage returns an anonymized version of the config for usage tracking From 5345b8ba80e89fefb1126b9a9105db0b871801ba Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 27 Aug 2025 14:52:51 +0100 Subject: [PATCH 10/16] test: new e2e for heartbeats --- router-tests/http_subscriptions_test.go | 219 ++++++++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 router-tests/http_subscriptions_test.go diff --git a/router-tests/http_subscriptions_test.go b/router-tests/http_subscriptions_test.go new file mode 100644 index 0000000000..293980e9a8 --- /dev/null +++ b/router-tests/http_subscriptions_test.go @@ -0,0 +1,219 @@ +package integration + +import ( + "bufio" + "bytes" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/core" +) + +func readMultipartPrefix(reader *bufio.Reader) error { + blankHeader, _, err := reader.ReadLine() + if err != nil { + return err + } + + if len(blankHeader) != 0 { + return fmt.Errorf("expected blank header, got %q", blankHeader) + } + + graphQLHeader, _, err := reader.ReadLine() + if err != nil { + return err + } + + if string(graphQLHeader) != "--graphql" { + return fmt.Errorf("expected graphql header, got %q", graphQLHeader) + } + + contentTypeHeader, _, err := reader.ReadLine() + if err != nil { + return err + } + + if string(contentTypeHeader) != "Content-Type: application/json" { + return fmt.Errorf("expected content type header, got %q", contentTypeHeader) + } + + blankFooter, _, err := reader.ReadLine() + if err != nil { + return err + } + + if len(blankFooter) != 0 { + return fmt.Errorf("expected blank footer, got %q", blankFooter) + } + + return nil +} + +func readSSEPrefix(reader *bufio.Reader) error { + eventNext, _, err := reader.ReadLine() + if err != nil { + return err + } + + switch string(eventNext) { + case "event: next": + case ":heartbeat": + blankFollower, _, err := reader.ReadLine() + if err != nil { + return err + } + + if len(blankFollower) != 0 { + return fmt.Errorf("expected blank follower, got %q", blankFollower) + } + default: + return fmt.Errorf("expected event: next or comment, got %q", eventNext) + } + + return nil +} + +func TestHeartbeats(t *testing.T) { + subscriptionHeartbeatInterval := time.Millisecond * 300 + + t.Run("should work correctly for multipart", func(t *testing.T) { + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithSubscriptionHeartbeatInterval(subscriptionHeartbeatInterval), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + client := http.Client{ + Timeout: time.Second * 100, + } + + subscribePayload := []byte(`{"query":"subscription { countEmp(max: 5, intervalMilliseconds: 550) }"}`) + + req := xEnv.MakeGraphQLMultipartRequest(http.MethodPost, bytes.NewReader(subscribePayload)) + resp, gErr := client.Do(req) + require.NoError(t, gErr) + require.Equal(t, http.StatusOK, resp.StatusCode) + + defer resp.Body.Close() + reader := bufio.NewReader(resp.Body) + + messages := make(chan string, 1) + + go func() { + defer close(messages) + for { + err := readMultipartPrefix(reader) + if err != nil { + return + } + + line, _, err := reader.ReadLine() + if err != nil { + return + } + + fmt.Println(string(line)) + messages <- string(line) + } + }() + + for i := 0; i <= 5; i++ { + testenv.AwaitChannelWithT(t, 5*time.Second, messages, func(t *testing.T, msg string) { + assert.Equal(t, fmt.Sprintf(`{"payload":{"data":{"countEmp":%d}}}`, i), msg) + }) + + testenv.AwaitChannelWithT(t, 5*time.Second, messages, func(t *testing.T, msg string) { + assert.Equal(t, `{}`, msg) + }) + } + + // Channel should be closed after all heartbeats are received + _, ok := <-messages + require.False(t, ok, "channel should be closed") + }) + }) + + t.Run("should work correctly for sse", func(t *testing.T) { + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithSubscriptionHeartbeatInterval(subscriptionHeartbeatInterval), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + client := http.Client{ + Timeout: time.Second * 100, + } + + subscribePayload := []byte(`{"query":"subscription { countEmp(max: 5, intervalMilliseconds: 550) }"}`) + + req, err := http.NewRequest(http.MethodPost, xEnv.GraphQLRequestURL(), bytes.NewReader(subscribePayload)) + require.NoError(t, err) + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Connection", "keep-alive") + req.Header.Set("Cache-Control", "no-cache") + + resp, err := client.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + defer resp.Body.Close() + reader := bufio.NewReader(resp.Body) + + lines := make(chan string, 50) + + go func() { + defer close(lines) + for { + line, _, err := reader.ReadLine() + if err != nil { + return + } + lines <- string(line) + } + }() + + // Assert the expected SSE sequence + for i := 0; i <= 5; i++ { + // Expect "event: next" + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "event: next", line) + }) + + // Expect data line with count + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, fmt.Sprintf(`data: {"data":{"countEmp":%d}}`, i), line) + }) + + // Expect blank line + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "", line) + }) + + // Expect heartbeat + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, ":heartbeat", line) + }) + + // Expect blank line after heartbeat + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "", line) + }) + } + + // Expect completion event + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "event: complete", line) + }) + + // Expect blank line after complete + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "", line) + }) + }) + }) +} From 749a75d77b5e0b078d9d73602b30c4497b46e306 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 27 Aug 2025 15:08:45 +0100 Subject: [PATCH 11/16] fix typo --- router-tests/events/kafka_events_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router-tests/events/kafka_events_test.go b/router-tests/events/kafka_events_test.go index 40e83d202f..3ad51a592c 100644 --- a/router-tests/events/kafka_events_test.go +++ b/router-tests/events/kafka_events_test.go @@ -428,7 +428,7 @@ func TestKafkaEvents(t *testing.T) { RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, EnableKafka: true, RouterOptions: []core.Option{ - core.WithSubscriptionHeartbeatInterval(interval time.Duration)tHeartbeatInterval(multipartHeartbeatInterval), + core.WithSubscriptionHeartbeatInterval(subscriptionHeartbeatInterval), }, }, func(t *testing.T, xEnv *testenv.Environment) { EnsureTopicExists(t, xEnv, topics...) From 666e864c954fa44b22118b5866d0b99699163ac7 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 27 Aug 2025 15:11:31 +0100 Subject: [PATCH 12/16] Fix typo, bump engine --- router-tests/go.mod | 2 +- router-tests/go.sum | 4 ++-- router-tests/http_subscriptions_test.go | 24 ------------------------ router/go.mod | 2 +- router/go.sum | 4 ++-- 5 files changed, 6 insertions(+), 30 deletions(-) diff --git a/router-tests/go.mod b/router-tests/go.mod index dba9735ccb..ca1d1e67ce 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20250820135159-bf8852195d3f github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 go.opentelemetry.io/otel/sdk/metric v1.36.0 diff --git a/router-tests/go.sum b/router-tests/go.sum index 92d7df2663..3d65d84545 100644 --- a/router-tests/go.sum +++ b/router-tests/go.sum @@ -352,8 +352,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301 h1:EzfKHQoTjFDDcgaECCCR2aTePqMu9QBmPbyhqIYOhV0= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301/go.mod h1:wxI0Nak5dI5RvJuzGyiEK4nZj0O9X+Aw6U0tC1wPKq0= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1 h1:n2kO8PPlnGBKujGdFN4eN5OdpXJ5EfFeh0NlpVZaAos= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f h1:vzHSC25S4UlOOEgoXiUdWK5VKMDsTxki0x/Ikixqzog= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= diff --git a/router-tests/http_subscriptions_test.go b/router-tests/http_subscriptions_test.go index 293980e9a8..2236bd8bd9 100644 --- a/router-tests/http_subscriptions_test.go +++ b/router-tests/http_subscriptions_test.go @@ -54,30 +54,6 @@ func readMultipartPrefix(reader *bufio.Reader) error { return nil } -func readSSEPrefix(reader *bufio.Reader) error { - eventNext, _, err := reader.ReadLine() - if err != nil { - return err - } - - switch string(eventNext) { - case "event: next": - case ":heartbeat": - blankFollower, _, err := reader.ReadLine() - if err != nil { - return err - } - - if len(blankFollower) != 0 { - return fmt.Errorf("expected blank follower, got %q", blankFollower) - } - default: - return fmt.Errorf("expected event: next or comment, got %q", eventNext) - } - - return nil -} - func TestHeartbeats(t *testing.T) { subscriptionHeartbeatInterval := time.Millisecond * 300 diff --git a/router/go.mod b/router/go.mod index d2a2fe4c4d..dd940d36a9 100644 --- a/router/go.mod +++ b/router/go.mod @@ -31,7 +31,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.16.1 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f // Do not upgrade, it renames attributes we rely on go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/contrib/propagators/b3 v1.23.0 diff --git a/router/go.sum b/router/go.sum index 254a54a927..224951982c 100644 --- a/router/go.sum +++ b/router/go.sum @@ -317,8 +317,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTBjW+SZK4mhxTTBVpxcqeBgWF1Rfmltbfk= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1 h1:n2kO8PPlnGBKujGdFN4eN5OdpXJ5EfFeh0NlpVZaAos= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250822091746-fadc6bc858a1/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f h1:vzHSC25S4UlOOEgoXiUdWK5VKMDsTxki0x/Ikixqzog= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= From b6ee4ad60f472be55e0830281191fd72ce88e1bc Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 27 Aug 2025 16:14:31 +0100 Subject: [PATCH 13/16] Make SSE complete events work properly with graphql-sse --- router/core/flushwriter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/core/flushwriter.go b/router/core/flushwriter.go index 2b7132ce9d..90456ffc89 100644 --- a/router/core/flushwriter.go +++ b/router/core/flushwriter.go @@ -53,7 +53,7 @@ func (f *HttpFlushWriter) Complete() { return } if f.sse { - _, _ = f.writer.Write([]byte("event: complete")) + _, _ = f.writer.Write([]byte("event: complete\ndata: \n\n")) } else if f.multipart { // Write the final boundary in the multipart response if f.apolloSubscriptionMultipartPrintBoundary { From ed77e07b5a8c6bb61c2338d37517d7835402f729 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 27 Aug 2025 16:18:04 +0100 Subject: [PATCH 14/16] Add empty data line to test --- router-tests/http_subscriptions_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/router-tests/http_subscriptions_test.go b/router-tests/http_subscriptions_test.go index 2236bd8bd9..da391d41bc 100644 --- a/router-tests/http_subscriptions_test.go +++ b/router-tests/http_subscriptions_test.go @@ -186,6 +186,11 @@ func TestHeartbeats(t *testing.T) { assert.Equal(t, "event: complete", line) }) + // Expect empty data line event + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "data: ", line) + }) + // Expect blank line after complete testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { assert.Equal(t, "", line) From f99e42a8e81f35d9a40d331d98188be20a138b21 Mon Sep 17 00:00:00 2001 From: endigma Date: Thu, 28 Aug 2025 10:08:03 +0100 Subject: [PATCH 15/16] Use release graphql-go-tools --- router-tests/go.mod | 2 +- router-tests/go.sum | 4 ++-- router/go.mod | 2 +- router/go.sum | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/router-tests/go.mod b/router-tests/go.mod index ca1d1e67ce..bec8f63fb7 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20250820135159-bf8852195d3f github.com/wundergraph/cosmo/router-plugin v0.0.0-20250808194725-de123ba1c65e - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.223 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 go.opentelemetry.io/otel/sdk/metric v1.36.0 diff --git a/router-tests/go.sum b/router-tests/go.sum index 3d65d84545..87ab146b7f 100644 --- a/router-tests/go.sum +++ b/router-tests/go.sum @@ -352,8 +352,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301 h1:EzfKHQoTjFDDcgaECCCR2aTePqMu9QBmPbyhqIYOhV0= github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301/go.mod h1:wxI0Nak5dI5RvJuzGyiEK4nZj0O9X+Aw6U0tC1wPKq0= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f h1:vzHSC25S4UlOOEgoXiUdWK5VKMDsTxki0x/Ikixqzog= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.223 h1:PUYcDoqkgqDnZVpO0c+y80kR308OQBtFzRPPegr0bIk= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.223/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= diff --git a/router/go.mod b/router/go.mod index dd940d36a9..4954a40fe1 100644 --- a/router/go.mod +++ b/router/go.mod @@ -31,7 +31,7 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.16.1 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.223 // Do not upgrade, it renames attributes we rely on go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/contrib/propagators/b3 v1.23.0 diff --git a/router/go.sum b/router/go.sum index 224951982c..a72296f7de 100644 --- a/router/go.sum +++ b/router/go.sum @@ -317,8 +317,8 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTBjW+SZK4mhxTTBVpxcqeBgWF1Rfmltbfk= github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f h1:vzHSC25S4UlOOEgoXiUdWK5VKMDsTxki0x/Ikixqzog= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.222.0.20250827140926-62e42e61483f/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.223 h1:PUYcDoqkgqDnZVpO0c+y80kR308OQBtFzRPPegr0bIk= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.223/go.mod h1:DnYY1alnsgzkanSwbFiFIdXKOuf8dHQWQ2P4BzTc6aI= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= From 6108dccfaf963aee58e74fed8c9b5d080c821e12 Mon Sep 17 00:00:00 2001 From: endigma Date: Thu, 28 Aug 2025 10:20:43 +0100 Subject: [PATCH 16/16] chore: don't block on channel close, could cause test timeouts --- router-tests/http_subscriptions_test.go | 5 +++-- router-tests/testenv/utils.go | 13 ++++++++++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/router-tests/http_subscriptions_test.go b/router-tests/http_subscriptions_test.go index da391d41bc..833334082b 100644 --- a/router-tests/http_subscriptions_test.go +++ b/router-tests/http_subscriptions_test.go @@ -108,8 +108,9 @@ func TestHeartbeats(t *testing.T) { } // Channel should be closed after all heartbeats are received - _, ok := <-messages - require.False(t, ok, "channel should be closed") + testenv.AwaitChannelWithCloseWithT(t, 5*time.Second, messages, func(t *testing.T, _ string, ok bool) { + require.False(t, ok, "channel should be closed") + }) }) }) diff --git a/router-tests/testenv/utils.go b/router-tests/testenv/utils.go index 47bac51fdd..bd4f1842db 100644 --- a/router-tests/testenv/utils.go +++ b/router-tests/testenv/utils.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" ) -func AwaitChannelWithT[A any](t *testing.T, timeout time.Duration, ch <-chan A, f func(*testing.T, A), msgAndArgs ...interface{}) { +func AwaitChannelWithT[A any](t *testing.T, timeout time.Duration, ch <-chan A, f func(*testing.T, A), msgAndArgs ...any) { t.Helper() select { @@ -17,3 +17,14 @@ func AwaitChannelWithT[A any](t *testing.T, timeout time.Duration, ch <-chan A, require.Fail(t, "unable to receive message before timeout", msgAndArgs...) } } + +func AwaitChannelWithCloseWithT[A any](t *testing.T, timeout time.Duration, ch <-chan A, f func(t *testing.T, item A, ok bool), msgAndArgs ...any) { + t.Helper() + + select { + case args, ok := <-ch: + f(t, args, ok) + case <-time.After(timeout): + require.Fail(t, "unable to receive message before timeout", msgAndArgs...) + } +}