diff --git a/controlplane/src/core/repositories/analytics/TraceRepository.ts b/controlplane/src/core/repositories/analytics/TraceRepository.ts index 0c1bdfa3ff..92650c20cb 100644 --- a/controlplane/src/core/repositories/analytics/TraceRepository.ts +++ b/controlplane/src/core/repositories/analytics/TraceRepository.ts @@ -47,6 +47,7 @@ export class TraceRepository { SpanAttributes['wg.router.version'] as attrRouterVersion, SpanAttributes['wg.operation.persisted_id'] as attrOperationPersistedId, SpanAttributes['wg.federated_graph.id'] as attrFederatedGraphId, + SpanAttributes['wg.operation.protocol'] as attrOperationProtocol, SpanAttributes['wg.operation.batching.is_batched'] as attrIsBatched, SpanAttributes['wg.operation.batching.operations_count'] as attrBatchedOperationsCount, SpanAttributes['wg.operation.batching.operation_index'] as attrWgBatchedOperationIndex @@ -136,6 +137,7 @@ export class TraceRepository { operationPersistedID: result.attrOperationPersistedId, federatedGraphID: result.attrFederatedGraphId, isBatched: result.attrIsBatched, + operationProtocol: result.attrOperationProtocol, batchedOperationsCount: result.attrBatchedOperationsCount, batchedOperationIndex: result.attrWgBatchedOperationIndex, }, diff --git a/router-tests/router_plugin_test.go b/router-tests/router_plugin_test.go index 77264610e8..fd7eed2dfb 100644 --- a/router-tests/router_plugin_test.go +++ b/router-tests/router_plugin_test.go @@ -2,6 +2,7 @@ package integration import ( "fmt" + "github.com/wundergraph/cosmo/router/pkg/otel" "testing" "time" @@ -95,31 +96,33 @@ func TestRouterPlugin(t *testing.T) { } func TestVerifyTelemetryForRouterPluginRequests(t *testing.T) { - exporter := tracetest.NewInMemoryExporter(t) + t.Parallel() - testenv.Run(t, - &testenv.Config{ - TraceExporter: exporter, - RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate, - Plugins: testenv.PluginConfig{ - Enabled: true, - Path: "../router/plugins", - }, - }, - func(t *testing.T, xEnv *testenv.Environment) { - t.Run("query projects simple", func(t *testing.T) { - t.Parallel() + t.Run("query projects simple", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + testenv.Run(t, + &testenv.Config{ + TraceExporter: exporter, + RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate, + Plugins: testenv.PluginConfig{ + Enabled: true, + Path: "../router/plugins", + }, + }, + func(t *testing.T, xEnv *testenv.Environment) { queryName := "query sample" response := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ - Query: fmt.Sprintf(`%s { projects { id name } }`, queryName), + Query: fmt.Sprintf(`%s { a:projects { id name }, e:projects { id name } }`, queryName), }) - expected := `{"data":{"projects":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}]}}` + expected := `{"data":{"a":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}],"e":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}]}}` require.Equal(t, expected, response.Body) snapshots := exporter.GetSpans().Snapshots() - require.Len(t, snapshots, 8) + require.Len(t, snapshots, 10) queryNameInstances := 0 for _, sn := range snapshots { @@ -128,10 +131,50 @@ func TestVerifyTelemetryForRouterPluginRequests(t *testing.T) { } } - // Normal http spans would have query sample twice - require.Equal(t, queryNameInstances, 1) + require.Equal(t, queryNameInstances, 3) }) - }) + }) + + t.Run("verify each invocation having its span", func(t *testing.T) { + t.Parallel() + + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, + &testenv.Config{ + TraceExporter: exporter, + RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate, + Plugins: testenv.PluginConfig{ + Enabled: true, + Path: "../router/plugins", + }, + }, + func(t *testing.T, xEnv *testenv.Environment) { + response := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query projects { a: projects { id name } b:project(id: 2) { id } }`, + }) + + expected := `{"data":{"a":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}],"b":{"id":"2"}}}` + require.Equal(t, expected, response.Body) + + snapshots := exporter.GetSpans().Snapshots() + require.Len(t, snapshots, 10) + + span1 := snapshots[5] + require.Equal(t, "query projects", span1.Name()) + require.Contains(t, span1.Attributes(), otel.WgOperationProtocol.String("grpc")) + require.Contains(t, span1.Attributes(), otel.WgOperationType.String("query")) + require.Contains(t, span1.Attributes(), otel.WgOperationName.String("projects")) + require.Len(t, span1.Attributes(), 11) + + span2 := snapshots[6] + require.Equal(t, "query projects", span2.Name()) + require.Contains(t, span2.Attributes(), otel.WgOperationProtocol.String("grpc")) + require.Contains(t, span2.Attributes(), otel.WgOperationType.String("query")) + require.Contains(t, span2.Attributes(), otel.WgOperationName.String("projects")) + require.Len(t, span2.Attributes(), 11) + }) + }) } func TestRouterPluginRequests(t *testing.T) { diff --git a/router/core/graph_server.go b/router/core/graph_server.go index f79b9f0f92..da70696ca8 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -1145,7 +1145,7 @@ func (s *graphServer) buildGraphMux( subgraphTippers[subgraph] = subgraphTransport } - if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs); err != nil { + if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs, telemetryAttExpressions, tracingAttExpressions); err != nil { return nil, fmt.Errorf("failed to setup plugin host: %w", err) } @@ -1484,7 +1484,13 @@ func (s *graphServer) buildGraphMux( return gm, nil } -func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineConfiguration, configSubgraphs []*nodev1.Subgraph) error { +func (s *graphServer) setupConnector( + ctx context.Context, + config *nodev1.EngineConfiguration, + configSubgraphs []*nodev1.Subgraph, + telemetryAttributeExpressions *attributeExpressions, + tracingAttributeExpressions *attributeExpressions, +) error { s.connector = grpcconnector.NewConnector() for _, dsConfig := range config.DatasourceConfigurations { @@ -1537,6 +1543,13 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC startupConfig := newGRPCStartupParams(s.traceConfig, s.ipAnonymization) + tracer := s.tracerProvider.Tracer("wundergraph/cosmo/router/engine/grpc", oteltrace.WithInstrumentationVersion("0.0.1")) + + getTraceAttributes := CreateGRPCTraceGetter( + telemetryAttributeExpressions, + tracingAttributeExpressions, + ) + if imgRef := pluginConfig.GetImageReference(); imgRef != nil { ref := fmt.Sprintf("%s/%s:%s", s.plugins.Registry.URL, @@ -1545,10 +1558,12 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC ) grpcPlugin, err := grpcpluginoci.NewGRPCOCIPlugin(grpcpluginoci.GRPCPluginConfig{ - Logger: s.logger, - ImageRef: ref, - RegistryToken: s.graphApiToken, - StartupConfig: startupConfig, + Logger: s.logger, + ImageRef: ref, + RegistryToken: s.graphApiToken, + StartupConfig: startupConfig, + Tracer: tracer, + GetTraceAttributes: getTraceAttributes, }) if err != nil { return fmt.Errorf("failed to create grpc oci plugin for subgraph %s: %w", dsConfig.Id, err) @@ -1565,10 +1580,12 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC } grpcPlugin, err := grpcplugin.NewGRPCPlugin(grpcplugin.GRPCPluginConfig{ - Logger: s.logger, - PluginName: pluginConfig.GetName(), - PluginPath: pluginPath, - StartupConfig: startupConfig, + Logger: s.logger, + PluginName: pluginConfig.GetName(), + PluginPath: pluginPath, + StartupConfig: startupConfig, + Tracer: tracer, + GetTraceAttributes: getTraceAttributes, }) if err != nil { return fmt.Errorf("failed to create grpc plugin for subgraph %s: %w", dsConfig.Id, err) diff --git a/router/core/operation_metrics.go b/router/core/operation_metrics.go index 9c2c459bc4..0f4b658873 100644 --- a/router/core/operation_metrics.go +++ b/router/core/operation_metrics.go @@ -19,6 +19,7 @@ type OperationProtocol string const ( OperationProtocolHTTP = OperationProtocol("http") + OperationProtocolGRPC = OperationProtocol("grpc") OperationProtocolWS = OperationProtocol("ws") ) diff --git a/router/core/transport.go b/router/core/transport.go index 9e3feffa8b..96c838689d 100644 --- a/router/core/transport.go +++ b/router/core/transport.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "github.com/wundergraph/cosmo/router/internal/circuit" "io" "net/http" "net/url" @@ -13,6 +12,8 @@ import ( "sync" "time" + "github.com/wundergraph/cosmo/router/internal/circuit" + "github.com/wundergraph/cosmo/router/internal/expr" "github.com/wundergraph/cosmo/router/internal/traceclient" @@ -462,3 +463,45 @@ func GetSpanName(operationName string, operationType string) string { } return fmt.Sprintf("%s %s", operationType, "unnamed") } + +func CreateGRPCTraceGetter( + telemetryAttributeExpressions *attributeExpressions, + tracingAttributeExpressions *attributeExpressions, +) func(context.Context) (string, otrace.SpanStartEventOption) { + return func(ctx context.Context) (string, otrace.SpanStartEventOption) { + reqCtx := getRequestContext(ctx) + if reqCtx == nil { + return "GRPC Plugin Client - Invoke", otrace.WithAttributes() + } + + traceAttrs := *reqCtx.telemetry.AcquireAttributes() + defer reqCtx.telemetry.ReleaseAttributes(&traceAttrs) + + attrs := make([]attribute.KeyValue, 0, len(reqCtx.telemetry.traceAttrs)) + + attrs = append(attrs, traceAttrs...) + attrs = append(attrs, reqCtx.telemetry.traceAttrs...) + + if telemetryAttributeExpressions != nil { + telemetryValues, err := telemetryAttributeExpressions.expressionsAttributesWithSubgraph(&reqCtx.expressionContext) + if err != nil { + reqCtx.Logger().Warn("failed to resolve grpc plugin expression for telemetry", zap.Error(err)) + } + attrs = append(attrs, telemetryValues...) + } + + if tracingAttributeExpressions != nil { + tracingValues, err := tracingAttributeExpressions.expressionsAttributesWithSubgraph(&reqCtx.expressionContext) + if err != nil { + reqCtx.Logger().Warn("failed to resolve grpc plugin expression for tracing", zap.Error(err)) + } + attrs = append(attrs, tracingValues...) + } + + // Override http operation protocol with grpc + attrs = append(attrs, otel.EngineTransportAttribute, otel.WgOperationProtocol.String(OperationProtocolGRPC.String())) + + spanName := SpanNameFormatter("", reqCtx.request) + return spanName, otrace.WithAttributes(attrs...) + } +} diff --git a/router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go b/router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go index 27a97c4e54..9e1a4ea43f 100644 --- a/router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go +++ b/router/pkg/grpcconnector/grpccommon/grpc_plugin_client.go @@ -3,6 +3,7 @@ package grpccommon import ( "context" "errors" + "go.opentelemetry.io/otel/trace" "io" "sync" @@ -27,6 +28,9 @@ type GRPCPluginClient struct { config GRPCPluginClientConfig mu sync.RWMutex + + tracer trace.Tracer + getTraceAttributes GRPCTraceAttributeGetter } type GRPCPluginClientConfig struct { @@ -50,7 +54,14 @@ func WithReconnectConfig(reconnectTimeout time.Duration, pingInterval time.Durat var _ grpc.ClientConnInterface = &GRPCPluginClient{} -func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, options ...GRPCPluginClientOption) (*GRPCPluginClient, error) { +type GRPCTraceAttributeGetter func(context.Context) (string, trace.SpanStartEventOption) + +type GRPCPluginClientOpts struct { + Tracer trace.Tracer + GetTraceAttributes GRPCTraceAttributeGetter +} + +func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, clientOpts GRPCPluginClientOpts, options ...GRPCPluginClientOption) (*GRPCPluginClient, error) { if pc == nil || cc == nil { return nil, errors.New("plugin client or grpc client conn is nil") } @@ -62,9 +73,11 @@ func NewGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, options } return &GRPCPluginClient{ - pc: pc, - cc: cc, - config: config, + pc: pc, + cc: cc, + config: config, + tracer: clientOpts.Tracer, + getTraceAttributes: clientOpts.GetTraceAttributes, }, nil } @@ -133,13 +146,19 @@ func (g *GRPCPluginClient) SetClients(pluginClient *plugin.Client, clientConn gr // Invoke implements grpc.ClientConnInterface. func (g *GRPCPluginClient) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error { + spanName, traceAttributes := g.getTraceAttributes(ctx) + ctx, span := g.tracer.Start(ctx, spanName, traceAttributes) + defer span.End() + if g.IsPluginProcessExited() { if err := g.waitForPluginToBeActive(); err != nil { + span.RecordError(err) return err } } if g.isClosed.Load() { + span.RecordError(errors.New("plugin is not active")) return status.Error(codes.Unavailable, "plugin is not active") } diff --git a/router/pkg/grpcconnector/grpcplugin/grpc_plugin.go b/router/pkg/grpcconnector/grpcplugin/grpc_plugin.go index 214e6ddb31..269d395035 100644 --- a/router/pkg/grpcconnector/grpcplugin/grpc_plugin.go +++ b/router/pkg/grpcconnector/grpcplugin/grpc_plugin.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "go.opentelemetry.io/otel/trace" "os" "os/exec" "sync" @@ -18,10 +19,12 @@ import ( ) type GRPCPluginConfig struct { - Logger *zap.Logger - PluginPath string - PluginName string - StartupConfig grpccommon.GRPCStartupParams + Logger *zap.Logger + PluginPath string + PluginName string + StartupConfig grpccommon.GRPCStartupParams + Tracer trace.Tracer + GetTraceAttributes grpccommon.GRPCTraceAttributeGetter } type GRPCPlugin struct { @@ -36,6 +39,9 @@ type GRPCPlugin struct { client *grpccommon.GRPCPluginClient startupConfig grpccommon.GRPCStartupParams + tracer trace.Tracer + + getTraceAttributes grpccommon.GRPCTraceAttributeGetter } var _ grpcconnector.ClientProvider = (*GRPCPlugin)(nil) @@ -64,6 +70,10 @@ func NewGRPCPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) { pluginName: config.PluginName, startupConfig: config.StartupConfig, + + tracer: config.Tracer, + + getTraceAttributes: config.GetTraceAttributes, }, nil } @@ -97,7 +107,10 @@ func (p *GRPCPlugin) fork() error { } pluginCmd := exec.Command(filePath) - grpccommon.PrepareCommand(pluginCmd, p.startupConfig) + err = grpccommon.PrepareCommand(pluginCmd, p.startupConfig) + if err != nil { + return fmt.Errorf("failed to prepare plugin command: %w", err) + } pluginClient := plugin.NewClient(&plugin.ClientConfig{ Cmd: pluginCmd, @@ -130,7 +143,10 @@ func (p *GRPCPlugin) fork() error { if p.client == nil { // first time we start the plugin, we need to create a new client - p.client, err = grpccommon.NewGRPCPluginClient(pluginClient, grpcClient) + p.client, err = grpccommon.NewGRPCPluginClient(pluginClient, grpcClient, grpccommon.GRPCPluginClientOpts{ + Tracer: p.tracer, + GetTraceAttributes: p.getTraceAttributes, + }) if err != nil { return fmt.Errorf("failed to create grpc plugin client: %w", err) } diff --git a/router/pkg/grpcconnector/grpcpluginoci/grpc_oci_plugin.go b/router/pkg/grpcconnector/grpcpluginoci/grpc_oci_plugin.go index ddc687c56d..3fb5fcc57e 100644 --- a/router/pkg/grpcconnector/grpcpluginoci/grpc_oci_plugin.go +++ b/router/pkg/grpcconnector/grpcpluginoci/grpc_oci_plugin.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "go.opentelemetry.io/otel/trace" "os" "runtime" "sync" @@ -20,10 +21,12 @@ import ( ) type GRPCPluginConfig struct { - Logger *zap.Logger - ImageRef string - RegistryToken string - StartupConfig grpccommon.GRPCStartupParams + Logger *zap.Logger + ImageRef string + RegistryToken string + StartupConfig grpccommon.GRPCStartupParams + Tracer trace.Tracer + GetTraceAttributes grpccommon.GRPCTraceAttributeGetter } type GRPCPlugin struct { @@ -45,6 +48,9 @@ type GRPCPlugin struct { client *grpccommon.GRPCPluginClient startupConfig grpccommon.GRPCStartupParams + + tracer trace.Tracer + getTraceAttributes grpccommon.GRPCTraceAttributeGetter } func NewGRPCOCIPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) { @@ -71,6 +77,9 @@ func NewGRPCOCIPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) { registryUsername: "router", registryPassword: config.RegistryToken, + + tracer: config.Tracer, + getTraceAttributes: config.GetTraceAttributes, }, nil } @@ -136,7 +145,10 @@ func (p *GRPCPlugin) startPluginProcess() error { if p.client == nil { // first time we start the plugin, we need to create a new client - p.client, err = grpccommon.NewGRPCPluginClient(pluginClient, grpcClient) + p.client, err = grpccommon.NewGRPCPluginClient(pluginClient, grpcClient, grpccommon.GRPCPluginClientOpts{ + Tracer: p.tracer, + GetTraceAttributes: p.getTraceAttributes, + }) if err != nil { return fmt.Errorf("failed to create grpc plugin client: %w", err) } diff --git a/studio/src/components/analytics/trace.tsx b/studio/src/components/analytics/trace.tsx index 2e3d33da69..e268ba387e 100644 --- a/studio/src/components/analytics/trace.tsx +++ b/studio/src/components/analytics/trace.tsx @@ -1,3 +1,4 @@ +import { docsBaseURL } from "@/lib/constants"; import { nsToTime } from "@/lib/insights-helpers"; import { Service, @@ -12,9 +13,10 @@ import { MinusIcon, PlusIcon, } from "@heroicons/react/24/outline"; -import clsx from "clsx"; import { Span } from "@wundergraph/cosmo-connect/dist/platform/v1/platform_pb"; +import clsx from "clsx"; import { useCallback, useEffect, useState } from "react"; +import { BsFillLightningChargeFill } from "react-icons/bs"; import { useMovable } from "react-move-hook"; import { Button } from "../ui/button"; import { Card } from "../ui/card"; @@ -24,7 +26,6 @@ import { TooltipProvider, TooltipTrigger, } from "../ui/tooltip"; -import { docsBaseURL } from "@/lib/constants"; interface SpanNode extends Span { children?: SpanNode[]; @@ -207,7 +208,13 @@ function Node({ )}
-
{service?.name}
{" "} +
+ {span.attributes?.operationProtocol === "grpc" && ( + + )} +
{service?.name}
{" "} +
+
{mapSpanKind[span.spanKind]}
@@ -224,8 +231,13 @@ function Node({
-
+
{span.spanName}
+ {span.attributes?.operationProtocol === "grpc" && ( +
+ Cosmo Connect{" "} +
+ )}