diff --git a/internal/telemetry/authenticator.go b/internal/telemetry/authenticator.go new file mode 100644 index 00000000000..87f2f73f85d --- /dev/null +++ b/internal/telemetry/authenticator.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "go.opentelemetry.io/collector/internal/telemetry" + +import ( + "net/http" + "sync" + + "google.golang.org/grpc/credentials" + + "go.opentelemetry.io/collector/extension/extensionauth" +) + +// AuthenticatorProvider is an interface that provides authentication +// for internal telemetry exports. +type AuthenticatorProvider interface { + // GetHTTPRoundTripper returns an authenticated round tripper for HTTP requests. + GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error) + + // GetGRPCCredentials returns gRPC per-RPC credentials for authentication. + GetGRPCCredentials() (credentials.PerRPCCredentials, error) +} + +// AuthenticatorManager manages authentication for internal telemetry. +// This allows for late binding of authenticators since telemetry providers +// are created before extensions are initialized. +type AuthenticatorManager struct { + mu sync.RWMutex + authenticator AuthenticatorProvider +} + +// NewAuthenticatorManager creates a new AuthenticatorManager. +func NewAuthenticatorManager() *AuthenticatorManager { + return &AuthenticatorManager{} +} + +// SetAuthenticator sets the authenticator provider. +func (am *AuthenticatorManager) SetAuthenticator(auth AuthenticatorProvider) { + am.mu.Lock() + defer am.mu.Unlock() + am.authenticator = auth +} + +// GetHTTPRoundTripper returns an authenticated round tripper if an authenticator is set. +func (am *AuthenticatorManager) GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error) { + am.mu.RLock() + defer am.mu.RUnlock() + + if am.authenticator == nil { + return base, nil + } + + return am.authenticator.GetHTTPRoundTripper(base) +} + +// GetGRPCCredentials returns gRPC credentials if an authenticator is set. +func (am *AuthenticatorManager) GetGRPCCredentials() (credentials.PerRPCCredentials, error) { + am.mu.RLock() + defer am.mu.RUnlock() + + if am.authenticator == nil { + return nil, nil + } + + return am.authenticator.GetGRPCCredentials() +} + +// GlobalAuthenticatorManager is the global instance used by internal telemetry. +// This allows telemetry providers to be authenticated even when they're created +// before extensions are initialized. +var GlobalAuthenticatorManager = NewAuthenticatorManager() + +// DefaultAuthenticatorProvider creates an AuthenticatorProvider from extension components. +type DefaultAuthenticatorProvider struct { + httpClient extensionauth.HTTPClient + grpcClient extensionauth.GRPCClient +} + +// NewDefaultAuthenticatorProvider creates a new DefaultAuthenticatorProvider. +func NewDefaultAuthenticatorProvider(httpClient extensionauth.HTTPClient, grpcClient extensionauth.GRPCClient) *DefaultAuthenticatorProvider { + return &DefaultAuthenticatorProvider{ + httpClient: httpClient, + grpcClient: grpcClient, + } +} + +// GetHTTPRoundTripper implements AuthenticatorProvider. +func (p *DefaultAuthenticatorProvider) GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error) { + if p.httpClient == nil { + return base, nil + } + return p.httpClient.RoundTripper(base) +} + +// GetGRPCCredentials implements AuthenticatorProvider. +func (p *DefaultAuthenticatorProvider) GetGRPCCredentials() (credentials.PerRPCCredentials, error) { + if p.grpcClient == nil { + return nil, nil + } + return p.grpcClient.PerRPCCredentials() +} diff --git a/internal/telemetry/authenticator_test.go b/internal/telemetry/authenticator_test.go new file mode 100644 index 00000000000..48060497c93 --- /dev/null +++ b/internal/telemetry/authenticator_test.go @@ -0,0 +1,129 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry + +import ( + "context" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/credentials" +) + +func TestAuthenticatorManager(t *testing.T) { + manager := NewAuthenticatorManager() + + // Test no authenticator set + rt, err := manager.GetHTTPRoundTripper(http.DefaultTransport) + require.NoError(t, err) + assert.Equal(t, http.DefaultTransport, rt) + + creds, err := manager.GetGRPCCredentials() + require.NoError(t, err) + assert.Nil(t, creds) + + // Test with authenticator set + mockAuth := &mockAuthenticatorProvider{} + manager.SetAuthenticator(mockAuth) + + rt, err = manager.GetHTTPRoundTripper(http.DefaultTransport) + require.NoError(t, err) + assert.Equal(t, mockAuth.roundTripper, rt) + + creds, err = manager.GetGRPCCredentials() + require.NoError(t, err) + assert.Equal(t, mockAuth.creds, creds) +} + +func TestDefaultAuthenticatorProvider(t *testing.T) { + mockHTTPClient := &mockHTTPClient{} + mockGRPCClient := &mockGRPCClient{} + + provider := NewDefaultAuthenticatorProvider(mockHTTPClient, mockGRPCClient) + + // Test HTTP round tripper + rt, err := provider.GetHTTPRoundTripper(http.DefaultTransport) + require.NoError(t, err) + assert.Equal(t, mockHTTPClient.roundTripper, rt) + + // Test gRPC credentials + creds, err := provider.GetGRPCCredentials() + require.NoError(t, err) + assert.Equal(t, mockGRPCClient.creds, creds) +} + +func TestDefaultAuthenticatorProviderWithNilClients(t *testing.T) { + provider := NewDefaultAuthenticatorProvider(nil, nil) + + // Test HTTP round tripper + rt, err := provider.GetHTTPRoundTripper(http.DefaultTransport) + require.NoError(t, err) + assert.Equal(t, http.DefaultTransport, rt) + + // Test gRPC credentials + creds, err := provider.GetGRPCCredentials() + require.NoError(t, err) + assert.Nil(t, creds) +} + +// Mock implementations for testing + +type mockAuthenticatorProvider struct { + roundTripper http.RoundTripper + creds credentials.PerRPCCredentials +} + +func (m *mockAuthenticatorProvider) GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error) { + if m.roundTripper == nil { + m.roundTripper = &mockRoundTripper{} + } + return m.roundTripper, nil +} + +func (m *mockAuthenticatorProvider) GetGRPCCredentials() (credentials.PerRPCCredentials, error) { + if m.creds == nil { + m.creds = &mockCredentials{} + } + return m.creds, nil +} + +type mockHTTPClient struct { + roundTripper http.RoundTripper +} + +func (m *mockHTTPClient) RoundTripper(base http.RoundTripper) (http.RoundTripper, error) { + if m.roundTripper == nil { + m.roundTripper = &mockRoundTripper{} + } + return m.roundTripper, nil +} + +type mockGRPCClient struct { + creds credentials.PerRPCCredentials +} + +func (m *mockGRPCClient) PerRPCCredentials() (credentials.PerRPCCredentials, error) { + if m.creds == nil { + m.creds = &mockCredentials{} + } + return m.creds, nil +} + +type mockRoundTripper struct{} + +func (m *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return nil, nil +} + +type mockCredentials struct{} + +func (m *mockCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { + return map[string]string{"authorization": "Bearer mock-token"}, nil +} + +func (m *mockCredentials) RequireTransportSecurity() bool { + return false +} diff --git a/internal/telemetry/go.mod b/internal/telemetry/go.mod index 4fbff7b9e42..c0c989e213c 100644 --- a/internal/telemetry/go.mod +++ b/internal/telemetry/go.mod @@ -4,6 +4,7 @@ go 1.24 require ( github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/collector/extension/extensionauth v1.40.0 go.opentelemetry.io/collector/featuregate v1.40.0 go.opentelemetry.io/collector/pdata v1.40.0 go.opentelemetry.io/collector/pipeline v1.40.0 @@ -46,3 +47,5 @@ replace go.opentelemetry.io/collector/pipeline => ../../pipeline replace go.opentelemetry.io/collector/pdata => ../../pdata replace go.opentelemetry.io/collector/featuregate => ../../featuregate + +replace go.opentelemetry.io/collector/extension/extensionauth => ../../extension/extensionauth diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 5ebe2d55dde..1151aaad33d 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -39,6 +39,15 @@ type TelemetrySettings struct { // Resource contains the resource attributes for the collector's telemetry. Resource pcommon.Resource + // AuthenticatorID specifies the name of the extension to use for authenticating + // internal telemetry exports. If set, the collector will use this extension + // to authenticate all HTTP and gRPC connections made by internal telemetry exporters. + AuthenticatorID string + + // AuthenticatorManager provides access to authentication for internal telemetry. + // This is used internally by the collector to apply authentication to telemetry exports. + AuthenticatorManager *AuthenticatorManager + // Extra attributes added to instrumentation scopes extraAttributes attribute.Set } diff --git a/service/service.go b/service/service.go index aa89d26c33e..9eb0f43e35b 100644 --- a/service/service.go +++ b/service/service.go @@ -25,7 +25,9 @@ import ( "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/extension/extensionauth" "go.opentelemetry.io/collector/featuregate" + internaltelemetry "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/internal/telemetry/componentattribute" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/processor" @@ -170,10 +172,12 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { })) srv.telemetrySettings = component.TelemetrySettings{ - Logger: logger, - MeterProvider: telProviders.MeterProvider(), - TracerProvider: telProviders.TracerProvider(), - Resource: telProviders.Resource(), + Logger: logger, + MeterProvider: telProviders.MeterProvider(), + TracerProvider: telProviders.TracerProvider(), + Resource: telProviders.Resource(), + AuthenticatorID: cfg.Telemetry.Authenticator, + AuthenticatorManager: internaltelemetry.NewAuthenticatorManager(), } srv.host.Reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) { if errors.Is(err, status.ErrStatusNotReady) { @@ -193,6 +197,12 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { return nil, err } + // Configure authentication for internal telemetry if specified + err = srv.configureAuthentication(ctx) + if err != nil { + return nil, fmt.Errorf("failed to configure telemetry authentication: %w", err) + } + if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && len(mpConfig.Readers) != 0 { if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil { return nil, fmt.Errorf("failed to register process metrics: %w", err) @@ -282,6 +292,54 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e return nil } +// configureAuthentication configures authentication for internal telemetry if specified. +func (srv *Service) configureAuthentication(ctx context.Context) error { + if srv.telemetrySettings.AuthenticatorID == "" { + return nil + } + + // Parse the authenticator ID + var authenticatorID component.ID + err := authenticatorID.UnmarshalText([]byte(srv.telemetrySettings.AuthenticatorID)) + if err != nil { + return fmt.Errorf("invalid authenticator ID %q: %w", srv.telemetrySettings.AuthenticatorID, err) + } + + // Get the extension from the host + extensions := srv.host.GetExtensions() + ext, found := extensions[authenticatorID] + if !found { + return fmt.Errorf("authenticator extension %q not found", authenticatorID) + } + + // Create the authenticator provider + var httpClient extensionauth.HTTPClient + var grpcClient extensionauth.GRPCClient + + // Check if the extension implements HTTP client authentication + if hc, ok := ext.(extensionauth.HTTPClient); ok { + httpClient = hc + } + + // Check if the extension implements gRPC client authentication + if gc, ok := ext.(extensionauth.GRPCClient); ok { + grpcClient = gc + } + + if httpClient == nil && grpcClient == nil { + return fmt.Errorf("authenticator extension %q does not implement HTTP or gRPC client authentication", authenticatorID) + } + + // Create and set the authenticator provider + provider := internaltelemetry.NewDefaultAuthenticatorProvider(httpClient, grpcClient) + srv.telemetrySettings.AuthenticatorManager.SetAuthenticator(provider) + + srv.telemetrySettings.Logger.Info("Configured authentication for internal telemetry", + zap.String("authenticator", authenticatorID.String())) + + return nil +} + // Creates the pipeline graph. func (srv *Service) initGraph(ctx context.Context, cfg Config) error { var err error diff --git a/service/service_test.go b/service/service_test.go index 4e74ebfd541..e532f44f174 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -23,14 +23,17 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" + "google.golang.org/grpc/credentials" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/zpagesextension" + internaltelemetry "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -1030,3 +1033,125 @@ func TestValidateGraph(t *testing.T) { }) } } + +// Test authenticator extension that implements both HTTP and gRPC client authentication +type testAuthExtension struct { + component.StartFunc + component.ShutdownFunc +} + +func (e *testAuthExtension) RoundTripper(base http.RoundTripper) (http.RoundTripper, error) { + return &testRoundTripper{base: base}, nil +} + +func (e *testAuthExtension) PerRPCCredentials() (credentials.PerRPCCredentials, error) { + return &testCredentials{}, nil +} + +type testRoundTripper struct { + base http.RoundTripper +} + +func (rt *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Set("Authorization", "Bearer test-token") + return rt.base.RoundTrip(req) +} + +type testCredentials struct{} + +func (c *testCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { + return map[string]string{"authorization": "Bearer test-token"}, nil +} + +func (c *testCredentials) RequireTransportSecurity() bool { + return false +} + +func TestAuthenticatorManagerIntegration(t *testing.T) { + // Test that the authenticator manager can be configured with real extension implementations + manager := internaltelemetry.NewAuthenticatorManager() + + // Test with no authenticator + rt, err := manager.GetHTTPRoundTripper(http.DefaultTransport) + require.NoError(t, err) + assert.Equal(t, http.DefaultTransport, rt) + + creds, err := manager.GetGRPCCredentials() + require.NoError(t, err) + assert.Nil(t, creds) + + // Test with authenticator configured + authExt := &testAuthExtension{} + provider := internaltelemetry.NewDefaultAuthenticatorProvider(authExt, authExt) + manager.SetAuthenticator(provider) + + rt, err = manager.GetHTTPRoundTripper(http.DefaultTransport) + require.NoError(t, err) + assert.IsType(t, &testRoundTripper{}, rt) + + creds, err = manager.GetGRPCCredentials() + require.NoError(t, err) + assert.IsType(t, &testCredentials{}, creds) +} + +func TestTelemetrySettingsAuthenticatorFields(t *testing.T) { + // Test that TelemetrySettings properly contains the new authentication fields + settings := component.TelemetrySettings{ + Logger: componenttest.NewNopTelemetrySettings().Logger, + AuthenticatorID: "testauth", + AuthenticatorManager: internaltelemetry.NewAuthenticatorManager(), + } + + assert.Equal(t, "testauth", settings.AuthenticatorID) + assert.NotNil(t, settings.AuthenticatorManager) + + // Test the authenticator manager functions + rt, err := settings.AuthenticatorManager.GetHTTPRoundTripper(http.DefaultTransport) + require.NoError(t, err) + assert.Equal(t, http.DefaultTransport, rt) +} + +func TestComponentIDParsing(t *testing.T) { + // Test that component ID parsing works correctly for authenticator IDs + testCases := []struct { + name string + idString string + expectError bool + }{ + { + name: "simple ID", + idString: "bearertokenauth", + expectError: false, + }, + { + name: "ID with name", + idString: "bearertokenauth/custom", + expectError: false, + }, + { + name: "empty ID", + idString: "", + expectError: true, + }, + { + name: "invalid format", + idString: "invalid/too/many/parts", + expectError: false, // This should actually be valid according to the ID parsing rules + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var id component.ID + err := id.UnmarshalText([]byte(tc.idString)) + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if tc.idString != "" { + assert.Equal(t, tc.idString, id.String()) + } + } + }) + } +} diff --git a/service/telemetry/otelconftelemetry/config.go b/service/telemetry/otelconftelemetry/config.go index 0229e7c740c..51473fc07cd 100644 --- a/service/telemetry/otelconftelemetry/config.go +++ b/service/telemetry/otelconftelemetry/config.go @@ -16,6 +16,11 @@ type Config struct { Metrics MetricsConfig `mapstructure:"metrics"` Traces TracesConfig `mapstructure:"traces,omitempty"` + // Authenticator specifies the name of the extension to use for authenticating + // internal telemetry exports. If set, the collector will use this extension + // to authenticate all HTTP and gRPC connections made by internal telemetry exporters. + Authenticator string `mapstructure:"authenticator,omitempty"` + // Resource specifies user-defined attributes to include with all emitted telemetry. // Note that some attributes are added automatically (e.g. service.version) even // if they are not specified here. In order to suppress such attributes the diff --git a/service/telemetry/otelconftelemetry/config_test.go b/service/telemetry/otelconftelemetry/config_test.go index 8c1fce16ef5..185651bda00 100644 --- a/service/telemetry/otelconftelemetry/config_test.go +++ b/service/telemetry/otelconftelemetry/config_test.go @@ -118,3 +118,80 @@ func TestConfig(t *testing.T) { }) } } + +func TestConfigWithAuthenticator(t *testing.T) { + tests := []struct { + name string + configMap map[string]any + expectedAuth string + }{ + { + name: "valid authenticator", + configMap: map[string]any{ + "authenticator": "bearertokenauth", + "logs": map[string]any{ + "level": "info", + }, + "metrics": map[string]any{ + "level": "none", + }, + "traces": map[string]any{ + "level": "none", + }, + }, + expectedAuth: "bearertokenauth", + }, + { + name: "valid authenticator with name", + configMap: map[string]any{ + "authenticator": "bearertokenauth/custom", + "logs": map[string]any{ + "level": "info", + }, + "metrics": map[string]any{ + "level": "none", + }, + "traces": map[string]any{ + "level": "none", + }, + }, + expectedAuth: "bearertokenauth/custom", + }, + { + name: "no authenticator", + configMap: map[string]any{ + "logs": map[string]any{ + "level": "info", + }, + "metrics": map[string]any{ + "level": "none", + }, + "traces": map[string]any{ + "level": "none", + }, + }, + expectedAuth: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.configMap) + + cfg := createDefaultConfig().(*Config) + err := conf.Unmarshal(cfg) + require.NoError(t, err) + + assert.Equal(t, tt.expectedAuth, cfg.Authenticator) + }) + } +} + +func TestConfigValidateWithAuthenticator(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Authenticator = "bearertokenauth" + + // Should not affect validation + err := cfg.Validate() + assert.NoError(t, err) +}