Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/feat_42707.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "receiver/k8s_cluster"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Switch to standby mode when the leader lease is lost instead of shutting down"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42707]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
80 changes: 67 additions & 13 deletions receiver/k8sclusterreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8sclusterreceiver // import "github.com/open-telemetry/opentelemetry-co
import (
"context"
"errors"
"sync"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -36,14 +37,27 @@ type kubernetesReceiver struct {
config *Config
settings receiver.Settings
metricsConsumer consumer.Metrics
cancel context.CancelFunc
obsrecv *receiverhelper.ObsReport
// cancel is the component-level context cancel function, only called during Shutdown
// to fully terminate the receiver.
cancel context.CancelFunc
// sessionCancel is the session-level context cancel function, called when losing leadership
// to stop the current informers/ticker while keeping the receiver instance alive for re-election.
sessionCancel context.CancelFunc
// mu protects sessionCancel and wg during leadership transitions.
mu sync.Mutex
// wg tracks the session goroutine lifecycle for clean shutdown on leadership loss.
wg *sync.WaitGroup
obsrecv *receiverhelper.ObsReport
}

// getExporters is satisfied by any value that can list exporters, grouped
// first by pipeline signal and then by component ID.
// NOTE(review): presumably the component.Host is type-asserted to this
// interface to locate metadata exporters — confirm against the caller.
type getExporters interface {
GetExporters() map[pipeline.Signal]map[component.ID]component.Component
}

// startReceiver begins a leadership session by initializing informers and starting watchers.
// It's called on initial Start (if no leader election) or by the leader elector's onStartLeading callback.
// When called multiple times due to leadership transitions, initialize() resets per-session state
// (informer factories and cache sync flags) to ensure clean restarts.
func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component.Host) error {
if err := kr.resourceWatcher.initialize(); err != nil {
return err
Expand All @@ -60,13 +74,25 @@ func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component.
return err
}

go func() {
// Create a leadership-session context so we can stop ONLY the watches/ticker on standby.
cctx, cancel := context.WithCancel(ctx)
kr.mu.Lock()
// replace any previous sessionCancel (should normally be nil when starting fresh)
kr.sessionCancel = cancel
// track the session worker goroutine
wg := &sync.WaitGroup{}
wg.Add(1)
kr.wg = wg
kr.mu.Unlock()

go func(wg *sync.WaitGroup) {
defer wg.Done()
kr.settings.Logger.Info("Starting shared informers and wait for initial cache sync.")
for _, informer := range kr.resourceWatcher.informerFactories {
if informer == nil {
continue
}
timedContextForInitialSync := kr.resourceWatcher.startWatchingResources(ctx, informer)
timedContextForInitialSync := kr.resourceWatcher.startWatchingResources(cctx, informer)

// Wait till either the initial cache sync times out or until the cancel method
// corresponding to this context is called.
Expand All @@ -91,12 +117,12 @@ func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component.
for {
select {
case <-ticker.C:
kr.dispatchMetrics(ctx)
case <-ctx.Done():
kr.dispatchMetrics(cctx)
case <-cctx.Done():
return
}
}
}()
}(wg)
return nil
}

Expand All @@ -122,11 +148,12 @@ func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) er
}

leaderElectorExt.SetCallBackFuncs(
func(ctx context.Context) {
if err := kr.startReceiver(ctx, host); err != nil {
func(cbCtx context.Context) {
if err := kr.startReceiver(cbCtx, host); err != nil {
kr.settings.Logger.Error("Failed to start receiver", zap.Error(err))
}
}, func() {
},
func() {
kr.stopReceiver()
},
)
Expand All @@ -140,15 +167,42 @@ func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) er
return nil
}

// stopReceiver puts the receiver in standby mode by stopping the current leadership session.
// Unlike Shutdown, this keeps the receiver instance alive and registered with the leader elector,
// so it can compete for leadership again. The receiver remains ready to restart via the
// onStartLeading callback when leadership is regained.
//
// This implements the "standby" pattern where:
// - The session context is cancelled, stopping informers and the metrics collection ticker
// - The session goroutine exits cleanly
// - All cached Kubernetes state (informer factories) will be reset on the next startReceiver call
// - The receiver struct and its registration with leader elector remain intact
func (kr *kubernetesReceiver) stopReceiver() {
kr.settings.Logger.Info("Stopping the receiver")
if kr.cancel != nil {
kr.cancel()
kr.settings.Logger.Info("Stopping the receiver session (standby)")
kr.mu.Lock()
cancel := kr.sessionCancel
wg := kr.wg
kr.sessionCancel = nil
kr.wg = nil
kr.mu.Unlock()

if cancel != nil {
cancel()
Comment thread
ChrsMark marked this conversation as resolved.
}

// Wait for the session goroutine to exit to avoid overlaps on quick flaps.
if wg != nil {
wg.Wait()
}
}

func (kr *kubernetesReceiver) Shutdown(context.Context) error {
// First stop the active session (if any).
kr.stopReceiver()
// Then cancel the component-level context.
if kr.cancel != nil {
Comment thread
dmitryax marked this conversation as resolved.
kr.cancel()
}
return nil
}

Expand Down
111 changes: 111 additions & 0 deletions receiver/k8sclusterreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,103 @@ func getUpdatedPod(pod *corev1.Pod) any {
}
}

// TestReceiverWithLeaderElection_StandbyAndResume_Metrics walks the receiver
// through a leader -> standby -> leader cycle and checks that data points are
// emitted while leading, that none arrive while in standby, and that emission
// resumes once leadership is regained.
func TestReceiverWithLeaderElection_StandbyAndResume_Metrics(t *testing.T) {
	fakeLeaderElection := &k8sleaderelectortest.FakeLeaderElection{}
	fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeLeaderElection}

	client := newFakeClientWithAllResources()
	osQuotaClient := fakeQuota.NewSimpleClientset()
	sink := new(consumertest.MetricsSink)

	tt := componenttest.NewTelemetry()
	defer func() { require.NoError(t, tt.Shutdown(t.Context())) }()

	kr := setupReceiver(client, osQuotaClient, sink, nil, 10*time.Second, tt, nil, component.MustNewID("k8s_leader_elector"))

	// Seed initial pods once (avoid calling createPods multiple times to prevent name collisions).
	createPods(t, client, 2, false)

	require.NoError(t, kr.Start(t.Context(), fakeHost))

	// Become leader -> start session
	fakeLeaderElection.InvokeOnLeading()

	// Wait for first collection
	require.Eventually(t, func() bool {
		return sink.DataPointCount() >= 2
	}, 10*time.Second, 100*time.Millisecond, "metrics not collected while leader")

	// Enter standby: stop current session (informers + ticker)
	fakeLeaderElection.InvokeOnStopping()

	// Take the baseline only after the session has been stopped. A value
	// captured by the polling closure above could be stale if another collection
	// tick fired between the last poll and the stop, which would make the
	// equality check below fail spuriously.
	// NOTE(review): assumes the fake invokes the stop callback synchronously.
	baseline := sink.DataPointCount()

	// While in standby, add a new pod with a UNIQUE name; should NOT affect metrics
	createUniquePod(t, client, "default", "standby-pod-1")

	// Sleep for longer than one collection interval; no new data should arrive
	// in standby.
	// NOTE(review): assumes setupReceiver configures an interval below 1.5s.
	time.Sleep(1500 * time.Millisecond)
	require.Equal(t, baseline, sink.DataPointCount(), "no metrics should be emitted while in standby")

	// Regain leadership
	fakeLeaderElection.InvokeOnLeading()

	// Add another unique pod after resuming -> should be picked up
	createUniquePod(t, client, "default", "resumed-pod-1")

	require.Eventually(t, func() bool {
		return sink.DataPointCount() > baseline
	}, 10*time.Second, 100*time.Millisecond, "metrics did not resume after re-leading")

	require.NoError(t, kr.Shutdown(t.Context()))
}

// TestReceiverWithLeaderElection_StandbyDoesNotDeadlock_OnQuickFlaps toggles
// leadership off and on many times in quick succession and fails if the
// sequence does not finish within ten seconds, which would point at a deadlock
// in the stop/start path.
func TestReceiverWithLeaderElection_StandbyDoesNotDeadlock_OnQuickFlaps(t *testing.T) {
	const (
		loops     = 50
		flapPause = 3 * time.Millisecond
	)

	elector := &k8sleaderelectortest.FakeLeaderElection{}
	host := &k8sleaderelectortest.FakeHost{FakeLeaderElection: elector}

	client := newFakeClientWithAllResources()
	quotaClient := fakeQuota.NewSimpleClientset()
	sink := new(consumertest.MetricsSink)

	tt := componenttest.NewTelemetry()
	defer func() { require.NoError(t, tt.Shutdown(t.Context())) }()

	kr := setupReceiver(client, quotaClient, sink, nil, 10*time.Second, tt, nil, component.MustNewID("k8s_leader_elector"))

	// Give the first session something to report so it is not a zero-work run.
	createUniquePod(t, client, "default", "initial-pod")

	require.NoError(t, kr.Start(t.Context(), host))

	// Bring up one clean session and wait until at least one data point lands.
	elector.InvokeOnLeading()
	sessionUp := func() bool { return sink.DataPointCount() >= 1 }
	require.Eventually(t, sessionUp, 10*time.Second, 100*time.Millisecond, "session did not start")

	// One flap = lose leadership, pause, regain leadership, pause. The short
	// pauses let the stop finish before the next start and let the new session
	// initialize, avoiding an exporter race.
	flapOnce := func() {
		elector.InvokeOnStopping()
		time.Sleep(flapPause)
		elector.InvokeOnLeading()
		time.Sleep(flapPause)
	}

	flapsFinished := make(chan struct{})
	go func() {
		defer close(flapsFinished)
		for range loops {
			flapOnce()
		}
	}()

	select {
	case <-time.After(10 * time.Second):
		t.Fatal("leader flap goroutine timed out (possible deadlock)")
	case <-flapsFinished:
	}

	require.NoError(t, kr.Shutdown(t.Context()))
}

func setupReceiver(client *fake.Clientset, osQuotaClient quotaclientset.Interface, metricsConsumer consumer.Metrics, logsConsumer consumer.Logs, initialSyncTimeout time.Duration, tt *componenttest.Telemetry, namespaces []string, leaderElector component.ID) *kubernetesReceiver {
distribution := distributionKubernetes
if osQuotaClient != nil {
Expand Down Expand Up @@ -438,3 +535,17 @@ func gvkToAPIResource(gvk schema.GroupVersionKind) v1.APIResource {
Kind: gvk.Kind,
}
}

// createUniquePod creates a minimal single-container pod called name in
// namespace ns through the given fake clientset, failing the test if the create
// call returns an error.
func createUniquePod(t *testing.T, client *fake.Clientset, ns, name string) {
	t.Helper()

	pod := &corev1.Pod{
		ObjectMeta: v1.ObjectMeta{
			Name:      name,
			Namespace: ns,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{Name: "c", Image: "i"},
			},
		},
	}

	_, createErr := client.CoreV1().Pods(ns).Create(t.Context(), pod, v1.CreateOptions{})
	require.NoError(t, createErr)
}
35 changes: 31 additions & 4 deletions receiver/k8sclusterreceiver/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -62,6 +63,10 @@ type resourceWatcher struct {
initialSyncTimedOut *atomic.Bool
config *Config
entityLogConsumer consumer.Logs
// mu protects metadataConsumers and entityLogConsumer during leadership transitions.
// These fields are modified in setupMetadataExporters (called on leadership gain)
// while potentially being accessed concurrently by informer callbacks.
mu sync.RWMutex

// For mocking.
makeClient func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error)
Expand All @@ -85,6 +90,17 @@ func newResourceWatcher(set receiver.Settings, cfg *Config, metadataStore *metad
}

func (rw *resourceWatcher) initialize() error {
// Reset per-leadership-session state to avoid accumulating informers and
// to ensure handlers wait for cache sync again after re-election.
// (Old factories tied to the previous context must not be reused.)
rw.informerFactories = nil
if rw.initialSyncDone != nil {
rw.initialSyncDone.Store(false)
}
if rw.initialSyncTimedOut != nil {
rw.initialSyncTimedOut.Store(false)
}

client, err := rw.makeClient(rw.config.APIConfig)
if err != nil {
return fmt.Errorf("Failed to create Kubernetes client: %w", err)
Expand Down Expand Up @@ -331,7 +347,10 @@ func (rw *resourceWatcher) onAdd(obj any) {
}

func (rw *resourceWatcher) hasDestination() bool {
return len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil
rw.mu.RLock()
has := len(rw.metadataConsumers) != 0 || rw.entityLogConsumer != nil
rw.mu.RUnlock()
Comment thread
paulojmdias marked this conversation as resolved.
return has
}

func (rw *resourceWatcher) onUpdate(oldObj, newObj any) {
Expand Down Expand Up @@ -424,7 +443,9 @@ func (rw *resourceWatcher) setupMetadataExporters(
)
}

rw.mu.Lock()
rw.metadataConsumers = out
rw.mu.Unlock()
return nil
}

Expand All @@ -448,20 +469,26 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper

metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata)
if len(metadataUpdate) != 0 {
for _, consume := range rw.metadataConsumers {
rw.mu.RLock()
consumers := append([]metadataConsumer(nil), rw.metadataConsumers...)
rw.mu.RUnlock()
for _, consume := range consumers {
_ = consume(metadataUpdate)
}
}

if rw.entityLogConsumer != nil {
rw.mu.RLock()
entityLogsConsumer := rw.entityLogConsumer
rw.mu.RUnlock()
if entityLogsConsumer != nil {
// Represent metadata update as entity events.
entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata, timestamp, rw.config.MetadataCollectionInterval)

// Convert entity events to log representation.
logs := entityEvents.ConvertAndMoveToLogs()

if logs.LogRecordCount() != 0 {
err := rw.entityLogConsumer.ConsumeLogs(context.Background(), logs)
err := entityLogsConsumer.ConsumeLogs(context.Background(), logs)
if err != nil {
rw.logger.Error("Error sending entity events to the consumer", zap.Error(err))

Expand Down