Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions internal/telemetry/authenticator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "go.opentelemetry.io/collector/internal/telemetry"

import (
"net/http"
"sync"

"google.golang.org/grpc/credentials"

"go.opentelemetry.io/collector/extension/extensionauth"
)

// AuthenticatorProvider is an interface that provides authentication
// for internal telemetry exports.
type AuthenticatorProvider interface {
// GetHTTPRoundTripper returns an authenticated round tripper for HTTP requests.
GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error)

// GetGRPCCredentials returns gRPC per-RPC credentials for authentication.
GetGRPCCredentials() (credentials.PerRPCCredentials, error)
}

// AuthenticatorManager manages authentication for internal telemetry.
// This allows for late binding of authenticators since telemetry providers
// are created before extensions are initialized.
type AuthenticatorManager struct {
mu sync.RWMutex
authenticator AuthenticatorProvider
}

// NewAuthenticatorManager creates a new AuthenticatorManager.
func NewAuthenticatorManager() *AuthenticatorManager {
return &AuthenticatorManager{}
}

// SetAuthenticator sets the authenticator provider.
func (am *AuthenticatorManager) SetAuthenticator(auth AuthenticatorProvider) {
am.mu.Lock()
defer am.mu.Unlock()
am.authenticator = auth
}

// GetHTTPRoundTripper returns an authenticated round tripper if an authenticator is set.
func (am *AuthenticatorManager) GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error) {
am.mu.RLock()
defer am.mu.RUnlock()

if am.authenticator == nil {
return base, nil
}

return am.authenticator.GetHTTPRoundTripper(base)
}

// GetGRPCCredentials returns gRPC credentials if an authenticator is set.
func (am *AuthenticatorManager) GetGRPCCredentials() (credentials.PerRPCCredentials, error) {
am.mu.RLock()
defer am.mu.RUnlock()

if am.authenticator == nil {
return nil, nil
}

return am.authenticator.GetGRPCCredentials()
}

// GlobalAuthenticatorManager is the global instance used by internal telemetry.
// This allows telemetry providers to be authenticated even when they're created
// before extensions are initialized.
var GlobalAuthenticatorManager = NewAuthenticatorManager()

// DefaultAuthenticatorProvider creates an AuthenticatorProvider from extension components.
type DefaultAuthenticatorProvider struct {
httpClient extensionauth.HTTPClient
grpcClient extensionauth.GRPCClient
}
Comment on lines +75 to +78
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be able to replace this type with an unexported form.

type authenticatorProvider struct {
  extensionauth.ClientRoundTripperFunc
  extensionauth.ClientPerRPCCredentialsFunc
}

Reviewers, see this evidently-stalled RFC on the topic of how we construct these default implementations: #13263


// NewDefaultAuthenticatorProvider creates a new DefaultAuthenticatorProvider.
func NewDefaultAuthenticatorProvider(httpClient extensionauth.HTTPClient, grpcClient extensionauth.GRPCClient) *DefaultAuthenticatorProvider {
return &DefaultAuthenticatorProvider{
httpClient: httpClient,
grpcClient: grpcClient,
}
}

// GetHTTPRoundTripper implements AuthenticatorProvider.
func (p *DefaultAuthenticatorProvider) GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error) {
if p.httpClient == nil {
return base, nil
}
return p.httpClient.RoundTripper(base)
}

// GetGRPCCredentials implements AuthenticatorProvider.
func (p *DefaultAuthenticatorProvider) GetGRPCCredentials() (credentials.PerRPCCredentials, error) {
if p.grpcClient == nil {
return nil, nil
}
return p.grpcClient.PerRPCCredentials()
}
Comment on lines +88 to +102
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The suggestion above means we can remove all this.

129 changes: 129 additions & 0 deletions internal/telemetry/authenticator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry

import (
"context"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/credentials"
)

func TestAuthenticatorManager(t *testing.T) {
manager := NewAuthenticatorManager()

// Test no authenticator set
rt, err := manager.GetHTTPRoundTripper(http.DefaultTransport)
require.NoError(t, err)
assert.Equal(t, http.DefaultTransport, rt)

creds, err := manager.GetGRPCCredentials()
require.NoError(t, err)
assert.Nil(t, creds)

// Test with authenticator set
mockAuth := &mockAuthenticatorProvider{}
manager.SetAuthenticator(mockAuth)

rt, err = manager.GetHTTPRoundTripper(http.DefaultTransport)
require.NoError(t, err)
assert.Equal(t, mockAuth.roundTripper, rt)

creds, err = manager.GetGRPCCredentials()
require.NoError(t, err)
assert.Equal(t, mockAuth.creds, creds)
}

func TestDefaultAuthenticatorProvider(t *testing.T) {
mockHTTPClient := &mockHTTPClient{}
mockGRPCClient := &mockGRPCClient{}

provider := NewDefaultAuthenticatorProvider(mockHTTPClient, mockGRPCClient)

// Test HTTP round tripper
rt, err := provider.GetHTTPRoundTripper(http.DefaultTransport)
require.NoError(t, err)
assert.Equal(t, mockHTTPClient.roundTripper, rt)

// Test gRPC credentials
creds, err := provider.GetGRPCCredentials()
require.NoError(t, err)
assert.Equal(t, mockGRPCClient.creds, creds)
}

func TestDefaultAuthenticatorProviderWithNilClients(t *testing.T) {
provider := NewDefaultAuthenticatorProvider(nil, nil)

// Test HTTP round tripper
rt, err := provider.GetHTTPRoundTripper(http.DefaultTransport)
require.NoError(t, err)
assert.Equal(t, http.DefaultTransport, rt)

// Test gRPC credentials
creds, err := provider.GetGRPCCredentials()
require.NoError(t, err)
assert.Nil(t, creds)
}

// Mock implementations for testing

type mockAuthenticatorProvider struct {
roundTripper http.RoundTripper
creds credentials.PerRPCCredentials
}

func (m *mockAuthenticatorProvider) GetHTTPRoundTripper(base http.RoundTripper) (http.RoundTripper, error) {
if m.roundTripper == nil {
m.roundTripper = &mockRoundTripper{}
}
return m.roundTripper, nil
}

func (m *mockAuthenticatorProvider) GetGRPCCredentials() (credentials.PerRPCCredentials, error) {
if m.creds == nil {
m.creds = &mockCredentials{}
}
return m.creds, nil
}

type mockHTTPClient struct {
roundTripper http.RoundTripper
}

func (m *mockHTTPClient) RoundTripper(base http.RoundTripper) (http.RoundTripper, error) {
if m.roundTripper == nil {
m.roundTripper = &mockRoundTripper{}
}
return m.roundTripper, nil
}

type mockGRPCClient struct {
creds credentials.PerRPCCredentials
}

func (m *mockGRPCClient) PerRPCCredentials() (credentials.PerRPCCredentials, error) {
if m.creds == nil {
m.creds = &mockCredentials{}
}
return m.creds, nil
}

type mockRoundTripper struct{}

func (m *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, nil
}

type mockCredentials struct{}

func (m *mockCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{"authorization": "Bearer mock-token"}, nil
}

func (m *mockCredentials) RequireTransportSecurity() bool {
return false
}
3 changes: 3 additions & 0 deletions internal/telemetry/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.24

require (
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/extension/extensionauth v1.40.0
go.opentelemetry.io/collector/featuregate v1.40.0
go.opentelemetry.io/collector/pdata v1.40.0
go.opentelemetry.io/collector/pipeline v1.40.0
Expand Down Expand Up @@ -46,3 +47,5 @@ replace go.opentelemetry.io/collector/pipeline => ../../pipeline
replace go.opentelemetry.io/collector/pdata => ../../pdata

replace go.opentelemetry.io/collector/featuregate => ../../featuregate

replace go.opentelemetry.io/collector/extension/extensionauth => ../../extension/extensionauth
9 changes: 9 additions & 0 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ type TelemetrySettings struct {
// Resource contains the resource attributes for the collector's telemetry.
Resource pcommon.Resource

// AuthenticatorID specifies the name of the extension to use for authenticating
// internal telemetry exports. If set, the collector will use this extension
// to authenticate all HTTP and gRPC connections made by internal telemetry exporters.
AuthenticatorID string

// AuthenticatorManager provides access to authentication for internal telemetry.
// This is used internally by the collector to apply authentication to telemetry exports.
AuthenticatorManager *AuthenticatorManager

// Extra attributes added to instrumentation scopes
extraAttributes attribute.Set
}
Expand Down
66 changes: 62 additions & 4 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensionauth"
"go.opentelemetry.io/collector/featuregate"
internaltelemetry "go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor"
Expand Down Expand Up @@ -170,10 +172,12 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
}))

srv.telemetrySettings = component.TelemetrySettings{
Logger: logger,
MeterProvider: telProviders.MeterProvider(),
TracerProvider: telProviders.TracerProvider(),
Resource: telProviders.Resource(),
Logger: logger,
MeterProvider: telProviders.MeterProvider(),
TracerProvider: telProviders.TracerProvider(),
Resource: telProviders.Resource(),
AuthenticatorID: cfg.Telemetry.Authenticator,
AuthenticatorManager: internaltelemetry.NewAuthenticatorManager(),
}
srv.host.Reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) {
if errors.Is(err, status.ErrStatusNotReady) {
Expand All @@ -193,6 +197,12 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return nil, err
}

// Configure authentication for internal telemetry if specified
err = srv.configureAuthentication(ctx)
if err != nil {
return nil, fmt.Errorf("failed to configure telemetry authentication: %w", err)
}

if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && len(mpConfig.Readers) != 0 {
if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil {
return nil, fmt.Errorf("failed to register process metrics: %w", err)
Expand Down Expand Up @@ -282,6 +292,54 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e
return nil
}

// configureAuthentication configures authentication for internal telemetry if specified.
func (srv *Service) configureAuthentication(ctx context.Context) error {
if srv.telemetrySettings.AuthenticatorID == "" {
return nil
}

// Parse the authenticator ID
var authenticatorID component.ID
err := authenticatorID.UnmarshalText([]byte(srv.telemetrySettings.AuthenticatorID))
if err != nil {
return fmt.Errorf("invalid authenticator ID %q: %w", srv.telemetrySettings.AuthenticatorID, err)
}

// Get the extension from the host
extensions := srv.host.GetExtensions()
ext, found := extensions[authenticatorID]
if !found {
return fmt.Errorf("authenticator extension %q not found", authenticatorID)
}

// Create the authenticator provider
var httpClient extensionauth.HTTPClient
var grpcClient extensionauth.GRPCClient

// Check if the extension implements HTTP client authentication
if hc, ok := ext.(extensionauth.HTTPClient); ok {
httpClient = hc
}

// Check if the extension implements gRPC client authentication
if gc, ok := ext.(extensionauth.GRPCClient); ok {
grpcClient = gc
}

if httpClient == nil && grpcClient == nil {
return fmt.Errorf("authenticator extension %q does not implement HTTP or gRPC client authentication", authenticatorID)
}

// Create and set the authenticator provider
provider := internaltelemetry.NewDefaultAuthenticatorProvider(httpClient, grpcClient)
srv.telemetrySettings.AuthenticatorManager.SetAuthenticator(provider)

srv.telemetrySettings.Logger.Info("Configured authentication for internal telemetry",
zap.String("authenticator", authenticatorID.String()))

return nil
}

// Creates the pipeline graph.
func (srv *Service) initGraph(ctx context.Context, cfg Config) error {
var err error
Expand Down
Loading