From 089744c6ff136d80e1ddc07ac0a985b40fee9b5e Mon Sep 17 00:00:00 2001 From: Thibault Jamet Date: Mon, 8 Jul 2024 09:19:12 +0200 Subject: [PATCH] Add cache at provider level **Description** In the current implementation, DNS providers are called to list all records on every loop. This is expensive in terms of number of requests to the provider and may result in being rate limited, as reported in 1293 and 3397. In our case, we have approximately 20,000 records in our AWS Hosted Zone. The ListResourceRecordSets API call allows a maximum of 300 items per call. That requires 67 API calls per external-dns deployment during every sync period With this, we introduce an optional generic caching mechanism at the provider level, that re-uses the latest known list of records for a given time. This prevents from expensive Provider calls to list all records for each object modification that does not change the actual record (annotations, statuses, ingress routing, ...) This introduces 2 trade-offs: 1. Any changes or corruption directly on the provider side will be longer to detect and to resolve, up to the cache time 2. Any conflicting records in the DNS provider (such as a different external-dns instance) injected during the cache validity will cause the first iteration of the next reconcile loop to fail, and hence add a delay until the next retry **Checklist** - [X] Unit tests updated - [X] End user documentation updated Change-Id: I0bdcfa994ac1b76acedb05d458a97c080284c5aa --- docs/tutorials/aws.md | 2 + main.go | 7 ++ pkg/apis/externaldns/types.go | 3 + provider/cached_provider.go | 73 ++++++++++++++ provider/cached_provider_test.go | 164 +++++++++++++++++++++++++++++++ 5 files changed, 249 insertions(+) create mode 100644 provider/cached_provider.go create mode 100644 provider/cached_provider_test.go diff --git a/docs/tutorials/aws.md b/docs/tutorials/aws.md index 214fa4ca9b..532dd45e33 100644 --- a/docs/tutorials/aws.md +++ b/docs/tutorials/aws.md @@ -912,6 +912,8 @@ Route53 has a [5 API requests per second per account hard quota](https://docs.aw Running several fast polling ExternalDNS instances in a given account can easily hit that limit. Some ways to reduce the request rate include: * Reduce the polling loop's synchronization interval at the possible cost of slower change propagation (but see `--events` below to reduce the impact). * `--interval=5m` (default `1m`) +* Cache the results of the zone at the possible cost of slower propagation when the zone gets modified from other sources + * `--provider-cache-time=15m` (default `0m`) * Trigger the polling loop on changes to K8s objects, rather than only at `interval` and ensure a minimum of time between events, to have responsive updates with long poll intervals * `--events` * `--min-event-sync-interval=5m` (default `5s`) diff --git a/main.go b/main.go index f05c46906f..05765845ff 100644 --- a/main.go +++ b/main.go @@ -401,6 +401,13 @@ func main() { os.Exit(0) } + if cfg.ProviderCacheTime > 0 { + p = &provider.CachedProvider{ + Provider: p, + RefreshDelay: cfg.ProviderCacheTime, + } + } + var r registry.Registry switch cfg.Registry { case "dynamodb": diff --git a/pkg/apis/externaldns/types.go b/pkg/apis/externaldns/types.go index 822993d093..05ba1f2e1b 100644 --- a/pkg/apis/externaldns/types.go +++ b/pkg/apis/externaldns/types.go @@ -67,6 +67,7 @@ type Config struct { AlwaysPublishNotReadyAddresses bool ConnectorSourceServer string Provider string + ProviderCacheTime int GoogleProject string GoogleBatchChangeSize int GoogleBatchChangeInterval time.Duration @@ -239,6 +240,7 @@ var defaultConfig = &Config{ PublishHostIP: false, ConnectorSourceServer: "localhost:8080", Provider: "", + ProviderCacheTime: 0, GoogleProject: "", GoogleBatchChangeSize: 1000, GoogleBatchChangeInterval: time.Second, @@ -456,6 +458,7 @@ func (cfg *Config) ParseFlags(args []string) error { // Flags related to providers providers := []string{"akamai", "alibabacloud", "aws", "aws-sd", "azure", "azure-dns", "azure-private-dns", "bluecat", "civo", "cloudflare", "coredns", "designate", "digitalocean", "dnsimple", "dyn", "exoscale", "gandi", "godaddy", "google", "ibmcloud", "inmemory", "linode", "ns1", "oci", "ovh", "pdns", "pihole", "plural", "rcodezero", "rdns", "rfc2136", "safedns", "scaleway", "skydns", "tencentcloud", "transip", "ultradns", "vinyldns", "vultr", "webhook"} app.Flag("provider", "The DNS provider where the DNS records will be created (required, options: "+strings.Join(providers, ", ")+")").Required().PlaceHolder("provider").EnumVar(&cfg.Provider, providers...) + app.Flag("provider-cache-time", "The time to cache the DNS provider record list requests.").Default(defaultConfig.ProviderCacheTime.String()).DurationVar(&cfg.ProviderCacheTime) app.Flag("domain-filter", "Limit possible target zones by a domain suffix; specify multiple times for multiple domains (optional)").Default("").StringsVar(&cfg.DomainFilter) app.Flag("exclude-domains", "Exclude subdomains (optional)").Default("").StringsVar(&cfg.ExcludeDomains) app.Flag("regex-domain-filter", "Limit possible domains and target zones by a Regex filter; Overrides domain-filter (optional)").Default(defaultConfig.RegexDomainFilter.String()).RegexpVar(&cfg.RegexDomainFilter) diff --git a/provider/cached_provider.go b/provider/cached_provider.go new file mode 100644 index 0000000000..e4b3652662 --- /dev/null +++ b/provider/cached_provider.go @@ -0,0 +1,73 @@ +package provider + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/external-dns/endpoint" + "sigs.k8s.io/external-dns/plan" +) + +var ( + cachedRecordsCallsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "external_dns", + Subsystem: "provider", + Name: "cache_records_calls", + Help: "Number of calls to the provider cache Records list.", + }, + []string{ + "from_cache", + }, + ) + cachedApplyChangesCallsTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "external_dns", + Subsystem: "provider", + Name: "cache_apply_changes_calls", + Help: "Number of calls to the provider cache ApplyChanges.", + }, + ) +) + +type CachedProvider struct { + Provider + RefreshDelay time.Duration + err error + lastRead time.Time + cache []*endpoint.Endpoint +} + +func (c *CachedProvider) Records(ctx context.Context) ([]*endpoint.Endpoint, error) { + if c.needRefresh() { + c.cache, c.err = c.Provider.Records(ctx) + c.lastRead = time.Now() + cachedRecordsCallsTotal.WithLabelValues("false").Inc() + } else { + cachedRecordsCallsTotal.WithLabelValues("true").Inc() + } + return c.cache, c.err +} +func (c *CachedProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) error { + c.Reset() + cachedApplyChangesCallsTotal.Inc() + return c.Provider.ApplyChanges(ctx, changes) +} + +func (c *CachedProvider) Reset() { + c.err = nil + c.cache = nil + c.lastRead = time.Time{} +} + +func (c *CachedProvider) needRefresh() bool { + if c.cache == nil || c.err != nil { + return true + } + return time.Now().After(c.lastRead.Add(c.RefreshDelay)) +} + +func init() { + prometheus.MustRegister(cachedRecordsCallsTotal) +} diff --git a/provider/cached_provider_test.go b/provider/cached_provider_test.go new file mode 100644 index 0000000000..653e8f8b58 --- /dev/null +++ b/provider/cached_provider_test.go @@ -0,0 +1,164 @@ +package provider + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sigs.k8s.io/external-dns/endpoint" + "sigs.k8s.io/external-dns/plan" +) + +type testProviderFunc struct { + records func(ctx context.Context) ([]*endpoint.Endpoint, error) + applyChanges func(ctx context.Context, changes *plan.Changes) error + propertyValuesEqual func(name string, previous string, current string) bool + adjustEndpoints func(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint + getDomainFilter func() endpoint.DomainFilterInterface +} + +func (p *testProviderFunc) Records(ctx context.Context) ([]*endpoint.Endpoint, error) { + return p.records(ctx) +} + +func (p *testProviderFunc) ApplyChanges(ctx context.Context, changes *plan.Changes) error { + return p.applyChanges(ctx, changes) +} + +func (p *testProviderFunc) PropertyValuesEqual(name string, previous string, current string) bool { + return p.propertyValuesEqual(name, previous, current) +} + +func (p *testProviderFunc) AdjustEndpoints(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint { + return p.adjustEndpoints(endpoints) +} + +func (p *testProviderFunc) GetDomainFilter() endpoint.DomainFilterInterface { + return p.getDomainFilter() +} + +func recordsNotCalled(t *testing.T) func(ctx context.Context) ([]*endpoint.Endpoint, error) { + return func(ctx context.Context) ([]*endpoint.Endpoint, error) { + t.Errorf("unexpected call to Records") + return nil, nil + } +} + +func applyChangesNotCalled(t *testing.T) func(ctx context.Context, changes *plan.Changes) error { + return func(ctx context.Context, changes *plan.Changes) error { + t.Errorf("unexpected call to ApplyChanges") + return nil + } +} + +func propertyValuesEqualNotCalled(t *testing.T) func(name string, previous string, current string) bool { + return func(name string, previous string, current string) bool { + t.Errorf("unexpected call to PropertyValuesEqual") + return false + } +} + +func adjustEndpointsNotCalled(t *testing.T) func(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint { + return func(endpoints []*endpoint.Endpoint) []*endpoint.Endpoint { + t.Errorf("unexpected call to AdjustEndpoints") + return endpoints + } +} + +func newTestProviderFunc(t *testing.T) *testProviderFunc { + return &testProviderFunc{ + records: recordsNotCalled(t), + applyChanges: applyChangesNotCalled(t), + propertyValuesEqual: propertyValuesEqualNotCalled(t), + adjustEndpoints: adjustEndpointsNotCalled(t), + } +} + +func TestCachedProviderCallsProviderOnFirstCall(t *testing.T) { + testProvider := newTestProviderFunc(t) + testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) { + return []*endpoint.Endpoint{{DNSName: "domain.fqdn"}}, nil + } + provider := CachedProvider{ + Provider: testProvider, + } + endpoints, err := provider.Records(context.Background()) + assert.NoError(t, err) + require.NotNil(t, endpoints) + require.Len(t, endpoints, 1) + require.NotNil(t, endpoints[0]) + assert.Equal(t, "domain.fqdn", endpoints[0].DNSName) +} + +func TestCachedProviderUsesCacheWhileValid(t *testing.T) { + testProvider := newTestProviderFunc(t) + testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) { + return []*endpoint.Endpoint{{DNSName: "domain.fqdn"}}, nil + } + provider := CachedProvider{ + RefreshDelay: 30 * time.Second, + Provider: testProvider, + } + _, err := provider.Records(context.Background()) + require.NoError(t, err) + + t.Run("With consecutive calls within the caching time frame", func(t *testing.T) { + testProvider.records = recordsNotCalled(t) + endpoints, err := provider.Records(context.Background()) + assert.NoError(t, err) + require.NotNil(t, endpoints) + require.Len(t, endpoints, 1) + require.NotNil(t, endpoints[0]) + assert.Equal(t, "domain.fqdn", endpoints[0].DNSName) + }) + + t.Run("When the caching time frame is exceeded", func(t *testing.T) { + testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) { + return []*endpoint.Endpoint{{DNSName: "new.domain.fqdn"}}, nil + } + provider.lastRead = time.Now().Add(-20 * time.Minute) + endpoints, err := provider.Records(context.Background()) + assert.NoError(t, err) + require.NotNil(t, endpoints) + require.Len(t, endpoints, 1) + require.NotNil(t, endpoints[0]) + assert.Equal(t, "new.domain.fqdn", endpoints[0].DNSName) + }) +} + +func TestCachedProviderForcesCacheRefreshOnUpdate(t *testing.T) { + testProvider := newTestProviderFunc(t) + testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) { + return []*endpoint.Endpoint{{DNSName: "domain.fqdn"}}, nil + } + provider := CachedProvider{ + RefreshDelay: 30 * time.Second, + Provider: testProvider, + } + _, err := provider.Records(context.Background()) + require.NoError(t, err) + + t.Run("When the caching time frame is exceeded", func(t *testing.T) { + testProvider.records = recordsNotCalled(t) + testProvider.applyChanges = func(ctx context.Context, changes *plan.Changes) error { + return nil + } + err := provider.ApplyChanges(context.Background(), &plan.Changes{}) + assert.NoError(t, err) + t.Run("Next call to Records is not cached", func(t *testing.T) { + testProvider.applyChanges = applyChangesNotCalled(t) + testProvider.records = func(ctx context.Context) ([]*endpoint.Endpoint, error) { + return []*endpoint.Endpoint{{DNSName: "new.domain.fqdn"}}, nil + } + endpoints, err := provider.Records(context.Background()) + + assert.NoError(t, err) + require.NotNil(t, endpoints) + require.Len(t, endpoints, 1) + require.NotNil(t, endpoints[0]) + assert.Equal(t, "new.domain.fqdn", endpoints[0].DNSName) + }) + }) +}