Skip to content
Merged
58 changes: 25 additions & 33 deletions receiver/k8seventsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
k8s "k8s.io/client-go/kubernetes"
Expand All @@ -31,7 +32,7 @@ type k8seventsReceiver struct {
ctx context.Context
cancel context.CancelFunc
obsrecv *receiverhelper.ObsReport
wg sync.WaitGroup
mu sync.Mutex
}

// newReceiver creates the Kubernetes events receiver with the given configuration.
Expand Down Expand Up @@ -81,8 +82,13 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) err

kr.settings.Logger.Info("registering the receiver in leader election")

// Register callbacks with the leader elector extension. These callbacks remain active
// for the lifetime of the receiver, allowing it to restart when leadership is regained.
elector.SetCallBackFuncs(
func(_ context.Context) {
func(ctx context.Context) {
cctx, cancel := context.WithCancel(ctx)
kr.cancel = cancel
kr.ctx = cctx
kr.settings.Logger.Info("Events Receiver started as leader")
if len(kr.config.Namespaces) == 0 {
kr.startWatch(corev1.NamespaceAll, k8sInterface)
Expand All @@ -92,12 +98,15 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) err
}
}
},
// onStoppedLeading: stop watches, but DO NOT shut the whole receiver down
func() {
kr.settings.Logger.Info("no longer leader, stopping watches")
kr.stopWatches()
},
)
// Shutdown on leader loss. The receiver will restart if leadership is regained
// since the callbacks remain registered with the leader elector extension.
kr.settings.Logger.Info("no longer leader, stopping")
err := kr.Shutdown(context.Background())
Comment thread
ChrsMark marked this conversation as resolved.
if err != nil {
kr.settings.Logger.Error("shutdown receiver error:", zap.Error(err))
}
})
return nil
}

Expand All @@ -114,40 +123,27 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) err
}

func (kr *k8seventsReceiver) Shutdown(context.Context) error {
// Stop informers and wait for them to exit.
kr.stopWatches()

if kr.cancel != nil {
kr.cancel()
kr.cancel = nil
}
return nil
}

// stopWatches closes all informer stop channels (idempotently) and waits for their goroutines to exit.
func (kr *k8seventsReceiver) stopWatches() {
if len(kr.stopperChanList) == 0 {
return
}
for _, ch := range kr.stopperChanList {
select {
case <-ch: // already closed
default:
close(ch)
}
kr.mu.Lock()
for _, stopperChan := range kr.stopperChanList {
close(stopperChan)
}
// Wait for all controller.Run goroutines to finish.
kr.wg.Wait()
// Reset slice so we can start again on leadership regain.
kr.stopperChanList = nil
kr.mu.Unlock()
return nil
}

// Add the 'Event' handler and trigger the watch for a specific namespace.
// For new and updated events, the code is relying on the following k8s code implementation:
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/record/events_cache.go#L327
func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) {
stopperChan := make(chan struct{})
kr.mu.Lock()
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.mu.Unlock()
kr.startWatchingNamespace(client, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
if ev, ok := obj.(*corev1.Event); ok {
Expand All @@ -174,7 +170,7 @@ func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) {

// startWatchingNamespace creates an informer and starts
// watching a specific namespace for the events.
func (kr *k8seventsReceiver) startWatchingNamespace(
func (*k8seventsReceiver) startWatchingNamespace(
clientset k8s.Interface,
handlers cache.ResourceEventHandlerFuncs,
ns string,
Expand All @@ -188,11 +184,7 @@ func (kr *k8seventsReceiver) startWatchingNamespace(
ResyncPeriod: 0,
Handler: handlers,
})
kr.wg.Add(1)
go func() {
defer kr.wg.Done()
controller.Run(stopper)
}()
go controller.Run(stopper)
}

// Allow events with eventTimestamp(EventTime/LastTimestamp/FirstTimestamp)
Expand Down
15 changes: 7 additions & 8 deletions receiver/k8seventsreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ func TestReceiverWithLeaderElection(t *testing.T) {
recv := r.(*k8seventsReceiver)

require.NoError(t, r.Start(t.Context(), host))
t.Cleanup(func() {
assert.NoError(t, r.Shutdown(t.Context()))
})

// Become leader: start processing events
le.InvokeOnLeading()
Expand All @@ -164,21 +161,23 @@ func TestReceiverWithLeaderElection(t *testing.T) {
return sink.LogRecordCount() == 1
}, 5*time.Second, 100*time.Millisecond, "logs not collected while leader")

// lose leadership
// lose leadership - this will trigger Shutdown()
le.InvokeOnStopping()

// DO NOT call recv.handleEvent(...) here; informer wouldn't deliver to this instance.
// Give a tiny moment and ensure count stays 1.
// Verify count remains at 1 after losing leadership
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, sink.LogRecordCount(), "event should be ignored after losing leadership")
require.Equal(t, 1, sink.LogRecordCount(), "count should remain at 1 after losing leadership")

// regain leadership and inject again
// regain leadership
le.InvokeOnLeading()
recv.handleEvent(getEvent("Normal"))

require.Eventually(t, func() bool {
return sink.LogRecordCount() == 2
}, 5*time.Second, 100*time.Millisecond, "logs not collected after regaining leadership")

// Final cleanup
require.NoError(t, r.Shutdown(t.Context()))
}

func getEvent(eventType string) *corev1.Event {
Expand Down