From 9b03e092be31a4501b99adfa14c3f6e7ecb40ff8 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Mon, 10 Nov 2025 22:58:52 +0000 Subject: [PATCH 1/2] [chore][receiver/k8scluster] Add mutex protection for metadata consumers Signed-off-by: Paulo Dias --- receiver/k8sclusterreceiver/watcher.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 39689563b8996..85117637f6518 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -8,6 +8,7 @@ import ( "fmt" "reflect" "strings" + "sync" "sync/atomic" "time" @@ -62,6 +63,7 @@ type resourceWatcher struct { initialSyncTimedOut *atomic.Bool config *Config entityLogConsumer consumer.Logs + mu sync.RWMutex // For mocking. makeClient func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) @@ -331,7 +333,10 @@ func (rw *resourceWatcher) onAdd(obj any) { } func (rw *resourceWatcher) hasDestination() bool { - return len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil + rw.mu.RLock() + has := len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil + rw.mu.RUnlock() + return has } func (rw *resourceWatcher) onUpdate(oldObj, newObj any) { @@ -424,7 +429,9 @@ func (rw *resourceWatcher) setupMetadataExporters( ) } + rw.mu.Lock() rw.metadataConsumers = out + rw.mu.Unlock() return nil } @@ -448,11 +455,14 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata) if len(metadataUpdate) != 0 { + rw.mu.RLock() for _, consume := range rw.metadataConsumers { _ = consume(metadataUpdate) } + rw.mu.RUnlock() } + rw.mu.RLock() if rw.entityLogConsumer != nil { // Represent metadata update as entity events. entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata, timestamp, rw.config.MetadataCollectionInterval) @@ -477,6 +487,7 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper } } } + rw.mu.RUnlock() } // stringSliceToMap converts a slice of strings into a map with keys from the slice From 147c906ff2c6adf6cb55c4ae57f188067e69a1ee Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Tue, 25 Nov 2025 10:02:00 +0000 Subject: [PATCH 2/2] chore: move initialize after setupMetadataExporters Signed-off-by: Paulo Dias --- receiver/k8sclusterreceiver/receiver.go | 9 +++++---- receiver/k8sclusterreceiver/watcher.go | 13 +------------ 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index d06cb6a9c513c..71a135acc1fda 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -45,21 +45,22 @@ type getExporters interface { } func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component.Host) error { - if err := kr.resourceWatcher.initialize(); err != nil { - return err - } - ge, ok := host.(getExporters) if !ok { return errors.New("unable to get exporters") } exporters := ge.GetExporters() + // Setup metadata exporters before initializing watchers to avoid concurrent access if err := kr.resourceWatcher.setupMetadataExporters( exporters[pipeline.SignalMetrics], kr.config.MetadataExporters); err != nil { return err } + if err := kr.resourceWatcher.initialize(); err != nil { + return err + } + go func() { kr.settings.Logger.Info("Starting shared informers and wait for initial cache sync.") for _, informer := range kr.resourceWatcher.informerFactories { diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 85117637f6518..39689563b8996 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -8,7 +8,6 @@ import ( "fmt" "reflect" "strings" - "sync" "sync/atomic" "time" @@ -63,7 +62,6 @@ type resourceWatcher struct { initialSyncTimedOut *atomic.Bool config *Config entityLogConsumer consumer.Logs - mu sync.RWMutex // For mocking. makeClient func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) @@ -333,10 +331,7 @@ func (rw *resourceWatcher) onAdd(obj any) { } func (rw *resourceWatcher) hasDestination() bool { - rw.mu.RLock() - has := len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil - rw.mu.RUnlock() - return has + return len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil } func (rw *resourceWatcher) onUpdate(oldObj, newObj any) { @@ -429,9 +424,7 @@ func (rw *resourceWatcher) setupMetadataExporters( ) } - rw.mu.Lock() rw.metadataConsumers = out - rw.mu.Unlock() return nil } @@ -455,14 +448,11 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata) if len(metadataUpdate) != 0 { - rw.mu.RLock() for _, consume := range rw.metadataConsumers { _ = consume(metadataUpdate) } - rw.mu.RUnlock() } - rw.mu.RLock() if rw.entityLogConsumer != nil { // Represent metadata update as entity events. entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata, timestamp, rw.config.MetadataCollectionInterval) @@ -487,7 +477,6 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper } } } - rw.mu.RUnlock() } // stringSliceToMap converts a slice of strings into a map with keys from the slice