Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
| `--[no-]version` | Show application version. |
| `--server=""` | The Kubernetes API server to connect to (default: auto-detect) |
| `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) |
| `--kube-api-cache-sync-timeout=1m0s` | Timeout for waiting for Kubernetes informer caches to sync during startup. Values <= 0 use the default (60s). Increase only after ruling out RBAC, network, or API server issues. |
| `--request-timeout=30s` | Request timeout when calling Kubernetes APIs. 0s means no timeout |
| `--[no-]resolve-service-load-balancer-hostname` | Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs |
| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to EndpointSlices, for Service source (default: false) |
Expand Down
3 changes: 1 addition & 2 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ markdown_extensions:
- extra
- admonition
- smarty
- sane_lists
- nl2br
- mdx_truly_sane_lists:
nested_indent: 2
- attr_list
- def_list
- footnotes
Expand Down
9 changes: 6 additions & 3 deletions pkg/apis/externaldns/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ const (

// Config is a project-wide configuration
type Config struct {
APIServerURL string
KubeConfig string
RequestTimeout time.Duration
APIServerURL string
KubeConfig string
CacheSyncTimeout time.Duration
RequestTimeout time.Duration
DefaultTargets []string
GlooNamespaces []string
SkipperRouteGroupVersion string
Expand Down Expand Up @@ -338,6 +339,7 @@ var defaultConfig = &Config{
RegexDomainExclude: regexp.MustCompile(""),
RegexDomainFilter: regexp.MustCompile(""),
Registry: "txt",
CacheSyncTimeout: time.Second * 60,
RequestTimeout: time.Second * 30,
RFC2136BatchChangeSize: 50,
RFC2136GSSTSIG: false,
Expand Down Expand Up @@ -493,6 +495,7 @@ func bindFlags(b flags.FlagBinder, cfg *Config) {
// Flags related to Kubernetes
b.StringVar("server", "The Kubernetes API server to connect to (default: auto-detect)", defaultConfig.APIServerURL, &cfg.APIServerURL)
b.StringVar("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)", defaultConfig.KubeConfig, &cfg.KubeConfig)
b.DurationVar("kube-api-cache-sync-timeout", "Timeout for waiting for Kubernetes informer caches to sync during startup. Values <= 0 use the default (60s). Increase only after ruling out RBAC, network, or API server issues.", defaultConfig.CacheSyncTimeout, &cfg.CacheSyncTimeout)
b.DurationVar("request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout", defaultConfig.RequestTimeout, &cfg.RequestTimeout)
b.BoolVar("resolve-service-load-balancer-hostname", "Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs", false, &cfg.ResolveServiceLoadBalancerHostname)
b.BoolVar("listen-endpoint-events", "Trigger a reconcile on changes to EndpointSlices, for Service source (default: false)", false, &cfg.ListenEndpointEvents)
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/externaldns/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var (
minimalConfig = &Config{
APIServerURL: "",
KubeConfig: "",
CacheSyncTimeout: time.Second * 60,
RequestTimeout: time.Second * 30,
GlooNamespaces: []string{"gloo-system"},
SkipperRouteGroupVersion: "zalando.org/v1",
Expand Down Expand Up @@ -139,6 +140,7 @@ var (
overriddenConfig = &Config{
APIServerURL: "http://127.0.0.1:8080",
KubeConfig: "/some/path",
CacheSyncTimeout: time.Second * 60,
RequestTimeout: time.Second * 77,
GlooNamespaces: []string{"gloo-not-system", "gloo-second-system"},
SkipperRouteGroupVersion: "zalando.org/v2",
Expand Down
4 changes: 3 additions & 1 deletion source/ambassador_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"sort"
"strings"
"time"

ambassador "github.com/datawire/ambassador/pkg/api/getambassador.io/v2"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewAmbassadorHostSource(
namespace string,
annotationFilter string,
labelSelector labels.Selector,
timeout time.Duration,
) (Source, error) {
// Use shared informer to listen for add/update/delete of Host in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed.
Expand All @@ -100,7 +102,7 @@ func NewAmbassadorHostSource(
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory, timeout); err != nil {
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion source/ambassador_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"testing"
"time"

ambassador "github.com/datawire/ambassador/pkg/api/getambassador.io/v2"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -637,7 +638,7 @@ func TestAmbassadorHostSource(t *testing.T) {
_, err = fakeDynamicClient.Resource(ambHostGVR).Namespace(namespace).Create(context.Background(), host, metav1.CreateOptions{})
assert.NoError(t, err)

source, err := NewAmbassadorHostSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, namespace, ti.annotationFilter, ti.labelSelector)
source, err := NewAmbassadorHostSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, namespace, ti.annotationFilter, ti.labelSelector, time.Duration(0))
assert.NoError(t, err)
assert.NotNil(t, source)

Expand Down
4 changes: 3 additions & 1 deletion source/contour_httpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"sort"
"text/template"
"time"

projectcontour "github.com/projectcontour/contour/apis/projectcontour/v1"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -69,6 +70,7 @@ func NewContourHTTPProxySource(
fqdnTemplate string,
combineFqdnAnnotation bool,
ignoreHostnameAnnotation bool,
timeout time.Duration,
) (Source, error) {
tmpl, err := fqdn.ParseTemplate(fqdnTemplate)
if err != nil {
Expand All @@ -91,7 +93,7 @@ func NewContourHTTPProxySource(
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory, timeout); err != nil {
return nil, err
}

Expand Down
5 changes: 5 additions & 0 deletions source/contour_httpproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"testing"
"time"

fakeDynamic "k8s.io/client-go/dynamic/fake"

Expand Down Expand Up @@ -97,6 +98,7 @@ func (suite *HTTPProxySuite) SetupTest() {
"{{.Name}}",
false,
false,
time.Duration(0),
)
suite.NoError(err, "should initialize httpproxy source")

Expand Down Expand Up @@ -194,6 +196,7 @@ func TestNewContourHTTPProxySource(t *testing.T) {
ti.fqdnTemplate,
ti.combineFQDNAndAnnotation,
false,
time.Duration(0),
)
if ti.expectError {
assert.Error(t, err)
Expand Down Expand Up @@ -1062,6 +1065,7 @@ func testHTTPProxyEndpoints(t *testing.T) {
ti.fqdnTemplate,
ti.combineFQDNAndAnnotation,
ti.ignoreHostnameAnnotation,
time.Duration(0),
)
require.NoError(t, err)

Expand Down Expand Up @@ -1089,6 +1093,7 @@ func newTestHTTPProxySource() (*httpProxySource, error) {
"{{.Name}}",
false,
false,
time.Duration(0),
)
if err != nil {
return nil, err
Expand Down
18 changes: 16 additions & 2 deletions source/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"fmt"
"os"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/external-dns/source/annotations"
"sigs.k8s.io/external-dns/source/informers"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -63,7 +65,7 @@ type crdSource struct {
}

// NewCRDClientForAPIVersionKind return rest client for the given apiVersion and kind of the CRD
func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiServerURL, apiVersion, kind string) (*rest.RESTClient, *runtime.Scheme, error) {
func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiServerURL, apiVersion, kind string, timeout time.Duration) (*rest.RESTClient, *runtime.Scheme, error) {
if kubeConfig == "" {
if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
kubeConfig = clientcmd.RecommendedHomeFile
Expand Down Expand Up @@ -101,6 +103,7 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS
config.GroupVersion = &groupVersion
config.APIPath = "/apis"
config.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
config.Timeout = timeout

crdClient, err := rest.UnversionedRESTClientFor(config)
if err != nil {
Expand All @@ -115,7 +118,8 @@ func NewCRDSource(
namespace, kind, annotationFilter string,
labelSelector labels.Selector,
scheme *runtime.Scheme,
startInformer bool) (Source, error) {
startInformer bool,
cacheSyncTimeout time.Duration) (Source, error) {
sourceCrd := crdSource{
crdResource: strings.ToLower(kind) + "s",
namespace: namespace,
Expand All @@ -139,6 +143,16 @@ func NewCRDSource(
&apiv1alpha1.DNSEndpoint{},
0)
go sourceCrd.informer.Run(wait.NeverStop)

// Wait for the informer cache to sync before returning.
if cacheSyncTimeout <= 0 {
cacheSyncTimeout = time.Duration(informers.DefaultCacheSyncTimeout) * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), cacheSyncTimeout)
defer cancel()
if !cache.WaitForCacheSync(ctx.Done(), sourceCrd.informer.HasSynced) {
return nil, fmt.Errorf("CRD informer cache sync timed out after %s", cacheSyncTimeout)
}
}
return &sourceCrd, nil
}
Expand Down
2 changes: 1 addition & 1 deletion source/crd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func testCRDSourceEndpoints(t *testing.T) {
// At present, client-go's fake.RESTClient (used by crd_test.go) is known to cause race conditions when used
// with informers: https://github.com/kubernetes/kubernetes/issues/95372
// So don't start the informer during testing.
cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, false)
cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, false, 0)
require.NoError(t, err)

receivedEndpoints, err := cs.Endpoints(t.Context())
Expand Down
4 changes: 0 additions & 4 deletions source/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ func EndpointsForHostname(hostname string, targets endpoint.Targets, ttl endpoin
return endpoints
}

// EndpointTargetsFromServices retrieves endpoint targets from services in a given namespace
// that match the specified selector. It returns external IPs or load balancer addresses.
//
// TODO: add support for service.Spec.Ports (type NodePort) and service.Spec.ClusterIPs (type ClusterIP)
func EndpointTargetsFromServices(svcInformer coreinformers.ServiceInformer, namespace string, selector map[string]string) (endpoint.Targets, error) {
targets := endpoint.Targets{}

Expand Down
4 changes: 3 additions & 1 deletion source/f5_transportserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"strings"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewF5TransportServerSource(
kubeClient kubernetes.Interface,
namespace string,
annotationFilter string,
timeout time.Duration,
) (Source, error) {
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
transportServerInformer := informerFactory.ForResource(f5TransportServerGVR)
Expand All @@ -86,7 +88,7 @@ func NewF5TransportServerSource(
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory, timeout); err != nil {
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion source/f5_transportserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -345,7 +346,7 @@ func TestF5TransportServerEndpoints(t *testing.T) {
_, err = fakeDynamicClient.Resource(f5TransportServerGVR).Namespace(defaultF5TransportServerNamespace).Create(context.Background(), &transportServer, metav1.CreateOptions{})
assert.NoError(t, err)

source, err := NewF5TransportServerSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, defaultF5TransportServerNamespace, tc.annotationFilter)
source, err := NewF5TransportServerSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, defaultF5TransportServerNamespace, tc.annotationFilter, time.Duration(0))
require.NoError(t, err)
assert.NotNil(t, source)

Expand Down
4 changes: 3 additions & 1 deletion source/f5_virtualserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"sort"
"strings"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewF5VirtualServerSource(
kubeClient kubernetes.Interface,
namespace string,
annotationFilter string,
timeout time.Duration,
) (Source, error) {
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, namespace, nil)
virtualServerInformer := informerFactory.ForResource(f5VirtualServerGVR)
Expand All @@ -86,7 +88,7 @@ func NewF5VirtualServerSource(
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory); err != nil {
if err := informers.WaitForDynamicCacheSync(ctx, informerFactory, timeout); err != nil {
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion source/f5_virtualserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -588,7 +589,7 @@ func TestF5VirtualServerEndpoints(t *testing.T) {
_, err = fakeDynamicClient.Resource(f5VirtualServerGVR).Namespace(defaultF5VirtualServerNamespace).Create(context.Background(), &virtualServer, metav1.CreateOptions{})
assert.NoError(t, err)

source, err := NewF5VirtualServerSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, defaultF5VirtualServerNamespace, tc.annotationFilter)
source, err := NewF5VirtualServerSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, defaultF5VirtualServerNamespace, tc.annotationFilter, time.Duration(0))
require.NoError(t, err)
assert.NotNil(t, source)

Expand Down
13 changes: 7 additions & 6 deletions source/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -195,19 +196,19 @@ func newGatewayRouteSource(
nsInformer := kubeInformerFactory.Core().V1().Namespaces() // TODO: Namespace informer should be shared across gateway sources.
nsInformer.Informer() // Register with factory before starting.

informerFactory.Start(ctx.Done())
kubeInformerFactory.Start(ctx.Done())
informerFactory.Start(wait.NeverStop)
kubeInformerFactory.Start(wait.NeverStop)
if rtInformerFactory != informerFactory {
rtInformerFactory.Start(ctx.Done())
rtInformerFactory.Start(wait.NeverStop)

if err := informers.WaitForCacheSync(ctx, rtInformerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, rtInformerFactory, config.CacheSyncTimeout); err != nil {
return nil, err
}
}
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, informerFactory, config.CacheSyncTimeout); err != nil {
return nil, err
}
if err := informers.WaitForCacheSync(ctx, kubeInformerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, kubeInformerFactory, config.CacheSyncTimeout); err != nil {
return nil, err
}

Expand Down
7 changes: 4 additions & 3 deletions source/gloo_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"maps"
"strings"
"time"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -145,7 +146,7 @@ type glooSource struct {

// NewGlooSource creates a new glooSource with the given config
func NewGlooSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface,
glooNamespaces []string) (Source, error) {
glooNamespaces []string, timeout time.Duration) (Source, error) {
informerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
serviceInformer := informerFactory.Core().V1().Services()
ingressInformer := informerFactory.Networking().V1().Ingresses()
Expand All @@ -165,10 +166,10 @@ func NewGlooSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kub

informerFactory.Start(ctx.Done())
dynamicInformerFactory.Start(ctx.Done())
if err := informers.WaitForCacheSync(ctx, informerFactory); err != nil {
if err := informers.WaitForCacheSync(ctx, informerFactory, timeout); err != nil {
return nil, err
}
if err := informers.WaitForDynamicCacheSync(ctx, dynamicInformerFactory); err != nil {
if err := informers.WaitForDynamicCacheSync(ctx, dynamicInformerFactory, timeout); err != nil {
return nil, err
}

Expand Down
Loading