Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export class TraceRepository {
SpanAttributes['wg.router.version'] as attrRouterVersion,
SpanAttributes['wg.operation.persisted_id'] as attrOperationPersistedId,
SpanAttributes['wg.federated_graph.id'] as attrFederatedGraphId,
SpanAttributes['wg.operation.protocol'] as attrOperationProtocol,
Comment thread
SkArchon marked this conversation as resolved.
SpanAttributes['wg.operation.batching.is_batched'] as attrIsBatched,
SpanAttributes['wg.operation.batching.operations_count'] as attrBatchedOperationsCount,
SpanAttributes['wg.operation.batching.operation_index'] as attrWgBatchedOperationIndex
Expand Down Expand Up @@ -136,6 +137,7 @@ export class TraceRepository {
operationPersistedID: result.attrOperationPersistedId,
federatedGraphID: result.attrFederatedGraphId,
isBatched: result.attrIsBatched,
operationProtocol: result.attrOperationProtocol,
batchedOperationsCount: result.attrBatchedOperationsCount,
batchedOperationIndex: result.attrWgBatchedOperationIndex,
},
Expand Down
81 changes: 62 additions & 19 deletions router-tests/router_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integration

import (
"fmt"
"github.com/wundergraph/cosmo/router/pkg/otel"
"testing"
"time"

Expand Down Expand Up @@ -95,31 +96,33 @@ func TestRouterPlugin(t *testing.T) {
}

func TestVerifyTelemetryForRouterPluginRequests(t *testing.T) {
exporter := tracetest.NewInMemoryExporter(t)
t.Parallel()

testenv.Run(t,
&testenv.Config{
TraceExporter: exporter,
RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate,
Plugins: testenv.PluginConfig{
Enabled: true,
Path: "../router/plugins",
},
},
func(t *testing.T, xEnv *testenv.Environment) {
t.Run("query projects simple", func(t *testing.T) {
t.Parallel()
t.Run("query projects simple", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)

testenv.Run(t,
&testenv.Config{
TraceExporter: exporter,
RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate,
Plugins: testenv.PluginConfig{
Enabled: true,
Path: "../router/plugins",
},
},
func(t *testing.T, xEnv *testenv.Environment) {
queryName := "query sample"
response := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: fmt.Sprintf(`%s { projects { id name } }`, queryName),
Query: fmt.Sprintf(`%s { a:projects { id name }, e:projects { id name } }`, queryName),
})

expected := `{"data":{"projects":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}]}}`
expected := `{"data":{"a":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}],"e":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}]}}`
require.Equal(t, expected, response.Body)

snapshots := exporter.GetSpans().Snapshots()
require.Len(t, snapshots, 8)
require.Len(t, snapshots, 10)

queryNameInstances := 0
for _, sn := range snapshots {
Expand All @@ -128,10 +131,50 @@ func TestVerifyTelemetryForRouterPluginRequests(t *testing.T) {
}
}

// Normal http spans would have query sample twice
require.Equal(t, queryNameInstances, 1)
require.Equal(t, queryNameInstances, 3)
})
})
})

t.Run("verify each invocation having its span", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)

testenv.Run(t,
&testenv.Config{
TraceExporter: exporter,
RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate,
Plugins: testenv.PluginConfig{
Enabled: true,
Path: "../router/plugins",
},
},
func(t *testing.T, xEnv *testenv.Environment) {
response := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query projects { a: projects { id name } b:project(id: 2) { id } }`,
})

expected := `{"data":{"a":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}],"b":{"id":"2"}}}`
require.Equal(t, expected, response.Body)

snapshots := exporter.GetSpans().Snapshots()
require.Len(t, snapshots, 10)

span1 := snapshots[5]
require.Equal(t, "query projects", span1.Name())
require.Contains(t, span1.Attributes(), otel.WgOperationProtocol.String("grpc"))
require.Contains(t, span1.Attributes(), otel.WgOperationType.String("query"))
require.Contains(t, span1.Attributes(), otel.WgOperationName.String("projects"))
require.Len(t, span1.Attributes(), 11)

span2 := snapshots[6]
require.Equal(t, "query projects", span2.Name())
require.Contains(t, span2.Attributes(), otel.WgOperationProtocol.String("grpc"))
require.Contains(t, span2.Attributes(), otel.WgOperationType.String("query"))
require.Contains(t, span2.Attributes(), otel.WgOperationName.String("projects"))
require.Len(t, span2.Attributes(), 11)
})
})
}

func TestRouterPluginRequests(t *testing.T) {
Expand Down
37 changes: 27 additions & 10 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ func (s *graphServer) buildGraphMux(
subgraphTippers[subgraph] = subgraphTransport
}

if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs); err != nil {
if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs, telemetryAttExpressions, tracingAttExpressions); err != nil {
return nil, fmt.Errorf("failed to setup plugin host: %w", err)
}

Expand Down Expand Up @@ -1484,7 +1484,13 @@ func (s *graphServer) buildGraphMux(
return gm, nil
}

func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineConfiguration, configSubgraphs []*nodev1.Subgraph) error {
func (s *graphServer) setupConnector(
ctx context.Context,
config *nodev1.EngineConfiguration,
configSubgraphs []*nodev1.Subgraph,
telemetryAttributeExpressions *attributeExpressions,
tracingAttributeExpressions *attributeExpressions,
) error {
s.connector = grpcconnector.NewConnector()

for _, dsConfig := range config.DatasourceConfigurations {
Expand Down Expand Up @@ -1537,6 +1543,13 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC

startupConfig := newGRPCStartupParams(s.traceConfig, s.ipAnonymization)

tracer := s.tracerProvider.Tracer("wundergraph/cosmo/router/engine/grpc", oteltrace.WithInstrumentationVersion("0.0.1"))

getTraceAttributes := CreateGRPCTraceGetter(
telemetryAttributeExpressions,
tracingAttributeExpressions,
)

if imgRef := pluginConfig.GetImageReference(); imgRef != nil {
ref := fmt.Sprintf("%s/%s:%s",
s.plugins.Registry.URL,
Expand All @@ -1545,10 +1558,12 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC
)

grpcPlugin, err := grpcpluginoci.NewGRPCOCIPlugin(grpcpluginoci.GRPCPluginConfig{
Logger: s.logger,
ImageRef: ref,
RegistryToken: s.graphApiToken,
StartupConfig: startupConfig,
Logger: s.logger,
ImageRef: ref,
RegistryToken: s.graphApiToken,
StartupConfig: startupConfig,
Tracer: tracer,
GetTraceAttributes: getTraceAttributes,
})
if err != nil {
return fmt.Errorf("failed to create grpc oci plugin for subgraph %s: %w", dsConfig.Id, err)
Expand All @@ -1565,10 +1580,12 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC
}

grpcPlugin, err := grpcplugin.NewGRPCPlugin(grpcplugin.GRPCPluginConfig{
Logger: s.logger,
PluginName: pluginConfig.GetName(),
PluginPath: pluginPath,
StartupConfig: startupConfig,
Logger: s.logger,
PluginName: pluginConfig.GetName(),
PluginPath: pluginPath,
StartupConfig: startupConfig,
Tracer: tracer,
GetTraceAttributes: getTraceAttributes,
})
if err != nil {
return fmt.Errorf("failed to create grpc plugin for subgraph %s: %w", dsConfig.Id, err)
Expand Down
1 change: 1 addition & 0 deletions router/core/operation_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type OperationProtocol string

const (
OperationProtocolHTTP = OperationProtocol("http")
OperationProtocolGRPC = OperationProtocol("grpc")
OperationProtocolWS = OperationProtocol("ws")
)

Expand Down
45 changes: 44 additions & 1 deletion router/core/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/wundergraph/cosmo/router/internal/circuit"
"io"
"net/http"
"net/url"
Expand All @@ -13,6 +12,8 @@ import (
"sync"
"time"

"github.com/wundergraph/cosmo/router/internal/circuit"

"github.com/wundergraph/cosmo/router/internal/expr"
"github.com/wundergraph/cosmo/router/internal/traceclient"

Expand Down Expand Up @@ -462,3 +463,45 @@ func GetSpanName(operationName string, operationType string) string {
}
return fmt.Sprintf("%s %s", operationType, "unnamed")
}

func CreateGRPCTraceGetter(
telemetryAttributeExpressions *attributeExpressions,
tracingAttributeExpressions *attributeExpressions,
) func(context.Context) (string, otrace.SpanStartEventOption) {
return func(ctx context.Context) (string, otrace.SpanStartEventOption) {
reqCtx := getRequestContext(ctx)
if reqCtx == nil {
return "GRPC Plugin Client - Invoke", otrace.WithAttributes()
}

traceAttrs := *reqCtx.telemetry.AcquireAttributes()
defer reqCtx.telemetry.ReleaseAttributes(&traceAttrs)

attrs := make([]attribute.KeyValue, 0, len(reqCtx.telemetry.traceAttrs))

attrs = append(attrs, traceAttrs...)
attrs = append(attrs, reqCtx.telemetry.traceAttrs...)

if telemetryAttributeExpressions != nil {
telemetryValues, err := telemetryAttributeExpressions.expressionsAttributesWithSubgraph(&reqCtx.expressionContext)
if err != nil {
reqCtx.Logger().Warn("failed to resolve grpc plugin expression for telemetry", zap.Error(err))
}
attrs = append(attrs, telemetryValues...)
}

if tracingAttributeExpressions != nil {
tracingValues, err := tracingAttributeExpressions.expressionsAttributesWithSubgraph(&reqCtx.expressionContext)
if err != nil {
reqCtx.Logger().Warn("failed to resolve grpc plugin expression for tracing", zap.Error(err))
}
attrs = append(attrs, tracingValues...)
}

// Override http operation protocol with grpc
attrs = append(attrs, otel.EngineTransportAttribute, otel.WgOperationProtocol.String(OperationProtocolGRPC.String()))

spanName := SpanNameFormatter("", reqCtx.request)
return spanName, otrace.WithAttributes(attrs...)
}
}
27 changes: 23 additions & 4 deletions router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpccommon
import (
"context"
"errors"
"go.opentelemetry.io/otel/trace"
"io"

"sync"
Expand All @@ -27,6 +28,9 @@ type GRPCPluginClient struct {
config GRPCPluginClientConfig

mu sync.RWMutex

tracer trace.Tracer
getTraceAttributes GRPCTraceAttributeGetter
}

type GRPCPluginClientConfig struct {
Expand All @@ -50,7 +54,14 @@ func WithReconnectConfig(reconnectTimeout time.Duration, pingInterval time.Durat

var _ grpc.ClientConnInterface = &GRPCPluginClient{}

func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, options ...GRPCPluginClientOption) (*GRPCPluginClient, error) {
type GRPCTraceAttributeGetter func(context.Context) (string, trace.SpanStartEventOption)

type GRPCPluginClientOpts struct {
Tracer trace.Tracer
GetTraceAttributes GRPCTraceAttributeGetter
}

func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, clientOpts GRPCPluginClientOpts, options ...GRPCPluginClientOption) (*GRPCPluginClient, error) {
if pc == nil || cc == nil {
return nil, errors.New("plugin client or grpc client conn is nil")
}
Expand All @@ -62,9 +73,11 @@ func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, options
}

return &GRPCPluginClient{
pc: pc,
cc: cc,
config: config,
pc: pc,
cc: cc,
config: config,
tracer: clientOpts.Tracer,
getTraceAttributes: clientOpts.GetTraceAttributes,
}, nil
}

Expand Down Expand Up @@ -133,13 +146,19 @@ func (g *GRPCPluginClient) SetClients(pluginClient *plugin.Client, clientConn gr

// Invoke implements grpc.ClientConnInterface.
func (g *GRPCPluginClient) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error {
spanName, traceAttributes := g.getTraceAttributes(ctx)
ctx, span := g.tracer.Start(ctx, spanName, traceAttributes)
defer span.End()

if g.IsPluginProcessExited() {
if err := g.waitForPluginToBeActive(); err != nil {
span.RecordError(err)
return err
}
}

if g.isClosed.Load() {
span.RecordError(errors.New("plugin is not active"))
return status.Error(codes.Unavailable, "plugin is not active")
}

Expand Down
Loading
Loading