diff --git a/controller/controller.go b/controller/controller.go index a3ef2e8b23..c8bb6ff185 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -189,8 +189,10 @@ type Controller struct { DomainFilter endpoint.DomainFilter // The nextRunAt used for throttling and batching reconciliation nextRunAt time.Time - // The nextRunAtMux is for atomic updating of nextRunAt - nextRunAtMux sync.Mutex + // The runAtMutex is for atomic updating of nextRunAt and lastRunAt + runAtMutex sync.Mutex + // The lastRunAt used for throttling and batching reconciliation + lastRunAt time.Time // MangedRecordTypes are DNS record types that will be considered for management. ManagedRecordTypes []string // ExcludeRecordTypes are DNS record types that will be excluded from management. @@ -203,6 +205,10 @@ type Controller struct { func (c *Controller) RunOnce(ctx context.Context) error { lastReconcileTimestamp.SetToCurrentTime() + c.runAtMutex.Lock() + c.lastRunAt = time.Now() + c.runAtMutex.Unlock() + records, err := c.Registry.Records(ctx) if err != nil { registryErrorsTotal.Inc() @@ -264,6 +270,24 @@ func (c *Controller) RunOnce(ctx context.Context) error { return nil } +func earliest(r time.Time, times ...time.Time) time.Time { + for _, t := range times { + if t.Before(r) { + r = t + } + } + return r +} + +func latest(r time.Time, times ...time.Time) time.Time { + for _, t := range times { + if t.After(r) { + r = t + } + } + return r +} + // Counts the intersections of A and AAAA records in endpoint and registry. func countMatchingAddressRecords(endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint) (int, int) { recordsMap := make(map[string]map[string]struct{}) @@ -306,18 +330,20 @@ func countAddressRecords(endpoints []*endpoint.Endpoint) (int, int) { // ScheduleRunOnce makes sure execution happens at most once per interval. func (c *Controller) ScheduleRunOnce(now time.Time) { - c.nextRunAtMux.Lock() - defer c.nextRunAtMux.Unlock() - // schedule only if a reconciliation is not already planned - // to happen in the following c.MinEventSyncInterval - if !c.nextRunAt.Before(now.Add(c.MinEventSyncInterval)) { - c.nextRunAt = now.Add(c.MinEventSyncInterval) - } + c.runAtMutex.Lock() + defer c.runAtMutex.Unlock() + c.nextRunAt = latest( + c.lastRunAt.Add(c.MinEventSyncInterval), + earliest( + now.Add(5*time.Second), + c.nextRunAt, + ), + ) } func (c *Controller) ShouldRunOnce(now time.Time) bool { - c.nextRunAtMux.Lock() - defer c.nextRunAtMux.Unlock() + c.runAtMutex.Lock() + defer c.runAtMutex.Unlock() if now.Before(c.nextRunAt) { return false } diff --git a/controller/controller_test.go b/controller/controller_test.go index 7fa83f5019..e95aa9802e 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -278,15 +278,17 @@ func valueFromMetric(metric prometheus.Gauge) uint64 { } func TestShouldRunOnce(t *testing.T) { - ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 5 * time.Second} + ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 15 * time.Second} now := time.Now() // First run of Run loop should execute RunOnce assert.True(t, ctrl.ShouldRunOnce(now)) + assert.Equal(t, now.Add(10*time.Minute), ctrl.nextRunAt) // Second run should not assert.False(t, ctrl.ShouldRunOnce(now)) + ctrl.lastRunAt = now now = now.Add(10 * time.Second) // Changes happen in ingresses or services @@ -316,12 +318,17 @@ func TestShouldRunOnce(t *testing.T) { assert.False(t, ctrl.ShouldRunOnce(now)) // Multiple ingresses or services changes, closer than MinInterval from each other + ctrl.lastRunAt = now firstChangeTime := now secondChangeTime := firstChangeTime.Add(time.Second) // First change ctrl.ScheduleRunOnce(firstChangeTime) // Second change ctrl.ScheduleRunOnce(secondChangeTime) + + // Executions should be spaced by at least MinEventSyncInterval + assert.False(t, ctrl.ShouldRunOnce(now.Add(5*time.Second))) + // Should not postpone the reconciliation further than firstChangeTime + MinInterval now = now.Add(ctrl.MinEventSyncInterval) assert.True(t, ctrl.ShouldRunOnce(now))