From bd5a99f00557fb446af4fde58791ff46012adeae Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Tue, 30 Sep 2025 22:43:22 +0100 Subject: [PATCH 1/5] [receiver/k8sclusterreceiver] Switch to standby mode when leader lease is lost Signed-off-by: Paulo Dias --- .chloggen/feat_42707.yaml | 27 +++++ receiver/k8sclusterreceiver/receiver.go | 46 ++++++-- receiver/k8sclusterreceiver/receiver_test.go | 111 +++++++++++++++++++ receiver/k8sclusterreceiver/watcher.go | 21 +++- 4 files changed, 192 insertions(+), 13 deletions(-) create mode 100644 .chloggen/feat_42707.yaml diff --git a/.chloggen/feat_42707.yaml b/.chloggen/feat_42707.yaml new file mode 100644 index 0000000000000..c97374c73fcbd --- /dev/null +++ b/.chloggen/feat_42707.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "receiver/k8sclusterreceiver" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Switch to standby mode when leader lease is lost instead of shutdown" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [42707] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index d06cb6a9c513c..0c3c54e30755d 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -6,6 +6,7 @@ package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "context" "errors" + "sync" "time" "go.opentelemetry.io/collector/component" @@ -37,6 +38,9 @@ type kubernetesReceiver struct { settings receiver.Settings metricsConsumer consumer.Metrics cancel context.CancelFunc + sessionCancel context.CancelFunc + mu sync.Mutex + wg sync.WaitGroup obsrecv *receiverhelper.ObsReport } @@ -60,13 +64,23 @@ func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component. return err } + // Create a leadership-session context so we can stop ONLY the watches/ticker on standby. + cctx, cancel := context.WithCancel(ctx) + kr.mu.Lock() + // replace any previous sessionCancel (should normally be nil when starting fresh) + kr.sessionCancel = cancel + // track the session worker goroutine + kr.wg.Add(1) + kr.mu.Unlock() + go func() { + defer kr.wg.Done() kr.settings.Logger.Info("Starting shared informers and wait for initial cache sync.") for _, informer := range kr.resourceWatcher.informerFactories { if informer == nil { continue } - timedContextForInitialSync := kr.resourceWatcher.startWatchingResources(ctx, informer) + timedContextForInitialSync := kr.resourceWatcher.startWatchingResources(cctx, informer) // Wait till either the initial cache sync times out or until the cancel method // corresponding to this context is called. @@ -91,8 +105,8 @@ func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component. for { select { case <-ticker.C: - kr.dispatchMetrics(ctx) - case <-ctx.Done(): + kr.dispatchMetrics(cctx) + case <-cctx.Done(): return } } @@ -122,11 +136,12 @@ func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) er } leaderElectorExt.SetCallBackFuncs( - func(ctx context.Context) { - if err := kr.startReceiver(ctx, host); err != nil { + func(cbCtx context.Context) { + if err := kr.startReceiver(cbCtx, host); err != nil { kr.settings.Logger.Error("Failed to start receiver", zap.Error(err)) } - }, func() { + }, + func() { kr.stopReceiver() }, ) @@ -141,14 +156,27 @@ func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) er } func (kr *kubernetesReceiver) stopReceiver() { - kr.settings.Logger.Info("Stopping the receiver") - if kr.cancel != nil { - kr.cancel() + kr.settings.Logger.Info("Stopping the receiver session (standby)") + kr.mu.Lock() + cancel := kr.sessionCancel + kr.sessionCancel = nil + kr.mu.Unlock() + + if cancel != nil { + cancel() } + + // Wait for the session goroutine to exit to avoid overlaps on quick flaps. + kr.wg.Wait() } func (kr *kubernetesReceiver) Shutdown(context.Context) error { + // First stop the active session (if any). kr.stopReceiver() + // Then cancel the component-level context. + if kr.cancel != nil { + kr.cancel() + } return nil } diff --git a/receiver/k8sclusterreceiver/receiver_test.go b/receiver/k8sclusterreceiver/receiver_test.go index 94b666122d1a0..3244cea683f23 100644 --- a/receiver/k8sclusterreceiver/receiver_test.go +++ b/receiver/k8sclusterreceiver/receiver_test.go @@ -358,6 +358,103 @@ func getUpdatedPod(pod *corev1.Pod) any { } } +func TestReceiverWithLeaderElection_StandbyAndResume_Metrics(t *testing.T) { + fakeLeaderElection := &k8sleaderelectortest.FakeLeaderElection{} + fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeLeaderElection} + + client := newFakeClientWithAllResources() + osQuotaClient := fakeQuota.NewSimpleClientset() + sink := new(consumertest.MetricsSink) + + tt := componenttest.NewTelemetry() + defer func() { require.NoError(t, tt.Shutdown(t.Context())) }() + + kr := setupReceiver(client, osQuotaClient, sink, nil, 10*time.Second, tt, nil, component.MustNewID("k8s_leader_elector")) + + // Seed initial pods once (avoid calling createPods multiple times to prevent name collisions). + createPods(t, client, 2, false) + + require.NoError(t, kr.Start(t.Context(), fakeHost)) + + // Become leader -> start session + fakeLeaderElection.InvokeOnLeading() + + // Wait for first collection + var baseline int + require.Eventually(t, func() bool { + baseline = sink.DataPointCount() + return baseline >= 2 + }, 10*time.Second, 100*time.Millisecond, "metrics not collected while leader") + + // Enter standby: stop current session (informers + ticker) + fakeLeaderElection.InvokeOnStopping() + + // While in standby, add a new pod with a UNIQUE name; should NOT affect metrics + createUniquePod(t, client, "default", "standby-pod-1") + + // Wait > 1 interval; no new data should arrive in standby + time.Sleep(1500 * time.Millisecond) + require.Equal(t, baseline, sink.DataPointCount(), "no metrics should be emitted while in standby") + + // Regain leadership + fakeLeaderElection.InvokeOnLeading() + + // Add another unique pod after resuming -> should be picked up + createUniquePod(t, client, "default", "resumed-pod-1") + + require.Eventually(t, func() bool { + return sink.DataPointCount() > baseline + }, 10*time.Second, 100*time.Millisecond, "metrics did not resume after re-leading") + + require.NoError(t, kr.Shutdown(t.Context())) +} + +func TestReceiverWithLeaderElection_StandbyDoesNotDeadlock_OnQuickFlaps(t *testing.T) { + fakeLeaderElection := &k8sleaderelectortest.FakeLeaderElection{} + fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeLeaderElection} + + client := newFakeClientWithAllResources() + osQuotaClient := fakeQuota.NewSimpleClientset() + sink := new(consumertest.MetricsSink) + + tt := componenttest.NewTelemetry() + defer func() { require.NoError(t, tt.Shutdown(t.Context())) }() + + kr := setupReceiver(client, osQuotaClient, sink, nil, 10*time.Second, tt, nil, component.MustNewID("k8s_leader_elector")) + + // Seed something to observe; also avoids zero-work sessions + createUniquePod(t, client, "default", "initial-pod") + + require.NoError(t, kr.Start(t.Context(), fakeHost)) + + // Start one clean session and wait for any data point to ensure we're up + fakeLeaderElection.InvokeOnLeading() + require.Eventually(t, func() bool { + return sink.DataPointCount() >= 1 + }, 10*time.Second, 100*time.Millisecond, "session did not start") + + // Flap with small sleeps to let stop complete before next start, avoiding exporter race + const loops = 50 + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < loops; i++ { + fakeLeaderElection.InvokeOnStopping() + time.Sleep(3 * time.Millisecond) // allow session goroutine to exit + fakeLeaderElection.InvokeOnLeading() + time.Sleep(3 * time.Millisecond) // allow new session to initialize + } + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("leader flap goroutine timed out (possible deadlock)") + } + + require.NoError(t, kr.Shutdown(t.Context())) +} + func setupReceiver(client *fake.Clientset, osQuotaClient quotaclientset.Interface, metricsConsumer consumer.Metrics, logsConsumer consumer.Logs, initialSyncTimeout time.Duration, tt *componenttest.Telemetry, namespaces []string, leaderElector component.ID) *kubernetesReceiver { distribution := distributionKubernetes if osQuotaClient != nil { @@ -438,3 +535,17 @@ func gvkToAPIResource(gvk schema.GroupVersionKind) v1.APIResource { Kind: gvk.Kind, } } + +func createUniquePod(t *testing.T, client *fake.Clientset, ns, name string) { + t.Helper() + _, err := client.CoreV1().Pods(ns).Create(t.Context(), &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "c", Image: "i"}}, + }, + }, v1.CreateOptions{}) + require.NoError(t, err) +} diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index aef81272c034a..8af3567a29b40 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -8,6 +8,7 @@ import ( "fmt" "reflect" "strings" + "sync" "sync/atomic" "time" @@ -63,6 +64,7 @@ type resourceWatcher struct { initialSyncTimedOut *atomic.Bool config *Config entityLogConsumer consumer.Logs + mu sync.RWMutex // For mocking. makeClient func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) @@ -332,7 +334,10 @@ func (rw *resourceWatcher) onAdd(obj any) { } func (rw *resourceWatcher) hasDestination() bool { - return len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil + rw.mu.RLock() + has := len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil + rw.mu.RUnlock() + return has } func (rw *resourceWatcher) onUpdate(oldObj, newObj any) { @@ -425,7 +430,9 @@ func (rw *resourceWatcher) setupMetadataExporters( ) } + rw.mu.Lock() rw.metadataConsumers = out + rw.mu.Unlock() return nil } @@ -449,12 +456,18 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata) if len(metadataUpdate) != 0 { - for _, consume := range rw.metadataConsumers { + rw.mu.RLock() + consumers := append([]metadataConsumer(nil), rw.metadataConsumers...) + rw.mu.RUnlock() + for _, consume := range consumers { _ = consume(metadataUpdate) } } - if rw.entityLogConsumer != nil { + rw.mu.RLock() + entityLogsConsumer := rw.entityLogConsumer + rw.mu.RUnlock() + if entityLogsConsumer != nil { // Represent metadata update as entity events. entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata, timestamp, rw.config.MetadataCollectionInterval) @@ -462,7 +475,7 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper logs := entityEvents.ConvertAndMoveToLogs() if logs.LogRecordCount() != 0 { - err := rw.entityLogConsumer.ConsumeLogs(context.Background(), logs) + err := entityLogsConsumer.ConsumeLogs(context.Background(), logs) if err != nil { rw.logger.Error("Error sending entity events to the consumer", zap.Error(err)) From 8365f3e10a7b8d8ef3f2312a96aed1a929c37be1 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 1 Oct 2025 12:04:13 +0100 Subject: [PATCH 2/5] feat: Reset per-leadership-session state informers Signed-off-by: Paulo Dias --- receiver/k8sclusterreceiver/watcher.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 8af3567a29b40..bcc8f8f547c07 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -88,6 +88,17 @@ func newResourceWatcher(set receiver.Settings, cfg *Config, metadataStore *metad } func (rw *resourceWatcher) initialize() error { + // Reset per-leadership-session state to avoid accumulating informers and + // to ensure handlers wait for cache sync again after re-election. + // (Old factories tied to the previous context must not be reused.) + rw.informerFactories = nil + if rw.initialSyncDone != nil { + rw.initialSyncDone.Store(false) + } + if rw.initialSyncTimedOut != nil { + rw.initialSyncTimedOut.Store(false) + } + client, err := rw.makeClient(rw.config.APIConfig) if err != nil { return fmt.Errorf("Failed to create Kubernetes client: %w", err) From 2f2c72e1021af70f7b00480a2e9e3a60e45355ca Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 8 Oct 2025 10:40:24 +0100 Subject: [PATCH 3/5] feat: move wg to be session scoped Signed-off-by: Paulo Dias --- receiver/k8sclusterreceiver/receiver.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index 0c3c54e30755d..f58c650a805e0 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -40,7 +40,7 @@ type kubernetesReceiver struct { cancel context.CancelFunc sessionCancel context.CancelFunc mu sync.Mutex - wg sync.WaitGroup + wg *sync.WaitGroup obsrecv *receiverhelper.ObsReport } @@ -70,11 +70,13 @@ func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component. // replace any previous sessionCancel (should normally be nil when starting fresh) kr.sessionCancel = cancel // track the session worker goroutine - kr.wg.Add(1) + wg := &sync.WaitGroup{} + wg.Add(1) + kr.wg = wg kr.mu.Unlock() - go func() { - defer kr.wg.Done() + go func(wg *sync.WaitGroup) { + defer wg.Done() kr.settings.Logger.Info("Starting shared informers and wait for initial cache sync.") for _, informer := range kr.resourceWatcher.informerFactories { if informer == nil { @@ -110,7 +112,7 @@ func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component. return } } - }() + }(wg) return nil } @@ -159,7 +161,9 @@ func (kr *kubernetesReceiver) stopReceiver() { kr.settings.Logger.Info("Stopping the receiver session (standby)") kr.mu.Lock() cancel := kr.sessionCancel + wg := kr.wg kr.sessionCancel = nil + kr.wg = nil kr.mu.Unlock() if cancel != nil { @@ -167,7 +171,9 @@ func (kr *kubernetesReceiver) stopReceiver() { } // Wait for the session goroutine to exit to avoid overlaps on quick flaps. - kr.wg.Wait() + if wg != nil { + wg.Wait() + } } func (kr *kubernetesReceiver) Shutdown(context.Context) error { From a39cf6b05364b45a848d5efd4a2796f01e1aa246 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 8 Oct 2025 10:44:11 +0100 Subject: [PATCH 4/5] chore: fix changelog Signed-off-by: Paulo Dias --- .chloggen/feat_42707.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/feat_42707.yaml b/.chloggen/feat_42707.yaml index c97374c73fcbd..8c6cbbbf9efbc 100644 --- a/.chloggen/feat_42707.yaml +++ b/.chloggen/feat_42707.yaml @@ -4,7 +4,7 @@ change_type: "enhancement" # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: "receiver/k8sclusterreceiver" +component: "receiver/k8s_cluster" # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: "Switch to standby mode when leader lease is lost instead of shutdown" From 9170c5ee26084d90803974547a805ba7ede5117b Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Mon, 10 Nov 2025 22:46:46 +0000 Subject: [PATCH 5/5] chore: add comprehensive comments explaining standby mode pattern Signed-off-by: Paulo Dias --- receiver/k8sclusterreceiver/receiver.go | 30 ++++++++++++++++++++----- receiver/k8sclusterreceiver/watcher.go | 5 ++++- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index f58c650a805e0..43af6cc7f0dc2 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -37,17 +37,27 @@ type kubernetesReceiver struct { config *Config settings receiver.Settings metricsConsumer consumer.Metrics - cancel context.CancelFunc - sessionCancel context.CancelFunc - mu sync.Mutex - wg *sync.WaitGroup - obsrecv *receiverhelper.ObsReport + // cancel is the component-level context cancel function, only called during Shutdown + // to fully terminate the receiver. + cancel context.CancelFunc + // sessionCancel is the session-level context cancel function, called when losing leadership + // to stop the current informers/ticker while keeping the receiver instance alive for re-election. + sessionCancel context.CancelFunc + // mu protects sessionCancel and wg during leadership transitions. + mu sync.Mutex + // wg tracks the session goroutine lifecycle for clean shutdown on leadership loss. + wg *sync.WaitGroup + obsrecv *receiverhelper.ObsReport } type getExporters interface { GetExporters() map[pipeline.Signal]map[component.ID]component.Component } +// startReceiver begins a leadership session by initializing informers and starting watchers. +// It's called on initial Start (if no leader election) or by the leader elector's onStartLeading callback. +// When called multiple times due to leadership transitions, initialize() resets per-session state +// (informer factories and cache sync flags) to ensure clean restarts. func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component.Host) error { if err := kr.resourceWatcher.initialize(); err != nil { return err @@ -157,6 +167,16 @@ func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) er return nil } +// stopReceiver puts the receiver in standby mode by stopping the current leadership session. +// Unlike Shutdown, this keeps the receiver instance alive and registered with the leader elector, +// so it can compete for leadership again. The receiver remains ready to restart via the +// onStartLeading callback when leadership is regained. +// +// This implements the "standby" pattern where: +// - The session context is cancelled, stopping informers and the metrics collection ticker +// - The session goroutine exits cleanly +// - All cached Kubernetes state (informer factories) will be reset on the next startReceiver call +// - The receiver struct and its registration with leader elector remain intact func (kr *kubernetesReceiver) stopReceiver() { kr.settings.Logger.Info("Stopping the receiver session (standby)") kr.mu.Lock() diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index c5a471c90d673..91bcefaa6758f 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -63,7 +63,10 @@ type resourceWatcher struct { initialSyncTimedOut *atomic.Bool config *Config entityLogConsumer consumer.Logs - mu sync.RWMutex + // mu protects metadataConsumers and entityLogConsumer during leadership transitions. + // These fields are modified in setupMetadataExporters (called on leadership gain) + // while potentially being accessed concurrently by informer callbacks. + mu sync.RWMutex // For mocking. makeClient func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error)