From 28f81ca7505c6a755115dd3f44bbe41250f830ac Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 11 Sep 2025 10:32:22 +0100 Subject: [PATCH 01/16] [processor/k8sattributesprocessor] add support for k8s.cronjob.uid Signed-off-by: Paulo Dias --- processor/k8sattributesprocessor/config.go | 7 +- .../k8sattributesprocessor/documentation.md | 1 + .../internal/kube/client.go | 206 +++++++++++++++--- .../internal/kube/informer.go | 27 +++ .../internal/kube/kube.go | 19 +- .../internal/metadata/generated_config.go | 4 + .../metadata/generated_config_test.go | 2 + .../internal/metadata/generated_resource.go | 7 + .../metadata/generated_resource_test.go | 8 +- .../internal/metadata/generated_telemetry.go | 21 ++ .../internal/metadata/testdata/config.yaml | 4 + .../metadatatest/generated_telemetrytest.go | 48 ++++ .../generated_telemetrytest_test.go | 12 + .../k8sattributesprocessor/metadata.yaml | 25 +++ processor/k8sattributesprocessor/options.go | 5 + 15 files changed, 356 insertions(+), 40 deletions(-) diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index 12f4340f7b719..6db8467cd7ca4 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -75,7 +75,7 @@ func (cfg *Config) Validate() error { } switch f.From { - case "", kube.MetadataFromPod, kube.MetadataFromNamespace, kube.MetadataFromNode, kube.MetadataFromDeployment, kube.MetadataFromStatefulSet, kube.MetadataFromDaemonSet, kube.MetadataFromJob: + case "", kube.MetadataFromPod, kube.MetadataFromNamespace, kube.MetadataFromNode, kube.MetadataFromDeployment, kube.MetadataFromStatefulSet, kube.MetadataFromDaemonSet, kube.MetadataFromJob, kube.MetadataFromCronJob: default: return fmt.Errorf("%s is not a valid choice for From. 
Must be one of: pod, namespace, deployment, statefulset, daemonset, job, node", f.From) } @@ -97,7 +97,7 @@ func (cfg *Config) Validate() error { string(conventions.K8SDaemonSetNameKey), string(conventions.K8SDaemonSetUIDKey), string(conventions.K8SStatefulSetNameKey), string(conventions.K8SStatefulSetUIDKey), string(conventions.K8SJobNameKey), string(conventions.K8SJobUIDKey), - string(conventions.K8SCronJobNameKey), + string(conventions.K8SCronJobNameKey), string(conventions.K8SCronJobUIDKey), string(conventions.K8SNodeNameKey), string(conventions.K8SNodeUIDKey), string(conventions.K8SContainerNameKey), string(conventions.ContainerIDKey), string(conventions.ContainerImageNameKey), string(conventions.ContainerImageTagKey), @@ -139,7 +139,8 @@ type ExtractConfig struct { // k8s.node.name, k8s.namespace.name, k8s.pod.start_time, // k8s.replicaset.name, k8s.replicaset.uid, // k8s.daemonset.name, k8s.daemonset.uid, - // k8s.job.name, k8s.job.uid, k8s.cronjob.name, + // k8s.job.name, k8s.job.uid, + // k8s.cronjob.name, k8s.cronjob.uid, // k8s.statefulset.name, k8s.statefulset.uid, // k8s.container.name, container.id, container.image.name, // container.image.tag, container.image.repo_digests diff --git a/processor/k8sattributesprocessor/documentation.md b/processor/k8sattributesprocessor/documentation.md index 56c2d2ff22ff4..5c775137845d0 100644 --- a/processor/k8sattributesprocessor/documentation.md +++ b/processor/k8sattributesprocessor/documentation.md @@ -13,6 +13,7 @@ | k8s.cluster.uid | Gives cluster uid identified with kube-system namespace | Any Str | false | | k8s.container.name | The name of the Container in a Pod template. Requires container.id. | Any Str | false | | k8s.cronjob.name | The name of the CronJob. | Any Str | false | +| k8s.cronjob.uid | The uid of the CronJob. | Any Str | false | | k8s.daemonset.name | The name of the DaemonSet. | Any Str | false | | k8s.daemonset.uid | The UID of the DaemonSet. 
| Any Str | false | | k8s.deployment.name | The name of the Deployment. | Any Str | true | diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 33c26e1a62c85..791bb4bf0259c 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -63,6 +63,9 @@ const ( // Semconv attributes https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/k8s.md#job K8sJobLabel = "k8s.job.label.%s" K8sJobAnnotation = "k8s.job.annotation.%s" + // Semconv attributes https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/k8s.md#cronjob + K8sCronJobLabel = "k8s.cronjob.label.%s" + K8sCronJobAnnotation = "k8s.cronjob.annotation.%s" ) var allowLabelsAnnotationsSingular = featuregate.GlobalRegistry().MustRegister( @@ -86,6 +89,7 @@ type WatchClient struct { daemonsetInformer cache.SharedInformer jobInformer cache.SharedInformer replicasetInformer cache.SharedInformer + cronJobInformer cache.SharedInformer replicasetRegex *regexp.Regexp cronJobRegex *regexp.Regexp deleteQueue []deleteRequest @@ -125,6 +129,13 @@ type WatchClient struct { // Key is job uid Jobs map[string]*Job + // JobUID -> CronJobUID relation (computed from Job ownerReferences) + jobToCronJobUID map[string]string + + // A map containing cron job related data, used to associate them with resources. + // Key is cron job uid + CronJobs map[string]*CronJob + // A map containing ReplicaSets related data, used to associate them with resources. 
// Key is replicaset uid ReplicaSets map[string]*ReplicaSet @@ -188,6 +199,8 @@ func New( c.StatefulSets = map[string]*StatefulSet{} c.DaemonSets = map[string]*DaemonSet{} c.Jobs = map[string]*Job{} + c.jobToCronJobUID = map[string]string{} + c.CronJobs = map[string]*CronJob{} if newClientSet == nil { newClientSet = k8sconfig.MakeClient } @@ -279,10 +292,14 @@ func New( c.daemonsetInformer = newDaemonSetSharedInformer(c.kc, c.Filters.Namespace) } - if c.extractJobLabelsAnnotations() { + if c.extractJobLabelsAnnotations() || rules.CronJobUID { c.jobInformer = newJobSharedInformer(c.kc, c.Filters.Namespace) } + if c.extractCronJobLabelsAnnotations() { + c.cronJobInformer = newCronJobSharedInformer(c.kc, c.Filters.Namespace) + } + return c, err } @@ -380,6 +397,19 @@ func (c *WatchClient) Start() error { go c.jobInformer.Run(c.stopCh) } + if c.cronJobInformer != nil { + reg, err = c.cronJobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.handleCronJobAdd, + UpdateFunc: c.handleCronJobUpdate, + DeleteFunc: c.handleCronJobDelete, + }) + if err != nil { + return err + } + synced = append(synced, reg.HasSynced) + go c.cronJobInformer.Run(c.stopCh) + } + reg, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handlePodAdd, UpdateFunc: c.handlePodUpdate, @@ -630,12 +660,44 @@ func (c *WatchClient) handleJobDelete(obj any) { if n, ok := c.Jobs[string(job.UID)]; ok { delete(c.Jobs, n.UID) } + delete(c.jobToCronJobUID, string(job.UID)) c.m.Unlock() } else { c.logger.Error("object received was not of type api_v1.Job", zap.Any("received", obj)) } } +func (c *WatchClient) handleCronJobAdd(obj any) { + c.telemetryBuilder.OtelsvcK8sCronjobAdded.Add(context.Background(), 1) + if cronJob, ok := obj.(*batch_v1.CronJob); ok { + c.addOrUpdateCronJob(cronJob) + } else { + c.logger.Error("object received was not of type batch_v1.CronJob", zap.Any("received", obj)) + } +} + +func (c *WatchClient) handleCronJobUpdate(_, newCJ any) { + 
c.telemetryBuilder.OtelsvcK8sCronjobUpdated.Add(context.Background(), 1) + if cronJob, ok := newCJ.(*batch_v1.CronJob); ok { + c.addOrUpdateCronJob(cronJob) + } else { + c.logger.Error("object received was not of type batch_v1.CronJob", zap.Any("received", newCJ)) + } +} + +func (c *WatchClient) handleCronJobDelete(obj any) { + c.telemetryBuilder.OtelsvcK8sCronjobDeleted.Add(context.Background(), 1) + if cronJob, ok := ignoreDeletedFinalStateUnknown(obj).(*batch_v1.CronJob); ok { + c.m.Lock() + if n, ok := c.CronJobs[string(cronJob.UID)]; ok { + delete(c.CronJobs, n.UID) + } + c.m.Unlock() + } else { + c.logger.Error("object received was not of type batch_v1.CronJob", zap.Any("received", obj)) + } +} + func (c *WatchClient) deleteLoop(interval, gracePeriod time.Duration) { // This loop runs after N seconds and deletes pods from cache. // It iterates over the delete queue and deletes all that aren't @@ -753,6 +815,16 @@ func (c *WatchClient) GetJob(jobUID string) (*Job, bool) { return nil, false } +func (c *WatchClient) GetCronJob(cronJobUID string) (*CronJob, bool) { + c.m.RLock() + cronJob, ok := c.CronJobs[cronJobUID] + c.m.RUnlock() + if ok { + return cronJob, ok + } + return nil, false +} + func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { tags := map[string]string{} if c.Rules.PodName { @@ -799,7 +871,8 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { c.Rules.JobUID || c.Rules.JobName || c.Rules.StatefulSetUID || c.Rules.StatefulSetName || c.Rules.DeploymentName || c.Rules.DeploymentUID || - c.Rules.CronJobName || c.Rules.ServiceName { + c.Rules.CronJobUID || c.Rules.CronJobName || + c.Rules.ServiceName { for _, ref := range pod.OwnerReferences { switch ref.Kind { case "ReplicaSet": @@ -876,6 +949,10 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { } } } + if c.Rules.CronJobUID { + // tags[string(conventions.K8SCronJobUIDKey)] = c.jobToCronJobUID[string(ref.UID)] 
+ tags[string(conventions.K8SCronJobUIDKey)] = string(ref.UID) + } } } } @@ -1229,6 +1306,20 @@ func (c *WatchClient) extractJobAttributes(d *batch_v1.Job) map[string]string { return tags } +func (c *WatchClient) extractCronJobAttributes(d *batch_v1.CronJob) map[string]string { + tags := map[string]string{} + + for _, r := range c.Rules.Labels { + r.extractFromCronJobMetadata(d.Labels, tags, K8sCronJobLabel) + } + + for _, r := range c.Rules.Annotations { + r.extractFromCronJobMetadata(d.Annotations, tags, K8sCronJobAnnotation) + } + + return tags +} + func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod { newPod := &Pod{ Name: pod.Name, @@ -1250,15 +1341,15 @@ func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod { } } - if statefulset, ok := c.getStatefulSet(getPodStatefulSetUID(pod)); ok { + if statefulset, ok := c.GetStatefulSet(getPodStatefulSetUID(pod)); ok { newPod.StatefulSetUID = statefulset.UID } - if daemonset, ok := c.getDaemonSet(getPodDaemonSetUID(pod)); ok { + if daemonset, ok := c.GetDaemonSet(getPodDaemonSetUID(pod)); ok { newPod.DaemonSetUID = daemonset.UID } - if job, ok := c.getJob(getPodJobUID(pod)); ok { + if job, ok := c.GetJob(getPodJobUID(pod)); ok { newPod.JobUID = job.UID } @@ -1605,6 +1696,22 @@ func (c *WatchClient) extractJobLabelsAnnotations() bool { return false } +func (c *WatchClient) extractCronJobLabelsAnnotations() bool { + for _, r := range c.Rules.Labels { + if r.From == MetadataFromCronJob { + return true + } + } + + for _, r := range c.Rules.Annotations { + if r.From == MetadataFromCronJob { + return true + } + } + + return false +} + func (c *WatchClient) extractNodeLabelsAnnotations() bool { for _, r := range c.Rules.Labels { if r.From == MetadataFromNode { @@ -1688,9 +1795,38 @@ func (c *WatchClient) addOrUpdateJob(job *batch_v1.Job) { } newJob.Attributes = c.extractJobAttributes(job) + // Find CronJob controller owner (if any) and cache its UID + var cronJobUID string + for _, owner := range job.OwnerReferences { + if 
owner.Kind == "CronJob" { + cronJobUID = string(owner.UID) + break + } + } + c.m.Lock() if job.UID != "" { - c.Jobs[string(job.UID)] = newJob + jobUID := string(job.UID) + c.Jobs[jobUID] = newJob + if cronJobUID != "" { + c.jobToCronJobUID[jobUID] = cronJobUID + } else { + delete(c.jobToCronJobUID, jobUID) + } + } + c.m.Unlock() +} + +func (c *WatchClient) addOrUpdateCronJob(cronJob *batch_v1.CronJob) { + newCronJob := &CronJob{ + Name: cronJob.Name, + UID: string(cronJob.UID), + } + newCronJob.Attributes = c.extractCronJobAttributes(cronJob) + + c.m.Lock() + if cronJob.UID != "" { + c.CronJobs[string(cronJob.UID)] = newCronJob } c.m.Unlock() } @@ -1782,35 +1918,35 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) { return nil, false } -func (c *WatchClient) getStatefulSet(uid string) (*StatefulSet, bool) { - c.m.RLock() - statefulset, ok := c.StatefulSets[uid] - c.m.RUnlock() - if ok { - return statefulset, ok - } - return nil, false -} - -func (c *WatchClient) getDaemonSet(uid string) (*DaemonSet, bool) { - c.m.RLock() - daemonset, ok := c.DaemonSets[uid] - c.m.RUnlock() - if ok { - return daemonset, ok - } - return nil, false -} - -func (c *WatchClient) getJob(uid string) (*Job, bool) { - c.m.RLock() - job, ok := c.Jobs[uid] - c.m.RUnlock() - if ok { - return job, ok - } - return nil, false -} +// func (c *WatchClient) getStatefulSet(uid string) (*StatefulSet, bool) { +// c.m.RLock() +// statefulset, ok := c.StatefulSets[uid] +// c.m.RUnlock() +// if ok { +// return statefulset, ok +// } +// return nil, false +// } + +// func (c *WatchClient) getDaemonSet(uid string) (*DaemonSet, bool) { +// c.m.RLock() +// daemonset, ok := c.DaemonSets[uid] +// c.m.RUnlock() +// if ok { +// return daemonset, ok +// } +// return nil, false +// } + +// func (c *WatchClient) getJob(uid string) (*Job, bool) { +// c.m.RLock() +// job, ok := c.Jobs[uid] +// c.m.RUnlock() +// if ok { +// return job, ok +// } +// return nil, false +// } // 
runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete // before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer diff --git a/processor/k8sattributesprocessor/internal/kube/informer.go b/processor/k8sattributesprocessor/internal/kube/informer.go index 5740f6128b130..fde815c4d6148 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer.go +++ b/processor/k8sattributesprocessor/internal/kube/informer.go @@ -252,6 +252,33 @@ func jobWatchFuncWithSelectors(client kubernetes.Interface, namespace string) ca } } +func newCronJobSharedInformer( + client kubernetes.Interface, + namespace string, +) cache.SharedInformer { + informer := cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: cronJobListFuncWithSelectors(client, namespace), + WatchFunc: cronJobWatchFuncWithSelectors(client, namespace), + }, + &batch_v1.CronJob{}, + watchSyncPeriod, + ) + return informer +} + +func cronJobListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListFunc { + return func(opts metav1.ListOptions) (runtime.Object, error) { + return client.BatchV1().CronJobs(namespace).List(context.Background(), opts) + } +} + +func cronJobWatchFuncWithSelectors(client kubernetes.Interface, namespace string) cache.WatchFunc { + return func(opts metav1.ListOptions) (watch.Interface, error) { + return client.BatchV1().CronJobs(namespace).Watch(context.Background(), opts) + } +} + func daemonsetListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListFunc { return func(opts metav1.ListOptions) (runtime.Object, error) { return client.AppsV1().DaemonSets(namespace).List(context.Background(), opts) diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 392ce5fff2299..2d3951dfccd7f 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go 
+++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -36,7 +36,9 @@ const ( // MetadataFromDaemonSet is used to specify to extract metadata/labels/annotations from daemonset MetadataFromDaemonSet = "daemonset" // MetadataFromJob is used to specify to extract metadata/labels/annotations from job - MetadataFromJob = "job" + MetadataFromJob = "job" + // MetadataFromCronJob is used to specify to extract metadata/labels/annotations from cronjob + MetadataFromCronJob = "cronjob" PodIdentifierMaxLength = 4 ResourceSource = "resource_attribute" @@ -232,6 +234,7 @@ type LabelFilter struct { // from pods and added to the spans as tags. type ExtractionRules struct { CronJobName bool + CronJobUID bool DeploymentName bool DeploymentUID bool DaemonSetUID bool @@ -269,6 +272,7 @@ type ExtractionRules struct { func (rules *ExtractionRules) IncludesOwnerMetadata() bool { rulesNeedingOwnerMetadata := []bool{ rules.CronJobName, + rules.CronJobUID, rules.DeploymentName, rules.DeploymentUID, rules.DaemonSetUID, @@ -356,6 +360,12 @@ func (r *FieldExtractionRule) extractFromJobMetadata(metadata, tags map[string]s } } +func (r *FieldExtractionRule) extractFromCronJobMetadata(metadata, tags map[string]string, formatter string) { + if r.From == MetadataFromCronJob { + r.extractFromMetadata(metadata, tags, formatter) + } +} + func (r *FieldExtractionRule) extractFromMetadata(metadata, tags map[string]string, formatter string) { if r.KeyRegex != nil { for k, v := range metadata { @@ -451,6 +461,13 @@ type Job struct { Attributes map[string]string } +// CronJob represents a kubernetes cronjob. 
+type CronJob struct { + Name string + UID string + Attributes map[string]string +} + func OtelAnnotations() FieldExtractionRule { return FieldExtractionRule{ Name: "$1", diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_config.go b/processor/k8sattributesprocessor/internal/metadata/generated_config.go index 84be118c837dd..09878e6ccd77f 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_config.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_config.go @@ -34,6 +34,7 @@ type ResourceAttributesConfig struct { K8sClusterUID ResourceAttributeConfig `mapstructure:"k8s.cluster.uid"` K8sContainerName ResourceAttributeConfig `mapstructure:"k8s.container.name"` K8sCronjobName ResourceAttributeConfig `mapstructure:"k8s.cronjob.name"` + K8sCronjobUID ResourceAttributeConfig `mapstructure:"k8s.cronjob.uid"` K8sDaemonsetName ResourceAttributeConfig `mapstructure:"k8s.daemonset.name"` K8sDaemonsetUID ResourceAttributeConfig `mapstructure:"k8s.daemonset.uid"` K8sDeploymentName ResourceAttributeConfig `mapstructure:"k8s.deployment.name"` @@ -81,6 +82,9 @@ func DefaultResourceAttributesConfig() ResourceAttributesConfig { K8sCronjobName: ResourceAttributeConfig{ Enabled: false, }, + K8sCronjobUID: ResourceAttributeConfig{ + Enabled: false, + }, K8sDaemonsetName: ResourceAttributeConfig{ Enabled: false, }, diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_config_test.go b/processor/k8sattributesprocessor/internal/metadata/generated_config_test.go index f755a22fc1b14..c18d153a8228c 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_config_test.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_config_test.go @@ -32,6 +32,7 @@ func TestResourceAttributesConfig(t *testing.T) { K8sClusterUID: ResourceAttributeConfig{Enabled: true}, K8sContainerName: ResourceAttributeConfig{Enabled: true}, K8sCronjobName: ResourceAttributeConfig{Enabled: true}, + 
K8sCronjobUID: ResourceAttributeConfig{Enabled: true}, K8sDaemonsetName: ResourceAttributeConfig{Enabled: true}, K8sDaemonsetUID: ResourceAttributeConfig{Enabled: true}, K8sDeploymentName: ResourceAttributeConfig{Enabled: true}, @@ -66,6 +67,7 @@ func TestResourceAttributesConfig(t *testing.T) { K8sClusterUID: ResourceAttributeConfig{Enabled: false}, K8sContainerName: ResourceAttributeConfig{Enabled: false}, K8sCronjobName: ResourceAttributeConfig{Enabled: false}, + K8sCronjobUID: ResourceAttributeConfig{Enabled: false}, K8sDaemonsetName: ResourceAttributeConfig{Enabled: false}, K8sDaemonsetUID: ResourceAttributeConfig{Enabled: false}, K8sDeploymentName: ResourceAttributeConfig{Enabled: false}, diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_resource.go b/processor/k8sattributesprocessor/internal/metadata/generated_resource.go index e0932b6eaea00..afc7bccd3ad06 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_resource.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_resource.go @@ -70,6 +70,13 @@ func (rb *ResourceBuilder) SetK8sCronjobName(val string) { } } +// SetK8sCronjobUID sets provided value as "k8s.cronjob.uid" attribute. +func (rb *ResourceBuilder) SetK8sCronjobUID(val string) { + if rb.config.K8sCronjobUID.Enabled { + rb.res.Attributes().PutStr("k8s.cronjob.uid", val) + } +} + // SetK8sDaemonsetName sets provided value as "k8s.daemonset.name" attribute. 
func (rb *ResourceBuilder) SetK8sDaemonsetName(val string) { if rb.config.K8sDaemonsetName.Enabled { diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go b/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go index 898a7f94debdf..24aaa49ae0ca3 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_resource_test.go @@ -20,6 +20,7 @@ func TestResourceBuilder(t *testing.T) { rb.SetK8sClusterUID("k8s.cluster.uid-val") rb.SetK8sContainerName("k8s.container.name-val") rb.SetK8sCronjobName("k8s.cronjob.name-val") + rb.SetK8sCronjobUID("k8s.cronjob.uid-val") rb.SetK8sDaemonsetName("k8s.daemonset.name-val") rb.SetK8sDaemonsetUID("k8s.daemonset.uid-val") rb.SetK8sDeploymentName("k8s.deployment.name-val") @@ -50,7 +51,7 @@ func TestResourceBuilder(t *testing.T) { case "default": assert.Equal(t, 8, res.Attributes().Len()) case "all_set": - assert.Equal(t, 29, res.Attributes().Len()) + assert.Equal(t, 30, res.Attributes().Len()) case "none_set": assert.Equal(t, 0, res.Attributes().Len()) return @@ -93,6 +94,11 @@ func TestResourceBuilder(t *testing.T) { if ok { assert.Equal(t, "k8s.cronjob.name-val", val.Str()) } + val, ok = res.Attributes().Get("k8s.cronjob.uid") + assert.Equal(t, tt == "all_set", ok) + if ok { + assert.Equal(t, "k8s.cronjob.uid-val", val.Str()) + } val, ok = res.Attributes().Get("k8s.daemonset.name") assert.Equal(t, tt == "all_set", ok) if ok { diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go b/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go index e5ecb1c810113..f4d1770490720 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go @@ -26,6 +26,9 @@ type TelemetryBuilder struct { meter metric.Meter mu sync.Mutex registrations 
[]metric.Registration + OtelsvcK8sCronjobAdded metric.Int64Counter + OtelsvcK8sCronjobDeleted metric.Int64Counter + OtelsvcK8sCronjobUpdated metric.Int64Counter OtelsvcK8sDaemonsetAdded metric.Int64Counter OtelsvcK8sDaemonsetDeleted metric.Int64Counter OtelsvcK8sDaemonsetUpdated metric.Int64Counter @@ -83,6 +86,24 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meter = Meter(settings) var err, errs error + builder.OtelsvcK8sCronjobAdded, err = builder.meter.Int64Counter( + "otelcol_otelsvc_k8s_cronjob_added", + metric.WithDescription("Number of cron job add events received"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.OtelsvcK8sCronjobDeleted, err = builder.meter.Int64Counter( + "otelcol_otelsvc_k8s_cronjob_deleted", + metric.WithDescription("Number of cron job delete events received"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.OtelsvcK8sCronjobUpdated, err = builder.meter.Int64Counter( + "otelcol_otelsvc_k8s_cronjob_updated", + metric.WithDescription("Number of cron job update events received"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) builder.OtelsvcK8sDaemonsetAdded, err = builder.meter.Int64Counter( "otelcol_otelsvc_k8s_daemonset_added", metric.WithDescription("Number of daemonset add events received"), diff --git a/processor/k8sattributesprocessor/internal/metadata/testdata/config.yaml b/processor/k8sattributesprocessor/internal/metadata/testdata/config.yaml index 908fffe72d169..615cdbf8c66dd 100644 --- a/processor/k8sattributesprocessor/internal/metadata/testdata/config.yaml +++ b/processor/k8sattributesprocessor/internal/metadata/testdata/config.yaml @@ -15,6 +15,8 @@ all_set: enabled: true k8s.cronjob.name: enabled: true + k8s.cronjob.uid: + enabled: true k8s.daemonset.name: enabled: true k8s.daemonset.uid: @@ -75,6 +77,8 @@ none_set: enabled: false k8s.cronjob.name: enabled: false + k8s.cronjob.uid: + enabled: false k8s.daemonset.name: 
enabled: false k8s.daemonset.uid: diff --git a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go index 9ad1c6bc51740..8f0775b438577 100644 --- a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go +++ b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go @@ -21,6 +21,54 @@ func NewSettings(tt *componenttest.Telemetry) processor.Settings { return set } +func AssertEqualOtelsvcK8sCronjobAdded(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_otelsvc_k8s_cronjob_added", + Description: "Number of cron job add events received", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_otelsvc_k8s_cronjob_added") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualOtelsvcK8sCronjobDeleted(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_otelsvc_k8s_cronjob_deleted", + Description: "Number of cron job delete events received", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_otelsvc_k8s_cronjob_deleted") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) 
+} + +func AssertEqualOtelsvcK8sCronjobUpdated(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_otelsvc_k8s_cronjob_updated", + Description: "Number of cron job update events received", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_otelsvc_k8s_cronjob_updated") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualOtelsvcK8sDaemonsetAdded(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_otelsvc_k8s_daemonset_added", diff --git a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go index 69717d33c56a7..7ccef748f0487 100644 --- a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go +++ b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go @@ -20,6 +20,9 @@ func TestSetupTelemetry(t *testing.T) { tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) require.NoError(t, err) defer tb.Shutdown() + tb.OtelsvcK8sCronjobAdded.Add(context.Background(), 1) + tb.OtelsvcK8sCronjobDeleted.Add(context.Background(), 1) + tb.OtelsvcK8sCronjobUpdated.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetAdded.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetDeleted.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetUpdated.Add(context.Background(), 1) @@ -46,6 +49,15 @@ func TestSetupTelemetry(t *testing.T) { tb.OtelsvcK8sStatefulsetAdded.Add(context.Background(), 1) tb.OtelsvcK8sStatefulsetDeleted.Add(context.Background(), 1) tb.OtelsvcK8sStatefulsetUpdated.Add(context.Background(), 1) + 
AssertEqualOtelsvcK8sCronjobAdded(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualOtelsvcK8sCronjobDeleted(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualOtelsvcK8sCronjobUpdated(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualOtelsvcK8sDaemonsetAdded(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/processor/k8sattributesprocessor/metadata.yaml b/processor/k8sattributesprocessor/metadata.yaml index 2b9b10ce3e6d8..41c071620bf1a 100644 --- a/processor/k8sattributesprocessor/metadata.yaml +++ b/processor/k8sattributesprocessor/metadata.yaml @@ -88,6 +88,10 @@ resource_attributes: description: The name of the CronJob. type: string enabled: false + k8s.cronjob.uid: + description: The uid of the CronJob. + type: string + enabled: false k8s.node.name: description: The name of the Node. 
type: string @@ -276,6 +280,27 @@ telemetry: sum: value_type: int monotonic: true + otelsvc_k8s_cronjob_updated: + enabled: false + description: Number of cron job update events received + unit: "1" + sum: + value_type: int + monotonic: true + otelsvc_k8s_cronjob_added: + enabled: false + description: Number of cron job add events received + unit: "1" + sum: + value_type: int + monotonic: true + otelsvc_k8s_cronjob_deleted: + enabled: false + description: Number of cron job delete events received + unit: "1" + sum: + value_type: int + monotonic: true otelsvc_k8s_daemonset_updated: enabled: false description: Number of daemonset update events received diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index 1cef0c914511f..76e732b1dbf0c 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -79,6 +79,9 @@ func enabledAttributes() (attributes []string) { if defaultConfig.K8sCronjobName.Enabled { attributes = append(attributes, string(conventions.K8SCronJobNameKey)) } + if defaultConfig.K8sCronjobUID.Enabled { + attributes = append(attributes, string(conventions.K8SCronJobUIDKey)) + } if defaultConfig.K8sDaemonsetName.Enabled { attributes = append(attributes, string(conventions.K8SDaemonSetNameKey)) } @@ -190,6 +193,8 @@ func withExtractMetadata(fields ...string) option { p.rules.JobUID = true case string(conventions.K8SCronJobNameKey): p.rules.CronJobName = true + case string(conventions.K8SCronJobUIDKey): + p.rules.CronJobUID = true case string(conventions.K8SNodeNameKey): p.rules.Node = true case string(conventions.K8SNodeUIDKey): From 72e688c9c1562baf2b707f8b317c7e21327b4d50 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 11 Sep 2025 15:50:49 +0100 Subject: [PATCH 02/16] fix: update rule Signed-off-by: Paulo Dias --- processor/k8sattributesprocessor/internal/kube/client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index fa1cf12526f5e..e8abb3e0a8335 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -960,8 +960,7 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { } } if c.Rules.CronJobUID { - // tags[string(conventions.K8SCronJobUIDKey)] = c.jobToCronJobUID[string(ref.UID)] - tags[string(conventions.K8SCronJobUIDKey)] = string(ref.UID) + tags[string(conventions.K8SCronJobUIDKey)] = c.jobToCronJobUID[string(ref.UID)] } } } From a516dd962bde94f083b8200e874761067aeb3331 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 11 Sep 2025 15:55:18 +0100 Subject: [PATCH 03/16] chore: changelog Signed-off-by: Paulo Dias --- .chloggen/feat_42557.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/feat_42557.yaml diff --git a/.chloggen/feat_42557.yaml b/.chloggen/feat_42557.yaml new file mode 100644 index 0000000000000..ceea3ab75ee2d --- /dev/null +++ b/.chloggen/feat_42557.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "processor/k8sattributesprocessor" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for k8s.cronjob.uid attribute in k8sattributesprocessor" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [42557] + +# (Optional) One or more lines of additional information to render under the primary note. 
+# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From 4384437cc7608af25dd59e77b9bbb8c9d5c41f71 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 11 Sep 2025 17:45:36 +0100 Subject: [PATCH 04/16] feat: add missing tests Signed-off-by: Paulo Dias --- .../k8sattributesprocessor/client_test.go | 12 +++ .../internal/kube/client.go | 4 +- .../internal/kube/client_test.go | 98 +++++++++++++++++++ .../internal/kube/informer_test.go | 7 ++ 4 files changed, 120 insertions(+), 1 deletion(-) diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index 7d3c44ce37f4b..65cad1ebe9f8d 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -31,7 +31,9 @@ type fakeClient struct { Deployments map[string]*kube.Deployment StatefulSets map[string]*kube.StatefulSet DaemonSets map[string]*kube.DaemonSet + ReplicaSets map[string]*kube.ReplicaSet Jobs map[string]*kube.Job + CronJobs map[string]*kube.CronJob StopCh chan struct{} } @@ -90,11 +92,21 @@ func (f *fakeClient) GetDaemonSet(daemonsetUID string) (*kube.DaemonSet, bool) { return s, ok } +func (f *fakeClient) GetReplicaSet(replicaSetUID string) (*kube.ReplicaSet, bool) { + rs, ok := f.ReplicaSets[replicaSetUID] + return rs, ok +} + func (f *fakeClient) GetJob(jobUID string) (*kube.Job, bool) { j, ok := f.Jobs[jobUID] return j, ok } +func (f *fakeClient) GetCronJob(cronJobUID string) 
(*kube.CronJob, bool) { + cj, ok := f.CronJobs[cronJobUID] + return cj, ok +} + // Start is a noop for FakeClient. func (f *fakeClient) Start() error { if f.Informer != nil { diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index e8abb3e0a8335..6bdaf651ad444 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -960,7 +960,9 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { } } if c.Rules.CronJobUID { - tags[string(conventions.K8SCronJobUIDKey)] = c.jobToCronJobUID[string(ref.UID)] + if cronJob, ok := c.jobToCronJobUID[string(ref.UID)]; ok && cronJob != "" { + tags[string(conventions.K8SCronJobUIDKey)] = cronJob + } } } } diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 98c0c7daadbe3..c7234c74afab9 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -3126,3 +3126,101 @@ func TestGetIdentifiersFromAssoc(t *testing.T) { }) } } + +func TestCronJobUIDResolutionFromJob(t *testing.T) { + c, _ := newTestClientWithRulesAndFilters(t, Filters{}) + c.Rules = ExtractionRules{ + CronJobUID: true, // ensures owner refs are kept by removeUnnecessaryPodData + } + + // 1) Add a Job that is owned by a CronJob + job := &batch_v1.Job{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "my-cronjob-27667920", + UID: "job-uid-123", + OwnerReferences: []meta_v1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "CronJob", + Name: "my-cronjob", + UID: "cron-uid-999", + }, + }, + }, + } + c.handleJobAdd(job) + + // 2) Pod owned by that Job + pod := &api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "my-cronjob-27667920-pod", + Namespace: "default", + OwnerReferences: []meta_v1.OwnerReference{ + { + APIVersion: 
"batch/v1", + Kind: "Job", + Name: "my-cronjob-27667920", + UID: "job-uid-123", + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "10.0.0.5", + }, + } + + // The informer transform keeps owner refs when CronJobUID rule is enabled. + transformed := removeUnnecessaryPodData(pod, c.Rules) + c.handlePodAdd(transformed) + + p, ok := c.GetPod(newPodIdentifier("connection", "", "10.0.0.5")) + require.True(t, ok) + + // Assert the pod got k8s.cronjob.uid from the cached Job + got, exists := p.Attributes[string(conventions.K8SCronJobUIDKey)] + require.True(t, exists, "expected k8s.cronjob.uid to be set") + assert.Equal(t, "cron-uid-999", got) +} + +func TestCronJobUIDResolution_NoOwner(t *testing.T) { + c, _ := newTestClientWithRulesAndFilters(t, Filters{}) + c.Rules = ExtractionRules{ + CronJobUID: true, + } + + // Job without a CronJob owner + job := &batch_v1.Job{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "ad-hoc-job", + UID: "job-uid-456", + }, + } + c.handleJobAdd(job) + + pod := &api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "ad-hoc-job-pod", + Namespace: "default", + OwnerReferences: []meta_v1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "Job", + Name: "ad-hoc-job", + UID: "job-uid-456", + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "10.0.0.6", + }, + } + + transformed := removeUnnecessaryPodData(pod, c.Rules) + c.handlePodAdd(transformed) + + p, ok := c.GetPod(newPodIdentifier("connection", "", "10.0.0.6")) + require.True(t, ok) + + _, exists := p.Attributes[string(conventions.K8SCronJobUIDKey)] + assert.False(t, exists, "did not expect k8s.cronjob.uid when Job has no CronJob owner") +} diff --git a/processor/k8sattributesprocessor/internal/kube/informer_test.go b/processor/k8sattributesprocessor/internal/kube/informer_test.go index 107c05013f181..87b8e844107b9 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer_test.go +++ b/processor/k8sattributesprocessor/internal/kube/informer_test.go @@ -61,6 +61,13 @@ func 
Test_newSharedJobInformer(t *testing.T) { assert.NotNil(t, informer) } +func Test_newSharedCronJobInformer(t *testing.T) { + client, err := newFakeAPIClientset(k8sconfig.APIConfig{}) + require.NoError(t, err) + informer := newCronJobSharedInformer(client, "ns") + assert.NotNil(t, informer) +} + func Test_newKubeSystemSharedInformer(t *testing.T) { client, err := newFakeAPIClientset(k8sconfig.APIConfig{}) require.NoError(t, err) From 4fb266164b14c3c0d882099948952f48b4917674 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 11 Sep 2025 17:56:11 +0100 Subject: [PATCH 05/16] chore: update readme.md Signed-off-by: Paulo Dias --- processor/k8sattributesprocessor/README.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md index 893f578d08d06..eff4462da657f 100644 --- a/processor/k8sattributesprocessor/README.md +++ b/processor/k8sattributesprocessor/README.md @@ -32,7 +32,7 @@ The processor stores the list of running pods and the associated metadata. When to the pod from where the datapoint originated, so we can add the relevant pod metadata to the datapoint. By default, it associates the incoming connection IP to the Pod IP. But for cases where this approach doesn't work (sending through a proxy, etc.), a custom association rule can be specified. -Each association is specified as a list of sources of associations. The maximum number of sources within an association is 4. +Each association is specified as a list of sources of associations. The maximum number of sources within an association is 4. A source is a rule that matches metadata from the datapoint to pod metadata. In order to get an association applied, all the sources specified need to match. 
@@ -63,7 +63,7 @@ If Pod association rules are not configured, resources are associated with metad Which metadata to collect is determined by `metadata` configuration that defines list of resource attributes to be added. Items in the list called exactly the same as the resource attributes that will be added. -The following attributes are added by default: +The following attributes are added by default: - k8s.namespace.name - k8s.pod.name - k8s.pod.uid @@ -71,8 +71,8 @@ The following attributes are added by default: - k8s.deployment.name - k8s.node.name -These attributes are also available for the use within association rules by default. -The `metadata` section can also be extended with additional attributes which, if present in the `metadata` section, +These attributes are also available for the use within association rules by default. +The `metadata` section can also be extended with additional attributes which, if present in the `metadata` section, are then also available for the use within association rules. Available attributes are: - k8s.namespace.name - k8s.pod.name @@ -100,7 +100,7 @@ are then also available for the use within association rules. Available attribut - [service.instance.id](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#how-serviceinstanceid-should-be-calculated)(cannot be used for source rules in the pod_association) - Any tags extracted from the pod labels and annotations, as described in [extracting attributes from pod labels and annotations](#extracting-attributes-from-pod-labels-and-annotations) -Not all the attributes are guaranteed to be added. Only attribute names from `metadata` should be used for +Not all the attributes are guaranteed to be added. Only attribute names from `metadata` should be used for pod_association's `resource_attribute`, because empty or non-existing values will be ignored. Additional container level attributes can be extracted. 
If a pod contains more than one container, @@ -204,7 +204,7 @@ the processor associates the received trace to the pod, based on the connection "k8s.pod.name": "telemetrygen-pod", "k8s.pod.uid": "038e2267-b473-489b-b48c-46bafdb852eb", "container.image.name": "telemetrygen", - "container.image.tag": "0.112.0", + "container.image.tag": "0.112.0", "container.image.repo_digests": ["ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:b248ef911f93ae27cbbc85056d1ffacc87fd941bbdc2ffd951b6df8df72b8096"] } } @@ -262,16 +262,16 @@ extract: from: node ``` -## Configuring recommended resource attributes +## Configuring recommended resource attributes -The processor can be configured to set the +The processor can be configured to set the [recommended resource attributes](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/): - `otel_annotations` will translate `resource.opentelemetry.io/foo` to the `foo` resource attribute, etc. ```yaml extract: - otel_annotations: true + otel_annotations: true metadata: - service.namespace - service.name @@ -306,7 +306,7 @@ k8sattributes: - tag_name: app.label.component key: app.kubernetes.io/component from: pod - otel_annotations: true + otel_annotations: true pod_association: - sources: # This rule associates all resources containing the 'k8s.pod.ip' attribute with the matching pods. If this attribute is not present in the resource, this rule will not be able to find the matching pod. 
@@ -400,7 +400,7 @@ rules: resources: ["replicasets", "deployments", "statefulsets", "daemonsets"] verbs: ["get", "list", "watch"] - apiGroups: ["batch"] - resources: ["jobs"] + resources: ["cronjobs","jobs"] verbs: ["get", "list", "watch"] --- apiVersion: rbac.authorization.k8s.io/v1 From 74f3a5a849a11cb4bd5cbdd52ae372cd2d7184e6 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 00:04:41 +0100 Subject: [PATCH 06/16] feat: refactor k8s.cronjob.uid evaluation Signed-off-by: Paulo Dias --- processor/k8sattributesprocessor/README.md | 2 +- processor/k8sattributesprocessor/config.go | 2 +- .../internal/kube/client.go | 184 ++++-------- .../internal/kube/client_test.go | 264 +++++++++++++----- .../internal/kube/kube.go | 16 +- 5 files changed, 258 insertions(+), 210 deletions(-) diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md index eff4462da657f..247a7ae7d6618 100644 --- a/processor/k8sattributesprocessor/README.md +++ b/processor/k8sattributesprocessor/README.md @@ -400,7 +400,7 @@ rules: resources: ["replicasets", "deployments", "statefulsets", "daemonsets"] verbs: ["get", "list", "watch"] - apiGroups: ["batch"] - resources: ["cronjobs","jobs"] + resources: ["jobs"] verbs: ["get", "list", "watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index 6db8467cd7ca4..50a369b43ef7f 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -75,7 +75,7 @@ func (cfg *Config) Validate() error { } switch f.From { - case "", kube.MetadataFromPod, kube.MetadataFromNamespace, kube.MetadataFromNode, kube.MetadataFromDeployment, kube.MetadataFromStatefulSet, kube.MetadataFromDaemonSet, kube.MetadataFromJob, kube.MetadataFromCronJob: + case "", kube.MetadataFromPod, kube.MetadataFromNamespace, kube.MetadataFromNode, kube.MetadataFromDeployment, 
kube.MetadataFromStatefulSet, kube.MetadataFromDaemonSet, kube.MetadataFromJob: default: return fmt.Errorf("%s is not a valid choice for From. Must be one of: pod, namespace, deployment, statefulset, daemonset, job, node", f.From) } diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 6bdaf651ad444..d9a6159e47407 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -63,9 +63,6 @@ const ( // Semconv attributes https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/k8s.md#job K8sJobLabel = "k8s.job.label.%s" K8sJobAnnotation = "k8s.job.annotation.%s" - // Semconv attributes https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/k8s.md#cronjob - K8sCronJobLabel = "k8s.cronjob.label.%s" - K8sCronJobAnnotation = "k8s.cronjob.annotation.%s" ) var allowLabelsAnnotationsSingular = featuregate.GlobalRegistry().MustRegister( @@ -89,7 +86,6 @@ type WatchClient struct { daemonsetInformer cache.SharedInformer jobInformer cache.SharedInformer replicasetInformer cache.SharedInformer - cronJobInformer cache.SharedInformer replicasetRegex *regexp.Regexp cronJobRegex *regexp.Regexp deleteQueue []deleteRequest @@ -129,9 +125,6 @@ type WatchClient struct { // Key is job uid Jobs map[string]*Job - // JobUID -> CronJobUID relation (computed from Job ownerReferences) - jobToCronJobUID map[string]string - // A map containing cron job related data, used to associate them with resources. 
// Key is cron job uid CronJobs map[string]*CronJob @@ -199,7 +192,6 @@ func New( c.StatefulSets = map[string]*StatefulSet{} c.DaemonSets = map[string]*DaemonSet{} c.Jobs = map[string]*Job{} - c.jobToCronJobUID = map[string]string{} c.CronJobs = map[string]*CronJob{} if newClientSet == nil { newClientSet = k8sconfig.MakeClient @@ -296,10 +288,6 @@ func New( c.jobInformer = newJobSharedInformer(c.kc, c.Filters.Namespace) } - if c.extractCronJobLabelsAnnotations() { - c.cronJobInformer = newCronJobSharedInformer(c.kc, c.Filters.Namespace) - } - return c, err } @@ -397,19 +385,6 @@ func (c *WatchClient) Start() error { go c.jobInformer.Run(c.stopCh) } - if c.cronJobInformer != nil { - reg, err = c.cronJobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.handleCronJobAdd, - UpdateFunc: c.handleCronJobUpdate, - DeleteFunc: c.handleCronJobDelete, - }) - if err != nil { - return err - } - synced = append(synced, reg.HasSynced) - go c.cronJobInformer.Run(c.stopCh) - } - reg, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handlePodAdd, UpdateFunc: c.handlePodUpdate, @@ -660,44 +635,12 @@ func (c *WatchClient) handleJobDelete(obj any) { if n, ok := c.Jobs[string(job.UID)]; ok { delete(c.Jobs, n.UID) } - delete(c.jobToCronJobUID, string(job.UID)) c.m.Unlock() } else { c.logger.Error("object received was not of type api_v1.Job", zap.Any("received", obj)) } } -func (c *WatchClient) handleCronJobAdd(obj any) { - c.telemetryBuilder.OtelsvcK8sCronjobAdded.Add(context.Background(), 1) - if cronJob, ok := obj.(*batch_v1.CronJob); ok { - c.addOrUpdateCronJob(cronJob) - } else { - c.logger.Error("object received was not of type batch_v1.CronJob", zap.Any("received", obj)) - } -} - -func (c *WatchClient) handleCronJobUpdate(_, newCJ any) { - c.telemetryBuilder.OtelsvcK8sCronjobUpdated.Add(context.Background(), 1) - if cronJob, ok := newCJ.(*batch_v1.CronJob); ok { - c.addOrUpdateCronJob(cronJob) - } else { - c.logger.Error("object 
received was not of type batch_v1.CronJob", zap.Any("received", newCJ)) - } -} - -func (c *WatchClient) handleCronJobDelete(obj any) { - c.telemetryBuilder.OtelsvcK8sCronjobDeleted.Add(context.Background(), 1) - if cronJob, ok := ignoreDeletedFinalStateUnknown(obj).(*batch_v1.CronJob); ok { - c.m.Lock() - if n, ok := c.CronJobs[string(cronJob.UID)]; ok { - delete(c.CronJobs, n.UID) - } - c.m.Unlock() - } else { - c.logger.Error("object received was not of type batch_v1.CronJob", zap.Any("received", obj)) - } -} - func (c *WatchClient) deleteLoop(interval, gracePeriod time.Duration) { // This loop runs after N seconds and deletes pods from cache. // It iterates over the delete queue and deletes all that aren't @@ -825,16 +768,6 @@ func (c *WatchClient) GetJob(jobUID string) (*Job, bool) { return nil, false } -func (c *WatchClient) GetCronJob(cronJobUID string) (*CronJob, bool) { - c.m.RLock() - cronJob, ok := c.CronJobs[cronJobUID] - c.m.RUnlock() - if ok { - return cronJob, ok - } - return nil, false -} - func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { tags := map[string]string{} if c.Rules.PodName { @@ -947,21 +880,33 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { tags[string(conventions.ServiceNameKey)] = ref.Name } if c.Rules.CronJobName || c.Rules.ServiceName { - parts := c.cronJobRegex.FindStringSubmatch(ref.Name) - if len(parts) == 2 { - name := parts[1] - if c.Rules.CronJobName { - tags[string(conventions.K8SCronJobNameKey)] = name + if job, ok := c.GetJob(string(ref.UID)); ok { + if job.CronJob.Name != "" { + if c.Rules.CronJobName { + tags[string(conventions.K8SCronJobNameKey)] = job.CronJob.Name + } + if c.Rules.ServiceName { + tags[string(conventions.ServiceNameKey)] = job.CronJob.Name + } } - if c.Rules.ServiceName { - // cronjob name wins over job name - tags[string(conventions.ServiceNameKey)] = name + } else { + parts := c.cronJobRegex.FindStringSubmatch(ref.Name) + if len(parts) == 
2 { + name := parts[1] + if c.Rules.CronJobName { + tags[string(conventions.K8SCronJobNameKey)] = name + } + if c.Rules.ServiceName { + tags[string(conventions.ServiceNameKey)] = name + } } } } if c.Rules.CronJobUID { - if cronJob, ok := c.jobToCronJobUID[string(ref.UID)]; ok && cronJob != "" { - tags[string(conventions.K8SCronJobUIDKey)] = cronJob + if job, ok := c.GetJob(string(ref.UID)); ok { + if job.CronJob.UID != "" { + tags[string(conventions.K8SCronJobUIDKey)] = job.CronJob.UID + } } } } @@ -1317,20 +1262,6 @@ func (c *WatchClient) extractJobAttributes(d *batch_v1.Job) map[string]string { return tags } -func (c *WatchClient) extractCronJobAttributes(d *batch_v1.CronJob) map[string]string { - tags := map[string]string{} - - for _, r := range c.Rules.Labels { - r.extractFromCronJobMetadata(d.Labels, tags, K8sCronJobLabel) - } - - for _, r := range c.Rules.Annotations { - r.extractFromCronJobMetadata(d.Annotations, tags, K8sCronJobAnnotation) - } - - return tags -} - func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod { newPod := &Pod{ Name: pod.Name, @@ -1707,22 +1638,6 @@ func (c *WatchClient) extractJobLabelsAnnotations() bool { return false } -func (c *WatchClient) extractCronJobLabelsAnnotations() bool { - for _, r := range c.Rules.Labels { - if r.From == MetadataFromCronJob { - return true - } - } - - for _, r := range c.Rules.Annotations { - if r.From == MetadataFromCronJob { - return true - } - } - - return false -} - func (c *WatchClient) extractNodeLabelsAnnotations() bool { for _, r := range c.Rules.Labels { if r.From == MetadataFromNode { @@ -1806,38 +1721,19 @@ func (c *WatchClient) addOrUpdateJob(job *batch_v1.Job) { } newJob.Attributes = c.extractJobAttributes(job) - // Find CronJob controller owner (if any) and cache its UID - var cronJobUID string - for _, owner := range job.OwnerReferences { - if owner.Kind == "CronJob" { - cronJobUID = string(owner.UID) + for _, ownerReference := range job.OwnerReferences { + if ownerReference.Kind == 
"CronJob" && ownerReference.Controller != nil && *ownerReference.Controller { + newJob.CronJob = CronJob{ + Name: ownerReference.Name, + UID: string(ownerReference.UID), + } break } } c.m.Lock() if job.UID != "" { - jobUID := string(job.UID) - c.Jobs[jobUID] = newJob - if cronJobUID != "" { - c.jobToCronJobUID[jobUID] = cronJobUID - } else { - delete(c.jobToCronJobUID, jobUID) - } - } - c.m.Unlock() -} - -func (c *WatchClient) addOrUpdateCronJob(cronJob *batch_v1.CronJob) { - newCronJob := &CronJob{ - Name: cronJob.Name, - UID: string(cronJob.UID), - } - newCronJob.Attributes = c.extractCronJobAttributes(cronJob) - - c.m.Lock() - if cronJob.UID != "" { - c.CronJobs[string(cronJob.UID)] = newCronJob + c.Jobs[string(job.UID)] = newJob } c.m.Unlock() } @@ -1919,6 +1815,28 @@ func removeUnnecessaryReplicaSetData(replicaset *apps_v1.ReplicaSet) *apps_v1.Re return &transformedReplicaset } +// This function removes all data from the Job except what is required by extraction rules +func removeUnnecessaryJobData(job *batch_v1.Job) *batch_v1.Job { + transformedJob := &batch_v1.Job{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: job.GetName(), + Namespace: job.GetNamespace(), + UID: job.GetUID(), + }, + } + + // Keep only controller owners + var filtered []meta_v1.OwnerReference + for _, or := range job.GetOwnerReferences() { + if or.Controller != nil && *or.Controller { + filtered = append(filtered, or) + } + } + transformedJob.SetOwnerReferences(filtered) + + return transformedJob +} + // runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete // before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer // to be finished to correctly establish the connection to the replicaset/deployment it belongs to. 
diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index c7234c74afab9..147c540a05d92 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -3127,34 +3127,18 @@ func TestGetIdentifiersFromAssoc(t *testing.T) { } } -func TestCronJobUIDResolutionFromJob(t *testing.T) { +func TestCronJobExtractionRules_FromJobOwner(t *testing.T) { c, _ := newTestClientWithRulesAndFilters(t, Filters{}) - c.Rules = ExtractionRules{ - CronJobUID: true, // ensures owner refs are kept by removeUnnecessaryPodData - } - - // 1) Add a Job that is owned by a CronJob - job := &batch_v1.Job{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: "my-cronjob-27667920", - UID: "job-uid-123", - OwnerReferences: []meta_v1.OwnerReference{ - { - APIVersion: "batch/v1", - Kind: "CronJob", - Name: "my-cronjob", - UID: "cron-uid-999", - }, - }, - }, - } - c.handleJobAdd(job) + // Disable saving ip into k8s.pod.ip so attributes length assertions stay predictable + c.Associations[0].Sources[0].Name = "" - // 2) Pod owned by that Job + // Pod owned by a Job pod := &api_v1.Pod{ ObjectMeta: meta_v1.ObjectMeta{ - Name: "my-cronjob-27667920-pod", - Namespace: "default", + Name: "my-cronjob-27667920-pod", + UID: "pod-uid-1", + Namespace: "ns1", + CreationTimestamp: meta_v1.Now(), OwnerReferences: []meta_v1.OwnerReference{ { APIVersion: "batch/v1", @@ -3164,63 +3148,215 @@ func TestCronJobUIDResolutionFromJob(t *testing.T) { }, }, }, + Spec: api_v1.PodSpec{ + NodeName: "node1", + }, Status: api_v1.PodStatus{ - PodIP: "10.0.0.5", + PodIP: "1.1.1.1", }, } - // The informer transform keeps owner refs when CronJobUID rule is enabled. 
- transformed := removeUnnecessaryPodData(pod, c.Rules) - c.handlePodAdd(transformed) - - p, ok := c.GetPod(newPodIdentifier("connection", "", "10.0.0.5")) - require.True(t, ok) - - // Assert the pod got k8s.cronjob.uid from the cached Job - got, exists := p.Attributes[string(conventions.K8SCronJobUIDKey)] - require.True(t, exists, "expected k8s.cronjob.uid to be set") - assert.Equal(t, "cron-uid-999", got) -} - -func TestCronJobUIDResolution_NoOwner(t *testing.T) { - c, _ := newTestClientWithRulesAndFilters(t, Filters{}) - c.Rules = ExtractionRules{ - CronJobUID: true, - } - - // Job without a CronJob owner + // The Job object the pod points to (we'll mutate OwnerReferences per test case) job := &batch_v1.Job{ ObjectMeta: meta_v1.ObjectMeta{ - Name: "ad-hoc-job", - UID: "job-uid-456", + Name: "my-cronjob-27667920", + Namespace: "ns1", + UID: "job-uid-123", }, } - c.handleJobAdd(job) - pod := &api_v1.Pod{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: "ad-hoc-job-pod", - Namespace: "default", - OwnerReferences: []meta_v1.OwnerReference{ + isController := true + isNotController := false + + testCases := []struct { + name string + rules ExtractionRules + jobOwners []meta_v1.OwnerReference + want map[string]string + }{ + { + name: "no-rules", + rules: ExtractionRules{}, + jobOwners: []meta_v1.OwnerReference{ { APIVersion: "batch/v1", - Kind: "Job", - Name: "ad-hoc-job", - UID: "job-uid-456", + Kind: "CronJob", + Name: "my-cronjob", + UID: "cron-uid-999", + Controller: &isController, }, }, + want: map[string]string{}, }, - Status: api_v1.PodStatus{ - PodIP: "10.0.0.6", + { + name: "cronjob-is-controller_emit_uid_only", + rules: ExtractionRules{ + CronJobUID: true, + }, + jobOwners: []meta_v1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "CronJob", + Name: "my-cronjob", + UID: "cron-uid-999", + Controller: &isController, + }, + }, + want: map[string]string{ + "k8s.cronjob.uid": "cron-uid-999", + }, + }, + { + name: "cronjob-is-controller_emit_name_and_uid", + rules: 
ExtractionRules{ + CronJobName: true, + CronJobUID: true, + }, + jobOwners: []meta_v1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "CronJob", + Name: "my-cronjob", + UID: "cron-uid-999", + Controller: &isController, + }, + }, + want: map[string]string{ + "k8s.cronjob.name": "my-cronjob", + "k8s.cronjob.uid": "cron-uid-999", + }, + }, + { + name: "cronjob-is-not-controller_do_not_emit", + rules: ExtractionRules{ + CronJobName: true, + CronJobUID: true, + }, + jobOwners: []meta_v1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "CronJob", + Name: "my-cronjob", + UID: "cron-uid-999", + Controller: &isNotController, + }, + }, + want: map[string]string{}, + }, + { + name: "multiple_owners_only_controller_counts", + rules: ExtractionRules{ + CronJobName: true, + CronJobUID: true, + }, + jobOwners: []meta_v1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "CronJob", + Name: "cj-not-controller", + UID: "cron-uid-111", + Controller: &isNotController, + }, + { + APIVersion: "batch/v1", + Kind: "CronJob", + Name: "cj-controller", + UID: "cron-uid-222", + Controller: &isController, + }, + }, + want: map[string]string{ + "k8s.cronjob.name": "cj-controller", + "k8s.cronjob.uid": "cron-uid-222", + }, + }, + { + name: "no_cronjob_owner", + rules: ExtractionRules{ + CronJobName: true, + CronJobUID: true, + }, + jobOwners: []meta_v1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "SomethingElse", + Name: "not-a-cronjob", + UID: "whatever", + Controller: &isController, + }, + }, + want: map[string]string{}, }, } - transformed := removeUnnecessaryPodData(pod, c.Rules) - c.handlePodAdd(transformed) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c.Rules = tc.rules + + // Set owners on Job according to case + job.OwnerReferences = tc.jobOwners + + // Emulate informer transforms (like other tests do) + transformedPod := removeUnnecessaryPodData(pod, c.Rules) + transformedJob := removeUnnecessaryJobData(job) + + // Feed caches + 
c.handleJobAdd(transformedJob) + c.handlePodAdd(transformedPod) - p, ok := c.GetPod(newPodIdentifier("connection", "", "10.0.0.6")) - require.True(t, ok) + // Fetch enriched pod by connection id + p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP)) + require.True(t, ok) - _, exists := p.Attributes[string(conventions.K8SCronJobUIDKey)] - assert.False(t, exists, "did not expect k8s.cronjob.uid when Job has no CronJob owner") + assert.Len(t, p.Attributes, len(tc.want)) + for k, v := range tc.want { + got, ok := p.Attributes[k] + assert.True(t, ok, "expected attribute %s", k) + assert.Equal(t, v, got) + } + }) + } } + +// func TestCronJobUIDResolution_NoOwner(t *testing.T) { +// c, _ := newTestClientWithRulesAndFilters(t, Filters{}) +// c.Rules = ExtractionRules{ +// CronJobUID: true, +// } + +// // Job without a CronJob owner +// job := &batch_v1.Job{ +// ObjectMeta: meta_v1.ObjectMeta{ +// Name: "ad-hoc-job", +// UID: "job-uid-456", +// }, +// } +// c.handleJobAdd(job) + +// pod := &api_v1.Pod{ +// ObjectMeta: meta_v1.ObjectMeta{ +// Name: "ad-hoc-job-pod", +// Namespace: "default", +// OwnerReferences: []meta_v1.OwnerReference{ +// { +// APIVersion: "batch/v1", +// Kind: "Job", +// Name: "ad-hoc-job", +// UID: "job-uid-456", +// }, +// }, +// }, +// Status: api_v1.PodStatus{ +// PodIP: "10.0.0.6", +// }, +// } + +// transformed := removeUnnecessaryPodData(pod, c.Rules) +// c.handlePodAdd(transformed) + +// p, ok := c.GetPod(newPodIdentifier("connection", "", "10.0.0.6")) +// require.True(t, ok) + +// _, exists := p.Attributes[string(conventions.K8SCronJobUIDKey)] +// assert.False(t, exists, "did not expect k8s.cronjob.uid when Job has no CronJob owner") +// } diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 2d3951dfccd7f..d0fcd171365ef 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ 
-36,9 +36,7 @@ const ( // MetadataFromDaemonSet is used to specify to extract metadata/labels/annotations from daemonset MetadataFromDaemonSet = "daemonset" // MetadataFromJob is used to specify to extract metadata/labels/annotations from job - MetadataFromJob = "job" - // MetadataFromCronJob is used to specify to extract metadata/labels/annotations from job - MetadataFromCronJob = "cronjob" + MetadataFromJob = "job" PodIdentifierMaxLength = 4 ResourceSource = "resource_attribute" @@ -264,8 +262,9 @@ type ExtractionRules struct { ServiceVersion bool ServiceInstanceID bool - Annotations []FieldExtractionRule - Labels []FieldExtractionRule + Annotations []FieldExtractionRule + Labels []FieldExtractionRule + CronJobFromJob bool } // IncludesOwnerMetadata determines whether the ExtractionRules include metadata about Pod Owners @@ -360,12 +359,6 @@ func (r *FieldExtractionRule) extractFromJobMetadata(metadata, tags map[string]s } } -func (r *FieldExtractionRule) extractFromCronJobMetadata(metadata, tags map[string]string, formatter string) { - if r.From == MetadataFromCronJob { - r.extractFromMetadata(metadata, tags, formatter) - } -} - func (r *FieldExtractionRule) extractFromMetadata(metadata, tags map[string]string, formatter string) { if r.KeyRegex != nil { for k, v := range metadata { @@ -459,6 +452,7 @@ type Job struct { Name string UID string Attributes map[string]string + CronJob CronJob } // CronJob represents a kubernetes cronjob. 
From eef52edf30cbddd82fd15c0d43281ed83deb2f32 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 00:12:13 +0100 Subject: [PATCH 07/16] feat: remove informers Signed-off-by: Paulo Dias --- .../k8sattributesprocessor/client_test.go | 5 -- .../internal/kube/client.go | 7 +-- .../internal/kube/client_test.go | 43 ----------------- .../internal/kube/informer.go | 27 ----------- .../internal/kube/informer_test.go | 7 --- .../internal/kube/kube.go | 5 +- .../internal/metadata/generated_telemetry.go | 21 -------- .../metadatatest/generated_telemetrytest.go | 48 ------------------- .../generated_telemetrytest_test.go | 12 ----- .../k8sattributesprocessor/metadata.yaml | 21 -------- 10 files changed, 3 insertions(+), 193 deletions(-) diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index 65cad1ebe9f8d..c99e8caf8921c 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -102,11 +102,6 @@ func (f *fakeClient) GetJob(jobUID string) (*kube.Job, bool) { return j, ok } -func (f *fakeClient) GetCronJob(cronJobUID string) (*kube.CronJob, bool) { - cj, ok := f.CronJobs[cronJobUID] - return cj, ok -} - // Start is a noop for FakeClient. func (f *fakeClient) Start() error { if f.Informer != nil { diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index d9a6159e47407..cda09101e45b5 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -125,10 +125,6 @@ type WatchClient struct { // Key is job uid Jobs map[string]*Job - // A map containing cron job related data, used to associate them with resources. - // Key is cron job uid - CronJobs map[string]*CronJob - // A map containing ReplicaSets related data, used to associate them with resources. 
// Key is replicaset uid ReplicaSets map[string]*ReplicaSet @@ -192,7 +188,6 @@ func New( c.StatefulSets = map[string]*StatefulSet{} c.DaemonSets = map[string]*DaemonSet{} c.Jobs = map[string]*Job{} - c.CronJobs = map[string]*CronJob{} if newClientSet == nil { newClientSet = k8sconfig.MakeClient } @@ -284,7 +279,7 @@ func New( c.daemonsetInformer = newDaemonSetSharedInformer(c.kc, c.Filters.Namespace) } - if c.extractJobLabelsAnnotations() || rules.CronJobUID { + if c.extractJobLabelsAnnotations() { c.jobInformer = newJobSharedInformer(c.kc, c.Filters.Namespace) } diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 147c540a05d92..b05224757dce6 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -3317,46 +3317,3 @@ func TestCronJobExtractionRules_FromJobOwner(t *testing.T) { }) } } - -// func TestCronJobUIDResolution_NoOwner(t *testing.T) { -// c, _ := newTestClientWithRulesAndFilters(t, Filters{}) -// c.Rules = ExtractionRules{ -// CronJobUID: true, -// } - -// // Job without a CronJob owner -// job := &batch_v1.Job{ -// ObjectMeta: meta_v1.ObjectMeta{ -// Name: "ad-hoc-job", -// UID: "job-uid-456", -// }, -// } -// c.handleJobAdd(job) - -// pod := &api_v1.Pod{ -// ObjectMeta: meta_v1.ObjectMeta{ -// Name: "ad-hoc-job-pod", -// Namespace: "default", -// OwnerReferences: []meta_v1.OwnerReference{ -// { -// APIVersion: "batch/v1", -// Kind: "Job", -// Name: "ad-hoc-job", -// UID: "job-uid-456", -// }, -// }, -// }, -// Status: api_v1.PodStatus{ -// PodIP: "10.0.0.6", -// }, -// } - -// transformed := removeUnnecessaryPodData(pod, c.Rules) -// c.handlePodAdd(transformed) - -// p, ok := c.GetPod(newPodIdentifier("connection", "", "10.0.0.6")) -// require.True(t, ok) - -// _, exists := p.Attributes[string(conventions.K8SCronJobUIDKey)] -// assert.False(t, exists, "did not expect 
k8s.cronjob.uid when Job has no CronJob owner") -// } diff --git a/processor/k8sattributesprocessor/internal/kube/informer.go b/processor/k8sattributesprocessor/internal/kube/informer.go index fde815c4d6148..5740f6128b130 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer.go +++ b/processor/k8sattributesprocessor/internal/kube/informer.go @@ -252,33 +252,6 @@ func jobWatchFuncWithSelectors(client kubernetes.Interface, namespace string) ca } } -func newCronJobSharedInformer( - client kubernetes.Interface, - namespace string, -) cache.SharedInformer { - informer := cache.NewSharedInformer( - &cache.ListWatch{ - ListFunc: cronJobListFuncWithSelectors(client, namespace), - WatchFunc: cronJobWatchFuncWithSelectors(client, namespace), - }, - &batch_v1.CronJob{}, - watchSyncPeriod, - ) - return informer -} - -func cronJobListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListFunc { - return func(opts metav1.ListOptions) (runtime.Object, error) { - return client.BatchV1().CronJobs(namespace).List(context.Background(), opts) - } -} - -func cronJobWatchFuncWithSelectors(client kubernetes.Interface, namespace string) cache.WatchFunc { - return func(opts metav1.ListOptions) (watch.Interface, error) { - return client.BatchV1().CronJobs(namespace).Watch(context.Background(), opts) - } -} - func daemonsetListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListFunc { return func(opts metav1.ListOptions) (runtime.Object, error) { return client.AppsV1().DaemonSets(namespace).List(context.Background(), opts) diff --git a/processor/k8sattributesprocessor/internal/kube/informer_test.go b/processor/k8sattributesprocessor/internal/kube/informer_test.go index 87b8e844107b9..107c05013f181 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer_test.go +++ b/processor/k8sattributesprocessor/internal/kube/informer_test.go @@ -61,13 +61,6 @@ func Test_newSharedJobInformer(t *testing.T) { assert.NotNil(t, informer) } 
-func Test_newSharedCronJobInformer(t *testing.T) { - client, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - require.NoError(t, err) - informer := newCronJobSharedInformer(client, "ns") - assert.NotNil(t, informer) -} - func Test_newKubeSystemSharedInformer(t *testing.T) { client, err := newFakeAPIClientset(k8sconfig.APIConfig{}) require.NoError(t, err) diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index d0fcd171365ef..cb86a3fe52c9f 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -262,9 +262,8 @@ type ExtractionRules struct { ServiceVersion bool ServiceInstanceID bool - Annotations []FieldExtractionRule - Labels []FieldExtractionRule - CronJobFromJob bool + Annotations []FieldExtractionRule + Labels []FieldExtractionRule } // IncludesOwnerMetadata determines whether the ExtractionRules include metadata about Pod Owners diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go b/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go index f4d1770490720..e5ecb1c810113 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_telemetry.go @@ -26,9 +26,6 @@ type TelemetryBuilder struct { meter metric.Meter mu sync.Mutex registrations []metric.Registration - OtelsvcK8sCronjobAdded metric.Int64Counter - OtelsvcK8sCronjobDeleted metric.Int64Counter - OtelsvcK8sCronjobUpdated metric.Int64Counter OtelsvcK8sDaemonsetAdded metric.Int64Counter OtelsvcK8sDaemonsetDeleted metric.Int64Counter OtelsvcK8sDaemonsetUpdated metric.Int64Counter @@ -86,24 +83,6 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meter = Meter(settings) var err, errs error - builder.OtelsvcK8sCronjobAdded, err = builder.meter.Int64Counter( - 
"otelcol_otelsvc_k8s_cronjob_added", - metric.WithDescription("Number of cron job add events received"), - metric.WithUnit("1"), - ) - errs = errors.Join(errs, err) - builder.OtelsvcK8sCronjobDeleted, err = builder.meter.Int64Counter( - "otelcol_otelsvc_k8s_cronjob_deleted", - metric.WithDescription("Number of cron job delete events received"), - metric.WithUnit("1"), - ) - errs = errors.Join(errs, err) - builder.OtelsvcK8sCronjobUpdated, err = builder.meter.Int64Counter( - "otelcol_otelsvc_k8s_cronjob_updated", - metric.WithDescription("Number of cron job update events received"), - metric.WithUnit("1"), - ) - errs = errors.Join(errs, err) builder.OtelsvcK8sDaemonsetAdded, err = builder.meter.Int64Counter( "otelcol_otelsvc_k8s_daemonset_added", metric.WithDescription("Number of daemonset add events received"), diff --git a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go index 8f0775b438577..9ad1c6bc51740 100644 --- a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go +++ b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest.go @@ -21,54 +21,6 @@ func NewSettings(tt *componenttest.Telemetry) processor.Settings { return set } -func AssertEqualOtelsvcK8sCronjobAdded(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { - want := metricdata.Metrics{ - Name: "otelcol_otelsvc_k8s_cronjob_added", - Description: "Number of cron job add events received", - Unit: "1", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: dps, - }, - } - got, err := tt.GetMetric("otelcol_otelsvc_k8s_cronjob_added") - require.NoError(t, err) - metricdatatest.AssertEqual(t, want, got, opts...) 
-} - -func AssertEqualOtelsvcK8sCronjobDeleted(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { - want := metricdata.Metrics{ - Name: "otelcol_otelsvc_k8s_cronjob_deleted", - Description: "Number of cron job delete events received", - Unit: "1", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: dps, - }, - } - got, err := tt.GetMetric("otelcol_otelsvc_k8s_cronjob_deleted") - require.NoError(t, err) - metricdatatest.AssertEqual(t, want, got, opts...) -} - -func AssertEqualOtelsvcK8sCronjobUpdated(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { - want := metricdata.Metrics{ - Name: "otelcol_otelsvc_k8s_cronjob_updated", - Description: "Number of cron job update events received", - Unit: "1", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: dps, - }, - } - got, err := tt.GetMetric("otelcol_otelsvc_k8s_cronjob_updated") - require.NoError(t, err) - metricdatatest.AssertEqual(t, want, got, opts...) 
-} - func AssertEqualOtelsvcK8sDaemonsetAdded(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_otelsvc_k8s_daemonset_added", diff --git a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go index 7ccef748f0487..69717d33c56a7 100644 --- a/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go +++ b/processor/k8sattributesprocessor/internal/metadatatest/generated_telemetrytest_test.go @@ -20,9 +20,6 @@ func TestSetupTelemetry(t *testing.T) { tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) require.NoError(t, err) defer tb.Shutdown() - tb.OtelsvcK8sCronjobAdded.Add(context.Background(), 1) - tb.OtelsvcK8sCronjobDeleted.Add(context.Background(), 1) - tb.OtelsvcK8sCronjobUpdated.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetAdded.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetDeleted.Add(context.Background(), 1) tb.OtelsvcK8sDaemonsetUpdated.Add(context.Background(), 1) @@ -49,15 +46,6 @@ func TestSetupTelemetry(t *testing.T) { tb.OtelsvcK8sStatefulsetAdded.Add(context.Background(), 1) tb.OtelsvcK8sStatefulsetDeleted.Add(context.Background(), 1) tb.OtelsvcK8sStatefulsetUpdated.Add(context.Background(), 1) - AssertEqualOtelsvcK8sCronjobAdded(t, testTel, - []metricdata.DataPoint[int64]{{Value: 1}}, - metricdatatest.IgnoreTimestamp()) - AssertEqualOtelsvcK8sCronjobDeleted(t, testTel, - []metricdata.DataPoint[int64]{{Value: 1}}, - metricdatatest.IgnoreTimestamp()) - AssertEqualOtelsvcK8sCronjobUpdated(t, testTel, - []metricdata.DataPoint[int64]{{Value: 1}}, - metricdatatest.IgnoreTimestamp()) AssertEqualOtelsvcK8sDaemonsetAdded(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/processor/k8sattributesprocessor/metadata.yaml 
b/processor/k8sattributesprocessor/metadata.yaml index 41c071620bf1a..7ab1154d6ae91 100644 --- a/processor/k8sattributesprocessor/metadata.yaml +++ b/processor/k8sattributesprocessor/metadata.yaml @@ -280,27 +280,6 @@ telemetry: sum: value_type: int monotonic: true - otelsvc_k8s_cronjob_updated: - enabled: false - description: Number of cron job update events received - unit: "1" - sum: - value_type: int - monotonic: true - otelsvc_k8s_cronjob_added: - enabled: false - description: Number of cron job add events received - unit: "1" - sum: - value_type: int - monotonic: true - otelsvc_k8s_cronjob_deleted: - enabled: false - description: Number of cron job delete events received - unit: "1" - sum: - value_type: int - monotonic: true otelsvc_k8s_daemonset_updated: enabled: false description: Number of daemonset update events received From f29414ef71128997e94d377adac49548de0c40fb Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 00:27:07 +0100 Subject: [PATCH 08/16] fix: remove CronJobs from fakeClient Signed-off-by: Paulo Dias --- processor/k8sattributesprocessor/client_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index c99e8caf8921c..bb6790b6132c1 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -33,7 +33,6 @@ type fakeClient struct { DaemonSets map[string]*kube.DaemonSet ReplicaSets map[string]*kube.ReplicaSet Jobs map[string]*kube.Job - CronJobs map[string]*kube.CronJob StopCh chan struct{} } From 953c4bb1766b8d839c8a78d6b98e973914831b9c Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 14:06:28 +0100 Subject: [PATCH 09/16] fix: revert job name evaluation and run informer if cronjob uid is requested Signed-off-by: Paulo Dias --- .../internal/kube/client.go | 60 +++++++++---------- .../internal/kube/client_test.go | 10 +++- 2 files changed, 36 insertions(+), 34 
deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index cda09101e45b5..c270a48c97faa 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -146,6 +146,7 @@ type InformersFactoryList struct { newInformer InformerProvider newNamespaceInformer InformerProviderNamespace newReplicaSetInformer InformerProviderWorkload + newJobInformer InformerProviderWorkload } // New initializes a new k8s Client. @@ -279,8 +280,23 @@ func New( c.daemonsetInformer = newDaemonSetSharedInformer(c.kc, c.Filters.Namespace) } - if c.extractJobLabelsAnnotations() { - c.jobInformer = newJobSharedInformer(c.kc, c.Filters.Namespace) + if c.extractJobLabelsAnnotations() || rules.CronJobUID { + if informersFactory.newJobInformer == nil { + informersFactory.newJobInformer = newJobSharedInformer + } + c.jobInformer = informersFactory.newJobInformer(c.kc, c.Filters.Namespace) + err = c.jobInformer.SetTransform( + func(object any) (any, error) { + originalJob, success := object.(*batch_v1.Job) + if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing + return object, nil + } + return removeUnnecessaryJobData(originalJob), nil + }, + ) + if err != nil { + return nil, err + } } return c, err @@ -875,25 +891,15 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { tags[string(conventions.ServiceNameKey)] = ref.Name } if c.Rules.CronJobName || c.Rules.ServiceName { - if job, ok := c.GetJob(string(ref.UID)); ok { - if job.CronJob.Name != "" { - if c.Rules.CronJobName { - tags[string(conventions.K8SCronJobNameKey)] = job.CronJob.Name - } - if c.Rules.ServiceName { - tags[string(conventions.ServiceNameKey)] = job.CronJob.Name - } + parts := c.cronJobRegex.FindStringSubmatch(ref.Name) + if len(parts) == 2 { + name := parts[1] + if c.Rules.CronJobName { + 
tags[string(conventions.K8SCronJobNameKey)] = name } - } else { - parts := c.cronJobRegex.FindStringSubmatch(ref.Name) - if len(parts) == 2 { - name := parts[1] - if c.Rules.CronJobName { - tags[string(conventions.K8SCronJobNameKey)] = name - } - if c.Rules.ServiceName { - tags[string(conventions.ServiceNameKey)] = name - } + if c.Rules.ServiceName { + // cronjob name wins over job name + tags[string(conventions.ServiceNameKey)] = name } } } @@ -1812,7 +1818,7 @@ func removeUnnecessaryReplicaSetData(replicaset *apps_v1.ReplicaSet) *apps_v1.Re // This function removes all data from the Job except what is required by extraction rules func removeUnnecessaryJobData(job *batch_v1.Job) *batch_v1.Job { - transformedJob := &batch_v1.Job{ + transformedJob := batch_v1.Job{ ObjectMeta: meta_v1.ObjectMeta{ Name: job.GetName(), Namespace: job.GetNamespace(), @@ -1820,16 +1826,8 @@ func removeUnnecessaryJobData(job *batch_v1.Job) *batch_v1.Job { }, } - // Keep only controller owners - var filtered []meta_v1.OwnerReference - for _, or := range job.GetOwnerReferences() { - if or.Controller != nil && *or.Controller { - filtered = append(filtered, or) - } - } - transformedJob.SetOwnerReferences(filtered) - - return transformedJob + transformedJob.SetOwnerReferences(job.GetOwnerReferences()) + return &transformedJob } // runInformerWithDependencies starts the given informer. 
The second argument is a list of other informers that should complete diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index b05224757dce6..c6515d7d6f874 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -3241,7 +3241,9 @@ func TestCronJobExtractionRules_FromJobOwner(t *testing.T) { Controller: &isNotController, }, }, - want: map[string]string{}, + want: map[string]string{ + "k8s.cronjob.name": "my-cronjob", + }, }, { name: "multiple_owners_only_controller_counts", @@ -3266,7 +3268,7 @@ func TestCronJobExtractionRules_FromJobOwner(t *testing.T) { }, }, want: map[string]string{ - "k8s.cronjob.name": "cj-controller", + "k8s.cronjob.name": "my-cronjob", "k8s.cronjob.uid": "cron-uid-222", }, }, @@ -3285,7 +3287,9 @@ func TestCronJobExtractionRules_FromJobOwner(t *testing.T) { Controller: &isController, }, }, - want: map[string]string{}, + want: map[string]string{ + "k8s.cronjob.name": "my-cronjob", + }, }, } From 2d82df543bc37d3868b4f7c76be2af56e9a18de6 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 14:09:56 +0100 Subject: [PATCH 10/16] fix: revert newJobInformer Signed-off-by: Paulo Dias --- .../internal/kube/client.go | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index c270a48c97faa..d5dd05c8c4e23 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -146,7 +146,6 @@ type InformersFactoryList struct { newInformer InformerProvider newNamespaceInformer InformerProviderNamespace newReplicaSetInformer InformerProviderWorkload - newJobInformer InformerProviderWorkload } // New initializes a new k8s Client. 
@@ -281,22 +280,7 @@ func New( } if c.extractJobLabelsAnnotations() || rules.CronJobUID { - if informersFactory.newJobInformer == nil { - informersFactory.newJobInformer = newJobSharedInformer - } - c.jobInformer = informersFactory.newJobInformer(c.kc, c.Filters.Namespace) - err = c.jobInformer.SetTransform( - func(object any) (any, error) { - originalJob, success := object.(*batch_v1.Job) - if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing - return object, nil - } - return removeUnnecessaryJobData(originalJob), nil - }, - ) - if err != nil { - return nil, err - } + c.jobInformer = newJobSharedInformer(c.kc, c.Filters.Namespace) } return c, err From 8882667800ef4b55b1ff3cc4d7843f8a609783e1 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 14:23:37 +0100 Subject: [PATCH 11/16] fix: newJobInformer Signed-off-by: Paulo Dias --- .../internal/kube/client.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index d5dd05c8c4e23..0172423f931eb 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -146,6 +146,7 @@ type InformersFactoryList struct { newInformer InformerProvider newNamespaceInformer InformerProviderNamespace newReplicaSetInformer InformerProviderWorkload + newJobInformer InformerProviderWorkload } // New initializes a new k8s Client. 
@@ -280,7 +281,23 @@ func New( } if c.extractJobLabelsAnnotations() || rules.CronJobUID { - c.jobInformer = newJobSharedInformer(c.kc, c.Filters.Namespace) + if informersFactory.newJobInformer == nil { + informersFactory.newJobInformer = newJobSharedInformer + } + c.jobInformer = informersFactory.newJobInformer(c.kc, c.Filters.Namespace) + err = c.jobInformer.SetTransform( + func(object any) (any, error) { + originalJob, success := object.(*batch_v1.Job) + if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing + return object, nil + } + + return removeUnnecessaryJobData(originalJob), nil + }, + ) + if err != nil { + return nil, err + } } return c, err From 9f2bb4bc2c92d7e4c1eb36abb857e8cf2136ec0a Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 14:28:15 +0100 Subject: [PATCH 12/16] fix: remove removeUnnecessaryJobData Signed-off-by: Paulo Dias --- .../internal/kube/client.go | 33 +------------------ .../internal/kube/client_test.go | 3 +- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 0172423f931eb..0c46600a1c531 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -146,7 +146,6 @@ type InformersFactoryList struct { newInformer InformerProvider newNamespaceInformer InformerProviderNamespace newReplicaSetInformer InformerProviderWorkload - newJobInformer InformerProviderWorkload } // New initializes a new k8s Client. 
@@ -281,23 +280,7 @@ func New( } if c.extractJobLabelsAnnotations() || rules.CronJobUID { - if informersFactory.newJobInformer == nil { - informersFactory.newJobInformer = newJobSharedInformer - } - c.jobInformer = informersFactory.newJobInformer(c.kc, c.Filters.Namespace) - err = c.jobInformer.SetTransform( - func(object any) (any, error) { - originalJob, success := object.(*batch_v1.Job) - if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing - return object, nil - } - - return removeUnnecessaryJobData(originalJob), nil - }, - ) - if err != nil { - return nil, err - } + c.jobInformer = newJobSharedInformer(c.kc, c.Filters.Namespace) } return c, err @@ -1817,20 +1800,6 @@ func removeUnnecessaryReplicaSetData(replicaset *apps_v1.ReplicaSet) *apps_v1.Re return &transformedReplicaset } -// This function removes all data from the Job except what is required by extraction rules -func removeUnnecessaryJobData(job *batch_v1.Job) *batch_v1.Job { - transformedJob := batch_v1.Job{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: job.GetName(), - Namespace: job.GetNamespace(), - UID: job.GetUID(), - }, - } - - transformedJob.SetOwnerReferences(job.GetOwnerReferences()) - return &transformedJob -} - // runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete // before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer // to be finished to correctly establish the connection to the replicaset/deployment it belongs to. 
diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index c6515d7d6f874..3f08d394713e4 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -3302,10 +3302,9 @@ func TestCronJobExtractionRules_FromJobOwner(t *testing.T) { // Emulate informer transforms (like other tests do) transformedPod := removeUnnecessaryPodData(pod, c.Rules) - transformedJob := removeUnnecessaryJobData(job) // Feed caches - c.handleJobAdd(transformedJob) + c.handleJobAdd(job) c.handlePodAdd(transformedPod) // Fetch enriched pod by connection id From dab4cd263c3d938db22c183a16e1475778faf292 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 14:40:01 +0100 Subject: [PATCH 13/16] chore: update README.md Signed-off-by: Paulo Dias --- processor/k8sattributesprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md index 247a7ae7d6618..69fd0c1b4640c 100644 --- a/processor/k8sattributesprocessor/README.md +++ b/processor/k8sattributesprocessor/README.md @@ -325,7 +325,7 @@ k8sattributes: ## Cluster-scoped RBAC -If you'd like to set up the k8sattributesprocessor to receive telemetry from across namespaces, it will need `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.name` (which is enabled by default) or `k8s.deployment.uid` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When using `k8s.node.uid` or extracting metadata from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources. 
+If you'd like to set up the k8sattributesprocessor to receive telemetry from across namespaces, it will need `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.name` (which is enabled by default) or `k8s.deployment.uid` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When using `k8s.node.uid` or extracting metadata from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources. When using `k8s.cronjob.uid` the processor also needs `get`, `watch` and `list` permissions for `jobs` resources. Here is an example of a `ClusterRole` to give a `ServiceAccount` the necessary permissions for all pods, nodes, and namespaces in the cluster (replace `<OTEL_COL_NAMESPACE>` with a namespace where collector is deployed): From 88bfa7cc8be5e0dcb5b0cef4c95667568dc56efc Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 17 Sep 2025 14:51:48 +0100 Subject: [PATCH 14/16] chore: add e2e cronjob test Signed-off-by: Paulo Dias --- processor/k8sattributesprocessor/e2e_test.go | 109 ++++++++++++++++++ .../e2e/clusterrbac/collector/configmap.yaml | 1 + .../e2e/clusterrbac/telemetrygen/cronjob.yaml | 34 ++++++ 3 files changed, 144 insertions(+) create mode 100644 processor/k8sattributesprocessor/testdata/e2e/clusterrbac/telemetrygen/cronjob.yaml diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go index 50a0e10d195f1..56026716adc78 100644 --- a/processor/k8sattributesprocessor/e2e_test.go +++ b/processor/k8sattributesprocessor/e2e_test.go @@ -131,6 +131,33 @@ func TestE2E_ClusterRBAC(t *testing.T) { service string attrs map[string]*expectedValue }{ + { + name: "traces-cronjob", + dataType: pipeline.SignalTraces, + service: "test-traces-cronjob", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex,
"telemetrygen-"+testID+"-traces-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-cronjob"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-cronjob"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, { name: "traces-job", dataType: pipeline.SignalTraces, @@ -236,6 +263,33 @@ func TestE2E_ClusterRBAC(t *testing.T) { "simple-daemonset-workload-annotation": newExpectedValue(equal, "daemonset-annotation"), }, }, + { + name: "metrics-cronjob", + dataType: pipeline.SignalMetrics, + service: "test-metrics-cronjob", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, 
""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-cronjob"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-cronjob"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, { name: "metrics-job", dataType: pipeline.SignalMetrics, @@ -341,6 +395,34 @@ func TestE2E_ClusterRBAC(t *testing.T) { "simple-daemonset-workload-annotation": newExpectedValue(equal, "daemonset-annotation"), }, }, + { + name: "logs-cronjob", + dataType: pipeline.SignalLogs, + service: "test-logs-cronjob", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(equal, 
"telemetrygen-"+testID+"-logs-cronjob"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + "simple-cronjob-workload-annotation": newExpectedValue(equal, "cronjob-annotation"), + }, + }, { name: "logs-job", dataType: pipeline.SignalLogs, @@ -446,6 +528,33 @@ func TestE2E_ClusterRBAC(t *testing.T) { "simple-daemonset-workload-annotation": newExpectedValue(equal, "daemonset-annotation"), }, }, + { + name: "profiles-cronjob", + dataType: xpipeline.SignalProfiles, + service: "test-profiles-cronjob", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-cronjob"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + 
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-cronjob"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, { name: "profiles-job", dataType: xpipeline.SignalProfiles, diff --git a/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/collector/configmap.yaml b/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/collector/configmap.yaml index 5c1c1078dd7b5..64468a70c560d 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/collector/configmap.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/collector/configmap.yaml @@ -54,6 +54,7 @@ data: - k8s.daemonset.name - k8s.daemonset.uid - k8s.cronjob.name + - k8s.cronjob.uid - k8s.job.name - k8s.job.uid - k8s.node.name diff --git a/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/telemetrygen/cronjob.yaml b/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/telemetrygen/cronjob.yaml new file mode 100644 index 0000000000000..bbbf24df5743e --- /dev/null +++ b/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/telemetrygen/cronjob.yaml @@ -0,0 +1,34 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: {{ .Name }}-{{ .DataType}}-cronjob + namespace: e2ek8sattribute-clusterrbac + annotations: + workload: cronjob-annotation +spec: + schedule: "*/1 * * * *" + jobTemplate: + spec: + template: + metadata: + annotations: + workload: cronjob + labels: + app: 
{{ .Name }}-{{ .DataType }}-cronjob + spec: + containers: + - command: + - /telemetrygen + - {{ .DataType }} + - --otlp-insecure + - --otlp-endpoint={{ .OTLPEndpoint }} + - --rate=1 + - --duration=36000s + - --otlp-attributes=service.name="test-{{ .DataType }}-cronjob" + - --otlp-attributes=k8s.container.name="telemetrygen" + {{- if eq .DataType "traces" }} + - --status-code= + {{- end }} + image: ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:latest + name: telemetrygen + restartPolicy: OnFailure From a319838e6d2804fa79780c3cec2428dd64223744 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 18 Sep 2025 09:53:07 +0100 Subject: [PATCH 15/16] feat: add support for cronjobs in k8s telemetrygen Signed-off-by: Paulo Dias --- pkg/xk8stest/k8s_collector.go | 5 +- pkg/xk8stest/k8s_telemetrygen.go | 80 +++++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/pkg/xk8stest/k8s_collector.go b/pkg/xk8stest/k8s_collector.go index 65aa731c5393a..1e75d0858c4af 100644 --- a/pkg/xk8stest/k8s_collector.go +++ b/pkg/xk8stest/k8s_collector.go @@ -5,6 +5,7 @@ package xk8stest // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bytes" + "maps" "os" "path/filepath" "testing" @@ -39,9 +40,7 @@ func CreateCollectorObjects(t *testing.T, client *K8sClient, testID, manifestsDi "HostEndpoint": host, "TestID": testID, } - for key, value := range templateValues { - defaultTemplateValues[key] = value - } + maps.Copy(defaultTemplateValues, templateValues) require.NoError(t, tmpl.Execute(manifest, defaultTemplateValues)) obj, err := CreateObject(client, manifest.Bytes()) require.NoErrorf(t, err, "failed to create collector object from manifest %s", manifestFile.Name()) diff --git a/pkg/xk8stest/k8s_telemetrygen.go b/pkg/xk8stest/k8s_telemetrygen.go index 57e4570fc54f3..ac7859b96d747 100644 --- a/pkg/xk8stest/k8s_telemetrygen.go +++ b/pkg/xk8stest/k8s_telemetrygen.go @@ -5,6 +5,7 @@ package xk8stest // import 
"github.com/open-telemetry/opentelemetry-collector-co import ( "bytes" + "fmt" "os" "path/filepath" "testing" @@ -31,6 +32,78 @@ type TelemetrygenCreateOpts struct { DataTypes []string } +// getPodLabelSelectors returns labels used to select pods created by the workload. +// - Deployment/StatefulSet/DaemonSet: spec.selector.matchLabels (fallback to template.metadata.labels) +// - Job: spec.template.metadata.labels +// - CronJob: spec.jobTemplate.spec.template.metadata.labels +func getPodLabelSelectors(obj *unstructured.Unstructured) (map[string]any, error) { + o := obj.Object + spec, ok := o["spec"].(map[string]any) + if !ok || spec == nil { + return nil, fmt.Errorf("%s/%s missing spec", obj.GetKind(), obj.GetName()) + } + + switch obj.GetKind() { + case "Deployment", "StatefulSet", "DaemonSet": + if sel, ok := spec["selector"].(map[string]any); ok && sel != nil { + if ml, ok := sel["matchLabels"].(map[string]any); ok && ml != nil { + return ml, nil + } + } + // fallback — uncommon but robust + if tmpl, ok := spec["template"].(map[string]any); ok && tmpl != nil { + if meta, ok := tmpl["metadata"].(map[string]any); ok && meta != nil { + if ml, ok := meta["labels"].(map[string]any); ok && ml != nil { + return ml, nil + } + } + } + return nil, fmt.Errorf("%s/%s missing selector.matchLabels and template.metadata.labels", obj.GetKind(), obj.GetName()) + + case "Job": + if tmpl, ok := spec["template"].(map[string]any); ok && tmpl != nil { + if meta, ok := tmpl["metadata"].(map[string]any); ok && meta != nil { + if ml, ok := meta["labels"].(map[string]any); ok && ml != nil { + return ml, nil + } + } + } + // last resort if API server already defaulted it + if sel, ok := spec["selector"].(map[string]any); ok && sel != nil { + if ml, ok := sel["matchLabels"].(map[string]any); ok && ml != nil { + return ml, nil + } + } + return nil, fmt.Errorf("Job/%s missing template.metadata.labels (and selector.matchLabels)", obj.GetName()) + + case "CronJob": + jt, ok := 
spec["jobTemplate"].(map[string]any) + if !ok || jt == nil { + return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate", obj.GetName()) + } + jts, ok := jt["spec"].(map[string]any) + if !ok || jts == nil { + return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate.spec", obj.GetName()) + } + tmpl, ok := jts["template"].(map[string]any) + if !ok || tmpl == nil { + return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate.spec.template", obj.GetName()) + } + meta, ok := tmpl["metadata"].(map[string]any) + if !ok || meta == nil { + return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate.spec.template.metadata", obj.GetName()) + } + ml, ok := meta["labels"].(map[string]any) + if !ok || ml == nil { + return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate.spec.template.metadata.labels", obj.GetName()) + } + return ml, nil + + default: + return nil, fmt.Errorf("unsupported kind %q", obj.GetKind()) + } +} + func CreateTelemetryGenObjects(t *testing.T, client *K8sClient, createOpts *TelemetrygenCreateOpts) ([]*unstructured.Unstructured, []*TelemetrygenObjInfo) { telemetrygenObjInfos := make([]*TelemetrygenObjInfo, 0) manifestFiles, err := os.ReadDir(createOpts.ManifestsDir) @@ -48,10 +121,13 @@ func CreateTelemetryGenObjects(t *testing.T, client *K8sClient, createOpts *Tele })) obj, err := CreateObject(client, manifest.Bytes()) require.NoErrorf(t, err, "failed to create telemetrygen object from manifest %s", manifestFile.Name()) - selector := obj.Object["spec"].(map[string]any)["selector"] + + podLabels, err := getPodLabelSelectors(obj) + require.NoErrorf(t, err, "failed to extract pod label selectors for %s %s", obj.GetKind(), obj.GetName()) + telemetrygenObjInfos = append(telemetrygenObjInfos, &TelemetrygenObjInfo{ Namespace: obj.GetNamespace(), - PodLabelSelectors: selector.(map[string]any)["matchLabels"].(map[string]any), + PodLabelSelectors: podLabels, DataType: dataType, Workload: obj.GetKind(), }) From 4fc3469354134377bd6c2c08742a0d664c79a263 Mon 
Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 18 Sep 2025 14:08:10 +0100 Subject: [PATCH 16/16] feat: improve e2e tests Signed-off-by: Paulo Dias --- processor/k8sattributesprocessor/e2e_test.go | 155 ++++++++++++++++--- 1 file changed, 131 insertions(+), 24 deletions(-) diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go index 56026716adc78..a3cabce4f6264 100644 --- a/processor/k8sattributesprocessor/e2e_test.go +++ b/processor/k8sattributesprocessor/e2e_test.go @@ -144,7 +144,7 @@ func TestE2E_ClusterRBAC(t *testing.T) { "k8s.namespace.name": newExpectedValue(equal, testNs), "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-cronjob"), "k8s.cronjob.uid": newExpectedValue(exist, ""), - "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-cronjob"), + "k8s.job.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-cronjob-[0-9]*"), "k8s.job.uid": newExpectedValue(exist, ""), "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-cronjob"), @@ -276,7 +276,7 @@ func TestE2E_ClusterRBAC(t *testing.T) { "k8s.namespace.name": newExpectedValue(equal, testNs), "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-cronjob"), "k8s.cronjob.uid": newExpectedValue(exist, ""), - "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-cronjob"), + "k8s.job.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-cronjob-[0-9]*"), "k8s.job.uid": newExpectedValue(exist, ""), "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-cronjob"), @@ -400,27 +400,26 @@ func TestE2E_ClusterRBAC(t *testing.T) { dataType: pipeline.SignalLogs, service: "test-logs-cronjob", attrs: map[string]*expectedValue{ - "k8s.pod.name": newExpectedValue(regex, 
"telemetrygen-"+testID+"-logs-cronjob-[a-z0-9-]*"), - "k8s.pod.ip": newExpectedValue(exist, ""), - "k8s.pod.uid": newExpectedValue(regex, uidRe), - "k8s.pod.start_time": newExpectedValue(exist, ""), - "k8s.node.name": newExpectedValue(exist, ""), - "k8s.namespace.name": newExpectedValue(equal, testNs), - "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), - "k8s.cronjob.uid": newExpectedValue(exist, ""), - "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), - "k8s.job.uid": newExpectedValue(exist, ""), - "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), - "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), - "k8s.container.name": newExpectedValue(equal, "telemetrygen"), - "k8s.cluster.uid": newExpectedValue(regex, uidRe), - "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), - "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), - "container.image.tag": newExpectedValue(equal, "latest"), - "container.id": newExpectedValue(exist, ""), - "k8s.node.labels.foo": newExpectedValue(equal, "too"), - "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), - "simple-cronjob-workload-annotation": newExpectedValue(equal, "cronjob-annotation"), + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(regex, 
"telemetrygen-"+testID+"-logs-cronjob-[0-9]*"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), }, }, { @@ -541,7 +540,7 @@ func TestE2E_ClusterRBAC(t *testing.T) { "k8s.namespace.name": newExpectedValue(equal, testNs), "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-cronjob"), "k8s.cronjob.uid": newExpectedValue(exist, ""), - "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-cronjob"), + "k8s.job.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-cronjob-[0-9]*"), "k8s.job.uid": newExpectedValue(exist, ""), "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-cronjob"), @@ -1296,6 +1295,33 @@ func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) { service string attrs map[string]*expectedValue }{ + { + name: "traces-cronjob", + dataType: pipeline.SignalTraces, + service: "test-traces-cronjob", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": 
newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-cronjob-[0-9]*"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-cronjob"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, { name: "traces-job", dataType: pipeline.SignalTraces, @@ -1401,6 +1427,33 @@ func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) { "simple-daemonset-workload-annotation": newExpectedValue(equal, "daemonset-annotation"), }, }, + { + name: "metrics-cronjob", + dataType: pipeline.SignalMetrics, + service: "test-metrics-cronjob", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, 
"telemetrygen-"+testID+"-metrics-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-cronjob-[0-9]*"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-cronjob"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, { name: "metrics-job", dataType: pipeline.SignalMetrics, @@ -1506,6 +1559,33 @@ func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) { "simple-daemonset-workload-annotation": newExpectedValue(equal, "daemonset-annotation"), }, }, + { + name: "logs-cronjob", + dataType: pipeline.SignalLogs, + service: "test-logs-cronjob", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-cronjob-[0-9]*"), + "k8s.job.uid": 
newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-cronjob"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, { name: "logs-job", dataType: pipeline.SignalLogs, @@ -1611,6 +1691,33 @@ func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) { "simple-daemonset-workload-annotation": newExpectedValue(equal, "daemonset-annotation"), }, }, + { + name: "profiles-cronjob", + dataType: xpipeline.SignalProfiles, + service: "test-profiles-cronjob", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-cronjob-[a-z0-9-]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.cronjob.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-cronjob"), + "k8s.cronjob.uid": newExpectedValue(exist, ""), + "k8s.job.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-cronjob-[0-9]*"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "cronjob"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-cronjob"), + 
"k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, { name: "profiles-job", dataType: xpipeline.SignalProfiles,