diff --git a/pkg/controller/controllercmd/builder.go b/pkg/controller/controllercmd/builder.go index 850b8ecc9d..2fcf74c523 100644 --- a/pkg/controller/controllercmd/builder.go +++ b/pkg/controller/controllercmd/builder.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog" @@ -64,6 +65,7 @@ type ControllerBuilder struct { leaderElection *configv1.LeaderElection fileObserver fileobserver.Observer fileObserverReactorFn func(file string, action fileobserver.ActionType) error + eventRecorderOptions record.CorrelatorOptions startFunc StartFunc componentName string @@ -175,6 +177,14 @@ func (b *ControllerBuilder) WithInstanceIdentity(identity string) *ControllerBui return b } +// WithEventRecorderOptions allows overriding the default Kubernetes event recorder correlator options. +// This is needed if the binary is sending a lot of events. +// Using events.RecommendedClusterSingletonCorrelatorOptions() here makes a good default for a normal operator binary. +func (b *ControllerBuilder) WithEventRecorderOptions(options record.CorrelatorOptions) *ControllerBuilder { + b.eventRecorderOptions = options + return b +} + // Run starts your controller for you. 
It uses leader election if you asked, otherwise it directly calls you func (b *ControllerBuilder) Run(ctx context.Context, config *unstructured.Unstructured) error { clientConfig, err := b.getClientConfig() @@ -195,7 +205,7 @@ func (b *ControllerBuilder) Run(ctx context.Context, config *unstructured.Unstru if err != nil { klog.Warningf("unable to get owner reference (falling back to namespace): %v", err) } - eventRecorder := events.NewKubeRecorder(kubeClient.CoreV1().Events(namespace), b.componentName, controllerRef) + eventRecorder := events.NewKubeRecorderWithOptions(kubeClient.CoreV1().Events(namespace), b.eventRecorderOptions, b.componentName, controllerRef) // if there is file observer defined for this command, add event into default reaction function. if b.fileObserverReactorFn != nil { @@ -298,6 +308,7 @@ func (b ControllerBuilder) getOnStartedLeadingFunc(controllerContext *Controller select { case <-ctx.Done(): // context closed means the process likely received signal to terminate + controllerContext.EventRecorder.Shutdown() case <-stoppedCh: // if context was not cancelled (it is not "done"), but the startFunc terminated, it means it terminated prematurely // when this happen, it means the controllers terminated without error. 
diff --git a/pkg/controller/controllercmd/builder_test.go b/pkg/controller/controllercmd/builder_test.go index 79ddcdc441..d180d35e0b 100644 --- a/pkg/controller/controllercmd/builder_test.go +++ b/pkg/controller/controllercmd/builder_test.go @@ -6,6 +6,8 @@ import ( "strings" "testing" "time" + + "github.com/openshift/library-go/pkg/operator/events/eventstesting" ) func TestControllerBuilder_getOnStartedLeadingFunc(t *testing.T) { @@ -21,7 +23,7 @@ func TestControllerBuilder_getOnStartedLeadingFunc(t *testing.T) { } // controllers finished prematurely, without being asked to finish - b.getOnStartedLeadingFunc(&ControllerContext{}, 3*time.Second)(context.TODO()) + b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 3*time.Second)(context.TODO()) if len(nonZeroExits) != 1 || !strings.Contains(nonZeroExits[0], "controllers terminated prematurely") { t.Errorf("expected controllers to exit prematurely, got %#v", nonZeroExits) } @@ -37,7 +39,7 @@ func TestControllerBuilder_getOnStartedLeadingFunc(t *testing.T) { time.Sleep(2 * time.Second) return nil } - b.getOnStartedLeadingFunc(&ControllerContext{}, 5*time.Second)(ctx) + b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 5*time.Second)(ctx) if len(nonZeroExits) > 0 { t.Errorf("expected controllers to exit gracefully, but got %#v", nonZeroExits) } @@ -53,7 +55,7 @@ func TestControllerBuilder_getOnStartedLeadingFunc(t *testing.T) { time.Sleep(3 * time.Second) return nil } - b.getOnStartedLeadingFunc(&ControllerContext{}, 1*time.Second)(ctx) + b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 1*time.Second)(ctx) if len(nonZeroExits) != 1 && !strings.Contains(nonZeroExits[0], "some controllers failed to shutdown in 1s") { t.Errorf("expected controllers to failed finish in 1s, got %#v", nonZeroExits) } @@ -86,7 +88,7 @@ func TestControllerBuilder_GracefulShutdown(t 
*testing.T) { stoppedCh := make(chan struct{}) go func() { defer close(stoppedCh) - b.getOnStartedLeadingFunc(&ControllerContext{}, 10*time.Second)(ctx) + b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 10*time.Second)(ctx) }() select { @@ -118,7 +120,7 @@ func TestControllerBuilder_OnLeadingFunc_ControllerError(t *testing.T) { go func() { defer close(stoppedCh) - b.getOnStartedLeadingFunc(&ControllerContext{}, 10*time.Second)(ctx) + b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, 10*time.Second)(ctx) }() <-startedCh @@ -171,7 +173,7 @@ func TestControllerBuilder_OnLeadingFunc_NonZeroExit(t *testing.T) { }() go func() { - b.getOnStartedLeadingFunc(&ControllerContext{}, time.Second)(ctx) // graceful time is just 1s + b.getOnStartedLeadingFunc(&ControllerContext{EventRecorder: eventstesting.NewTestingEventRecorder(t)}, time.Second)(ctx) // graceful time is just 1s }() select { diff --git a/pkg/controller/controllercmd/cmd.go b/pkg/controller/controllercmd/cmd.go index 1e603efe16..0e9d1201ab 100644 --- a/pkg/controller/controllercmd/cmd.go +++ b/pkg/controller/controllercmd/cmd.go @@ -24,6 +24,7 @@ import ( "github.com/openshift/library-go/pkg/config/configdefaults" "github.com/openshift/library-go/pkg/controller/fileobserver" "github.com/openshift/library-go/pkg/crypto" + "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/serviceability" // for metrics @@ -267,6 +268,7 @@ func (c *ControllerCommandConfig) StartController(ctx context.Context) error { WithComponentNamespace(c.basicFlags.Namespace). WithLeaderElection(config.LeaderElection, c.basicFlags.Namespace, c.componentName+"-lock"). WithVersion(c.version). + WithEventRecorderOptions(events.RecommendedClusterSingletonCorrelatorOptions()). WithRestartOnChange(exitOnChangeReactorCh, startingFileContent, observedFiles...) 
if !c.DisableServing { diff --git a/pkg/operator/events/eventstesting/recorder_testing.go b/pkg/operator/events/eventstesting/recorder_testing.go index 83ea0e88d4..b86fd35af5 100644 --- a/pkg/operator/events/eventstesting/recorder_testing.go +++ b/pkg/operator/events/eventstesting/recorder_testing.go @@ -25,6 +25,8 @@ func (r *TestingEventRecorder) ForComponent(c string) events.Recorder { return &TestingEventRecorder{t: r.t, component: c} } +func (r *TestingEventRecorder) Shutdown() {} + func (r *TestingEventRecorder) WithComponentSuffix(suffix string) events.Recorder { return r.ForComponent(fmt.Sprintf("%s-%s", r.ComponentName(), suffix)) } diff --git a/pkg/operator/events/eventstesting/recorder_wrapper.go b/pkg/operator/events/eventstesting/recorder_wrapper.go index 38bc9a523d..97e5ec340b 100644 --- a/pkg/operator/events/eventstesting/recorder_wrapper.go +++ b/pkg/operator/events/eventstesting/recorder_wrapper.go @@ -23,6 +23,8 @@ func (e *EventRecorder) Event(reason, message string) { e.testingEventRecorder.Event(reason, message) } +func (e *EventRecorder) Shutdown() {} + func (e *EventRecorder) Eventf(reason, messageFmt string, args ...interface{}) { e.realEventRecorder.Eventf(reason, messageFmt, args...) e.testingEventRecorder.Eventf(reason, messageFmt, args...) diff --git a/pkg/operator/events/recorder.go b/pkg/operator/events/recorder.go index bb8e8ddafd..7139c1b695 100644 --- a/pkg/operator/events/recorder.go +++ b/pkg/operator/events/recorder.go @@ -33,6 +33,8 @@ type Recorder interface { // ComponentName returns the current source component name for the event. // This allows to suffix the original component name with 'sub-component'. ComponentName() string + + Shutdown() } // podNameEnv is a name of environment variable inside container that specifies the name of the current replica set. 
@@ -159,6 +161,8 @@ func (r *recorder) ComponentName() string { return r.sourceComponent } +func (r *recorder) Shutdown() {} + func (r *recorder) ForComponent(componentName string) Recorder { newRecorderForComponent := *r newRecorderForComponent.sourceComponent = componentName diff --git a/pkg/operator/events/recorder_in_memory.go b/pkg/operator/events/recorder_in_memory.go index b64d9f6a98..27103cd704 100644 --- a/pkg/operator/events/recorder_in_memory.go +++ b/pkg/operator/events/recorder_in_memory.go @@ -37,6 +37,8 @@ func (r *inMemoryEventRecorder) ComponentName() string { return r.source } +func (r *inMemoryEventRecorder) Shutdown() {} + func (r *inMemoryEventRecorder) ForComponent(component string) Recorder { r.Lock() defer r.Unlock() diff --git a/pkg/operator/events/recorder_logging.go b/pkg/operator/events/recorder_logging.go index 7f3b5cd8bd..5a8ec765a1 100644 --- a/pkg/operator/events/recorder_logging.go +++ b/pkg/operator/events/recorder_logging.go @@ -26,6 +26,8 @@ func (r *LoggingEventRecorder) ForComponent(component string) Recorder { return &newRecorder } +func (r *LoggingEventRecorder) Shutdown() {} + func (r *LoggingEventRecorder) WithComponentSuffix(suffix string) Recorder { return r.ForComponent(fmt.Sprintf("%s-%s", r.ComponentName(), suffix)) } diff --git a/pkg/operator/events/recorder_upstream.go b/pkg/operator/events/recorder_upstream.go index 359d2eb81e..3eb2387536 100644 --- a/pkg/operator/events/recorder_upstream.go +++ b/pkg/operator/events/recorder_upstream.go @@ -2,24 +2,34 @@ package events import ( "fmt" - - "k8s.io/klog" + "strings" + "sync" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/scheme" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/klog" ) -// NewKubeRecorder returns new event recorder. 
-func NewKubeRecorder(client corev1client.EventInterface, sourceComponentName string, involvedObjectRef *corev1.ObjectReference) Recorder { +// NewKubeRecorderWithOptions returns a new event recorder that uses the given correlator options. +func NewKubeRecorderWithOptions(client corev1client.EventInterface, options record.CorrelatorOptions, sourceComponentName string, involvedObjectRef *corev1.ObjectReference) Recorder { return (&upstreamRecorder{ client: client, component: sourceComponentName, involvedObjectRef: involvedObjectRef, + options: options, + fallbackRecorder: NewRecorder(client, sourceComponentName, involvedObjectRef), }).ForComponent(sourceComponentName) } +// NewKubeRecorder returns new event recorder with default correlator options. +func NewKubeRecorder(client corev1client.EventInterface, sourceComponentName string, involvedObjectRef *corev1.ObjectReference) Recorder { + return NewKubeRecorderWithOptions(client, record.CorrelatorOptions{}, sourceComponentName, involvedObjectRef) +} + // upstreamRecorder is an implementation of Recorder interface. type upstreamRecorder struct { client corev1client.EventInterface @@ -27,20 +37,85 @@ type upstreamRecorder struct { broadcaster record.EventBroadcaster eventRecorder record.EventRecorder involvedObjectRef *corev1.ObjectReference + options record.CorrelatorOptions + + // shuttingDown indicates that the broadcaster for this recorder is being shut down + shuttingDown bool + shutdownMutex sync.RWMutex + + // fallbackRecorder is used when the kube recorder is shutting down + // in that case we create the events directly. + fallbackRecorder Recorder +} + +// RecommendedClusterSingletonCorrelatorOptions provides recommended event correlator options for components that produce +// many events (like operators). 
+func RecommendedClusterSingletonCorrelatorOptions() record.CorrelatorOptions { + return record.CorrelatorOptions{ + BurstSize: 60, // default: 25 (change allows a single source to send a burst of up to 60 events about an object) + QPS: 1. / 1., // default: 1/300 (change allows refill rate to 1 new event every 1s) + KeyFunc: func(event *corev1.Event) (aggregateKey string, localKey string) { + return strings.Join([]string{ + event.Source.Component, + event.Source.Host, + event.InvolvedObject.Kind, + event.InvolvedObject.Namespace, + event.InvolvedObject.Name, + string(event.InvolvedObject.UID), + event.InvolvedObject.APIVersion, + event.Type, + event.Reason, + // By default, KeyFunc does not use the message for aggregation; including it here keeps events with the same reason but different messages from being collapsed into "similar events". + event.Message, + }, ""), event.Message + }, + } +} + +var eventsCounterMetric = metrics.NewCounterVec(&metrics.CounterOpts{ + Subsystem: "event_recorder", + Name: "total_events_count", + Help: "Total count of events processed by this event recorder per involved object", + StabilityLevel: metrics.ALPHA, +}, []string{"severity"}) + +func init() { + (&sync.Once{}).Do(func() { + legacyregistry.MustRegister(eventsCounterMetric) + }) } func (r *upstreamRecorder) ForComponent(componentName string) Recorder { - newRecorderForComponent := *r - broadcaster := record.NewBroadcaster() + newRecorderForComponent := upstreamRecorder{ + client: r.client, + fallbackRecorder: r.fallbackRecorder.WithComponentSuffix(componentName), + options: r.options, + involvedObjectRef: r.involvedObjectRef, + shuttingDown: r.shuttingDown, + } + + // tweak the event correlator, so we don't lose important events. 
+ broadcaster := record.NewBroadcasterWithCorrelatorOptions(r.options) broadcaster.StartLogging(klog.Infof) broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: newRecorderForComponent.client}) newRecorderForComponent.eventRecorder = broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: componentName}) + newRecorderForComponent.broadcaster = broadcaster newRecorderForComponent.component = componentName return &newRecorderForComponent } +func (r *upstreamRecorder) Shutdown() { + r.shutdownMutex.Lock() + r.shuttingDown = true + r.shutdownMutex.Unlock() + // Wait for broadcaster to flush events (this is blocking) + // TODO: There is still race condition in upstream that might cause panic() on events recorded after the shutdown + // is called as the event recording is not-blocking (go routine based). + r.broadcaster.Shutdown() +} + func (r *upstreamRecorder) WithComponentSuffix(suffix string) Recorder { return r.ForComponent(fmt.Sprintf("%s-%s", r.ComponentName(), suffix)) } @@ -59,12 +134,33 @@ func (r *upstreamRecorder) Warningf(reason, messageFmt string, args ...interface r.Warning(reason, fmt.Sprintf(messageFmt, args...)) } +func (r *upstreamRecorder) incrementEventsCounter(severity string) { + if r.involvedObjectRef == nil { + return + } + eventsCounterMetric.WithLabelValues(severity).Inc() +} + // Event emits the normal type event. func (r *upstreamRecorder) Event(reason, message string) { + r.shutdownMutex.RLock() + defer r.shutdownMutex.RUnlock() + defer r.incrementEventsCounter(corev1.EventTypeNormal) + if r.shuttingDown { + r.fallbackRecorder.Event(reason, message) + return + } r.eventRecorder.Event(r.involvedObjectRef, corev1.EventTypeNormal, reason, message) } // Warning emits the warning type event. 
func (r *upstreamRecorder) Warning(reason, message string) { + r.shutdownMutex.RLock() + defer r.shutdownMutex.RUnlock() + defer r.incrementEventsCounter(corev1.EventTypeWarning) + if r.shuttingDown { + r.fallbackRecorder.Warning(reason, message) + return + } r.eventRecorder.Event(r.involvedObjectRef, corev1.EventTypeWarning, reason, message) } diff --git a/pkg/operator/events/recorder_upstream_test.go b/pkg/operator/events/recorder_upstream_test.go new file mode 100644 index 0000000000..63981c4d55 --- /dev/null +++ b/pkg/operator/events/recorder_upstream_test.go @@ -0,0 +1,141 @@ +package events + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" +) + +var fakeObjectReference = &v1.ObjectReference{ + Kind: "Deployment", + Namespace: "operator-namespace", + Name: "operator", + UID: "33ff5131-0396-4536-8ef0-8f31b397adee", + APIVersion: "apps/v1", +} + +func TestUpstreamRecorder_Correlator(t *testing.T) { + tests := []struct { + name string + options record.CorrelatorOptions + runEvents func(Recorder) + evalEvents func(*v1.EventList, *testing.T) + }{ + { + name: "correlate events similar messages with default event recorder", + options: record.CorrelatorOptions{}, + runEvents: func(r Recorder) { + for i := 0; i <= 30; i++ { + time.Sleep(10 * time.Millisecond) + r.Event("TestReason", fmt.Sprintf("test message %d", i)) + } + }, + evalEvents: func(events *v1.EventList, t *testing.T) { + if len(events.Items) < 10 { + t.Errorf("expected 10 events, got %d", len(events.Items)) + } + if lastEventMessage := events.Items[len(events.Items)-1].Message; !strings.Contains(lastEventMessage, "combined from similar events") { + t.Errorf("expected last event to be combined, got %q", lastEventMessage) + } + }, + }, + { + name: "do not correlate events with similar messages with operator options", + options: 
RecommendedClusterSingletonCorrelatorOptions(), + runEvents: func(r Recorder) { + for i := 0; i <= 30; i++ { + time.Sleep(10 * time.Millisecond) + r.Event("TestReason", fmt.Sprintf("test message %d", i)) + } + }, + evalEvents: func(events *v1.EventList, t *testing.T) { + if len(events.Items) < 30 { + t.Errorf("expected 30 events, got %d", len(events.Items)) + } + for _, e := range events.Items { + if strings.Contains(e.Message, "combined") { + t.Errorf("expected no combined messaged, got %q", e.Message) + break + } + } + }, + }, + { + name: "correlate events with same messages with operator options", + options: RecommendedClusterSingletonCorrelatorOptions(), + runEvents: func(r Recorder) { + for i := 0; i <= 30; i++ { + time.Sleep(10 * time.Millisecond) + r.Event("TestReason", "test message") + } + }, + evalEvents: func(events *v1.EventList, t *testing.T) { + if len(events.Items) != 1 { + t.Errorf("expected 1 event, got %d", len(events.Items)) + } + if count := events.Items[0].Count; count < 30 { + t.Errorf("expected the event have count of 30, got %d", count) + } + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset() + testRecorder := NewKubeRecorderWithOptions(client.CoreV1().Events("operator-namespace"), test.options, "test", fakeObjectReference).WithComponentSuffix("suffix") + test.runEvents(testRecorder) + + events, err := client.CoreV1().Events("operator-namespace").List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + test.evalEvents(events, t) + }) + } + +} + +/* +// TODO: This test is racy, because upstream event recorder is non-blocking... which means the all events are created as go-routines with non-fallback recorder... 
+func TestUpstreamRecorder_Shutdown(t *testing.T) { + for testCount := 1; testCount != 10; testCount++ { + t.Run(fmt.Sprintf("test_%d", testCount), func(t *testing.T) { + client := fake.NewSimpleClientset() + fallbackClient := fake.NewSimpleClientset() + recorder := NewKubeRecorder(client.CoreV1().Events("operator-namespace"), RecommendedClusterSingletonCorrelatorOptions(), "test", fakeObjectReference) + recorder.(*upstreamRecorder).fallbackRecorder = NewKubeRecorder(fallbackClient.CoreV1().Events("operator-namespace"), RecommendedClusterSingletonCorrelatorOptions(), "test", fakeObjectReference) + + eventsSendChan := make(chan struct{}) + eventsDoneChan := make(chan struct{}) + go func() { + defer close(eventsDoneChan) + counter := 0 + for i := 0; i <= 50; i++ { + time.Sleep(5 * time.Millisecond) + recorder.Eventf(fmt.Sprintf("TestReason%d", i), "test message %d", i) + counter++ + if counter == 10 { + // at this point the recorder should switch to fallback client + close(eventsSendChan) + } + } + }() + + <-eventsSendChan + recorder.Shutdown() + <-eventsDoneChan + + t.Logf("client actions: %d", len(client.Actions())) + t.Logf("fallback client actions: %d", len(fallbackClient.Actions())) + }) + } +} +*/ diff --git a/pkg/operator/staticpod/certsyncpod/certsync_cmd.go b/pkg/operator/staticpod/certsyncpod/certsync_cmd.go index cb0508a0fb..359adb3102 100644 --- a/pkg/operator/staticpod/certsyncpod/certsync_cmd.go +++ b/pkg/operator/staticpod/certsyncpod/certsync_cmd.go @@ -82,7 +82,7 @@ func (o *CertSyncControllerOptions) Run() error { kubeInformers := informers.NewSharedInformerFactoryWithOptions(o.kubeClient, 10*time.Minute, informers.WithNamespace(o.Namespace)) - eventRecorder := events.NewKubeRecorder(o.kubeClient.CoreV1().Events(o.Namespace), "cert-syncer", + eventRecorder := events.NewKubeRecorderWithOptions(o.kubeClient.CoreV1().Events(o.Namespace), events.RecommendedClusterSingletonCorrelatorOptions(), "cert-syncer", &corev1.ObjectReference{ APIVersion: "v1", 
Kind: "Pod",