Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions otelcol/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ type configSettings struct {
// unmarshal the configSettings from a confmap.Conf.
// After the config is unmarshalled, `Validate()` must be called to validate.
func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) {
// TODO remove these params once SDK and resource creation are encapsulated
// within the otelconftelemetry factory. They are not used when creating
// the default config.
// TODO: inject the telemetry factory through factories, once available.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
telFactory := otelconftelemetry.NewFactory(nil, nil)
telFactory := otelconftelemetry.NewFactory()
defaultTelConfig := *telFactory.CreateDefaultConfig().(*otelconftelemetry.Config)

// Unmarshal top level sections and validate.
Expand Down
2 changes: 1 addition & 1 deletion service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestConfigValidate(t *testing.T) {
}

func TestConfmapMarshalConfig(t *testing.T) {
telFactory := otelconftelemetry.NewFactory(nil, nil)
telFactory := otelconftelemetry.NewFactory()
defaultTelConfig := *telFactory.CreateDefaultConfig().(*otelconftelemetry.Config)
conf := confmap.New()

Expand Down
91 changes: 21 additions & 70 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (

config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand All @@ -37,7 +35,6 @@ import (
"go.opentelemetry.io/collector/service/internal/graph"
"go.opentelemetry.io/collector/service/internal/moduleinfo"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/internal/status"
"go.opentelemetry.io/collector/service/telemetry"
"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
Expand Down Expand Up @@ -101,11 +98,11 @@ type Settings struct {

// Service represents the implementation of a component.Host.
type Service struct {
buildInfo component.BuildInfo
telemetrySettings component.TelemetrySettings
host *graph.Host
collectorConf *confmap.Conf
sdk *config.SDK
buildInfo component.BuildInfo
telemetrySettings component.TelemetrySettings
host *graph.Host
collectorConf *confmap.Conf
telemetryProviders telemetry.Providers
}

// New creates a new Service, its telemetry, and Components.
Expand All @@ -126,39 +123,31 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
collectorConf: set.CollectorConf,
}

// Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry.
res := resource.New(set.BuildInfo, cfg.Telemetry.Resource)
pcommonRes := pdataFromSdk(res)

mpConfig := &cfg.Telemetry.Metrics.MeterProvider
if mpConfig.Views == nil {
mpConfig.Views = configureViews(cfg.Telemetry.Metrics.Level)
}

sdk, err := otelconftelemetry.NewSDK(ctx, &cfg.Telemetry, res)
if err != nil {
return nil, fmt.Errorf("failed to create SDK: %w", err)
}
srv.sdk = sdk
defer func() {
if err != nil {
err = multierr.Append(err, sdk.Shutdown(ctx))
}
}()

telFactory := otelconftelemetry.NewFactory(sdk, res)
telFactory := otelconftelemetry.NewFactory()
telset := telemetry.Settings{
BuildInfo: set.BuildInfo,
ZapOptions: set.LoggingOptions,
}

logger, loggerProvider, err := telFactory.CreateLogger(ctx, telset, &cfg.Telemetry)
telProviders, err := telFactory.CreateProviders(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create logger: %w", err)
return nil, fmt.Errorf("failed to create telemetry providers: %w", err)
}
srv.telemetryProviders = telProviders
defer func() {
if err != nil {
err = multierr.Append(err, telProviders.Shutdown(ctx))
}
}()

// Use initialized logger to handle any subsequent errors
// https://github.com/open-telemetry/opentelemetry-collector/pull/13081
logger := telProviders.Logger()
defer func() {
if err != nil {
logger.Error("error found during service initialization", zap.Error(err))
Expand All @@ -168,6 +157,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
// Wrap the zap.Logger with componentattribute so scope attributes
// can be added and removed dynamically, and tee logs to the
// LoggerProvider.
loggerProvider := telProviders.LoggerProvider()
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
core = componentattribute.NewOTelTeeCoreWithAttributes(
Expand All @@ -179,23 +169,11 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return core
}))

tracerProvider, err := telFactory.CreateTracerProvider(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create tracer provider: %w", err)
}

logger.Info("Setting up own telemetry...")

mp, err := telFactory.CreateMeterProvider(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create meter provider: %w", err)
}
srv.telemetrySettings = component.TelemetrySettings{
Logger: logger,
MeterProvider: mp,
TracerProvider: tracerProvider,
// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
MeterProvider: telProviders.MeterProvider(),
TracerProvider: telProviders.TracerProvider(),
Resource: telProviders.Resource(),
}
srv.host.Reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) {
if errors.Is(err, status.ErrStatusNotReady) {
Expand All @@ -220,25 +198,9 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return nil, fmt.Errorf("failed to register process metrics: %w", err)
}
}

logsAboutMeterProvider(logger, cfg.Telemetry.Metrics, mp)

return srv, nil
}

func logsAboutMeterProvider(logger *zap.Logger, cfg otelconftelemetry.MetricsConfig, mp metric.MeterProvider) {
if cfg.Level == configtelemetry.LevelNone || len(cfg.Readers) == 0 {
logger.Info("Skipped telemetry setup.")
return
}
Comment on lines -230 to -233
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logging is now done inside otelconftelemetry.


if lmp, ok := mp.(interface {
LogAboutServers(logger *zap.Logger, cfg otelconftelemetry.MetricsConfig)
}); ok {
lmp.LogAboutServers(logger, cfg)
}
Comment on lines -235 to -239
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code, nothing implements this interface.

}

// Start starts the extensions and pipelines. If Start fails Shutdown should be called to ensure a clean state.
// Start does the following steps in order:
// 1. Start all extensions.
Expand Down Expand Up @@ -299,8 +261,8 @@ func (srv *Service) Shutdown(ctx context.Context) error {

srv.telemetrySettings.Logger.Info("Shutdown complete.")

if err := srv.sdk.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err))
if err := srv.telemetryProviders.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry providers: %w", err))
}

return errs
Expand Down Expand Up @@ -344,17 +306,6 @@ func (srv *Service) Logger() *zap.Logger {
return srv.telemetrySettings.Logger
}

func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource {
// pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests.
// Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only
// method of creating it without exposing internal packages.
pcommonRes := pcommon.NewResource()
for _, keyValue := range res.Attributes() {
pcommonRes.Attributes().PutStr(string(keyValue.Key), keyValue.Value.AsString())
}
return pcommonRes
}

func dropViewOption(selector *config.ViewSelector) config.View {
return config.View{
Selector: selector,
Expand Down
4 changes: 2 additions & 2 deletions service/telemetry/otelconftelemetry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (

func TestComponentConfigStruct(t *testing.T) {
require.NoError(t, componenttest.CheckConfigStruct(
NewFactory(nil, nil).CreateDefaultConfig(),
NewFactory().CreateDefaultConfig(),
))
}

func TestUnmarshalDefaultConfig(t *testing.T) {
factory := NewFactory(nil, nil)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
require.NoError(t, confmap.New().Unmarshal(&cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
Expand Down
38 changes: 5 additions & 33 deletions service/telemetry/otelconftelemetry/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ import (
"time"

config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
Expand All @@ -28,25 +23,18 @@ var useLocalHostAsDefaultMetricsAddressFeatureGate = featuregate.GlobalRegistry(
featuregate.WithRegisterDescription("controls whether default Prometheus metrics server use localhost as the default host for their endpoints"),
)

// Factory is factory interface for telemetry.
// Factory is factory interface for telemetry providers.
// This interface cannot be directly implemented. Implementations must
// use the NewFactory to implement it.
//
// NOTE This API is experimental and will change soon - use at your own risk.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
type Factory interface {
// CreateDefaultConfig creates the default configuration for the telemetry.
// TODO: Should we just inherit from component.Factory?
CreateDefaultConfig() component.Config

// CreateLogger creates a logger.
CreateLogger(context.Context, telemetry.Settings, component.Config) (*zap.Logger, log.LoggerProvider, error)

// CreateTracerProvider creates a TracerProvider.
CreateTracerProvider(context.Context, telemetry.Settings, component.Config) (trace.TracerProvider, error)

// CreateMeterProvider creates a MeterProvider.
CreateMeterProvider(context.Context, telemetry.Settings, component.Config) (metric.MeterProvider, error)
// CreateProviders creates telemetry providers.
CreateProviders(context.Context, telemetry.Settings, component.Config) (telemetry.Providers, error)

// unexportedFactoryFunc is used to prevent external implementations of Factory.
unexportedFactoryFunc()
Expand All @@ -56,24 +44,8 @@ type Factory interface {
//
// NOTE This API is experimental and will change soon - use at your own risk.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
//
// TODO remove the parameters once the factory is fully self-contained
// and is responsible for creating the SDK and resource itself.
func NewFactory(sdk *config.SDK, res *resource.Resource) Factory {
return newFactory(createDefaultConfig,
withLogger(func(_ context.Context, set telemetry.Settings, cfg component.Config) (*zap.Logger, log.LoggerProvider, error) {
c := *cfg.(*Config)
return newLogger(set, c, sdk, res)
}),
withTracerProvider(func(_ context.Context, _ telemetry.Settings, cfg component.Config) (trace.TracerProvider, error) {
c := *cfg.(*Config)
return newTracerProvider(c, sdk)
}),
withMeterProvider(func(_ context.Context, _ telemetry.Settings, cfg component.Config) (metric.MeterProvider, error) {
c := *cfg.(*Config)
return newMeterProvider(c, sdk)
}),
)
func NewFactory() Factory {
return newFactory(createDefaultConfig, createProviders)
}

func createDefaultConfig() component.Config {
Expand Down
67 changes: 7 additions & 60 deletions service/telemetry/otelconftelemetry/factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@ package otelconftelemetry // import "go.opentelemetry.io/collector/service/telem
import (
"context"

"go.opentelemetry.io/otel/log"
lognoop "go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/metric"
metricnoop "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
tracenoop "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/telemetry"
)
Expand All @@ -38,72 +30,27 @@ var _ Factory = (*factory)(nil)
// Factory is the implementation of Factory.
type factory struct {
createDefaultConfig component.CreateDefaultConfigFunc
createLoggerFunc
createTracerProviderFunc
createMeterProviderFunc
createProvidersFunc
}

func (f *factory) CreateDefaultConfig() component.Config {
return f.createDefaultConfig()
}

// createLoggerFunc is the equivalent of Factory.CreateLogger.
type createLoggerFunc func(context.Context, telemetry.Settings, component.Config) (*zap.Logger, log.LoggerProvider, error)

// withLogger overrides the default no-op logger.
func withLogger(createLogger createLoggerFunc) factoryOption {
return factoryOptionFunc(func(o *factory) {
o.createLoggerFunc = createLogger
})
}

func (f *factory) CreateLogger(ctx context.Context, set telemetry.Settings, cfg component.Config) (*zap.Logger, log.LoggerProvider, error) {
if f.createLoggerFunc == nil {
return zap.NewNop(), lognoop.NewLoggerProvider(), nil
}
return f.createLoggerFunc(ctx, set, cfg)
}

// createTracerProviderFunc is the equivalent of Factory.CreateTracerProvider.
type createTracerProviderFunc func(context.Context, telemetry.Settings, component.Config) (trace.TracerProvider, error)

// withTracerProvider overrides the default no-op tracer provider.
func withTracerProvider(createTracerProvider createTracerProviderFunc) factoryOption {
return factoryOptionFunc(func(o *factory) {
o.createTracerProviderFunc = createTracerProvider
})
}

func (f *factory) CreateTracerProvider(ctx context.Context, set telemetry.Settings, cfg component.Config) (trace.TracerProvider, error) {
if f.createTracerProviderFunc == nil {
return tracenoop.NewTracerProvider(), nil
}
return f.createTracerProviderFunc(ctx, set, cfg)
}

// createMeterProviderFunc is the equivalent of Factory.CreateMeterProvider.
type createMeterProviderFunc func(context.Context, telemetry.Settings, component.Config) (metric.MeterProvider, error)

// withMeterProvider overrides the default no-op meter provider.
func withMeterProvider(createMeterProvider createMeterProviderFunc) factoryOption {
return factoryOptionFunc(func(o *factory) {
o.createMeterProviderFunc = createMeterProvider
})
}
// createProvidersFunc is the equivalent of Factory.CreateProviders.
type createProvidersFunc func(context.Context, telemetry.Settings, component.Config) (telemetry.Providers, error)

func (f *factory) CreateMeterProvider(ctx context.Context, set telemetry.Settings, cfg component.Config) (metric.MeterProvider, error) {
if f.createMeterProviderFunc == nil {
return metricnoop.NewMeterProvider(), nil
}
return f.createMeterProviderFunc(ctx, set, cfg)
func (f *factory) CreateProviders(ctx context.Context, set telemetry.Settings, cfg component.Config) (telemetry.Providers, error) {
return f.createProvidersFunc(ctx, set, cfg)
}

func (f *factory) unexportedFactoryFunc() {}

// newFactory returns a new Factory.
func newFactory(createDefaultConfig component.CreateDefaultConfigFunc, options ...factoryOption) Factory {
func newFactory(createDefaultConfig component.CreateDefaultConfigFunc, createProviders createProvidersFunc, options ...factoryOption) Factory {
f := &factory{
createDefaultConfig: createDefaultConfig,
createProvidersFunc: createProviders,
}
for _, op := range options {
op.applyTelemetryFactoryOption(f)
Expand Down
Loading
Loading