diff --git a/.chloggen/feat_42266.yaml b/.chloggen/feat_42266.yaml new file mode 100644 index 0000000000000..a8419de4a4dfa --- /dev/null +++ b/.chloggen/feat_42266.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/k8seventsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Added support for Leader Election into `k8seventsreceiver` using `k8sleaderelector` extension." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [42266] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/k8seventsreceiver/README.md b/receiver/k8seventsreceiver/README.md index f04890e29d637..d363eac670b08 100644 --- a/receiver/k8seventsreceiver/README.md +++ b/receiver/k8seventsreceiver/README.md @@ -28,6 +28,7 @@ The following settings are optional: the K8s API server. 
This can be one of `none` (for no auth), `serviceAccount` (to use the standard service account token provided to the agent pod), or `kubeConfig` to use credentials from `~/.kube/config`. +- `k8s_leader_elector` (default = none): the ID of a `k8sleaderelector` extension. If specified, leader election is enabled and the receiver watches for events only while it is the elected leader. - `namespaces` (default = `all`): An array of `namespaces` to collect events from. This receiver will continuously watch all the `namespaces` mentioned in the array for new events. @@ -37,6 +38,7 @@ Examples: ```yaml k8s_events: auth_type: kubeConfig + k8s_leader_elector: k8s_leader_elector namespaces: [default, my_namespace] ``` diff --git a/receiver/k8seventsreceiver/config.go b/receiver/k8seventsreceiver/config.go index 96ba451fef24a..5886da233d377 100644 --- a/receiver/k8seventsreceiver/config.go +++ b/receiver/k8seventsreceiver/config.go @@ -4,6 +4,7 @@ package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver" import ( + "go.opentelemetry.io/collector/component" k8s "k8s.io/client-go/kubernetes" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" @@ -16,6 +17,8 @@ type Config struct { // List of ‘namespaces’ to collect events from. 
Namespaces []string `mapstructure:"namespaces"` + K8sLeaderElector *component.ID `mapstructure:"k8s_leader_elector"` + // For mocking makeClient func(apiConf k8sconfig.APIConfig) (k8s.Interface, error) } diff --git a/receiver/k8seventsreceiver/go.mod b/receiver/k8seventsreceiver/go.mod index 57c416de97737..773bae1f6b439 100644 --- a/receiver/k8seventsreceiver/go.mod +++ b/receiver/k8seventsreceiver/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seve go 1.24.0 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.136.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.136.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.42.1-0.20250925151503-069408608b28 @@ -61,6 +62,7 @@ require ( go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.136.1-0.20250925151503-069408608b28 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.136.1-0.20250925151503-069408608b28 // indirect + go.opentelemetry.io/collector/extension v1.42.1-0.20250925151503-069408608b28 // indirect go.opentelemetry.io/collector/featuregate v1.42.1-0.20250925151503-069408608b28 // indirect go.opentelemetry.io/collector/internal/telemetry v0.136.1-0.20250925151503-069408608b28 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.136.1-0.20250925151503-069408608b28 // indirect @@ -91,7 +93,7 @@ require ( k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect - sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect sigs.k8s.io/yaml v1.5.0 // indirect ) @@ -100,6 +102,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sco // openshift removed all tags from their 
repo, use the pseudoversion from the release-3.9 branch HEAD replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37 +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector => ../../extension/k8sleaderelector + retract ( v0.76.2 v0.76.1 diff --git a/receiver/k8seventsreceiver/go.sum b/receiver/k8seventsreceiver/go.sum index 78b5e5489c699..ce1ab5f650725 100644 --- a/receiver/k8seventsreceiver/go.sum +++ b/receiver/k8seventsreceiver/go.sum @@ -125,6 +125,10 @@ go.opentelemetry.io/collector/consumer/consumertest v0.136.1-0.20250925151503-06 go.opentelemetry.io/collector/consumer/consumertest v0.136.1-0.20250925151503-069408608b28/go.mod h1:gTdRvUiJSmzmWp2Ndlh0N0yQ3hPnmTYul2DWuy31/D0= go.opentelemetry.io/collector/consumer/xconsumer v0.136.1-0.20250925151503-069408608b28 h1:oSsW9KQUKHg1jraUmerf0cGeFKK4ilVNfplYiDlz0G4= go.opentelemetry.io/collector/consumer/xconsumer v0.136.1-0.20250925151503-069408608b28/go.mod h1:sXw0lOF6D1iKhLy2xorJ8D3PysDXT0egmHJZu8TY0lE= +go.opentelemetry.io/collector/extension v1.42.1-0.20250925151503-069408608b28 h1:NzPl01v8TOgXUiETVuI9y+UMpHoeS6eT46hKsNe8OtA= +go.opentelemetry.io/collector/extension v1.42.1-0.20250925151503-069408608b28/go.mod h1:lXWCtS04+LjdrG5fZopmQh37SOGxMMf7e7nu/Vh4CQM= +go.opentelemetry.io/collector/extension/extensiontest v0.136.1-0.20250925151503-069408608b28 h1:by0Yo7V1DX6tVqC/sPT6ijX1Ma8QOLTLIIHomrL4i4A= +go.opentelemetry.io/collector/extension/extensiontest v0.136.1-0.20250925151503-069408608b28/go.mod h1:v/2dxcC8j51YbIpv9/UuUtzXTRRCZb6nqGZ++hLO4Js= go.opentelemetry.io/collector/featuregate v1.42.1-0.20250925151503-069408608b28 h1:1UZY4/wGe9+B3qNG0QQ/EMkeKPejiwj+qvyK8ibwNtQ= go.opentelemetry.io/collector/featuregate v1.42.1-0.20250925151503-069408608b28/go.mod h1:d0tiRzVYrytB6LkcYgz2ESFTv7OktRPQe0QEQcPt1L4= go.opentelemetry.io/collector/internal/telemetry v0.136.1-0.20250925151503-069408608b28 
h1:d1PZYb5PSbxKPVzq+zDIUDJDG0bXpy6co8oOWzMmo3U= @@ -248,8 +252,8 @@ k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 h1:M3sRQVHv7vB20Xc2ybTt7ODCeFj6J k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.3 h1:sCP7Vv3xx/CWIuTPVN38lUPx0uw0lcLfzaiDa8Ja01A= +sigs.k8s.io/structured-merge-diff/v4 v4.4.3/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= sigs.k8s.io/yaml v1.5.0 h1:M10b2U7aEUY6hRtU870n2VTPgR5RZiL/I6Lcc2F4NUQ= sigs.k8s.io/yaml v1.5.0/go.mod h1:wZs27Rbxoai4C0f8/9urLZtZtF3avA3gKvGyPdDqTO4= diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index 9ac69f9f35fb3..eb2a86c6917a5 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -5,6 +5,8 @@ package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "fmt" + "sync" "time" "go.opentelemetry.io/collector/component" @@ -16,6 +18,7 @@ import ( k8s "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver/internal/metadata" ) @@ -28,6 +31,7 @@ type k8seventsReceiver struct { ctx context.Context cancel context.CancelFunc obsrecv *receiverhelper.ObsReport + wg sync.WaitGroup } // newReceiver creates the Kubernetes events receiver with the given 
configuration. @@ -56,7 +60,7 @@ func newReceiver( }, nil } -func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error { +func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) error { kr.ctx, kr.cancel = context.WithCancel(ctx) k8sInterface, err := kr.config.getK8sClient() @@ -64,6 +68,40 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error return err } + if kr.config.K8sLeaderElector != nil { + k8sLeaderElector := host.GetExtensions()[*kr.config.K8sLeaderElector] + if k8sLeaderElector == nil { + return fmt.Errorf("unknown k8s leader elector %q", kr.config.K8sLeaderElector) + } + + elector, ok := k8sLeaderElector.(k8sleaderelector.LeaderElection) + if !ok { + return fmt.Errorf("the extension %T does not implement k8sleaderelector.LeaderElection", k8sLeaderElector) + } + + kr.settings.Logger.Info("registering the receiver in leader election") + + elector.SetCallBackFuncs( + func(_ context.Context) { + kr.settings.Logger.Info("Events Receiver started as leader") + if len(kr.config.Namespaces) == 0 { + kr.startWatch(corev1.NamespaceAll, k8sInterface) + } else { + for _, ns := range kr.config.Namespaces { + kr.startWatch(ns, k8sInterface) + } + } + }, + // onStoppedLeading: stop watches, but DO NOT shut the whole receiver down + func() { + kr.settings.Logger.Info("no longer leader, stopping watches") + kr.stopWatches() + }, + ) + return nil + } + + // No leader election: start immediately. kr.settings.Logger.Info("starting to watch namespaces for the events.") if len(kr.config.Namespaces) == 0 { kr.startWatch(corev1.NamespaceAll, k8sInterface) @@ -72,22 +110,38 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error kr.startWatch(ns, k8sInterface) } } - return nil } func (kr *k8seventsReceiver) Shutdown(context.Context) error { - if kr.cancel == nil { - return nil - } - // Stop watching all the namespaces by closing all the stopper channels. 
- for _, stopperChan := range kr.stopperChanList { - close(stopperChan) + // Stop informers and wait for them to exit. + kr.stopWatches() + + if kr.cancel != nil { + kr.cancel() + kr.cancel = nil } - kr.cancel() return nil } +// stopWatches closes all informer stop channels (idempotently) and waits for their goroutines to exit. +func (kr *k8seventsReceiver) stopWatches() { + if len(kr.stopperChanList) == 0 { + return + } + for _, ch := range kr.stopperChanList { + select { + case <-ch: // already closed + default: + close(ch) + } + } + // Wait for all controller.Run goroutines to finish. + kr.wg.Wait() + // Reset slice so we can start again on leadership regain. + kr.stopperChanList = nil +} + // Add the 'Event' handler and trigger the watch for a specific namespace. // For new and updated events, the code is relying on the following k8s code implementation: // https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/record/events_cache.go#L327 @@ -118,7 +172,7 @@ func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) { // startWatchingNamespace creates an informer and starts // watching a specific namespace for the events. 
-func (*k8seventsReceiver) startWatchingNamespace( +func (kr *k8seventsReceiver) startWatchingNamespace( clientset k8s.Interface, handlers cache.ResourceEventHandlerFuncs, ns string, @@ -132,7 +186,11 @@ func (*k8seventsReceiver) startWatchingNamespace( ResyncPeriod: 0, Handler: handlers, }) - go controller.Run(stopper) + kr.wg.Add(1) + go func() { + defer kr.wg.Done() + controller.Run(stopper) + }() } // Allow events with eventTimestamp(EventTime/LastTimestamp/FirstTimestamp) diff --git a/receiver/k8seventsreceiver/receiver_test.go b/receiver/k8seventsreceiver/receiver_test.go index 1e434eecb577f..79915b82d337f 100644 --- a/receiver/k8seventsreceiver/receiver_test.go +++ b/receiver/k8seventsreceiver/receiver_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" @@ -18,6 +19,7 @@ import ( k8s "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector/k8sleaderelectortest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver/internal/metadata" ) @@ -129,6 +131,56 @@ func TestAllowEvent(t *testing.T) { assert.False(t, shouldAllowEvent) } +func TestReceiverWithLeaderElection(t *testing.T) { + le := &k8sleaderelectortest.FakeLeaderElection{} + host := &k8sleaderelectortest.FakeHost{FakeLeaderElection: le} + leaderID := component.MustNewID("k8s_leader_elector") + + cfg := createDefaultConfig().(*Config) + cfg.K8sLeaderElector = &leaderID + cfg.makeClient = func(_ k8sconfig.APIConfig) (k8s.Interface, error) { + return fake.NewSimpleClientset(), nil + } + + sink := new(consumertest.LogsSink) + r, err := 
newReceiver( + receivertest.NewNopSettings(metadata.Type), + cfg, + sink, + ) + require.NoError(t, err) + recv := r.(*k8seventsReceiver) + + require.NoError(t, r.Start(t.Context(), host)) + t.Cleanup(func() { + assert.NoError(t, r.Shutdown(t.Context())) + }) + + // Become leader: start processing events + le.InvokeOnLeading() + recv.handleEvent(getEvent()) + + require.Eventually(t, func() bool { + return sink.LogRecordCount() == 1 + }, 5*time.Second, 100*time.Millisecond, "logs not collected while leader") + + // lose leadership + le.InvokeOnStopping() + + // DO NOT call recv.handleEvent(...) here; informer wouldn't deliver to this instance. + // Give a tiny moment and ensure count stays 1. + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 1, sink.LogRecordCount(), "event should be ignored after losing leadership") + + // regain leadership and inject again + le.InvokeOnLeading() + recv.handleEvent(getEvent()) + + require.Eventually(t, func() bool { + return sink.LogRecordCount() == 2 + }, 5*time.Second, 100*time.Millisecond, "logs not collected after regaining leadership") +} + func getEvent() *corev1.Event { return &corev1.Event{ InvolvedObject: corev1.ObjectReference{