From 2c8275f8eb69e40c602028aef3e02924390ceafe Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Fri, 28 Nov 2025 11:03:27 +0100 Subject: [PATCH 1/8] fix: flush errors on sse streams --- router-tests/http_subscriptions_test.go | 59 ++++++++++++++++++++++++- router/core/graphql_handler.go | 4 ++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/router-tests/http_subscriptions_test.go b/router-tests/http_subscriptions_test.go index 833334082b..794c2f503f 100644 --- a/router-tests/http_subscriptions_test.go +++ b/router-tests/http_subscriptions_test.go @@ -136,9 +136,9 @@ func TestHeartbeats(t *testing.T) { resp, err := client.Do(req) require.NoError(t, err) + defer resp.Body.Close() require.Equal(t, http.StatusOK, resp.StatusCode) - defer resp.Body.Close() reader := bufio.NewReader(resp.Body) lines := make(chan string, 50) @@ -198,4 +198,61 @@ func TestHeartbeats(t *testing.T) { }) }) }) + + t.Run("should write an error on sse", func(t *testing.T) { + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithSubscriptionHeartbeatInterval(subscriptionHeartbeatInterval), + }, + Subgraphs: testenv.SubgraphsConfig{ + Employees: testenv.SubgraphConfig{ + Middleware: func(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusForbidden) + _, _ = w.Write([]byte(`{"errors":[{"message":"Subgraph forbidden","extensions":{"code":"FORBIDDEN"}}]}`)) + }) + }, + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + client := http.Client{ + Timeout: time.Second * 100, + } + + subscribePayload := []byte(`{"query":"subscription { countEmp(max: 5, intervalMilliseconds: 550) }"}`) + + req, err := http.NewRequest(http.MethodPost, xEnv.GraphQLRequestURL(), bytes.NewReader(subscribePayload)) + require.NoError(t, err) + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Connection", "keep-alive") + req.Header.Set("Cache-Control", "no-cache") + + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + + reader := bufio.NewReader(resp.Body) + var actualLines []string + for { + line, _, err := reader.ReadLine() + if err != nil { + break + } + actualLines = append(actualLines, string(line)) + } + + expectedLines := []string{ + "event: next", + `data: {"errors":[{"message":"Subscription Upgrade request failed for Subgraph 'employees'.","extensions":{"statusCode":403}}],"data":null}`, + "", + "", + } + + assert.Equal(t, expectedLines, actualLines) + }) + }) } diff --git a/router/core/graphql_handler.go b/router/core/graphql_handler.go index 29a8772ca8..df9ca9cb2b 100644 --- a/router/core/graphql_handler.go +++ b/router/core/graphql_handler.go @@ -438,6 +438,10 @@ func (h *GraphQLHandler) WriteError(ctx *resolve.Context, err error, res *resolv if wsRw, ok := w.(*websocketResponseWriter); ok { _ = wsRw.Flush() } + + if httpRw, ok := w.(*HttpFlushWriter); ok { + _ = httpRw.Flush() + } } func (h *GraphQLHandler) setDebugCacheHeaders(w http.ResponseWriter, opCtx *operationContext) { From 060e090a3c06f15021508fa202b160bab517c19e Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Wed, 3 Dec 2025 10:28:42 +0100 Subject: [PATCH 2/8] chore: keep timeouts in mind in test --- router-tests/http_subscriptions_test.go | 39 ++++++++++++++++--------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/router-tests/http_subscriptions_test.go b/router-tests/http_subscriptions_test.go index 794c2f503f..76e250cc69 100644 --- a/router-tests/http_subscriptions_test.go +++ b/router-tests/http_subscriptions_test.go @@ -236,23 +236,34 @@ func TestHeartbeats(t *testing.T) { require.Equal(t, http.StatusOK, resp.StatusCode) reader := bufio.NewReader(resp.Body) - var actualLines []string - for { - line, _, err := reader.ReadLine() - if err != nil { - break + lines := make(chan string, 50) + + go func() { + defer close(lines) + for { + line, _, err := reader.ReadLine() + if err != nil { + return + } + lines <- string(line) } - actualLines = append(actualLines, string(line)) - } + }() - expectedLines := []string{ - "event: next", - `data: {"errors":[{"message":"Subscription Upgrade request failed for Subgraph 'employees'.","extensions":{"statusCode":403}}],"data":null}`, - "", - "", - } + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "event: next", line) + }) + + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, `data: {"errors":[{"message":"Subscription Upgrade request failed for Subgraph 'employees'.","extensions":{"statusCode":403}}],"data":null}`, line) + }) + + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "", line) + }) - assert.Equal(t, expectedLines, actualLines) + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "", line) + }) }) }) } From 0b323e05dbef67485b7ae854a08771a059878cb6 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Tue, 9 Dec 2025 13:46:59 +0100 Subject: [PATCH 3/8] fix: send complete event --- router/core/websocket.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/router/core/websocket.go b/router/core/websocket.go index 15ab328c59..3bbbab3e5f 100644 --- a/router/core/websocket.go +++ b/router/core/websocket.go @@ -1060,6 +1060,8 @@ func (h *WebSocketConnectionHandler) executeSubscription(registration *Subscript if err != nil { h.logger.Warn("Resolving GraphQL subscription", zap.Error(err)) h.graphqlHandler.WriteError(resolveCtx, err, p.Response.Response, rw) + _ = rw.Flush() + rw.Complete() return } } From 3a2dee4703ced315d11d2dc653cd3c2bd1baf0f4 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 12 Jan 2026 16:16:58 +0100 Subject: [PATCH 4/8] fix: send complete event + set seperators correctly After an SSE error is flushed on the connection the router will sent an complete event to tell the client the error is terminal. Also a bug got fixed alongside this where too many newlines are inserted between the error and complete event. This happened because the payload already contained a newline. This is now correctly trimmed before the seperators are added to the response payload during flushing. --- router-tests/http_subscriptions_test.go | 8 ++++++++ router/core/flushwriter.go | 4 ++++ router/core/graphql_handler.go | 1 + 3 files changed, 13 insertions(+) diff --git a/router-tests/http_subscriptions_test.go b/router-tests/http_subscriptions_test.go index 76e250cc69..ea3fb4dd2b 100644 --- a/router-tests/http_subscriptions_test.go +++ b/router-tests/http_subscriptions_test.go @@ -261,6 +261,14 @@ func TestHeartbeats(t *testing.T) { assert.Equal(t, "", line) }) + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "event: complete", line) + }) + + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { + assert.Equal(t, "data: ", line) + }) + testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { assert.Equal(t, "", line) }) diff --git a/router/core/flushwriter.go b/router/core/flushwriter.go index 90456ffc89..635d3edfae 100644 --- a/router/core/flushwriter.go +++ b/router/core/flushwriter.go @@ -143,6 +143,10 @@ func (f *HttpFlushWriter) Flush() (err error) { separation = "" } + if bytes.HasSuffix(resp, []byte{'\n'}) { + resp = bytes.TrimRight(resp, "\n") + } + full := flushBreak + string(resp) + separation _, err = f.writer.Write([]byte(full)) if err != nil { diff --git a/router/core/graphql_handler.go b/router/core/graphql_handler.go index b3ef7a3ba8..9e21c16764 100644 --- a/router/core/graphql_handler.go +++ b/router/core/graphql_handler.go @@ -438,6 +438,7 @@ func (h *GraphQLHandler) WriteError(ctx *resolve.Context, err error, res *resolv if httpRw, ok := w.(*HttpFlushWriter); ok { _ = httpRw.Flush() + httpRw.Complete() } } From 12969cebccf1f11678a64fcaff5c2846437e47ee Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Tue, 13 Jan 2026 14:52:17 +0100 Subject: [PATCH 5/8] fix: don't send complete when writing errors When writing errors no complete events should be sent. This is not the task of the WriteError method. The callee of WriteError might want to keep the connection open. It's the responsibility of the callee wether to send complete events or not. This change makes the writeError behave the same for sse as it does for websockets, where no connections are marked as terminal either. --- router/core/graphql_handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/router/core/graphql_handler.go b/router/core/graphql_handler.go index 658dc7fa25..1e2c277828 100644 --- a/router/core/graphql_handler.go +++ b/router/core/graphql_handler.go @@ -436,7 +436,6 @@ func (h *GraphQLHandler) WriteError(ctx *resolve.Context, err error, res *resolv if httpRw, ok := w.(*HttpFlushWriter); ok { _ = httpRw.Flush() - httpRw.Complete() } } From 414198014841a756dc815602b51603e27f64f537 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Wed, 14 Jan 2026 10:27:22 +0100 Subject: [PATCH 6/8] fix: adjust test to match expectation --- router-tests/http_subscriptions_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/router-tests/http_subscriptions_test.go b/router-tests/http_subscriptions_test.go index ea3fb4dd2b..7f88e6cfdf 100644 --- a/router-tests/http_subscriptions_test.go +++ b/router-tests/http_subscriptions_test.go @@ -260,18 +260,6 @@ func TestHeartbeats(t *testing.T) { testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { assert.Equal(t, "", line) }) - - testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { - assert.Equal(t, "event: complete", line) - }) - - testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { - assert.Equal(t, "data: ", line) - }) - - testenv.AwaitChannelWithT(t, 5*time.Second, lines, func(t *testing.T, line string) { - assert.Equal(t, "", line) - }) }) }) } From c2f1214d773c05002576642a3d2183d0ef4e6a8e Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Wed, 14 Jan 2026 15:22:34 +0100 Subject: [PATCH 7/8] fix: remove flush + complete from async subscription resolve --- router/core/websocket.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/router/core/websocket.go b/router/core/websocket.go index 9936c0aa27..35d6ffc304 100644 --- a/router/core/websocket.go +++ b/router/core/websocket.go @@ -1063,8 +1063,6 @@ func (h *WebSocketConnectionHandler) executeSubscription(registration *Subscript if err != nil { h.logger.Warn("Resolving GraphQL subscription", zap.Error(err)) h.graphqlHandler.WriteError(resolveCtx, err, p.Response.Response, rw) - _ = rw.Flush() - rw.Complete() return } } From 9b857101440b4e3aec147c105794e8133b255f73 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:18:56 +0100 Subject: [PATCH 8/8] chore: use unified interface for flush call --- router/core/flushwriter.go | 2 ++ router/core/graphql_handler.go | 8 ++------ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/router/core/flushwriter.go b/router/core/flushwriter.go index 635d3edfae..5e80248012 100644 --- a/router/core/flushwriter.go +++ b/router/core/flushwriter.go @@ -143,6 +143,8 @@ func (f *HttpFlushWriter) Flush() (err error) { separation = "" } + // resp sometimes ends with newlines. We need to remove them + // to cleanly add the seperation in the next step. if bytes.HasSuffix(resp, []byte{'\n'}) { resp = bytes.TrimRight(resp, "\n") } diff --git a/router/core/graphql_handler.go b/router/core/graphql_handler.go index 1e2c277828..f5475997de 100644 --- a/router/core/graphql_handler.go +++ b/router/core/graphql_handler.go @@ -430,12 +430,8 @@ func (h *GraphQLHandler) WriteError(ctx *resolve.Context, err error, res *resolv } } - if wsRw, ok := w.(*websocketResponseWriter); ok { - _ = wsRw.Flush() - } - - if httpRw, ok := w.(*HttpFlushWriter); ok { - _ = httpRw.Flush() + if flusher, ok := w.(resolve.SubscriptionResponseWriter); ok { + _ = flusher.Flush() } }