diff --git a/CHANGELOG.md b/CHANGELOG.md index b3abe45887e..e679cec50a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `WithInstrumentationAttributeSet` option to `go.opentelemetry.io/otel/log`, `go.opentelemetry.io/otel/metric`, and `go.opentelemetry.io/otel/trace` packages. This provides a concurrent-safe and performant alternative to `WithInstrumentationAttributes` by accepting a pre-constructed `attribute.Set`. (#7287) +- - Add experimental self-observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. +Check the `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x` package documentation for more information. (#7084) ### Fixed diff --git a/exporters/otlp/otlplog/otlploggrpc/client.go b/exporters/otlp/otlplog/otlploggrpc/client.go index d1b31ef2aa6..0bbbec912ad 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client.go +++ b/exporters/otlp/otlplog/otlploggrpc/client.go @@ -22,7 +22,11 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/counter" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) // The methods of this type are not expected to be called concurrently. @@ -38,6 +42,8 @@ type client struct { ourConn bool conn *grpc.ClientConn lsc collogpb.LogsServiceClient + + instrumentation *observ.Instrumentation } // Used for testing. @@ -72,7 +78,20 @@ func newClient(cfg config) (*client, error) { c.lsc = collogpb.NewLogsServiceClient(c.conn) - return c, nil + if !x.SelfObservability.Enabled() { + return c, nil + } + + id := counter.NextExporterID() + componentName := fmt.Sprintf("%s/%d", otelconv.ComponentTypeOtlpGRPCLogExporter, id) + var err error + c.instrumentation, err = observ.NewInstrumentation( + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc", + componentName, + otelconv.ComponentTypeOtlpGRPCLogExporter, + c.conn.CanonicalTarget(), + ) + return c, err } func newGRPCDialOptions(cfg config) []grpc.DialOption { @@ -121,7 +140,7 @@ func newGRPCDialOptions(cfg config) []grpc.DialOption { // The otlplog.Exporter synchronizes access to client methods, and // ensures this is not called after the Exporter is shutdown. Only thing // to do here is send data. -func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error { +func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (err error) { select { case <-ctx.Done(): // Do not upload if the context is already expired. @@ -132,16 +151,33 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error ctx, cancel := c.exportContext(ctx) defer cancel() + success := int64(len(rl)) + // partialSuccessErr records an error when the export is partially successful. + var partialSuccessErr error + if c.instrumentation != nil { + count := len(rl) + trackExportFunc := c.instrumentation.ExportSpans(ctx, int64(count)) + defer func() { + if partialSuccessErr != nil { + trackExportFunc(partialSuccessErr, success, status.Code(partialSuccessErr)) + return + } + trackExportFunc(err, success, status.Code(err)) + }() + } + return c.requestFunc(ctx, func(ctx context.Context) error { resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{ ResourceLogs: rl, }) + if resp != nil && resp.PartialSuccess != nil { msg := resp.PartialSuccess.GetErrorMessage() n := resp.PartialSuccess.GetRejectedLogRecords() + success -= n if n != 0 || msg != "" { - err := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n) - otel.Handle(err) + partialSuccessErr = fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n) + otel.Handle(partialSuccessErr) } } // nil is converted to OK. @@ -149,6 +185,7 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error // Success. return nil } + success = 0 return err }) } diff --git a/exporters/otlp/otlplog/otlploggrpc/client_test.go b/exporters/otlp/otlplog/otlploggrpc/client_test.go index e1526f735ab..b3f3520eb08 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/client_test.go @@ -29,8 +29,16 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) var ( @@ -601,3 +609,419 @@ func TestConfig(t *testing.T) { assert.Equal(t, []string{headers[key]}, got[key]) }) } + +func TestSelfObservability(t *testing.T) { + testCases := []struct { + name string + enabled bool + test func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) + }{ + { + name: "disable", + enabled: false, + test: func(t *testing.T, _ func() metricdata.ScopeMetrics) { + client, _ := clientFactory(t, nil) + assert.Empty(t, client.instrumentation) + }, + }, + { + name: "upload success", + enabled: true, + test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) { + ctx := context.Background() + client, coll := clientFactory(t, nil) + componentName := fmt.Sprintf( + "%s/%d", + otelconv.ComponentTypeOtlpGRPCLogExporter, + 0, + ) + serverAddrAttrs := observ.ServerAddrAttrs(client.conn.Target()) + wantMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterLogInflight{}.Name(), + Description: otelconv.SDKExporterLogInflight{}.Description(), + Unit: otelconv.SDKExporterLogInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName), + otelconv.SDKExporterLogInflight{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterLogExported{}.Name(), + Description: otelconv.SDKExporterLogExported{}.Description(), + Unit: otelconv.SDKExporterLogExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: int64(len(resourceLogs)), + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterOperationDuration{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode( + otelconv.RPCGRPCStatusCodeAttr( + codes.OK, + ), + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Count: 1, + }, + }, + }, + }, + }, + } + + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.Shutdown(ctx)) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceLogs") + diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal)) + if diff != "" { + t.Fatalf("unexpected ResourceLogs:\n%s", diff) + } + g := scopeMetrics() + metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual( + t, + wantMetrics.Metrics[2], + g.Metrics[2], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + ) + }, + }, + { + name: "partial success", + enabled: true, + test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) { + const n, msg = 2, "bad data" + rCh := make(chan exportResult, 1) + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + RejectedLogRecords: n, + ErrorMessage: msg, + }, + }, + } + ctx := context.Background() + client, _ := clientFactory(t, rCh) + + componentName := fmt.Sprintf( + "%s/%d", + otelconv.ComponentTypeOtlpGRPCLogExporter, + 1, + ) + serverAddrAttrs := observ.ServerAddrAttrs(client.conn.Target()) + wantErr := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n) + wantMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterLogInflight{}.Name(), + Description: otelconv.SDKExporterLogInflight{}.Description(), + Unit: otelconv.SDKExporterLogInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName), + otelconv.SDKExporterLogInflight{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterLogExported{}.Name(), + Description: otelconv.SDKExporterLogExported{}.Description(), + Unit: otelconv.SDKExporterLogExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: int64(len(resourceLogs)) - 2, + }, + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + semconv.ErrorType(wantErr), + ), + Value: 2, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterOperationDuration{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode( + otelconv.RPCGRPCStatusCodeAttr( + status.Code(wantErr), + ), + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + semconv.ErrorType(wantErr), + ), + Count: 1, + }, + }, + }, + }, + }, + } + + defer func(orig otel.ErrorHandler) { + otel.SetErrorHandler(orig) + }(otel.GetErrorHandler()) + + var errs []error + eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) }) + otel.SetErrorHandler(eh) + + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + + require.Len(t, errs, 1) + assert.ErrorContains(t, errs[0], wantErr.Error()) + + g := scopeMetrics() + metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual( + t, + wantMetrics.Metrics[2], + g.Metrics[2], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + ) + }, + }, + { + name: "upload failure", + enabled: true, + test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) { + wantErr := status.Error(codes.InvalidArgument, "request contains invalid arguments") + wantErrTypeAttr := semconv.ErrorType(wantErr) + wantGRPCStatusCodeAttr := otelconv.RPCGRPCStatusCodeAttr(codes.InvalidArgument) + rCh := make(chan exportResult, 1) + rCh <- exportResult{ + Err: wantErr, + } + ctx := context.Background() + client, _ := clientFactory(t, rCh) + err := client.UploadLogs(ctx, resourceLogs) + assert.ErrorContains(t, err, "request contains invalid arguments") + + componentName := fmt.Sprintf( + "%s/%d", + otelconv.ComponentTypeOtlpGRPCLogExporter, + 2, + ) + serverAddrAttrs := observ.ServerAddrAttrs(client.conn.Target()) + wantMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterLogInflight{}.Name(), + Description: otelconv.SDKExporterLogInflight{}.Description(), + Unit: otelconv.SDKExporterLogInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName), + otelconv.SDKExporterLogInflight{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterLogExported{}.Name(), + Description: otelconv.SDKExporterLogExported{}.Description(), + Unit: otelconv.SDKExporterLogExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: 0, + }, + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + wantErrTypeAttr, + ), + Value: 1, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterOperationDuration{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode( + wantGRPCStatusCodeAttr, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + wantErrTypeAttr, + ), + Count: 1, + }, + }, + }, + }, + }, + } + g := scopeMetrics() + metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual( + t, + wantMetrics.Metrics[2], + g.Metrics[2], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + ) + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.enabled { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "True") + } + prev := otel.GetMeterProvider() + t.Cleanup(func() { + otel.SetMeterProvider(prev) + }) + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + scopeMetrics := func() metricdata.ScopeMetrics { + var got metricdata.ResourceMetrics + err := r.Collect(context.Background(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + return got.ScopeMetrics[0] + } + tc.test(t, scopeMetrics) + }) + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/go.mod b/exporters/otlp/otlplog/otlploggrpc/go.mod index 41cec54a3ee..2851607bc52 100644 --- a/exporters/otlp/otlplog/otlploggrpc/go.mod +++ b/exporters/otlp/otlplog/otlploggrpc/go.mod @@ -11,9 +11,11 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/log v0.14.0 + go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/log v0.14.0 go.opentelemetry.io/otel/sdk/log/logtest v0.14.0 + go.opentelemetry.io/otel/sdk/metric v1.38.0 go.opentelemetry.io/otel/trace v1.38.0 go.opentelemetry.io/proto/otlp v1.8.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 @@ -29,7 +31,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel/metric v1.38.0 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/sys v0.36.0 // indirect golang.org/x/text v0.29.0 // indirect diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/counter/counter.go b/exporters/otlp/otlplog/otlploggrpc/internal/counter/counter.go new file mode 100644 index 00000000000..af1be225231 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/counter/counter.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package counter provides a simple counter for generating unique IDs. +// +// This package is used to generate unique IDs while allowing testing packages +// to reset the counter. +package counter // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/counter" + +import ( + "sync/atomic" +) + +// exporterN is a global 0-based count of the number of exporters created. +var exporterN atomic.Int64 + +// NextExporterID returns the next unique ID for an exporter. +func NextExporterID() int64 { + const inc = 1 + return exporterN.Add(inc) - inc +} + +// SetExporterID sets the exporter ID counter to v and returns the previous +// value. +// +// This function is useful for testing purposes, allowing you to reset the +// counter. It should not be used in production code. +func SetExporterID(v int64) int64 { + return exporterN.Swap(v) +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/counter/counter_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/counter/counter_test.go new file mode 100644 index 00000000000..787216e3bbc --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/counter/counter_test.go @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package counter + +import ( + "sync" + "testing" +) + +func TestNextExporterID(t *testing.T) { + SetExporterID(0) + + var expected int64 + for range 10 { + id := NextExporterID() + if id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } + expected++ + } +} + +func TestSetExporterID(t *testing.T) { + SetExporterID(0) + + prev := SetExporterID(42) + if prev != 0 { + t.Errorf("SetExporterID(42) returned %d; want 0", prev) + } + + id := NextExporterID() + if id != 42 { + t.Errorf("NextExporterID() = %d; want 42", id) + } +} + +func TestNextExporterIDConcurrentSafe(t *testing.T) { + SetExporterID(0) + + const goroutines = 100 + const increments = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for range increments { + NextExporterID() + } + }() + } + + wg.Wait() + + expected := int64(goroutines * increments) + if id := NextExporterID(); id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation.go b/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation.go new file mode 100644 index 00000000000..12ad406ba57 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation.go @@ -0,0 +1,225 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package observ provides self-observability metrics for OTLP log exporters. +// This is an experimental feature controlled by the x.SelfObservability feature flag. +package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" + +import ( + "context" + "errors" + "fmt" + "net" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "google.golang.org/grpc/codes" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +var ( + attrsPool = &sync.Pool{ + New: func() any { + // "component.name" + "component.type" + "error.type" + "server.address" + "server.port" + const n = 1 + 1 + 1 + 1 + 1 + s := make([]attribute.KeyValue, 0, n) + return &s + }, + } + addOpPool = &sync.Pool{ + New: func() any { + const n = 1 // WithAttributeSet + o := make([]metric.AddOption, 0, n) + return &o + }, + } + recordOptPool = &sync.Pool{ + New: func() any { + const n = 1 + 1 // WithAttributeSet + "rpc.grpc.status_code" + o := make([]metric.RecordOption, 0, n) + return &o + }, + } +) + +func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) } +func put[T any](p *sync.Pool, s *[]T) { + *s = (*s)[:0] + p.Put(s) +} + +// Instrumentation is experimental instrumentation for the exporter. +type Instrumentation struct { + logInflightMetric metric.Int64UpDownCounter + logExportedMetric metric.Int64Counter + logExportedDurationMetric metric.Float64Histogram + presetAttrs []attribute.KeyValue + setOpt metric.MeasurementOption +} + +// NewInstrumentation returns instrumentation for otlplog grpc exporter. +func NewInstrumentation( + name, componentName string, + componentType otelconv.ComponentTypeAttr, + target string, +) (*Instrumentation, error) { + i := &Instrumentation{} + + mp := otel.GetMeterProvider() + m := mp.Meter( + name, + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + + var err error + + logInflightMetric, e := otelconv.NewSDKExporterLogInflight(m) + if e != nil { + e = fmt.Errorf("failed to create span inflight metric: %w", e) + otel.Handle(e) + err = errors.Join(err, e) + } + i.logInflightMetric = logInflightMetric.Inst() + + logExportedMetric, e := otelconv.NewSDKExporterLogExported(m) + if e != nil { + e = fmt.Errorf("failed to create span exported metric: %w", e) + otel.Handle(e) + err = errors.Join(err, e) + } + i.logExportedMetric = logExportedMetric.Inst() + + logOpDurationMetric, e := otelconv.NewSDKExporterOperationDuration(m) + if e != nil { + e = fmt.Errorf("failed to create operation duration metric: %w", e) + otel.Handle(e) + err = errors.Join(err, e) + } + i.logExportedDurationMetric = logOpDurationMetric.Inst() + if err != nil { + return nil, err + } + + i.presetAttrs = []attribute.KeyValue{ + semconv.OTelComponentName(componentName), + semconv.OTelComponentTypeKey.String(string(componentType)), + } + i.presetAttrs = append(i.presetAttrs, ServerAddrAttrs(target)...) + s := attribute.NewSet(i.presetAttrs...) + i.setOpt = metric.WithAttributeSet(s) + + return i, nil +} + +// ExportSpanDone is a function that is called when a call to an Exporter's +// ExportSpans method completes +// +// The number of successful exports is provided as success. Any error that is encountered is provided as error +// The code of last gRPC requests performed in scope of this export call. +type ExportSpanDone func(err error, success int64, code codes.Code) + +// ExportSpans instruments the ExportSpans method of the exporter. It returns a +// function that needs to be deferred so it is called when the method returns. +func (i *Instrumentation) ExportSpans(ctx context.Context, count int64) ExportSpanDone { + addOpt := get[metric.AddOption](addOpPool) + defer put(addOpPool, addOpt) + + *addOpt = append(*addOpt, i.setOpt) + + start := time.Now() + i.logInflightMetric.Add(ctx, count, *addOpt...) + + return i.end(ctx, start, count) +} + +func (i *Instrumentation) end(ctx context.Context, start time.Time, count int64) ExportSpanDone { + return func(err error, success int64, code codes.Code) { + addOpt := get[metric.AddOption](addOpPool) + defer put(addOpPool, addOpt) + + *addOpt = append(*addOpt, i.setOpt) + + duration := time.Since(start).Seconds() + i.logInflightMetric.Add(ctx, -count, *addOpt...) + i.logExportedMetric.Add(ctx, success, *addOpt...) + + mOpt := i.setOpt + if err != nil { + attrs := get[attribute.KeyValue](attrsPool) + defer put(attrsPool, attrs) + *attrs = append(*attrs, i.presetAttrs...) + *attrs = append(*attrs, semconv.ErrorType(err)) + + set := attribute.NewSet(*attrs...) + mOpt = metric.WithAttributeSet(set) + + *addOpt = append((*addOpt)[:0], mOpt) + + i.logExportedMetric.Add(ctx, count-success, *addOpt...) + } + + recordOpt := get[metric.RecordOption](recordOptPool) + defer put(recordOptPool, recordOpt) + *recordOpt = append( + *recordOpt, + mOpt, + metric.WithAttributes( + semconv.RPCGRPCStatusCodeKey.Int64(int64(code)), + ), + ) + i.logExportedDurationMetric.Record(ctx, duration, *recordOpt...) + } +} + +// ServerAddrAttrs is a function that extracts server address and port attributes +// from a target string. +func ServerAddrAttrs(target string) []attribute.KeyValue { + if !strings.Contains(target, "://") { + return splitHostPortAttrs(target) + } + + u, err := url.Parse(target) + if err != nil || u.Scheme == "" { + return splitHostPortAttrs(target) + } + + switch u.Scheme { + case "unix": + // unix:///path/to/socket + return []attribute.KeyValue{semconv.ServerAddress(u.Path)} + case "dns": + // dns:///example.com:42 or dns://8.8.8.8/example.com:42 + addr := u.Opaque + if addr == "" { + addr = strings.TrimPrefix(u.Path, "/") + } + return splitHostPortAttrs(addr) + default: + return splitHostPortAttrs(u.Host) + } +} + +func splitHostPortAttrs(target string) []attribute.KeyValue { + host, pStr, err := net.SplitHostPort(target) + if err != nil { + return []attribute.KeyValue{semconv.ServerAddress(target)} + } + port, err := strconv.Atoi(pStr) + if err != nil { + return []attribute.KeyValue{semconv.ServerAddress(host)} + } + return []attribute.KeyValue{ + semconv.ServerAddress(host), + semconv.ServerPort(port), + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation_test.go new file mode 100644 index 00000000000..35d1de80fae --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation_test.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + mapi "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" +) + +type errMeterProvider struct { + mapi.MeterProvider + + err error +} + +func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter { + return &errMeter{err: m.err} +} + +type errMeter struct { + mapi.Meter + + err error +} + +func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) { + return nil, m.err +} + +func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) { + return nil, m.err +} + +func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) { + return nil, m.err +} + +func TestNewExporterMetrics(t *testing.T) { + t.Run("No Error", func(t *testing.T) { + em, err := NewInstrumentation( + "newExportMetricsTest", + "newExportMetricsTest/1", + "newExportMetricsTest", + "localhost:8080", + ) + require.NoError(t, err) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.OTelComponentName("newExportMetricsTest/1"), + semconv.OTelComponentTypeKey.String("newExportMetricsTest"), + semconv.ServerAddress("localhost"), + semconv.ServerPort(8080), + }, em.presetAttrs) + + assert.NotNil(t, em.logInflightMetric, "logInflightMetric should be created") + assert.NotNil(t, em.logExportedMetric, "logExportedMetric should be created") + assert.NotNil(t, em.logExportedDurationMetric, "logExportedDurationMetric should be created") + }) + + t.Run("Error", func(t *testing.T) { + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + mp := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(mp) + + _, err := NewInstrumentation( + "newExportMetrics", + "newExportMetrics/1", + "newExportMetrics", + "localhost:8080", + ) + require.ErrorIs(t, err, assert.AnError, "new instrument errors") + + assert.ErrorContains(t, err, "inflight metric") + assert.ErrorContains(t, err, "span exported metric") + assert.ErrorContains(t, err, "operation duration metric") + }) +} + +func TestServerAddrAttrs(t *testing.T) { + testcases := []struct { + name string + target string + want []attribute.KeyValue + }{ + { + name: "Unix socket", + target: "unix:///tmp/grpc.sock", + want: []attribute.KeyValue{semconv.ServerAddress("/tmp/grpc.sock")}, + }, + { + name: "DNS with port", + target: "dns:///localhost:8080", + want: []attribute.KeyValue{semconv.ServerAddress("localhost"), semconv.ServerPort(8080)}, + }, + { + name: "Dns with endpoint host:port", + target: "dns://8.8.8.8/example.com:4", + want: []attribute.KeyValue{semconv.ServerAddress("example.com"), semconv.ServerPort(4)}, + }, + { + name: "Simple host port", + target: "localhost:10001", + want: []attribute.KeyValue{semconv.ServerAddress("localhost"), semconv.ServerPort(10001)}, + }, + { + name: "Host without port", + target: "example.com", + want: []attribute.KeyValue{semconv.ServerAddress("example.com")}, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + attrs := ServerAddrAttrs(tc.target) + assert.Equal(t, tc.want, attrs) + }) + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/x/README.md b/exporters/otlp/otlplog/otlploggrpc/internal/x/README.md new file mode 100644 index 00000000000..bc8e580597e --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/x/README.md @@ -0,0 +1,36 @@ +# Experimental Features + +The `otlploggrpc` exporter contains features that have not yet stabilized in the OpenTelemetry specification. +These features are added to the `otlploggrpc` exporter prior to stabilization in the specification so that users can start experimenting with them and provide feedback. + +These features may change in backwards incompatible ways as feedback is applied. +See the [Compatibility and Stability](#compatibility-and-stability) section for more information. + +## Features + +- [Self-Observability](#self-observability) + +### Self-Observability + +The `otlploggrpc` exporter provides a self-observability feature that allows you to monitor the exporter itself. + +To opt-in, set the environment variable `OTEL_GO_X_SELF_OBSERVABILITY` to `true`. + +When enabled, the exporter will create the following metrics using the global `MeterProvider`: + +- `otel.sdk.exporter.log.inflight` +- `otel.sdk.exporter.log.exported` +- `otel.sdk.exporter.operation.duration` + +Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics. + +[Semantic conventions for OpenTelemetry SDK metrics]: https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md + +## Compatibility and Stability + +Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../../VERSIONING.md). +These features may be removed or modified in successive version releases, including patch versions. + +When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release. +There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version. +If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support. diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/x/x.go b/exporters/otlp/otlplog/otlploggrpc/internal/x/x.go new file mode 100644 index 00000000000..b609f2deb28 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/x/x.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package x contains support for OTel SDK experimental features. +// +// This package should only be used for features defined in the specification. +// It should not be used for experiments or new project ideas. +package x // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x" + +import ( + "os" + "strings" +) + +// Feature is an experimental feature control flag. It provides a uniform way +// to interact with these feature flags and parse their values. +type Feature[T any] struct { + key string + parse func(v string) (T, bool) +} + +func newFeature[T any](suffix string, parse func(string) (T, bool)) Feature[T] { + const envKeyRoot = "OTEL_GO_X_" + return Feature[T]{ + key: envKeyRoot + suffix, + parse: parse, + } +} + +// SelfObservability is an experimental feature flag that determines if SDK +// self-observability metrics are enabled. +// +// To enable this feature set the OTEL_GO_X_SELF_OBSERVABILITY environment variable +// to the case-insensitive string value of "true" (i.e. "True" and "TRUE" +// will also enable this). +var SelfObservability = newFeature("SELF_OBSERVABILITY", func(v string) (string, bool) { + if strings.EqualFold(v, "true") { + return v, true + } + return "", false +}) + +// Key returns the environment variable key that needs to be set to enable the +// feature. +func (f Feature[T]) Key() string { + return f.key +} + +// Lookup returns the user configured value for the feature and true if the +// user has enabled the feature. Otherwise, if the feature is not enabled, a +// zero-value and false are returned. +func (f Feature[T]) Lookup() (v T, ok bool) { + // https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value + // + // > The SDK MUST interpret an empty value of an environment variable the + // > same way as when the variable is unset. + vRaw := os.Getenv(f.key) + if vRaw == "" { + return v, ok + } + return f.parse(vRaw) +} + +// Enabled reports whether the feature is enabled. +func (f Feature[T]) Enabled() bool { + _, ok := f.Lookup() + return ok +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/x/x_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/x/x_test.go new file mode 100644 index 00000000000..bf386209147 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/x/x_test.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSelfObservability(t *testing.T) { + const key = "OTEL_GO_X_SELF_OBSERVABILITY" + require.Equal(t, key, SelfObservability.Key()) + + t.Run("-100", run(setenv(key, "-100"), assertDisabled(SelfObservability))) + t.Run("100", run(setenv(key, "100"), assertDisabled(SelfObservability))) + t.Run("true", run(setenv(key, "true"), assertEnabled(SelfObservability, "true"))) + t.Run("True", run(setenv(key, "True"), assertEnabled(SelfObservability, "True"))) + t.Run("false", run(setenv(key, "false"), assertDisabled(SelfObservability))) + t.Run("empty", run(assertDisabled(SelfObservability))) +} + +func run(steps ...func(*testing.T)) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + for _, step := range steps { + step(t) + } + } +} + +func setenv(k, v string) func(t *testing.T) { //nolint:unparam // This is a reusable test utility function. + return func(t *testing.T) { t.Setenv(k, v) } +} + +func assertEnabled[T any](f Feature[T], want T) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + assert.True(t, f.Enabled(), "not enabled") + + v, ok := f.Lookup() + assert.True(t, ok, "Lookup state") + assert.Equal(t, want, v, "Lookup value") + } +} + +func assertDisabled[T any](f Feature[T]) func(*testing.T) { + var zero T + return func(t *testing.T) { + t.Helper() + + assert.False(t, f.Enabled(), "enabled") + + v, ok := f.Lookup() + assert.False(t, ok, "Lookup state") + assert.Equal(t, zero, v, "Lookup value") + } +}