Skip to content
74 changes: 59 additions & 15 deletions router-tests/router_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integration

import (
"fmt"
"github.com/wundergraph/cosmo/router/pkg/otel"
"testing"
"time"

Expand Down Expand Up @@ -95,21 +96,23 @@ func TestRouterPlugin(t *testing.T) {
}

func TestVerifyTelemetryForRouterPluginRequests(t *testing.T) {
exporter := tracetest.NewInMemoryExporter(t)
t.Parallel()

testenv.Run(t,
&testenv.Config{
TraceExporter: exporter,
RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate,
Plugins: testenv.PluginConfig{
Enabled: true,
Path: "../router/plugins",
},
},
func(t *testing.T, xEnv *testenv.Environment) {
t.Run("query projects simple", func(t *testing.T) {
t.Parallel()
t.Run("query projects simple", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)

testenv.Run(t,
&testenv.Config{
TraceExporter: exporter,
RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate,
Plugins: testenv.PluginConfig{
Enabled: true,
Path: "../router/plugins",
},
},
func(t *testing.T, xEnv *testenv.Environment) {
queryName := "query sample"
response := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: fmt.Sprintf(`%s { projects { id name } }`, queryName),
Expand All @@ -119,7 +122,7 @@ func TestVerifyTelemetryForRouterPluginRequests(t *testing.T) {
require.Equal(t, expected, response.Body)

snapshots := exporter.GetSpans().Snapshots()
require.Len(t, snapshots, 8)
require.Len(t, snapshots, 9)

queryNameInstances := 0
for _, sn := range snapshots {
Expand All @@ -131,7 +134,48 @@ func TestVerifyTelemetryForRouterPluginRequests(t *testing.T) {
// Normal http spans would have query sample twice
require.Equal(t, queryNameInstances, 1)
})
})
})

t.Run("verify each invocation having its span", func(t *testing.T) {
t.Parallel()

exporter := tracetest.NewInMemoryExporter(t)

testenv.Run(t,
&testenv.Config{
TraceExporter: exporter,
RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate,
Plugins: testenv.PluginConfig{
Enabled: true,
Path: "../router/plugins",
},
},
func(t *testing.T, xEnv *testenv.Environment) {
response := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query projects { a: projects { id name } b:project(id: 2) { id } }`,
})

expected := `{"data":{"a":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}],"b":{"id":"2"}}}`
require.Equal(t, expected, response.Body)

snapshots := exporter.GetSpans().Snapshots()
require.Len(t, snapshots, 10)

span1 := snapshots[5]
require.Equal(t, "GRPC Plugin Client - Invoke", span1.Name())
require.Contains(t, span1.Attributes(), otel.WgOperationProtocol.String("grpc"))
require.Contains(t, span1.Attributes(), otel.WgOperationType.String("query"))
require.Contains(t, span1.Attributes(), otel.WgOperationName.String("projects"))
require.Len(t, span1.Attributes(), 10)

span2 := snapshots[6]
require.Equal(t, "GRPC Plugin Client - Invoke", span2.Name())
require.Contains(t, span2.Attributes(), otel.WgOperationProtocol.String("grpc"))
require.Contains(t, span1.Attributes(), otel.WgOperationType.String("query"))
require.Contains(t, span1.Attributes(), otel.WgOperationName.String("projects"))
require.Len(t, span2.Attributes(), 10)
})
Comment thread
SkArchon marked this conversation as resolved.
})
}

func TestRouterPluginRequests(t *testing.T) {
Expand Down
63 changes: 53 additions & 10 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ func (s *graphServer) buildGraphMux(
subgraphTippers[subgraph] = subgraphTransport
}

if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs); err != nil {
if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs, telemetryAttExpressions, tracingAttExpressions); err != nil {
return nil, fmt.Errorf("failed to setup plugin host: %w", err)
}

Expand Down Expand Up @@ -1484,7 +1484,13 @@ func (s *graphServer) buildGraphMux(
return gm, nil
}

func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineConfiguration, configSubgraphs []*nodev1.Subgraph) error {
func (s *graphServer) setupConnector(
ctx context.Context,
config *nodev1.EngineConfiguration,
configSubgraphs []*nodev1.Subgraph,
telemetryAttributeExpressions *attributeExpressions,
tracingAttributeExpressions *attributeExpressions,
) error {
s.connector = grpcconnector.NewConnector()

for _, dsConfig := range config.DatasourceConfigurations {
Expand Down Expand Up @@ -1537,6 +1543,39 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC

startupConfig := newGRPCStartupParams(s.traceConfig, s.ipAnonymization)

tracer := s.tracerProvider.Tracer("wundergraph/cosmo/router/engine/grpc", oteltrace.WithInstrumentationVersion("0.0.1"))

getTracingAttributes := func(ctx context.Context) []attribute.KeyValue {
Comment thread
StarpTech marked this conversation as resolved.
Outdated
reqCtx := getRequestContext(ctx)
if reqCtx == nil {
return []attribute.KeyValue{}
}

traceAttrs := *reqCtx.telemetry.AcquireAttributes()
defer reqCtx.telemetry.ReleaseAttributes(&traceAttrs)
traceAttrs = append(traceAttrs, reqCtx.telemetry.traceAttrs...)

if telemetryAttributeExpressions != nil {
telemetryValues, err := telemetryAttributeExpressions.expressionsAttributesWithSubgraph(&reqCtx.expressionContext)
if err != nil {
reqCtx.Logger().Warn("failed to resolve grpc plugin expression for telemetry", zap.Error(err))
}
traceAttrs = append(traceAttrs, telemetryValues...)
}

if tracingAttributeExpressions != nil {
tracingValues, err := tracingAttributeExpressions.expressionsAttributesWithSubgraph(&reqCtx.expressionContext)
if err != nil {
reqCtx.Logger().Warn("failed to resolve grpc plugin expression for tracing", zap.Error(err))
}
traceAttrs = append(traceAttrs, tracingValues...)
}

// Override http operation protocol with grpc
traceAttrs = append(traceAttrs, otel.WgOperationProtocol.String(OperationProtocolGRPC.String()))
return traceAttrs
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

if imgRef := pluginConfig.GetImageReference(); imgRef != nil {
ref := fmt.Sprintf("%s/%s:%s",
s.plugins.Registry.URL,
Expand All @@ -1545,10 +1584,12 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC
)

grpcPlugin, err := grpcpluginoci.NewGRPCOCIPlugin(grpcpluginoci.GRPCPluginConfig{
Logger: s.logger,
ImageRef: ref,
RegistryToken: s.graphApiToken,
StartupConfig: startupConfig,
Logger: s.logger,
ImageRef: ref,
RegistryToken: s.graphApiToken,
StartupConfig: startupConfig,
Tracer: tracer,
GetTraceAttributes: getTracingAttributes,
})
if err != nil {
return fmt.Errorf("failed to create grpc oci plugin for subgraph %s: %w", dsConfig.Id, err)
Expand All @@ -1565,10 +1606,12 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC
}

grpcPlugin, err := grpcplugin.NewGRPCPlugin(grpcplugin.GRPCPluginConfig{
Logger: s.logger,
PluginName: pluginConfig.GetName(),
PluginPath: pluginPath,
StartupConfig: startupConfig,
Logger: s.logger,
PluginName: pluginConfig.GetName(),
PluginPath: pluginPath,
StartupConfig: startupConfig,
Tracer: tracer,
GetTraceAttributes: getTracingAttributes,
})
if err != nil {
return fmt.Errorf("failed to create grpc plugin for subgraph %s: %w", dsConfig.Id, err)
Expand Down
1 change: 1 addition & 0 deletions router/core/operation_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type OperationProtocol string

const (
OperationProtocolHTTP = OperationProtocol("http")
OperationProtocolGRPC = OperationProtocol("grpc")
OperationProtocolWS = OperationProtocol("ws")
)

Expand Down
26 changes: 22 additions & 4 deletions router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package grpccommon
import (
"context"
"errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"io"

"sync"
Expand All @@ -27,6 +29,9 @@ type GRPCPluginClient struct {
config GRPCPluginClientConfig

mu sync.RWMutex

tracer trace.Tracer
getTraceAttributes TraceAttributeGetter
}

type GRPCPluginClientConfig struct {
Expand All @@ -50,7 +55,13 @@ func WithReconnectConfig(reconnectTimeout time.Duration, pingInterval time.Durat

var _ grpc.ClientConnInterface = &GRPCPluginClient{}

func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, options ...GRPCPluginClientOption) (*GRPCPluginClient, error) {
type TraceAttributeGetter func(context.Context) []attribute.KeyValue
type GRPCPluginClientOpts struct {
Tracer trace.Tracer
GetTraceAttributes TraceAttributeGetter
}

func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, clientOpts GRPCPluginClientOpts, options ...GRPCPluginClientOption) (*GRPCPluginClient, error) {
if pc == nil || cc == nil {
return nil, errors.New("plugin client or grpc client conn is nil")
}
Expand All @@ -62,9 +73,11 @@ func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, options
}

return &GRPCPluginClient{
pc: pc,
cc: cc,
config: config,
pc: pc,
cc: cc,
config: config,
tracer: clientOpts.Tracer,
getTraceAttributes: clientOpts.GetTraceAttributes,
}, nil
}

Expand Down Expand Up @@ -133,6 +146,11 @@ func (g *GRPCPluginClient) SetClients(pluginClient *plugin.Client, clientConn gr

// Invoke implements grpc.ClientConnInterface.
func (g *GRPCPluginClient) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error {
ctx, span := g.tracer.Start(ctx,
"GRPC Plugin Client - Invoke",
Comment thread
StarpTech marked this conversation as resolved.
Outdated
trace.WithAttributes(g.getTraceAttributes(ctx)...))
defer span.End()

Comment thread
SkArchon marked this conversation as resolved.
Outdated
if g.IsPluginProcessExited() {
if err := g.waitForPluginToBeActive(); err != nil {
return err
Expand Down
28 changes: 22 additions & 6 deletions router/pkg/grpcconnector/grpcplugin/grpc_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/trace"
"os"
"os/exec"
"sync"
Expand All @@ -18,10 +19,12 @@ import (
)

type GRPCPluginConfig struct {
Logger *zap.Logger
PluginPath string
PluginName string
StartupConfig grpccommon.GRPCStartupParams
Logger *zap.Logger
PluginPath string
PluginName string
StartupConfig grpccommon.GRPCStartupParams
Tracer trace.Tracer
GetTraceAttributes grpccommon.TraceAttributeGetter
}

type GRPCPlugin struct {
Expand All @@ -36,6 +39,9 @@ type GRPCPlugin struct {

client *grpccommon.GRPCPluginClient
startupConfig grpccommon.GRPCStartupParams
tracer trace.Tracer

getTraceAttributes grpccommon.TraceAttributeGetter
}

var _ grpcconnector.ClientProvider = (*GRPCPlugin)(nil)
Expand Down Expand Up @@ -64,6 +70,10 @@ func NewGRPCPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) {
pluginName: config.PluginName,

startupConfig: config.StartupConfig,

tracer: config.Tracer,

getTraceAttributes: config.GetTraceAttributes,
}, nil
}

Expand Down Expand Up @@ -97,7 +107,10 @@ func (p *GRPCPlugin) fork() error {
}

pluginCmd := exec.Command(filePath)
grpccommon.PrepareCommand(pluginCmd, p.startupConfig)
err = grpccommon.PrepareCommand(pluginCmd, p.startupConfig)
Comment thread
StarpTech marked this conversation as resolved.
if err != nil {
return fmt.Errorf("failed to prepare plugin command: %w", err)
}

pluginClient := plugin.NewClient(&plugin.ClientConfig{
Cmd: pluginCmd,
Expand Down Expand Up @@ -130,7 +143,10 @@ func (p *GRPCPlugin) fork() error {

if p.client == nil {
// first time we start the plugin, we need to create a new client
p.client, err = grpccommon.NewGRPCPluginClient(pluginClient, grpcClient)
p.client, err = grpccommon.NewGRPCPluginClient(pluginClient, grpcClient, grpccommon.GRPCPluginClientOpts{
Tracer: p.tracer,
GetTraceAttributes: p.getTraceAttributes,
})
if err != nil {
return fmt.Errorf("failed to create grpc plugin client: %w", err)
}
Expand Down
22 changes: 17 additions & 5 deletions router/pkg/grpcconnector/grpcpluginoci/grpc_oci_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/trace"
"os"
"runtime"
"sync"
Expand All @@ -20,10 +21,12 @@ import (
)

type GRPCPluginConfig struct {
Logger *zap.Logger
ImageRef string
RegistryToken string
StartupConfig grpccommon.GRPCStartupParams
Logger *zap.Logger
ImageRef string
RegistryToken string
StartupConfig grpccommon.GRPCStartupParams
Tracer trace.Tracer
GetTraceAttributes grpccommon.TraceAttributeGetter
}

type GRPCPlugin struct {
Expand All @@ -45,6 +48,9 @@ type GRPCPlugin struct {
client *grpccommon.GRPCPluginClient

startupConfig grpccommon.GRPCStartupParams

tracer trace.Tracer
getTraceAttributes grpccommon.TraceAttributeGetter
}

func NewGRPCOCIPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) {
Expand All @@ -71,6 +77,9 @@ func NewGRPCOCIPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) {

registryUsername: "router",
registryPassword: config.RegistryToken,

tracer: config.Tracer,
getTraceAttributes: config.GetTraceAttributes,
}, nil
}

Expand Down Expand Up @@ -136,7 +145,10 @@ func (p *GRPCPlugin) startPluginProcess() error {

if p.client == nil {
// first time we start the plugin, we need to create a new client
p.client, err = grpccommon.NewGRPCPluginClient(pluginClient, grpcClient)
p.client, err = grpccommon.NewGRPCPluginClient(pluginClient, grpcClient, grpccommon.GRPCPluginClientOpts{
Tracer: p.tracer,
GetTraceAttributes: p.getTraceAttributes,
})
if err != nil {
return fmt.Errorf("failed to create grpc plugin client: %w", err)
}
Expand Down
Loading