diff --git a/router-plugin/config/plugin_config.go b/router-plugin/config/plugin_config.go new file mode 100644 index 0000000000..2e2eb70c21 --- /dev/null +++ b/router-plugin/config/plugin_config.go @@ -0,0 +1,13 @@ +package config + +import sdktrace "go.opentelemetry.io/otel/sdk/trace" + +type RouterPluginConfig struct { + ServiceName string + ServiceVersion string + TracingEnabled bool + TracingErrorHandler func(err error) + + // This should only be used for testing purposes + MemoryExporter sdktrace.SpanExporter +} diff --git a/router-plugin/config/settings.go b/router-plugin/config/settings.go new file mode 100644 index 0000000000..52047de741 --- /dev/null +++ b/router-plugin/config/settings.go @@ -0,0 +1,59 @@ +package config + +import "time" + +type ( + ExporterProtocolType string + ExporterTemporality string + Propagator string +) + +type IPAnonymizationMethod string + +const ( + Hash IPAnonymizationMethod = "hash" + Redact IPAnonymizationMethod = "redact" +) + +const ( + ExporterOLTPHTTP ExporterProtocolType = "http" + ExporterOLTPGRPC ExporterProtocolType = "grpc" +) + +const ( + PropagatorTraceContext Propagator = "tracecontext" + PropagatorB3 Propagator = "b3" + PropagatorJaeger Propagator = "jaeger" + PropagatorBaggage Propagator = "baggage" + PropagatorDatadog Propagator = "datadog" +) + +// StartupConfig contains the configuration passed from the router. +type StartupConfig struct { + Telemetry *Telemetry `json:"telemetry,omitempty"` + IPAnonymization *IPAnonymization `json:"ip_anonymization,omitempty"` +} + +type Telemetry struct { + Tracing *Tracing `json:"tracing,omitempty"` +} + +type Tracing struct { + Exporters []Exporter `json:"exporters,omitempty"` + Propagators []Propagator `json:"propagators,omitempty"` + Sampler float64 `json:"sampler"` +} + +type Exporter struct { + Endpoint string `json:"endpoint"` + Exporter ExporterProtocolType `json:"exporter"` + BatchTimeout time.Duration `json:"batch_timeout"` + ExportTimeout time.Duration `json:"export_timeout"` + Headers map[string]string `json:"headers"` + HTTPPath string `json:"http_path"` +} + +type IPAnonymization struct { + Enabled bool `json:"enabled"` + Method IPAnonymizationMethod `json:"method"` +} diff --git a/router-plugin/go.mod b/router-plugin/go.mod index 6a707e185a..99506041ea 100644 --- a/router-plugin/go.mod +++ b/router-plugin/go.mod @@ -5,21 +5,38 @@ go 1.23.0 require ( github.com/hashicorp/go-plugin v1.6.3 github.com/hashicorp/go-retryablehttp v0.7.7 + github.com/tonglil/opentelemetry-go-datadog-propagator v0.1.3 + go.opentelemetry.io/contrib/propagators/b3 v1.23.0 + go.opentelemetry.io/contrib/propagators/jaeger v1.23.0 + go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 + go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/trace v1.28.0 google.golang.org/grpc v1.68.1 ) require ( - github.com/fatih/color v1.16.0 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/fatih/color v1.18.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/oklog/run v1.0.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/proto/otlp v1.1.0 // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/text v0.23.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/protobuf v1.36.5 // indirect ) diff --git a/router-plugin/go.sum b/router-plugin/go.sum index cd4f71cc1a..692719d2a2 100644 --- a/router-plugin/go.sum +++ b/router-plugin/go.sum @@ -1,15 +1,26 @@ github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= @@ -37,8 +48,32 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= -github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tonglil/opentelemetry-go-datadog-propagator v0.1.3 h1:Ozy1UnlID19jL6+vixEcA1t4NMf8hp01uDAY1nwGl8U= +github.com/tonglil/opentelemetry-go-datadog-propagator v0.1.3/go.mod h1:Ijp5eaviP2mk8CJM+0EDYFKNULr+kicPSB9FOvxOhW0= +go.opentelemetry.io/contrib/propagators/b3 v1.23.0 h1:aaIGWc5JdfRGpCafLRxMJbD65MfTa206AwSKkvGS0Hg= +go.opentelemetry.io/contrib/propagators/b3 v1.23.0/go.mod h1:Gyz7V7XghvwTq+mIhLFlTgcc03UDroOg8vezs4NLhwU= +go.opentelemetry.io/contrib/propagators/jaeger v1.23.0 h1:KFxfTCTkH1usVFzDaWzbmNdFX7ybUTCtkLsUTww0nG4= +go.opentelemetry.io/contrib/propagators/jaeger v1.23.0/go.mod h1:xU+81opGquQICJGzwscLXAQLnIPWI+q7Zu4AQSrgXf8= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 h1:o8iWeVFa1BcLtVEV0LzrCxV2/55tB3xLxADr6Kyoey4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1/go.mod h1:SEVfdK4IoBnbT2FXNM/k8yC08MrfbhWk3U4ljM8B3HE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 h1:p3A5+f5l9e/kuEBwLOrnpkIDHQFlHmbiVxMURWRK6gQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1/go.mod h1:OClrnXUjBqQbInvjJFjYSnMxBSCXBF8r3b34WqjiIrQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 h1:cfuy3bXmLJS7M1RZmAL6SuhGtKUp2KEsrm00OlAXkq4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1/go.mod h1:22jr92C6KwlwItJmQzfixzQM3oyyuYLCfHiMY+rpsPU= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= +go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -52,6 +87,8 @@ golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24= +google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw= google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= diff --git a/router-plugin/httpclient/client.go b/router-plugin/httpclient/client.go index e6be435201..33d1dac124 100644 --- a/router-plugin/httpclient/client.go +++ b/router-plugin/httpclient/client.go @@ -10,6 +10,13 @@ import ( "time" "github.com/hashicorp/go-retryablehttp" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" ) // Client is a wrapper around http.Client with additional functionality @@ -20,6 +27,7 @@ type Client struct { timeout time.Duration middlewares []Middleware retryOptions RetryOptions + tracer trace.Tracer } // ClientOption is a function that configures a Client @@ -43,6 +51,7 @@ func New(options ...ClientOption) *Client { timeout: 30 * time.Second, middlewares: []Middleware{}, retryOptions: DefaultRetryOptions(), + tracer: noop.NewTracerProvider().Tracer("noop-tracer"), } for _, option := range options { @@ -54,6 +63,13 @@ func New(options ...ClientOption) *Client { return c } +// WithTracing enables tracing using a RoundTripper approach +func WithTracing() ClientOption { + return func(c *Client) { + c.tracer = otel.Tracer("router-plugin-httpclient") + } +} + // WithBaseURL sets the base URL for the client func WithBaseURL(url string) ClientOption { return func(c *Client) { @@ -132,6 +148,26 @@ func (c *Client) Request(ctx context.Context, method, path string, body interfac url = c.baseURL + path } + // Create a span for the HTTP request + var span trace.Span + ctx, span = c.tracer.Start(ctx, fmt.Sprintf("http.request - %s %s", method, url), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.HTTPMethodKey.String(method), + semconv.HTTPURLKey.String(url), + ), + ) + defer span.End() + + retry, err := c.startRequest(ctx, method, url, reqBody, body, options) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + } + return retry, err +} + +func (c *Client) startRequest(ctx context.Context, method string, url string, reqBody io.Reader, body interface{}, options []RequestOption) (*Response, error) { // Use the retryable client if enabled if c.retryOptions.Enabled { return c.doRequestWithRetry(ctx, method, url, reqBody, body != nil, options...) @@ -148,6 +184,8 @@ func (c *Client) doRequest(ctx context.Context, method, url string, body io.Read return nil, fmt.Errorf("error creating request: %w", err) } + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header)) + // Add default headers for key, value := range c.headers { req.Header.Set(key, value) @@ -228,6 +266,8 @@ func (c *Client) doRequestWithRetry(ctx context.Context, method, url string, bod // Set context retryReq = retryReq.WithContext(ctx) + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(retryReq.Header)) + // Add default headers for key, value := range c.headers { retryReq.Header.Set(key, value) diff --git a/router-plugin/plugin.go b/router-plugin/plugin.go index dcb8d0295e..47702eb459 100644 --- a/router-plugin/plugin.go +++ b/router-plugin/plugin.go @@ -2,7 +2,11 @@ package routerplugin import ( "context" + "encoding/json" "errors" + "github.com/wundergraph/cosmo/router-plugin/config" + "github.com/wundergraph/cosmo/router-plugin/setup" + "os" "github.com/hashicorp/go-plugin" "google.golang.org/grpc" @@ -15,9 +19,19 @@ var RouterPluginHandshakeConfig = plugin.HandshakeConfig{ MagicCookieValue: "GRPC_DATASOURCE_PLUGIN", } +const startupConfigKey = "startup_config" + // PluginMapName is the name of the plugin in the plugin map. var PluginMapName = "grpc_datasource" +type RouterPlugin struct { + plugin.Plugin + registrationFunc func(*grpc.Server) + + serveConfig *plugin.ServeConfig + config config.RouterPluginConfig +} + // GRPCPlugin is the interface that is implemented to serve/connect to // a plugin over gRPC. func (p *RouterPlugin) GRPCServer(_ *plugin.GRPCBroker, server *grpc.Server) error { @@ -31,18 +45,37 @@ func (p *RouterPlugin) GRPCClient(_ context.Context, _ *plugin.GRPCBroker, cc *g return cc, nil } -type RouterPlugin struct { - plugin.Plugin - registrationFunc func(*grpc.Server) +type PluginOption func(*RouterPlugin) - serveConfig *plugin.ServeConfig +func WithTestConfig(testConfig *plugin.ServeTestConfig) PluginOption { + return func(c *RouterPlugin) { + c.serveConfig.Test = testConfig + } } -type PluginOption func(*plugin.ServeConfig) +// WithTracing enables tracing for the plugin. +// This includes creating a tracing interceptor +func WithTracing() PluginOption { + return func(c *RouterPlugin) { + c.config.TracingEnabled = true + } +} -func WithTestConfig(testConfig *plugin.ServeTestConfig) PluginOption { - return func(c *plugin.ServeConfig) { - c.Test = testConfig +func WithServiceName(serviceName string) PluginOption { + return func(c *RouterPlugin) { + c.config.ServiceName = serviceName + } +} + +func WithTracingErrorHandler(errHandler func(err error)) PluginOption { + return func(c *RouterPlugin) { + c.config.TracingErrorHandler = errHandler + } +} + +func WithServiceVersion(serviceVersion string) PluginOption { + return func(c *RouterPlugin) { + c.config.ServiceVersion = serviceVersion } } @@ -55,7 +88,7 @@ func NewRouterPlugin(registrationfunc func(*grpc.Server), opts ...PluginOption) registrationFunc: registrationfunc, } - serveConfig := &plugin.ServeConfig{ + routerPlugin.serveConfig = &plugin.ServeConfig{ HandshakeConfig: RouterPluginHandshakeConfig, GRPCServer: plugin.DefaultGRPCServer, Plugins: map[string]plugin.Plugin{ @@ -64,10 +97,27 @@ func NewRouterPlugin(registrationfunc func(*grpc.Server), opts ...PluginOption) } for _, opt := range opts { - opt(serveConfig) + opt(routerPlugin) + } + + var startupConfig config.StartupConfig + if exporterString := os.Getenv(startupConfigKey); exporterString != "" { + err := json.Unmarshal([]byte(exporterString), &startupConfig) + if err != nil { + return nil, err + } } - routerPlugin.serveConfig = serveConfig + grpcServerFunc, err := setup.GrpcServer(setup.GrpcServerInitOpts{ + StartupConfig: startupConfig, + PluginConfig: routerPlugin.config, + }) + if err != nil { + return nil, err + } + + routerPlugin.serveConfig.GRPCServer = grpcServerFunc + return routerPlugin, nil } diff --git a/router-plugin/setup/init.go b/router-plugin/setup/init.go new file mode 100644 index 0000000000..e64db1030f --- /dev/null +++ b/router-plugin/setup/init.go @@ -0,0 +1,60 @@ +package setup + +import ( + "fmt" + "github.com/wundergraph/cosmo/router-plugin/config" + "github.com/wundergraph/cosmo/router-plugin/tracing" + "google.golang.org/grpc" +) + +type GrpcServerInitFunc func(serverOpts []grpc.ServerOption) *grpc.Server + +const ( + baseServiceName = "cosmo-router-plugin" + baseServiceVersion = "1.0.0" +) + +type GrpcServerInitOpts struct { + StartupConfig config.StartupConfig + PluginConfig config.RouterPluginConfig +} + +func GrpcServer(opts GrpcServerInitOpts) (GrpcServerInitFunc, error) { + grpcOpts := make([]grpc.ServerOption, 0) + + isTracingEnabled := opts.PluginConfig.TracingEnabled && + opts.StartupConfig.Telemetry != nil && + opts.StartupConfig.Telemetry.Tracing != nil + + if isTracingEnabled { + serviceName := baseServiceName + if opts.PluginConfig.ServiceName != "" { + serviceName = opts.PluginConfig.ServiceName + } + serviceVersion := baseServiceVersion + if opts.PluginConfig.ServiceVersion != "" { + serviceVersion = opts.PluginConfig.ServiceVersion + } + + tracingInterceptor, err := tracing.CreateTracingInterceptor(tracing.TracingOptions{ + ServiceName: serviceName, + ServiceVersion: serviceVersion, + ErrorHandlerFunc: opts.PluginConfig.TracingErrorHandler, + TracingConfig: opts.StartupConfig.Telemetry.Tracing, + IPAnonymization: opts.StartupConfig.IPAnonymization, + MemoryExporter: opts.PluginConfig.MemoryExporter, + }) + if err != nil { + return nil, fmt.Errorf("failed to create tracing interceptor: %w", err) + } + interceptor := grpc.UnaryInterceptor(tracingInterceptor) + grpcOpts = append(grpcOpts, interceptor) + } + + grpcServerFunc := func(serverOpts []grpc.ServerOption) *grpc.Server { + allOpts := append([]grpc.ServerOption{}, serverOpts...) + allOpts = append(allOpts, grpcOpts...) + return grpc.NewServer(allOpts...) + } + return grpcServerFunc, nil +} diff --git a/router-plugin/tracing/interceptor.go b/router-plugin/tracing/interceptor.go new file mode 100644 index 0000000000..da6febe29b --- /dev/null +++ b/router-plugin/tracing/interceptor.go @@ -0,0 +1,63 @@ +package tracing + +import ( + "context" + "errors" + "fmt" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + traceparentHeader = "traceparent" + tracestateHeader = "tracestate" + baggageHeader = "baggage" +) + +func CreateTracingInterceptor(tracingOpts TracingOptions) (func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error), error) { + if tracingOpts.TracingConfig == nil { + return nil, errors.New("nil tracing config not supported") + } + + // TODO: We currently don't have a shutdown logic in the plugin which could call tp.Shutdown + tp, err := initTracer(context.Background(), tracingOpts) + if err != nil { + return nil, err + } + + tracer := tp.Tracer(fmt.Sprintf("wundergraph/cosmo/router-plugin/%s", tracingOpts.ServiceName)) + + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if md, ok := metadata.FromIncomingContext(ctx); ok { + carrier := propagation.MapCarrier{} + if values := md.Get(traceparentHeader); len(values) > 0 { + carrier[traceparentHeader] = values[0] + } + if values := md.Get(tracestateHeader); len(values) > 0 { + carrier[tracestateHeader] = values[0] + } + if values := md.Get(baggageHeader); len(values) > 0 { + carrier[baggageHeader] = values[0] + } + propagator := otel.GetTextMapPropagator() + ctx = propagator.Extract(ctx, carrier) + + var span trace.Span + ctx, span = tracer.Start(ctx, "Router Plugin - "+info.FullMethod) + defer span.End() + + result, err := handler(ctx, req) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + } + return result, err + } + + return handler(ctx, req) + }, nil +} diff --git a/router-plugin/tracing/propogator.go b/router-plugin/tracing/propogator.go new file mode 100644 index 0000000000..819dfd380a --- /dev/null +++ b/router-plugin/tracing/propogator.go @@ -0,0 +1,31 @@ +package tracing + +import ( + "fmt" + datadog "github.com/tonglil/opentelemetry-go-datadog-propagator" + "github.com/wundergraph/cosmo/router-plugin/config" + "go.opentelemetry.io/contrib/propagators/b3" + "go.opentelemetry.io/contrib/propagators/jaeger" + "go.opentelemetry.io/otel/propagation" +) + +func buildPropagators(propagators []config.Propagator) ([]propagation.TextMapPropagator, error) { + var allPropagators []propagation.TextMapPropagator + for _, p := range propagators { + switch p { + case config.PropagatorTraceContext: + allPropagators = append(allPropagators, propagation.TraceContext{}) + case config.PropagatorB3: + allPropagators = append(allPropagators, b3.New(b3.WithInjectEncoding(b3.B3MultipleHeader|b3.B3SingleHeader))) + case config.PropagatorJaeger: + allPropagators = append(allPropagators, jaeger.Jaeger{}) + case config.PropagatorDatadog: + allPropagators = append(allPropagators, datadog.Propagator{}) + case config.PropagatorBaggage: + allPropagators = append(allPropagators, propagation.Baggage{}) + default: + return nil, fmt.Errorf("unknown trace propagator: %s", p) + } + } + return allPropagators, nil +} diff --git a/router-plugin/tracing/redact.go b/router-plugin/tracing/redact.go new file mode 100644 index 0000000000..e1a1226eec --- /dev/null +++ b/router-plugin/tracing/redact.go @@ -0,0 +1,75 @@ +package tracing + +import ( + "context" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + semconv17 "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +// SensitiveAttributes that should be redacted by the OTEL http instrumentation package. +// Take attention to the right version of the semconv package. +var SensitiveAttributes = []attribute.Key{ + // Both can contain external IP addresses + semconv17.HTTPClientIPKey, + semconv17.NetSockPeerAddrKey, +} + +type RedactFunc func(key attribute.KeyValue) string + +// Attributes returns an OpenTelemetry SDK TracerProviderOption. It registers +// an OpenTelemetry SpanProcessor that redacts attributes of new spans matching +// the passed keys. +func Attributes(keys []attribute.Key, redactFunc RedactFunc) trace.TracerProviderOption { + r := make(map[attribute.Key]struct{}, len(keys)) + for _, k := range keys { + r[k] = struct{}{} + } + censor := NewAttributeCensor(r, redactFunc) + return trace.WithSpanProcessor(censor) +} + +// AttributeCensor is an OpenTelemetry SpanProcessor that censors attributes of +// new spans. +type AttributeCensor struct { + // args is a slice allocated on creation that is reused when calling + // SetAttributes in OnStart. + args []attribute.KeyValue + redactFunc RedactFunc + replacements map[attribute.Key]struct{} +} + +// NewAttributeCensor returns an AttributeCensor that uses the provided mapping +// of replacement values for a set of keys to redact matching attributes. +// Attributes are matched based on the equality of keys. +func NewAttributeCensor(replacements map[attribute.Key]struct{}, redactFunc RedactFunc) AttributeCensor { + a := AttributeCensor{ + // Allocate a reusable slice to pass to SetAttributes. + args: make([]attribute.KeyValue, 0, len(replacements)), + redactFunc: redactFunc, + replacements: replacements, + } + return a +} + +// OnStart does nothing. +func (c AttributeCensor) OnStart(_ context.Context, _ trace.ReadWriteSpan) { +} + +// OnEnd censors the attributes of s matching the Replacements keys of c. +func (c AttributeCensor) OnEnd(s trace.ReadOnlySpan) { + // We can't change the attribute slice of the span snapshot in OnEnd, but + // we can change the attribute value in the underlying array. + attributes := s.Attributes() + for i := range attributes { + if _, ok := c.replacements[attributes[i].Key]; ok { + attributes[i].Value = attribute.StringValue(c.redactFunc(attributes[i])) + } + } +} + +// Shutdown does nothing. +func (AttributeCensor) Shutdown(context.Context) error { return nil } + +// ForceFlush does nothing. +func (AttributeCensor) ForceFlush(context.Context) error { return nil } diff --git a/router-plugin/tracing/tracer.go b/router-plugin/tracing/tracer.go new file mode 100644 index 0000000000..4df263c51b --- /dev/null +++ b/router-plugin/tracing/tracer.go @@ -0,0 +1,215 @@ +package tracing + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "github.com/wundergraph/cosmo/router-plugin/config" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "net/url" + "time" +) + +type TracingOptions struct { + ServiceName string + ServiceVersion string + ErrorHandlerFunc func(err error) + TracingConfig *config.Tracing + IPAnonymization *config.IPAnonymization + MemoryExporter sdktrace.SpanExporter +} + +const ( + DefaultBatchTimeout = 10 * time.Second + DefaultExportTimeout = 30 * time.Second + + WgIsPlugin = attribute.Key("wg.is_plugin") +) + +func initTracer( + ctx context.Context, + tracingConfig TracingOptions, +) (*sdktrace.TracerProvider, error) { + // Return no-op provider + if len(tracingConfig.TracingConfig.Exporters) == 0 { + provider := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.NeverSample())) + otel.SetTracerProvider(provider) + return provider, nil + } + + r, err := resource.New(ctx, + resource.WithAttributes(semconv.ServiceNameKey.String(tracingConfig.ServiceName)), + resource.WithAttributes(semconv.ServiceVersionKey.String(tracingConfig.ServiceVersion)), + resource.WithAttributes(WgIsPlugin.Bool(true)), + resource.WithProcessPID(), + resource.WithOSType(), + resource.WithTelemetrySDK(), + resource.WithHost(), + ) + if err != nil { + return nil, err + } + + opts := []sdktrace.TracerProviderOption{ + // Set the sampling rate based on the parent span to 100% + sdktrace.WithRawSpanLimits(sdktrace.SpanLimits{ + // Avoid misuse of attributes. + AttributeValueLengthLimit: 3 * 1024, // 3KB + // Based on the default values from the OpenTelemetry specification. + AttributeCountLimit: sdktrace.DefaultAttributeCountLimit, + EventCountLimit: sdktrace.DefaultEventCountLimit, + LinkCountLimit: sdktrace.DefaultLinkCountLimit, + AttributePerEventCountLimit: sdktrace.DefaultEventCountLimit, + AttributePerLinkCountLimit: sdktrace.DefaultAttributePerLinkCountLimit, + }), + // Record information about this application in a Resource. + sdktrace.WithResource(r), + } + + opts = append(opts, + sdktrace.WithSampler( + sdktrace.ParentBased( + sdktrace.TraceIDRatioBased(tracingConfig.TracingConfig.Sampler), + ), + ), + ) + + if tracingConfig.IPAnonymization != nil && tracingConfig.IPAnonymization.Enabled { + var rFunc RedactFunc + switch tracingConfig.IPAnonymization.Method { + case config.Hash: + rFunc = func(key attribute.KeyValue) string { + h := sha256.New() + h.Write([]byte(key.Value.AsString())) + return hex.EncodeToString(h.Sum(nil)) + } + case config.Redact: + rFunc = func(key attribute.KeyValue) string { + return "[REDACTED]" + } + } + // In case hash or redact was not used + if rFunc != nil { + opts = append(opts, Attributes(SensitiveAttributes, rFunc)) + } + } + + if tracingConfig.MemoryExporter != nil { + opts = append(opts, sdktrace.WithSyncer(tracingConfig.MemoryExporter)) + } else { + for _, exp := range tracingConfig.TracingConfig.Exporters { + // Default to OLTP HTTP + if exp.Exporter == "" { + exp.Exporter = config.ExporterOLTPHTTP + } + + exporter, err := createExporter(exp) + if err != nil { + return nil, err + } + + batchTimeout := exp.BatchTimeout + if batchTimeout == 0 { + batchTimeout = DefaultBatchTimeout + } + + exportTimeout := exp.ExportTimeout + if exportTimeout == 0 { + exportTimeout = DefaultExportTimeout + } + + // Always be sure to batch in production. + opts = append(opts, + sdktrace.WithBatcher(exporter, + sdktrace.WithBatchTimeout(batchTimeout), + sdktrace.WithExportTimeout(exportTimeout), + sdktrace.WithMaxExportBatchSize(512), + sdktrace.WithMaxQueueSize(2048), + ), + ) + } + } + + tp := sdktrace.NewTracerProvider(opts...) + + otel.SetTracerProvider(tp) + + propagators, err := buildPropagators(tracingConfig.TracingConfig.Propagators) + if err != nil { + return nil, err + } + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagators...)) + + if tracingConfig.ErrorHandlerFunc != nil { + otel.SetErrorHandler(otel.ErrorHandlerFunc(tracingConfig.ErrorHandlerFunc)) + } + + return tp, nil +} + +func createExporter(exp config.Exporter) (sdktrace.SpanExporter, error) { + u, err := url.Parse(exp.Endpoint) + if err != nil { + return nil, fmt.Errorf("invalid OpenTelemetry endpoint: %w", err) + } + + var exporter sdktrace.SpanExporter + // Just support OTLP and gRPC for now. Jaeger has native OTLP support. + switch exp.Exporter { + case config.ExporterOLTPHTTP: + opts := []otlptracehttp.Option{ + // Includes host and port + otlptracehttp.WithEndpoint(u.Host), + otlptracehttp.WithCompression(otlptracehttp.GzipCompression), + } + + if u.Scheme != "https" { + opts = append(opts, otlptracehttp.WithInsecure()) + } + + if len(exp.Headers) > 0 { + opts = append(opts, otlptracehttp.WithHeaders(exp.Headers)) + } + if len(exp.HTTPPath) > 0 { + opts = append(opts, otlptracehttp.WithURLPath(exp.HTTPPath)) + } + exporter, err = otlptracehttp.New( + context.Background(), + opts..., + ) + case config.ExporterOLTPGRPC: + opts := []otlptracegrpc.Option{ + // Includes host and port + otlptracegrpc.WithEndpoint(u.Host), + otlptracegrpc.WithCompressor("gzip"), + } + + if u.Scheme != "https" { + opts = append(opts, otlptracegrpc.WithInsecure()) + } + + if len(exp.Headers) > 0 { + opts = append(opts, otlptracegrpc.WithHeaders(exp.Headers)) + } + exporter, err = otlptracegrpc.New( + context.Background(), + opts..., + ) + default: + return nil, fmt.Errorf("unknown exporter type: %s", exp.Exporter) + } + + if err != nil { + return nil, err + } + + return exporter, nil +} diff --git a/router-tests/go.mod b/router-tests/go.mod index 42d008a28f..6b381afc8b 100644 --- a/router-tests/go.mod +++ b/router-tests/go.mod @@ -26,6 +26,7 @@ require ( github.com/wundergraph/cosmo/demo v0.0.0-20250729121718-5f0a0b8b1804 github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects v0.0.0-20250715110703-10f2e5f9c79e github.com/wundergraph/cosmo/router v0.0.0-20250729121718-5f0a0b8b1804 + github.com/wundergraph/cosmo/router-plugin v0.0.0-20250616075713-f2b99c96cec4 github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.215 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 @@ -175,6 +176,7 @@ replace ( github.com/wundergraph/cosmo/demo => ../demo github.com/wundergraph/cosmo/demo/pkg/subgraphs/projects => ../demo/pkg/subgraphs/projects github.com/wundergraph/cosmo/router => ../router + github.com/wundergraph/cosmo/router-plugin => ../router-plugin // github.com/wundergraph/graphql-go-tools/v2 => ../../graphql-go-tools/v2 ) diff --git a/router-tests/plugintest/hello/Makefile b/router-tests/plugintest/hello/Makefile new file mode 100644 index 0000000000..973542b488 --- /dev/null +++ b/router-tests/plugintest/hello/Makefile @@ -0,0 +1,5 @@ +make: generate + +generate: + npx wgc@latest router plugin build . --generate-only + diff --git a/router-tests/plugintest/hello/generated/mapping.json b/router-tests/plugintest/hello/generated/mapping.json new file mode 100644 index 0000000000..fcabeb9eea --- /dev/null +++ b/router-tests/plugintest/hello/generated/mapping.json @@ -0,0 +1,42 @@ +{ + "version": 1, + "service": "HelloService", + "operationMappings": [ + { + "type": "OPERATION_TYPE_QUERY", + "original": "run", + "mapped": "QueryRun", + "request": "QueryRunRequest", + "response": "QueryRunResponse" + } + ], + "entityMappings": [], + "typeFieldMappings": [ + { + "type": "Query", + "fieldMappings": [ + { + "original": "run", + "mapped": "run", + "argumentMappings": [ + { + "original": "input", + "mapped": "input" + } + ] + } + ] + }, + { + "type": "Result", + "fieldMappings": [ + { + "original": "responseString", + "mapped": "response_string", + "argumentMappings": [] + } + ] + } + ], + "enumMappings": [] +} \ No newline at end of file diff --git a/router-tests/plugintest/hello/generated/service.pb.go b/router-tests/plugintest/hello/generated/service.pb.go new file mode 100644 index 0000000000..4593f99bff --- /dev/null +++ b/router-tests/plugintest/hello/generated/service.pb.go @@ -0,0 +1,281 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v5.29.3 +// source: generated/service.proto + +package plugin + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Request message for run operation. +type QueryRunRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Input string `protobuf:"bytes,1,opt,name=input,proto3" json:"input,omitempty"` +} + +func (x *QueryRunRequest) Reset() { + *x = QueryRunRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_generated_service_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *QueryRunRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QueryRunRequest) ProtoMessage() {} + +func (x *QueryRunRequest) ProtoReflect() protoreflect.Message { + mi := &file_generated_service_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QueryRunRequest.ProtoReflect.Descriptor instead. +func (*QueryRunRequest) Descriptor() ([]byte, []int) { + return file_generated_service_proto_rawDescGZIP(), []int{0} +} + +func (x *QueryRunRequest) GetInput() string { + if x != nil { + return x.Input + } + return "" +} + +// Response message for run operation. +type QueryRunResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Run *Result `protobuf:"bytes,1,opt,name=run,proto3" json:"run,omitempty"` +} + +func (x *QueryRunResponse) Reset() { + *x = QueryRunResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_generated_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *QueryRunResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QueryRunResponse) ProtoMessage() {} + +func (x *QueryRunResponse) ProtoReflect() protoreflect.Message { + mi := &file_generated_service_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QueryRunResponse.ProtoReflect.Descriptor instead. +func (*QueryRunResponse) Descriptor() ([]byte, []int) { + return file_generated_service_proto_rawDescGZIP(), []int{1} +} + +func (x *QueryRunResponse) GetRun() *Result { + if x != nil { + return x.Run + } + return nil +} + +type Result struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ResponseString string `protobuf:"bytes,1,opt,name=response_string,json=responseString,proto3" json:"response_string,omitempty"` +} + +func (x *Result) Reset() { + *x = Result{} + if protoimpl.UnsafeEnabled { + mi := &file_generated_service_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Result) ProtoMessage() {} + +func (x *Result) ProtoReflect() protoreflect.Message { + mi := &file_generated_service_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Result.ProtoReflect.Descriptor instead. +func (*Result) Descriptor() ([]byte, []int) { + return file_generated_service_proto_rawDescGZIP(), []int{2} +} + +func (x *Result) GetResponseString() string { + if x != nil { + return x.ResponseString + } + return "" +} + +var File_generated_service_proto protoreflect.FileDescriptor + +var file_generated_service_proto_rawDesc = []byte{ + 0x0a, 0x17, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x22, 0x27, 0x0a, 0x0f, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x75, 0x6e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x69, 0x6e, 0x70, 0x75, 0x74, 0x22, 0x35, 0x0a, 0x10, 0x51, + 0x75, 0x65, 0x72, 0x79, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x21, 0x0a, 0x03, 0x72, 0x75, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x03, 0x72, + 0x75, 0x6e, 0x22, 0x31, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x27, 0x0a, 0x0f, + 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x5f, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x53, + 0x74, 0x72, 0x69, 0x6e, 0x67, 0x32, 0x51, 0x0a, 0x0c, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x41, 0x0a, 0x08, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x75, + 0x6e, 0x12, 0x18, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x51, 0x75, 0x65, 0x72, + 0x79, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x75, 0x6e, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x77, 0x75, 0x6e, 0x64, 0x65, 0x72, 0x67, 0x72, 0x61, + 0x70, 0x68, 0x2f, 0x63, 0x6f, 0x73, 0x6d, 0x6f, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_generated_service_proto_rawDescOnce sync.Once + file_generated_service_proto_rawDescData = file_generated_service_proto_rawDesc +) + +func file_generated_service_proto_rawDescGZIP() []byte { + file_generated_service_proto_rawDescOnce.Do(func() { + file_generated_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_generated_service_proto_rawDescData) + }) + return file_generated_service_proto_rawDescData +} + +var file_generated_service_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_generated_service_proto_goTypes = []any{ + (*QueryRunRequest)(nil), // 0: service.QueryRunRequest + (*QueryRunResponse)(nil), // 1: service.QueryRunResponse + (*Result)(nil), // 2: service.Result +} +var file_generated_service_proto_depIdxs = []int32{ + 2, // 0: service.QueryRunResponse.run:type_name -> service.Result + 0, // 1: service.HelloService.QueryRun:input_type -> service.QueryRunRequest + 1, // 2: service.HelloService.QueryRun:output_type -> service.QueryRunResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_generated_service_proto_init() } +func file_generated_service_proto_init() { + if File_generated_service_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_generated_service_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*QueryRunRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_generated_service_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*QueryRunResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_generated_service_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_generated_service_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_generated_service_proto_goTypes, + DependencyIndexes: file_generated_service_proto_depIdxs, + MessageInfos: file_generated_service_proto_msgTypes, + }.Build() + File_generated_service_proto = out.File + file_generated_service_proto_rawDesc = nil + file_generated_service_proto_goTypes = nil + file_generated_service_proto_depIdxs = nil +} diff --git a/router-tests/plugintest/hello/generated/service.proto b/router-tests/plugintest/hello/generated/service.proto new file mode 100644 index 0000000000..012b6077c5 --- /dev/null +++ b/router-tests/plugintest/hello/generated/service.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; +package service; + +option go_package = "github.com/wundergraph/cosmo/plugin"; + +// Service definition for HelloService +service HelloService { + rpc QueryRun(QueryRunRequest) returns (QueryRunResponse) {} +} + +// Request message for run operation. +message QueryRunRequest { + string input = 1; +} +// Response message for run operation. +message QueryRunResponse { + Result run = 1; +} + +message Result { + string response_string = 1; +} \ No newline at end of file diff --git a/router-tests/plugintest/hello/generated/service.proto.lock.json b/router-tests/plugintest/hello/generated/service.proto.lock.json new file mode 100644 index 0000000000..e667eb4b7b --- /dev/null +++ b/router-tests/plugintest/hello/generated/service.proto.lock.json @@ -0,0 +1,71 @@ +{ + "version": "1.0.0", + "messages": { + "Query": { + "fields": { + "run": 3 + }, + "reservedNumbers": [ + 2, + 1 + ] + }, + "QueryHello": { + "fields": { + "name": 1 + } + }, + "QueryHelloRequest": { + "fields": { + "name": 1 + } + }, + "QueryHelloResponse": { + "fields": { + "hello": 1 + } + }, + "World": { + "fields": { + "id": 1, + "name": 2 + } + }, + "QueryHello2": { + "fields": { + "name": 1 + } + }, + "QueryHello2Request": { + "fields": { + "name": 1 + } + }, + "QueryHello2Response": { + "fields": { + "hello_2": 1 + } + }, + "QueryRun": { + "fields": { + "input": 1 + } + }, + "QueryRunRequest": { + "fields": { + "input": 1 + } + }, + "QueryRunResponse": { + "fields": { + "run": 1 + } + }, + "Result": { + "fields": { + "responseString": 1 + } + } + }, + "enums": {} +} \ No newline at end of file diff --git a/router-tests/plugintest/hello/generated/service_grpc.pb.go b/router-tests/plugintest/hello/generated/service_grpc.pb.go new file mode 100644 index 0000000000..32e7aa1290 --- /dev/null +++ b/router-tests/plugintest/hello/generated/service_grpc.pb.go @@ -0,0 +1,125 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.29.3 +// source: generated/service.proto + +package plugin + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + HelloService_QueryRun_FullMethodName = "/service.HelloService/QueryRun" +) + +// HelloServiceClient is the client API for HelloService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// Service definition for HelloService +type HelloServiceClient interface { + QueryRun(ctx context.Context, in *QueryRunRequest, opts ...grpc.CallOption) (*QueryRunResponse, error) +} + +type helloServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewHelloServiceClient(cc grpc.ClientConnInterface) HelloServiceClient { + return &helloServiceClient{cc} +} + +func (c *helloServiceClient) QueryRun(ctx context.Context, in *QueryRunRequest, opts ...grpc.CallOption) (*QueryRunResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(QueryRunResponse) + err := c.cc.Invoke(ctx, HelloService_QueryRun_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// HelloServiceServer is the server API for HelloService service. +// All implementations must embed UnimplementedHelloServiceServer +// for forward compatibility. +// +// Service definition for HelloService +type HelloServiceServer interface { + QueryRun(context.Context, *QueryRunRequest) (*QueryRunResponse, error) + mustEmbedUnimplementedHelloServiceServer() +} + +// UnimplementedHelloServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedHelloServiceServer struct{} + +func (UnimplementedHelloServiceServer) QueryRun(context.Context, *QueryRunRequest) (*QueryRunResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method QueryRun not implemented") +} +func (UnimplementedHelloServiceServer) mustEmbedUnimplementedHelloServiceServer() {} +func (UnimplementedHelloServiceServer) testEmbeddedByValue() {} + +// UnsafeHelloServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HelloServiceServer will +// result in compilation errors. +type UnsafeHelloServiceServer interface { + mustEmbedUnimplementedHelloServiceServer() +} + +func RegisterHelloServiceServer(s grpc.ServiceRegistrar, srv HelloServiceServer) { + // If the following call pancis, it indicates UnimplementedHelloServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&HelloService_ServiceDesc, srv) +} + +func _HelloService_QueryRun_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(QueryRunRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HelloServiceServer).QueryRun(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: HelloService_QueryRun_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HelloServiceServer).QueryRun(ctx, req.(*QueryRunRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// HelloService_ServiceDesc is the grpc.ServiceDesc for HelloService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var HelloService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "service.HelloService", + HandlerType: (*HelloServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "QueryRun", + Handler: _HelloService_QueryRun_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "generated/service.proto", +} diff --git a/router-tests/plugintest/hello/generated/staticcheck.conf b/router-tests/plugintest/hello/generated/staticcheck.conf new file mode 100644 index 0000000000..e290d96df3 --- /dev/null +++ b/router-tests/plugintest/hello/generated/staticcheck.conf @@ -0,0 +1,2 @@ +# This is meant as a workaround to skip staticcheck checks for generated code +checks = ["all", "-SA1019", "-ST1000"] diff --git a/router-tests/plugintest/hello/src/schema.graphql b/router-tests/plugintest/hello/src/schema.graphql new file mode 100644 index 0000000000..e19c8cd205 --- /dev/null +++ b/router-tests/plugintest/hello/src/schema.graphql @@ -0,0 +1,7 @@ +type Result { + responseString: String! +} + +type Query { + run(input: String!): Result! +} diff --git a/router-tests/plugintest/setup.go b/router-tests/plugintest/setup.go new file mode 100644 index 0000000000..94bf406a59 --- /dev/null +++ b/router-tests/plugintest/setup.go @@ -0,0 +1,94 @@ +package plugintest + +import ( + "context" + "github.com/wundergraph/cosmo/router-plugin/config" + routerplugin "github.com/wundergraph/cosmo/router-plugin/setup" + plugin "github.com/wundergraph/cosmo/router-tests/plugintest/hello/generated" + "net" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +type HelloService struct { + runFunc func(_ context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) + plugin.UnimplementedHelloServiceServer +} + +func (s *HelloService) QueryRun(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + return s.runFunc(ctx, req) +} + +// PluginGrpcServerSetupResponse is a wrapper that holds the gRPC test components +type PluginGrpcServerSetupResponse[T any] struct { + Client T + Cleanup func() +} + +const ( + bufSize = 1024 * 1024 +) + +type PluginGrpcTestConfig[T any] struct { + StartupConfig config.StartupConfig + RouterPluginConfig config.RouterPluginConfig + RegisterServiceFunc func(grpc.ServiceRegistrar) + CreateClientFunc func(conn *grpc.ClientConn) T +} + +// SetupPluginGrpcServerForTest creates a local gRPC server for testing +func SetupPluginGrpcServerForTest[T any](t *testing.T, testConfig PluginGrpcTestConfig[T]) *PluginGrpcServerSetupResponse[T] { + // Create a buffer for gRPC connections + lis := bufconn.Listen(bufSize) + + opts := routerplugin.GrpcServerInitOpts{ + StartupConfig: testConfig.StartupConfig, + PluginConfig: testConfig.RouterPluginConfig, + } + + server, err := routerplugin.GrpcServer(opts) + require.NoError(t, err) + + // Create a new gRPC server + grpcServer := server([]grpc.ServerOption{}) + + // Register our service + testConfig.RegisterServiceFunc(grpcServer) + + // Start the server + go func() { + err := grpcServer.Serve(lis) + require.NoError(t, err) + }() + + // Create a client connection + dialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + conn, err := grpc.NewClient( + "passthrough:///bufnet", + grpc.WithContextDialer(dialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + + // Create the service client + client := testConfig.CreateClientFunc(conn) + + // Return cleanup function + cleanup := func() { + err := conn.Close() + require.NoError(t, err) + grpcServer.Stop() + } + + return &PluginGrpcServerSetupResponse[T]{ + Client: client, + Cleanup: cleanup, + } +} diff --git a/router-tests/plugintest/tracing_test.go b/router-tests/plugintest/tracing_test.go new file mode 100644 index 0000000000..cd5c93dac3 --- /dev/null +++ b/router-tests/plugintest/tracing_test.go @@ -0,0 +1,732 @@ +package plugintest + +import ( + "context" + "fmt" + "github.com/wundergraph/cosmo/router-plugin/config" + "github.com/wundergraph/cosmo/router-plugin/httpclient" + "github.com/wundergraph/cosmo/router-plugin/tracing" + plugin "github.com/wundergraph/cosmo/router-tests/plugintest/hello/generated" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router/pkg/trace/tracetest" +) + +func TestTracing(t *testing.T) { + t.Run("verify tracing not enabled", func(t *testing.T) { + + t.Run("with tracing enabled parameter as false", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + startup.Telemetry.Tracing.Exporters = []config.Exporter{ + {}, + } + + tracingEnabled := false + + opts := config.RouterPluginConfig{ + TracingEnabled: tracingEnabled, + MemoryExporter: exporter, + } + + svc := setupTracingTest(t, startup, opts, nil) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 0) + }) + + t.Run("with tracing enabled parameter as true but no exporters", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + startup.Telemetry.Tracing.Exporters = []config.Exporter{} + + tracingEnabled := true + + opts := config.RouterPluginConfig{ + TracingEnabled: tracingEnabled, + MemoryExporter: exporter, + } + + svc := setupTracingTest(t, startup, opts, nil) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 0) + }) + + t.Run("with tracing enabled parameter as true but nil telemetry config", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + startup.Telemetry = nil + + tracingEnabled := true + + opts := config.RouterPluginConfig{ + TracingEnabled: tracingEnabled, + MemoryExporter: exporter, + } + + svc := setupTracingTest(t, startup, opts, nil) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 0) + }) + + t.Run("with tracing enabled parameter as true but nil tracing config", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + startup.Telemetry.Tracing = nil + + tracingEnabled := true + + opts := config.RouterPluginConfig{ + TracingEnabled: tracingEnabled, + MemoryExporter: exporter, + } + + svc := setupTracingTest(t, startup, opts, nil) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 0) + }) + }) + + t.Run("verify span", func(t *testing.T) { + t.Parallel() + + t.Run("with base values", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + svc := setupTracingTest(t, startup, opts, nil) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 1) + + baseSpan := sn[0] + + expectedSpanName := "Router Plugin - /service.HelloService/QueryRun" + require.Equal(t, expectedSpanName, baseSpan.Name()) + + rootSpanAttributes := baseSpan.Resource().Attributes() + require.Len(t, rootSpanAttributes, 9) + + expectedDefaultName := "cosmo-router-plugin" + expectedDefaultVersion := "1.0.0" + require.Contains(t, rootSpanAttributes, semconv.ServiceNameKey.String(expectedDefaultName)) + require.Contains(t, rootSpanAttributes, semconv.ServiceVersionKey.String(expectedDefaultVersion)) + require.Contains(t, rootSpanAttributes, tracing.WgIsPlugin.Bool(true)) + + keys := make(map[attribute.Key]struct{}) + for _, attr := range rootSpanAttributes { + require.True(t, attr.Valid()) + keys[attr.Key] = struct{}{} + fmt.Println("Attribute:", attr.Key, "Value:", attr.Value.AsString()) + } + + require.Contains(t, keys, semconv.HostNameKey.String("").Key) + require.Contains(t, keys, semconv.OSTypeKey.String("").Key) + require.Contains(t, keys, semconv.ProcessPIDKey.String("").Key) + require.Contains(t, keys, semconv.TelemetrySDKLanguageKey.String("").Key) + require.Contains(t, keys, semconv.TelemetrySDKNameKey.String("").Key) + require.Contains(t, keys, semconv.TelemetrySDKVersionKey.String("").Key) + }) + + t.Run("with custom service name and version", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + customName := "test-service" + customVersion := "2.0.0" + + opts := config.RouterPluginConfig{ + ServiceName: customName, + ServiceVersion: customVersion, + TracingEnabled: true, + MemoryExporter: exporter, + } + + svc := setupTracingTest(t, getTracingBaseConfig(), opts, nil) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 1) + + rootSpanAttributes := sn[0].Resource().Attributes() + + require.Contains(t, rootSpanAttributes, semconv.ServiceNameKey.String(customName)) + require.Contains(t, rootSpanAttributes, semconv.ServiceVersionKey.String(customVersion)) + }) + + t.Run("verify custom values set by the plugin", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + customKey := attribute.Key("custom.key").String("keythere") + customValue := attribute.Key("custom.count").String("valuethere") + customSpanName := "Custom Span Name" + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + span := trace.SpanFromContext(ctx) + require.NotNil(t, span) + span.SetName(customSpanName) + span.SetAttributes( + customKey, + customValue, + ) + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, getTracingBaseConfig(), opts, runner) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 1) + + baseSpan := sn[0] + + require.Equal(t, customSpanName, baseSpan.Name()) + + attributes := baseSpan.Attributes() + require.Len(t, attributes, 2) + + require.Contains(t, attributes, customKey) + require.Contains(t, attributes, customValue) + }) + + t.Run("verify custom spans created by the plugin", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + customKey := attribute.Key("custom.key").String("keythere") + customValue := attribute.Key("custom.count").String("valuethere") + customSpanName := "Custom Span Name" + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + tracer := otel.Tracer("inner-tracer") + + _, span := tracer.Start(ctx, customSpanName) + span.SetAttributes( + customKey, + customValue, + ) + defer span.End() + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, getTracingBaseConfig(), opts, runner) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 2) + + baseSpan := sn[1] + baseSpanName := "Router Plugin - /service.HelloService/QueryRun" + require.Equal(t, baseSpanName, baseSpan.Name()) + require.Len(t, baseSpan.Attributes(), 0) + + customSpan := sn[0] + require.Equal(t, customSpanName, customSpan.Name()) + require.Contains(t, customSpan.Attributes(), customKey) + require.Contains(t, customSpan.Attributes(), customValue) + }) + }) + + t.Run("verify ip anonymization", func(t *testing.T) { + t.Run("when not enabled", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + startup.IPAnonymization = &config.IPAnonymization{ + Enabled: false, + Method: config.Redact, + } + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + httpClientIPKey := "127.2.2.5" + netSockPeerAddrKey := "127.3.2.6" + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + span := trace.SpanFromContext(ctx) + require.NotNil(t, span) + span.SetAttributes( + semconv.HTTPClientIPKey.String(httpClientIPKey), + semconv.NetSockPeerAddrKey.String(netSockPeerAddrKey), + ) + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, startup, opts, runner) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 1) + + baseSpan := sn[0] + require.Len(t, baseSpan.Attributes(), 2) + require.Contains(t, baseSpan.Attributes(), semconv.HTTPClientIPKey.String(httpClientIPKey)) + require.Contains(t, baseSpan.Attributes(), semconv.NetSockPeerAddrKey.String(netSockPeerAddrKey)) + }) + + t.Run("when nil", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + startup.IPAnonymization = nil + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + httpClientIPKey := "127.2.2.5" + netSockPeerAddrKey := "127.3.2.6" + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + span := trace.SpanFromContext(ctx) + require.NotNil(t, span) + span.SetAttributes( + semconv.HTTPClientIPKey.String(httpClientIPKey), + semconv.NetSockPeerAddrKey.String(netSockPeerAddrKey), + ) + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, startup, opts, runner) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 1) + + baseSpan := sn[0] + require.Len(t, baseSpan.Attributes(), 2) + require.Contains(t, baseSpan.Attributes(), semconv.HTTPClientIPKey.String(httpClientIPKey)) + require.Contains(t, baseSpan.Attributes(), semconv.NetSockPeerAddrKey.String(netSockPeerAddrKey)) + }) + + t.Run("with redact", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + startup.IPAnonymization = &config.IPAnonymization{ + Enabled: true, + Method: config.Redact, + } + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + span := trace.SpanFromContext(ctx) + require.NotNil(t, span) + span.SetAttributes( + semconv.HTTPClientIPKey.String("127.2.2.5"), + semconv.NetSockPeerAddrKey.String("127.3.2.5"), + ) + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, startup, opts, runner) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 1) + + baseSpan := sn[0] + require.Len(t, baseSpan.Attributes(), 2) + require.Contains(t, baseSpan.Attributes(), semconv.HTTPClientIPKey.String("[REDACTED]")) + require.Contains(t, baseSpan.Attributes(), semconv.NetSockPeerAddrKey.String("[REDACTED]")) + }) + + t.Run("with hash", func(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + startup := getTracingBaseConfig() + startup.IPAnonymization = &config.IPAnonymization{ + Enabled: true, + Method: config.Hash, + } + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + span := trace.SpanFromContext(ctx) + require.NotNil(t, span) + span.SetAttributes( + semconv.HTTPClientIPKey.String("127.2.2.5"), + semconv.NetSockPeerAddrKey.String("127.3.2.5"), + ) + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, startup, opts, runner) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 1) + + baseSpan := sn[0] + require.Len(t, baseSpan.Attributes(), 2) + require.Contains(t, baseSpan.Attributes(), semconv.HTTPClientIPKey.String("70c76e7df1c5f51c716f98e4ec3372566a242d429de2cc87c683034df9a440f5")) + require.Contains(t, baseSpan.Attributes(), semconv.NetSockPeerAddrKey.String("a5ec9311d0d04e08d359e8135fda0e8426a797199eefe98f07ab95b7a1acdf59")) + }) + }) + + t.Run("http client tracing", func(t *testing.T) { + t.Run("when tracing is disabled", func(t *testing.T) { + expectedResponse := `{"message":"success"}` + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, err := w.Write([]byte(expectedResponse)) + require.NoError(t, err) + })) + defer mockServer.Close() + + exporter := tracetest.NewInMemoryExporter(t) + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + for range 5 { + client := httpclient.New(httpclient.WithBaseURL(mockServer.URL)) + resp, err := client.Get(ctx, "/test") + require.NoError(t, err) + require.Equal(t, expectedResponse, string(resp.Body)) + } + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, getTracingBaseConfig(), opts, runner) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + snapshots := exporter.GetSpans().Snapshots() + require.Len(t, snapshots, 1) + + baseSpan := snapshots[0] + baseSpanName := "Router Plugin - /service.HelloService/QueryRun" + require.Equal(t, baseSpanName, baseSpan.Name()) + require.Len(t, baseSpan.Attributes(), 0) + }) + + t.Run("when tracing enabled", func(t *testing.T) { + expectedResponse := `{"message":"success"}` + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, err := w.Write([]byte(expectedResponse)) + require.NoError(t, err) + })) + defer mockServer.Close() + + exporter := tracetest.NewInMemoryExporter(t) + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + numberOfHttpRequests := 5 + path := "/test" + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + for range numberOfHttpRequests { + client := httpclient.New(httpclient.WithTracing(), httpclient.WithBaseURL(mockServer.URL)) + resp, err := client.Get(ctx, path) + require.NoError(t, err) + require.Equal(t, expectedResponse, string(resp.Body)) + } + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, getTracingBaseConfig(), opts, runner) + defer svc.Cleanup() + + resp, err := svc.Client.QueryRun(context.Background(), &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + + snapshots := exporter.GetSpans().Snapshots() + require.Len(t, snapshots, 1+numberOfHttpRequests) + + httpCallSpan1 := snapshots[0] + require.Len(t, httpCallSpan1.Attributes(), 2) + + expectedMethod := "GET" + expectedUrl := mockServer.URL + path + expectedSpanName := fmt.Sprintf("http.request - %s %s", expectedMethod, expectedUrl) + + require.Equal(t, expectedSpanName, httpCallSpan1.Name()) + require.Contains(t, httpCallSpan1.Attributes(), semconv.HTTPURLKey.String(expectedUrl)) + require.Contains(t, httpCallSpan1.Attributes(), semconv.HTTPMethodKey.String(expectedMethod)) + + // Verify that the HTTP client spans are the actual occurences + httpCallInstances := 0 + for _, snapshot := range snapshots { + if expectedSpanName == snapshot.Name() { + httpCallInstances++ + } + } + require.Equal(t, numberOfHttpRequests, httpCallInstances) + }) + + t.Run("verify external service gets headers propagated", func(t *testing.T) { + traceId := "4bf92f3577b34da6a3ce929d0e0e4736" + traceParentHeader := "00-" + traceId + "-00f067aa0ba902b7-01" + baggageValue := "key1=value1,key2=value2" + + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + //Jaeger + require.Contains(t, r.Header.Get("Uber-Trace-Id"), "4bf92f3577b34da6a3ce929d0e0e4736") + + // B3 + require.Contains(t, r.Header.Get("B3"), traceId) + require.Equal(t, traceId, r.Header.Get("X-B3-Traceid")) + require.NotEmpty(t, r.Header.Get("X-B3-Spanid")) + require.Equal(t, "1", r.Header.Get("X-B3-Sampled")) + + // Datadog + require.NotEmpty(t, r.Header.Get("X-Datadog-Parent-Id")) + require.Equal(t, "11803532876627986230", r.Header.Get("X-Datadog-Trace-Id")) + require.Equal(t, "1", r.Header.Get("X-Datadog-Sampling-Priority")) + + // Traceparent + require.Contains(t, r.Header.Get("Traceparent"), traceId) + + // Baggage + require.Contains(t, r.Header.Get("Baggage"), baggageValue) + + w.Header().Set("Content-Type", "application/json") + _, err := w.Write([]byte(`{"message":"success"}`)) + require.NoError(t, err) + })) + defer mockServer.Close() + + exporter := tracetest.NewInMemoryExporter(t) + + baseConfig := getTracingBaseConfig() + baseConfig.Telemetry.Tracing.Propagators = []config.Propagator{ + config.PropagatorTraceContext, + config.PropagatorB3, + config.PropagatorJaeger, + config.PropagatorBaggage, + config.PropagatorDatadog, + } + + opts := config.RouterPluginConfig{ + TracingEnabled: true, + MemoryExporter: exporter, + } + + runner := func(ctx context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + client := httpclient.New(httpclient.WithTracing(), httpclient.WithBaseURL(mockServer.URL)) + _, err := client.Get(ctx, "/test") + require.NoError(t, err) + + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + + svc := setupTracingTest(t, baseConfig, opts, runner) + defer svc.Cleanup() + + ctx := context.Background() + + ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{ + "traceparent": traceParentHeader, + "Baggage": baggageValue, + })) + + resp, err := svc.Client.QueryRun(ctx, &plugin.QueryRunRequest{}) + require.NoError(t, err) + require.NotNil(t, resp.Run) + require.Len(t, exporter.GetSpans().Snapshots(), 2) + }) + }) + +} + +func getTracingBaseConfig() config.StartupConfig { + return config.StartupConfig{ + Telemetry: &config.Telemetry{ + Tracing: &config.Tracing{ + Sampler: 1.0, + Exporters: []config.Exporter{ + {}, + }, + }, + }, + } +} + +type runFunc func(_ context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) + +func setupTracingTest(t *testing.T, startup config.StartupConfig, opts config.RouterPluginConfig, runner runFunc) *PluginGrpcServerSetupResponse[plugin.HelloServiceClient] { + // Use base runner if nothing is passed + if runner == nil { + runner = func(_ context.Context, req *plugin.QueryRunRequest) (*plugin.QueryRunResponse, error) { + response := &plugin.QueryRunResponse{ + Run: &plugin.Result{ + ResponseString: "response string", + }, + } + return response, nil + } + } + + svc := SetupPluginGrpcServerForTest[plugin.HelloServiceClient](t, PluginGrpcTestConfig[plugin.HelloServiceClient]{ + StartupConfig: startup, + RouterPluginConfig: opts, + RegisterServiceFunc: func(reg grpc.ServiceRegistrar) { + plugin.RegisterHelloServiceServer(reg, &HelloService{runFunc: runner}) + }, + CreateClientFunc: func(conn *grpc.ClientConn) plugin.HelloServiceClient { + return plugin.NewHelloServiceClient(conn) + }, + }) + return svc +} diff --git a/router-tests/router_plugin_test.go b/router-tests/router_plugin_test.go index e14b6fed65..e99cea7f8b 100644 --- a/router-tests/router_plugin_test.go +++ b/router-tests/router_plugin_test.go @@ -1,6 +1,7 @@ package integration import ( + "fmt" "testing" "time" @@ -9,6 +10,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/pkg/trace/tracetest" ) func TestRouterPlugin(t *testing.T) { @@ -92,6 +94,46 @@ func TestRouterPlugin(t *testing.T) { }) } +func TestVerifyTelemetryForRouterPluginRequests(t *testing.T) { + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, + &testenv.Config{ + TraceExporter: exporter, + RouterConfigJSONTemplate: testenv.ConfigWithPluginsJSONTemplate, + Plugins: testenv.PluginConfig{ + Enabled: true, + Path: "../router/plugins", + }, + }, + func(t *testing.T, xEnv *testenv.Environment) { + t.Run("query projects simple", func(t *testing.T) { + t.Parallel() + + queryName := "query sample" + response := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: fmt.Sprintf(`%s { projects { id name } }`, queryName), + }) + + expected := `{"data":{"projects":[{"id":"1","name":"Cloud Migration Overhaul"},{"id":"2","name":"Microservices Revolution"},{"id":"3","name":"AI-Powered Analytics"},{"id":"4","name":"DevOps Transformation"},{"id":"5","name":"Security Overhaul"},{"id":"6","name":"Mobile App Development"},{"id":"7","name":"Data Lake Implementation"}]}}` + require.Equal(t, expected, response.Body) + + snapshots := exporter.GetSpans().Snapshots() + require.Len(t, snapshots, 8) + + queryNameInstances := 0 + for _, sn := range snapshots { + if sn.Name() == queryName { + queryNameInstances++ + } + } + + // Normal http spans would have query sample twice + require.Equal(t, queryNameInstances, 1) + }) + }) +} + func TestRouterPluginRequests(t *testing.T) { t.Parallel() tests := []struct { diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 87b02afd2d..372bdc03a8 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -1515,10 +1515,12 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC } grpcPlugin, err := grpcconnector.NewGRPCPlugin(grpcconnector.GRPCPluginConfig{ - Logger: s.logger, - PluginName: pluginConfig.GetName(), - PluginPath: pluginPath, + Logger: s.logger, + PluginName: pluginConfig.GetName(), + PluginPath: pluginPath, + StartupConfig: newGRPCStartupParams(s.traceConfig, s.ipAnonymization), }) + if err != nil { return fmt.Errorf("failed to create grpc plugin for subgraph %s: %w", dsConfig.Id, err) } @@ -1536,6 +1538,50 @@ func (s *graphServer) setupConnector(ctx context.Context, config *nodev1.EngineC return nil } +func newGRPCStartupParams(traceConfig *rtrace.Config, ipAnonymization *IPAnonymizationConfig) grpcconnector.GRPCStartupParams { + startupConfig := grpcconnector.GRPCStartupParams{} + + if traceConfig.Enabled && len(traceConfig.Exporters) > 0 { + enabledExporters := make([]grpcconnector.GRPCExporter, 0) + for _, exporter := range traceConfig.Exporters { + if exporter != nil && !exporter.Disabled { + transformedExporter := grpcconnector.GRPCExporter{ + Endpoint: exporter.Endpoint, + Exporter: string(exporter.Exporter), + BatchTimeout: exporter.BatchTimeout, + ExportTimeout: exporter.ExportTimeout, + Headers: exporter.Headers, + HTTPPath: exporter.HTTPPath, + } + enabledExporters = append(enabledExporters, transformedExporter) + } + } + + // Convert to []string + propagators := make([]string, 0, len(traceConfig.Propagators)) + for _, propagator := range traceConfig.Propagators { + propagators = append(propagators, string(propagator)) + } + + startupConfig.Telemetry = &grpcconnector.GRPCTelemetry{ + Tracing: &grpcconnector.GRPCTracing{ + Exporters: enabledExporters, + Propagators: propagators, + Sampler: traceConfig.Sampler, + }, + } + } + + if ipAnonymization != nil { + startupConfig.IPAnonymization = &grpcconnector.GRPCIPAnonymization{ + Enabled: ipAnonymization.Enabled, + Method: string(ipAnonymization.Method), + } + } + + return startupConfig +} + // wait waits for all in-flight requests to finish. Similar to http.Server.Shutdown we wait in intervals + jitter // to make the shutdown process more efficient. func (s *graphServer) wait(ctx context.Context) error { diff --git a/router/pkg/grpcconnector/grpc_plugin.go b/router/pkg/grpcconnector/grpc_plugin.go index 0f3a96ac50..fb580a8ac6 100644 --- a/router/pkg/grpcconnector/grpc_plugin.go +++ b/router/pkg/grpcconnector/grpc_plugin.go @@ -2,6 +2,7 @@ package grpcconnector import ( "context" + "encoding/json" "errors" "fmt" "os" @@ -15,9 +16,10 @@ import ( ) type GRPCPluginConfig struct { - Logger *zap.Logger - PluginPath string - PluginName string + Logger *zap.Logger + PluginPath string + PluginName string + StartupConfig GRPCStartupParams } type GRPCPlugin struct { @@ -33,7 +35,8 @@ type GRPCPlugin struct { pluginPath string pluginName string - client *GRPCPluginClient + client *GRPCPluginClient + startupConfig GRPCStartupParams } func NewGRPCPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) { @@ -58,6 +61,8 @@ func NewGRPCPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) { pluginPath: config.PluginPath, pluginName: config.PluginName, + + startupConfig: config.StartupConfig, }, nil } @@ -92,10 +97,21 @@ func (p *GRPCPlugin) fork() error { pluginCmd := newPluginCommand(filePath) + // This is the same as SkipHostEnv false + // except that we do that first so that any params are not overriden + pluginCmd.Env = append(pluginCmd.Env, os.Environ()...) + + configJson, err := json.Marshal(p.startupConfig) + if err != nil { + return fmt.Errorf("failed to create plugin startup config: %w", err) + } + pluginCmd.Env = append(pluginCmd.Env, fmt.Sprintf("%s=%s", "startup_config", configJson)) + pluginClient := plugin.NewClient(&plugin.ClientConfig{ Cmd: pluginCmd, AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, HandshakeConfig: handshakeConfig, + SkipHostEnv: true, Logger: NewPluginLogger(p.logger), Plugins: map[string]plugin.Plugin{ p.pluginName: p, diff --git a/router/pkg/grpcconnector/grpc_plugin_client.go b/router/pkg/grpcconnector/grpc_plugin_client.go index d701d62f7d..1256c9bf9d 100644 --- a/router/pkg/grpcconnector/grpc_plugin_client.go +++ b/router/pkg/grpcconnector/grpc_plugin_client.go @@ -3,6 +3,8 @@ package grpcconnector import ( "context" "errors" + "go.opentelemetry.io/otel" + "google.golang.org/grpc/metadata" "sync" "sync/atomic" "time" @@ -127,6 +129,11 @@ func (g *GRPCPluginClient) Invoke(ctx context.Context, method string, args any, g.mu.RLock() defer g.mu.RUnlock() + + md := make(metadata.MD) + otel.GetTextMapPropagator().Inject(ctx, metadataCarrier{md}) + ctx = metadata.NewOutgoingContext(ctx, md) + return g.cc.Invoke(ctx, method, args, reply, opts...) } diff --git a/router/pkg/grpcconnector/grpc_startup_config.go b/router/pkg/grpcconnector/grpc_startup_config.go new file mode 100644 index 0000000000..be2430570a --- /dev/null +++ b/router/pkg/grpcconnector/grpc_startup_config.go @@ -0,0 +1,31 @@ +package grpcconnector + +import ( + "time" +) + +type GRPCStartupParams struct { + Telemetry *GRPCTelemetry `json:"telemetry,omitempty"` + IPAnonymization *GRPCIPAnonymization `json:"ip_anonymization,omitempty"` +} +type GRPCTelemetry struct { + Tracing *GRPCTracing `json:"tracing,omitempty"` +} +type GRPCTracing struct { + Exporters []GRPCExporter `json:"exporters,omitempty"` + Propagators []string `json:"propagators,omitempty"` + Sampler float64 `json:"sampler"` +} +type GRPCExporter struct { + Endpoint string `json:"endpoint"` + Exporter string `json:"exporter"` + BatchTimeout time.Duration `json:"batch_timeout"` + ExportTimeout time.Duration `json:"export_timeout"` + Headers map[string]string `json:"headers"` + HTTPPath string `json:"http_path"` +} + +type GRPCIPAnonymization struct { + Enabled bool `json:"enabled"` + Method string `json:"method"` +} diff --git a/router/pkg/grpcconnector/metadatacarrier.go b/router/pkg/grpcconnector/metadatacarrier.go new file mode 100644 index 0000000000..51bcd3c21a --- /dev/null +++ b/router/pkg/grpcconnector/metadatacarrier.go @@ -0,0 +1,30 @@ +package grpcconnector + +import ( + "google.golang.org/grpc/metadata" + "strings" +) + +type metadataCarrier struct { + metadata.MD +} + +func (mc metadataCarrier) Get(key string) string { + values := mc.MD.Get(strings.ToLower(key)) + if len(values) == 0 { + return "" + } + return values[0] +} + +func (mc metadataCarrier) Set(key string, value string) { + mc.MD.Set(strings.ToLower(key), value) +} + +func (mc metadataCarrier) Keys() []string { + keys := make([]string, 0, len(mc.MD)) + for k := range mc.MD { + keys = append(keys, k) + } + return keys +}