Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pkg/controller/controllercmd/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog"
Expand Down Expand Up @@ -64,6 +65,7 @@ type ControllerBuilder struct {
leaderElection *configv1.LeaderElection
fileObserver fileobserver.Observer
fileObserverReactorFn func(file string, action fileobserver.ActionType) error
eventRecorderOptions record.CorrelatorOptions

startFunc StartFunc
componentName string
Expand Down Expand Up @@ -175,6 +177,14 @@ func (b *ControllerBuilder) WithInstanceIdentity(identity string) *ControllerBui
return b
}

// WithEventRecorderOptions allows to override the default Kubernetes event recorder correlator options.
// This is needed if the binary is sending a lot of events.
// Using events.DefaultOperatorEventRecorderOptions here makes a good default for normal operator binary.
func (b *ControllerBuilder) WithEventRecorderOptions(options record.CorrelatorOptions) *ControllerBuilder {
b.eventRecorderOptions = options
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expected to set it to the recommended options by default. Is that the case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return b
}

// Run starts your controller for you. It uses leader election if you asked, otherwise it directly calls you
func (b *ControllerBuilder) Run(ctx context.Context, config *unstructured.Unstructured) error {
clientConfig, err := b.getClientConfig()
Expand All @@ -195,7 +205,7 @@ func (b *ControllerBuilder) Run(ctx context.Context, config *unstructured.Unstru
if err != nil {
klog.Warningf("unable to get owner reference (falling back to namespace): %v", err)
}
eventRecorder := events.NewKubeRecorder(kubeClient.CoreV1().Events(namespace), b.componentName, controllerRef)
eventRecorder := events.NewKubeRecorderWithOptions(kubeClient.CoreV1().Events(namespace), b.eventRecorderOptions, b.componentName, controllerRef)

// if there is file observer defined for this command, add event into default reaction function.
if b.fileObserverReactorFn != nil {
Expand Down Expand Up @@ -298,6 +308,7 @@ func (b ControllerBuilder) getOnStartedLeadingFunc(controllerContext *Controller

select {
case <-ctx.Done(): // context closed means the process likely received signal to terminate
controllerContext.EventRecorder.Shutdown()
case <-stoppedCh:
// if context was not cancelled (it is not "done"), but the startFunc terminated, it means it terminated prematurely
// when this happen, it means the controllers terminated without error.
Expand Down
14 changes: 8 additions & 6 deletions pkg/controller/controllercmd/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"testing"
"time"

"github.com/openshift/library-go/pkg/operator/events/eventstesting"
)

func TestControllerBuilder_getOnStartedLeadingFunc(t *testing.T) {
Expand All @@ -21,7 +23,7 @@ func TestControllerBuilder_getOnStartedLeadingFunc(t *testing.T) {
}

// controllers finished prematurely, without being asked to finish
b.getOnStartedLeadingFunc(&ControllerContext{}, 3*time.Second)(context.TODO())
b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 3*time.Second)(context.TODO())
if len(nonZeroExits) != 1 || !strings.Contains(nonZeroExits[0], "controllers terminated prematurely") {
t.Errorf("expected controllers to exit prematurely, got %#v", nonZeroExits)
}
Expand All @@ -37,7 +39,7 @@ func TestControllerBuilder_getOnStartedLeadingFunc(t *testing.T) {
time.Sleep(2 * time.Second)
return nil
}
b.getOnStartedLeadingFunc(&ControllerContext{}, 5*time.Second)(ctx)
b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 5*time.Second)(ctx)
if len(nonZeroExits) > 0 {
t.Errorf("expected controllers to exit gracefully, but got %#v", nonZeroExits)
}
Expand All @@ -53,7 +55,7 @@ func TestControllerBuilder_getOnStartedLeadingFunc(t *testing.T) {
time.Sleep(3 * time.Second)
return nil
}
b.getOnStartedLeadingFunc(&ControllerContext{}, 1*time.Second)(ctx)
b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 1*time.Second)(ctx)
if len(nonZeroExits) != 1 && !strings.Contains(nonZeroExits[0], "some controllers failed to shutdown in 1s") {
t.Errorf("expected controllers to failed finish in 1s, got %#v", nonZeroExits)
}
Expand Down Expand Up @@ -86,7 +88,7 @@ func TestControllerBuilder_GracefulShutdown(t *testing.T) {
stoppedCh := make(chan struct{})
go func() {
defer close(stoppedCh)
b.getOnStartedLeadingFunc(&ControllerContext{}, 10*time.Second)(ctx)
b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 10*time.Second)(ctx)
}()

select {
Expand Down Expand Up @@ -118,7 +120,7 @@ func TestControllerBuilder_OnLeadingFunc_ControllerError(t *testing.T) {

go func() {
defer close(stoppedCh)
b.getOnStartedLeadingFunc(&ControllerContext{}, 10*time.Second)(ctx)
b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 10*time.Second)(ctx)
}()

<-startedCh
Expand Down Expand Up @@ -171,7 +173,7 @@ func TestControllerBuilder_OnLeadingFunc_NonZeroExit(t *testing.T) {
}()

go func() {
b.getOnStartedLeadingFunc(&ControllerContext{}, time.Second)(ctx) // graceful time is just 1s
b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, time.Second)(ctx) // graceful time is just 1s
}()

select {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/controllercmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/openshift/library-go/pkg/config/configdefaults"
"github.com/openshift/library-go/pkg/controller/fileobserver"
"github.com/openshift/library-go/pkg/crypto"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/serviceability"

// for metrics
Expand Down Expand Up @@ -267,6 +268,7 @@ func (c *ControllerCommandConfig) StartController(ctx context.Context) error {
WithComponentNamespace(c.basicFlags.Namespace).
WithLeaderElection(config.LeaderElection, c.basicFlags.Namespace, c.componentName+"-lock").
WithVersion(c.version).
WithEventRecorderOptions(events.RecommendedClusterSingletonCorrelatorOptions()).
WithRestartOnChange(exitOnChangeReactorCh, startingFileContent, observedFiles...)

if !c.DisableServing {
Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/events/eventstesting/recorder_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func (r *TestingEventRecorder) ForComponent(c string) events.Recorder {
return &TestingEventRecorder{t: r.t, component: c}
}

func (r *TestingEventRecorder) Shutdown() {}

func (r *TestingEventRecorder) WithComponentSuffix(suffix string) events.Recorder {
return r.ForComponent(fmt.Sprintf("%s-%s", r.ComponentName(), suffix))
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/events/eventstesting/recorder_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func (e *EventRecorder) Event(reason, message string) {
e.testingEventRecorder.Event(reason, message)
}

func (e *EventRecorder) Shutdown() {}

func (e *EventRecorder) Eventf(reason, messageFmt string, args ...interface{}) {
e.realEventRecorder.Eventf(reason, messageFmt, args...)
e.testingEventRecorder.Eventf(reason, messageFmt, args...)
Expand Down
4 changes: 4 additions & 0 deletions pkg/operator/events/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Recorder interface {
// ComponentName returns the current source component name for the event.
// This allows to suffix the original component name with 'sub-component'.
ComponentName() string

Shutdown()
}

// podNameEnv is a name of environment variable inside container that specifies the name of the current replica set.
Expand Down Expand Up @@ -159,6 +161,8 @@ func (r *recorder) ComponentName() string {
return r.sourceComponent
}

func (r *recorder) Shutdown() {}

func (r *recorder) ForComponent(componentName string) Recorder {
newRecorderForComponent := *r
newRecorderForComponent.sourceComponent = componentName
Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/events/recorder_in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (r *inMemoryEventRecorder) ComponentName() string {
return r.source
}

func (r *inMemoryEventRecorder) Shutdown() {}

func (r *inMemoryEventRecorder) ForComponent(component string) Recorder {
r.Lock()
defer r.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/events/recorder_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *LoggingEventRecorder) ForComponent(component string) Recorder {
return &newRecorder
}

func (r *LoggingEventRecorder) Shutdown() {}

func (r *LoggingEventRecorder) WithComponentSuffix(suffix string) Recorder {
return r.ForComponent(fmt.Sprintf("%s-%s", r.ComponentName(), suffix))
}
Expand Down
108 changes: 102 additions & 6 deletions pkg/operator/events/recorder_upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,120 @@ package events

import (
"fmt"

"k8s.io/klog"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog"
)

// NewKubeRecorder returns new event recorder.
func NewKubeRecorder(client corev1client.EventInterface, sourceComponentName string, involvedObjectRef *corev1.ObjectReference) Recorder {
// NewKubeRecorder returns new event recorder with tweaked correlator options.
func NewKubeRecorderWithOptions(client corev1client.EventInterface, options record.CorrelatorOptions, sourceComponentName string, involvedObjectRef *corev1.ObjectReference) Recorder {
return (&upstreamRecorder{
client: client,
component: sourceComponentName,
involvedObjectRef: involvedObjectRef,
options: options,
fallbackRecorder: NewRecorder(client, sourceComponentName, involvedObjectRef),
}).ForComponent(sourceComponentName)
}

// NewKubeRecorder returns new event recorder with default correlator options.
func NewKubeRecorder(client corev1client.EventInterface, sourceComponentName string, involvedObjectRef *corev1.ObjectReference) Recorder {
return NewKubeRecorderWithOptions(client, record.CorrelatorOptions{}, sourceComponentName, involvedObjectRef)
}

// upstreamRecorder is an implementation of Recorder interface.
type upstreamRecorder struct {
client corev1client.EventInterface
component string
broadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
involvedObjectRef *corev1.ObjectReference
options record.CorrelatorOptions

// shuttingDown indicates that the broadcaster for this recorder is being shut down
shuttingDown bool
shutdownMutex sync.RWMutex

// fallbackRecorder is used when the kube recorder is shutting down
// in that case we create the events directly.
fallbackRecorder Recorder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh what a wicked web we weave

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can just log the events that come after shutdown... i think the chance we leak events after shutdown is triggered is really small (the window is basically ~1-2s, but this makes sure we don't miss any event at all...

}

// RecommendedClusterSingletonCorrelatorOptions provides recommended event correlator options for components that produce
// many events (like operators).
func RecommendedClusterSingletonCorrelatorOptions() record.CorrelatorOptions {
return record.CorrelatorOptions{
BurstSize: 60, // default: 25 (change allows a single source to send 50 events about object per minute)
QPS: 1. / 1., // default: 1/300 (change allows refill rate to 1 new event every 1s)
KeyFunc: func(event *corev1.Event) (aggregateKey string, localKey string) {
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
event.Type,
event.Reason,
// By default, KeyFunc don't use message for aggregation, this cause events with different message, but same reason not be lost as "similar events".
event.Message,
}, ""), event.Message
},
}
}

var eventsCounterMetric = metrics.NewCounterVec(&metrics.CounterOpts{
Subsystem: "event_recorder",
Name: "total_events_count",
Help: "Total count of events processed by this event recorder per involved object",
StabilityLevel: metrics.ALPHA,
}, []string{"severity"})

func init() {
(&sync.Once{}).Do(func() {
legacyregistry.MustRegister(eventsCounterMetric)
})
}

func (r *upstreamRecorder) ForComponent(componentName string) Recorder {
newRecorderForComponent := *r
broadcaster := record.NewBroadcaster()
newRecorderForComponent := upstreamRecorder{
client: r.client,
fallbackRecorder: r.fallbackRecorder.WithComponentSuffix(componentName),
options: r.options,
involvedObjectRef: r.involvedObjectRef,
shuttingDown: r.shuttingDown,
}

// tweak the event correlator, so we don't loose important events.
broadcaster := record.NewBroadcasterWithCorrelatorOptions(r.options)
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: newRecorderForComponent.client})

newRecorderForComponent.eventRecorder = broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: componentName})
newRecorderForComponent.broadcaster = broadcaster
newRecorderForComponent.component = componentName

return &newRecorderForComponent
}

func (r *upstreamRecorder) Shutdown() {
r.shutdownMutex.Lock()
r.shuttingDown = true
r.shutdownMutex.Unlock()
// Wait for broadcaster to flush events (this is blocking)
// TODO: There is still race condition in upstream that might cause panic() on events recorded after the shutdown
// is called as the event recording is not-blocking (go routine based).
r.broadcaster.Shutdown()
}

func (r *upstreamRecorder) WithComponentSuffix(suffix string) Recorder {
return r.ForComponent(fmt.Sprintf("%s-%s", r.ComponentName(), suffix))
}
Expand All @@ -59,12 +134,33 @@ func (r *upstreamRecorder) Warningf(reason, messageFmt string, args ...interface
r.Warning(reason, fmt.Sprintf(messageFmt, args...))
}

func (r *upstreamRecorder) incrementEventsCounter(severity string) {
if r.involvedObjectRef == nil {
return
}
eventsCounterMetric.WithLabelValues(severity).Inc()
}

// Event emits the normal type event.
func (r *upstreamRecorder) Event(reason, message string) {
r.shutdownMutex.RLock()
defer r.shutdownMutex.RUnlock()
defer r.incrementEventsCounter(corev1.EventTypeNormal)
if r.shuttingDown {
r.fallbackRecorder.Event(reason, message)
return
}
r.eventRecorder.Event(r.involvedObjectRef, corev1.EventTypeNormal, reason, message)
}

// Warning emits the warning type event.
func (r *upstreamRecorder) Warning(reason, message string) {
r.shutdownMutex.RLock()
defer r.shutdownMutex.RUnlock()
defer r.incrementEventsCounter(corev1.EventTypeWarning)
if r.shuttingDown {
r.fallbackRecorder.Warning(reason, message)
return
}
r.eventRecorder.Event(r.involvedObjectRef, corev1.EventTypeWarning, reason, message)
}
Loading