diff --git a/.chloggen/add-stored-procedure-columns-to-events.yaml b/.chloggen/add-stored-procedure-columns-to-events.yaml index b24371d6c0c98..f643b6d439db2 100644 --- a/.chloggen/add-stored-procedure-columns-to-events.yaml +++ b/.chloggen/add-stored-procedure-columns-to-events.yaml @@ -16,7 +16,7 @@ issues: [44656] # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. subtext: Refined query and reported events to include stored procedure information when applicable. - Expanded default samples count from 200 to 250 to account for teh addition of stored procedure events. + Additionally, the maximum number of active queries reported by default has been increased from 200 to 250 to account for record deaggregation introduced by this change, ensuring the effective limit remains consistent with the previous 200-query baseline. # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. diff --git a/.chloggen/issue-39491.yaml b/.chloggen/issue-39491.yaml new file mode 100644 index 0000000000000..e4938d310f433 --- /dev/null +++ b/.chloggen/issue-39491.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/filelog + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Suppress repeated permission-denied errors + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [39491] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Only one error is logged per file per process run, and an informational message is emitted when the file becomes readable again. + This reduces log spam and improves clarity for operators. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/vengal-reduce-allocs-in-routing-connector.yaml b/.chloggen/vengal-reduce-allocs-in-routing-connector.yaml new file mode 100644 index 0000000000000..f0be23b14ab06 --- /dev/null +++ b/.chloggen/vengal-reduce-allocs-in-routing-connector.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: connector/routing + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Update existing util functions to reduce allocs. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [45061] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.github/workflows/update-otel.yaml b/.github/workflows/update-otel.yaml index 956a363befd3d..b2f26e89b6ceb 100644 --- a/.github/workflows/update-otel.yaml +++ b/.github/workflows/update-otel.yaml @@ -23,6 +23,11 @@ jobs: with: path: opentelemetry-collector repository: open-telemetry/opentelemetry-collector + - uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2.2.1 + id: otelbot-token + with: + app-id: ${{ vars.OTELBOT_APP_ID }} + private-key: ${{ secrets.OTELBOT_PRIVATE_KEY }} - name: Prepare to update dependencies run: | exec > >(tee log.out) 2>&1 @@ -40,6 +45,8 @@ jobs: echo "LAST_COMMIT=$LAST_COMMIT" echo "BRANCH_NAME=$branch" } >> "$GITHUB_ENV" + env: + GH_TOKEN: ${{ steps.otelbot-token.outputs.token }} - name: Gets packages from links with retries uses: nick-fields/retry@ce71cc2ab81d554ebbe88c79ab5975992d79ba08 # v3.0.2 with: @@ -51,11 +58,6 @@ jobs: exec > >(tee -a log.out) 2>&1 cd opentelemetry-collector-contrib make update-otel OTEL_STABLE_VERSION=${{ env.LAST_COMMIT }} OTEL_VERSION=${{ env.LAST_COMMIT }} - - uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2.2.1 - id: otelbot-token - with: - app-id: ${{ vars.OTELBOT_APP_ID }} - private-key: ${{ secrets.OTELBOT_PRIVATE_KEY }} - name: Push and create PR run: | exec > >(tee -a log.out) 2>&1 diff --git a/connector/routingconnector/internal/pdatautil/utils.go b/connector/routingconnector/internal/pdatautil/utils.go new file mode 100644 index 0000000000000..8efdb1e116ad8 --- /dev/null +++ b/connector/routingconnector/internal/pdatautil/utils.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pdatautil // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/pdatautil" + +// OnceValue to set a given value only once. +type OnceValue[K any] struct { + val K + isInit bool +} + +func (ov *OnceValue[K]) IsInit() bool { + return ov.isInit +} + +func (ov *OnceValue[K]) Init(val K) { + ov.isInit = true + ov.val = val +} + +func (ov *OnceValue[K]) Value() K { + return ov.val +} diff --git a/connector/routingconnector/internal/plogutil/logs.go b/connector/routingconnector/internal/plogutil/logs.go index dec69abf958c4..9bafb35a7c383 100644 --- a/connector/routingconnector/internal/plogutil/logs.go +++ b/connector/routingconnector/internal/plogutil/logs.go @@ -5,6 +5,8 @@ package plogutil // import "github.com/open-telemetry/opentelemetry-collector-co import ( "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/pdatautil" ) // MoveResourcesIf calls f sequentially for each ResourceLogs present in the first plog.Logs. @@ -27,27 +29,25 @@ func MoveRecordsWithContextIf(from, to plog.Logs, f func(plog.ResourceLogs, plog rls := from.ResourceLogs() rls.RemoveIf(func(rl plog.ResourceLogs) bool { sls := rl.ScopeLogs() - var rlCopy *plog.ResourceLogs + var rlCopy pdatautil.OnceValue[plog.ResourceLogs] sls.RemoveIf(func(sl plog.ScopeLogs) bool { lrs := sl.LogRecords() - var slCopy *plog.ScopeLogs + var slCopy pdatautil.OnceValue[plog.ScopeLogs] lrs.RemoveIf(func(lr plog.LogRecord) bool { if !f(rl, sl, lr) { return false } - if rlCopy == nil { - rlc := to.ResourceLogs().AppendEmpty() - rlCopy = &rlc - rl.Resource().CopyTo(rlCopy.Resource()) - rlCopy.SetSchemaUrl(rl.SchemaUrl()) + if !rlCopy.IsInit() { + rlCopy.Init(to.ResourceLogs().AppendEmpty()) + rl.Resource().CopyTo(rlCopy.Value().Resource()) + rlCopy.Value().SetSchemaUrl(rl.SchemaUrl()) } - if slCopy == nil { - slc := rlCopy.ScopeLogs().AppendEmpty() - slCopy = &slc - sl.Scope().CopyTo(slCopy.Scope()) - slCopy.SetSchemaUrl(sl.SchemaUrl()) + if !slCopy.IsInit() { + slCopy.Init(rlCopy.Value().ScopeLogs().AppendEmpty()) + sl.Scope().CopyTo(slCopy.Value().Scope()) + slCopy.Value().SetSchemaUrl(sl.SchemaUrl()) } - lr.MoveTo(slCopy.LogRecords().AppendEmpty()) + lr.MoveTo(slCopy.Value().LogRecords().AppendEmpty()) return true }) return sl.LogRecords().Len() == 0 diff --git a/connector/routingconnector/internal/pmetricutil/metrics.go b/connector/routingconnector/internal/pmetricutil/metrics.go index b858e7113c970..61c60cf075000 100644 --- a/connector/routingconnector/internal/pmetricutil/metrics.go +++ b/connector/routingconnector/internal/pmetricutil/metrics.go @@ -3,7 +3,11 @@ package pmetricutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/pmetricutil" -import "go.opentelemetry.io/collector/pdata/pmetric" +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/pdatautil" +) // MoveResourcesIf calls f sequentially for each ResourceSpans present in the first pmetric.Metrics. // If f returns true, the element is removed from the first pmetric.Metrics and added to the second pmetric.Metrics. @@ -25,21 +29,21 @@ func MoveMetricsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceM rms := from.ResourceMetrics() rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool { sms := rm.ScopeMetrics() - var rmCopy *pmetric.ResourceMetrics + var rmCopy pdatautil.OnceValue[pmetric.ResourceMetrics] sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool { ms := sm.Metrics() - var smCopy *pmetric.ScopeMetrics + var smCopy pdatautil.OnceValue[pmetric.ScopeMetrics] ms.RemoveIf(func(m pmetric.Metric) bool { if !f(rm, sm, m) { return false } - if rmCopy == nil { - rmCopy = copyResourceMetrics(rm, to.ResourceMetrics()) + if !rmCopy.IsInit() { + rmCopy.Init(copyResourceMetrics(rm, to.ResourceMetrics())) } - if smCopy == nil { - smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics()) + if !smCopy.IsInit() { + smCopy.Init(copyScopeMetrics(sm, rmCopy.Value().ScopeMetrics())) } - m.MoveTo(smCopy.Metrics().AppendEmpty()) + m.MoveTo(smCopy.Value().Metrics().AppendEmpty()) return true }) return sm.Metrics().Len() == 0 @@ -56,12 +60,12 @@ func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.Resour rms := from.ResourceMetrics() rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool { sms := rm.ScopeMetrics() - var rmCopy *pmetric.ResourceMetrics + var rmCopy pdatautil.OnceValue[pmetric.ResourceMetrics] sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool { ms := sm.Metrics() - var smCopy *pmetric.ScopeMetrics + var smCopy pdatautil.OnceValue[pmetric.ScopeMetrics] ms.RemoveIf(func(m pmetric.Metric) bool { - var mCopy *pmetric.Metric + var mCopy pdatautil.OnceValue[pmetric.Metric] // TODO condense this code switch m.Type() { @@ -71,17 +75,17 @@ func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.Resour if !f(rm, sm, m, dp) { return false } - if rmCopy == nil { - rmCopy = copyResourceMetrics(rm, to.ResourceMetrics()) + if !rmCopy.IsInit() { + rmCopy.Init(copyResourceMetrics(rm, to.ResourceMetrics())) } - if smCopy == nil { - smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics()) + if !smCopy.IsInit() { + smCopy.Init(copyScopeMetrics(sm, rmCopy.Value().ScopeMetrics())) } - if mCopy == nil { - mCopy = copyMetricDescription(m, smCopy.Metrics()) - mCopy.SetEmptyGauge() + if !mCopy.IsInit() { + mCopy.Init(copyMetricDescription(m, smCopy.Value().Metrics())) + mCopy.Value().SetEmptyGauge() } - dp.MoveTo(mCopy.Gauge().DataPoints().AppendEmpty()) + dp.MoveTo(mCopy.Value().Gauge().DataPoints().AppendEmpty()) return true }) return dps.Len() == 0 @@ -91,18 +95,18 @@ func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.Resour if !f(rm, sm, m, dp) { return false } - if rmCopy == nil { - rmCopy = copyResourceMetrics(rm, to.ResourceMetrics()) + if !rmCopy.IsInit() { + rmCopy.Init(copyResourceMetrics(rm, to.ResourceMetrics())) } - if smCopy == nil { - smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics()) + if !smCopy.IsInit() { + smCopy.Init(copyScopeMetrics(sm, rmCopy.Value().ScopeMetrics())) } - if mCopy == nil { - mCopy = copyMetricDescription(m, smCopy.Metrics()) - mCopy.SetEmptySum().SetAggregationTemporality(m.Sum().AggregationTemporality()) - mCopy.Sum().SetIsMonotonic(m.Sum().IsMonotonic()) + if !mCopy.IsInit() { + mCopy.Init(copyMetricDescription(m, smCopy.Value().Metrics())) + mCopy.Value().SetEmptySum().SetAggregationTemporality(m.Sum().AggregationTemporality()) + mCopy.Value().Sum().SetIsMonotonic(m.Sum().IsMonotonic()) } - dp.MoveTo(mCopy.Sum().DataPoints().AppendEmpty()) + dp.MoveTo(mCopy.Value().Sum().DataPoints().AppendEmpty()) return true }) return dps.Len() == 0 @@ -112,17 +116,17 @@ func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.Resour if !f(rm, sm, m, dp) { return false } - if rmCopy == nil { - rmCopy = copyResourceMetrics(rm, to.ResourceMetrics()) + if !rmCopy.IsInit() { + rmCopy.Init(copyResourceMetrics(rm, to.ResourceMetrics())) } - if smCopy == nil { - smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics()) + if !smCopy.IsInit() { + smCopy.Init(copyScopeMetrics(sm, rmCopy.Value().ScopeMetrics())) } - if mCopy == nil { - mCopy = copyMetricDescription(m, smCopy.Metrics()) - mCopy.SetEmptyHistogram().SetAggregationTemporality(m.Histogram().AggregationTemporality()) + if !mCopy.IsInit() { + mCopy.Init(copyMetricDescription(m, smCopy.Value().Metrics())) + mCopy.Value().SetEmptyHistogram().SetAggregationTemporality(m.Histogram().AggregationTemporality()) } - dp.MoveTo(mCopy.Histogram().DataPoints().AppendEmpty()) + dp.MoveTo(mCopy.Value().Histogram().DataPoints().AppendEmpty()) return true }) return dps.Len() == 0 @@ -132,17 +136,17 @@ func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.Resour if !f(rm, sm, m, dp) { return false } - if rmCopy == nil { - rmCopy = copyResourceMetrics(rm, to.ResourceMetrics()) + if !rmCopy.IsInit() { + rmCopy.Init(copyResourceMetrics(rm, to.ResourceMetrics())) } - if smCopy == nil { - smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics()) + if !smCopy.IsInit() { + smCopy.Init(copyScopeMetrics(sm, rmCopy.Value().ScopeMetrics())) } - if mCopy == nil { - mCopy = copyMetricDescription(m, smCopy.Metrics()) - mCopy.SetEmptyExponentialHistogram().SetAggregationTemporality(m.ExponentialHistogram().AggregationTemporality()) + if !mCopy.IsInit() { + mCopy.Init(copyMetricDescription(m, smCopy.Value().Metrics())) + mCopy.Value().SetEmptyExponentialHistogram().SetAggregationTemporality(m.ExponentialHistogram().AggregationTemporality()) } - dp.MoveTo(mCopy.ExponentialHistogram().DataPoints().AppendEmpty()) + dp.MoveTo(mCopy.Value().ExponentialHistogram().DataPoints().AppendEmpty()) return true }) return dps.Len() == 0 @@ -152,17 +156,17 @@ func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.Resour if !f(rm, sm, m, dp) { return false } - if rmCopy == nil { - rmCopy = copyResourceMetrics(rm, to.ResourceMetrics()) + if !rmCopy.IsInit() { + rmCopy.Init(copyResourceMetrics(rm, to.ResourceMetrics())) } - if smCopy == nil { - smCopy = copyScopeMetrics(sm, rmCopy.ScopeMetrics()) + if !smCopy.IsInit() { + smCopy.Init(copyScopeMetrics(sm, rmCopy.Value().ScopeMetrics())) } - if mCopy == nil { - mCopy = copyMetricDescription(m, smCopy.Metrics()) - mCopy.SetEmptySummary() + if !mCopy.IsInit() { + mCopy.Init(copyMetricDescription(m, smCopy.Value().Metrics())) + mCopy.Value().SetEmptySummary() } - dp.MoveTo(mCopy.Summary().DataPoints().AppendEmpty()) + dp.MoveTo(mCopy.Value().Summary().DataPoints().AppendEmpty()) return true }) return dps.Len() == 0 @@ -176,24 +180,24 @@ func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.Resour }) } -func copyResourceMetrics(from pmetric.ResourceMetrics, to pmetric.ResourceMetricsSlice) *pmetric.ResourceMetrics { +func copyResourceMetrics(from pmetric.ResourceMetrics, to pmetric.ResourceMetricsSlice) pmetric.ResourceMetrics { rmc := to.AppendEmpty() from.Resource().CopyTo(rmc.Resource()) rmc.SetSchemaUrl(from.SchemaUrl()) - return &rmc + return rmc } -func copyScopeMetrics(from pmetric.ScopeMetrics, to pmetric.ScopeMetricsSlice) *pmetric.ScopeMetrics { +func copyScopeMetrics(from pmetric.ScopeMetrics, to pmetric.ScopeMetricsSlice) pmetric.ScopeMetrics { smc := to.AppendEmpty() from.Scope().CopyTo(smc.Scope()) smc.SetSchemaUrl(from.SchemaUrl()) - return &smc + return smc } -func copyMetricDescription(from pmetric.Metric, to pmetric.MetricSlice) *pmetric.Metric { +func copyMetricDescription(from pmetric.Metric, to pmetric.MetricSlice) pmetric.Metric { mc := to.AppendEmpty() mc.SetName(from.Name()) mc.SetDescription(from.Description()) mc.SetUnit(from.Unit()) - return &mc + return mc } diff --git a/connector/routingconnector/internal/ptraceutil/traces.go b/connector/routingconnector/internal/ptraceutil/traces.go index 140c946f54708..25ada879a8c7e 100644 --- a/connector/routingconnector/internal/ptraceutil/traces.go +++ b/connector/routingconnector/internal/ptraceutil/traces.go @@ -3,7 +3,11 @@ package ptraceutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/ptraceutil" -import "go.opentelemetry.io/collector/pdata/ptrace" +import ( + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/pdatautil" +) // MoveResourcesIf calls f sequentially for each ResourceSpans present in the first ptrace.Traces. // If f returns true, the element is removed from the first ptrace.Traces and added to the second ptrace.Traces. @@ -25,27 +29,25 @@ func MoveSpansWithContextIf(from, to ptrace.Traces, f func(ptrace.ResourceSpans, resourceSpansSlice := from.ResourceSpans() resourceSpansSlice.RemoveIf(func(rs ptrace.ResourceSpans) bool { scopeSpanSlice := rs.ScopeSpans() - var resourceSpansCopy *ptrace.ResourceSpans + var resourceSpansCopy pdatautil.OnceValue[ptrace.ResourceSpans] scopeSpanSlice.RemoveIf(func(ss ptrace.ScopeSpans) bool { spanSlice := ss.Spans() - var scopeSpansCopy *ptrace.ScopeSpans + var scopeSpansCopy pdatautil.OnceValue[ptrace.ScopeSpans] spanSlice.RemoveIf(func(span ptrace.Span) bool { if !f(rs, ss, span) { return false } - if resourceSpansCopy == nil { - rmc := to.ResourceSpans().AppendEmpty() - resourceSpansCopy = &rmc - rs.Resource().CopyTo(resourceSpansCopy.Resource()) - resourceSpansCopy.SetSchemaUrl(rs.SchemaUrl()) + if !resourceSpansCopy.IsInit() { + resourceSpansCopy.Init(to.ResourceSpans().AppendEmpty()) + rs.Resource().CopyTo(resourceSpansCopy.Value().Resource()) + resourceSpansCopy.Value().SetSchemaUrl(rs.SchemaUrl()) } - if scopeSpansCopy == nil { - smc := resourceSpansCopy.ScopeSpans().AppendEmpty() - scopeSpansCopy = &smc - ss.Scope().CopyTo(scopeSpansCopy.Scope()) - scopeSpansCopy.SetSchemaUrl(ss.SchemaUrl()) + if !scopeSpansCopy.IsInit() { + scopeSpansCopy.Init(resourceSpansCopy.Value().ScopeSpans().AppendEmpty()) + ss.Scope().CopyTo(scopeSpansCopy.Value().Scope()) + scopeSpansCopy.Value().SetSchemaUrl(ss.SchemaUrl()) } - span.MoveTo(scopeSpansCopy.Spans().AppendEmpty()) + span.MoveTo(scopeSpansCopy.Value().Spans().AppendEmpty()) return true }) return ss.Spans().Len() == 0 diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 5982cabb63d14..c08bebbd8526a 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -5,7 +5,9 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" + "errors" "fmt" + "io/fs" "os" "sync" "time" @@ -22,6 +24,12 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" ) +const ( + // maxUnreadableEntries limits the number of paths tracked in the unreadable map + // to prevent memory issues when many files have permission errors. + maxUnreadableEntries = 10000 +) + type Manager struct { set component.TelemetrySettings wg sync.WaitGroup @@ -39,12 +47,17 @@ type Manager struct { pollsToArchive int telemetryBuilder *metadata.TelemetryBuilder + + unreadable map[string]struct{} } func (m *Manager) Start(persister operator.Persister) error { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel + // initialize runtime-only tracking of unreadable paths + m.unreadable = make(map[string]struct{}) + if _, err := m.fileMatcher.MatchFiles(); err != nil { m.set.Logger.Warn("finding files", zap.Error(err)) } @@ -175,6 +188,8 @@ func (m *Manager) consume(ctx context.Context, paths []string) { m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, int64(0-m.tracker.EndConsume())) } +// makeFingerprint opens `path` and computes a fingerprint for the file +// and contains logic to only log file permission errors once per file per startup func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) { // Normalize the path to handle Windows UNC paths correctly normalizedPath, wasCorrupted := normalizePath(path) @@ -183,10 +198,29 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi } file, err := os.Open(normalizedPath) // #nosec - operator must read in files defined by user if err != nil { - m.set.Logger.Error("Failed to open file", zap.Error(err), zap.String("original_path", path), zap.String("normalized_path", normalizedPath)) + // If a file is unreadable due to permissions error, store path in map and log error once (unless in debug mode) + if errors.Is(err, fs.ErrPermission) { + _, seen := m.unreadable[path] + if !seen { + // Limit map size to prevent unbounded growth + if len(m.unreadable) < maxUnreadableEntries { + m.unreadable[path] = struct{}{} + } + m.set.Logger.Error("Failed to open file - unreadable", zap.Error(err), zap.String("original_path", path), zap.String("normalized_path", normalizedPath)) + } + } else { + // For non-permission errors, always log + m.set.Logger.Error("Failed to open file", zap.Error(err), zap.String("original_path", path), zap.String("normalized_path", normalizedPath)) + } return nil, nil } + // Notify if previously unreadable file is now able to be read + if _, seen := m.unreadable[path]; seen { + m.set.Logger.Info("Previously unreadable file is now readable", zap.String("path", path)) + delete(m.unreadable, path) + } + fp, err := m.readerFactory.NewFingerprint(file) if err != nil { if err = file.Close(); err != nil { diff --git a/pkg/stanza/fileconsumer/unreadable_test.go b/pkg/stanza/fileconsumer/unreadable_test.go new file mode 100644 index 0000000000000..e5b43d274a26d --- /dev/null +++ b/pkg/stanza/fileconsumer/unreadable_test.go @@ -0,0 +1,165 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileconsumer + +import ( + "os" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/internal/filetest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" +) + +// TestUnreadableFileLoggedOnce verifies that permission-denied errors when +// opening files are logged only once per file per process run, and that an +// informational message is emitted when the file later becomes readable. +func TestUnreadableFileLoggedOnce(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("permission manipulation tests are not reliable on Windows") + } + + t.Parallel() + + tempDir := t.TempDir() + cfg := NewConfig().includeDir(tempDir) + operator, _ := testManager(t, cfg) + + // Create a file and remove permissions so open will fail + f := filetest.OpenTemp(t, tempDir) + _, err := f.WriteString("abc\n") + require.NoError(t, err) + require.NoError(t, f.Close()) + require.NoError(t, os.Chmod(f.Name(), 0)) + + core, obs := observer.New(zapcore.DebugLevel) + logger := zap.New(core) + set := componenttest.NewNopTelemetrySettings() + set.Logger = logger + + operator.set.Logger = set.Logger + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) + defer func() { + require.NoError(t, operator.Stop()) + }() + + // First poll should attempt to open and log an error once + operator.poll(t.Context()) + + // Verify the unreadable map recorded the path and exactly one error was logged + require.Eventually(t, func() bool { + return len(operator.unreadable) == 1 + }, 2*time.Second, 10*time.Millisecond, "expected unreadable map to have one entry after first poll") + + require.Eventually(t, func() bool { + countErrMsgs := 0 + for _, e := range obs.All() { + if e.Level == zapcore.ErrorLevel && e.Message == "Failed to open file - unreadable" { + countErrMsgs++ + } + } + return countErrMsgs == 1 + }, 2*time.Second, 10*time.Millisecond, "expected exactly one 'Failed to open file - unreadable' error after first poll") + + // Second poll should not add another error-level log for the same path + operator.poll(t.Context()) + + require.Eventually(t, func() bool { + countErrMsgs := 0 + for _, e := range obs.All() { + if e.Level == zapcore.ErrorLevel && e.Message == "Failed to open file - unreadable" { + countErrMsgs++ + } + } + return countErrMsgs == 1 + }, 2*time.Second, 10*time.Millisecond, "expected still exactly one 'Failed to open file - unreadable' error after second poll") + + // Verify the unreadable map still contains the entry (no reinitialization) + require.Len(t, operator.unreadable, 1, "expected unreadable map to still have one entry after second poll") + + // Now make the file readable again and poll; should emit an info message + require.NoError(t, os.Chmod(f.Name(), 0o644)) + operator.poll(t.Context()) + + require.Eventually(t, func() bool { + for _, e := range obs.All() { + if e.Level == zapcore.InfoLevel && e.Message == "Previously unreadable file is now readable" { + return true + } + } + return false + }, 2*time.Second, 10*time.Millisecond, "expected at least one info message when file becomes readable") +} + +// TestNonPermissionErrorsAlwaysLogged verifies that errors other than permission +// errors (e.g., file not found) are always logged and not suppressed. +func TestNonPermissionErrorsAlwaysLogged(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + cfg := NewConfig().includeDir(tempDir) + operator, _ := testManager(t, cfg) + + core, obs := observer.New(zapcore.DebugLevel) + logger := zap.New(core) + set := componenttest.NewNopTelemetrySettings() + set.Logger = logger + + operator.set.Logger = set.Logger + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) + defer func() { + require.NoError(t, operator.Stop()) + }() + + // Directly call makeFingerprint with a non-existent file path + // This simulates a file being deleted after matching but before opening + nonExistentPath := tempDir + "/non_existent_file.log" + + // First call should log an error + fp, file := operator.makeFingerprint(nonExistentPath) + require.Nil(t, fp) + require.Nil(t, file) + + require.Eventually(t, func() bool { + for _, e := range obs.All() { + if e.Level == zapcore.ErrorLevel && e.Message == "Failed to open file" { + return true + } + } + return false + }, 2*time.Second, 10*time.Millisecond, "expected an error log on first call for non-existent file") + + countBeforeSecondCall := 0 + for _, e := range obs.All() { + if e.Level == zapcore.ErrorLevel && e.Message == "Failed to open file" { + countBeforeSecondCall++ + } + } + + // Second call should also log an error (not suppressed like permission errors) + fp, file = operator.makeFingerprint(nonExistentPath) + require.Nil(t, fp) + require.Nil(t, file) + + require.Eventually(t, func() bool { + countAfterSecondCall := 0 + for _, e := range obs.All() { + if e.Level == zapcore.ErrorLevel && e.Message == "Failed to open file" { + countAfterSecondCall++ + } + } + // Should have at least one more error than before + return countAfterSecondCall > countBeforeSecondCall + }, 2*time.Second, 10*time.Millisecond, "expected another error log on second call for non-permission errors") + + // Verify the unreadable map is empty (non-permission errors shouldn't be tracked) + require.Empty(t, operator.unreadable, "expected unreadable map to be empty for non-permission errors") +} diff --git a/pkg/stanza/trim/benchmark_test.go b/pkg/stanza/trim/benchmark_test.go new file mode 100644 index 0000000000000..33ef5ed01226c --- /dev/null +++ b/pkg/stanza/trim/benchmark_test.go @@ -0,0 +1,135 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package trim + +import ( + "bufio" + "strings" + "testing" +) + +// BenchmarkTrimFuncs tests the core trimming logic (Leading, Trailing, Whitespace, Nop) +func BenchmarkTrimFuncs(b *testing.B) { + inputs := []struct { + name string + data []byte + }{ + { + name: "small clean", + data: []byte("simple_log_line"), + }, + { + name: "small dirty", + data: []byte(" \t\r\n simple_log_line \t\r\n "), + }, + { + name: "all space", + data: []byte(" \t\r\n "), + }, + { + name: "large dirty", + data: []byte(strings.Repeat(" \t content \r\n ", 100)), + }, + } + + funcs := []struct { + name string + fn Func + }{ + { + name: "leading", + fn: Leading, + }, + { + name: "trailing", + fn: Trailing, + }, + { + name: "whitespace", + fn: Whitespace, + }, + { + name: "nop", + fn: Nop, + }, + } + + for _, f := range funcs { + for _, input := range inputs { + b.Run(f.name+"/"+input.name, func(b *testing.B) { + b.ReportAllocs() + data := input.data + + for b.Loop() { + _ = f.fn(data) + } + }) + } + } +} + +// BenchmarkWithFunc tests the overhead of wrapping a SplitFunc +func BenchmarkWithFunc(b *testing.B) { + data := []byte("line1\nline2\nline3\n") + + tests := []struct { + name string + fn bufio.SplitFunc + }{ + { + name: "baseline scanlines", + fn: bufio.ScanLines, + }, + { + name: "with whitespace", + fn: WithFunc(bufio.ScanLines, Whitespace), + }, + } + + for _, tc := range tests { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + + split := tc.fn + + for b.Loop() { + _, _, _ = split(data, false) + } + }) + } +} + +// BenchmarkToLength tests the truncation logic +func BenchmarkToLength(b *testing.B) { + tests := []struct { + name string + data []byte + limit int + }{ + { + name: "under limit", + data: []byte("short"), + limit: 10, + }, + { + name: "over limit", + data: []byte(strings.Repeat("long", 100)), + limit: 10, + }, + } + + for _, tc := range tests { + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + + // Initialize the split function with the specific limit + split := ToLength(bufio.ScanLines, tc.limit) + data := tc.data + + for b.Loop() { + _, _, _ = split(data, false) + } + }) + } +} diff --git a/pkg/stanza/trim/trim.go b/pkg/stanza/trim/trim.go index 3cf5d4f5c2ce4..8a41381ce58c5 100644 --- a/pkg/stanza/trim/trim.go +++ b/pkg/stanza/trim/trim.go @@ -5,7 +5,6 @@ package trim // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "bufio" - "bytes" ) type Func func([]byte) []byte @@ -45,18 +44,22 @@ func Nop(token []byte) []byte { return token } +func isSpace(c byte) bool { + return c == ' ' || c == '\t' || c == '\r' || c == '\n' +} + func Leading(data []byte) []byte { - token := bytes.TrimLeft(data, "\r\n\t ") - if token == nil { - // TrimLeft sometimes overwrites something with nothing. - // We need to override this behavior in order to preserve empty tokens. - return data + for len(data) > 0 && isSpace(data[0]) { + data = data[1:] } - return token + return data } func Trailing(data []byte) []byte { - return bytes.TrimRight(data, "\r\n\t ") + for len(data) > 0 && isSpace(data[len(data)-1]) { + data = data[:len(data)-1] + } + return data } func Whitespace(data []byte) []byte { diff --git a/pkg/stanza/trim/trim_test.go b/pkg/stanza/trim/trim_test.go index c2a4747b307b1..15fcd4f46f546 100644 --- a/pkg/stanza/trim/trim_test.go +++ b/pkg/stanza/trim/trim_test.go @@ -56,6 +56,13 @@ func TestTrim(t *testing.T) { input: nil, expect: nil, }, + { + name: "trim trailing returns nil when given nil", + preserveLeading: true, + preserveTrailing: false, + input: nil, + expect: nil, + }, { name: "trim leading returns []byte when given []byte", preserveLeading: false, @@ -63,10 +70,32 @@ func TestTrim(t *testing.T) { input: []byte{}, expect: []byte{}, }, + { + name: "all whitespace becomes empty", + preserveLeading: false, + preserveTrailing: false, + input: []byte(" \t\r\n "), + expect: []byte{}, + }, + { + name: "no whitespace remains unchanged", + preserveLeading: false, + preserveTrailing: false, + input: []byte("content"), + expect: []byte("content"), + }, + { + name: "mixed whitespace types", + preserveLeading: false, + preserveTrailing: false, + input: []byte(" \t\r\ncontent \t\r\n"), + expect: []byte("content"), + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + t.Parallel() trimFunc := Config{ PreserveLeading: tc.preserveLeading, PreserveTrailing: tc.preserveTrailing, @@ -76,6 +105,47 @@ func TestTrim(t *testing.T) { } } +func TestIsSpace(t *testing.T) { + testCases := []struct { + name string + b byte + isSpace bool + }{ + { + name: "space", + b: ' ', + isSpace: true, + }, + { + name: "newline", + b: '\n', + isSpace: true, + }, + { + name: "tab", + b: '\t', + isSpace: true, + }, + { + name: "carriage return", + b: '\r', + isSpace: true, + }, + { + name: "not a space", + b: '1', + isSpace: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tc.isSpace, isSpace(tc.b)) + }) + } +} + func TestWithFunc(t *testing.T) { testCases := []struct { name string diff --git a/processor/resourcedetectionprocessor/e2e_test.go b/processor/resourcedetectionprocessor/e2e_test.go index 8200316a2b18f..7ed397974acff 100644 --- a/processor/resourcedetectionprocessor/e2e_test.go +++ b/processor/resourcedetectionprocessor/e2e_test.go @@ -384,6 +384,240 @@ func TestE2EOpenstackNovaDetector(t *testing.T) { }, 3*time.Minute, 1*time.Second) } +// TestE2EUpcloudDetector tests the UpCloud detector by deploying a metadata-server +// sidecar that simulates the UpCloud IMDS and verifying that the resource attributes +// are correctly detected and attached to metrics. +func TestE2EUpcloudDetector(t *testing.T) { + var expected pmetric.Metrics + expectedFile := filepath.Join("testdata", "e2e", "upcloud", "expected.yaml") + expected, err := golden.ReadMetrics(expectedFile) + require.NoError(t, err) + + k8sClient, err := k8stest.NewK8sClient(testKubeConfig) + require.NoError(t, err) + + metricsConsumer := new(consumertest.MetricsSink) + shutdownSink := startUpSink(t, metricsConsumer) + defer shutdownSink() + + testID := uuid.NewString()[:8] + collectorObjs := k8stest.CreateCollectorObjects(t, k8sClient, testID, filepath.Join(".", "testdata", "e2e", "upcloud", "collector"), map[string]string{}, "") + + defer func() { + for _, obj := range collectorObjs { + require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName()) + } + }() + + wantEntries := 10 + waitForData(t, wantEntries, metricsConsumer) + + // Uncomment to regenerate golden file + // golden.WriteMetrics(t, expectedFile+".actual", metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1]) + + require.EventuallyWithT(t, func(tt *assert.CollectT) { + assert.NoError(tt, pmetrictest.CompareMetrics(expected, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1], + pmetrictest.IgnoreTimestamp(), + pmetrictest.IgnoreStartTimestamp(), + pmetrictest.IgnoreScopeVersion(), + pmetrictest.IgnoreResourceMetricsOrder(), + pmetrictest.IgnoreMetricsOrder(), + pmetrictest.IgnoreScopeMetricsOrder(), + pmetrictest.IgnoreMetricDataPointsOrder(), + pmetrictest.IgnoreMetricValues(), + pmetrictest.IgnoreSubsequentDataPoints("system.cpu.time"), + ), + ) + }, 3*time.Minute, 1*time.Second) +} + +// TestE2EVultrDetector tests the Vultr detector by deploying a metadata-server +// sidecar that simulates the Vultr IMDS and verifying that the resource attributes +// are correctly detected and attached to metrics. +func TestE2EVultrDetector(t *testing.T) { + var expected pmetric.Metrics + expectedFile := filepath.Join("testdata", "e2e", "vultr", "expected.yaml") + expected, err := golden.ReadMetrics(expectedFile) + require.NoError(t, err) + + k8sClient, err := k8stest.NewK8sClient(testKubeConfig) + require.NoError(t, err) + + metricsConsumer := new(consumertest.MetricsSink) + shutdownSink := startUpSink(t, metricsConsumer) + defer shutdownSink() + + testID := uuid.NewString()[:8] + collectorObjs := k8stest.CreateCollectorObjects(t, k8sClient, testID, filepath.Join(".", "testdata", "e2e", "vultr", "collector"), map[string]string{}, "") + + defer func() { + for _, obj := range collectorObjs { + require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName()) + } + }() + + wantEntries := 10 + waitForData(t, wantEntries, metricsConsumer) + + // Uncomment to regenerate golden file + // golden.WriteMetrics(t, expectedFile+".actual", metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1]) + + require.EventuallyWithT(t, func(tt *assert.CollectT) { + assert.NoError(tt, pmetrictest.CompareMetrics(expected, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1], + pmetrictest.IgnoreTimestamp(), + pmetrictest.IgnoreStartTimestamp(), + pmetrictest.IgnoreScopeVersion(), + pmetrictest.IgnoreResourceMetricsOrder(), + pmetrictest.IgnoreMetricsOrder(), + pmetrictest.IgnoreScopeMetricsOrder(), + pmetrictest.IgnoreMetricDataPointsOrder(), + pmetrictest.IgnoreMetricValues(), + pmetrictest.IgnoreSubsequentDataPoints("system.cpu.time"), + ), + ) + }, 3*time.Minute, 1*time.Second) +} + +// TestE2EEC2Detector tests the EC2 detector by verifying that EC2 metadata +// (like cloud.provider, cloud.region, host.id, etc.) is correctly detected and attached to metrics. +func TestE2EEC2Detector(t *testing.T) { + var expected pmetric.Metrics + expectedFile := filepath.Join("testdata", "e2e", "ec2", "expected.yaml") + expected, err := golden.ReadMetrics(expectedFile) + require.NoError(t, err) + + k8sClient, err := k8stest.NewK8sClient(testKubeConfig) + require.NoError(t, err) + + metricsConsumer := new(consumertest.MetricsSink) + shutdownSink := startUpSink(t, metricsConsumer) + defer shutdownSink() + + testID := uuid.NewString()[:8] + collectorObjs := k8stest.CreateCollectorObjects(t, k8sClient, testID, filepath.Join(".", "testdata", "e2e", "ec2", "collector"), map[string]string{}, "") + + defer func() { + for _, obj := range collectorObjs { + require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName()) + } + }() + + wantEntries := 10 + waitForData(t, wantEntries, metricsConsumer) + + // Uncomment to regenerate golden file + // golden.WriteMetrics(t, expectedFile+".actual", metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1]) + + require.EventuallyWithT(t, func(tt *assert.CollectT) { + assert.NoError(tt, pmetrictest.CompareMetrics(expected, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1], + pmetrictest.IgnoreTimestamp(), + pmetrictest.IgnoreStartTimestamp(), + pmetrictest.IgnoreScopeVersion(), + pmetrictest.IgnoreResourceMetricsOrder(), + pmetrictest.IgnoreMetricsOrder(), + pmetrictest.IgnoreScopeMetricsOrder(), + pmetrictest.IgnoreMetricDataPointsOrder(), + pmetrictest.IgnoreMetricValues(), + pmetrictest.IgnoreSubsequentDataPoints("system.cpu.time"), + ), + ) + }, 3*time.Minute, 1*time.Second) +} + +// TestE2EScalewayDetector tests the Scaleway detector by deploying a metadata-server +// sidecar that simulates the Scaleway IMDS (at 169.254.42.42) and verifying that +// the resource attributes are correctly detected and attached to metrics. +func TestE2EScalewayDetector(t *testing.T) { + var expected pmetric.Metrics + expectedFile := filepath.Join("testdata", "e2e", "scaleway", "expected.yaml") + expected, err := golden.ReadMetrics(expectedFile) + require.NoError(t, err) + + k8sClient, err := k8stest.NewK8sClient(testKubeConfig) + require.NoError(t, err) + + metricsConsumer := new(consumertest.MetricsSink) + shutdownSink := startUpSink(t, metricsConsumer) + defer shutdownSink() + + testID := uuid.NewString()[:8] + collectorObjs := k8stest.CreateCollectorObjects(t, k8sClient, testID, filepath.Join(".", "testdata", "e2e", "scaleway", "collector"), map[string]string{}, "") + + defer func() { + for _, obj := range collectorObjs { + require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName()) + } + }() + + wantEntries := 10 + waitForData(t, wantEntries, metricsConsumer) + + // Uncomment to regenerate golden file + // golden.WriteMetrics(t, expectedFile+".actual", metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1]) + + require.EventuallyWithT(t, func(tt *assert.CollectT) { + assert.NoError(tt, pmetrictest.CompareMetrics(expected, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1], + pmetrictest.IgnoreTimestamp(), + pmetrictest.IgnoreStartTimestamp(), + pmetrictest.IgnoreScopeVersion(), + pmetrictest.IgnoreResourceMetricsOrder(), + pmetrictest.IgnoreMetricsOrder(), + pmetrictest.IgnoreScopeMetricsOrder(), + pmetrictest.IgnoreMetricDataPointsOrder(), + pmetrictest.IgnoreMetricValues(), + pmetrictest.IgnoreSubsequentDataPoints("system.cpu.time"), + ), + ) + }, 3*time.Minute, 1*time.Second) +} + +// TestE2EAzureDetector tests the Azure detector by deploying a metadata-server +// sidecar that simulates the Azure IMDS and verifying that the resource attributes +// are correctly detected and attached to metrics. +func TestE2EAzureDetector(t *testing.T) { + var expected pmetric.Metrics + expectedFile := filepath.Join("testdata", "e2e", "azure", "expected.yaml") + expected, err := golden.ReadMetrics(expectedFile) + require.NoError(t, err) + + k8sClient, err := k8stest.NewK8sClient(testKubeConfig) + require.NoError(t, err) + + metricsConsumer := new(consumertest.MetricsSink) + shutdownSink := startUpSink(t, metricsConsumer) + defer shutdownSink() + + testID := uuid.NewString()[:8] + collectorObjs := k8stest.CreateCollectorObjects(t, k8sClient, testID, filepath.Join(".", "testdata", "e2e", "azure", "collector"), map[string]string{}, "") + + defer func() { + for _, obj := range collectorObjs { + require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName()) + } + }() + + wantEntries := 10 + waitForData(t, wantEntries, metricsConsumer) + + // Uncomment to regenerate golden file + // golden.WriteMetrics(t, expectedFile+".actual", metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1]) + + require.EventuallyWithT(t, func(tt *assert.CollectT) { + assert.NoError(tt, pmetrictest.CompareMetrics(expected, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1], + pmetrictest.IgnoreTimestamp(), + pmetrictest.IgnoreStartTimestamp(), + pmetrictest.IgnoreScopeVersion(), + pmetrictest.IgnoreResourceMetricsOrder(), + pmetrictest.IgnoreMetricsOrder(), + pmetrictest.IgnoreScopeMetricsOrder(), + pmetrictest.IgnoreMetricDataPointsOrder(), + pmetrictest.IgnoreMetricValues(), + pmetrictest.IgnoreSubsequentDataPoints("system.cpu.time"), + ), + ) + }, 3*time.Minute, 1*time.Second) +} + func replaceWithStar(_ string) string { return "*" } diff --git a/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/01-metadata-configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/01-metadata-configmap.yaml new file mode 100644 index 0000000000000..d5461acfd72da --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/01-metadata-configmap.yaml @@ -0,0 +1,89 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-metadata-config + namespace: default +data: + server.py: | + import json + import os + from http.server import BaseHTTPRequestHandler, HTTPServer + from urllib.parse import urlparse, parse_qs + + # Azure IMDS compute metadata format + AZURE_COMPUTE_METADATA = { + "location": "eastus", + "name": "test-azure-vm", + "vmId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "vmSize": "Standard_D2s_v3", + "subscriptionId": "12345678-1234-1234-1234-123456789abc", + "resourceGroupName": "test-resource-group", + "vmScaleSetName": "scaleset-01", + "zone": "az1", + "osProfile": { + "computerName": "test-azure-vm.internal.cloudapp.net" + }, + "tagsList": [ + {"name": "environment", "value": "testing"}, + {"name": "team", "value": "observability"}, + {"name": "cost-center", "value": "12345"} + ] + } + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + parsed = urlparse(self.path) + path = parsed.path + query = parse_qs(parsed.query) + + # Health check endpoint + if path == "/healthz": + body = b"ok" + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # Azure IMDS compute endpoint + if path == "/metadata/instance/compute": + # Check for required Metadata header + metadata_header = self.headers.get("Metadata") + if metadata_header != "True": + self.send_response(400) + self.send_header("Content-Type", "text/plain") + self.end_headers() + self.wfile.write(b"Missing required Metadata header") + return + + # Check for required query parameters + api_version = query.get("api-version", [None])[0] + format_param = query.get("format", [None])[0] + + if not api_version: + self.send_response(400) + self.send_header("Content-Type", "text/plain") + self.end_headers() + self.wfile.write(b"Missing api-version parameter") + return + + body = json.dumps(AZURE_COMPUTE_METADATA).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # Not found + self.send_response(404) + self.end_headers() + + def log_message(self, fmt, *args): + return + + if __name__ == "__main__": + port = int(os.environ.get("PORT", "80")) + server = HTTPServer(("", port), Handler) + server.serve_forever() diff --git a/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/02-configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/02-configmap.yaml new file mode 100644 index 0000000000000..d3cc6846a70a7 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/02-configmap.yaml @@ -0,0 +1,46 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-config + namespace: default +data: + relay: | + exporters: + otlp: + endpoint: {{ .HostEndpoint }}:4317 + tls: + insecure: true + extensions: + health_check: + endpoint: 0.0.0.0:13133 + processors: + resourcedetection: + detectors: [azure] + timeout: 2s + override: false + azure: + resource_attributes: + cloud.availability_zone: + enabled: true + tags: + - ^environment$ + - ^team$ + receivers: + hostmetrics: + collection_interval: 1s + scrapers: + cpu: + service: + telemetry: + logs: + level: "debug" + extensions: + - health_check + pipelines: + metrics: + receivers: + - hostmetrics + processors: + - resourcedetection + exporters: + - otlp diff --git a/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/03-serviceaccount.yaml b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/03-serviceaccount.yaml new file mode 100644 index 0000000000000..7a9803b445b6c --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/03-serviceaccount.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Name }}-sa + namespace: default diff --git a/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/04-service.yaml b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/04-service.yaml new file mode 100644 index 0000000000000..94587d231bbf0 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/04-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Name }} + namespace: default +spec: + selector: + app: {{ .Name }} + ports: + - name: health + port: 13133 + targetPort: 13133 diff --git a/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/05-deployment.yaml b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/05-deployment.yaml new file mode 100644 index 0000000000000..7214d769be5a8 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/azure/collector/05-deployment.yaml @@ -0,0 +1,80 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Name }} + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: {{ .Name }} + template: + metadata: + labels: + app: {{ .Name }} + spec: + serviceAccountName: {{ .Name }}-sa + initContainers: + - name: metadata-server + image: python:3.13-alpine + imagePullPolicy: IfNotPresent + restartPolicy: Always + securityContext: + runAsUser: 0 + capabilities: + drop: + - "ALL" + add: + - "NET_BIND_SERVICE" + command: + - python3 + - /scripts/server.py + env: + - name: PORT + value: "80" + ports: + - containerPort: 80 + name: metadata + startupProbe: + httpGet: + path: /healthz + port: 80 + initialDelaySeconds: 1 + periodSeconds: 1 + failureThreshold: 10 + volumeMounts: + - name: metadata-script + mountPath: /scripts + - name: setup-network + image: alpine:3.21 + imagePullPolicy: IfNotPresent + securityContext: + capabilities: + add: + - NET_ADMIN + command: + - sh + - -c + - | + apk add --no-cache iptables + iptables -t nat -A OUTPUT -d 169.254.169.254 -p tcp --dport 80 -j DNAT --to-destination 127.0.0.1:80 + echo "iptables rule added to redirect 169.254.169.254:80 -> 127.0.0.1:80" + containers: + - name: otelcol + image: otelcontribcol:latest + imagePullPolicy: Never + args: + - "--config=/conf/relay" + volumeMounts: + - name: config + mountPath: /conf + volumes: + - name: config + configMap: + name: {{ .Name }}-config + items: + - key: relay + path: relay + - name: metadata-script + configMap: + name: {{ .Name }}-metadata-config diff --git a/processor/resourcedetectionprocessor/testdata/e2e/azure/expected.yaml b/processor/resourcedetectionprocessor/testdata/e2e/azure/expected.yaml new file mode 100644 index 0000000000000..0be2b278415cc --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/azure/expected.yaml @@ -0,0 +1,134 @@ +resourceMetrics: + - resource: + attributes: + - key: azure.resourcegroup.name + value: + stringValue: test-resource-group + - key: azure.tag.environment + value: + stringValue: testing + - key: azure.tag.team + value: + stringValue: observability + - key: azure.vm.name + value: + stringValue: test-azure-vm + - key: azure.vm.scaleset.name + value: + stringValue: scaleset-01 + - key: azure.vm.size + value: + stringValue: Standard_D2s_v3 + - key: cloud.account.id + value: + stringValue: 12345678-1234-1234-1234-123456789abc + - key: cloud.availability_zone + value: + stringValue: "az1" + - key: cloud.platform + value: + stringValue: azure_vm + - key: cloud.provider + value: + stringValue: azure + - key: cloud.region + value: + stringValue: eastus + - key: host.id + value: + stringValue: a1b2c3d4-e5f6-7890-abcd-ef1234567890 + - key: host.name + value: + stringValue: test-azure-vm.internal.cloudapp.net + schemaUrl: https://opentelemetry.io/schemas/1.9.0 + scopeMetrics: + - metrics: + - description: Total seconds each logical CPU spent on each mode. + name: system.cpu.time + sum: + aggregationTemporality: 2 + dataPoints: + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: idle + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: interrupt + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: nice + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: softirq + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: steal + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: system + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: user + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: wait + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + isMonotonic: true + unit: s + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper diff --git a/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/configmap.yaml new file mode 100644 index 0000000000000..7d3de056dd547 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/configmap.yaml @@ -0,0 +1,59 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-config + namespace: default +data: + relay: | + exporters: + otlp: + endpoint: {{ .HostEndpoint }}:4317 + tls: + insecure: true + extensions: + health_check: + endpoint: 0.0.0.0:13133 + processors: + resourcedetection: + detectors: [ec2] + timeout: 2s + override: false + ec2: + resource_attributes: + host.name: + enabled: true + host.id: + enabled: true + cloud.provider: + enabled: true + cloud.platform: + enabled: true + cloud.account.id: + enabled: true + cloud.region: + enabled: true + cloud.availability_zone: + enabled: true + host.image.id: + enabled: true + host.type: + enabled: true + receivers: + hostmetrics: + collection_interval: 1s + scrapers: + cpu: + service: + telemetry: + logs: + level: "debug" + extensions: + - health_check + pipelines: + metrics: + receivers: + - hostmetrics + processors: + - resourcedetection + exporters: + - otlp diff --git a/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/deployment.yaml b/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/deployment.yaml new file mode 100644 index 0000000000000..40916b2213ce6 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/deployment.yaml @@ -0,0 +1,43 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Name }} + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: {{ .Name }} + template: + metadata: + labels: + app: {{ .Name }} + spec: + serviceAccountName: {{ .Name }}-sa + containers: + - name: ec2-metadata-mock + image: public.ecr.aws/aws-ec2/amazon-ec2-metadata-mock:v1.13.0 + imagePullPolicy: IfNotPresent + ports: + - containerPort: 1338 + name: metadata + - name: otelcol + image: otelcontribcol:latest + imagePullPolicy: Never + env: + - name: AWS_EC2_METADATA_SERVICE_ENDPOINT + value: "http://localhost:1338" + - name: AWS_REGION + value: "us-east-1" + args: + - "--config=/conf/relay" + volumeMounts: + - name: config + mountPath: /conf + volumes: + - name: config + configMap: + name: {{ .Name }}-config + items: + - key: relay + path: relay diff --git a/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/service.yaml b/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/service.yaml new file mode 100644 index 0000000000000..94587d231bbf0 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Name }} + namespace: default +spec: + selector: + app: {{ .Name }} + ports: + - name: health + port: 13133 + targetPort: 13133 diff --git a/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/serviceaccount.yaml b/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/serviceaccount.yaml new file mode 100644 index 0000000000000..7a9803b445b6c --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/ec2/collector/serviceaccount.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Name }}-sa + namespace: default diff --git a/processor/resourcedetectionprocessor/testdata/e2e/ec2/expected.yaml b/processor/resourcedetectionprocessor/testdata/e2e/ec2/expected.yaml new file mode 100644 index 0000000000000..28003a61cc534 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/ec2/expected.yaml @@ -0,0 +1,122 @@ +resourceMetrics: + - resource: + attributes: + - key: cloud.account.id + value: + stringValue: "0123456789" + - key: cloud.availability_zone + value: + stringValue: us-east-1f + - key: cloud.platform + value: + stringValue: aws_ec2 + - key: cloud.provider + value: + stringValue: aws + - key: cloud.region + value: + stringValue: us-east-1 + - key: host.id + value: + stringValue: i-1234567890abcdef0 + - key: host.image.id + value: + stringValue: ami-0b69ea66ff7391e80 + - key: host.name + value: + stringValue: ip-172-16-34-43.ec2.internal + - key: host.type + value: + stringValue: m4.xlarge + schemaUrl: https://opentelemetry.io/schemas/1.9.0 + scopeMetrics: + - metrics: + - description: Total seconds each logical CPU spent on each mode. + name: system.cpu.time + sum: + aggregationTemporality: 2 + dataPoints: + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: idle + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: interrupt + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: nice + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: softirq + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: steal + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: system + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: user + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: wait + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + isMonotonic: true + unit: s + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper diff --git a/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/01-metadata-configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/01-metadata-configmap.yaml new file mode 100644 index 0000000000000..b1bf991292947 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/01-metadata-configmap.yaml @@ -0,0 +1,95 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-metadata-config + namespace: default +data: + server.py: | + import json + import os + import socket + from http.server import BaseHTTPRequestHandler, HTTPServer + + # Scaleway Instance Metadata format + # See https://github.com/scaleway/scaleway-sdk-go/blob/master/api/instance/v1/instance_metadata_sdk.go + SCALEWAY_METADATA = { + "id": "11111111-1111-1111-1111-111111111111", + "name": "test-scaleway-instance", + "hostname": "test-scaleway-instance", + "organization": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "project": "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb", + "commercial_type": "DEV1-S", + "arch": "x86_64", + "state": "running", + "zone": "fr-par-1", + "location": { + "platform_id": "1", + "hypervisor_id": "1", + "node_id": "1", + "cluster_id": "1", + "zone_id": "fr-par-1" + }, + "image": { + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + "name": "Ubuntu 22.04 Jammy Jellyfish", + "arch": "x86_64" + }, + "tags": ["otel", "e2e-test"], + "private_ip": "10.0.0.1", + "public_ip": { + "id": "dddddddd-dddd-dddd-dddd-dddddddddddd", + "address": "51.15.0.1", + "gateway": "51.15.0.254", + "netmask": "32", + "family": "inet", + "provisioning_mode": "dhcp" + }, + "ipv6": None, + "ssh_public_keys": [], + "volumes": {}, + "timezone": "UTC" + } + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + # Health check endpoint + if self.path == "/healthz": + body = b"ok" + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # Scaleway IMDS metadata endpoint + if self.path == "/conf?format=json" or self.path == "/conf": + body = json.dumps(SCALEWAY_METADATA).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # Not found + self.send_response(404) + self.end_headers() + + def log_message(self, fmt, *args): + return + + class DualStackHTTPServer(HTTPServer): + address_family = socket.AF_INET6 + + def server_bind(self): + # Disable IPV6_V6ONLY to accept both IPv4 and IPv6 connections + self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + super().server_bind() + + if __name__ == "__main__": + port = int(os.environ.get("PORT", "80")) + + # Start dual-stack server (IPv4 + IPv6) + server = DualStackHTTPServer(("::", port), Handler) + server.serve_forever() diff --git a/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/02-configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/02-configmap.yaml new file mode 100644 index 0000000000000..a35424a571b63 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/02-configmap.yaml @@ -0,0 +1,39 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-config + namespace: default +data: + relay: | + exporters: + otlp: + endpoint: {{ .HostEndpoint }}:4317 + tls: + insecure: true + extensions: + health_check: + endpoint: 0.0.0.0:13133 + processors: + resourcedetection: + detectors: [scaleway] + timeout: 2s + override: false + receivers: + hostmetrics: + collection_interval: 1s + scrapers: + cpu: + service: + telemetry: + logs: + level: "debug" + extensions: + - health_check + pipelines: + metrics: + receivers: + - hostmetrics + processors: + - resourcedetection + exporters: + - otlp diff --git a/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/03-serviceaccount.yaml b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/03-serviceaccount.yaml new file mode 100644 index 0000000000000..7a9803b445b6c --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/03-serviceaccount.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Name }}-sa + namespace: default diff --git a/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/04-service.yaml b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/04-service.yaml new file mode 100644 index 0000000000000..94587d231bbf0 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/04-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Name }} + namespace: default +spec: + selector: + app: {{ .Name }} + ports: + - name: health + port: 13133 + targetPort: 13133 diff --git a/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/05-deployment.yaml b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/05-deployment.yaml new file mode 100644 index 0000000000000..dfb616d5eebc7 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/collector/05-deployment.yaml @@ -0,0 +1,97 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Name }} + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: {{ .Name }} + template: + metadata: + labels: + app: {{ .Name }} + spec: + serviceAccountName: {{ .Name }}-sa + initContainers: + - name: metadata-server + image: python:3.13-alpine + imagePullPolicy: IfNotPresent + restartPolicy: Always + securityContext: + runAsUser: 0 + capabilities: + drop: + - "ALL" + add: + - "NET_BIND_SERVICE" + command: + - python3 + - /scripts/server.py + env: + - name: PORT + value: "80" + ports: + - containerPort: 80 + name: metadata + startupProbe: + httpGet: + path: /healthz + port: 80 + initialDelaySeconds: 1 + periodSeconds: 1 + failureThreshold: 10 + volumeMounts: + - name: metadata-script + mountPath: /scripts + - name: setup-network + image: alpine:3.21 + imagePullPolicy: IfNotPresent + securityContext: + capabilities: + add: + - NET_ADMIN + command: + - sh + - -c + - | + apk add --no-cache iptables ip6tables iproute2 curl + # IPv4 redirect for 169.254.42.42 + iptables -t nat -A OUTPUT -d 169.254.42.42 -p tcp --dport 80 -j DNAT --to-destination 127.0.0.1:80 + # IPv6: Add local route and redirect for fd00:42::42 + # The route makes the address routable via loopback, then DNAT redirects to ::1 + ip -6 addr add fd00:42::42/128 dev lo 2>/dev/null || true + ip6tables -t nat -A OUTPUT -d fd00:42::42 -p tcp --dport 80 -j DNAT --to-destination [::1]:80 + echo "iptables rules added to redirect 169.254.42.42:80 and fd00:42::42:80 -> localhost:80" + # Verify IPv4 redirect is working + until curl -sf http://169.254.42.42/conf?format=json > /dev/null; do + echo "Waiting for IPv4 metadata redirect to be ready..." + sleep 1 + done + echo "IPv4 metadata redirect verified working" + # Verify IPv6 redirect is working + until curl -sf -g "http://[fd00:42::42]/conf?format=json" > /dev/null; do + echo "Waiting for IPv6 metadata redirect to be ready..." + sleep 1 + done + echo "IPv6 metadata redirect verified working" + containers: + - name: otelcol + image: otelcontribcol:latest + imagePullPolicy: Never + args: + - "--config=/conf/relay" + volumeMounts: + - name: config + mountPath: /conf + volumes: + - name: config + configMap: + name: {{ .Name }}-config + items: + - key: relay + path: relay + - name: metadata-script + configMap: + name: {{ .Name }}-metadata-config diff --git a/processor/resourcedetectionprocessor/testdata/e2e/scaleway/expected.yaml b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/expected.yaml new file mode 100644 index 0000000000000..e93cacdb6d9bb --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/scaleway/expected.yaml @@ -0,0 +1,125 @@ +resourceMetrics: + - resource: + attributes: + - key: cloud.account.id + value: + stringValue: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa + - key: cloud.availability_zone + value: + stringValue: fr-par-1 + - key: cloud.platform + value: + stringValue: scaleway_cloud_compute + - key: cloud.provider + value: + stringValue: scaleway_cloud + - key: cloud.region + value: + stringValue: fr-par + - key: host.id + value: + stringValue: 11111111-1111-1111-1111-111111111111 + - key: host.image.id + value: + stringValue: cccccccc-cccc-cccc-cccc-cccccccccccc + - key: host.image.name + value: + stringValue: Ubuntu 22.04 Jammy Jellyfish + - key: host.name + value: + stringValue: test-scaleway-instance + - key: host.type + value: + stringValue: DEV1-S + schemaUrl: https://opentelemetry.io/schemas/1.9.0 + scopeMetrics: + - metrics: + - description: Total seconds each logical CPU spent on each mode. + name: system.cpu.time + sum: + aggregationTemporality: 2 + dataPoints: + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: idle + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: interrupt + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: nice + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: softirq + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: steal + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: system + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: user + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: wait + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + isMonotonic: true + unit: s + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper diff --git a/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/01-metadata-configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/01-metadata-configmap.yaml new file mode 100644 index 0000000000000..6e436d0154d83 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/01-metadata-configmap.yaml @@ -0,0 +1,53 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-metadata-config + namespace: default +data: + server.py: | + import json + import os + from http.server import BaseHTTPRequestHandler, HTTPServer + + # UpCloud IMDS metadata format + # See https://upcloud.com/docs/guides/upcloud-metadata-service/ + UPCLOUD_METADATA = { + "cloud_name": "upcloud", + "hostname": "test-upcloud-server", + "instance_id": "00aabbcc-1122-3344-5566-778899aabbcc", + "region": "fi-hel1" + } + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + # Health check endpoint + if self.path == "/healthz": + body = b"ok" + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # UpCloud IMDS metadata endpoint + if self.path == "/metadata/v1.json": + body = json.dumps(UPCLOUD_METADATA).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # Not found + self.send_response(404) + self.end_headers() + + def log_message(self, fmt, *args): + return + + if __name__ == "__main__": + port = int(os.environ.get("PORT", "80")) + server = HTTPServer(("", port), Handler) + server.serve_forever() diff --git a/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/02-configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/02-configmap.yaml new file mode 100644 index 0000000000000..687150f3ad0fb --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/02-configmap.yaml @@ -0,0 +1,39 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-config + namespace: default +data: + relay: | + exporters: + otlp: + endpoint: {{ .HostEndpoint }}:4317 + tls: + insecure: true + extensions: + health_check: + endpoint: 0.0.0.0:13133 + processors: + resourcedetection: + detectors: [upcloud] + timeout: 2s + override: false + receivers: + hostmetrics: + collection_interval: 1s + scrapers: + cpu: + service: + telemetry: + logs: + level: "debug" + extensions: + - health_check + pipelines: + metrics: + receivers: + - hostmetrics + processors: + - resourcedetection + exporters: + - otlp diff --git a/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/03-serviceaccount.yaml b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/03-serviceaccount.yaml new file mode 100644 index 0000000000000..7a9803b445b6c --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/03-serviceaccount.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Name }}-sa + namespace: default diff --git a/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/04-service.yaml b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/04-service.yaml new file mode 100644 index 0000000000000..94587d231bbf0 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/04-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Name }} + namespace: default +spec: + selector: + app: {{ .Name }} + ports: + - name: health + port: 13133 + targetPort: 13133 diff --git a/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/05-deployment.yaml b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/05-deployment.yaml new file mode 100644 index 0000000000000..7214d769be5a8 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/collector/05-deployment.yaml @@ -0,0 +1,80 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Name }} + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: {{ .Name }} + template: + metadata: + labels: + app: {{ .Name }} + spec: + serviceAccountName: {{ .Name }}-sa + initContainers: + - name: metadata-server + image: python:3.13-alpine + imagePullPolicy: IfNotPresent + restartPolicy: Always + securityContext: + runAsUser: 0 + capabilities: + drop: + - "ALL" + add: + - "NET_BIND_SERVICE" + command: + - python3 + - /scripts/server.py + env: + - name: PORT + value: "80" + ports: + - containerPort: 80 + name: metadata + startupProbe: + httpGet: + path: /healthz + port: 80 + initialDelaySeconds: 1 + periodSeconds: 1 + failureThreshold: 10 + volumeMounts: + - name: metadata-script + mountPath: /scripts + - name: setup-network + image: alpine:3.21 + imagePullPolicy: IfNotPresent + securityContext: + capabilities: + add: + - NET_ADMIN + command: + - sh + - -c + - | + apk add --no-cache iptables + iptables -t nat -A OUTPUT -d 169.254.169.254 -p tcp --dport 80 -j DNAT --to-destination 127.0.0.1:80 + echo "iptables rule added to redirect 169.254.169.254:80 -> 127.0.0.1:80" + containers: + - name: otelcol + image: otelcontribcol:latest + imagePullPolicy: Never + args: + - "--config=/conf/relay" + volumeMounts: + - name: config + mountPath: /conf + volumes: + - name: config + configMap: + name: {{ .Name }}-config + items: + - key: relay + path: relay + - name: metadata-script + configMap: + name: {{ .Name }}-metadata-config diff --git a/processor/resourcedetectionprocessor/testdata/e2e/upcloud/expected.yaml b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/expected.yaml new file mode 100644 index 0000000000000..9a4857752dfa8 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/upcloud/expected.yaml @@ -0,0 +1,107 @@ +resourceMetrics: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: upcloud + - key: cloud.region + value: + stringValue: fi-hel1 + - key: host.id + value: + stringValue: 00aabbcc-1122-3344-5566-778899aabbcc + - key: host.name + value: + stringValue: test-upcloud-server + schemaUrl: https://opentelemetry.io/schemas/1.9.0 + scopeMetrics: + - metrics: + - description: Total seconds each logical CPU spent on each mode. + name: system.cpu.time + sum: + aggregationTemporality: 2 + dataPoints: + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: idle + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: interrupt + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: nice + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: softirq + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: steal + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: system + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: user + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: wait + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + isMonotonic: true + unit: s + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper diff --git a/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/01-metadata-configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/01-metadata-configmap.yaml new file mode 100644 index 0000000000000..17f20e268e97d --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/01-metadata-configmap.yaml @@ -0,0 +1,58 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-metadata-config + namespace: default +data: + server.py: | + import json + import os + from http.server import BaseHTTPRequestHandler, HTTPServer + from urllib.parse import urlparse + + # Vultr IMDS metadata format (see https://www.vultr.com/metadata/) + VULTR_METADATA = { + "hostname": "test-vultr-instance", + "instanceid": "12345678", + "instance-v2-id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "region": { + "regioncode": "EWR" + } + } + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + parsed = urlparse(self.path) + path = parsed.path + + # Health check endpoint + if path == "/healthz": + body = b"ok" + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # Vultr IMDS endpoint (returns full metadata as JSON) + if path == "/v1.json": + body = json.dumps(VULTR_METADATA).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # Not found + self.send_response(404) + self.end_headers() + + def log_message(self, fmt, *args): + return + + if __name__ == "__main__": + port = int(os.environ.get("PORT", "80")) + server = HTTPServer(("", port), Handler) + server.serve_forever() diff --git a/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/02-configmap.yaml b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/02-configmap.yaml new file mode 100644 index 0000000000000..13caebe5818a4 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/02-configmap.yaml @@ -0,0 +1,39 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Name }}-config + namespace: default +data: + relay: | + exporters: + otlp: + endpoint: {{ .HostEndpoint }}:4317 + tls: + insecure: true + extensions: + health_check: + endpoint: 0.0.0.0:13133 + processors: + resourcedetection: + detectors: [vultr] + timeout: 2s + override: false + receivers: + hostmetrics: + collection_interval: 1s + scrapers: + cpu: + service: + telemetry: + logs: + level: "debug" + extensions: + - health_check + pipelines: + metrics: + receivers: + - hostmetrics + processors: + - resourcedetection + exporters: + - otlp diff --git a/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/03-serviceaccount.yaml b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/03-serviceaccount.yaml new file mode 100644 index 0000000000000..7a9803b445b6c --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/03-serviceaccount.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ .Name }}-sa + namespace: default diff --git a/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/04-service.yaml b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/04-service.yaml new file mode 100644 index 0000000000000..94587d231bbf0 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/04-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Name }} + namespace: default +spec: + selector: + app: {{ .Name }} + ports: + - name: health + port: 13133 + targetPort: 13133 diff --git a/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/05-deployment.yaml b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/05-deployment.yaml new file mode 100644 index 0000000000000..7214d769be5a8 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/vultr/collector/05-deployment.yaml @@ -0,0 +1,80 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Name }} + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: {{ .Name }} + template: + metadata: + labels: + app: {{ .Name }} + spec: + serviceAccountName: {{ .Name }}-sa + initContainers: + - name: metadata-server + image: python:3.13-alpine + imagePullPolicy: IfNotPresent + restartPolicy: Always + securityContext: + runAsUser: 0 + capabilities: + drop: + - "ALL" + add: + - "NET_BIND_SERVICE" + command: + - python3 + - /scripts/server.py + env: + - name: PORT + value: "80" + ports: + - containerPort: 80 + name: metadata + startupProbe: + httpGet: + path: /healthz + port: 80 + initialDelaySeconds: 1 + periodSeconds: 1 + failureThreshold: 10 + volumeMounts: + - name: metadata-script + mountPath: /scripts + - name: setup-network + image: alpine:3.21 + imagePullPolicy: IfNotPresent + securityContext: + capabilities: + add: + - NET_ADMIN + command: + - sh + - -c + - | + apk add --no-cache iptables + iptables -t nat -A OUTPUT -d 169.254.169.254 -p tcp --dport 80 -j DNAT --to-destination 127.0.0.1:80 + echo "iptables rule added to redirect 169.254.169.254:80 -> 127.0.0.1:80" + containers: + - name: otelcol + image: otelcontribcol:latest + imagePullPolicy: Never + args: + - "--config=/conf/relay" + volumeMounts: + - name: config + mountPath: /conf + volumes: + - name: config + configMap: + name: {{ .Name }}-config + items: + - key: relay + path: relay + - name: metadata-script + configMap: + name: {{ .Name }}-metadata-config diff --git a/processor/resourcedetectionprocessor/testdata/e2e/vultr/expected.yaml b/processor/resourcedetectionprocessor/testdata/e2e/vultr/expected.yaml new file mode 100644 index 0000000000000..ed2f969413dc0 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/e2e/vultr/expected.yaml @@ -0,0 +1,107 @@ +resourceMetrics: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: vultr + - key: cloud.region + value: + stringValue: ewr + - key: host.id + value: + stringValue: a1b2c3d4-e5f6-7890-abcd-ef1234567890 + - key: host.name + value: + stringValue: test-vultr-instance + schemaUrl: https://opentelemetry.io/schemas/1.9.0 + scopeMetrics: + - metrics: + - description: Total seconds each logical CPU spent on each mode. + name: system.cpu.time + sum: + aggregationTemporality: 2 + dataPoints: + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: idle + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: interrupt + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: nice + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: softirq + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: steal + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: system + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: user + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + - asDouble: 1.0 + attributes: + - key: cpu + value: + stringValue: cpu0 + - key: state + value: + stringValue: wait + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + isMonotonic: true + unit: s + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper diff --git a/receiver/sqlserverreceiver/README.md b/receiver/sqlserverreceiver/README.md index 791308fc5d1cf..e97c46d4896d2 100644 --- a/receiver/sqlserverreceiver/README.md +++ b/receiver/sqlserverreceiver/README.md @@ -61,7 +61,7 @@ sqlserver: top_query_collection: # this collection exports the most expensive queries as logs lookback_time: 60s # which time window should we look for the top queries max_query_sample_count: 1000 # maximum number query we store in cache for top queries. - top_query_count: 200 # The maximum number of active queries to report in a single run. + top_query_count: 250 # The maximum number of active queries to report in a single run. collection_interval: 60s # collection interval for top query collection specifically query_sample_collection: # this collection exports the currently (relate to the query time) executing queries as logs max_rows_per_query: 100 # the maximum number of samples to return for one single query.