From b1b402200d5e2a8c17107bd2ec0a7b2ee163ad41 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Tue, 25 Nov 2025 10:45:49 +0000 Subject: [PATCH 1/7] [receiver/k8sevents] shutdown instead of standby on leader election loss Signed-off-by: Paulo Dias --- .chloggen/feat_k8sevents_remove_standby.yaml | 30 +++++++++++++ receiver/k8seventsreceiver/receiver.go | 44 +++++--------------- 2 files changed, 41 insertions(+), 33 deletions(-) create mode 100644 .chloggen/feat_k8sevents_remove_standby.yaml diff --git a/.chloggen/feat_k8sevents_remove_standby.yaml b/.chloggen/feat_k8sevents_remove_standby.yaml new file mode 100644 index 0000000000000..213b7b12141fe --- /dev/null +++ b/.chloggen/feat_k8sevents_remove_standby.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/k8sevents + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove standby mode behavior on leader election loss. Receiver now fully shuts down when losing leader lease instead of entering standby mode. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [44513, 42330] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This change removes the standby mode feature introduced in PR #42330. The receiver will now call Shutdown() + when leader lease is lost, ensuring is using the same behaviour as the other components which uses the + k8sleaderelector extension. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index 265a43e7ac5dd..15e5b602e2a03 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -6,13 +6,13 @@ package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-col import ( "context" "fmt" - "sync" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" k8s "k8s.io/client-go/kubernetes" @@ -31,7 +31,6 @@ type k8seventsReceiver struct { ctx context.Context cancel context.CancelFunc obsrecv *receiverhelper.ObsReport - wg sync.WaitGroup } // newReceiver creates the Kubernetes events receiver with the given configuration. @@ -92,12 +91,13 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) err } } }, - // onStoppedLeading: stop watches, but DO NOT shut the whole receiver down func() { - kr.settings.Logger.Info("no longer leader, stopping watches") - kr.stopWatches() - }, - ) + kr.settings.Logger.Info("no longer leader, stopping") + err := kr.Shutdown(context.Background()) + if err != nil { + kr.settings.Logger.Error("shutdown receiver error:", zap.Error(err)) + } + }) return nil } @@ -114,32 +114,14 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) err } func (kr *k8seventsReceiver) Shutdown(context.Context) error { - // Stop informers and wait for them to exit. - kr.stopWatches() - if kr.cancel != nil { kr.cancel() - kr.cancel = nil } - return nil -} -// stopWatches closes all informer stop channels (idempotently) and waits for their goroutines to exit. -func (kr *k8seventsReceiver) stopWatches() { - if len(kr.stopperChanList) == 0 { - return - } - for _, ch := range kr.stopperChanList { - select { - case <-ch: // already closed - default: - close(ch) - } + for _, stopperChan := range kr.stopperChanList { + close(stopperChan) } - // Wait for all controller.Run goroutines to finish. - kr.wg.Wait() - // Reset slice so we can start again on leadership regain. - kr.stopperChanList = nil + return nil } // Add the 'Event' handler and trigger the watch for a specific namespace. @@ -188,11 +170,7 @@ func (kr *k8seventsReceiver) startWatchingNamespace( ResyncPeriod: 0, Handler: handlers, }) - kr.wg.Add(1) - go func() { - defer kr.wg.Done() - controller.Run(stopper) - }() + go controller.Run(stopper) } // Allow events with eventTimestamp(EventTime/LastTimestamp/FirstTimestamp) From 5e340dba829e4a41775d5c99bb8122af14ef53ea Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Tue, 25 Nov 2025 10:51:30 +0000 Subject: [PATCH 2/7] chore: update changelog Signed-off-by: Paulo Dias --- .chloggen/feat_k8sevents_remove_standby.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/feat_k8sevents_remove_standby.yaml b/.chloggen/feat_k8sevents_remove_standby.yaml index 213b7b12141fe..438350a0c90ef 100644 --- a/.chloggen/feat_k8sevents_remove_standby.yaml +++ b/.chloggen/feat_k8sevents_remove_standby.yaml @@ -4,7 +4,7 @@ change_type: "enhancement" # The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) -component: receiver/k8sevents +component: receiver/k8s_events # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: Remove standby mode behavior on leader election loss. Receiver now fully shuts down when losing leader lease instead of entering standby mode. From 90693ac5769f602a5b9b1b62104dfe6bc1b6c16f Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Tue, 25 Nov 2025 11:00:56 +0000 Subject: [PATCH 3/7] fix: linting Signed-off-by: Paulo Dias --- receiver/k8seventsreceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index 15e5b602e2a03..d1f7bdd0e0642 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -156,7 +156,7 @@ func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) { // startWatchingNamespace creates an informer and starts // watching a specific namespace for the events. -func (kr *k8seventsReceiver) startWatchingNamespace( +func (*k8seventsReceiver) startWatchingNamespace( clientset k8s.Interface, handlers cache.ResourceEventHandlerFuncs, ns string, From 6bd6e844a649bde9ef5764ac8f9735b16d838980 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Tue, 25 Nov 2025 11:21:22 +0000 Subject: [PATCH 4/7] fix: fix behaviour Signed-off-by: Paulo Dias --- receiver/k8seventsreceiver/receiver.go | 17 ++++++++++++++++- receiver/k8seventsreceiver/receiver_test.go | 16 ++++++++-------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index d1f7bdd0e0642..aaa13f87e84d3 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -6,6 +6,7 @@ package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-col import ( "context" "fmt" + "sync" "time" "go.opentelemetry.io/collector/component" @@ -31,6 +32,7 @@ type k8seventsReceiver struct { ctx context.Context cancel context.CancelFunc obsrecv *receiverhelper.ObsReport + mu sync.Mutex } // newReceiver creates the Kubernetes events receiver with the given configuration. @@ -81,7 +83,10 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) err kr.settings.Logger.Info("registering the receiver in leader election") elector.SetCallBackFuncs( - func(_ context.Context) { + func(ctx context.Context) { + cctx, cancel := context.WithCancel(ctx) + kr.cancel = cancel + kr.ctx = cctx kr.settings.Logger.Info("Events Receiver started as leader") if len(kr.config.Namespaces) == 0 { kr.startWatch(corev1.NamespaceAll, k8sInterface) @@ -118,9 +123,12 @@ func (kr *k8seventsReceiver) Shutdown(context.Context) error { kr.cancel() } + kr.mu.Lock() for _, stopperChan := range kr.stopperChanList { close(stopperChan) } + kr.stopperChanList = nil + kr.mu.Unlock() return nil } @@ -129,7 +137,9 @@ func (kr *k8seventsReceiver) Shutdown(context.Context) error { // https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/record/events_cache.go#L327 func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) { stopperChan := make(chan struct{}) + kr.mu.Lock() kr.stopperChanList = append(kr.stopperChanList, stopperChan) + kr.mu.Unlock() kr.startWatchingNamespace(client, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { if ev, ok := obj.(*corev1.Event); ok { @@ -145,6 +155,11 @@ func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) { } func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) { + // Check if context is cancelled (receiver has been shut down) + if kr.ctx != nil && kr.ctx.Err() != nil { + return + } + if kr.allowEvent(ev) { ld := k8sEventToLogData(kr.settings.Logger, ev, kr.settings.BuildInfo.Version) diff --git a/receiver/k8seventsreceiver/receiver_test.go b/receiver/k8seventsreceiver/receiver_test.go index 6654871277a26..3d4a67bcc8a43 100644 --- a/receiver/k8seventsreceiver/receiver_test.go +++ b/receiver/k8seventsreceiver/receiver_test.go @@ -152,9 +152,6 @@ func TestReceiverWithLeaderElection(t *testing.T) { recv := r.(*k8seventsReceiver) require.NoError(t, r.Start(t.Context(), host)) - t.Cleanup(func() { - assert.NoError(t, r.Shutdown(t.Context())) - }) // Become leader: start processing events le.InvokeOnLeading() @@ -164,21 +161,24 @@ func TestReceiverWithLeaderElection(t *testing.T) { return sink.LogRecordCount() == 1 }, 5*time.Second, 100*time.Millisecond, "logs not collected while leader") - // lose leadership + // lose leadership - this will trigger Shutdown() le.InvokeOnStopping() - // DO NOT call recv.handleEvent(...) here; informer wouldn't deliver to this instance. - // Give a tiny moment and ensure count stays 1. + // Event should be ignored after losing leadership + recv.handleEvent(getEvent("Normal")) time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, sink.LogRecordCount(), "event should be ignored after losing leadership") + require.Equal(t, 1, sink.LogRecordCount(), "event should be ignored after losing leadership") - // regain leadership and inject again + // regain leadership le.InvokeOnLeading() recv.handleEvent(getEvent("Normal")) require.Eventually(t, func() bool { return sink.LogRecordCount() == 2 }, 5*time.Second, 100*time.Millisecond, "logs not collected after regaining leadership") + + // Final cleanup + require.NoError(t, r.Shutdown(t.Context())) } func getEvent(eventType string) *corev1.Event { From 5d8acf7d5936d1e451154ea8924e692987c5eadc Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Mon, 1 Dec 2025 16:15:26 +0000 Subject: [PATCH 5/7] chore: add comments to clarify shutdown Signed-off-by: Paulo Dias --- receiver/k8seventsreceiver/receiver.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index aaa13f87e84d3..5680c11dbeeff 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -82,6 +82,8 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) err kr.settings.Logger.Info("registering the receiver in leader election") + // Register callbacks with the leader elector extension. These callbacks remain active + // for the lifetime of the receiver, allowing it to restart when leadership is regained. elector.SetCallBackFuncs( func(ctx context.Context) { cctx, cancel := context.WithCancel(ctx) @@ -97,6 +99,8 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) err } }, func() { + // Shutdown on leader loss. The receiver will restart if leadership is regained + // since the callbacks remain registered with the leader elector extension. kr.settings.Logger.Info("no longer leader, stopping") err := kr.Shutdown(context.Background()) if err != nil { From 19ce2b1f87e097330f22330435f87dfc37c28085 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 3 Dec 2025 11:00:24 +0000 Subject: [PATCH 6/7] fix: remove unneeded context check Signed-off-by: Paulo Dias --- receiver/k8seventsreceiver/receiver.go | 5 ----- receiver/k8seventsreceiver/receiver_test.go | 5 ++--- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index 5680c11dbeeff..6f9ce556efe9a 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -159,11 +159,6 @@ func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) { } func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) { - // Check if context is cancelled (receiver has been shut down) - if kr.ctx != nil && kr.ctx.Err() != nil { - return - } - if kr.allowEvent(ev) { ld := k8sEventToLogData(kr.settings.Logger, ev, kr.settings.BuildInfo.Version) diff --git a/receiver/k8seventsreceiver/receiver_test.go b/receiver/k8seventsreceiver/receiver_test.go index 3d4a67bcc8a43..9d9ae6918ab3a 100644 --- a/receiver/k8seventsreceiver/receiver_test.go +++ b/receiver/k8seventsreceiver/receiver_test.go @@ -164,10 +164,9 @@ func TestReceiverWithLeaderElection(t *testing.T) { // lose leadership - this will trigger Shutdown() le.InvokeOnStopping() - // Event should be ignored after losing leadership - recv.handleEvent(getEvent("Normal")) + // Verify count remains at 1 after losing leadership time.Sleep(100 * time.Millisecond) - require.Equal(t, 1, sink.LogRecordCount(), "event should be ignored after losing leadership") + require.Equal(t, 1, sink.LogRecordCount(), "count should remain at 1 after losing leadership") // regain leadership le.InvokeOnLeading() From 5940452666eed4f051f585d4575b7fdbbbd17e45 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 10 Dec 2025 22:35:12 +0000 Subject: [PATCH 7/7] chore: remove changelogfile Signed-off-by: Paulo Dias --- .chloggen/feat_k8sevents_remove_standby.yaml | 30 -------------------- 1 file changed, 30 deletions(-) delete mode 100644 .chloggen/feat_k8sevents_remove_standby.yaml diff --git a/.chloggen/feat_k8sevents_remove_standby.yaml b/.chloggen/feat_k8sevents_remove_standby.yaml deleted file mode 100644 index 438350a0c90ef..0000000000000 --- a/.chloggen/feat_k8sevents_remove_standby.yaml +++ /dev/null @@ -1,30 +0,0 @@ -# Use this changelog template to create an entry for release notes. - -# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: "enhancement" - -# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) -component: receiver/k8s_events - -# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Remove standby mode behavior on leader election loss. Receiver now fully shuts down when losing leader lease instead of entering standby mode. - -# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [44513, 42330] - -# (Optional) One or more lines of additional information to render under the primary note. -# These lines will be padded with 2 spaces and then inserted directly into the document. -# Use pipe (|) for multiline entries. -subtext: | - This change removes the standby mode feature introduced in PR #42330. The receiver will now call Shutdown() - when leader lease is lost, ensuring is using the same behaviour as the other components which uses the - k8sleaderelector extension. - -# If your change doesn't affect end users or the exported elements of any package, -# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. -# Optional: The change log or logs in which this entry should be included. -# e.g. '[user]' or '[user, api]' -# Include 'user' if the change is relevant to end users. -# Include 'api' if there is a change to a library API. -# Default: '[user]' -change_logs: [user]