Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
2ed498e
[xscraperhelper] init package
florianl Dec 1, 2025
dd5d149
make generate-chloggen-components
florianl Dec 1, 2025
7f6d008
make fmt
florianl Dec 1, 2025
16f2999
add xscraperhelper to versions.yaml
florianl Dec 1, 2025
40c51ca
make crosslink
florianl Dec 1, 2025
36421de
add obs_profiles
florianl Dec 1, 2025
bd55e9f
add wrapObsProfiles to NewProfilesController
florianl Dec 1, 2025
36137ef
add controller test
florianl Dec 1, 2025
4bb9003
make gogenerate
florianl Dec 1, 2025
f41240f
make gotidy
florianl Dec 1, 2025
97086b3
update to v0.141.0
florianl Dec 1, 2025
db71309
Merge branch 'main' into xscraperhelper
florianl Dec 2, 2025
8dbd29b
Merge branch 'main' into xscraperhelper
florianl Dec 3, 2025
c658fe5
make gotidy
florianl Dec 3, 2025
5c93df1
replace SampleCount with ProfileCount
florianl Dec 3, 2025
df2a63d
Merge branch 'main' into xscraperhelper
florianl Dec 4, 2025
92bc761
deduplicate code
florianl Dec 4, 2025
8939632
add missing license header
florianl Dec 4, 2025
e4ef85c
fix package name
florianl Dec 4, 2025
bc85f34
fix impi
florianl Dec 4, 2025
e78a03f
Merge branch 'main' into xscraperhelper
florianl Dec 5, 2025
71b753d
move controller into internal package
florianl Dec 5, 2025
5604a50
make gotidy
florianl Dec 5, 2025
9f6ee96
Revert "make gotidy"
florianl Dec 8, 2025
dbeb55d
Revert "move controller into internal package"
florianl Dec 8, 2025
a5e5cb9
use scraperhelper in xscraperhelper
florianl Dec 9, 2025
502c220
Merge branch 'main' into xscraperhelper
florianl Dec 9, 2025
47da304
reintroduce internal/controller and type alias
florianl Dec 9, 2025
f22bb88
add wrapper for NewDefaultControllerConfig
florianl Dec 9, 2025
23daf85
Merge branch 'main' into xscraperhelper
florianl Dec 10, 2025
72f9f89
go mod tidy
florianl Dec 10, 2025
b09e9e3
reword references from Logs to Profiles
florianl Dec 10, 2025
a97b017
use xpipeline instead of pipeline
florianl Dec 10, 2025
bba0b18
move telemetry to xscraperhelper/metadata.yaml
florianl Dec 10, 2025
d5a096a
rephrace changelog
florianl Dec 10, 2025
101713b
make gotidy
florianl Dec 10, 2025
73067c4
make cross-link
florianl Dec 10, 2025
ff5e89f
make gofmt
florianl Dec 10, 2025
f257bc2
make gotidy
florianl Dec 10, 2025
4593e07
drop README
florianl Dec 10, 2025
9e1249b
move getSettings
florianl Dec 10, 2025
9554a6a
use lower letter
florianl Dec 10, 2025
0f1ef21
Merge branch 'main' into xscraperhelper
florianl Dec 11, 2025
d407484
Merge branch 'main' into xscraperhelper
florianl Dec 16, 2025
3844bbc
make gotidy
florianl Dec 16, 2025
769ce3c
Merge branch 'main' into xscraperhelper
florianl Jan 5, 2026
2cebb76
Merge branch 'main' into xscraperhelper
florianl Jan 6, 2026
197e1bf
go mod tidy
florianl Jan 6, 2026
87afe4c
increase test coverage
florianl Jan 6, 2026
22891d1
Merge branch 'main' into xscraperhelper
florianl Jan 7, 2026
67ae473
Merge branch 'main' into xscraperhelper
florianl Jan 8, 2026
5f888fd
Apply fix from https://github.com/open-telemetry/opentelemetry-collec…
florianl Jan 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .chloggen/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ components:
- pkg/xprocessor
- pkg/xreceiver
- pkg/xscraper
- pkg/xscraperhelper
- processor/batch
- processor/memory_limiter
- processor/sample
Expand Down
25 changes: 25 additions & 0 deletions .chloggen/xscraperhelper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp)
component: pkg/xscraperhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add xscraperhelper for the experimental OTel profiling signal.

# One or more tracking issues or pull requests related to the change
issues: [14235]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ receiver/xreceiver/ @open-telemetry/collector-approvers
scraper/ @open-telemetry/collector-approvers
scraper/xscraper @open-telemetry/collector-approvers
scraper/scraperhelper/ @open-telemetry/collector-approvers
scraper/scraperhelper/xscraperhelper @open-telemetry/collector-approvers
service/ @open-telemetry/collector-approvers
service/internal/graph/ @open-telemetry/collector-approvers

Expand Down
13 changes: 13 additions & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ body:
- cmd/mdatagen/internal/sampleprocessor
- cmd/mdatagen/internal/samplereceiver
- cmd/mdatagen/internal/samplescraper
- config/configauth
Comment thread
florianl marked this conversation as resolved.
- config/configcompression
- config/configgrpc
- config/confighttp
- config/configmiddleware
- config/confignet
- config/configopaque
- config/configoptional
- config/configretry
- config/configtelemetry
- config/configtls
- confmap
- confmap/provider/envprovider
- confmap/provider/fileprovider
Expand Down Expand Up @@ -61,6 +72,8 @@ body:
- receiver/x
- scraper
- scraper/scraperhelper
- scraper/scraperhelper/xscraperhelper
- scraper/xscraper
- service
- service/internal/graph
# End components list
Expand Down
13 changes: 13 additions & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ body:
- cmd/mdatagen/internal/sampleprocessor
- cmd/mdatagen/internal/samplereceiver
- cmd/mdatagen/internal/samplescraper
- config/configauth
- config/configcompression
- config/configgrpc
- config/confighttp
- config/configmiddleware
- config/confignet
- config/configopaque
- config/configoptional
- config/configretry
- config/configtelemetry
- config/configtls
- confmap
- confmap/provider/envprovider
- confmap/provider/fileprovider
Expand Down Expand Up @@ -55,6 +66,8 @@ body:
- receiver/x
- scraper
- scraper/scraperhelper
- scraper/scraperhelper/xscraperhelper
- scraper/xscraper
- service
- service/internal/graph
# End components list
Expand Down
13 changes: 13 additions & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ body:
- cmd/mdatagen/internal/sampleprocessor
- cmd/mdatagen/internal/samplereceiver
- cmd/mdatagen/internal/samplescraper
- config/configauth
- config/configcompression
- config/configgrpc
- config/confighttp
- config/configmiddleware
- config/confignet
- config/configopaque
- config/configoptional
- config/configretry
- config/configtelemetry
- config/configtls
- confmap
- confmap/provider/envprovider
- confmap/provider/fileprovider
Expand Down Expand Up @@ -54,6 +65,8 @@ body:
- receiver/x
- scraper
- scraper/scraperhelper
- scraper/scraperhelper/xscraperhelper
- scraper/xscraper
- service
- service/internal/graph
# End components list
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/utils/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@
"xprocessorhelper",
"xreceiver",
"xscraper",
"xscraperhelper",
"yamlmapprovider",
"yamlprovider",
"yamls",
Expand Down
172 changes: 27 additions & 145 deletions scraper/scraperhelper/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,26 @@ package scraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhe

import (
"context"
"sync"
"time"

"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/scraper"
"go.opentelemetry.io/collector/scraper/scrapererror"
"go.opentelemetry.io/collector/scraper/scraperhelper/internal/controller"
)

type ControllerConfig = controller.ControllerConfig

// NewDefaultControllerConfig returns default scraper controller
// settings with a collection interval of one minute.
func NewDefaultControllerConfig() ControllerConfig {
return controller.NewDefaultControllerConfig()
}

// ControllerOption apply changes to internal options.
type ControllerOption interface {
apply(*controllerOptions)
Expand Down Expand Up @@ -75,111 +80,6 @@ type controllerOptions struct {
factoriesWithConfig []factoryWithConfig
}

type controller[T component.Component] struct {
collectionInterval time.Duration
initialDelay time.Duration
timeout time.Duration

scrapers []T
scrapeFunc func(*controller[T])
tickerCh <-chan time.Time

done chan struct{}
wg sync.WaitGroup

obsrecv *receiverhelper.ObsReport
}

func newController[T component.Component](
cfg *ControllerConfig,
rSet receiver.Settings,
scrapers []T,
scrapeFunc func(*controller[T]),
tickerCh <-chan time.Time,
) (*controller[T], error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: rSet.ID,
Transport: "",
ReceiverCreateSettings: rSet,
})
if err != nil {
return nil, err
}

cs := &controller[T]{
collectionInterval: cfg.CollectionInterval,
initialDelay: cfg.InitialDelay,
timeout: cfg.Timeout,
scrapers: scrapers,
scrapeFunc: scrapeFunc,
done: make(chan struct{}),
tickerCh: tickerCh,
obsrecv: obsrecv,
}

return cs, nil
}

// Start the receiver, invoked during service start.
func (sc *controller[T]) Start(ctx context.Context, host component.Host) error {
for _, scrp := range sc.scrapers {
if err := scrp.Start(ctx, host); err != nil {
return err
}
}

sc.startScraping()
return nil
}

// Shutdown the receiver, invoked during service shutdown.
func (sc *controller[T]) Shutdown(ctx context.Context) error {
// Signal the goroutine to stop.
close(sc.done)
sc.wg.Wait()
var errs error
for _, scrp := range sc.scrapers {
errs = multierr.Append(errs, scrp.Shutdown(ctx))
}

return errs
}

// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
func (sc *controller[T]) startScraping() {
sc.wg.Add(1)
go func() {
defer sc.wg.Done()
if sc.initialDelay > 0 {
select {
case <-time.After(sc.initialDelay):
case <-sc.done:
return
}
}

if sc.tickerCh == nil {
ticker := time.NewTicker(sc.collectionInterval)
defer ticker.Stop()

sc.tickerCh = ticker.C
}
// Call scrape method during initialization to ensure
// that scrapers start from when the component starts
// instead of waiting for the full duration to start.
sc.scrapeFunc(sc)
for {
select {
case <-sc.tickerCh:
sc.scrapeFunc(sc)
case <-sc.done:
return
}
}
}()
}

// NewLogsController creates a receiver.Logs with the configured options, that can control multiple scraper.Logs.
func NewLogsController(cfg *ControllerConfig,
rSet receiver.Settings,
Expand All @@ -189,7 +89,7 @@ func NewLogsController(cfg *ControllerConfig,
co := getOptions(options)
scrapers := make([]scraper.Logs, 0, len(co.factoriesWithConfig))
for _, fwc := range co.factoriesWithConfig {
set := getSettings(fwc.f.Type(), rSet)
set := controller.GetSettings(fwc.f.Type(), rSet)
s, err := fwc.f.CreateLogs(context.Background(), set, fwc.cfg)
if err != nil {
return nil, err
Expand All @@ -200,8 +100,8 @@ func NewLogsController(cfg *ControllerConfig,
}
scrapers = append(scrapers, s)
}
return newController[scraper.Logs](
cfg, rSet, scrapers, func(c *controller[scraper.Logs]) { scrapeLogs(c, nextConsumer) }, co.tickerCh)
return controller.NewController[scraper.Logs](
cfg, rSet, scrapers, func(c *controller.Controller[scraper.Logs]) { scrapeLogs(c, nextConsumer) }, co.tickerCh)
}

// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics.
Expand All @@ -213,7 +113,7 @@ func NewMetricsController(cfg *ControllerConfig,
co := getOptions(options)
scrapers := make([]scraper.Metrics, 0, len(co.factoriesWithConfig))
for _, fwc := range co.factoriesWithConfig {
set := getSettings(fwc.f.Type(), rSet)
set := controller.GetSettings(fwc.f.Type(), rSet)
s, err := fwc.f.CreateMetrics(context.Background(), set, fwc.cfg)
if err != nil {
return nil, err
Expand All @@ -224,46 +124,46 @@ func NewMetricsController(cfg *ControllerConfig,
}
scrapers = append(scrapers, s)
}
return newController[scraper.Metrics](
cfg, rSet, scrapers, func(c *controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh)
return controller.NewController[scraper.Metrics](
cfg, rSet, scrapers, func(c *controller.Controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh)
}

func scrapeLogs(c *controller[scraper.Logs], nextConsumer consumer.Logs) {
ctx, done := withScrapeContext(c.timeout)
func scrapeLogs(c *controller.Controller[scraper.Logs], nextConsumer consumer.Logs) {
ctx, done := controller.WithScrapeContext(c.Timeout)
defer done()

logs := plog.NewLogs()
for i := range c.scrapers {
md, err := c.scrapers[i].ScrapeLogs(ctx)
for i := range c.Scrapers {
md, err := c.Scrapers[i].ScrapeLogs(ctx)
if err != nil && !scrapererror.IsPartialScrapeError(err) {
continue
}
md.ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
}

logRecordCount := logs.LogRecordCount()
ctx = c.obsrecv.StartMetricsOp(ctx)
ctx = c.Obsrecv.StartMetricsOp(ctx)
err := nextConsumer.ConsumeLogs(ctx, logs)
c.obsrecv.EndMetricsOp(ctx, "", logRecordCount, err)
c.Obsrecv.EndMetricsOp(ctx, "", logRecordCount, err)
}

func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) {
ctx, done := withScrapeContext(c.timeout)
func scrapeMetrics(c *controller.Controller[scraper.Metrics], nextConsumer consumer.Metrics) {
ctx, done := controller.WithScrapeContext(c.Timeout)
defer done()

metrics := pmetric.NewMetrics()
for i := range c.scrapers {
md, err := c.scrapers[i].ScrapeMetrics(ctx)
for i := range c.Scrapers {
md, err := c.Scrapers[i].ScrapeMetrics(ctx)
if err != nil && !scrapererror.IsPartialScrapeError(err) {
continue
}
md.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())
}

dataPointCount := metrics.DataPointCount()
ctx = c.obsrecv.StartMetricsOp(ctx)
ctx = c.Obsrecv.StartMetricsOp(ctx)
err := nextConsumer.ConsumeMetrics(ctx, metrics)
c.obsrecv.EndMetricsOp(ctx, "", dataPointCount, err)
c.Obsrecv.EndMetricsOp(ctx, "", dataPointCount, err)
}

func getOptions(options []ControllerOption) controllerOptions {
Expand All @@ -273,21 +173,3 @@ func getOptions(options []ControllerOption) controllerOptions {
}
return co
}

func getSettings(sType component.Type, rSet receiver.Settings) scraper.Settings {
return scraper.Settings{
ID: component.NewID(sType),
TelemetrySettings: rSet.TelemetrySettings,
BuildInfo: rSet.BuildInfo,
}
}

// withScrapeContext will return a context that has no deadline if timeout is 0
// which implies no explicit timeout had occurred, otherwise, a context
// with a deadline of the provided timeout is returned.
func withScrapeContext(timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout == 0 {
return context.WithCancel(context.Background())
}
return context.WithTimeout(context.Background(), timeout)
}
Loading
Loading