feat(router): improved heartbeats for subscriptions #2141

Merged
endigma merged 16 commits into main from
jesse/eng-7547-heartbeat-on-sse
Aug 28, 2025

Conversation

@endigma
Member

@endigma endigma commented Aug 14, 2025

Summary by CodeRabbit

  • New Features

    • Heartbeat support enabled for subscription streams (SSE and multipart) to keep live connections healthy.
  • Bug Fixes

    • Improved streaming framing for SSE completions and more reliable heartbeat delivery to reduce idle disconnects.
  • Tests

    • New integration tests verifying heartbeat timing, ordering, and stream closure for multipart and SSE subscriptions.
  • Chores

    • Updated dependency pin and renamed heartbeat configuration option for clearer subscription-focused settings.

Checklist

Incorporates wundergraph/graphql-go-tools#1269

Supersedes #2174

Closes ENG-7546
Closes ENG-7192

@coderabbitai
Contributor

coderabbitai Bot commented Aug 14, 2025

Walkthrough

Rename multipart heartbeat config to a subscription-focused API, pin graphql-go-tools/v2 to v2.0.0-rc.223 in both modules, add Heartbeat() methods to HTTP and WebSocket writers and enable heartbeats for SSE, adjust SSE completion framing, add subscription heartbeat integration tests, and add a test helper to observe channel close semantics.

Changes

Cohort / File(s) Summary of Changes
Dependency pinning
router/go.mod, router-tests/go.mod
Update required dependency github.com/wundergraph/graphql-go-tools/v2 to v2.0.0-rc.223.
HTTP/WebSocket response writers
router/core/flushwriter.go, router/core/websocket.go
Add Heartbeat() error: HttpFlushWriter.Heartbeat() checks request context and writes SSE :heartbeat\n\n or a multipart {} via existing Write/Flush; websocketResponseWriter.Heartbeat() added as a no-op. SSE Complete framing updated to include a data: line; heartbeat enabling condition broadened to `UseMultipart || UseSse`.
API / config rename (core runtime)
router/core/router.go, router/core/router_config.go, router/core/executor.go, router/core/graph_server.go
Rename heartbeat identifiers and option: WithMultipartHeartbeatInterval → WithSubscriptionHeartbeatInterval; internal field multipartHeartbeatInterval → subscriptionHeartbeatInterval; executor option MultipartSubHeartbeatInterval → SubscriptionHeartbeatInterval. Propagate renamed field into executor build options.
Tests — heartbeat option usage
router-tests/events/kafka_events_test.go, router-tests/events/nats_events_test.go, router-tests/events/redis_events_test.go
Replace calls to core.WithMultipartHeartbeatInterval(...) with core.WithSubscriptionHeartbeatInterval(...) and rename local variables; durations and test logic unchanged.
Integration tests — subscription heartbeats
router-tests/http_subscriptions_test.go
Add TestHeartbeats verifying multipart and SSE subscription heartbeats: framing, sequencing (payload, heartbeat, completion), and timing with a configured subscription heartbeat interval; add helper readMultipartPrefix and use testenv helpers to assert stream behavior.
Test utilities
router-tests/testenv/utils.go
Change AwaitChannelWithT variadic arg type from ...interface{} → ...any; add AwaitChannelWithCloseWithT to await a channel value and receive-ok flag (propagates close semantics to callback).

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


@endigma endigma force-pushed the jesse/eng-7547-heartbeat-on-sse branch from c754867 to 434833f Compare August 14, 2025 16:28
@endigma endigma marked this pull request as draft August 14, 2025 16:31
@github-actions

github-actions Bot commented Aug 14, 2025

Router-nonroot image scan passed

✅ No security vulnerabilities found in image:

ghcr.io/wundergraph/cosmo/router:sha-a101be1c6720a3992e3f6dd018c443c32ed05754-nonroot

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
router/core/websocket.go (1)

630-634: WebSocket Heartbeat is a no-op: confirm upstream keep-alive strategy (Ping).

If the resolver’s heartbeat scheduler calls into this, connections will remain idle unless the protocol/server sends Pings elsewhere. If proxies/load balancers require server-initiated keep-alives, consider emitting a Ping here (if supported by wsproto) instead of a no-op.

If wsproto.Proto exposes a Ping method, this is a minimal change:

// If wsproto exposes Ping(), forward it here to keep WS connections alive.
func (rw *websocketResponseWriter) Heartbeat() error {
    type pinger interface{ Ping() error }
    if p, ok := any(rw.protocol).(pinger); ok {
        return p.Ping()
    }
    return nil
}

Otherwise, confirm there is another server-side ping loop for WS connections.

router/core/flushwriter.go (1)

80-106: Avoid name shadowing and use the shared heartbeat constant for multipart.

The local variable named heartbeat shadows the package-level constant and multipart uses a hard-coded "{}". Use the shared constant and simplify the SSE path to reduce confusion and ensure consistency.

Apply this diff:

 func (f *HttpFlushWriter) Heartbeat() error {
 	if err := f.ctx.Err(); err != nil {
 		return err
 	}
 
-	var heartbeat []byte
-	if f.sse {
-		heartbeat = []byte(":\n\n")
-
-		if _, err := f.writer.Write(heartbeat); err != nil {
+	if f.sse {
+		// SSE heartbeat: comment line per spec
+		if _, err := f.writer.Write([]byte(":\n\n")); err != nil {
 			return err
 		}
 
 		f.flusher.Flush()
 	} else if f.multipart {
-		if _, err := f.Write([]byte("{}")); err != nil {
+		// Multipart heartbeat: use shared constant "{}"
+		if _, err := f.Write([]byte(heartbeat)); err != nil {
 			return err
 		}
 
 		if err := f.Flush(); err != nil {
 			return err
 		}
 	}
 
 	return nil
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these settings in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 2e42a29 and 434833f.

⛔ Files ignored due to path filters (2)
  • router-tests/go.sum is excluded by !**/*.sum
  • router/go.sum is excluded by !**/*.sum
📒 Files selected for processing (4)
  • router-tests/go.mod (1 hunks)
  • router/core/flushwriter.go (2 hunks)
  • router/core/websocket.go (1 hunks)
  • router/go.mod (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: build_test
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: integration_test (./events)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: build_test
  • GitHub Check: Analyze (go)
🔇 Additional comments (4)
router/go.mod (1)

34-34: Pinning graphql-go-tools/v2 to a specific pseudo-version

Great job pinning to a commit for reproducible builds—both router/go.mod and router-tests/go.mod now reference

github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.220.0.20250814103012-0f9f721ba378

Next, please confirm that your go.sum files include corresponding checksum entries for this version so that module sums stay in sync.

router-tests/go.mod (1)

30-30: LGTM: tests module pinned to the same graphql-go-tools/v2 pseudo-version.

Keeping router-tests in sync with router ensures consistent API expectations during CI.

router/core/flushwriter.go (2)

80-106: Potential concurrency interleaving between heartbeats and data writes.

Heartbeat writes directly to the underlying writer (SSE) or buffers then flushes (multipart). If the resolver can invoke Heartbeat concurrently with Write/Flush, outputs may interleave. If concurrency isn’t strictly serialized upstream, wrap HttpFlushWriter writes/flushes with a mutex.

I can draft a small refactor adding a sync.Mutex to HttpFlushWriter and guarding Write/Flush/Heartbeat paths if concurrency is possible here. Want me to propose it?


189-191: Enable heartbeats for SSE as well as multipart — good expansion.

This aligns the heartbeat behavior across streaming transports. Reads cleanly and preserves subscribe-once semantics elsewhere.

@endigma endigma force-pushed the jesse/eng-7547-heartbeat-on-sse branch 4 times, most recently from 7cc6cf0 to cd95bc2 Compare August 20, 2025 09:30
Comment thread router/core/executor.go Outdated
Comment thread router/core/flushwriter.go Outdated
Comment thread router/core/flushwriter.go
@endigma endigma force-pushed the jesse/eng-7547-heartbeat-on-sse branch 2 times, most recently from 06d9dcb to 405e361 Compare August 27, 2025 09:59
@endigma endigma marked this pull request as ready for review August 27, 2025 14:12
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
router/core/flushwriter.go (1)

51-57: SSE “complete” event should end with a blank line.

Per SSE framing, events are terminated by a double newline. Write "event: complete\n\n" to ensure clients can read the final event without blocking.

-	if f.sse {
-		_, _ = f.writer.Write([]byte("event: complete"))
+	if f.sse {
+		// SSE events must be terminated by a blank line
+		_, _ = f.writer.Write([]byte("event: complete\n\n"))
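
Why the terminator matters can be sketched with a deliberately simplified SSE parser. It follows the dispatch rules of the WHATWG server-sent events processing model as I understand them: a blank line triggers dispatch, comment lines are ignored, and an event whose data buffer is empty is dropped. `parseSSE` and `event` are hypothetical names for this example, and the default event type and the `id`/`retry` fields are omitted.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

type event struct{ Name, Data string }

// parseSSE returns the events a client would dispatch from stream.
// Simplified: only "event" and "data" fields and comment lines are handled.
func parseSSE(stream string) []event {
	var out []event
	var data strings.Builder
	name, hasData := "", false

	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			// Blank line: dispatch, but only if a data field was seen.
			if hasData {
				out = append(out, event{name, strings.TrimSuffix(data.String(), "\n")})
			}
			name, hasData = "", false
			data.Reset()
		case strings.HasPrefix(line, ":"):
			// Comment line such as ":heartbeat": ignored by clients.
		default:
			field, value, _ := strings.Cut(line, ":")
			value = strings.TrimPrefix(value, " ")
			switch field {
			case "event":
				name = value
			case "data":
				data.WriteString(value)
				data.WriteByte('\n')
				hasData = true
			}
		}
	}
	// Fields still buffered at end of stream are discarded, not dispatched.
	return out
}

func main() {
	for _, s := range []string{
		"event: complete",
		"event: complete\n\n",
		"event: complete\ndata:\n\n",
	} {
		fmt.Printf("%q -> %d event(s)\n", s, len(parseSSE(s)))
	}
}
```

Under these rules the unterminated frame is never dispatched, and a terminated frame without any `data:` line is dropped as well, so a spec-following client may need both the blank line and a data line to observe the completion event.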
🧹 Nitpick comments (6)
router/core/router.go (2)

1544-1548: Update comment to reflect SSE support and WS no-op.

The option now affects HTTP subscriptions (multipart and SSE); WebSocket heartbeats are a no-op.

Apply:

-// WithSubscriptionHeartbeatInterval sets the interval for the engine to send heartbeats for multipart subscriptions.
+// WithSubscriptionHeartbeatInterval sets the interval for the engine to send heartbeats for HTTP subscriptions
+// (multipart and SSE). WebSocket subscriptions are unaffected.
 func WithSubscriptionHeartbeatInterval(interval time.Duration) Option {
   return func(r *Router) {
     r.subscriptionHeartbeatInterval = interval
   }
 }

1544-1548: Provide a deprecated alias to avoid breaking existing integrators.

Add a thin wrapper to preserve the old API name.

 func WithSubscriptionHeartbeatInterval(interval time.Duration) Option {
   return func(r *Router) {
     r.subscriptionHeartbeatInterval = interval
   }
 }
+
+// Deprecated: Use WithSubscriptionHeartbeatInterval.
+func WithMultipartHeartbeatInterval(interval time.Duration) Option {
+	return WithSubscriptionHeartbeatInterval(interval)
+}
router/core/flushwriter.go (1)

80-106: Avoid identifier shadowing and use the heartbeat constant.

Local variable heartbeat shadows the package-level heartbeat const and obscures intent. Also, reuse the constant for multipart to avoid magic strings.

Apply:

 func (f *HttpFlushWriter) Heartbeat() error {
 	if err := f.ctx.Err(); err != nil {
 		return err
 	}
 
-	var heartbeat []byte
 	if f.sse {
-		heartbeat = []byte(":heartbeat\n\n")
-
-		if _, err := f.writer.Write(heartbeat); err != nil {
+		if _, err := f.writer.Write([]byte(":heartbeat\n\n")); err != nil {
 			return err
 		}
 
 		f.flusher.Flush()
 	} else if f.multipart {
-		if _, err := f.Write([]byte("{}")); err != nil {
+		if _, err := f.Write([]byte(heartbeat)); err != nil {
 			return err
 		}
 
 		if err := f.Flush(); err != nil {
 			return err
 		}
 	}
 
 	return nil
 }
router-tests/http_subscriptions_test.go (3)

95-97: Remove stray println in test loop.

Leaking to stdout adds noise in CI logs.

-					fmt.Println(string(line))
-					messages <- string(line)
+					messages <- string(line)

100-108: Make multipart heartbeat assertions timing-robust.

Assumes exactly one heartbeat between items; with 300ms heartbeat vs 550ms data, more than one heartbeat could slip in. Consider asserting “at least one {} before next payload” (drain extra "{}" with a short non-blocking loop).


156-182: Relax SSE heartbeat expectations to tolerate multiple heartbeats.

Similarly, assert at least one ":heartbeat" block between “next” events, not exactly one. This reduces flakiness under scheduler jitter.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 434833f and 54835a1.

⛔ Files ignored due to path filters (2)
  • router-tests/go.sum is excluded by !**/*.sum
  • router/go.sum is excluded by !**/*.sum
📒 Files selected for processing (12)
  • router-tests/events/kafka_events_test.go (2 hunks)
  • router-tests/events/nats_events_test.go (2 hunks)
  • router-tests/events/redis_events_test.go (2 hunks)
  • router-tests/go.mod (1 hunks)
  • router-tests/http_subscriptions_test.go (1 hunks)
  • router/core/executor.go (1 hunks)
  • router/core/flushwriter.go (2 hunks)
  • router/core/graph_server.go (1 hunks)
  • router/core/router.go (1 hunks)
  • router/core/router_config.go (1 hunks)
  • router/core/websocket.go (1 hunks)
  • router/go.mod (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • router/core/websocket.go
  • router-tests/go.mod
  • router/go.mod
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-20T10:08:17.857Z
Learnt from: endigma
PR: wundergraph/cosmo#2155
File: router/core/router.go:1857-1866
Timestamp: 2025-08-20T10:08:17.857Z
Learning: router/pkg/config/config.schema.json forbids null values for traffic_shaping.subgraphs: additionalProperties references $defs.traffic_shaping_subgraph_request_rule with type "object". Therefore, in core.NewSubgraphTransportOptions, dereferencing each subgraph rule pointer is safe under schema-validated configs, and a nil-check is unnecessary.

Applied to files:

  • router/core/executor.go
🧬 Code graph analysis (3)
router-tests/events/nats_events_test.go (1)
router/core/router.go (1)
  • WithSubscriptionHeartbeatInterval (1545-1549)
router-tests/events/redis_events_test.go (1)
router/core/router.go (1)
  • WithSubscriptionHeartbeatInterval (1545-1549)
router-tests/http_subscriptions_test.go (3)
router-tests/testenv/testenv.go (2)
  • Run (105-122)
  • Environment (1727-1763)
router/core/router.go (1)
  • WithSubscriptionHeartbeatInterval (1545-1549)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: build_test
  • GitHub Check: integration_test (./events)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: build_test
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: Analyze (go)
🔇 Additional comments (9)
router/core/router_config.go (1)

109-121: No stale identifiers found – ready to merge

I’ve re-scanned the repository and confirmed that all legacy multipartHeartbeatInterval identifiers have been fully removed. The only remaining occurrences of “heartbeatInterval” are:

  • router/pkg/mcpserver/server.go: server.WithHeartbeatInterval(…) (mcpserver-specific, unaffected by subscription refactor)
  • router/core/executor.go: internal HeartbeatInterval field (intentionally named for engine heartbeats)
  • router/core/router.go & graph_server.go: uses of WithSubscriptionHeartbeatInterval and subscriptionHeartbeatInterval (new API)
  • All subscription tests: updated to use WithSubscriptionHeartbeatInterval and subscriptionHeartbeatInterval

No stale references to the old identifiers remain. Merging is safe.

router/core/graph_server.go (1)

1206-1207: Wiring to ExecutorBuildOptions.HeartbeatInterval is correct.

This correctly sources the new subscriptionHeartbeatInterval field.

router/core/executor.go (1)

86-88: ResolverOptions uses SubscriptionHeartbeatInterval as intended.

Mapping opts.HeartbeatInterval → resolve.ResolverOptions.SubscriptionHeartbeatInterval is correct.

router-tests/events/redis_events_test.go (1)

481-493: Tests updated to new option name LGTM.

Variable and option rename are consistent; helps catch regressions for multipart heartbeat.

router-tests/events/nats_events_test.go (2)

327-328: Rename to subscription heartbeat option looks correct.

Using core.WithSubscriptionHeartbeatInterval aligns with the new API and enables heartbeats for multipart subscriptions as intended.


381-382: Consistent option usage across tests.

Good switch to core.WithSubscriptionHeartbeatInterval here as well; keeps tests consistent with the router’s new config.

router-tests/events/kafka_events_test.go (1)

420-432: Heartbeat option rename is applied correctly.

subscriptionHeartbeatInterval + core.WithSubscriptionHeartbeatInterval(...) matches the new public API and semantics.

router/core/flushwriter.go (1)

189-191: Enable heartbeats for both multipart and SSE.

Setting ctx.ExecutionOptions.SendHeartbeat when either transport is active is the right behavior.

router-tests/http_subscriptions_test.go (1)

184-193: Completion framing check is good.

Validates “event: complete” followed by a blank line, matching SSE framing.

Contributor

@StarpTech StarpTech left a comment

LGTM

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
router/core/flushwriter.go (3)

80-105: Avoid local name shadowing and repeated allocations in Heartbeat.

The local variable named "heartbeat" shadows the package-level constant and allocates each tick. Reuse a precomputed SSE frame and the existing constant for multipart.

Apply:

-func (f *HttpFlushWriter) Heartbeat() error {
-	if err := f.ctx.Err(); err != nil {
-		return err
-	}
-
-	var heartbeat []byte
-	if f.sse {
-		heartbeat = []byte(":heartbeat\n\n")
-
-		if _, err := f.writer.Write(heartbeat); err != nil {
-			return err
-		}
-
-		f.flusher.Flush()
-	} else if f.multipart {
-		if _, err := f.Write([]byte("{}")); err != nil {
-			return err
-		}
-
-		if err := f.Flush(); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
+func (f *HttpFlushWriter) Heartbeat() error {
+	if err := f.ctx.Err(); err != nil {
+		return err
+	}
+	if f.sse {
+		if _, err := f.writer.Write(sseHeartbeat); err != nil {
+			return err
+		}
+		f.flusher.Flush()
+	} else if f.multipart {
+		if _, err := f.Write([]byte(heartbeat)); err != nil { // reuse package-level "{}"
+			return err
+		}
+		if err := f.Flush(); err != nil {
+			return err
+		}
+	} // else: no-op for non-streaming transports
+	return nil
+}

Also add near the consts:

+var sseHeartbeat = []byte(":heartbeat\n\n")

189-191: Don’t start heartbeats for subscribe-once responses.

Saves a ticker and avoids no-op callbacks after immediate close.

- if wgParams.UseMultipart || wgParams.UseSse {
-   ctx.ExecutionOptions.SendHeartbeat = true
- }
+ if (wgParams.UseMultipart || wgParams.UseSse) && !wgParams.SubscribeOnce {
+   ctx.ExecutionOptions.SendHeartbeat = true
+ }

197-199: Avoid allocation in heartbeat sentinel check.

Use bytes.Equal instead of string conversion.

- if string(resp) == heartbeat {
+ if bytes.Equal(resp, []byte(heartbeat)) {
   return resp, nil
 }
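
One way to sidestep per-call conversions in either direction is to keep the sentinel as a package-level byte slice. This is a sketch under assumptions: `heartbeatBytes` and `isHeartbeat` are hypothetical names, and the `{}` value comes from the discussion above. Whether the original `string(resp) == heartbeat` comparison allocates at all depends on the compiler (the gc toolchain is generally able to elide that conversion in comparisons), so it is worth measuring, for example with `testing.AllocsPerRun`, before changing anything.

```go
package main

import (
	"bytes"
	"fmt"
)

// heartbeat mirrors the "{}" sentinel mentioned in the review.
const heartbeat = "{}"

// heartbeatBytes is converted once at package initialization, so the check
// below performs no string/[]byte conversion per call.
var heartbeatBytes = []byte(heartbeat)

// isHeartbeat is a hypothetical helper for this example.
func isHeartbeat(resp []byte) bool {
	return bytes.Equal(resp, heartbeatBytes)
}

func main() {
	fmt.Println(isHeartbeat([]byte("{}")), isHeartbeat([]byte(`{"data":null}`)))
}
```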
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 54835a1 and 73904d9.

📒 Files selected for processing (1)
  • router/core/flushwriter.go (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: build-router
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
  • GitHub Check: image_scan
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: build_push_image
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: build_test
  • GitHub Check: integration_test (./events)
🔇 Additional comments (2)
router/core/flushwriter.go (2)

56-56: SSE complete frame formatting looks good.

"event: complete" with an empty data line is SSE-compliant and avoids client parsing edge cases.


80-105: Please verify that Heartbeat/Write/Flush calls on HttpFlushWriter are always serialized

I searched the codebase and did not find a SubscriptionResponseWriter interface or any production call‐sites invoking HttpFlushWriter.Heartbeat—only test routines reference it. Without evidence of single‐goroutine execution:

  • Confirm that the engine invokes Heartbeat, Write, and Flush on the same goroutine (i.e., serially).
  • If these methods can run concurrently (for example, a ticker triggering Heartbeat while subscription data is written), add a mutex to prevent interleaved writes.

Suggested change in router/core/flushwriter.go:

 type HttpFlushWriter struct {
   ctx           context.Context
   cancel        context.CancelFunc
   writer        io.Writer
   flusher       http.Flusher
+  mu            sync.Mutex
   subscribeOnce bool
   sse           bool
   multipart     bool
   buf           *bytes.Buffer
   firstMessage  bool
   apolloSubscriptionMultipartPrintBoundary bool
 }

 func (f *HttpFlushWriter) Heartbeat() error {
+  f.mu.Lock(); defer f.mu.Unlock()
   if err := f.ctx.Err(); err != nil {
     return err
   }
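   // … (must not call the locked f.Write/f.Flush from here: sync.Mutex is not reentrant and would deadlock)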
 }

 func (f *HttpFlushWriter) Write(p []byte) (n int, err error) {
+  f.mu.Lock(); defer f.mu.Unlock()
   // existing buffered write logic
 }

 func (f *HttpFlushWriter) Flush() error {
+  f.mu.Lock(); defer f.mu.Unlock()
   // existing flush logic
 }

@endigma endigma force-pushed the jesse/eng-7547-heartbeat-on-sse branch from c6b461a to ed77e07 Compare August 28, 2025 09:05
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (3)
router-tests/http_subscriptions_test.go (3)

3-15: Import strings for robust header parsing.

Upcoming changes below use strings.ToLower/HasPrefix; add the import.

 import (
   "bufio"
   "bytes"
   "fmt"
   "net/http"
+  "strings"
   "testing"
   "time"

80-99: Remove noisy println and increase channel buffer to avoid backpressure.

The fmt.Println is test noise; buffer(1) risks blocking if frames arrive quickly.

-      messages := make(chan string, 1)
+      messages := make(chan string, 16)
@@
-          fmt.Println(string(line))
           messages <- string(line)

156-182: Tolerate multiple or missing heartbeats between SSE events
SSE tests currently expect a heartbeat immediately after each data frame; refactor to ignore any number of blank/heartbeat lines until the next “event: next” or to count heartbeats separately (assert N ≥ 1 between data frames).

Verify test stability locally
In your local Go module environment, run:

go test ./router-tests -run TestHeartbeats -count=50 -timeout=10m -v

to stress the SSE heartbeat test and catch any ordering flakes.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between c6b461a and ed77e07.

⛔ Files ignored due to path filters (2)
  • router-tests/go.sum is excluded by !**/*.sum
  • router/go.sum is excluded by !**/*.sum
📒 Files selected for processing (12)
  • router-tests/events/kafka_events_test.go (2 hunks)
  • router-tests/events/nats_events_test.go (2 hunks)
  • router-tests/events/redis_events_test.go (2 hunks)
  • router-tests/go.mod (1 hunks)
  • router-tests/http_subscriptions_test.go (1 hunks)
  • router/core/executor.go (1 hunks)
  • router/core/flushwriter.go (3 hunks)
  • router/core/graph_server.go (1 hunks)
  • router/core/router.go (1 hunks)
  • router/core/router_config.go (1 hunks)
  • router/core/websocket.go (1 hunks)
  • router/go.mod (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (11)
  • router/core/graph_server.go
  • router/go.mod
  • router-tests/events/nats_events_test.go
  • router/core/router.go
  • router/core/websocket.go
  • router-tests/go.mod
  • router-tests/events/kafka_events_test.go
  • router-tests/events/redis_events_test.go
  • router/core/executor.go
  • router/core/flushwriter.go
  • router/core/router_config.go
🧰 Additional context used
🧬 Code graph analysis (1)
router-tests/http_subscriptions_test.go (3)
router-tests/testenv/testenv.go (1)
  • Run (105-122)
router/core/router.go (1)
  • WithSubscriptionHeartbeatInterval (1545-1549)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: build-router
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: image_scan
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: integration_test (./events)
  • GitHub Check: build_test
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)

Comment thread router-tests/http_subscriptions_test.go
Comment thread router-tests/http_subscriptions_test.go
Comment thread router-tests/http_subscriptions_test.go
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
router-tests/testenv/utils.go (2)

13-18: Stop the timeout timer early; also rename local var for clarity.
Use time.NewTimer with a deferred Stop instead of time.After, so the timer is released as soon as a value arrives, and rename args → item so it is not confused with msgAndArgs.

-	select {
-	case args := <-ch:
-		f(t, args)
-	case <-time.After(timeout):
-		require.Fail(t, "unable to receive message before timeout", msgAndArgs...)
-	}
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case item := <-ch:
+		f(t, item)
+	case <-timer.C:
+		require.Fail(t, "unable to receive message before timeout", msgAndArgs...)
+	}

21-30: Rename AwaitChannelWithCloseWithT to AwaitChannelWithCloseT and apply the time.NewTimer pattern
Use a shorter name and mirror the timer pattern above so the timeout timer is stopped once the receive completes.

-func AwaitChannelWithCloseWithT[A any](t *testing.T, timeout time.Duration, ch <-chan A, f func(t *testing.T, item A, ok bool), msgAndArgs ...any) {
+func AwaitChannelWithCloseT[A any](t *testing.T, timeout time.Duration, ch <-chan A, f func(t *testing.T, item A, ok bool), msgAndArgs ...any) {
 	t.Helper()
-	select {
-	case args, ok := <-ch:
-		f(t, args, ok)
-	case <-time.After(timeout):
-		require.Fail(t, "unable to receive message before timeout", msgAndArgs...)
-	}
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case item, ok := <-ch:
+		f(t, item, ok)
+	case <-timer.C:
+		require.Fail(t, "unable to receive message before timeout", msgAndArgs...)
+	}
 }

Also update the call site in router-tests/http_subscriptions_test.go:111.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between f99e42a and 6108dcc.

📒 Files selected for processing (2)
  • router-tests/http_subscriptions_test.go (1 hunks)
  • router-tests/testenv/utils.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • router-tests/http_subscriptions_test.go
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: endigma
PR: wundergraph/cosmo#2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.085Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: image_scan
  • GitHub Check: integration_test (./events)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: build_test
  • GitHub Check: build-router
  • GitHub Check: Analyze (go)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
router-tests/testenv/utils.go (1)

10-10: LGTM: switch to ...any is idiomatic and preserves behavior.

@endigma endigma merged commit ca1861b into main Aug 28, 2025
31 checks passed
@endigma endigma deleted the jesse/eng-7547-heartbeat-on-sse branch August 28, 2025 09:33
@Noroth Noroth mentioned this pull request Sep 30, 2025
5 tasks
@coderabbitai coderabbitai Bot mentioned this pull request Oct 2, 2025
5 tasks