Skip to content
71 changes: 20 additions & 51 deletions receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type k8sobjectsreceiver struct {
obsrecv *receiverhelper.ObsReport
mu sync.Mutex
cancel context.CancelFunc
wg sync.WaitGroup
}

func newReceiver(params receiver.Settings, config *Config, consumer consumer.Logs) (receiver.Logs, error) {
Expand Down Expand Up @@ -133,6 +132,8 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
return fmt.Errorf("the extension %T is not implement k8sleaderelector.LeaderElection", k8sLeaderElector)
}

// Register callbacks with the leader elector extension. These callbacks remain active
// for the lifetime of the receiver, allowing it to restart when leadership is regained.
elector.SetCallBackFuncs(
func(ctx context.Context) {
cctx, cancel := context.WithCancel(ctx)
Expand All @@ -142,12 +143,15 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
}
kr.setting.Logger.Info("Object Receiver started as leader")
},
// onStoppedLeading: stop watches, but DO NOT shut the whole receiver down
func() {
kr.setting.Logger.Info("no longer leader, stopping watches")
kr.stopWatches()
},
)
// Shutdown on leader loss. The receiver will restart if leadership is regained
// since the callbacks remain registered with the leader elector extension.
kr.setting.Logger.Info("no longer leader, stopping")
err = kr.Shutdown(context.Background())
Comment thread
ChrsMark marked this conversation as resolved.
if err != nil {
kr.setting.Logger.Error("shutdown receiver error:", zap.Error(err))
}
})
} else {
cctx, cancel := context.WithCancel(ctx)
kr.cancel = cancel
Expand All @@ -160,37 +164,17 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
}

func (kr *k8sobjectsreceiver) Shutdown(context.Context) error {
// Stop informers and wait for them to exit.
kr.setting.Logger.Info("Object Receiver stopped")
kr.stopWatches()

if kr.cancel != nil {
kr.cancel()
kr.cancel = nil
}
return nil
}

// stopWatches closes all informer stop channels (idempotently) and waits for their goroutines to exit.
func (kr *k8sobjectsreceiver) stopWatches() {
kr.mu.Lock()
// Copy and clear the list under lock to avoid races on restart
chans := kr.stopperChanList
kr.stopperChanList = nil
kr.mu.Unlock()

if len(chans) == 0 {
return
}
for _, ch := range chans {
select {
case <-ch: // already closed
default:
close(ch)
}
for _, stopperChan := range kr.stopperChanList {
close(stopperChan)
}
// Now wait for all WG-tracked loops (both pull & watch) to exit
kr.wg.Wait()
kr.mu.Unlock()
return nil
}

func (kr *k8sobjectsreceiver) start(ctx context.Context, object *K8sObjectsConfig) {
Expand Down Expand Up @@ -225,9 +209,7 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
stopperChan := make(chan struct{})
kr.mu.Lock()
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.wg.Add(1)
kr.mu.Unlock()
defer kr.wg.Done()
ticker := newTicker(ctx, config.Interval)
listOption := metav1.ListOptions{
FieldSelector: config.FieldSelector,
Expand All @@ -248,21 +230,15 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
kr.setting.Logger.Error("error in pulling object",
zap.String("resource", config.gvr.String()),
zap.Error(err))
continue
}
if len(objects.Items) == 0 {
continue
} else if len(objects.Items) > 0 {
logs := pullObjectsToLogData(objects, time.Now(), config, kr.setting.BuildInfo.Version)
obsCtx := kr.obsrecv.StartLogsOp(ctx)
logRecordCount := logs.LogRecordCount()
err = kr.consumer.ConsumeLogs(obsCtx, logs)
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err)
}
logs := pullObjectsToLogData(objects, time.Now(), config, kr.setting.BuildInfo.Version)
obsCtx := kr.obsrecv.StartLogsOp(ctx)
logRecordCount := logs.LogRecordCount()
err = kr.consumer.ConsumeLogs(obsCtx, logs)
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err)

case <-stopperChan:
return
case <-ctx.Done():
return
}
}
}
Expand All @@ -271,9 +247,7 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
stopperChan := make(chan struct{})
kr.mu.Lock()
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.wg.Add(1)
kr.mu.Unlock()
defer kr.wg.Done()

if kr.config.IncludeInitialState {
kr.sendInitialState(ctx, config, resource)
Expand All @@ -286,7 +260,6 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
})

cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
cfgCopy := *config
wait.UntilWithContext(cancelCtx, func(newCtx context.Context) {
resourceVersion, err := getResourceVersion(newCtx, &cfgCopy, resource)
Expand Down Expand Up @@ -374,10 +347,6 @@ func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsCon
res := watcher.ResultChan()
for {
select {
case <-ctx.Done():
kr.setting.Logger.Info("context canceled, stopping watch",
zap.String("resource", config.gvr.String()))
return true
case data, ok := <-res:
if data.Type == apiWatch.Error {
errObject := apierrors.FromObject(data.Object)
Expand Down
244 changes: 0 additions & 244 deletions receiver/k8sobjectsreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,247 +516,3 @@ func TestReceiverWithLeaderElection(t *testing.T) {
}, 20*time.Second, 100*time.Millisecond,
"logs not collected")
}

// TestWatchWithLeaderElectionStandby exercises a watch-mode receiver that is bound
// to a leader elector. It checks three phases in order: events are collected while
// leading, nothing is collected after leadership is lost, and collection resumes
// once leadership is regained.
func TestWatchWithLeaderElectionStandby(t *testing.T) {
	t.Parallel()

	// settle is the pause used to let watches come up (or wind down) before the
	// test mutates the fake cluster; it avoids the list→watch gap.
	const settle = 150 * time.Millisecond

	elector := &k8sleaderelectortest.FakeLeaderElection{}
	host := &k8sleaderelectortest.FakeHost{FakeLeaderElection: elector}
	electorID := component.MustNewID("k8s_leader_elector")

	// Seed the fake cluster with one pod that exists before any watch starts.
	client := newMockDynamicClient()
	client.createPods(
		generatePod("pod1", "default", map[string]any{"environment": "production"}, "1"),
	)

	cfg := createDefaultConfig().(*Config)
	cfg.makeDynamicClient = client.getMockDynamicClient
	cfg.makeDiscoveryClient = getMockDiscoveryClient
	cfg.ErrorMode = PropagateError
	cfg.IncludeInitialState = false
	cfg.Objects = []*K8sObjectsConfig{
		{Name: "pods", Mode: WatchMode, Namespaces: []string{"default"}},
	}
	cfg.K8sLeaderElector = &electorID

	rcv, err := newReceiver(receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
	require.NoError(t, err)
	kr := rcv.(*k8sobjectsreceiver)

	// Swap in a sink so the delivered log records can be counted.
	sink := new(consumertest.LogsSink)
	kr.consumer = sink

	// sinkHolds builds a poll condition that is true once the sink holds exactly want records.
	sinkHolds := func(want int) func() bool {
		return func() bool { return sink.LogRecordCount() == want }
	}

	require.NoError(t, kr.Start(t.Context(), host))

	// Phase 1: leading. Watches start asynchronously, so wait before creating pods.
	elector.InvokeOnLeading()
	time.Sleep(settle)

	client.createPods(
		generatePod("pod2", "default", map[string]any{"environment": "x"}, "2"),
		generatePod("pod3", "default", map[string]any{"environment": "y"}, "3"),
	)
	require.Eventually(t, sinkHolds(2),
		5*time.Second, 50*time.Millisecond, "watch events not collected while leader")

	// Phase 2: standby. A pod created now must not reach the sink.
	elector.InvokeOnStopping()
	client.createPods(
		generatePod("pod4", "default", map[string]any{"environment": "standby"}, "4"),
	)
	time.Sleep(settle)
	assert.Equal(t, 2, sink.LogRecordCount(), "no events should be received while in standby")

	// Phase 3: leading again. Only the pod created after the watch is back is expected.
	elector.InvokeOnLeading()
	time.Sleep(settle)

	client.createPods(
		generatePod("pod5", "default", map[string]any{"environment": "resumed"}, "5"),
	)
	require.Eventually(t, sinkHolds(3),
		5*time.Second, 50*time.Millisecond, "watch did not resume after re-leading")

	assert.NoError(t, kr.Shutdown(t.Context()))
}

// TestPullWithLeaderElectionStandby exercises a pull-mode receiver that is bound to
// a leader elector. It checks that pulls produce records while leading, that the
// record count stays frozen during standby, and that it grows again after
// leadership is regained.
func TestPullWithLeaderElectionStandby(t *testing.T) {
	t.Parallel()

	elector := &k8sleaderelectortest.FakeLeaderElection{}
	host := &k8sleaderelectortest.FakeHost{FakeLeaderElection: elector}
	electorID := component.MustNewID("k8s_leader_elector")

	client := newMockDynamicClient()
	client.createPods(generatePod("pod1", "default", map[string]any{"environment": "production"}, "1"))

	cfg := createDefaultConfig().(*Config)
	cfg.makeDynamicClient = client.getMockDynamicClient
	cfg.makeDiscoveryClient = getMockDiscoveryClient
	cfg.ErrorMode = PropagateError
	// A very short interval keeps the test fast.
	pods := &K8sObjectsConfig{
		Name:     "pods",
		Mode:     PullMode,
		Interval: 10 * time.Millisecond,
	}
	cfg.Objects = []*K8sObjectsConfig{pods}
	cfg.K8sLeaderElector = &electorID

	rcv, err := newReceiver(receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
	require.NoError(t, err)
	kr := rcv.(*k8sobjectsreceiver)

	// Swap in a sink so the delivered log records can be counted.
	sink := new(consumertest.LogsSink)
	kr.consumer = sink

	require.NoError(t, kr.Start(t.Context(), host))

	// Leading: at least one pull must land in the sink.
	elector.InvokeOnLeading()
	require.Eventually(t, func() bool {
		return sink.LogRecordCount() >= 1
	}, 2*time.Second, 20*time.Millisecond, "pulls did not start while leader")

	// Standby: snapshot the count right after leadership is lost.
	elector.InvokeOnStopping()
	frozen := sink.LogRecordCount()

	// A pod added during standby must not change the count.
	client.createPods(
		generatePod("pod2", "default", map[string]any{"environment": "standby"}, "2"),
	)
	time.Sleep(100 * time.Millisecond)
	assert.Equal(t, frozen, sink.LogRecordCount(), "no pulls should occur while in standby")

	// Leading again: the count must move past the standby snapshot.
	elector.InvokeOnLeading()
	require.Eventually(t, func() bool {
		return sink.LogRecordCount() > frozen
	}, 2*time.Second, 20*time.Millisecond, "pulls did not resume after re-leading")

	assert.NoError(t, kr.Shutdown(t.Context()))
}

// TestWatchLeaderFlapDuringStartup_NoPanic rapidly toggles leadership on a
// watch-mode receiver and checks that the toggling finishes within a deadline
// (i.e. no deadlock) and that the receiver still shuts down without error.
func TestWatchLeaderFlapDuringStartup_NoPanic(t *testing.T) {
	t.Parallel()

	elector := &k8sleaderelectortest.FakeLeaderElection{}
	host := &k8sleaderelectortest.FakeHost{FakeLeaderElection: elector}
	electorID := component.MustNewID("k8s_leader_elector")

	client := newMockDynamicClient()
	client.createPods(generatePod("pod1", "default", map[string]any{"env": "prod"}, "1"))

	rCfg := createDefaultConfig().(*Config)
	rCfg.makeDynamicClient = client.getMockDynamicClient
	rCfg.makeDiscoveryClient = getMockDiscoveryClient
	rCfg.ErrorMode = PropagateError
	rCfg.IncludeInitialState = false
	rCfg.Objects = []*K8sObjectsConfig{
		{Name: "pods", Mode: WatchMode, Namespaces: []string{"default"}},
	}
	rCfg.K8sLeaderElector = &electorID

	rcv, err := newReceiver(receivertest.NewNopSettings(metadata.Type), rCfg, consumertest.NewNop())
	require.NoError(t, err)
	kr := rcv.(*k8sobjectsreceiver)

	require.NoError(t, kr.Start(t.Context(), host))

	// Step 1: take leadership once and wait until a worker has registered its
	// stop channel, so the flapping below starts from a running state.
	elector.InvokeOnLeading()
	workerRegistered := func() bool {
		kr.mu.Lock()
		defer kr.mu.Unlock()
		return len(kr.stopperChanList) > 0
	}
	require.Eventually(t, workerRegistered, 2*time.Second, 10*time.Millisecond, "worker not registered")

	// Step 2: flap leadership in the background, pausing briefly after each
	// transition so the stop (or start) has a chance to take effect.
	const loops = 100
	flap := func() {
		for range loops {
			elector.InvokeOnStopping()
			time.Sleep(1 * time.Millisecond)
			elector.InvokeOnLeading()
			time.Sleep(1 * time.Millisecond)
		}
	}
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		flap()
	}()

	// Fail the test if the flapping does not complete within the deadline.
	select {
	case <-time.After(10 * time.Second):
		t.Fatal("leader flap goroutine timed out (possible deadlock)")
	case <-finished:
	}

	assert.NoError(t, kr.Shutdown(t.Context()))
}

// TestPullLeaderFlapDuringStartup_NoPanic rapidly toggles leadership on a
// pull-mode receiver and checks that the toggling finishes within a deadline
// (i.e. no deadlock) and that the receiver still shuts down without error.
func TestPullLeaderFlapDuringStartup_NoPanic(t *testing.T) {
	t.Parallel()

	fakeLE := &k8sleaderelectortest.FakeLeaderElection{}
	fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeLE}
	leaderElectorID := component.MustNewID("k8s_leader_elector")

	mockClient := newMockDynamicClient()
	mockClient.createPods(generatePod("pod1", "default", map[string]any{"env": "prod"}, "1"))

	cfg := createDefaultConfig().(*Config)
	cfg.makeDynamicClient = mockClient.getMockDynamicClient
	cfg.makeDiscoveryClient = getMockDiscoveryClient
	cfg.ErrorMode = PropagateError
	cfg.Objects = []*K8sObjectsConfig{
		// A short interval keeps the pull loop busy while leadership flaps.
		{Name: "pods", Mode: PullMode, Interval: 5 * time.Millisecond},
	}
	cfg.K8sLeaderElector = &leaderElectorID

	r, err := newReceiver(receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
	require.NoError(t, err)
	kr := r.(*k8sobjectsreceiver)

	require.NoError(t, kr.Start(t.Context(), fakeHost))

	// Ensure a worker has registered its stop channel before flapping, so the
	// loop below starts from a running state. The failure message matches the
	// watch-mode variant of this test so a timeout here is self-explanatory.
	fakeLE.InvokeOnLeading()
	require.Eventually(t, func() bool {
		kr.mu.Lock()
		n := len(kr.stopperChanList)
		kr.mu.Unlock()
		return n > 0
	}, 2*time.Second, 10*time.Millisecond, "worker not registered")

	// Flap leadership in the background, pausing briefly after each transition
	// so the stop (or start) has a chance to take effect.
	const loops = 100
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range loops {
			fakeLE.InvokeOnStopping()
			time.Sleep(1 * time.Millisecond)
			fakeLE.InvokeOnLeading()
			time.Sleep(1 * time.Millisecond)
		}
	}()

	// Fail the test if the flapping does not complete within the deadline.
	select {
	case <-done:
	case <-time.After(10 * time.Second):
		t.Fatal("leader flap goroutine timed out (possible deadlock)")
	}

	assert.NoError(t, kr.Shutdown(t.Context()))
}
Loading