diff --git a/.chloggen/k8sattributesprocessor-stable-observability.yaml b/.chloggen/k8sattributesprocessor-stable-observability.yaml new file mode 100644 index 0000000000000..6e20d9c085300 --- /dev/null +++ b/.chloggen/k8sattributesprocessor-stable-observability.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/k8sattributes + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Added processor-specific observability metrics: `otelcol.k8s.pod.association` with `status`, `pod_identifier`, and `otelcol.signal` attributes" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [44587] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go index 351f429d1c8c4..d6a9107035bd4 100644 --- a/processor/k8sattributesprocessor/factory.go +++ b/processor/k8sattributesprocessor/factory.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/processor/processorhelper" "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper" "go.opentelemetry.io/collector/processor/xprocessor" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" @@ -171,11 +172,17 @@ func createKubernetesProcessor( cfg component.Config, options ...option, ) *kubernetesprocessor { + telemetry, err := metadata.NewTelemetryBuilder(params.TelemetrySettings) + if err != nil { + params.Logger.Error("failed to create telemetry builder", zap.Error(err)) + } + kp := &kubernetesprocessor{ logger: params.Logger, cfg: cfg, options: options, telemetrySettings: params.TelemetrySettings, + telemetry: telemetry, } return kp diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go b/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go index 398c9c2a1c0d7..1d28d4a5699ef 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go @@ -25,6 +25,7 @@ type TelemetryBuilder struct { meter metric.Meter mu sync.Mutex registrations []metric.Registration + OtelcolK8sPodAssociation metric.Int64Counter OtelsvcK8sDaemonsetAdded metric.Int64Counter OtelsvcK8sDaemonsetDeleted metric.Int64Counter OtelsvcK8sDaemonsetUpdated metric.Int64Counter @@ -82,6 +83,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meter = Meter(settings) var err, errs error + 
builder.OtelcolK8sPodAssociation, err = builder.meter.Int64Counter( + "otelcol_otelcol.k8s.pod.association", + metric.WithDescription("Number of pod associations' evaluations [Development]"), + metric.WithUnit("{resources}"), + ) + errs = errors.Join(errs, err) builder.OtelsvcK8sDaemonsetAdded, err = builder.meter.Int64Counter( "otelcol_otelsvc_k8s_daemonset_added", metric.WithDescription("Number of daemonset add events received [Development]"), diff --git a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go index d24438b42d02f..0a1a0c056c00d 100644 --- a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go +++ b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go @@ -21,6 +21,22 @@ func NewSettings(tt *componenttest.Telemetry) processor.Settings { return set } +func AssertEqualOtelcolK8sPodAssociation(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_otelcol.k8s.pod.association", + Description: "Number of pod associations' evaluations [Development]", + Unit: "{resources}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_otelcol.k8s.pod.association") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) 
+} + func AssertEqualOtelsvcK8sDaemonsetAdded(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_otelsvc_k8s_daemonset_added", diff --git a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go index 76f1d8f01d40c..42a5b4e10b0b0 100644 --- a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go +++ b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go @@ -19,6 +19,7 @@ func TestSetupTelemetry(t *testing.T) { tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) require.NoError(t, err) defer tb.Shutdown() + tb.OtelcolK8sPodAssociation.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetAdded.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetDeleted.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetUpdated.Add(context.Background(), 1) @@ -45,6 +46,9 @@ func TestSetupTelemetry(t *testing.T) { tb.OtelsvcK8sStatefulsetAdded.Add(context.Background(), 1) tb.OtelsvcK8sStatefulsetDeleted.Add(context.Background(), 1) tb.OtelsvcK8sStatefulsetUpdated.Add(context.Background(), 1) + AssertEqualOtelcolK8sPodAssociation(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualOtelsvcK8sDaemonsetAdded(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/processor/k8sattributesprocessor/metadata.yaml b/processor/k8sattributesprocessor/metadata.yaml index 9be811d8d60b2..a6e016e773515 100644 --- a/processor/k8sattributesprocessor/metadata.yaml +++ b/processor/k8sattributesprocessor/metadata.yaml @@ -148,8 +148,37 @@ tests: goleak: skip: true +attributes: + otelcol.signal: + description: The signal type the telemetry metric is associated with + type: string + enum: + - metrics + - 
traces + - logs + - profiles + pod_identifier: + description: The pod identifier value(s) used for the association attempt + type: string + status: + description: The status of the pod association operation + type: string + enum: + - success + - error + telemetry: metrics: + otelcol.k8s.pod.association: + enabled: false + description: Number of pod associations' evaluations + stability: + level: development + unit: "{resources}" + attributes: [status, pod_identifier, otelcol.signal] + sum: + value_type: int + monotonic: true otelsvc_k8s_daemonset_added: enabled: false description: Number of daemonset add events received diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index d505749b01092..b037f15280a68 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strconv" + "strings" "time" "go.opentelemetry.io/collector/component" @@ -16,11 +17,14 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" conventions "go.opentelemetry.io/otel/semconv/v1.39.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata" ) const ( @@ -31,6 +35,7 @@ type kubernetesprocessor struct { cfg component.Config options []option telemetrySettings component.TelemetrySettings + telemetry *metadata.TelemetryBuilder logger *zap.Logger apiConfig k8sconfig.APIConfig kc kube.Client @@ -88,6 +93,9 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err } func (kp *kubernetesprocessor) 
Shutdown(context.Context) error { + if kp.telemetry != nil { + kp.telemetry.Shutdown() + } if kp.kc == nil { return nil } @@ -101,7 +109,7 @@ func (kp *kubernetesprocessor) Shutdown(context.Context) error { func (kp *kubernetesprocessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { rss := td.ResourceSpans() for i := 0; i < rss.Len(); i++ { - kp.processResource(ctx, rss.At(i).Resource()) + kp.processResource(ctx, rss.At(i).Resource(), "traces") } return td, nil @@ -111,7 +119,7 @@ func (kp *kubernetesprocessor) processTraces(ctx context.Context, td ptrace.Trac func (kp *kubernetesprocessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { rm := md.ResourceMetrics() for i := 0; i < rm.Len(); i++ { - kp.processResource(ctx, rm.At(i).Resource()) + kp.processResource(ctx, rm.At(i).Resource(), "metrics") } return md, nil @@ -121,7 +129,7 @@ func (kp *kubernetesprocessor) processMetrics(ctx context.Context, md pmetric.Me func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { rl := ld.ResourceLogs() for i := 0; i < rl.Len(); i++ { - kp.processResource(ctx, rl.At(i).Resource()) + kp.processResource(ctx, rl.At(i).Resource(), "logs") } return ld, nil @@ -131,14 +139,14 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p func (kp *kubernetesprocessor) processProfiles(ctx context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) { rp := pd.ResourceProfiles() for i := 0; i < rp.Len(); i++ { - kp.processResource(ctx, rp.At(i).Resource()) + kp.processResource(ctx, rp.At(i).Resource(), "profiles") } return pd, nil } // processResource adds Pod metadata tags to resource based on pod association configuration -func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pcommon.Resource) { +func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pcommon.Resource, signalType string) { 
podIdentifierValue := extractPodID(ctx, resource.Attributes(), kp.podAssociations) kp.logger.Debug("evaluating pod identifier", zap.Any("value", podIdentifierValue)) @@ -155,17 +163,48 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } var pod *kube.Pod + var podFound bool + podIdentifierStr := buildPodIdentifierString(podIdentifierValue) if podIdentifierValue.IsNotEmpty() { - var podFound bool if pod, podFound = kp.kc.GetPod(podIdentifierValue); podFound { kp.logger.Debug("getting the pod", zap.Any("pod", pod)) + // Record successful pod association + if kp.telemetry != nil { + successAttr := metric.WithAttributes( + attribute.String("status", "success"), + attribute.String("pod_identifier", podIdentifierStr), + attribute.String("otelcol.signal", signalType), + ) + kp.telemetry.OtelcolK8sPodAssociation.Add(ctx, 1, successAttr) + } + for key, val := range pod.Attributes { setResourceAttribute(resource.Attributes(), key, val) } kp.addContainerAttributes(resource.Attributes(), pod) } else { - kp.logger.Debug("unable to find pod based on identifier", zap.Any("value", podIdentifierValue)) + // Record failed pod association + kp.logger.Debug("pod not found", zap.Any("podIdentifier", podIdentifierValue)) + if kp.telemetry != nil { + errorAttr := metric.WithAttributes( + attribute.String("status", "error"), + attribute.String("pod_identifier", podIdentifierStr), + attribute.String("otelcol.signal", signalType), + ) + kp.telemetry.OtelcolK8sPodAssociation.Add(ctx, 1, errorAttr) + } + } + } else { + // Record failed pod association when no identifier found + kp.logger.Debug("no pod identifier found") + if kp.telemetry != nil { + errorAttr := metric.WithAttributes( + attribute.String("status", "error"), + attribute.String("pod_identifier", podIdentifierStr), + attribute.String("otelcol.signal", signalType), + ) + kp.telemetry.OtelcolK8sPodAssociation.Add(ctx, 1, errorAttr) } } @@ -407,6 +446,20 @@ func (kp *kubernetesprocessor) 
getUIDForPodsNode(nodeName string) string {
 	return node.NodeUID
 }
 
+// buildPodIdentifierString returns the non-empty identifier values joined by commas, or "unknown" when there are none.
+func buildPodIdentifierString(podIdentifierValue kube.PodIdentifier) string {
+	var parts []string
+	for i := range podIdentifierValue {
+		if v := podIdentifierValue[i].Value; v != "" {
+			parts = append(parts, v)
+		}
+	}
+	if len(parts) == 0 {
+		return "unknown" // fallback so callers never receive an empty string
+	}
+	return strings.Join(parts, ",")
+}
+
 // intFromAttribute extracts int value from an attribute stored as string or int
 func intFromAttribute(val pcommon.Value) (int, error) {
 	switch val.Type() {