diff --git a/CHANGELOG.md b/CHANGELOG.md index e42f0eea459..6f1eaddb72a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Greatly reduce the cost of recording metrics in `go.opentelemetry.io/otel/sdk/metric` using hashing for map keys. (#7175) - Add experimental observability for the prometheus exporter in `go.opentelemetry.io/otel/exporters/prometheus`. Check the `go.opentelemetry.io/otel/exporters/prometheus/internal/x` package documentation for more information. (#7345) +- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353) ### Fixed diff --git a/exporters/otlp/otlplog/otlploggrpc/client.go b/exporters/otlp/otlplog/otlploggrpc/client.go index dcd01bf1130..6a80ec128d9 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client.go +++ b/exporters/otlp/otlplog/otlploggrpc/client.go @@ -6,7 +6,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o import ( "context" "errors" - "fmt" + "sync/atomic" "time" collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1" @@ -21,6 +21,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" ) @@ -37,6 +39,8 @@ type client struct { ourConn bool conn *grpc.ClientConn lsc collogpb.LogsServiceClient + + instrumentation *observ.Instrumentation } // Used for testing. @@ -71,7 +75,18 @@ func newClient(cfg config) (*client, error) { c.lsc = collogpb.NewLogsServiceClient(c.conn) - return c, nil + var err error + id := nextExporterID() + c.instrumentation, err = observ.NewInstrumentation(id, c.conn.CanonicalTarget()) + return c, err +} + +var exporterN atomic.Int64 + +// nextExporterID returns the next unique ID for an exporter. +func nextExporterID() int64 { + const inc = 1 + return exporterN.Add(inc) - inc } func newGRPCDialOptions(cfg config) []grpc.DialOption { @@ -131,6 +146,14 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo ctx, cancel := c.exportContext(ctx) defer cancel() + count := int64(len(rl)) + if c.instrumentation != nil { + eo := c.instrumentation.ExportLogs(ctx, count) + defer func() { + eo.End(uploadErr) + }() + } + return errors.Join(uploadErr, c.requestFunc(ctx, func(ctx context.Context) error { resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{ ResourceLogs: rl, @@ -139,7 +162,7 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo msg := resp.PartialSuccess.GetErrorMessage() n := resp.PartialSuccess.GetRejectedLogRecords() if n != 0 || msg != "" { - err := errPartial{msg: msg, n: n} + err := internal.LogPartialSuccessError(n, msg) uploadErr = errors.Join(uploadErr, err) } } @@ -152,23 +175,6 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo })) } -type errPartial struct { - msg string - n int64 -} - -var _ error = errPartial{} - -func (e errPartial) Error() string { - const form = "OTLP partial success: %s (%d log records rejected)" - return fmt.Sprintf(form, e.msg, e.n) -} - -func (errPartial) Is(target error) bool { - _, ok := target.(errPartial) - return ok -} - // Shutdown shuts down the client, freeing all resources. // // Any active connections to a remote endpoint are closed if they were created diff --git a/exporters/otlp/otlplog/otlploggrpc/client_test.go b/exporters/otlp/otlplog/otlploggrpc/client_test.go index 929c91736d1..593b9b47466 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/client_test.go @@ -5,6 +5,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o import ( "context" + "errors" "net" "sync" "testing" @@ -27,8 +28,17 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) var ( @@ -546,7 +556,7 @@ func TestClient(t *testing.T) { ctx := t.Context() client, _ := clientFactory(t, rCh) - assert.ErrorIs(t, client.UploadLogs(ctx, resourceLogs), errPartial{}) + assert.ErrorIs(t, client.UploadLogs(ctx, resourceLogs), internal.PartialSuccess{}) assert.NoError(t, client.UploadLogs(ctx, resourceLogs)) assert.NoError(t, client.UploadLogs(ctx, resourceLogs)) }) @@ -587,3 +597,694 @@ func TestConfig(t *testing.T) { assert.Equal(t, []string{headers[key]}, got[key]) }) } + +// SetExporterID sets the exporter ID counter to v and returns the previous +// value. +// +// This function is useful for testing purposes, allowing you to reset the +// counter. It should not be used in production code. +func SetExporterID(v int64) int64 { + return exporterN.Swap(v) +} + +func TestClientObservability(t *testing.T) { + testCases := []struct { + name string + enabled bool + test func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) + }{ + { + name: "disable", + enabled: false, + test: func(t *testing.T, _ func() metricdata.ScopeMetrics) { + client, _ := clientFactory(t, nil) + assert.Empty(t, client.instrumentation) + }, + }, + { + name: "upload success", + enabled: true, + test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) { + ctx := t.Context() + client, coll := clientFactory(t, nil) + + componentName := observ.GetComponentName(0) + serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget()) + wantMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterLogInflight{}.Name(), + Description: otelconv.SDKExporterLogInflight{}.Description(), + Unit: otelconv.SDKExporterLogInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName), + otelconv.SDKExporterLogInflight{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterLogExported{}.Name(), + Description: otelconv.SDKExporterLogExported{}.Description(), + Unit: otelconv.SDKExporterLogExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: int64(len(resourceLogs)), + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterOperationDuration{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode( + otelconv.RPCGRPCStatusCodeAttr( + codes.OK, + ), + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Count: 1, + }, + }, + }, + }, + }, + } + + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.Shutdown(ctx)) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceLogs") + diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal)) + if diff != "" { + t.Fatalf("unexpected ResourceLogs:\n%s", diff) + } + + assert.Equal(t, instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: semconv.SchemaURL, + }, wantMetrics.Scope) + + g := scopeMetrics() + metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual( + t, + wantMetrics.Metrics[2], + g.Metrics[2], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + ) + }, + }, + { + name: "partial success", + enabled: true, + test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) { + const n, msg = 2, "bad data" + rCh := make(chan exportResult, 1) + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + RejectedLogRecords: n, + ErrorMessage: msg, + }, + }, + } + ctx := t.Context() + client, _ := clientFactory(t, rCh) + + componentName := observ.GetComponentName(0) + serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget()) + var wantErr error + wantErr = errors.Join(wantErr, internal.LogPartialSuccessError(n, msg)) + wantMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterLogInflight{}.Name(), + Description: otelconv.SDKExporterLogInflight{}.Description(), + Unit: otelconv.SDKExporterLogInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName), + otelconv.SDKExporterLogInflight{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterLogExported{}.Name(), + Description: otelconv.SDKExporterLogExported{}.Description(), + Unit: otelconv.SDKExporterLogExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: 0, + }, + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + semconv.ErrorType(wantErr), + ), + Value: 1, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterOperationDuration{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode( + otelconv.RPCGRPCStatusCodeAttr( + status.Code(wantErr), + ), + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + semconv.ErrorType(wantErr), + ), + Count: 1, + }, + }, + }, + }, + }, + } + + err := client.UploadLogs(ctx, resourceLogs) + assert.ErrorContains(t, err, wantErr.Error()) + + assert.Equal(t, instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: semconv.SchemaURL, + }, wantMetrics.Scope) + + g := scopeMetrics() + metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual( + t, + wantMetrics.Metrics[2], + g.Metrics[2], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + ) + }, + }, + { + name: "upload failure", + enabled: true, + test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) { + err := status.Error(codes.InvalidArgument, "request contains invalid arguments") + var wantErr error + wantErr = errors.Join(wantErr, err) + + wantErrTypeAttr := semconv.ErrorType(wantErr) + wantGRPCStatusCodeAttr := otelconv.RPCGRPCStatusCodeAttr(codes.InvalidArgument) + rCh := make(chan exportResult, 1) + rCh <- exportResult{ + Err: err, + } + ctx := t.Context() + client, _ := clientFactory(t, rCh) + uploadErr := client.UploadLogs(ctx, resourceLogs) + assert.ErrorContains(t, uploadErr, "request contains invalid arguments") + + componentName := observ.GetComponentName(0) + + serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget()) + wantMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterLogInflight{}.Name(), + Description: otelconv.SDKExporterLogInflight{}.Description(), + Unit: otelconv.SDKExporterLogInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName), + otelconv.SDKExporterLogInflight{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterLogExported{}.Name(), + Description: otelconv.SDKExporterLogExported{}.Description(), + Unit: otelconv.SDKExporterLogExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: 0, + }, + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + wantErrTypeAttr, + ), + Value: 1, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterOperationDuration{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode( + wantGRPCStatusCodeAttr, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + wantErrTypeAttr, + ), + Count: 1, + }, + }, + }, + }, + }, + } + g := scopeMetrics() + assert.Equal(t, instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: semconv.SchemaURL, + }, wantMetrics.Scope) + + metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual( + t, + wantMetrics.Metrics[2], + g.Metrics[2], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + ) + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.enabled { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // Reset component name counter for each test. + _ = SetExporterID(0) + } + prev := otel.GetMeterProvider() + t.Cleanup(func() { + otel.SetMeterProvider(prev) + }) + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + scopeMetrics := func() metricdata.ScopeMetrics { + var got metricdata.ResourceMetrics + err := r.Collect(t.Context(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + return got.ScopeMetrics[0] + } + tc.test(t, scopeMetrics) + }) + } +} + +func TestClientObservabilityWithRetry(t *testing.T) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + _ = SetExporterID(0) + prev := otel.GetMeterProvider() + t.Cleanup(func() { + otel.SetMeterProvider(prev) + }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + scopeMetrics := func() metricdata.ScopeMetrics { + var got metricdata.ResourceMetrics + err := r.Collect(t.Context(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + return got.ScopeMetrics[0] + } + + rCh := make(chan exportResult, 2) + rCh <- exportResult{ + Err: status.Error(codes.Unavailable, "service temporarily unavailable"), + } + const n, msg = 1, "some logs rejected" + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + RejectedLogRecords: n, + ErrorMessage: msg, + }, + }, + } + + ctx := t.Context() + client, _ := clientFactory(t, rCh) + + componentName := observ.GetComponentName(0) + + serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget()) + var wantErr error + wantErr = errors.Join(wantErr, internal.LogPartialSuccessError(n, msg)) + + wantMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterLogInflight{}.Name(), + Description: otelconv.SDKExporterLogInflight{}.Description(), + Unit: otelconv.SDKExporterLogInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName), + otelconv.SDKExporterLogInflight{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterLogExported{}.Name(), + Description: otelconv.SDKExporterLogExported{}.Description(), + Unit: otelconv.SDKExporterLogExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + ), + Value: int64(len(resourceLogs)) - n, + }, + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterLogExported{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + semconv.ErrorType(wantErr), + ), + Value: n, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + otelconv.SDKExporterLogExported{}.AttrComponentName(componentName), + otelconv.SDKExporterOperationDuration{}.AttrComponentType( + otelconv.ComponentTypeOtlpGRPCLogExporter, + ), + otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode( + otelconv.RPCGRPCStatusCodeAttr( + status.Code(wantErr), + ), + ), + serverAddrAttrs[0], + serverAddrAttrs[1], + semconv.ErrorType(wantErr), + ), + Count: 1, + }, + }, + }, + }, + }, + } + + err := client.UploadLogs(ctx, resourceLogs) + assert.ErrorContains(t, err, wantErr.Error()) + + assert.Equal(t, instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: semconv.SchemaURL, + }, wantMetrics.Scope) + + g := scopeMetrics() + metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp()) + metricdatatest.AssertEqual( + t, + wantMetrics.Metrics[2], + g.Metrics[2], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + ) +} + +func BenchmarkExporterExportLogs(b *testing.B) { + const logRecordsCount = 100 + + run := func(b *testing.B) { + coll, err := newGRPCCollector("", nil) + require.NoError(b, err) + b.Cleanup(func() { + coll.srv.Stop() + }) + + ctx := b.Context() + opts := []Option{ + WithEndpoint(coll.listener.Addr().String()), + WithInsecure(), + WithTimeout(5 * time.Second), + } + exp, err := New(ctx, opts...) + require.NoError(b, err) + b.Cleanup(func() { + //nolint:usetesting // required to avoid getting a canceled context at cleanup. + assert.NoError(b, exp.Shutdown(context.Background())) + }) + + logs := make([]log.Record, logRecordsCount) + now := time.Now() + for i := range logs { + logs[i].SetTimestamp(now) + logs[i].SetObservedTimestamp(now) + } + + b.ReportAllocs() + b.ResetTimer() + + for b.Loop() { + err := exp.Export(b.Context(), logs) + require.NoError(b, err) + } + } + + b.Run("Observability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + run(b) + }) + + b.Run("NoObservability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "false") + run(b) + }) +} + +func TestNextExporterID(t *testing.T) { + SetExporterID(0) + + var expected int64 + for range 10 { + id := nextExporterID() + if id != expected { + t.Errorf("nextExporterID() = %d; want %d", id, expected) + } + expected++ + } +} + +func TestSetExporterID(t *testing.T) { + SetExporterID(0) + + prev := SetExporterID(42) + if prev != 0 { + t.Errorf("SetExporterID(42) returned %d; want 0", prev) + } + + id := nextExporterID() + if id != 42 { + t.Errorf("nextExporterID() = %d; want 42", id) + } +} + +func TestNextExporterIDConcurrentSafe(t *testing.T) { + SetExporterID(0) + + const goroutines = 100 + const increments = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for range increments { + nextExporterID() + } + }() + } + + wg.Wait() + + expected := int64(goroutines * increments) + if id := nextExporterID(); id != expected { + t.Errorf("nextExporterID() = %d; want %d", id, expected) + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/exporter_test.go b/exporters/otlp/otlplog/otlploggrpc/exporter_test.go index 68ac0c233cb..86840b69e59 100644 --- a/exporters/otlp/otlplog/otlploggrpc/exporter_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/exporter_test.go @@ -11,6 +11,8 @@ import ( "sync/atomic" "testing" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1" @@ -214,7 +216,7 @@ func TestExporter(t *testing.T) { c, _ := clientFactory(t, rCh) e := newExporter(c) - assert.ErrorIs(t, e.Export(ctx, records), errPartial{}) + assert.ErrorIs(t, e.Export(ctx, records), internal.PartialSuccess{}) assert.NoError(t, e.Export(ctx, records)) assert.NoError(t, e.Export(ctx, records)) }) diff --git a/exporters/otlp/otlplog/otlploggrpc/go.mod b/exporters/otlp/otlplog/otlploggrpc/go.mod index 706a351d707..b7c1b9cf7d9 100644 --- a/exporters/otlp/otlplog/otlploggrpc/go.mod +++ b/exporters/otlp/otlplog/otlploggrpc/go.mod @@ -11,9 +11,11 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/log v0.14.0 + go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/log v0.14.0 go.opentelemetry.io/otel/sdk/log/logtest v0.14.0 + go.opentelemetry.io/otel/sdk/metric v1.38.0 go.opentelemetry.io/otel/trace v1.38.0 go.opentelemetry.io/proto/otlp v1.8.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 @@ -29,7 +31,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel/metric v1.38.0 // indirect golang.org/x/net v0.44.0 // indirect golang.org/x/sys v0.36.0 // indirect golang.org/x/text v0.29.0 // indirect diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/gen.go b/exporters/otlp/otlplog/otlploggrpc/internal/gen.go index 6e456d4049c..797bfc95c5f 100644 --- a/exporters/otlp/otlplog/otlploggrpc/internal/gen.go +++ b/exporters/otlp/otlplog/otlploggrpc/internal/gen.go @@ -5,6 +5,12 @@ // package. package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" +//go:generate gotmpl --body=../../../../../internal/shared/otlp/observ/target.go.tmpl "--data={ \"pkg\": \"observ\", \"pkg_path\": \"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ\" }" --out=observ/target.go +//go:generate gotmpl --body=../../../../../internal/shared/otlp/observ/target_test.go.tmpl "--data={ \"pkg\": \"observ\" }" --out=observ/target_test.go + +//go:generate gotmpl --body=../../../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc\" }" --out=x/x.go +//go:generate gotmpl --body=../../../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go + //go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry.go.tmpl "--data={}" --out=retry/retry.go //go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry_test.go.tmpl "--data={}" --out=retry/retry_test.go diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation.go b/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation.go new file mode 100644 index 00000000000..598564a56bc --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation.go @@ -0,0 +1,310 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package observ provides observability metrics for OTLP log exporters. +// This is an experimental feature controlled by the x.Observability feature flag. +package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "go.opentelemetry.io/otel/internal/global" + + "google.golang.org/grpc/status" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ( + // ScopeName is the unique name of the meter used for instrumentation. + ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" + + // Version is the current version of this instrumentation. + // + // This matches the version of the exporter. + Version = internal.Version +) + +var ( + attrsPool = &sync.Pool{ + New: func() any { + const n = 1 /* component.name */ + + 1 /* component.type */ + + 1 /* server.addr */ + + 1 /* server.port */ + + 1 /* error.type */ + + 1 /* rpc.grpc.status_code */ + s := make([]attribute.KeyValue, 0, n) + // Return a pointer to a slice instead of a slice itself + // to avoid allocations on every call. + return &s + }, + } + addOpPool = &sync.Pool{ + New: func() any { + const n = 1 // WithAttributeSet + o := make([]metric.AddOption, 0, n) + return &o + }, + } + recordOptPool = &sync.Pool{ + New: func() any { + const n = 1 // WithAttributeSet + o := make([]metric.RecordOption, 0, n) + return &o + }, + } +) + +func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) } +func put[T any](p *sync.Pool, s *[]T) { + *s = (*s)[:0] + p.Put(s) +} + +// GetComponentName returns the constant name for the exporter with the +// provided id. +func GetComponentName(id int64) string { + return fmt.Sprintf("%s/%d", otelconv.ComponentTypeOtlpGRPCLogExporter, id) +} + +// getPresetAttrs builds the preset attributes for instrumentation. +func getPresetAttrs(id int64, target string) []attribute.KeyValue { + serverAttrs := ServerAddrAttrs(target) + attrs := make([]attribute.KeyValue, 0, 2+len(serverAttrs)) + + attrs = append( + attrs, + semconv.OTelComponentName(GetComponentName(id)), + semconv.OTelComponentTypeOtlpGRPCLogExporter, + ) + attrs = append(attrs, serverAttrs...) + + return attrs +} + +// Instrumentation is experimental instrumentation for the exporter. +type Instrumentation struct { + logInflightMetric metric.Int64UpDownCounter + logExportedMetric metric.Int64Counter + logExportedDurationMetric metric.Float64Histogram + + presetAttrs []attribute.KeyValue + addOpt metric.AddOption + recOpt metric.RecordOption +} + +// NewInstrumentation returns instrumentation for otlplog grpc exporter. +func NewInstrumentation(id int64, target string) (*Instrumentation, error) { + if !x.Observability.Enabled() { + return nil, nil + } + + i := &Instrumentation{} + + mp := otel.GetMeterProvider() + m := mp.Meter( + ScopeName, + metric.WithInstrumentationVersion(Version), + metric.WithSchemaURL(semconv.SchemaURL), + ) + + var err error + + logInflightMetric, e := otelconv.NewSDKExporterLogInflight(m) + if e != nil { + e = fmt.Errorf("failed to create log inflight metric: %w", e) + err = errors.Join(err, e) + } + i.logInflightMetric = logInflightMetric.Inst() + + logExportedMetric, e := otelconv.NewSDKExporterLogExported(m) + if e != nil { + e = fmt.Errorf("failed to create log exported metric: %w", e) + err = errors.Join(err, e) + } + i.logExportedMetric = logExportedMetric.Inst() + + logOpDurationMetric, e := otelconv.NewSDKExporterOperationDuration(m) + if e != nil { + e = fmt.Errorf("failed to create log operation duration metric: %w", e) + err = errors.Join(err, e) + } + i.logExportedDurationMetric = logOpDurationMetric.Inst() + if err != nil { + return nil, err + } + + i.presetAttrs = getPresetAttrs(id, target) + + i.addOpt = metric.WithAttributeSet(attribute.NewSet(i.presetAttrs...)) + i.recOpt = metric.WithAttributeSet(attribute.NewSet(append( + // Default to OK status code. + []attribute.KeyValue{semconv.RPCGRPCStatusCodeOk}, + i.presetAttrs..., + )...)) + return i, nil +} + +// ExportLogs instruments the ExportLogs method of the exporter. It returns +// an [ExportOp] that must have its [ExportOp.End] method called when the +// ExportLogs method returns. +func (i *Instrumentation) ExportLogs(ctx context.Context, count int64) ExportOp { + start := time.Now() + addOpt := get[metric.AddOption](addOpPool) + defer put(addOpPool, addOpt) + + *addOpt = append(*addOpt, i.addOpt) + + i.logInflightMetric.Add(ctx, count, *addOpt...) + + return ExportOp{ + nLogs: count, + ctx: ctx, + start: start, + inst: i, + } +} + +// ExportOp tracks the operation being observed by [Instrumentation.ExportLogs]. +type ExportOp struct { + nLogs int64 + ctx context.Context + start time.Time + + inst *Instrumentation +} + +// End completes the observation of the operation being observed by a call to +// [Instrumentation.ExportLogs]. +// Any error that is encountered is provided as err. +// +// If err is not nil, all logs will be recorded as failures unless error is of +// type [internal.PartialSuccess]. In the case of a PartialSuccess, the number +// of successfully exported logs will be determined by inspecting the +// RejectedItems field of the PartialSuccess. +func (e ExportOp) End(err error) { + addOpt := get[metric.AddOption](addOpPool) + defer put(addOpPool, addOpt) + *addOpt = append(*addOpt, e.inst.addOpt) + + e.inst.logInflightMetric.Add(e.ctx, -e.nLogs, *addOpt...) + success := successful(e.nLogs, err) + e.inst.logExportedMetric.Add(e.ctx, success, *addOpt...) + + if err != nil { + // Add the error.type attribute to the attribute set. + attrs := get[attribute.KeyValue](attrsPool) + defer put(attrsPool, attrs) + *attrs = append(*attrs, e.inst.presetAttrs...) + *attrs = append(*attrs, semconv.ErrorType(err)) + + o := metric.WithAttributeSet(attribute.NewSet(*attrs...)) + + // Reset addOpt with new attribute set + *addOpt = append((*addOpt)[:0], o) + + e.inst.logExportedMetric.Add(e.ctx, e.nLogs-success, *addOpt...) + } + + recordOpt := get[metric.RecordOption](recordOptPool) + defer put(recordOptPool, recordOpt) + *recordOpt = append(*recordOpt, e.inst.recordOption(err)) + e.inst.logExportedDurationMetric.Record(e.ctx, time.Since(e.start).Seconds(), *recordOpt...) +} + +func (i *Instrumentation) recordOption(err error) metric.RecordOption { + if err == nil { + return i.recOpt + } + attrs := get[attribute.KeyValue](attrsPool) + defer put(attrsPool, attrs) + + *attrs = append(*attrs, i.presetAttrs...) + code := int64(status.Code(err)) + *attrs = append( + *attrs, + semconv.RPCGRPCStatusCodeKey.Int64(code), + semconv.ErrorType(err), + ) + + return metric.WithAttributeSet(attribute.NewSet(*attrs...)) +} + +// successful returns the number of successfully exported logs out of the n +// that were exported based on the provided error. +// +// If err is nil, n is returned. All logs were successfully exported. +// +// If err is not nil and not an [internal.PartialSuccess] error, 0 is returned. +// It is assumed all logs failed to be exported. +// +// If err is an [internal.PartialSuccess] error, the number of successfully +// exported logs is computed by subtracting the RejectedItems field from n. If +// RejectedItems is negative, n is returned. If RejectedItems is greater than +// n, 0 is returned. +func successful(n int64, err error) int64 { + if err == nil { + return n // All logs successfully exported. + } + // Split rejection calculation so successful is inlineable. + return n - rejectedCount(n, err) +} + +var errPool = sync.Pool{ + New: func() any { + return new(internal.PartialSuccess) + }, +} + +// rejectedCount returns how many out of the n logs exporter were rejected based on +// the provided non-nil err. +func rejectedCount(n int64, err error) int64 { + ps := errPool.Get().(*internal.PartialSuccess) + defer errPool.Put(ps) + + // check for partial success + if errors.As(err, ps) { + return min(max(ps.RejectedItems, 0), n) + } + // all logs exporter + return n +} + +// ServerAddrAttrs is a function that extracts server address and port attributes +// from a target string. +func ServerAddrAttrs(target string) []attribute.KeyValue { + addr, port, err := ParseCanonicalTarget(target) + if err != nil || (addr == "" && port < 0) { + if err != nil { + global.Debug("failed to parse target", "target", target, "error", err) + } + return nil + } + + // Unix domain sockets: return only the path as server.address + if port == -1 { + return []attribute.KeyValue{semconv.ServerAddress(addr)} + } + + // For network addresses, only include port if it's valid (> 0) + if port > 0 { + return []attribute.KeyValue{ + semconv.ServerAddress(addr), + semconv.ServerPort(port), + } + } + + // Port is 0 or invalid, only return address + return []attribute.KeyValue{semconv.ServerAddress(addr)} +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation_test.go new file mode 100644 index 00000000000..7d6a5b6110a --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/observ/instrumentation_test.go @@ -0,0 +1,329 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" + mapi "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ( + ID = 0 + TARGET = "localhost:8080" +) + +type errMeterProvider struct { + mapi.MeterProvider + + err error +} + +func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter { + return &errMeter{err: m.err} +} + +type errMeter struct { + mapi.Meter + + err error +} + +func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) { + return nil, m.err +} + +func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) { + return nil, m.err +} + +func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) { + return nil, m.err +} + +func TestNewExporterMetrics(t *testing.T) { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + t.Run("No Error", func(t *testing.T) { + em, err := NewInstrumentation(ID, "dns:///example.com:42") + require.NoError(t, err) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.OTelComponentName(GetComponentName(ID)), + semconv.OTelComponentTypeKey.String(string(otelconv.ComponentTypeOtlpGRPCLogExporter)), + semconv.ServerAddress("example.com"), + semconv.ServerPort(42), + }, em.presetAttrs) + + assert.NotNil(t, em.logInflightMetric, "logInflightMetric should be created") + assert.NotNil(t, em.logExportedMetric, "logExportedMetric should be created") + assert.NotNil(t, em.logExportedDurationMetric, "logExportedDurationMetric should be created") + }) + + t.Run("Error", func(t *testing.T) { + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + mp := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(mp) + + _, err := NewInstrumentation(ID, "dns:///:8080") + require.ErrorIs(t, err, assert.AnError, "new instrument errors") + + assert.ErrorContains(t, err, "inflight metric") + assert.ErrorContains(t, err, "log exported metric") + assert.ErrorContains(t, err, "operation duration metric") + }) +} + +func TestServerAddrAttrs(t *testing.T) { + testcases := []struct { + name string + target string + want []attribute.KeyValue + }{ + { + name: "Unix socket", + target: "unix:///tmp/grpc.sock", + want: []attribute.KeyValue{semconv.ServerAddress("/tmp/grpc.sock")}, + }, + { + name: "DNS with port", + target: "dns:///localhost:8080", + want: []attribute.KeyValue{semconv.ServerAddress("localhost"), semconv.ServerPort(8080)}, + }, + { + name: "Dns with endpoint host:port", + target: "dns://8.8.8.8/example.com:4", + want: []attribute.KeyValue{semconv.ServerAddress("example.com"), semconv.ServerPort(4)}, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + attrs := ServerAddrAttrs(tc.target) + assert.Equal(t, tc.want, attrs) + }) + } +} + +func set(err error) attribute.Set { + attrs := []attribute.KeyValue{ + semconv.OTelComponentName(GetComponentName(ID)), + semconv.OTelComponentTypeKey.String(string(otelconv.ComponentTypeOtlpGRPCLogExporter)), + } + attrs = append(attrs, ServerAddrAttrs(TARGET)...) + if err != nil { + attrs = append(attrs, semconv.ErrorType(err)) + } + return attribute.NewSet(attrs...) +} + +func logInflightMetrics() metricdata.Metrics { + m := otelconv.SDKExporterLogInflight{} + return metricdata.Metrics{ + Name: m.Name(), + Description: m.Description(), + Unit: m.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: set(nil), Value: 0}, + }, + }, + } +} + +func logExportedMetrics(success, total int64, err error) metricdata.Metrics { + dp := []metricdata.DataPoint[int64]{ + {Attributes: set(nil), Value: success}, + } + + if err != nil { + dp = append(dp, metricdata.DataPoint[int64]{ + Attributes: set(err), + Value: total - success, + }) + } + + m := otelconv.SDKExporterLogExported{} + return metricdata.Metrics{ + Name: m.Name(), + Description: m.Description(), + Unit: m.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dp, + }, + } +} + +func logOperationDurationMetrics(err error, code codes.Code) metricdata.Metrics { + attrs := []attribute.KeyValue{ + semconv.OTelComponentName(GetComponentName(ID)), + semconv.OTelComponentTypeKey.String(string(otelconv.ComponentTypeOtlpGRPCLogExporter)), + semconv.RPCGRPCStatusCodeKey.Int64(int64(code)), + } + attrs = append(attrs, ServerAddrAttrs(TARGET)...) + if err != nil { + attrs = append(attrs, semconv.ErrorType(err)) + } + + m := otelconv.SDKExporterOperationDuration{} + return metricdata.Metrics{ + Name: m.Name(), + Description: m.Description(), + Unit: m.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Attributes: attribute.NewSet(attrs...)}, + }, + }, + } +} + +func setup(t *testing.T) (*Instrumentation, func() metricdata.ScopeMetrics) { + t.Helper() + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + original := otel.GetMeterProvider() + t.Cleanup(func() { + otel.SetMeterProvider(original) + }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + inst, err := NewInstrumentation(ID, TARGET) + require.NoError(t, err) + require.NotNil(t, inst) + + return inst, func() metricdata.ScopeMetrics { + var rm metricdata.ResourceMetrics + require.NoError(t, r.Collect(t.Context(), &rm)) + require.Len(t, rm.ScopeMetrics, 1) + return rm.ScopeMetrics[0] + } +} + +var Scope = instrumentation.Scope{ + Name: ScopeName, + Version: Version, + SchemaURL: semconv.SchemaURL, +} + +func assertMetrics( + t *testing.T, + got metricdata.ScopeMetrics, + spans int64, + success int64, + err error, + code codes.Code, +) { + t.Helper() + + assert.Equal(t, Scope, got.Scope, "unexpected scope") + + m := got.Metrics + require.Len(t, m, 3, "expected 3 metrics") + + o := metricdatatest.IgnoreTimestamp() + want := logInflightMetrics() + metricdatatest.AssertEqual(t, want, m[0], o) + + want = logExportedMetrics(success, spans, err) + metricdatatest.AssertEqual(t, want, m[1], o) + + want = logOperationDurationMetrics(err, code) + metricdatatest.AssertEqual(t, want, m[2], metricdatatest.IgnoreValue(), o) +} + +func TestInstrumentationExportLogs(t *testing.T) { + inst, collect := setup(t) + const n = 10 + inst.ExportLogs(t.Context(), n).End(nil) + assertMetrics(t, collect(), n, n, nil, codes.OK) +} + +func TestInstrumentationExportLogPartialErrors(t *testing.T) { + inst, collect := setup(t) + const n = 10 + const success = 5 + + err := internal.PartialSuccess{RejectedItems: success} + inst.ExportLogs(t.Context(), n).End(err) + + assertMetrics(t, collect(), n, success, err, status.Code(err)) +} + +func TestInstrumentationExportLogAllErrors(t *testing.T) { + inst, collect := setup(t) + const n = 10 + const success = 0 + inst.ExportLogs(t.Context(), n).End(assert.AnError) + + assertMetrics(t, collect(), n, success, assert.AnError, status.Code(assert.AnError)) +} + +func TestInstrumentationExportLogsInvalidPartialErrored(t *testing.T) { + inst, collect := setup(t) + const n = 10 + err := internal.PartialSuccess{RejectedItems: -5} + inst.ExportLogs(t.Context(), n).End(err) + + success := int64(n) + assertMetrics(t, collect(), n, success, err, status.Code(err)) + + err.RejectedItems = n + 5 + inst.ExportLogs(t.Context(), n).End(err) + + success += 0 + assertMetrics(t, collect(), n+n, success, err, status.Code(err)) +} + +func BenchmarkInstrumentationExportLogs(b *testing.B) { + setup := func(tb *testing.B) *Instrumentation { + tb.Helper() + tb.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + inst, err := NewInstrumentation(ID, TARGET) + if err != nil { + tb.Fatalf("failed to create instrumentation: %v", err) + } + return inst + } + run := func(err error) func(*testing.B) { + return func(b *testing.B) { + inst := setup(b) + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + inst.ExportLogs(b.Context(), 10).End(err) + } + } + } + b.Run("NoError", run(nil)) + b.Run("PartialError", run(&internal.PartialSuccess{RejectedItems: 6})) + b.Run("FullError", run(assert.AnError)) +} + +func BenchmarkSetPresetAttrs(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for i := range b.N { + getPresetAttrs(int64(i), "dns:///192.168.1.1:8080") + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/observ/target.go b/exporters/otlp/otlplog/otlploggrpc/internal/observ/target.go new file mode 100644 index 00000000000..186f00e52a1 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/observ/target.go @@ -0,0 +1,143 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/otlp/observ/target.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" + +import ( + "errors" + "fmt" + "net" + "net/netip" + "strconv" + "strings" +) + +const ( + schemeUnix = "unix" + schemeUnixAbstract = "unix-abstract" +) + +// ParseCanonicalTarget parses a target string and returns the extracted host +// (domain address or IP), the target port, or an error. +// +// If no port is specified, -1 is returned. +// +// If no host is specified, an empty string is returned. +// +// The target string is expected to always have the form +// "://[authority]/". For example: +// - "dns:///example.com:42" +// - "dns://8.8.8.8/example.com:42" +// - "unix:///path/to/socket" +// - "unix-abstract:///socket-name" +// - "passthrough:///192.34.2.1:42" +// +// The target is expected to come from the CanonicalTarget method of a gRPC +// Client. +func ParseCanonicalTarget(target string) (string, int, error) { + const sep = "://" + + // Find scheme. Do not allocate the string by using url.Parse. + idx := strings.Index(target, sep) + if idx == -1 { + return "", -1, fmt.Errorf("invalid target %q: missing scheme", target) + } + scheme, endpoint := target[:idx], target[idx+len(sep):] + + // Check for unix schemes. + if scheme == schemeUnix || scheme == schemeUnixAbstract { + return parseUnix(endpoint) + } + + // Strip leading slash and any authority. + if i := strings.Index(endpoint, "/"); i != -1 { + endpoint = endpoint[i+1:] + } + + // DNS, passthrough, and custom resolvers. + return parseEndpoint(endpoint) +} + +// parseUnix parses unix socket targets. +func parseUnix(endpoint string) (string, int, error) { + // Format: unix[-abstract]://path + // + // We should have "/path" (empty authority) if valid. + if len(endpoint) >= 1 && endpoint[0] == '/' { + // Return the full path including leading slash. + return endpoint, -1, nil + } + + // If there's no leading slash, it means there might be an authority + // Check for authority case (should error): "authority/path" + if slashIdx := strings.Index(endpoint, "/"); slashIdx > 0 { + return "", -1, fmt.Errorf("invalid (non-empty) authority: %s", endpoint[:slashIdx]) + } + + return "", -1, errors.New("invalid unix target format") +} + +// parseEndpoint parses an endpoint from a gRPC target. +// +// It supports the following formats: +// - "host" +// - "host%zone" +// - "host:port" +// - "host%zone:port" +// - "ipv4" +// - "ipv4%zone" +// - "ipv4:port" +// - "ipv4%zone:port" +// - "ipv6" +// - "ipv6%zone" +// - "[ipv6]" +// - "[ipv6%zone]" +// - "[ipv6]:port" +// - "[ipv6%zone]:port" +// +// It returns the host or host%zone (domain address or IP), the port (or -1 if +// not specified), or an error if the input is not a valid. +func parseEndpoint(endpoint string) (string, int, error) { + // First check if the endpoint is just an IP address. + if ip := parseIP(endpoint); ip != "" { + return ip, -1, nil + } + + // If there's no colon, there is no port (IPv6 with no port checked above). + if !strings.Contains(endpoint, ":") { + return endpoint, -1, nil + } + + host, portStr, err := net.SplitHostPort(endpoint) + if err != nil { + return "", -1, fmt.Errorf("invalid host:port %q: %w", endpoint, err) + } + + const base, bitSize = 10, 16 + port16, err := strconv.ParseUint(portStr, base, bitSize) + if err != nil { + return "", -1, fmt.Errorf("invalid port %q: %w", portStr, err) + } + port := int(port16) // port is guaranteed to be in the range [0, 65535]. + + return host, port, nil +} + +// parseIP attempts to parse the entire endpoint as an IP address. +// It returns the normalized string form of the IP if successful, +// or an empty string if parsing fails. +func parseIP(ip string) string { + // Strip leading and trailing brackets for IPv6 addresses. + if len(ip) >= 2 && ip[0] == '[' && ip[len(ip)-1] == ']' { + ip = ip[1 : len(ip)-1] + } + addr, err := netip.ParseAddr(ip) + if err != nil { + return "" + } + // Return the normalized string form of the IP. + return addr.String() +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/observ/target_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/observ/target_test.go new file mode 100644 index 00000000000..27fa8239fc4 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/observ/target_test.go @@ -0,0 +1,162 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/otlp/observ/target_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ + +import "testing" + +func TestParseTarget(t *testing.T) { + // gRPC target naming is defined here: + // https://github.com/grpc/grpc/blob/74232c6bd3c0f4bc35bad035dbeecf5cbc834a11/doc/naming.md + // + // The Go gRPC client only supports the "dns", "unix", "unix-abstract", and + // "passthrough" schemes natively with "dns" being the default: + // https://pkg.go.dev/google.golang.org/grpc@v1.75.1/internal/resolver + // + // Other schemes (e.g., "consul", "zk") are supported via custom resolvers + // that can be registered with the gRPC resolver package. These custom + // resolvers are still expected to follow the general target string format + // when rendered with the CanonicalTarget method: + // + // :/// + // + // All target strings in these tests are rendered with the + // CanonicalTarget method. Therefore they all follow the above format. + tests := []struct { + target string + host string + port int + }{ + // DNS scheme: hostname and port. + {target: "dns:///:8080", host: "", port: 8080}, + {target: "dns:///example.com", host: "example.com", port: -1}, + {target: "dns:///example.com%eth0", host: "example.com%eth0", port: -1}, + {target: "dns:///example.com:42", host: "example.com", port: 42}, + {target: "dns:///example.com%eth0:42", host: "example.com%eth0", port: 42}, + + // DNS scheme: hostname and port with authority. + {target: "dns://8.8.8.8/example.com", host: "example.com", port: -1}, + {target: "dns://8.8.8.8/example.com%eth0", host: "example.com%eth0", port: -1}, + {target: "dns://8.8.8.8/example.com:42", host: "example.com", port: 42}, + {target: "dns://8.8.8.8/example.com%eth0:42", host: "example.com%eth0", port: 42}, + + // DNS scheme: IPv4 address and port. + {target: "dns:///192.168.1.1", host: "192.168.1.1", port: -1}, + {target: "dns:///192.168.1.1%eth0", host: "192.168.1.1%eth0", port: -1}, + {target: "dns:///192.168.1.1:8080", host: "192.168.1.1", port: 8080}, + {target: "dns:///192.168.1.1%eth0:8080", host: "192.168.1.1%eth0", port: 8080}, + + // DNS scheme: IPv6 address and port. + {target: "dns:///2001:0db8:85a3:0000:0000:8a2e:0370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, + {target: "dns:///2001:db8:85a3:0:0:8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, + {target: "dns:///2001:db8:85a3::8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, + {target: "dns:///2001:db8:85a3::8a2e:370:7334%eth0", host: "2001:db8:85a3::8a2e:370:7334%eth0", port: -1}, + {target: "dns:///[2001:db8:85a3::8a2e:370:7334]", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, + {target: "dns:///[2001:db8:85a3::8a2e:370:7334%eth0]", host: "2001:db8:85a3::8a2e:370:7334%eth0", port: -1}, + {target: "dns:///[::1]:9090", host: "::1", port: 9090}, + {target: "dns:///[::1%eth0]:9090", host: "::1%eth0", port: 9090}, + + // Unix domain sockets. + {target: "unix:///tmp/grpc.sock", host: "/tmp/grpc.sock", port: -1}, + {target: "unix:///absolute_path", host: "/absolute_path", port: -1}, + + // Unix domain socket in abstract namespace. + {target: "unix-abstract:///abstract-socket-name", host: "/abstract-socket-name", port: -1}, + + // International domain names. + {target: "dns:///测试.example.com:8080", host: "测试.example.com", port: 8080}, + + // Port edge cases. + {target: "dns:///example.com:0", host: "example.com", port: 0}, + {target: "dns:///example.com:65535", host: "example.com", port: 65535}, + + // Case sensitivity. + {target: "dns:///EXAMPLE.COM:8080", host: "EXAMPLE.COM", port: 8080}, + {target: "dns:///Example.Com:8080", host: "Example.Com", port: 8080}, + + // Custom and passthrough resolvers scheme + {target: "passthrough:///localhost:50051", host: "localhost", port: 50051}, + {target: "passthrough:///10.0.0.2:7777", host: "10.0.0.2", port: 7777}, + {target: "consul:///my-service", host: "my-service", port: -1}, + {target: "zk:///services/my-service", host: "services/my-service", port: -1}, + } + + for _, tt := range tests { + host, port, err := ParseCanonicalTarget(tt.target) + if err != nil { + t.Errorf("parseTarget(%q) unexpected error: %v", tt.target, err) + continue + } + if host != tt.host { + t.Errorf("parseTarget(%q) host = %q, want %q", tt.target, host, tt.host) + } + if port != tt.port { + t.Errorf("parseTarget(%q) port = %d, want %d", tt.target, port, tt.port) + } + } +} + +func TestParseTargetErrors(t *testing.T) { + targets := []string{ + "dns:///example.com:invalid", // Non-numeric port in URL. + "dns:///example.com:8080:9090", // Multiple colons in port. + "dns:///example.com:99999", // Port out of range. + "dns:///example.com:-1", // Port out of range. + "unix://localhost/sock", // Non-empty authority for unix scheme. + "unix:", // Empty unix scheme. + "unix-abstract://", // Empty unix-abstract scheme. + "unix-abstract://authority/sock", // Non-empty authority for unix-abstract scheme. + "contains-cont\roll-cha\rs", // Invalid URL. + } + + for _, target := range targets { + host, port, err := ParseCanonicalTarget(target) + if err == nil { + t.Errorf("parseTarget(%q) expected error, got nil", target) + } + + if host != "" { + t.Errorf("parseTarget(%q) host = %q, want empty", target, host) + } + + if port != -1 { + t.Errorf("parseTarget(%q) port = %d, want -1", target, port) + } + } +} + +func BenchmarkParseTarget(b *testing.B) { + benchmarks := []struct { + name string + target string + }{ + {"HostName", "dns:///example.com"}, + {"HostPort", "dns:///example.com:8080"}, + {"IPv4WithoutPort", "dns:///192.168.1.1"}, + {"IPv4WithPort", "dns:///192.168.1.1:8080"}, + {"IPv6Bare", "dns:///2001:db8::1"}, + {"IPv6Bracket", "dns:///[2001:db8::1]"}, + {"IPv6WithPort", "dns:///[2001:db8::1]:8080"}, + {"UnixSocket", "unix:///tmp/grpc.sock"}, + {"UnixAbstractSocket", "unix-abstract:///abstract-socket-name"}, + {"Passthrough", "passthrough:///localhost:50051"}, + } + + var ( + host string + port int + err error + ) + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + host, port, err = ParseCanonicalTarget(bm.target) + } + }) + } + _, _, _ = host, port, err +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/partialsuccess.go b/exporters/otlp/otlplog/otlploggrpc/internal/partialsuccess.go new file mode 100644 index 00000000000..076f05fcf0a --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/partialsuccess.go @@ -0,0 +1,43 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" + +import "fmt" + +// PartialSuccess represents the underlying error for all handling +// OTLP partial success messages. Use `errors.Is(err, +// PartialSuccess{})` to test whether an error passed to the OTel +// error handler belongs to this category. +type PartialSuccess struct { + ErrorMessage string + RejectedItems int64 + RejectedKind string +} + +var _ error = PartialSuccess{} + +// Error implements the error interface. +func (ps PartialSuccess) Error() string { + msg := ps.ErrorMessage + if msg == "" { + msg = "empty message" + } + return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind) +} + +// Is supports the errors.Is() interface. +func (PartialSuccess) Is(err error) bool { + _, ok := err.(PartialSuccess) + return ok +} + +// LogPartialSuccessError returns an error describing a partial success +// response for the log signal. +func LogPartialSuccessError(itemsRejected int64, errorMessage string) error { + return PartialSuccess{ + ErrorMessage: errorMessage, + RejectedItems: itemsRejected, + RejectedKind: "logs", + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/partialsuccess_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/partialsuccess_test.go new file mode 100644 index 00000000000..78105ce7bb2 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/partialsuccess_test.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func requireErrorString(t *testing.T, expect string, err error) { + t.Helper() + require.Error(t, err) + require.ErrorIs(t, err, PartialSuccess{}) + + const pfx = "OTLP partial success: " + + msg := err.Error() + require.True(t, strings.HasPrefix(msg, pfx)) + require.Equal(t, expect, msg[len(pfx):]) +} + +func TestPartialSuccessFormat(t *testing.T) { + requireErrorString(t, "empty message (0 logs rejected)", LogPartialSuccessError(0, "")) + requireErrorString(t, "help help (0 logs rejected)", LogPartialSuccessError(0, "help help")) + requireErrorString( + t, + "what happened (10 logs rejected)", + LogPartialSuccessError(10, "what happened"), + ) + requireErrorString(t, "what happened (15 logs rejected)", LogPartialSuccessError(15, "what happened")) +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/version.go b/exporters/otlp/otlplog/otlploggrpc/internal/version.go new file mode 100644 index 00000000000..d2e47664873 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/version.go @@ -0,0 +1,8 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" + +// Version is the current release version of the OpenTelemetry otlploggrpc +// exporter in use. +const Version = "0.14.0" diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/x/README.md b/exporters/otlp/otlplog/otlploggrpc/internal/x/README.md new file mode 100644 index 00000000000..7d73c7e7db3 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/x/README.md @@ -0,0 +1,36 @@ +# Experimental Features + +The `otlploggrpc` exporter contains features that have not yet stabilized in the OpenTelemetry specification. +These features are added to the `otlploggrpc` exporter prior to stabilization in the specification so that users can start experimenting with them and provide feedback. + +These features may change in backwards incompatible ways as feedback is applied. +See the [Compatibility and Stability](#compatibility-and-stability) section for more information. + +## Features + +- [Observability](#observability) + +### Observability + +The `otlploggrpc` exporter can be configured to provide observability about itself using OpenTelemetry metrics. + +To opt-in, set the environment variable `OTEL_GO_X_OBSERVABILITY` to `true`. + +When enabled, the exporter will create the following metrics using the global `MeterProvider`: + +- `otel.sdk.exporter.log.inflight` +- `otel.sdk.exporter.log.exported` +- `otel.sdk.exporter.operation.duration` + +Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics. + +[Semantic conventions for OpenTelemetry SDK metrics]: https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md + +## Compatibility and Stability + +Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../../VERSIONING.md). +These features may be removed or modified in successive version releases, including patch versions. + +When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release. +There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version. +If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support. diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/x/features.go b/exporters/otlp/otlplog/otlploggrpc/internal/x/features.go new file mode 100644 index 00000000000..0ed1c81aa42 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/x/features.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc]. +package x // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x" + +import "strings" + +// Observability is an experimental feature flag that determines if exporter +// observability metrics are enabled. +// +// To enable this feature set the OTEL_GO_X_OBSERVABILITY environment variable +// to the case-insensitive string value of "true" (i.e. "True" and "TRUE" +// will also enable this). +var Observability = newFeature( + []string{"OBSERVABILITY"}, + func(v string) (string, bool) { + if strings.EqualFold(v, "true") { + return v, true + } + return "", false + }, +) diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/x/features_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/x/features_test.go new file mode 100644 index 00000000000..a8d3fb06ed4 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/x/features_test.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestObservability(t *testing.T) { + const key = "OTEL_GO_X_OBSERVABILITY" + require.Contains(t, Observability.Keys(), key) + + t.Run("100", run(setenv(key, "100"), assertDisabled(Observability))) + t.Run("true", run(setenv(key, "true"), assertEnabled(Observability, "true"))) + t.Run("True", run(setenv(key, "True"), assertEnabled(Observability, "True"))) + t.Run("false", run(setenv(key, "false"), assertDisabled(Observability))) + t.Run("empty", run(assertDisabled(Observability))) +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/x/x.go b/exporters/otlp/otlplog/otlploggrpc/internal/x/x.go new file mode 100644 index 00000000000..e2d50cedeb5 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/x/x.go @@ -0,0 +1,58 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/x/x.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc]. +package x // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x" + +import ( + "os" +) + +// Feature is an experimental feature control flag. It provides a uniform way +// to interact with these feature flags and parse their values. +type Feature[T any] struct { + keys []string + parse func(v string) (T, bool) +} + +func newFeature[T any](suffix []string, parse func(string) (T, bool)) Feature[T] { + const envKeyRoot = "OTEL_GO_X_" + keys := make([]string, 0, len(suffix)) + for _, s := range suffix { + keys = append(keys, envKeyRoot+s) + } + return Feature[T]{ + keys: keys, + parse: parse, + } +} + +// Keys returns the environment variable keys that can be set to enable the +// feature. +func (f Feature[T]) Keys() []string { return f.keys } + +// Lookup returns the user configured value for the feature and true if the +// user has enabled the feature. Otherwise, if the feature is not enabled, a +// zero-value and false are returned. +func (f Feature[T]) Lookup() (v T, ok bool) { + // https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value + // + // > The SDK MUST interpret an empty value of an environment variable the + // > same way as when the variable is unset. + for _, key := range f.keys { + vRaw := os.Getenv(key) + if vRaw != "" { + return f.parse(vRaw) + } + } + return v, ok +} + +// Enabled reports whether the feature is enabled. +func (f Feature[T]) Enabled() bool { + _, ok := f.Lookup() + return ok +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/x/x_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/x/x_test.go new file mode 100644 index 00000000000..a715d7608a7 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/x/x_test.go @@ -0,0 +1,75 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/x/x_text.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + mockKey = "OTEL_GO_X_MOCK_FEATURE" + mockKey2 = "OTEL_GO_X_MOCK_FEATURE2" +) + +var mockFeature = newFeature([]string{"MOCK_FEATURE", "MOCK_FEATURE2"}, func(v string) (string, bool) { + if strings.EqualFold(v, "true") { + return v, true + } + return "", false +}) + +func TestFeature(t *testing.T) { + require.Contains(t, mockFeature.Keys(), mockKey) + require.Contains(t, mockFeature.Keys(), mockKey2) + + t.Run("100", run(setenv(mockKey, "100"), assertDisabled(mockFeature))) + t.Run("true", run(setenv(mockKey, "true"), assertEnabled(mockFeature, "true"))) + t.Run("True", run(setenv(mockKey, "True"), assertEnabled(mockFeature, "True"))) + t.Run("false", run(setenv(mockKey, "false"), assertDisabled(mockFeature))) + t.Run("empty", run(assertDisabled(mockFeature))) +} + +func run(steps ...func(*testing.T)) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + for _, step := range steps { + step(t) + } + } +} + +func setenv(k, v string) func(t *testing.T) { //nolint:unparam // This is a reusable test utility function. + return func(t *testing.T) { t.Setenv(k, v) } +} + +func assertEnabled[T any](f Feature[T], want T) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + assert.True(t, f.Enabled(), "not enabled") + + v, ok := f.Lookup() + assert.True(t, ok, "Lookup state") + assert.Equal(t, want, v, "Lookup value") + } +} + +func assertDisabled[T any](f Feature[T]) func(*testing.T) { + var zero T + return func(t *testing.T) { + t.Helper() + + assert.False(t, f.Enabled(), "enabled") + + v, ok := f.Lookup() + assert.False(t, ok, "Lookup state") + assert.Equal(t, zero, v, "Lookup value") + } +} diff --git a/versions.yaml b/versions.yaml index 1c1a4f5a1f3..13062bceca3 100644 --- a/versions.yaml +++ b/versions.yaml @@ -49,3 +49,7 @@ modules: go.opentelemetry.io/otel/exporters/prometheus: version-refs: - ./exporters/prometheus/internal/version.go + + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc: + version-refs: + - ./exporters/otlp/otlplog/otlploggrpc/internal/version.go \ No newline at end of file