Skip to content

Add support for per message deflate to websocket connectinos#2536

Closed
dbinnersley wants to merge 1 commit intowundergraph:mainfrom
dbinnersley:per-message-deflate
Closed

Add support for per message deflate to websocket connectinos#2536
dbinnersley wants to merge 1 commit intowundergraph:mainfrom
dbinnersley:per-message-deflate

Conversation

@dbinnersley
Copy link
Copy Markdown

@dbinnersley dbinnersley commented Feb 19, 2026

Summary by CodeRabbit

  • New Features

    • Added per-message deflate compression support for WebSocket connections, reducing bandwidth usage.
    • New configuration options: WEBSOCKETS_COMPRESSION_ENABLED and WEBSOCKETS_COMPRESSION_LEVEL (1-9) for tuning compression behavior.
  • Tests

    • Added comprehensive test coverage for compression negotiation and operation across various configurations.

Checklist

This is reopening PR #2424 to add support for per-message-deflate to websockets. This is a requirement for us to be using websockets though cosmo-router

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Feb 19, 2026

Walkthrough

This PR adds per-message deflate compression support for GraphQL WebSocket connections. Changes include: configuration schema and types for compression settings, compression negotiation and frame handling logic in the WebSocket core, test infrastructure with compression-enabled dialers, and comprehensive test scenarios validating compression behavior across different configurations.

Changes

Cohort / File(s) Summary
Configuration and Schema
router/pkg/config/config.go, router/pkg/config/config.schema.json, router/pkg/config/fixtures/full.yaml, router/pkg/config/testdata/config_defaults.json, router/pkg/config/testdata/config_full.json
Added WebSocketCompressionConfiguration type with Enabled and Level fields, extended WebSocketConfiguration schema with compression settings (default level 6, range 1-9), and populated fixture and testdata files with example compression configurations.
Core WebSocket Compression Implementation
router/core/websocket.go
Added compressionEnabled and compressionLevel fields to wsConnectionWrapper and WebsocketHandler; updated newWSConnectionWrapper signature to accept compression parameters; reworked ReadJSON to decompress RSV1-marked frames via wsflate; reworked WriteText and WriteJSON to use new writeCompressed helper for compressed output; added permessage-deflate negotiation during WebSocket upgrade with tracking via compressionNegotiated flag.
Test Infrastructure and Scenarios
router-tests/testenv/testenv.go, router-tests/websocket_test.go
Added GraphQLWebsocketDialWithCompressionRetry and InitGraphQLWebSocketConnectionWithCompression helpers to support compression-enabled WebSocket connections in tests; introduced 8 new types (wsJSONMessage, wsCloseMessage, GraphQLWSSubscriptionMessage, GraphQLPayload, GraphQLWSSimpleResponse, GraphQLWSDataResponse, CountEmpData, CountEmpResponse) for WebSocket message modeling; added three test scenarios validating compression negotiation and operation with server/client compression enabled, disabled, and custom compression levels.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: adding per-message deflate compression support to WebSocket connections.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
router/core/websocket.go (1)

162-302: ⚠️ Potential issue | 🟠 Major

Handle fragmented frames when compression is enabled.

The current implementation will fail to correctly process fragmented compressed messages. According to gobwas/ws RFC 7692 implementation, wsflate.DecompressFrame explicitly rejects fragmented messages (when FIN ≠ 1) and only works on single-frame payloads. Additionally, per RFC 7692 rules, the RSV1 compression bit may only be set on the first frame of a message; continuation frames must not have RSV1 set.

The current code calls wsflate.DecompressFrame on individual frames without buffering continuation frames, which will:

  • Drop continuation frames after the first text/binary frame
  • Truncate large/fragmented messages
  • Desynchronize the stream if a client sends fragmented compressed messages

To handle fragmented compressed messages correctly, buffer data frames until the FIN bit is set, then decompress the reassembled payload. Alternatively, use the library's recommended streaming approach: wsutil.Reader with wsflate.MessageState as a RecvExtension, wrapped with wsflate.Reader for streaming decompression.

🔧 Suggested buffering of fragmented compressed messages
-	if c.compressionEnabled {
-		// Read frames directly and handle compression
-		controlHandler := wsutil.ControlFrameHandler(c.conn, ws.StateServerSide)
-		for {
-			frame, err := ws.ReadFrame(c.conn)
-			if err != nil {
-				return err
-			}
-
-			// Unmask client frames
-			if frame.Header.Masked {
-				ws.Cipher(frame.Payload, frame.Header.Mask, 0)
-			}
-
-			if frame.Header.OpCode.IsControl() {
-				if err := controlHandler(frame.Header, bytes.NewReader(frame.Payload)); err != nil {
-					return err
-				}
-				continue
-			}
-
-			if frame.Header.OpCode == ws.OpText || frame.Header.OpCode == ws.OpBinary {
-				// Check if frame is compressed (RSV1 bit set)
-				isCompressed, err := wsflate.IsCompressed(frame.Header)
-				if err != nil {
-					return err
-				}
-				if isCompressed {
-					frame, err = wsflate.DecompressFrame(frame)
-					if err != nil {
-						return err
-					}
-				}
-				text = frame.Payload
-				break
-			}
-		}
-	} else {
+	if c.compressionEnabled {
+		controlHandler := wsutil.ControlFrameHandler(c.conn, ws.StateServerSide)
+		var (
+			frame        ws.Frame
+			payload      []byte
+			isCompressed bool
+			op           ws.OpCode
+			started      bool
+		)
+		for {
+			frame, err = ws.ReadFrame(c.conn)
+			if err != nil {
+				return err
+			}
+			if frame.Header.Masked {
+				ws.Cipher(frame.Payload, frame.Header.Mask, 0)
+			}
+			if frame.Header.OpCode.IsControl() {
+				if err := controlHandler(frame.Header, bytes.NewReader(frame.Payload)); err != nil {
+					return err
+				}
+				continue
+			}
+			if !started {
+				if frame.Header.OpCode != ws.OpText && frame.Header.OpCode != ws.OpBinary {
+					continue
+				}
+				op = frame.Header.OpCode
+				started = true
+				isCompressed, err = wsflate.IsCompressed(frame.Header)
+				if err != nil {
+					return err
+				}
+			} else if frame.Header.OpCode != ws.OpContinuation {
+				return fmt.Errorf("unexpected opcode %v while waiting for continuation", frame.Header.OpCode)
+			}
+			payload = append(payload, frame.Payload...)
+			if frame.Header.Fin {
+				break
+			}
+		}
+		if isCompressed {
+			frame = ws.NewFrame(op, true, payload)
+			frame.Header.Rsv = ws.Rsv(true, false, false)
+			frame, err = wsflate.DecompressFrame(frame)
+			if err != nil {
+				return err
+			}
+			text = frame.Payload
+		} else {
+			text = payload
+		}
+	} else {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@router/core/websocket.go` around lines 162 - 302, ReadJSON currently calls
wsflate.DecompressFrame on single frames, which fails for fragmented compressed
messages; update ReadJSON to buffer data frames (accumulating Payloads and
respecting FIN) and only call wsflate.DecompressFrame on the reassembled message
when FIN is true, OR replace the manual frame loop with the library streaming
approach by using wsutil.Reader with wsflate.MessageState as a RecvExtension and
wrapping it with wsflate.Reader to read a full decompressed message; also adjust
writeCompressed/WriteText/WriteJSON usages if you pick streaming to ensure RSV1
is set only on the first frame and continuation frames are sent without RSV1.
Ensure you modify the wsConnectionWrapper.ReadJSON and related write paths
(writeCompressed) accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@router/core/websocket.go`:
- Around line 162-302: ReadJSON currently calls wsflate.DecompressFrame on
single frames, which fails for fragmented compressed messages; update ReadJSON
to buffer data frames (accumulating Payloads and respecting FIN) and only call
wsflate.DecompressFrame on the reassembled message when FIN is true, OR replace
the manual frame loop with the library streaming approach by using wsutil.Reader
with wsflate.MessageState as a RecvExtension and wrapping it with wsflate.Reader
to read a full decompressed message; also adjust
writeCompressed/WriteText/WriteJSON usages if you pick streaming to ensure RSV1
is set only on the first frame and continuation frames are sent without RSV1.
Ensure you modify the wsConnectionWrapper.ReadJSON and related write paths
(writeCompressed) accordingly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant