diff --git a/cmd/features.go b/cmd/features.go index 464f4005cc..ed3de8e645 100644 --- a/cmd/features.go +++ b/cmd/features.go @@ -2,6 +2,7 @@ // Use of this source code is governed by an Apache2 // license that can be found in the LICENSE file. +//go:build opa_wasm // +build opa_wasm package cmd diff --git a/cmd/run.go b/cmd/run.go index a84f06c63e..03192efe75 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -282,7 +282,7 @@ func initRuntime(ctx context.Context, params runCmdParams, args []string) (*runt return nil, err } - rt.SetDistributedTracingErrorHandler() + rt.SetDistributedTracingLogging() return rt, nil } diff --git a/features/tracing/tracing.go b/features/tracing/tracing.go new file mode 100644 index 0000000000..a77ad4cc87 --- /dev/null +++ b/features/tracing/tracing.go @@ -0,0 +1,34 @@ +// Copyright 2021 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package tracing + +import ( + "net/http" + + pkg_tracing "github.com/open-policy-agent/opa/tracing" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +func init() { + pkg_tracing.RegisterHTTPTracing(&factory{}) +} + +type factory struct{} + +func (*factory) NewTransport(tr http.RoundTripper, opts pkg_tracing.Options) http.RoundTripper { + return otelhttp.NewTransport(tr, convertOpts(opts)...) +} + +func (*factory) NewHandler(f http.Handler, label string, opts pkg_tracing.Options) http.Handler { + return otelhttp.NewHandler(f, label, convertOpts(opts)...) +} + +func convertOpts(opts pkg_tracing.Options) []otelhttp.Option { + otelOpts := make([]otelhttp.Option, 0, len(opts)) + for _, opt := range opts { + otelOpts = append(otelOpts, opt.(otelhttp.Option)) + } + return otelOpts +} diff --git a/go.mod b/go.mod index fe6126c2c0..75b889e361 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/fsnotify/fsnotify v1.5.1 github.com/ghodss/yaml v1.0.0 github.com/go-ini/ini v1.66.2 + github.com/go-logr/logr v1.2.1 github.com/gobwas/glob v0.2.3 github.com/golang/glog v1.0.0 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/internal/distributedtracing/distributedtracing.go b/internal/distributedtracing/distributedtracing.go index 02283b513b..71f73b61dd 100644 --- a/internal/distributedtracing/distributedtracing.go +++ b/internal/distributedtracing/distributedtracing.go @@ -1,3 +1,7 @@ +// Copyright 2021 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + package distributedtracing import ( @@ -7,9 +11,7 @@ import ( "fmt" "io/ioutil" - "github.com/open-policy-agent/opa/config" - "github.com/open-policy-agent/opa/logging" - "github.com/open-policy-agent/opa/util" + "github.com/go-logr/logr" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" @@ -19,6 +21,16 @@ import ( "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.7.0" "google.golang.org/grpc/credentials" + + "github.com/open-policy-agent/opa/config" + "github.com/open-policy-agent/opa/logging" + "github.com/open-policy-agent/opa/tracing" + "github.com/open-policy-agent/opa/util" + + // The import registers opentelemetry with the top-level `tracing` package, + // so the latter can be used from rego/topdown without an explicit build-time + // dependency. + _ "github.com/open-policy-agent/opa/features/tracing" ) const ( @@ -55,9 +67,7 @@ type distributedTracingConfig struct { TLSCACertFile string `json:"tls_ca_cert_file,omitempty"` } -type Options []otelhttp.Option - -func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.Exporter, options Options, err error) { +func Init(ctx context.Context, raw []byte, id string) (*otlptrace.Exporter, tracing.Options, error) { parsedConfig, err := config.ParseConfig(raw, id) if err != nil { return nil, nil, err @@ -87,8 +97,10 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace. return nil, nil, err } - traceExporter = otlptracegrpc.NewUnstarted(otlptracegrpc.WithEndpoint(distributedTracingConfig.Address), - tlsOption) + traceExporter := otlptracegrpc.NewUnstarted( + otlptracegrpc.WithEndpoint(distributedTracingConfig.Address), + tlsOption, + ) res, err := resource.New(ctx, resource.WithAttributes( @@ -105,7 +117,7 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace. trace.WithSpanProcessor(trace.NewBatchSpanProcessor(traceExporter)), ) - options = append(options, + options := tracing.NewOptions( otelhttp.WithTracerProvider(traceProvider), otelhttp.WithPropagators(propagation.TraceContext{}), ) @@ -113,10 +125,9 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace. return traceExporter, options, nil } -func SetErrorHandler(logger logging.Logger) { - otel.SetErrorHandler(&errorHandler{ - logger: logger, - }) +func SetupLogging(logger logging.Logger) { + otel.SetErrorHandler(&errorHandler{logger: logger}) + otel.SetLogger(logr.New(&sink{logger: logger})) } func parseDistributedTracingConfig(raw []byte) (*distributedTracingConfig, error) { @@ -241,3 +252,37 @@ type errorHandler struct { func (e *errorHandler) Handle(err error) { e.logger.Warn("Distributed tracing: " + err.Error()) } + +// NOTE(sr): This adapter code is used to ensure that whatever otel logs, now or +// in the future, will end up in "our" logs, and not go through whatever defaults +// it has set up with its global logger. As such, it's to a full-featured +// implementation fo the logr.LogSink interface, but a rather minimal one. Notably, +// fields are no supported, the initial runtime time info is ignored, and there is +// no support for different verbosity level is "info" logs: they're all printed +// as-is. + +type sink struct { + logger logging.Logger +} + +func (s *sink) Enabled(level int) bool { + return int(s.logger.GetLevel()) >= level +} + +func (*sink) Init(logr.RuntimeInfo) {} // ignored + +func (s *sink) Info(_ int, msg string, _ ...interface{}) { + s.logger.Info(msg) +} + +func (s *sink) Error(err error, msg string, _ ...interface{}) { + s.logger.WithFields(map[string]interface{}{"err": err}).Error(msg) +} + +func (s *sink) WithName(name string) logr.LogSink { + return &sink{s.logger.WithFields(map[string]interface{}{"name": name})} +} + +func (s *sink) WithValues(...interface{}) logr.LogSink { // ignored + return s +} diff --git a/rego/rego.go b/rego/rego.go index 3659089f9c..d7048a0e81 100644 --- a/rego/rego.go +++ b/rego/rego.go @@ -18,7 +18,6 @@ import ( "github.com/open-policy-agent/opa/bundle" bundleUtils "github.com/open-policy-agent/opa/internal/bundle" "github.com/open-policy-agent/opa/internal/compiler/wasm" - "github.com/open-policy-agent/opa/internal/distributedtracing" "github.com/open-policy-agent/opa/internal/future" "github.com/open-policy-agent/opa/internal/ir" "github.com/open-policy-agent/opa/internal/planner" @@ -32,6 +31,7 @@ import ( "github.com/open-policy-agent/opa/topdown" "github.com/open-policy-agent/opa/topdown/cache" "github.com/open-policy-agent/opa/topdown/print" + "github.com/open-policy-agent/opa/tracing" "github.com/open-policy-agent/opa/types" "github.com/open-policy-agent/opa/util" ) @@ -514,7 +514,7 @@ type Rego struct { generateJSON func(*ast.Term, *EvalContext) (interface{}, error) printHook print.Hook enablePrintStatements bool - distributedTacingOpts distributedtracing.Options + distributedTacingOpts tracing.Options } // Function represents a built-in function that is callable in Rego. @@ -1065,7 +1065,7 @@ func PrintHook(h print.Hook) func(r *Rego) { } // DistributedTracingOpts sets the options to be used by distributed tracing. -func DistributedTracingOpts(tr distributedtracing.Options) func(r *Rego) { +func DistributedTracingOpts(tr tracing.Options) func(r *Rego) { return func(r *Rego) { r.distributedTacingOpts = tr } diff --git a/runtime/runtime.go b/runtime/runtime.go index 043c6a69d6..bf8f362455 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -22,14 +22,13 @@ import ( "github.com/fsnotify/fsnotify" "github.com/gorilla/mux" - "github.com/pkg/errors" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.uber.org/automaxprocs/maxprocs" "github.com/open-policy-agent/opa/ast" "github.com/open-policy-agent/opa/bundle" "github.com/open-policy-agent/opa/internal/config" - "github.com/open-policy-agent/opa/internal/distributedtracing" + internal_tracing "github.com/open-policy-agent/opa/internal/distributedtracing" internal_logging "github.com/open-policy-agent/opa/internal/logging" "github.com/open-policy-agent/opa/internal/prometheus" "github.com/open-policy-agent/opa/internal/report" @@ -46,6 +45,7 @@ import ( "github.com/open-policy-agent/opa/server" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/storage/inmem" + "github.com/open-policy-agent/opa/tracing" "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/version" ) @@ -202,7 +202,7 @@ type Params struct { // If it is nil, a new mux.Router will be created Router *mux.Router - DistrbutedTracingOpts distributedtracing.Options + DistributedTracingOpts tracing.Options } // LoggingConfig stores the configuration for OPA's logging behaviour. @@ -277,7 +277,7 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { config, err := config.Load(params.ConfigFile, params.ConfigOverrides, params.ConfigOverrideFiles) if err != nil { - return nil, errors.Wrap(err, "config error") + return nil, fmt.Errorf("config error: %w", err) } var reporter *report.Reporter @@ -285,13 +285,13 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { var err error reporter, err = report.New(params.ID, report.Options{Logger: logger}) if err != nil { - return nil, errors.Wrap(err, "config error") + return nil, fmt.Errorf("config error: %w", err) } } loaded, err := initload.LoadPaths(params.Paths, params.Filter, params.BundleMode, params.BundleVerificationConfig, params.SkipBundleVerification) if err != nil { - return nil, errors.Wrap(err, "load error") + return nil, fmt.Errorf("load error: %w", err) } info, err := runtime.Term(runtime.Params{Config: config}) @@ -327,26 +327,26 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { plugins.PrintHook(loggingPrintHook{logger: logger}), plugins.WithRouter(params.Router)) if err != nil { - return nil, errors.Wrap(err, "config error") + return nil, fmt.Errorf("config error: %w", err) } if err := manager.Init(ctx); err != nil { - return nil, errors.Wrap(err, "initialization error") + return nil, fmt.Errorf("initialization error: %w", err) } metrics := prometheus.New(metrics.New(), errorLogger(logger)) - traceExporter, distrbutedTracingOpts, err := distributedtracing.Init(ctx, config, params.ID) + traceExporter, distributedTracingOpts, err := internal_tracing.Init(ctx, config, params.ID) if err != nil { return nil, fmt.Errorf("config error: %w", err) } - if distrbutedTracingOpts != nil { - params.DistrbutedTracingOpts = distrbutedTracingOpts + if distributedTracingOpts != nil { + params.DistributedTracingOpts = distributedTracingOpts } disco, err := discovery.New(manager, discovery.Factories(registeredPlugins), discovery.Metrics(metrics)) if err != nil { - return nil, errors.Wrap(err, "config error") + return nil, fmt.Errorf("config error: %w", err) } manager.Register("discovery", disco) @@ -418,14 +418,14 @@ func (rt *Runtime) Serve(ctx context.Context) error { if rt.traceExporter != nil { if err := rt.traceExporter.Start(ctx); err != nil { - rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start trace exporter.") + rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start OpenTelemetry trace exporter.") return err } defer func() { err := rt.traceExporter.Shutdown(ctx) if err != nil { - rt.logger.Error("Failed to shutdown OpenTelemetry trace exporter gracefully.") + rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to shutdown OpenTelemetry trace exporter gracefully.") } }() } @@ -448,7 +448,7 @@ func (rt *Runtime) Serve(ctx context.Context) error { WithRuntime(rt.Manager.Info). WithMetrics(rt.metrics). WithMinTLSVersion(rt.Params.MinTLSVersion). - WithDistributedTracingOpts(rt.Params.DistrbutedTracingOpts) + WithDistributedTracingOpts(rt.Params.DistributedTracingOpts) if rt.Params.DiagnosticAddrs != nil { rt.server = rt.server.WithDiagnosticAddresses(*rt.Params.DiagnosticAddrs) @@ -586,9 +586,10 @@ func (rt *Runtime) StartREPL(ctx context.Context) { repl.Loop(ctx) } -// SetDistributedTracingErrorHandler configures the distributed tracing's ErrorHandler. -func (rt *Runtime) SetDistributedTracingErrorHandler() { - distributedtracing.SetErrorHandler(rt.logger) +// SetDistributedTracingLogging configures the distributed tracing's ErrorHandler, +// and logger instances. +func (rt *Runtime) SetDistributedTracingLogging() { + internal_tracing.SetupLogging(rt.logger) } func (rt *Runtime) checkOPAUpdate(ctx context.Context) *report.DataResponse { diff --git a/server/features.go b/server/features.go index 37de967a71..ea846a427a 100644 --- a/server/features.go +++ b/server/features.go @@ -2,6 +2,7 @@ // Use of this source code is governed by an Apache2 // license that can be found in the LICENSE file. +//go:build opa_wasm // +build opa_wasm package server diff --git a/server/server.go b/server/server.go index a127e2cc3e..054f040263 100644 --- a/server/server.go +++ b/server/server.go @@ -26,15 +26,12 @@ import ( "github.com/gorilla/mux" "github.com/pkg/errors" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "github.com/open-policy-agent/opa/ast" "github.com/open-policy-agent/opa/bundle" - "github.com/open-policy-agent/opa/internal/distributedtracing" - "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/plugins" bundlePlugin "github.com/open-policy-agent/opa/plugins/bundle" @@ -48,6 +45,7 @@ import ( "github.com/open-policy-agent/opa/topdown" iCache "github.com/open-policy-agent/opa/topdown/cache" "github.com/open-policy-agent/opa/topdown/lineage" + "github.com/open-policy-agent/opa/tracing" "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/version" ) @@ -131,7 +129,7 @@ type Server struct { defaultDecisionPath string interQueryBuiltinCache iCache.InterQueryCache allPluginsOkOnce bool - distributedTracingOpts distributedtracing.Options + distributedTracingOpts tracing.Options } // Metrics defines the interface that the server requires for recording HTTP @@ -338,8 +336,8 @@ func (s *Server) WithMinTLSVersion(minTLSVersion uint16) *Server { } // WithDistributedTracingOpts sets the options to be used by distributed tracing. -func (s *Server) WithDistributedTracingOpts(traceOpts distributedtracing.Options) *Server { - s.distributedTracingOpts = traceOpts +func (s *Server) WithDistributedTracingOpts(opts tracing.Options) *Server { + s.distributedTracingOpts = opts return s } @@ -736,7 +734,7 @@ func (s *Server) initRouters() { func (s *Server) instrumentHandler(handler func(http.ResponseWriter, *http.Request), label string) http.Handler { var httpHandler http.Handler = http.HandlerFunc(handler) if len(s.distributedTracingOpts) > 0 { - httpHandler = otelhttp.NewHandler(http.HandlerFunc(handler), label, s.distributedTracingOpts...) + httpHandler = tracing.NewHandler(httpHandler, label, s.distributedTracingOpts) } if s.metrics != nil { return s.metrics.InstrumentHandler(httpHandler, label) diff --git a/test/e2e/distributedtracing/distributedtracing_test.go b/test/e2e/distributedtracing/distributedtracing_test.go index 53efa45c96..ab9d588064 100644 --- a/test/e2e/distributedtracing/distributedtracing_test.go +++ b/test/e2e/distributedtracing/distributedtracing_test.go @@ -1,3 +1,7 @@ +// Copyright 2021 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + package distributedtracing import ( @@ -8,8 +12,8 @@ import ( "strings" "testing" - "github.com/open-policy-agent/opa/internal/distributedtracing" "github.com/open-policy-agent/opa/test/e2e" + "github.com/open-policy-agent/opa/tracing" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/trace" @@ -21,13 +25,13 @@ var spanExporter *tracetest.InMemoryExporter func TestMain(m *testing.M) { spanExporter = tracetest.NewInMemoryExporter() - options := distributedtracing.Options{ + options := tracing.NewOptions( otelhttp.WithTracerProvider(trace.NewTracerProvider(trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(spanExporter)))), - } + ) flag.Parse() testServerParams := e2e.NewAPIServerTestParams() - testServerParams.DistrbutedTracingOpts = options + testServerParams.DistributedTracingOpts = options var err error testRuntime, err = e2e.NewTestRuntime(testServerParams) @@ -101,6 +105,9 @@ func TestClientSpan(t *testing.T) { spans := spanExporter.GetSpans() + // 3 = GET /v1/data/test (HTTP server handler) + // + http.send (HTTP client instrumentation) + // + GET /health (HTTP server handler) if got, expected := len(spans), 3; got != expected { t.Fatalf("got %d span(s), expected %d", got, expected) } diff --git a/topdown/builtins.go b/topdown/builtins.go index 6c20a9959f..4081878673 100644 --- a/topdown/builtins.go +++ b/topdown/builtins.go @@ -12,11 +12,11 @@ import ( "math/rand" "github.com/open-policy-agent/opa/ast" - "github.com/open-policy-agent/opa/internal/distributedtracing" "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/topdown/builtins" "github.com/open-policy-agent/opa/topdown/cache" "github.com/open-policy-agent/opa/topdown/print" + "github.com/open-policy-agent/opa/tracing" ) type ( @@ -35,23 +35,23 @@ type ( // BuiltinContext contains context from the evaluator that may be used by // built-in functions. BuiltinContext struct { - Context context.Context // request context that was passed when query started - Metrics metrics.Metrics // metrics registry for recording built-in specific metrics - Seed io.Reader // randomization source - Time *ast.Term // wall clock time - Cancel Cancel // atomic value that signals evaluation to halt - Runtime *ast.Term // runtime information on the OPA instance - Cache builtins.Cache // built-in function state cache - InterQueryBuiltinCache cache.InterQueryCache // cross-query built-in function state cache - Location *ast.Location // location of built-in call - Tracers []Tracer // Deprecated: Use QueryTracers instead - QueryTracers []QueryTracer // tracer objects for trace() built-in function - TraceEnabled bool // indicates whether tracing is enabled for the evaluation - QueryID uint64 // identifies query being evaluated - ParentID uint64 // identifies parent of query being evaluated - PrintHook print.Hook // provides callback function to use for printing - DistributedTracingOpts distributedtracing.Options // options to be used by distributed tracing. - rand *rand.Rand // randomization source for non-security-sensitive operations + Context context.Context // request context that was passed when query started + Metrics metrics.Metrics // metrics registry for recording built-in specific metrics + Seed io.Reader // randomization source + Time *ast.Term // wall clock time + Cancel Cancel // atomic value that signals evaluation to halt + Runtime *ast.Term // runtime information on the OPA instance + Cache builtins.Cache // built-in function state cache + InterQueryBuiltinCache cache.InterQueryCache // cross-query built-in function state cache + Location *ast.Location // location of built-in call + Tracers []Tracer // Deprecated: Use QueryTracers instead + QueryTracers []QueryTracer // tracer objects for trace() built-in function + TraceEnabled bool // indicates whether tracing is enabled for the evaluation + QueryID uint64 // identifies query being evaluated + ParentID uint64 // identifies parent of query being evaluated + PrintHook print.Hook // provides callback function to use for printing + DistributedTracingOpts tracing.Options // options to be used by distributed tracing. + rand *rand.Rand // randomization source for non-security-sensitive operations } // BuiltinFunc defines an interface for implementing built-in functions. diff --git a/topdown/eval.go b/topdown/eval.go index 22aaa0cb26..d2fdce5411 100644 --- a/topdown/eval.go +++ b/topdown/eval.go @@ -9,13 +9,13 @@ import ( "strings" "github.com/open-policy-agent/opa/ast" - "github.com/open-policy-agent/opa/internal/distributedtracing" "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/topdown/builtins" "github.com/open-policy-agent/opa/topdown/cache" "github.com/open-policy-agent/opa/topdown/copypropagation" "github.com/open-policy-agent/opa/topdown/print" + "github.com/open-policy-agent/opa/tracing" ) type evalIterator func(*eval) error @@ -91,7 +91,7 @@ type eval struct { runtime *ast.Term builtinErrors *builtinErrors printHook print.Hook - distributedTracingOpts distributedtracing.Options + tracingOpts tracing.Options findOne bool } @@ -716,7 +716,7 @@ func (e *eval) evalCall(terms []*ast.Term, iter unifyIterator) error { QueryID: e.queryID, ParentID: parentID, PrintHook: e.printHook, - DistributedTracingOpts: e.distributedTracingOpts, + DistributedTracingOpts: e.tracingOpts, } eval := evalBuiltin{ diff --git a/topdown/http.go b/topdown/http.go index c03c8f603c..e855794cff 100644 --- a/topdown/http.go +++ b/topdown/http.go @@ -26,8 +26,8 @@ import ( "github.com/open-policy-agent/opa/internal/version" "github.com/open-policy-agent/opa/topdown/builtins" "github.com/open-policy-agent/opa/topdown/cache" + "github.com/open-policy-agent/opa/tracing" "github.com/open-policy-agent/opa/util" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) type cachingMode string @@ -574,7 +574,7 @@ func createHTTPRequest(bctx BuiltinContext, obj ast.Object) (*http.Request, *htt } if len(bctx.DistributedTracingOpts) > 0 { - client.Transport = otelhttp.NewTransport(client.Transport, bctx.DistributedTracingOpts...) + client.Transport = tracing.NewTransport(client.Transport, bctx.DistributedTracingOpts) } return req, client, nil diff --git a/topdown/http_test.go b/topdown/http_test.go index 45f598d931..66ea15fc70 100644 --- a/topdown/http_test.go +++ b/topdown/http_test.go @@ -26,11 +26,10 @@ import ( "testing" "time" - "github.com/open-policy-agent/opa/internal/distributedtracing" "github.com/open-policy-agent/opa/internal/version" "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/topdown/builtins" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "github.com/open-policy-agent/opa/tracing" iCache "github.com/open-policy-agent/opa/topdown/cache" @@ -2626,24 +2625,29 @@ func TestSocketHTTPGetRequest(t *testing.T) { } } +type tracemock struct { + called int +} + +func (m *tracemock) NewTransport(rt http.RoundTripper, _ tracing.Options) http.RoundTripper { + m.called++ + return rt +} +func (*tracemock) NewHandler(http.Handler, string, tracing.Options) http.Handler { + panic("unreachable") +} + func TestDistributedTracingEnabled(t *testing.T) { - c := []byte(`{"distributed_tracing": { - "enabled": true - }}`) - ctx := context.Background() - _, traceOpts, err := distributedtracing.Init(ctx, c, "foo") - if err != nil { - t.Fatalf("Unexpected error initializing trace exporter %v", err) - } + mock := tracemock{} + tracing.RegisterHTTPTracing(&mock) builtinContext := BuiltinContext{ - Context: ctx, - DistributedTracingOpts: traceOpts, + Context: context.Background(), + DistributedTracingOpts: tracing.NewOptions(true), // any option means it's enabled } - obj := ast.NewObject() - _, client, err := createHTTPRequest(builtinContext, obj) + _, client, err := createHTTPRequest(builtinContext, ast.NewObject()) if err != nil { t.Fatalf("Unexpected error creating HTTP request %v", err) } @@ -2651,17 +2655,21 @@ func TestDistributedTracingEnabled(t *testing.T) { t.Fatal("No Transport defined") } - _, ok := client.Transport.(*otelhttp.Transport) - - if !ok { - t.Fatal("Expected otelhttp.Transport") + if exp, act := 1, mock.called; exp != act { + t.Errorf("calls to NewTransported: expected %d, got %d", exp, act) } - } func TestDistributedTracingDisabled(t *testing.T) { - obj := ast.NewObject() - _, client, err := createHTTPRequest(BuiltinContext{Context: context.Background()}, obj) + + mock := tracemock{} + tracing.RegisterHTTPTracing(&mock) + + builtinContext := BuiltinContext{ + Context: context.Background(), + } + + _, client, err := createHTTPRequest(builtinContext, ast.NewObject()) if err != nil { t.Fatalf("Unexpected error creating HTTP request %v", err) } @@ -2669,10 +2677,7 @@ func TestDistributedTracingDisabled(t *testing.T) { t.Fatal("No Transport defined") } - _, ok := client.Transport.(*otelhttp.Transport) - - if ok { - t.Fatal("Unexpected otelhttp.Transport") + if exp, act := 0, mock.called; exp != act { + t.Errorf("calls to NewTransported: expected %d, got %d", exp, act) } - } diff --git a/topdown/query.go b/topdown/query.go index c9caf6c865..9a9a3c679e 100644 --- a/topdown/query.go +++ b/topdown/query.go @@ -7,16 +7,15 @@ import ( "sort" "time" - "github.com/open-policy-agent/opa/resolver" - "github.com/open-policy-agent/opa/topdown/cache" - "github.com/open-policy-agent/opa/topdown/print" - "github.com/open-policy-agent/opa/ast" - "github.com/open-policy-agent/opa/internal/distributedtracing" "github.com/open-policy-agent/opa/metrics" + "github.com/open-policy-agent/opa/resolver" "github.com/open-policy-agent/opa/storage" "github.com/open-policy-agent/opa/topdown/builtins" + "github.com/open-policy-agent/opa/topdown/cache" "github.com/open-policy-agent/opa/topdown/copypropagation" + "github.com/open-policy-agent/opa/topdown/print" + "github.com/open-policy-agent/opa/tracing" ) // QueryResultSet represents a collection of results returned by a query. @@ -55,7 +54,7 @@ type Query struct { interQueryBuiltinCache cache.InterQueryCache strictBuiltinErrors bool printHook print.Hook - distributedTracingOpts distributedtracing.Options + tracingOpts tracing.Options } // Builtin represents a built-in function that queries can call. @@ -261,8 +260,8 @@ func (q *Query) WithPrintHook(h print.Hook) *Query { } // WithDistributedTracingOpts sets the options to be used by distributed tracing. -func (q *Query) WithDistributedTracingOpts(tr distributedtracing.Options) *Query { - q.distributedTracingOpts = tr +func (q *Query) WithDistributedTracingOpts(tr tracing.Options) *Query { + q.tracingOpts = tr return q } @@ -461,7 +460,7 @@ func (q *Query) Iter(ctx context.Context, iter func(QueryResult) error) error { earlyExit: q.earlyExit, builtinErrors: &builtinErrors{}, printHook: q.printHook, - distributedTracingOpts: q.distributedTracingOpts, + tracingOpts: q.tracingOpts, } e.caller = e q.metrics.Timer(metrics.RegoQueryEval).Start() diff --git a/tracing/tracing.go b/tracing/tracing.go new file mode 100644 index 0000000000..2708b78e29 --- /dev/null +++ b/tracing/tracing.go @@ -0,0 +1,55 @@ +// Copyright 2021 The OPA Authors. All rights reserved. +// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +// Package tracing enables dependency-injection at runtime. When used +// together with an underscore-import of `github.com/open-policy-agent/opa/features/tracing`, +// the server and its runtime will emit OpenTelemetry spans to the +// configured sink. +package tracing + +import "net/http" + +// Options are options for the HTTPTracingService, passed along as-is. +type Options []interface{} + +// NewOptions is a helper method for constructing `tracing.Options` +func NewOptions(opts ...interface{}) Options { + return opts +} + +// HTTPTracingService defines how distributed tracing comes in, server- and client-side +type HTTPTracingService interface { + // NewTransport is used when setting up an HTTP client + NewTransport(http.RoundTripper, Options) http.RoundTripper + + // NewHandler is used to wrap an http.Handler in the server + NewHandler(http.Handler, string, Options) http.Handler +} + +var tracing HTTPTracingService + +// RegisterHTTPTracing enables a HTTPTracingService for further use. +func RegisterHTTPTracing(ht HTTPTracingService) { + tracing = ht +} + +// NewTransport returns another http.RoundTripper, instrumented to emit tracing +// spans according to Options. Provided by the HTTPTracingService registered with +// this package via RegisterHTTPTracing. +func NewTransport(tr http.RoundTripper, opts Options) http.RoundTripper { + if tracing == nil { + return tr + } + return tracing.NewTransport(tr, opts) +} + +// NewHandler returns another http.Handler, instrumented to emit tracing spans +// according to Options. Provided by the HTTPTracingService registered with +// this package via RegisterHTTPTracing. +func NewHandler(f http.Handler, label string, opts Options) http.Handler { + if tracing == nil { + return f + } + return tracing.NewHandler(f, label, opts) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1374ba6eaa..df821b1f0b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -60,6 +60,7 @@ github.com/ghodss/yaml ## explicit github.com/go-ini/ini # github.com/go-logr/logr v1.2.1 +## explicit github.com/go-logr/logr github.com/go-logr/logr/funcr # github.com/go-logr/stdr v1.2.0