feat(grpc_datasource): add ConnectRPC transport support via RPCTransport abstraction #1453

fengyuwusong wants to merge 4 commits into wundergraph:master from
Conversation
Introduce RPCTransport interface to abstract the transport protocol in grpc_datasource, enabling both gRPC and Connect protocol support.

Key changes:
- New RPCTransport interface with grpcTransport and connectTransport impls
- connectTransport implements Connect protocol over HTTP (Protobuf/JSON)
- DataSource.cc refactored to DataSource.transport (RPCTransport)
- NewDataSourceGRPC convenience function for backward compatibility
- NewFactoryConnect in graphql_datasource for Connect protocol entry
- ConnectConfiguration for Connect-specific settings
- 6 unit tests for connectTransport (protobuf, json, errors, headers)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…t transport

Keys ending in "-bin" carry binary values per gRPC convention. The Connect protocol requires these to be base64-encoded when placed in HTTP headers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
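The metadata-mapping rule described in this commit can be sketched as follows. This is an illustrative helper, not the PR's actual code; the function name `toConnectHeaders` and the map-based metadata shape are assumptions for the sketch.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/http"
	"strings"
)

// toConnectHeaders is a hypothetical helper showing the rule above: gRPC
// metadata keys ending in "-bin" carry binary values and are base64-encoded
// before being placed in HTTP headers; all other keys pass through as text.
func toConnectHeaders(md map[string][]byte) http.Header {
	h := http.Header{}
	for key, val := range md {
		if strings.HasSuffix(key, "-bin") {
			h.Set(key, base64.StdEncoding.EncodeToString(val))
		} else {
			h.Set(key, string(val))
		}
	}
	return h
}

func main() {
	h := toConnectHeaders(map[string][]byte{
		"trace-bin":  {0xDE, 0xAD},
		"user-agent": []byte("router"),
	})
	fmt.Println(h.Get("trace-bin")) // base64 of the raw bytes
	fmt.Println(h.Get("user-agent"))
}
```

Note that `http.Header.Set` canonicalizes key casing, so lookups work with either `trace-bin` or `Trace-Bin`.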
📝 Walkthrough

Adds an abstract RPCTransport and a Connect (HTTP) transport implementation, refactors the gRPC datasource to depend on RPCTransport, wires Connect into planner/factory/configuration, and updates tests to exercise both gRPC and Connect transports.
Sequence Diagram(s)

sequenceDiagram
participant Client as GraphQL Client
participant Engine as Execution Engine
participant Transport as RPCTransport
participant Server as Remote Server
Client->>Engine: Execute GraphQL operation
Engine->>Transport: Invoke(ctx, methodFullName, inputMessage)
alt Connect (HTTP) Transport
Transport->>Transport: Serialize input (proto or JSON)
Transport->>Transport: Map gRPC metadata -> HTTP headers (base64 for -bin)
Transport->>Server: HTTP POST /{methodFullName} (Content-Type + Connect-Protocol-Version)
else gRPC Transport
Transport->>Server: gRPC Invoke via ClientConnInterface
end
Server-->>Transport: Response (200 or error)
Transport->>Transport: Deserialize response or parse error
Transport->>Engine: Return outputMessage, error?
Engine->>Client: Return GraphQL result
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed
🧹 Nitpick comments (2)
v2/pkg/engine/datasource/grpc_datasource/transport.go (1)

26-31: Add a nil guard in grpcTransport.Invoke to avoid panic-on-misconfiguration.

If cc is nil, this path panics at runtime. Returning a typed error makes failures deterministic and easier to diagnose.

🔧 Proposed defensive guard

 import (
 	"context"
+	"errors"
 	"google.golang.org/grpc"
 	protoref "google.golang.org/protobuf/reflect/protoreflect"
 )
@@
 func (t *grpcTransport) Invoke(ctx context.Context, method string, input, output protoref.Message) error {
+	if t == nil || t.cc == nil {
+		return errors.New("grpc transport: nil client connection")
+	}
 	// grpc.ClientConnInterface.Invoke accepts (ctx, method, args any, reply any, opts ...grpc.CallOption).
 	// protoref.Message satisfies the any constraint; variadic opts can be omitted.
 	// This wrapper intentionally does not forward grpc.CallOption, as RPCTransport
 	// is protocol-agnostic. The existing grpc_datasource code does not use any CallOption at the Invoke site.
 	return t.cc.Invoke(ctx, method, input, output)
 }

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/grpc_datasource/transport.go` around lines 26-31: Add a nil-check guard at the start of grpcTransport.Invoke to avoid panics when the underlying grpc client connection is not set: if t.cc is nil, return a clear typed error (e.g., use errors.New or fmt.Errorf) that includes context like "grpcTransport: nil client connection" rather than calling t.cc.Invoke; keep the existing behavior unchanged when t.cc is non-nil so the method simply delegates to t.cc.Invoke(ctx, method, input, output).

v2/pkg/engine/datasource/grpc_datasource/transport_connect.go (1)
119-127: Consider limiting response body size.

The io.ReadAll call reads the entire response into memory without bounds. For exceptionally large responses, this could cause memory pressure.

💡 Optional: Add a size limit for response bodies

-	respBody, err := io.ReadAll(resp.Body)
+	// Limit response body to a reasonable size (e.g., 10MB) to prevent memory exhaustion.
+	const maxResponseSize = 10 * 1024 * 1024
+	respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseSize))

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/grpc_datasource/transport_connect.go` around lines 119 - 127, The call to io.ReadAll(resp.Body) in the connect response handling can OOM on very large responses; change it to read with a bounded size (e.g., use io.LimitReader or io.ReadAll(io.LimitReader(resp.Body, maxSize+1))) so you only buffer up to a defined max (choose a sensible constant like maxConnectBodyBytes), detect when the response exceeds that limit (if returned length > maxSize then return a specific error), and continue to use parseConnectError(resp.StatusCode, respBody) for normal-sized bodies; update the code that reads resp.Body (the resp variable in this block) and any error messages to surface “response too large” when the limit is hit.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: eb01c4c7-96f2-4e33-80ad-9b37e4a09b4a
📒 Files selected for processing (9)
- v2/pkg/engine/datasource/graphql_datasource/configuration.go
- v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
- v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
- v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_federation_test.go
- v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_spy_test.go
- v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go
- v2/pkg/engine/datasource/grpc_datasource/transport.go
- v2/pkg/engine/datasource/grpc_datasource/transport_connect.go
- v2/pkg/engine/datasource/grpc_datasource/transport_connect_test.go
Add a `grpc_protocol` configuration section that allows users to choose between native gRPC and ConnectRPC on a per-subgraph basis. Connect subgraphs communicate over HTTP/1.1 with either Protobuf or JSON encoding, bypassing the need for HTTP/2 end-to-end.

Key changes:
- New `GRPCProtocolConfig` in config with global defaults and per-subgraph overrides for protocol and encoding
- New `grpcprotocol` package with config validation, resolution, and transport builder
- Factory resolver checks Connect transports before gRPC connector
- Connect subgraphs skip native gRPC provider registration

Depends on: wundergraph/graphql-go-tools#1453
Closes: wundergraph#2664

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…orts

Address code review feedback:
- Add nil check in grpcTransport.Invoke to prevent panics
- Limit Connect response body to 10MB to prevent memory exhaustion

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 2
🧹 Nitpick comments (2)
v2/pkg/engine/datasource/grpc_datasource/transport_connect.go (2)
50-62: Consider validating BaseURL is non-empty.

If BaseURL is empty, Invoke will construct invalid URLs. A validation or early error would surface configuration mistakes sooner.

Additionally, the URL construction on line 65 directly concatenates baseURL + methodFullName. If methodFullName doesn't start with /, the resulting URL will be malformed (e.g., http://localhost:8080foo.Bar/Method). Consider ensuring proper path joining.

🔧 Proposed fix

 func NewConnectTransport(config ConnectTransportConfig) RPCTransport {
+	if config.BaseURL == "" {
+		panic("connect: BaseURL is required")
+	}
 	httpClient := config.HTTPClient
 	if httpClient == nil {
 		httpClient = http.DefaultClient
 	}
 	return &connectTransport{
 		baseURL:    strings.TrimRight(config.BaseURL, "/"),
 		httpClient: httpClient,
 		encoding:   config.Encoding,
 	}
 }

 func (t *connectTransport) Invoke(ctx context.Context, methodFullName string, input, output protoref.Message) error {
-	url := t.baseURL + methodFullName
+	url := t.baseURL + "/" + strings.TrimLeft(methodFullName, "/")

Also applies to: 64-65
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/grpc_datasource/transport_connect.go` around lines 50 - 62, NewConnectTransport currently allows an empty BaseURL and connectTransport.Invoke concatenates baseURL + methodFullName which can produce malformed URLs; update NewConnectTransport to validate config.BaseURL is non-empty (return an error or panic in constructor semantics used) and normalize it (trim trailing slash and ensure it includes scheme), and change connectTransport.Invoke to join paths safely by ensuring methodFullName begins with a single leading slash (or using a URL join helper) before concatenation; reference NewConnectTransport, connectTransport, baseURL and Invoke/methodFullName when making these changes.
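The joining rule this comment asks for can be isolated in a small helper. This is a sketch of the suggested fix, not the PR's code; the name `joinMethodURL` is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// joinMethodURL normalizes both sides so exactly one slash separates the base
// URL from the full method name, regardless of how either was written.
func joinMethodURL(baseURL, methodFullName string) string {
	return strings.TrimRight(baseURL, "/") + "/" + strings.TrimLeft(methodFullName, "/")
}

func main() {
	// Both spellings produce the same well-formed URL.
	fmt.Println(joinMethodURL("http://localhost:8080/", "/foo.Bar/Method"))
	fmt.Println(joinMethodURL("http://localhost:8080", "foo.Bar/Method"))
}
```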
139-144: Missing default case in unmarshal switch.

The marshal switch (lines 81-90) has a default case returning an error for unsupported encodings, but the unmarshal switch lacks one. While currently unreachable (marshal would fail first), adding a default case maintains consistency and guards against future changes.

🔧 Proposed fix

 switch t.encoding {
 case ConnectEncodingProtobuf:
 	err = proto.Unmarshal(respBody, outputMsg)
 case ConnectEncodingJSON:
 	err = protojson.Unmarshal(respBody, outputMsg)
+default:
+	return fmt.Errorf("connect: unsupported encoding: %d", t.encoding)
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/grpc_datasource/transport_connect.go` around lines 139 - 144, The unmarshal switch on t.encoding in transport_connect.go (the block that currently handles ConnectEncodingProtobuf and ConnectEncodingJSON into outputMsg) lacks a default branch; add a default case that assigns/returns a clear error for unsupported encodings (e.g. referencing t.encoding) so the unmarshal path mirrors the marshal switch and fails fast if a new/unknown encoding is encountered.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@v2/pkg/engine/datasource/grpc_datasource/transport_connect.go`:
- Around line 161-166: The parseConnectError function currently returns the
entire response body when json.Unmarshal(body, &ce) fails, which can be very
large; update parseConnectError to truncate the body before including it in the
error message (e.g., define a small constant like maxErrBody = 1024 and use the
first N bytes or indicate "<truncated>" when body length > N), so that on
unmarshal failure you still return fmt.Errorf("connect: HTTP %d: %s",
statusCode, truncatedBody) (keep using statusCode and body variables and retain
the existing ce/Unmarshal logic for the success path).
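The truncation pattern this comment describes can be sketched as follows. The envelope field names (`code`, `message`) follow the Connect error format; the exact struct and the `maxErrBody` constant are assumptions for illustration, not the PR's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const maxErrBody = 1024

type connectError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

// parseConnectError tries to decode a Connect-style JSON error envelope; if
// the body is not a valid envelope, it falls back to including the raw body,
// truncated so a huge response cannot bloat logs.
func parseConnectError(statusCode int, body []byte) error {
	var ce connectError
	if err := json.Unmarshal(body, &ce); err == nil && ce.Code != "" {
		return fmt.Errorf("connect: HTTP %d: %s: %s", statusCode, ce.Code, ce.Message)
	}
	if len(body) > maxErrBody {
		body = append(body[:maxErrBody:maxErrBody], []byte("...<truncated>")...)
	}
	return fmt.Errorf("connect: HTTP %d: %s", statusCode, body)
}

func main() {
	fmt.Println(parseConnectError(404, []byte(`{"code":"not_found","message":"missing"}`)))
	fmt.Println(parseConnectError(500, []byte("<html>oops</html>")))
}
```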
- Around line 124-127: The current read uses io.LimitReader which silently
truncates large responses and leads to misleading unmarshal errors; change the
read to create a named *io.LimitedReader (e.g., lr := io.LimitReader(resp.Body,
maxConnectResponseSize)), read from that into respBody, then after reading check
lr.N (the remaining bytes) — if lr.N == 0 then return a clear error like
"connect: response exceeds maxConnectResponseSize" instead of proceeding to
unmarshal; update the code around respBody, io.LimitReader and
maxConnectResponseSize in transport_connect.go accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 6f08d99d-a900-4a4d-88a3-68d59d3d3e3c
📒 Files selected for processing (2)
- v2/pkg/engine/datasource/grpc_datasource/transport.go
- v2/pkg/engine/datasource/grpc_datasource/transport_connect.go
🚧 Files skipped from review as they are similar to previous changes (1)
- v2/pkg/engine/datasource/grpc_datasource/transport.go
…odies

- Read maxSize+1 bytes to detect truncation and return a clear "response body exceeds N bytes" error instead of cryptic unmarshal failures
- Truncate non-JSON error bodies to 256 chars in parseConnectError to prevent log bloat and sensitive data exposure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🧹 Nitpick comments (1)
v2/pkg/engine/datasource/grpc_datasource/transport_connect.go (1)
142-147: Consider adding a default case for defensive consistency.

The request marshaling switch (lines 81-90) has a default case that returns an error for unsupported encodings, but the response unmarshaling switch does not. While currently unreachable (since encoding is validated at request time), adding a default case would maintain symmetry and guard against future encoding additions.

♻️ Optional: add default case

 switch t.encoding {
 case ConnectEncodingProtobuf:
 	err = proto.Unmarshal(respBody, outputMsg)
 case ConnectEncodingJSON:
 	err = protojson.Unmarshal(respBody, outputMsg)
+default:
+	return fmt.Errorf("connect: unsupported encoding: %d", t.encoding)
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@v2/pkg/engine/datasource/grpc_datasource/transport_connect.go` around lines 142 - 147, The response-unmarshal switch on t.encoding (cases ConnectEncodingProtobuf and ConnectEncodingJSON) lacks a default branch; add a default case that sets err to a descriptive error (e.g., "unsupported response encoding: %v", t.encoding) to mirror the request-side validation and defensively handle future encodings when unmarshaling respBody into outputMsg; ensure the function returns or propagates that err consistently with the existing error handling.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 0b24065d-aed2-455c-9c98-952012e854f4
📒 Files selected for processing (1)
v2/pkg/engine/datasource/grpc_datasource/transport_connect.go
Hi @fengyuwusong,

This PR isn't mergeable under our standards, as it lacks valuable unit and integration tests. The changes are also difficult to review due to limited context and supporting structure. Using AI to assist with code is absolutely fine, but we expect contributions to meet certain quality and clarity standards. To help clarify this, our CTO recently introduced a guideline that we hope you and the broader community will find useful: https://www.human-oss.dev/

Additionally, when working on new features, we recommend opening an issue first to gather feedback before starting implementation.

I'm going to close this PR for now, but I hope this doesn't discourage you—we really value contributions and would be happy to see a revised submission in the future.
Motivation and Context
The Cosmo Router currently only supports native gRPC (HTTP/2 + Protobuf) for gRPC-based subgraphs. This creates friction in environments where HTTP/2 pass-through is difficult (load balancers, CDNs) or where teams use the ConnectRPC framework.
This PR adds ConnectRPC as an alternative transport protocol for gRPC datasources, allowing the router to communicate with gRPC subgraphs over HTTP/1.1 using either Protobuf or JSON encoding.
Related issue: wundergraph/cosmo#2664
Changes
- New RPCTransport interface (transport.go)
- Connect protocol implementation (transport_connect.go)
- Integration with existing datasources
- Test coverage