diff --git a/.chloggen/config.yaml b/.chloggen/config.yaml index 621f2958056..637a0bd7212 100644 --- a/.chloggen/config.yaml +++ b/.chloggen/config.yaml @@ -51,6 +51,7 @@ components: - pkg/xprocessor - pkg/xreceiver - pkg/xscraper + - pkg/xscraperhelper - processor/batch - processor/memory_limiter - processor/sample diff --git a/.chloggen/xscraperhelper.yaml b/.chloggen/xscraperhelper.yaml new file mode 100644 index 00000000000..3981b745dba --- /dev/null +++ b/.chloggen/xscraperhelper.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp) +component: pkg/xscraperhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add xscraperhelper for the experimental OTel profiling signal. + +# One or more tracking issues or pull requests related to the change +issues: [14235] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 8c5b4f8b1a0..c2edf89106d 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -62,6 +62,7 @@ receiver/xreceiver/ @open-telemetry/collector-approvers scraper/ @open-telemetry/collector-approvers scraper/xscraper @open-telemetry/collector-approvers scraper/scraperhelper/ @open-telemetry/collector-approvers +scraper/scraperhelper/xscraperhelper @open-telemetry/collector-approvers service/ @open-telemetry/collector-approvers service/internal/graph/ @open-telemetry/collector-approvers diff --git a/.github/ISSUE_TEMPLATE/bug_report.yaml b/.github/ISSUE_TEMPLATE/bug_report.yaml index bba1d2ed876..1e1c0a635e3 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yaml +++ b/.github/ISSUE_TEMPLATE/bug_report.yaml @@ -25,6 +25,17 @@ body: - cmd/mdatagen/internal/sampleprocessor - cmd/mdatagen/internal/samplereceiver - cmd/mdatagen/internal/samplescraper + - config/configauth + - config/configcompression + - config/configgrpc + - config/confighttp + - config/configmiddleware + - config/confignet + - config/configopaque + - config/configoptional + - config/configretry + - config/configtelemetry + - config/configtls - confmap - confmap/provider/envprovider - confmap/provider/fileprovider @@ -61,6 +72,8 @@ body: - receiver/x - scraper - scraper/scraperhelper + - scraper/scraperhelper/xscraperhelper + - scraper/xscraper - service - service/internal/graph # End components list diff --git a/.github/ISSUE_TEMPLATE/feature_request.yaml b/.github/ISSUE_TEMPLATE/feature_request.yaml index 2b10eda576f..4e2ffc4e6a9 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yaml +++ b/.github/ISSUE_TEMPLATE/feature_request.yaml @@ -19,6 +19,17 @@ body: - cmd/mdatagen/internal/sampleprocessor - cmd/mdatagen/internal/samplereceiver - cmd/mdatagen/internal/samplescraper + - config/configauth + - config/configcompression + - config/configgrpc + - config/confighttp + - config/configmiddleware + - config/confignet + - config/configopaque + - config/configoptional + - config/configretry + - config/configtelemetry + - config/configtls - confmap - confmap/provider/envprovider - confmap/provider/fileprovider @@ -55,6 +66,8 @@ body: - receiver/x - scraper - scraper/scraperhelper + - scraper/scraperhelper/xscraperhelper + - scraper/xscraper - service - service/internal/graph # End components list diff --git a/.github/ISSUE_TEMPLATE/other.yaml b/.github/ISSUE_TEMPLATE/other.yaml index 8ec6a71b6e7..ffab0e2771e 100644 --- a/.github/ISSUE_TEMPLATE/other.yaml +++ b/.github/ISSUE_TEMPLATE/other.yaml @@ -18,6 +18,17 @@ body: - cmd/mdatagen/internal/sampleprocessor - cmd/mdatagen/internal/samplereceiver - cmd/mdatagen/internal/samplescraper + - config/configauth + - config/configcompression + - config/configgrpc + - config/confighttp + - config/configmiddleware + - config/confignet + - config/configopaque + - config/configoptional + - config/configretry + - config/configtelemetry + - config/configtls - confmap - confmap/provider/envprovider - confmap/provider/fileprovider @@ -54,6 +65,8 @@ body: - receiver/x - scraper - scraper/scraperhelper + - scraper/scraperhelper/xscraperhelper + - scraper/xscraper - service - service/internal/graph # End components list diff --git a/.github/workflows/utils/cspell.json b/.github/workflows/utils/cspell.json index 9fa40cbdbfc..21613a48e02 100644 --- a/.github/workflows/utils/cspell.json +++ b/.github/workflows/utils/cspell.json @@ -508,6 +508,7 @@ "xprocessorhelper", "xreceiver", "xscraper", + "xscraperhelper", "yamlmapprovider", "yamlprovider", "yamls", diff --git a/scraper/scraperhelper/controller.go b/scraper/scraperhelper/controller.go index 257df474ac2..c713f1f10e7 100644 --- a/scraper/scraperhelper/controller.go +++ b/scraper/scraperhelper/controller.go @@ -5,21 +5,26 @@ package scraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhe import ( "context" - "sync" "time" - "go.uber.org/multierr" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/scraper" "go.opentelemetry.io/collector/scraper/scrapererror" + "go.opentelemetry.io/collector/scraper/scraperhelper/internal/controller" ) +type ControllerConfig = controller.ControllerConfig + +// NewDefaultControllerConfig returns default scraper controller +// settings with a collection interval of one minute. +func NewDefaultControllerConfig() ControllerConfig { + return controller.NewDefaultControllerConfig() +} + // ControllerOption apply changes to internal options. type ControllerOption interface { apply(*controllerOptions) @@ -75,111 +80,6 @@ type controllerOptions struct { factoriesWithConfig []factoryWithConfig } -type controller[T component.Component] struct { - collectionInterval time.Duration - initialDelay time.Duration - timeout time.Duration - - scrapers []T - scrapeFunc func(*controller[T]) - tickerCh <-chan time.Time - - done chan struct{} - wg sync.WaitGroup - - obsrecv *receiverhelper.ObsReport -} - -func newController[T component.Component]( - cfg *ControllerConfig, - rSet receiver.Settings, - scrapers []T, - scrapeFunc func(*controller[T]), - tickerCh <-chan time.Time, -) (*controller[T], error) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: rSet.ID, - Transport: "", - ReceiverCreateSettings: rSet, - }) - if err != nil { - return nil, err - } - - cs := &controller[T]{ - collectionInterval: cfg.CollectionInterval, - initialDelay: cfg.InitialDelay, - timeout: cfg.Timeout, - scrapers: scrapers, - scrapeFunc: scrapeFunc, - done: make(chan struct{}), - tickerCh: tickerCh, - obsrecv: obsrecv, - } - - return cs, nil -} - -// Start the receiver, invoked during service start. -func (sc *controller[T]) Start(ctx context.Context, host component.Host) error { - for _, scrp := range sc.scrapers { - if err := scrp.Start(ctx, host); err != nil { - return err - } - } - - sc.startScraping() - return nil -} - -// Shutdown the receiver, invoked during service shutdown. -func (sc *controller[T]) Shutdown(ctx context.Context) error { - // Signal the goroutine to stop. - close(sc.done) - sc.wg.Wait() - var errs error - for _, scrp := range sc.scrapers { - errs = multierr.Append(errs, scrp.Shutdown(ctx)) - } - - return errs -} - -// startScraping initiates a ticker that calls Scrape based on the configured -// collection interval. -func (sc *controller[T]) startScraping() { - sc.wg.Add(1) - go func() { - defer sc.wg.Done() - if sc.initialDelay > 0 { - select { - case <-time.After(sc.initialDelay): - case <-sc.done: - return - } - } - - if sc.tickerCh == nil { - ticker := time.NewTicker(sc.collectionInterval) - defer ticker.Stop() - - sc.tickerCh = ticker.C - } - // Call scrape method during initialization to ensure - // that scrapers start from when the component starts - // instead of waiting for the full duration to start. - sc.scrapeFunc(sc) - for { - select { - case <-sc.tickerCh: - sc.scrapeFunc(sc) - case <-sc.done: - return - } - } - }() -} - // NewLogsController creates a receiver.Logs with the configured options, that can control multiple scraper.Logs. func NewLogsController(cfg *ControllerConfig, rSet receiver.Settings, @@ -189,7 +89,7 @@ func NewLogsController(cfg *ControllerConfig, co := getOptions(options) scrapers := make([]scraper.Logs, 0, len(co.factoriesWithConfig)) for _, fwc := range co.factoriesWithConfig { - set := getSettings(fwc.f.Type(), rSet) + set := controller.GetSettings(fwc.f.Type(), rSet) s, err := fwc.f.CreateLogs(context.Background(), set, fwc.cfg) if err != nil { return nil, err @@ -200,8 +100,8 @@ func NewLogsController(cfg *ControllerConfig, } scrapers = append(scrapers, s) } - return newController[scraper.Logs]( - cfg, rSet, scrapers, func(c *controller[scraper.Logs]) { scrapeLogs(c, nextConsumer) }, co.tickerCh) + return controller.NewController[scraper.Logs]( + cfg, rSet, scrapers, func(c *controller.Controller[scraper.Logs]) { scrapeLogs(c, nextConsumer) }, co.tickerCh) } // NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics. @@ -213,7 +113,7 @@ func NewMetricsController(cfg *ControllerConfig, co := getOptions(options) scrapers := make([]scraper.Metrics, 0, len(co.factoriesWithConfig)) for _, fwc := range co.factoriesWithConfig { - set := getSettings(fwc.f.Type(), rSet) + set := controller.GetSettings(fwc.f.Type(), rSet) s, err := fwc.f.CreateMetrics(context.Background(), set, fwc.cfg) if err != nil { return nil, err @@ -224,17 +124,17 @@ func NewMetricsController(cfg *ControllerConfig, } scrapers = append(scrapers, s) } - return newController[scraper.Metrics]( - cfg, rSet, scrapers, func(c *controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh) + return controller.NewController[scraper.Metrics]( + cfg, rSet, scrapers, func(c *controller.Controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh) } -func scrapeLogs(c *controller[scraper.Logs], nextConsumer consumer.Logs) { - ctx, done := withScrapeContext(c.timeout) +func scrapeLogs(c *controller.Controller[scraper.Logs], nextConsumer consumer.Logs) { + ctx, done := controller.WithScrapeContext(c.Timeout) defer done() logs := plog.NewLogs() - for i := range c.scrapers { - md, err := c.scrapers[i].ScrapeLogs(ctx) + for i := range c.Scrapers { + md, err := c.Scrapers[i].ScrapeLogs(ctx) if err != nil && !scrapererror.IsPartialScrapeError(err) { continue } @@ -242,18 +142,18 @@ func scrapeLogs(c *controller[scraper.Logs], nextConsumer consumer.Logs) { } logRecordCount := logs.LogRecordCount() - ctx = c.obsrecv.StartMetricsOp(ctx) + ctx = c.Obsrecv.StartMetricsOp(ctx) err := nextConsumer.ConsumeLogs(ctx, logs) - c.obsrecv.EndMetricsOp(ctx, "", logRecordCount, err) + c.Obsrecv.EndMetricsOp(ctx, "", logRecordCount, err) } -func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) { - ctx, done := withScrapeContext(c.timeout) +func scrapeMetrics(c *controller.Controller[scraper.Metrics], nextConsumer consumer.Metrics) { + ctx, done := controller.WithScrapeContext(c.Timeout) defer done() metrics := pmetric.NewMetrics() - for i := range c.scrapers { - md, err := c.scrapers[i].ScrapeMetrics(ctx) + for i := range c.Scrapers { + md, err := c.Scrapers[i].ScrapeMetrics(ctx) if err != nil && !scrapererror.IsPartialScrapeError(err) { continue } @@ -261,9 +161,9 @@ func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics } dataPointCount := metrics.DataPointCount() - ctx = c.obsrecv.StartMetricsOp(ctx) + ctx = c.Obsrecv.StartMetricsOp(ctx) err := nextConsumer.ConsumeMetrics(ctx, metrics) - c.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err) + c.Obsrecv.EndMetricsOp(ctx, "", dataPointCount, err) } func getOptions(options []ControllerOption) controllerOptions { @@ -273,21 +173,3 @@ func getOptions(options []ControllerOption) controllerOptions { } return co } - -func getSettings(sType component.Type, rSet receiver.Settings) scraper.Settings { - return scraper.Settings{ - ID: component.NewID(sType), - TelemetrySettings: rSet.TelemetrySettings, - BuildInfo: rSet.BuildInfo, - } -} - -// withScrapeContext will return a context that has no deadline if timeout is 0 -// which implies no explicit timeout had occurred, otherwise, a context -// with a deadline of the provided timeout is returned. -func withScrapeContext(timeout time.Duration) (context.Context, context.CancelFunc) { - if timeout == 0 { - return context.WithCancel(context.Background()) - } - return context.WithTimeout(context.Background(), timeout) -} diff --git a/scraper/scraperhelper/controller_test.go b/scraper/scraperhelper/controller_test.go index 3217d072faf..c111d9262bc 100644 --- a/scraper/scraperhelper/controller_test.go +++ b/scraper/scraperhelper/controller_test.go @@ -12,7 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -27,29 +26,11 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/collector/scraper" "go.opentelemetry.io/collector/scraper/scrapererror" + "go.opentelemetry.io/collector/scraper/scraperhelper/internal/controller" "go.opentelemetry.io/collector/scraper/scraperhelper/internal/metadatatest" + "go.opentelemetry.io/collector/scraper/scraperhelper/internal/testhelper" ) -type testInitialize struct { - ch chan bool - err error -} - -func (ts *testInitialize) start(context.Context, component.Host) error { - ts.ch <- true - return ts.err -} - -type testClose struct { - ch chan bool - err error -} - -func (ts *testClose) shutdown(context.Context) error { - ts.ch <- true - return ts.err -} - type testScrape struct { ch chan int timesScrapeCalled int @@ -167,7 +148,7 @@ func TestLogsScrapeController(t *testing.T) { if expectedStartErr != nil { assert.Equal(t, expectedStartErr, err) } else if test.initialize { - assertChannelsCalled(t, initializeChs, "start was not called") + testhelper.AssertChannelsCalled(t, initializeChs, "start was not called") } const iterations = 5 @@ -199,7 +180,7 @@ func TestLogsScrapeController(t *testing.T) { spans := tel.SpanRecorder.Ended() assertReceiverSpan(t, spans) - assertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeLogs") + testhelper.AssertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeLogs") assertLogsScraperObsMetrics(t, tel, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink) } @@ -208,7 +189,7 @@ func TestLogsScrapeController(t *testing.T) { if expectedShutdownErr != nil { assert.EqualError(t, err, expectedShutdownErr.Error()) } else if test.close { - assertChannelsCalled(t, closeChs, "shutdown was not called") + testhelper.AssertChannelsCalled(t, closeChs, "shutdown was not called") } }) } @@ -278,7 +259,7 @@ func TestMetricsScrapeController(t *testing.T) { if expectedStartErr != nil { assert.Equal(t, expectedStartErr, err) } else if test.initialize { - assertChannelsCalled(t, initializeChs, "start was not called") + testhelper.AssertChannelsCalled(t, initializeChs, "start was not called") } const iterations = 5 @@ -310,7 +291,7 @@ func TestMetricsScrapeController(t *testing.T) { spans := tel.SpanRecorder.Ended() assertReceiverSpan(t, spans) - assertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeMetrics") + testhelper.AssertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeMetrics") assertMetricsScraperObsMetrics(t, tel, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink) } @@ -319,7 +300,7 @@ func TestMetricsScrapeController(t *testing.T) { if expectedShutdownErr != nil { assert.EqualError(t, err, expectedShutdownErr.Error()) } else if test.close { - assertChannelsCalled(t, closeChs, "shutdown was not called") + testhelper.AssertChannelsCalled(t, closeChs, "shutdown was not called") } }) } @@ -332,13 +313,13 @@ func configureLogOptions(t *testing.T, test scraperTestCase, initializeChs []cha var scraperOptions []scraper.Option if test.initialize { initializeChs[i] = make(chan bool, 1) - ti := &testInitialize{ch: initializeChs[i], err: test.initializeErr} - scraperOptions = append(scraperOptions, scraper.WithStart(ti.start)) + ti := testhelper.NewTestInitialize(initializeChs[i], test.initializeErr) + scraperOptions = append(scraperOptions, scraper.WithStart(ti.Start)) } if test.close { closeChs[i] = make(chan bool, 1) - tc := &testClose{ch: closeChs[i], err: test.closeErr} - scraperOptions = append(scraperOptions, scraper.WithShutdown(tc.shutdown)) + tc := testhelper.NewTestClose(closeChs[i], test.closeErr) + scraperOptions = append(scraperOptions, scraper.WithShutdown(tc.Shutdown)) } scrapeLogsChs[i] = make(chan int) @@ -359,13 +340,13 @@ func configureMetricOptions(t *testing.T, test scraperTestCase, initializeChs [] var scraperOptions []scraper.Option if test.initialize { initializeChs[i] = make(chan bool, 1) - ti := &testInitialize{ch: initializeChs[i], err: test.initializeErr} - scraperOptions = append(scraperOptions, scraper.WithStart(ti.start)) + ti := testhelper.NewTestInitialize(initializeChs[i], test.initializeErr) + scraperOptions = append(scraperOptions, scraper.WithStart(ti.Start)) } if test.close { closeChs[i] = make(chan bool, 1) - tc := &testClose{ch: closeChs[i], err: test.closeErr} - scraperOptions = append(scraperOptions, scraper.WithShutdown(tc.shutdown)) + tc := testhelper.NewTestClose(closeChs[i], test.closeErr) + scraperOptions = append(scraperOptions, scraper.WithShutdown(tc.Shutdown)) } scrapeMetricsChs[i] = make(chan int) @@ -395,20 +376,6 @@ func getExpectedShutdownErr(test scraperTestCase) error { return errs } -func assertChannelsCalled(t *testing.T, chs []chan bool, message string) { - for _, ic := range chs { - assertChannelCalled(t, ic, message) - } -} - -func assertChannelCalled(t *testing.T, ch chan bool, message string) { - select { - case <-ch: - default: - assert.Fail(t, message) - } -} - func assertReceiverSpan(t *testing.T, spans []sdktrace.ReadOnlySpan) { receiverSpan := false for _, span := range spans { @@ -420,26 +387,6 @@ func assertReceiverSpan(t *testing.T, spans []sdktrace.ReadOnlySpan) { assert.True(t, receiverSpan) } -func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnlySpan, expectedSpanName string) { - expectedStatusCode := codes.Unset - expectedStatusMessage := "" - if expectedErr != nil { - expectedStatusCode = codes.Error - expectedStatusMessage = expectedErr.Error() - } - - scraperSpan := false - for _, span := range spans { - if span.Name() == expectedSpanName { - scraperSpan = true - assert.Equal(t, expectedStatusCode, span.Status().Code) - assert.Equal(t, expectedStatusMessage, span.Status().Description) - break - } - } - assert.True(t, scraperSpan) -} - func assertLogsScraperObsMetrics(t *testing.T, tel *componenttest.Telemetry, receiver, scraper component.ID, expectedErr error, sink *consumertest.LogsSink) { logRecordCounts := 0 for _, md := range sink.AllLogs() { @@ -810,3 +757,9 @@ func addLogsScraper(t component.Type, sc scraper.Logs) ControllerOption { }, component.StabilityLevelAlpha)) return AddFactoryWithConfig(f, nil) } + +func TestNewDefaultControllerConfig(t *testing.T) { + controllerConfig := NewDefaultControllerConfig() + intControllerConfig := controller.NewDefaultControllerConfig() + require.Equal(t, intControllerConfig, controllerConfig) +} diff --git a/scraper/scraperhelper/config.go b/scraper/scraperhelper/internal/controller/config.go similarity index 94% rename from scraper/scraperhelper/config.go rename to scraper/scraperhelper/internal/controller/config.go index ebcb68644e1..dffffac21a2 100644 --- a/scraper/scraperhelper/config.go +++ b/scraper/scraperhelper/internal/controller/config.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package scraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper" +package controller // import "go.opentelemetry.io/collector/scraper/scraperhelper/internal/controller" import ( "errors" diff --git a/scraper/scraperhelper/config_test.go b/scraper/scraperhelper/internal/controller/config_test.go similarity index 90% rename from scraper/scraperhelper/config_test.go rename to scraper/scraperhelper/internal/controller/config_test.go index c82b95ddfda..52a61866fc4 100644 --- a/scraper/scraperhelper/config_test.go +++ b/scraper/scraperhelper/internal/controller/config_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package scraperhelper +package controller // import "go.opentelemetry.io/collector/scraper/scraperhelper/internal/controller" import ( "testing" diff --git a/scraper/scraperhelper/internal/controller/controller.go b/scraper/scraperhelper/internal/controller/controller.go new file mode 100644 index 00000000000..3d00113c969 --- /dev/null +++ b/scraper/scraperhelper/internal/controller/controller.go @@ -0,0 +1,142 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// package controller provides functionality used in scraperhelper and xscraperhelper. + +package controller // import "go.opentelemetry.io/collector/scraper/scraperhelper/internal/controller" + +import ( + "context" + "sync" + "time" + + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/scraper" +) + +type Controller[T component.Component] struct { + collectionInterval time.Duration + initialDelay time.Duration + Timeout time.Duration + + Scrapers []T + scrapeFunc func(*Controller[T]) + tickerCh <-chan time.Time + + done chan struct{} + wg sync.WaitGroup + + Obsrecv *receiverhelper.ObsReport +} + +func NewController[T component.Component]( + cfg *ControllerConfig, + rSet receiver.Settings, + scrapers []T, + scrapeFunc func(*Controller[T]), + tickerCh <-chan time.Time, +) (*Controller[T], error) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: rSet.ID, + Transport: "", + ReceiverCreateSettings: rSet, + }) + if err != nil { + return nil, err + } + + cs := &Controller[T]{ + collectionInterval: cfg.CollectionInterval, + initialDelay: cfg.InitialDelay, + Timeout: cfg.Timeout, + Scrapers: scrapers, + scrapeFunc: scrapeFunc, + done: make(chan struct{}), + tickerCh: tickerCh, + Obsrecv: obsrecv, + } + + return cs, nil +} + +// Start the receiver, invoked during service start. +func (sc *Controller[T]) Start(ctx context.Context, host component.Host) error { + for _, scrp := range sc.Scrapers { + if err := scrp.Start(ctx, host); err != nil { + return err + } + } + + sc.startScraping() + return nil +} + +// Shutdown the receiver, invoked during service shutdown. +func (sc *Controller[T]) Shutdown(ctx context.Context) error { + // Signal the goroutine to stop. + close(sc.done) + sc.wg.Wait() + var errs error + for _, scrp := range sc.Scrapers { + errs = multierr.Append(errs, scrp.Shutdown(ctx)) + } + + return errs +} + +// startScraping initiates a ticker that calls Scrape based on the configured +// collection interval. +func (sc *Controller[T]) startScraping() { + sc.wg.Add(1) + go func() { + defer sc.wg.Done() + if sc.initialDelay > 0 { + select { + case <-time.After(sc.initialDelay): + case <-sc.done: + return + } + } + + if sc.tickerCh == nil { + ticker := time.NewTicker(sc.collectionInterval) + defer ticker.Stop() + + sc.tickerCh = ticker.C + } + // Call scrape method during initialization to ensure + // that scrapers start from when the component starts + // instead of waiting for the full duration to start. + sc.scrapeFunc(sc) + for { + select { + case <-sc.tickerCh: + sc.scrapeFunc(sc) + case <-sc.done: + return + } + } + }() +} + +func GetSettings(sType component.Type, rSet receiver.Settings) scraper.Settings { + return scraper.Settings{ + ID: component.NewID(sType), + TelemetrySettings: rSet.TelemetrySettings, + BuildInfo: rSet.BuildInfo, + } +} + +// WithScrapeContext will return a context that has no deadline if timeout is 0 +// which implies no explicit timeout had occurred, otherwise, a context +// with a deadline of the provided timeout is returned. +func WithScrapeContext(timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout == 0 { + return context.WithCancel(context.Background()) + } + return context.WithTimeout(context.Background(), timeout) +} diff --git a/scraper/scraperhelper/internal/testhelper/helper.go b/scraper/scraperhelper/internal/testhelper/helper.go new file mode 100644 index 00000000000..ec2dfe4cca9 --- /dev/null +++ b/scraper/scraperhelper/internal/testhelper/helper.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// package testhelper provides functionality used in tests in scraperhelper and xscraperhelper. + +package testhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper/internal/testhelper" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + + "go.opentelemetry.io/collector/component" +) + +type TestInitialize struct { + ch chan bool + err error +} + +func NewTestInitialize(ch chan bool, err error) *TestInitialize { + return &TestInitialize{ + ch: ch, + err: err, + } +} + +func (ts *TestInitialize) Start(context.Context, component.Host) error { + ts.ch <- true + return ts.err +} + +type TestClose struct { + ch chan bool + err error +} + +func NewTestClose(ch chan bool, err error) *TestClose { + return &TestClose{ + ch: ch, + err: err, + } +} + +func (ts *TestClose) Shutdown(context.Context) error { + ts.ch <- true + return ts.err +} + +func AssertChannelCalled(t *testing.T, ch chan bool, message string) { + select { + case <-ch: + default: + assert.Fail(t, message) + } +} + +func AssertChannelsCalled(t *testing.T, chs []chan bool, message string) { + for _, ic := range chs { + AssertChannelCalled(t, ic, message) + } +} + +func AssertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnlySpan, expectedSpanName string) { + expectedStatusCode := codes.Unset + expectedStatusMessage := "" + if expectedErr != nil { + expectedStatusCode = codes.Error + expectedStatusMessage = expectedErr.Error() + } + + scraperSpan := false + for _, span := range spans { + if span.Name() == expectedSpanName { + scraperSpan = true + assert.Equal(t, expectedStatusCode, span.Status().Code) + assert.Equal(t, expectedStatusMessage, span.Status().Description) + break + } + } + assert.True(t, scraperSpan) +} diff --git a/scraper/scraperhelper/xscraperhelper/Makefile b/scraper/scraperhelper/xscraperhelper/Makefile new file mode 100644 index 00000000000..bdd863a203b --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/Makefile @@ -0,0 +1 @@ +include ../../../Makefile.Common diff --git a/scraper/scraperhelper/xscraperhelper/controller.go b/scraper/scraperhelper/xscraperhelper/controller.go new file mode 100644 index 00000000000..257662ad0f2 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/controller.go @@ -0,0 +1,122 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package xscraperhelper provides utilities for scrapers. +package xscraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper" + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/xreceiver" + "go.opentelemetry.io/collector/scraper/scrapererror" + "go.opentelemetry.io/collector/scraper/scraperhelper" + "go.opentelemetry.io/collector/scraper/scraperhelper/internal/controller" + "go.opentelemetry.io/collector/scraper/xscraper" +) + +const ( + // scraperKey used to identify scrapers in metrics and traces. + scraperKey = "scraper" + spanNameSep = "/" + // receiverKey used to identify receivers in metrics and traces. + receiverKey = "receiver" + // FormatKey used to identify the format of the data received. + formatKey = "format" +) + +type factoryWithConfig struct { + f xscraper.Factory + cfg component.Config +} + +type controllerOptions struct { + tickerCh <-chan time.Time + factoriesWithConfig []factoryWithConfig +} + +// ControllerOption apply changes to internal options. +type ControllerOption interface { + apply(*controllerOptions) +} + +type optionFunc func(*controllerOptions) + +func (of optionFunc) apply(e *controllerOptions) { + of(e) +} + +// AddFactoryWithConfig configures the scraper.Factory and associated config that +// will be used to create a new scraper. The created scraper will be called with +// the specified options, and at the specified collection interval. +// +// Observability information will be reported, and the scraped metrics +// will be passed to the next consumer. +func AddFactoryWithConfig(f xscraper.Factory, cfg component.Config) ControllerOption { + return optionFunc(func(o *controllerOptions) { + o.factoriesWithConfig = append(o.factoriesWithConfig, factoryWithConfig{f: f, cfg: cfg}) + }) +} + +// WithTickerChannel allows you to override the scraper controller's ticker +// channel to specify when scrape is called. This is only expected to be +// used by tests. +func WithTickerChannel(tickerCh <-chan time.Time) ControllerOption { + return optionFunc(func(o *controllerOptions) { + o.tickerCh = tickerCh + }) +} + +// NewProfilesController creates a receiver.Profiles with the configured options, that can control multiple xscraper.Profiles. +func NewProfilesController(cfg *scraperhelper.ControllerConfig, + rSet receiver.Settings, + nextConsumer xconsumer.Profiles, + options ...ControllerOption, +) (xreceiver.Profiles, error) { + co := getOptions(options) + scrapers := make([]xscraper.Profiles, 0, len(co.factoriesWithConfig)) + for _, fwc := range co.factoriesWithConfig { + set := controller.GetSettings(fwc.f.Type(), rSet) + s, err := fwc.f.CreateProfiles(context.Background(), set, fwc.cfg) + if err != nil { + return nil, err + } + s, err = wrapObsProfiles(s, rSet.ID, set.ID, set.TelemetrySettings) + if err != nil { + return nil, err + } + scrapers = append(scrapers, s) + } + return controller.NewController[xscraper.Profiles]( + cfg, rSet, scrapers, func(c *controller.Controller[xscraper.Profiles]) { scrapeProfiles(c, nextConsumer) }, co.tickerCh) +} + +func getOptions(options []ControllerOption) controllerOptions { + co := controllerOptions{} + for _, op := range options { + op.apply(&co) + } + return co +} + +func scrapeProfiles(c *controller.Controller[xscraper.Profiles], nextConsumer xconsumer.Profiles) { + ctx, done := controller.WithScrapeContext(c.Timeout) + defer done() + + profiles := pprofile.NewProfiles() + for i := range c.Scrapers { + md, err := c.Scrapers[i].ScrapeProfiles(ctx) + if err != nil && !scrapererror.IsPartialScrapeError(err) { + continue + } + md.ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) + } + + // TODO: Add proper receiver observability for profiles when receiverhelper supports it + // For now, we skip the obs report and just consume the profiles directly + _ = nextConsumer.ConsumeProfiles(ctx, profiles) +} diff --git a/scraper/scraperhelper/xscraperhelper/controller_test.go b/scraper/scraperhelper/xscraperhelper/controller_test.go new file mode 100644 index 00000000000..fbdd41a6b03 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/controller_test.go @@ -0,0 +1,492 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package xscraperhelper provides utilities for scrapers. +package xscraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper" + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/scraper" + "go.opentelemetry.io/collector/scraper/scrapererror" + "go.opentelemetry.io/collector/scraper/scraperhelper" + "go.opentelemetry.io/collector/scraper/scraperhelper/internal/testhelper" + "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper/internal/metadatatest" + "go.opentelemetry.io/collector/scraper/xscraper" +) + +type testScrape struct { + ch chan int + timesScrapeCalled int + err error +} + +func (ts *testScrape) scrapeProfiles(context.Context) (pprofile.Profiles, error) { + ts.timesScrapeCalled++ + ts.ch <- ts.timesScrapeCalled + + if ts.err != nil { + return pprofile.Profiles{}, ts.err + } + + md := pprofile.NewProfiles() + profile := md.ResourceProfiles().AppendEmpty().ScopeProfiles().AppendEmpty().Profiles().AppendEmpty() + profile.Samples().AppendEmpty() + return md, nil +} + +func newTestNoDelaySettings() *scraperhelper.ControllerConfig { + return &scraperhelper.ControllerConfig{ + CollectionInterval: time.Second, + InitialDelay: 0, + } +} + +type scraperTestCase struct { + name string + + scrapers int + scraperControllerSettings *scraperhelper.ControllerConfig + scrapeErr error + expectScraped bool + + initialize bool + close bool + initializeErr error + closeErr error +} + +func TestProfilesScrapeController(t *testing.T) { + testCases := []scraperTestCase{ + { + name: "NoScrapers", + }, + { + name: "AddProfilesScrapersWithCollectionInterval", + scrapers: 2, + expectScraped: true, + }, + { + name: "AddProfilesScrapers_ScrapeError", + scrapers: 2, + scrapeErr: errors.New("err1"), + }, + { + name: "AddProfilesScrapersWithInitializeAndClose", + scrapers: 2, + initialize: true, + expectScraped: true, + close: true, + }, + { + name: "AddProfilesScrapersWithInitializeAndCloseErrors", + scrapers: 2, + initialize: true, + close: true, + initializeErr: errors.New("err1"), + closeErr: errors.New("err2"), + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + receiverID := component.MustNewID("receiver") + tel := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) + + set := tel.NewTelemetrySettings() + _, parentSpan := set.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + initializeChs := make([]chan bool, test.scrapers) + scrapeProfilesChs := make([]chan int, test.scrapers) + closeChs := make([]chan bool, test.scrapers) + options := configureProfilesOptions(t, test, initializeChs, scrapeProfilesChs, closeChs) + + tickerCh := make(chan time.Time) + options = append(options, WithTickerChannel(tickerCh)) + + sink := new(consumertest.ProfilesSink) + cfg := newTestNoDelaySettings() + if test.scraperControllerSettings != nil { + cfg = test.scraperControllerSettings + } + + mr, err := NewProfilesController(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, sink, options...) + require.NoError(t, err) + + err = mr.Start(context.Background(), componenttest.NewNopHost()) + expectedStartErr := getExpectedStartErr(test) + if expectedStartErr != nil { + assert.Equal(t, expectedStartErr, err) + } else if test.initialize { + testhelper.AssertChannelsCalled(t, initializeChs, "start was not called") + } + + const iterations = 5 + + if test.expectScraped || test.scrapeErr != nil { + // validate that scrape is called at least N times for each configured scraper + for _, ch := range scrapeProfilesChs { + <-ch + } + // Consume the initial scrapes on start + for range iterations { + tickerCh <- time.Now() + + for _, ch := range scrapeProfilesChs { + <-ch + } + } + + // wait until all calls to scrape have completed + if test.scrapeErr == nil { + require.Eventually(t, func() bool { + return sink.SampleCount() == (1+iterations)*(test.scrapers) + }, time.Second, time.Millisecond) + } + + if test.expectScraped { + assert.GreaterOrEqual(t, sink.SampleCount(), iterations) + } + + spans := tel.SpanRecorder.Ended() + testhelper.AssertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeProfiles") + assertProfilesScraperObsMetrics(t, tel, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink) + } + + err = mr.Shutdown(context.Background()) + expectedShutdownErr := getExpectedShutdownErr(test) + if expectedShutdownErr != nil { + assert.EqualError(t, err, expectedShutdownErr.Error()) + } else if test.close { + testhelper.AssertChannelsCalled(t, closeChs, "shutdown was not called") + } + }) + } +} + +func getExpectedStartErr(test scraperTestCase) error { + return test.initializeErr +} + +func getExpectedShutdownErr(test scraperTestCase) error { + var errs []error + + if test.closeErr != nil { + for i := 0; i < test.scrapers; i++ { + errs = append(errs, test.closeErr) + } + } + + return multierr.Combine(errs...) +} + +func configureProfilesOptions(t *testing.T, test scraperTestCase, initializeChs []chan bool, scrapeProfilesChs []chan int, closeChs []chan bool) []ControllerOption { + var profilesOptions []ControllerOption + + for i := 0; i < test.scrapers; i++ { + scrapeProfilesChs[i] = make(chan int) + ts := &testScrape{ch: scrapeProfilesChs[i], err: test.scrapeErr} + + var xscraperOptions []xscraper.Option + if test.initialize { + initializeChs[i] = make(chan bool, 1) + ti := testhelper.NewTestInitialize(initializeChs[i], test.initializeErr) + xscraperOptions = append(xscraperOptions, xscraper.WithStart(ti.Start)) + } + if test.close { + closeChs[i] = make(chan bool, 1) + tc := testhelper.NewTestClose(closeChs[i], test.closeErr) + xscraperOptions = append(xscraperOptions, xscraper.WithShutdown(tc.Shutdown)) + } + + scp, err := xscraper.NewProfiles(ts.scrapeProfiles, xscraperOptions...) + require.NoError(t, err) + + profilesOptions = append(profilesOptions, addProfilesScraper(component.MustNewType("scraper"), scp)) + } + + return profilesOptions +} + +func TestSingleProfilesScraperPerInterval(t *testing.T) { + scrapeCh := make(chan int, 10) + ts := &testScrape{ch: scrapeCh} + + cfg := newTestNoDelaySettings() + + tickerCh := make(chan time.Time) + + scp, err := xscraper.NewProfiles(ts.scrapeProfiles) + require.NoError(t, err) + + recv, err := NewProfilesController( + cfg, + receivertest.NewNopSettings(receivertest.NopType), + new(consumertest.ProfilesSink), + addProfilesScraper(component.MustNewType("scraper"), scp), + WithTickerChannel(tickerCh), + ) + require.NoError(t, err) + + require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost())) + defer func() { require.NoError(t, recv.Shutdown(context.Background())) }() + + tickerCh <- time.Now() + + assert.Eventually( + t, + func() bool { + return <-scrapeCh == 2 + }, + 300*time.Millisecond, + 100*time.Millisecond, + "Make sure the scraper channel is called twice", + ) + + select { + case <-scrapeCh: + assert.Fail(t, "Scrape was called more than twice") + case <-time.After(100 * time.Millisecond): + return + } +} + +func TestProfilesScraperControllerStartsOnInit(t *testing.T) { + t.Parallel() + + ts := &testScrape{ + ch: make(chan int, 1), + } + + scp, err := xscraper.NewProfiles(ts.scrapeProfiles) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewProfilesController( + &scraperhelper.ControllerConfig{ + CollectionInterval: time.Hour, + InitialDelay: 0, + }, + receivertest.NewNopSettings(receivertest.NopType), + new(consumertest.ProfilesSink), + addProfilesScraper(component.MustNewType("scraper"), scp), + ) + require.NoError(t, err, "Must not error when creating scrape controller") + + assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error on start") + <-time.After(500 * time.Nanosecond) + require.NoError(t, r.Shutdown(context.Background()), "Must not have errored on shutdown") + assert.Equal(t, 1, ts.timesScrapeCalled, "Must have been called as soon as the controller started") +} + +func TestProfilesScraperControllerInitialDelay(t *testing.T) { + if testing.Short() { + t.Skip("This requires real time to pass, skipping") + return + } + + t.Parallel() + + var ( + elapsed = make(chan time.Time, 1) + cfg = scraperhelper.ControllerConfig{ + CollectionInterval: time.Second, + InitialDelay: 300 * time.Millisecond, + } + ) + + scp, err := xscraper.NewProfiles(func(context.Context) (pprofile.Profiles, error) { + elapsed <- time.Now() + return pprofile.NewProfiles(), nil + }) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewProfilesController( + &cfg, + receivertest.NewNopSettings(receivertest.NopType), + new(consumertest.ProfilesSink), + addProfilesScraper(component.MustNewType("scraper"), scp), + ) + require.NoError(t, err, "Must not error when creating receiver") + + t0 := time.Now() + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error when starting") + t1 := <-elapsed + + assert.GreaterOrEqual(t, t1.Sub(t0), 300*time.Millisecond, "Must have had 300ms pass as defined by initial delay") + + assert.NoError(t, r.Shutdown(context.Background()), "Must not error closing down") +} + +func TestProfilesScraperShutdownBeforeScrapeCanStart(t *testing.T) { + cfg := scraperhelper.ControllerConfig{ + CollectionInterval: time.Second, + InitialDelay: 5 * time.Second, + } + + scp, err := xscraper.NewProfiles(func(context.Context) (pprofile.Profiles, error) { + // make the scraper wait for long enough it would disrupt a shutdown. + time.Sleep(30 * time.Second) + return pprofile.NewProfiles(), nil + }) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewProfilesController( + &cfg, + receivertest.NewNopSettings(receivertest.NopType), + new(consumertest.ProfilesSink), + addProfilesScraper(component.MustNewType("scraper"), scp), + ) + require.NoError(t, err, "Must not error when creating receiver") + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + shutdown := make(chan struct{}, 1) + go func() { + assert.NoError(t, r.Shutdown(context.Background())) + close(shutdown) + }() + timer := time.NewTicker(10 * time.Second) + select { + case <-timer.C: + require.Fail(t, "shutdown should not wait for scraping") + case <-shutdown: + } +} + +func assertProfilesScraperObsMetrics(t *testing.T, tel *componenttest.Telemetry, receiver, scraper component.ID, expectedErr error, sink *consumertest.ProfilesSink) { + sampleCounts := 0 + for _, md := range sink.AllProfiles() { + sampleCounts += md.SampleCount() + } + + expectedScraped := int64(sink.SampleCount()) + expectedErrored := int64(0) + if expectedErr != nil { + var partialError scrapererror.PartialScrapeError + if errors.As(expectedErr, &partialError) { + expectedErrored = int64(partialError.Failed) + } else { + expectedScraped = int64(0) + expectedErrored = int64(sink.SampleCount()) + } + } + + metadatatest.AssertEqualScraperScrapedProfileRecords(t, tel, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(receiverKey, receiver.String()), + attribute.String(scraperKey, scraper.String())), + Value: expectedScraped, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + metadatatest.AssertEqualScraperErroredProfileRecords(t, tel, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(receiverKey, receiver.String()), + attribute.String(scraperKey, scraper.String())), + Value: expectedErrored, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) +} + +func addProfilesScraper(t component.Type, sc xscraper.Profiles) ControllerOption { + f := xscraper.NewFactory(t, nil, + xscraper.WithProfiles(func(context.Context, scraper.Settings, component.Config) (xscraper.Profiles, error) { + return sc, nil + }, component.StabilityLevelDevelopment)) + return AddFactoryWithConfig(f, nil) +} + +// TestNewProfilesControllerCreateError tests that NewProfilesController returns an error +// when the scraper factory's CreateProfiles method fails. +func TestNewProfilesControllerCreateError(t *testing.T) { + expectedErr := errors.New("create profiles error") + f := xscraper.NewFactory(component.MustNewType("scraper"), nil, + xscraper.WithProfiles(func(context.Context, scraper.Settings, component.Config) (xscraper.Profiles, error) { + return nil, expectedErr + }, component.StabilityLevelDevelopment)) + + cfg := newTestNoDelaySettings() + _, err := NewProfilesController( + cfg, + receivertest.NewNopSettings(receivertest.NopType), + new(consumertest.ProfilesSink), + AddFactoryWithConfig(f, nil), + ) + + require.Error(t, err) + assert.Equal(t, expectedErr, err) +} + +// errorMeter is a meter that returns errors when creating instruments. +type errorMeter struct { + metric.Meter +} + +func (errorMeter) Int64Counter(string, ...metric.Int64CounterOption) (metric.Int64Counter, error) { + return nil, errors.New("counter creation error") +} + +// errorMeterProvider provides errorMeter instances. +type errorMeterProvider struct { + metric.MeterProvider +} + +func (errorMeterProvider) Meter(string, ...metric.MeterOption) metric.Meter { + return errorMeter{} +} + +// TestNewProfilesControllerTelemetryError tests that NewProfilesController returns an error +// when telemetry builder creation fails. +func TestNewProfilesControllerTelemetryError(t *testing.T) { + // Create a scraper that works + scp, err := xscraper.NewProfiles(func(context.Context) (pprofile.Profiles, error) { + return pprofile.NewProfiles(), nil + }) + require.NoError(t, err) + + f := xscraper.NewFactory(component.MustNewType("scraper"), nil, + xscraper.WithProfiles(func(context.Context, scraper.Settings, component.Config) (xscraper.Profiles, error) { + return scp, nil + }, component.StabilityLevelDevelopment)) + + // Create telemetry settings with a meter provider that fails + set := componenttest.NewNopTelemetrySettings() + set.MeterProvider = errorMeterProvider{} + + cfg := newTestNoDelaySettings() + _, err = NewProfilesController( + cfg, + receiver.Settings{ + ID: component.MustNewID("receiver"), + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + }, + new(consumertest.ProfilesSink), + AddFactoryWithConfig(f, nil), + ) + + // The error should be from wrapObsProfiles failing due to telemetry builder creation + require.Error(t, err) + assert.Contains(t, err.Error(), "counter creation error") +} diff --git a/scraper/scraperhelper/xscraperhelper/doc.go b/scraper/scraperhelper/xscraperhelper/doc.go new file mode 100644 index 00000000000..5f903305051 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/doc.go @@ -0,0 +1,7 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:generate mdatagen metadata.yaml + +// Package xscraperhelper provides utilities for scrapers. +package xscraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper" diff --git a/scraper/scraperhelper/xscraperhelper/documentation.md b/scraper/scraperhelper/xscraperhelper/documentation.md new file mode 100644 index 00000000000..994b48d5769 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/documentation.md @@ -0,0 +1,23 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# xscraperhelper + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol_scraper_errored_profile_records + +Number of profile records that were unable to be scraped. [Alpha] + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {datapoints} | Sum | Int | true | Alpha | + +### otelcol_scraper_scraped_profile_records + +Number of profile records successfully scraped. [Alpha] + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {datapoints} | Sum | Int | true | Alpha | diff --git a/scraper/scraperhelper/xscraperhelper/generated_package_test.go b/scraper/scraperhelper/xscraperhelper/generated_package_test.go new file mode 100644 index 00000000000..18a0e9ad5d7 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/generated_package_test.go @@ -0,0 +1,13 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package xscraperhelper + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/scraper/scraperhelper/xscraperhelper/go.mod b/scraper/scraperhelper/xscraperhelper/go.mod new file mode 100644 index 00000000000..33d326b9277 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/go.mod @@ -0,0 +1,96 @@ +module go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper + +go 1.24.0 + +require ( + github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/collector/component v1.49.0 + go.opentelemetry.io/collector/component/componenttest v0.143.0 + go.opentelemetry.io/collector/consumer/consumertest v0.143.0 + go.opentelemetry.io/collector/consumer/xconsumer v0.143.0 + go.opentelemetry.io/collector/pdata/pprofile v0.143.0 + go.opentelemetry.io/collector/pdata/testdata v0.143.0 + go.opentelemetry.io/collector/pipeline/xpipeline v0.141.0 + go.opentelemetry.io/collector/receiver v1.49.0 + go.opentelemetry.io/collector/receiver/receivertest v0.143.0 + go.opentelemetry.io/collector/receiver/xreceiver v0.143.0 + go.opentelemetry.io/collector/scraper v0.143.0 + go.opentelemetry.io/collector/scraper/scraperhelper v0.141.0 + go.opentelemetry.io/collector/scraper/xscraper v0.0.0-20251128160438-7012862e3615 + go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel/metric v1.39.0 + go.opentelemetry.io/otel/sdk/metric v1.39.0 + go.opentelemetry.io/otel/trace v1.39.0 + go.uber.org/goleak v1.3.0 + go.uber.org/multierr v1.11.0 + go.uber.org/zap v1.27.1 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.8.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/collector/consumer v1.49.0 // indirect + go.opentelemetry.io/collector/consumer/consumererror v0.143.0 // indirect + go.opentelemetry.io/collector/featuregate v1.49.0 // indirect + go.opentelemetry.io/collector/internal/componentalias v0.0.0-00010101000000-000000000000 // indirect + go.opentelemetry.io/collector/pdata v1.49.0 // indirect + go.opentelemetry.io/collector/pipeline v1.49.0 // indirect + go.opentelemetry.io/collector/receiver/receiverhelper v0.143.0 // indirect + go.opentelemetry.io/otel/sdk v1.39.0 // indirect + golang.org/x/sys v0.39.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect + google.golang.org/grpc v1.78.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/scraper/scraperhelper => .. + +replace go.opentelemetry.io/collector/internal/testutil => ../../../internal/testutil + +replace go.opentelemetry.io/collector/featuregate => ../../../featuregate + +replace go.opentelemetry.io/collector/receiver/receiverhelper => ../../../receiver/receiverhelper + +replace go.opentelemetry.io/collector/pdata => ../../../pdata + +replace go.opentelemetry.io/collector/scraper/xscraper => ../../xscraper + +replace go.opentelemetry.io/collector/scraper => ../.. + +replace go.opentelemetry.io/collector/receiver => ../../../receiver + +replace go.opentelemetry.io/collector/pdata/pprofile => ../../../pdata/pprofile + +replace go.opentelemetry.io/collector/pipeline => ../../../pipeline + +replace go.opentelemetry.io/collector/pdata/testdata => ../../../pdata/testdata + +replace go.opentelemetry.io/collector/receiver/xreceiver => ../../../receiver/xreceiver + +replace go.opentelemetry.io/collector/consumer/xconsumer => ../../../consumer/xconsumer + +replace go.opentelemetry.io/collector/consumer/consumertest => ../../../consumer/consumertest + +replace go.opentelemetry.io/collector/component/componenttest => ../../../component/componenttest + +replace go.opentelemetry.io/collector/consumer/consumererror => ../../../consumer/consumererror + +replace go.opentelemetry.io/collector/consumer => ../../../consumer + +replace go.opentelemetry.io/collector/component => ../../../component + +replace go.opentelemetry.io/collector/receiver/receivertest => ../../../receiver/receivertest + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../../pipeline/xpipeline + +replace go.opentelemetry.io/collector/internal/componentalias => ../../../internal/componentalias diff --git a/scraper/scraperhelper/xscraperhelper/go.sum b/scraper/scraperhelper/xscraperhelper/go.sum new file mode 100644 index 00000000000..f88d8604788 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/go.sum @@ -0,0 +1,80 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= +github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= +go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= +go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= +go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= +go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.opentelemetry.io/proto/slim/otlp v1.9.0 h1:fPVMv8tP3TrsqlkH1HWYUpbCY9cAIemx184VGkS6vlE= +go.opentelemetry.io/proto/slim/otlp v1.9.0/go.mod h1:xXdeJJ90Gqyll+orzUkY4bOd2HECo5JofeoLpymVqdI= +go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.2.0 h1:o13nadWDNkH/quoDomDUClnQBpdQQ2Qqv0lQBjIXjE8= +go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.2.0/go.mod h1:Gyb6Xe7FTi/6xBHwMmngGoHqL0w29Y4eW8TGFzpefGA= +go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.2.0 h1:EiUYvtwu6PMrMHVjcPfnsG3v+ajPkbUeH+IL93+QYyk= +go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.2.0/go.mod h1:mUUHKFiN2SST3AhJ8XhJxEoeVW12oqfXog0Bo8W3Ec4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b h1:Mv8VFug0MP9e5vUxfBcE3vUkV6CImK3cMNMIDFjmzxU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scraper/scraperhelper/xscraperhelper/internal/metadata/generated_telemetry.go b/scraper/scraperhelper/xscraperhelper/internal/metadata/generated_telemetry.go new file mode 100644 index 00000000000..71ec32a78ac --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/internal/metadata/generated_telemetry.go @@ -0,0 +1,75 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + "sync" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + ScraperErroredProfileRecords metric.Int64Counter + ScraperScrapedProfileRecords metric.Int64Counter +} + +// TelemetryBuilderOption applies changes to default builder. +type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// Shutdown unregister all registered callbacks for async instruments. +func (builder *TelemetryBuilder) Shutdown() { + builder.mu.Lock() + defer builder.mu.Unlock() + for _, reg := range builder.registrations { + reg.Unregister() + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{} + for _, op := range options { + op.apply(&builder) + } + builder.meter = Meter(settings) + var err, errs error + builder.ScraperErroredProfileRecords, err = builder.meter.Int64Counter( + "otelcol_scraper_errored_profile_records", + metric.WithDescription("Number of profile records that were unable to be scraped. [Alpha]"), + metric.WithUnit("{datapoints}"), + ) + errs = errors.Join(errs, err) + builder.ScraperScrapedProfileRecords, err = builder.meter.Int64Counter( + "otelcol_scraper_scraped_profile_records", + metric.WithDescription("Number of profile records successfully scraped. [Alpha]"), + metric.WithUnit("{datapoints}"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/scraper/scraperhelper/xscraperhelper/internal/metadata/generated_telemetry_test.go b/scraper/scraperhelper/xscraperhelper/internal/metadata/generated_telemetry_test.go new file mode 100644 index 00000000000..12b4a7b91c3 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,74 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := componenttest.NewNopTelemetrySettings() + applied := false + _, err := NewTelemetryBuilder(set, telemetryBuilderOptionFunc(func(b *TelemetryBuilder) { + applied = true + })) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/scraper/scraperhelper/xscraperhelper/internal/metadatatest/generated_telemetrytest.go b/scraper/scraperhelper/xscraperhelper/internal/metadatatest/generated_telemetrytest.go new file mode 100644 index 00000000000..55e46c5808f --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/internal/metadatatest/generated_telemetrytest.go @@ -0,0 +1,45 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component/componenttest" +) + +func AssertEqualScraperErroredProfileRecords(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_scraper_errored_profile_records", + Description: "Number of profile records that were unable to be scraped. [Alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_scraper_errored_profile_records") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualScraperScrapedProfileRecords(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_scraper_scraped_profile_records", + Description: "Number of profile records successfully scraped. [Alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_scraper_scraped_profile_records") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} diff --git a/scraper/scraperhelper/xscraperhelper/internal/metadatatest/generated_telemetrytest_test.go b/scraper/scraperhelper/xscraperhelper/internal/metadatatest/generated_telemetrytest_test.go new file mode 100644 index 00000000000..811dc2363fa --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/internal/metadatatest/generated_telemetrytest_test.go @@ -0,0 +1,32 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper/internal/metadata" +) + +func TestSetupTelemetry(t *testing.T) { + testTel := componenttest.NewTelemetry() + tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) + require.NoError(t, err) + defer tb.Shutdown() + tb.ScraperErroredProfileRecords.Add(context.Background(), 1) + tb.ScraperScrapedProfileRecords.Add(context.Background(), 1) + AssertEqualScraperErroredProfileRecords(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualScraperScrapedProfileRecords(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + + require.NoError(t, testTel.Shutdown(context.Background())) +} diff --git a/scraper/scraperhelper/xscraperhelper/metadata.yaml b/scraper/scraperhelper/xscraperhelper/metadata.yaml new file mode 100644 index 00000000000..fd53acbbf49 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/metadata.yaml @@ -0,0 +1,33 @@ +type: xscraperhelper +github_project: open-telemetry/opentelemetry-collector + +status: + disable_codecov_badge: true + class: pkg + codeowners: + active: + - florianl + stability: + development: [profiles] + +telemetry: + metrics: + scraper_errored_profile_records: + enabled: true + stability: + level: alpha + description: Number of profile records that were unable to be scraped. + unit: "{datapoints}" + sum: + value_type: int + monotonic: true + + scraper_scraped_profile_records: + enabled: true + stability: + level: alpha + description: Number of profile records successfully scraped. + unit: "{datapoints}" + sum: + value_type: int + monotonic: true diff --git a/scraper/scraperhelper/xscraperhelper/obs_profiles.go b/scraper/scraperhelper/xscraperhelper/obs_profiles.go new file mode 100644 index 00000000000..084600c24a9 --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/obs_profiles.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package xscraperhelper provides utilities for scrapers. +package xscraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper" + +import ( + "context" + "errors" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/scraper/scrapererror" + "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper/internal/metadata" + "go.opentelemetry.io/collector/scraper/xscraper" +) + +const ( + + // scrapedProfileRecordsKey used to identify profile records scraped by the + // Collector. + scrapedProfileRecordsKey = "scraped_profile_records" + // erroredProfileRecordsKey used to identify profile records errored (i.e. + // unable to be scraped) by the Collector. + erroredProfileRecordsKey = "errored_profile_records" +) + +func wrapObsProfiles(sc xscraper.Profiles, receiverID, scraperID component.ID, set component.TelemetrySettings) (xscraper.Profiles, error) { + telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(set) + if errBuilder != nil { + return nil, errBuilder + } + + tracer := metadata.Tracer(set) + spanName := scraperKey + spanNameSep + scraperID.String() + spanNameSep + "ScrapeProfiles" + otelAttrs := metric.WithAttributeSet(attribute.NewSet( + attribute.String(receiverKey, receiverID.String()), + attribute.String(scraperKey, scraperID.String()), + )) + + scraperFuncs := func(ctx context.Context) (pprofile.Profiles, error) { + ctx, span := tracer.Start(ctx, spanName) + defer span.End() + + md, err := sc.ScrapeProfiles(ctx) + numScrapedProfiles := 0 + numErroredProfiles := 0 + if err != nil { + set.Logger.Error("Error scraping profiles", zap.Error(err)) + var partialErr scrapererror.PartialScrapeError + if errors.As(err, &partialErr) { + numErroredProfiles = partialErr.Failed + numScrapedProfiles = md.ProfileCount() + } + } else { + numScrapedProfiles = md.ProfileCount() + } + + telemetryBuilder.ScraperScrapedProfileRecords.Add(ctx, int64(numScrapedProfiles), otelAttrs) + telemetryBuilder.ScraperErroredProfileRecords.Add(ctx, int64(numErroredProfiles), otelAttrs) + + // end span according to errors + if span.IsRecording() { + span.SetAttributes( + attribute.String(formatKey, xpipeline.SignalProfiles.String()), + attribute.Int64(scrapedProfileRecordsKey, int64(numScrapedProfiles)), + attribute.Int64(erroredProfileRecordsKey, int64(numErroredProfiles)), + ) + + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } + } + + return md, err + } + + return xscraper.NewProfiles(scraperFuncs, xscraper.WithStart(sc.Start), xscraper.WithShutdown(sc.Shutdown)) +} diff --git a/scraper/scraperhelper/xscraperhelper/obs_profiles_test.go b/scraper/scraperhelper/xscraperhelper/obs_profiles_test.go new file mode 100644 index 00000000000..bf90b3a875b --- /dev/null +++ b/scraper/scraperhelper/xscraperhelper/obs_profiles_test.go @@ -0,0 +1,135 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package xscraperhelper provides utilities for scrapers. +package xscraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper" + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/scraper/scrapererror" + "go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper/internal/metadatatest" + "go.opentelemetry.io/collector/scraper/xscraper" +) + +var ( + receiverID = component.MustNewID("fakeReceiver") + scraperID = component.MustNewID("fakeScraper") + + errFake = errors.New("errFake") + partialErrFake = scrapererror.NewPartialScrapeError(errFake, 2) +) + +type testParams struct { + items int + err error +} + +func TestScrapeProfilesDataOp(t *testing.T) { + tel := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) + + set := tel.NewTelemetrySettings() + + parentCtx, parentSpan := set.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + params := []testParams{ + {items: 23, err: partialErrFake}, + {items: 29, err: errFake}, + {items: 15, err: nil}, + } + for i := range params { + sm, err := xscraper.NewProfiles(func(context.Context) (pprofile.Profiles, error) { + return testdata.GenerateProfiles(params[i].items), params[i].err + }) + require.NoError(t, err) + sf, err := wrapObsProfiles(sm, receiverID, scraperID, set) + require.NoError(t, err) + _, err = sf.ScrapeProfiles(parentCtx) + require.ErrorIs(t, err, params[i].err) + } + + spans := tel.SpanRecorder.Ended() + require.Len(t, spans, len(params)) + + var scrapedProfileRecords, erroredProfileRecords int + for i, span := range spans { + assert.Equal(t, "scraper/"+scraperID.String()+"/ScrapeProfiles", span.Name()) + switch { + case params[i].err == nil: + scrapedProfileRecords += params[i].items + require.Contains(t, span.Attributes(), attribute.Int64(scrapedProfileRecordsKey, int64(params[i].items))) + require.Contains(t, span.Attributes(), attribute.Int64(erroredProfileRecordsKey, 0)) + assert.Equal(t, codes.Unset, span.Status().Code) + case errors.Is(params[i].err, errFake): + // Since we get an error, we cannot record any metrics because we don't know if the returned pprofile.Profiles is valid instance. + require.Contains(t, span.Attributes(), attribute.Int64(scrapedProfileRecordsKey, 0)) + require.Contains(t, span.Attributes(), attribute.Int64(erroredProfileRecordsKey, 0)) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + case errors.Is(params[i].err, partialErrFake): + scrapedProfileRecords += params[i].items + erroredProfileRecords += 2 + require.Contains(t, span.Attributes(), attribute.Int64(scrapedProfileRecordsKey, int64(params[i].items))) + require.Contains(t, span.Attributes(), attribute.Int64(erroredProfileRecordsKey, 2)) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + default: + t.Fatalf("unexpected err param: %v", params[i].err) + } + } + + checkScraperProfiles(t, tel, receiverID, scraperID, int64(scrapedProfileRecords), int64(erroredProfileRecords)) +} + +func TestCheckScraperProfiles(t *testing.T) { + tel := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) + + sm, err := xscraper.NewProfiles(func(context.Context) (pprofile.Profiles, error) { + return testdata.GenerateProfiles(7), nil + }) + require.NoError(t, err) + sf, err := wrapObsProfiles(sm, receiverID, scraperID, tel.NewTelemetrySettings()) + require.NoError(t, err) + _, err = sf.ScrapeProfiles(context.Background()) + require.NoError(t, err) + + checkScraperProfiles(t, tel, receiverID, scraperID, 7, 0) +} + +func checkScraperProfiles(t *testing.T, tel *componenttest.Telemetry, receiver, scraper component.ID, scrapedProfileRecords, erroredProfileRecords int64) { + metadatatest.AssertEqualScraperScrapedProfileRecords(t, tel, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(receiverKey, receiver.String()), + attribute.String(scraperKey, scraper.String())), + Value: scrapedProfileRecords, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + metadatatest.AssertEqualScraperErroredProfileRecords(t, tel, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(receiverKey, receiver.String()), + attribute.String(scraperKey, scraper.String())), + Value: erroredProfileRecords, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) +} diff --git a/versions.yaml b/versions.yaml index 6be7234f3fc..3614c8c6c14 100644 --- a/versions.yaml +++ b/versions.yaml @@ -92,6 +92,7 @@ module-sets: - go.opentelemetry.io/collector/receiver/xreceiver - go.opentelemetry.io/collector/scraper - go.opentelemetry.io/collector/scraper/scraperhelper + - go.opentelemetry.io/collector/scraper/scraperhelper/xscraperhelper - go.opentelemetry.io/collector/scraper/scrapertest - go.opentelemetry.io/collector/scraper/xscraper - go.opentelemetry.io/collector/service