Skip to content

Commit

Permalink
Merge pull request #3400 from tjamet/min-event-sync
Browse files Browse the repository at this point in the history
Improve MinEventInterval compliance with docs
  • Loading branch information
k8s-ci-robot authored Aug 13, 2024
2 parents 8f62fa9 + 1b5ed44 commit c875e65
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 13 deletions.
48 changes: 37 additions & 11 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,10 @@ type Controller struct {
DomainFilter endpoint.DomainFilter
// The nextRunAt used for throttling and batching reconciliation
nextRunAt time.Time
// The nextRunAtMux is for atomic updating of nextRunAt
nextRunAtMux sync.Mutex
// The runAtMutex is for atomic updating of nextRunAt and lastRunAt
runAtMutex sync.Mutex
// The lastRunAt used for throttling and batching reconciliation
lastRunAt time.Time
// MangedRecordTypes are DNS record types that will be considered for management.
ManagedRecordTypes []string
// ExcludeRecordTypes are DNS record types that will be excluded from management.
Expand All @@ -203,6 +205,10 @@ type Controller struct {
func (c *Controller) RunOnce(ctx context.Context) error {
lastReconcileTimestamp.SetToCurrentTime()

c.runAtMutex.Lock()
c.lastRunAt = time.Now()
c.runAtMutex.Unlock()

records, err := c.Registry.Records(ctx)
if err != nil {
registryErrorsTotal.Inc()
Expand Down Expand Up @@ -264,6 +270,24 @@ func (c *Controller) RunOnce(ctx context.Context) error {
return nil
}

func earliest(r time.Time, times ...time.Time) time.Time {
for _, t := range times {
if t.Before(r) {
r = t
}
}
return r
}

func latest(r time.Time, times ...time.Time) time.Time {
for _, t := range times {
if t.After(r) {
r = t
}
}
return r
}

// Counts the intersections of A and AAAA records in endpoint and registry.
func countMatchingAddressRecords(endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint) (int, int) {
recordsMap := make(map[string]map[string]struct{})
Expand Down Expand Up @@ -306,18 +330,20 @@ func countAddressRecords(endpoints []*endpoint.Endpoint) (int, int) {

// ScheduleRunOnce makes sure execution happens at most once per interval.
func (c *Controller) ScheduleRunOnce(now time.Time) {
c.nextRunAtMux.Lock()
defer c.nextRunAtMux.Unlock()
// schedule only if a reconciliation is not already planned
// to happen in the following c.MinEventSyncInterval
if !c.nextRunAt.Before(now.Add(c.MinEventSyncInterval)) {
c.nextRunAt = now.Add(c.MinEventSyncInterval)
}
c.runAtMutex.Lock()
defer c.runAtMutex.Unlock()
c.nextRunAt = latest(
c.lastRunAt.Add(c.MinEventSyncInterval),
earliest(
now.Add(5*time.Second),
c.nextRunAt,
),
)
}

func (c *Controller) ShouldRunOnce(now time.Time) bool {
c.nextRunAtMux.Lock()
defer c.nextRunAtMux.Unlock()
c.runAtMutex.Lock()
defer c.runAtMutex.Unlock()
if now.Before(c.nextRunAt) {
return false
}
Expand Down
9 changes: 8 additions & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,17 @@ func valueFromMetric(metric prometheus.Gauge) uint64 {
}

func TestShouldRunOnce(t *testing.T) {
ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 5 * time.Second}
ctrl := &Controller{Interval: 10 * time.Minute, MinEventSyncInterval: 15 * time.Second}

now := time.Now()

// First run of Run loop should execute RunOnce
assert.True(t, ctrl.ShouldRunOnce(now))
assert.Equal(t, now.Add(10*time.Minute), ctrl.nextRunAt)

// Second run should not
assert.False(t, ctrl.ShouldRunOnce(now))
ctrl.lastRunAt = now

now = now.Add(10 * time.Second)
// Changes happen in ingresses or services
Expand Down Expand Up @@ -316,12 +318,17 @@ func TestShouldRunOnce(t *testing.T) {
assert.False(t, ctrl.ShouldRunOnce(now))

// Multiple ingresses or services changes, closer than MinInterval from each other
ctrl.lastRunAt = now
firstChangeTime := now
secondChangeTime := firstChangeTime.Add(time.Second)
// First change
ctrl.ScheduleRunOnce(firstChangeTime)
// Second change
ctrl.ScheduleRunOnce(secondChangeTime)

// Executions should be spaced by at least MinEventSyncInterval
assert.False(t, ctrl.ShouldRunOnce(now.Add(5*time.Second)))

// Should not postpone the reconciliation further than firstChangeTime + MinInterval
now = now.Add(ctrl.MinEventSyncInterval)
assert.True(t, ctrl.ShouldRunOnce(now))
Expand Down
3 changes: 2 additions & 1 deletion docs/tutorials/aws.md
Original file line number Diff line number Diff line change
Expand Up @@ -912,8 +912,9 @@ Route53 has a [5 API requests per second per account hard quota](https://docs.aw
Running several fast polling ExternalDNS instances in a given account can easily hit that limit. Some ways to reduce the request rate include:
* Reduce the polling loop's synchronization interval at the possible cost of slower change propagation (but see `--events` below to reduce the impact).
* `--interval=5m` (default `1m`)
* Trigger the polling loop on changes to K8s objects, rather than only at `interval`, to have responsive updates with long poll intervals
* Trigger the polling loop on changes to K8s objects, rather than only at `interval` and ensure a minimum of time between events, to have responsive updates with long poll intervals
* `--events`
* `--min-event-sync-interval=5m` (default `5s`)
* Limit the [sources watched](https://github.com/kubernetes-sigs/external-dns/blob/master/pkg/apis/externaldns/types.go#L364) when the `--events` flag is specified to specific types, namespaces, labels, or annotations
* `--source=ingress --source=service` - specify multiple times for multiple sources
* `--namespace=my-app`
Expand Down

0 comments on commit c875e65

Please sign in to comment.