Skip to content

Commit

Permalink
Improve MinEventInterval compliance with docs
Browse files Browse the repository at this point in the history
**Description**

In the command line arguments, we see `min-event-sync-interval` as
"The minimum interval between two consecutive synchronizations triggered from kubernetes events"

In the code, it actually acts a different way.

It imposes a certain dealy between syncs.
While this is compliant with the "minimum delay between 2 consecutive
synchronizations", it has side-effects in case of large delays.

In particular, when trying to fine-tune external-dns to match the
provider rate-limits.

In this case, it may be interesting to restrict the rate of reconciling
actions happening by having a high `min-event-sync-interval`, while
keeping a low latency for initial events.

This would allow to maximise the bulk effect of high change rate while
keeping fast enough reaction for isolated changes.

**Checklist**

- [X] Unit tests updated
- [X] End user documentation updated
> End user documentation matches the updated behaviour with more
> accuracy
Change-Id: Ibcea707974a095a2d5861a3974b4c79e5a15b00e
  • Loading branch information
tjamet committed Aug 6, 2024
1 parent 3b35ea6 commit 2007a49
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
48 changes: 37 additions & 11 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,10 @@ type Controller struct {
DomainFilter endpoint.DomainFilter
// The nextRunAt used for throttling and batching reconciliation
nextRunAt time.Time
// The nextRunAtMux is for atomic updating of nextRunAt
nextRunAtMux sync.Mutex
// The runAtMutex is for atomic updating of nextRunAt and lastRunAt
runAtMutex sync.Mutex
// The lastRunAt used for throttling and batching reconciliation
lastRunAt time.Time
// MangedRecordTypes are DNS record types that will be considered for management.
ManagedRecordTypes []string
// ExcludeRecordTypes are DNS record types that will be excluded from management.
Expand All @@ -203,6 +205,10 @@ type Controller struct {
func (c *Controller) RunOnce(ctx context.Context) error {
lastReconcileTimestamp.SetToCurrentTime()

c.runAtMutex.Lock()
c.lastRunAt = time.Now()
c.runAtMutex.Unlock()

records, err := c.Registry.Records(ctx)
if err != nil {
registryErrorsTotal.Inc()
Expand Down Expand Up @@ -264,6 +270,24 @@ func (c *Controller) RunOnce(ctx context.Context) error {
return nil
}

func earliest(r time.Time, times ...time.Time) time.Time {
for _, t := range times {
if t.Before(r) {
r = t
}
}
return r
}

func latest(r time.Time, times ...time.Time) time.Time {
for _, t := range times {
if t.After(r) {
r = t
}
}
return r
}

// Counts the intersections of A and AAAA records in endpoint and registry.
func countMatchingAddressRecords(endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint) (int, int) {
recordsMap := make(map[string]map[string]struct{})
Expand Down Expand Up @@ -306,18 +330,20 @@ func countAddressRecords(endpoints []*endpoint.Endpoint) (int, int) {

// ScheduleRunOnce makes sure execution happens at most once per interval.
func (c *Controller) ScheduleRunOnce(now time.Time) {
c.nextRunAtMux.Lock()
defer c.nextRunAtMux.Unlock()
// schedule only if a reconciliation is not already planned
// to happen in the following c.MinEventSyncInterval
if !c.nextRunAt.Before(now.Add(c.MinEventSyncInterval)) {
c.nextRunAt = now.Add(c.MinEventSyncInterval)
}
c.runAtMutex.Lock()
defer c.runAtMutex.Unlock()
c.nextRunAt = latest(
c.lastRunAt.Add(c.MinEventSyncInterval),
earliest(
now.Add(5*time.Second),
c.nextRunAt,
),
)
}

func (c *Controller) ShouldRunOnce(now time.Time) bool {
c.nextRunAtMux.Lock()
defer c.nextRunAtMux.Unlock()
c.runAtMutex.Lock()
defer c.runAtMutex.Unlock()
if now.Before(c.nextRunAt) {
return false
}
Expand Down
9 changes: 8 additions & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,17 @@ func valueFromMetric(metric prometheus.Gauge) uint64 {
}

func TestShouldRunOnce(t *testing.T) {
ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 5 * time.Second}
ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 15 * time.Second}

now := time.Now()

// First run of Run loop should execute RunOnce
assert.True(t, ctrl.ShouldRunOnce(now))
assert.Equal(t, now.Add(10*time.Minute), ctrl.nextRunAt)

// Second run should not
assert.False(t, ctrl.ShouldRunOnce(now))
ctrl.lastRunAt = now

now = now.Add(10 * time.Second)
// Changes happen in ingresses or services
Expand Down Expand Up @@ -316,12 +318,17 @@ func TestShouldRunOnce(t *testing.T) {
assert.False(t, ctrl.ShouldRunOnce(now))

// Multiple ingresses or services changes, closer than MinInterval from each other
ctrl.lastRunAt = now
firstChangeTime := now
secondChangeTime := firstChangeTime.Add(time.Second)
// First change
ctrl.ScheduleRunOnce(firstChangeTime)
// Second change
ctrl.ScheduleRunOnce(secondChangeTime)

// Executions should be spaced by at least MinEventSyncInterval
assert.False(t, ctrl.ShouldRunOnce(now.Add(5*time.Second)))

// Should not postpone the reconciliation further than firstChangeTime + MinInterval
now = now.Add(ctrl.MinEventSyncInterval)
assert.True(t, ctrl.ShouldRunOnce(now))
Expand Down

0 comments on commit 2007a49

Please sign in to comment.