Skip to content
Merged
20 changes: 7 additions & 13 deletions controller/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func Execute() {
go serveMetrics(cfg.MetricsAddress)
go handleSigterm(cancel)

endpointsSource, err := buildSource(ctx, cfg)
sCfg := source.NewSourceConfig(cfg)
endpointsSource, err := buildSource(ctx, sCfg)
if err != nil {
log.Fatal(err) // nolint: gocritic // exitAfterDefer
}
Expand Down Expand Up @@ -168,6 +169,9 @@ func buildProvider(
zoneTypeFilter := provider.NewZoneTypeFilter(cfg.AWSZoneType)
zoneTagFilter := provider.NewZoneTagFilter(cfg.AWSZoneTagFilter)

// TODO: Controller focuses on orchestration, not provider construction
// TODO: refactor to move this to provider package, cover with tests
// TODO: example provider.SelectProvider(cfg, ...)
switch cfg.Provider {
case "akamai":
p, err = akamai.NewAkamaiProvider(
Expand Down Expand Up @@ -413,18 +417,8 @@ func configureLogger(cfg *externaldns.Config) {
// buildSource creates and configures the source(s) for endpoint discovery based on the provided configuration.
// It initializes the source configuration, generates the required sources, and combines them into a single,
// deduplicated source. Returns the combined source or an error if source creation fails.
func buildSource(ctx context.Context, cfg *externaldns.Config) (source.Source, error) {
sourceCfg := source.NewSourceConfig(cfg)
sources, err := source.ByNames(ctx, &source.SingletonClientGenerator{
KubeConfig: cfg.KubeConfig,
APIServerURL: cfg.APIServerURL,
RequestTimeout: func() time.Duration {
if cfg.UpdateEvents {
return 0
}
return cfg.RequestTimeout
}(),
}, cfg.Sources, sourceCfg)
func buildSource(ctx context.Context, cfg *source.Config) (source.Source, error) {
sources, err := source.ByNames(ctx, cfg, cfg.ClientGenerator())
if err != nil {
return nil, err
}
Expand Down
14 changes: 11 additions & 3 deletions controller/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/stretchr/testify/require"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
"sigs.k8s.io/external-dns/source"
)

// Logger
Expand Down Expand Up @@ -267,7 +268,7 @@ func TestBuildSourceWithWrappers(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := buildSource(t.Context(), tt.cfg)
_, err := buildSource(t.Context(), source.NewSourceConfig(tt.cfg))
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -297,14 +298,21 @@ func TestHelperProcess(t *testing.T) {
// runExecuteSubprocess runs Execute in a separate process and returns exit code and output.
func runExecuteSubprocess(t *testing.T, args []string) (int, string, error) {
t.Helper()
// make sure the subprocess does not run forever
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

cmdArgs := append([]string{"-test.run=TestHelperProcess", "--"}, args...)
cmd := exec.Command(os.Args[0], cmdArgs...)
cmd := exec.CommandContext(ctx, os.Args[0], cmdArgs...)
cmd.Env = append(os.Environ(), "GO_WANT_HELPER_PROCESS=1")
var buf bytes.Buffer
cmd.Stdout = &buf
cmd.Stderr = &buf
err := cmd.Run()
output := buf.String()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return -1, output, ctx.Err()
}
if err == nil {
return 0, output, nil
}
Expand Down Expand Up @@ -440,7 +448,7 @@ func TestControllerRunCancelContextStopsLoop(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
src, err := buildSource(ctx, cfg)
src, err := buildSource(ctx, source.NewSourceConfig(cfg))
require.NoError(t, err)
domainFilter := endpoint.NewDomainFilterWithOptions(
endpoint.WithDomainFilter(cfg.DomainFilter),
Expand Down
43 changes: 40 additions & 3 deletions source/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ type Config struct {
TraefikDisableNew bool
ExcludeUnschedulable bool
ExposeInternalIPv6 bool
ExcludeTargetNets []string
Comment thread
ivankatliarchuk marked this conversation as resolved.
TargetNetFilter []string
NAT64Networks []string
MinTTL time.Duration

sources []string

// clientGen is lazily initialized on first access for efficiency
clientGen *SingletonClientGenerator
clientGenOnce sync.Once
}

func NewSourceConfig(cfg *externaldns.Config) *Config {
Expand Down Expand Up @@ -140,9 +150,36 @@ func NewSourceConfig(cfg *externaldns.Config) *Config {
TraefikDisableNew: cfg.TraefikDisableNew,
ExcludeUnschedulable: cfg.ExcludeUnschedulable,
ExposeInternalIPv6: cfg.ExposeInternalIPV6,
ExcludeTargetNets: cfg.ExcludeTargetNets,
TargetNetFilter: cfg.TargetNetFilter,
NAT64Networks: cfg.NAT64Networks,
MinTTL: cfg.MinTTL,
sources: cfg.Sources,
}
}

// ClientGenerator returns a SingletonClientGenerator from this Config's connection settings.
// The generator is created once and cached for subsequent calls.
// This ensures consistent Kubernetes client creation across all sources using this configuration.
//
// The timeout behavior is special-cased: when UpdateEvents is true, the timeout is set to 0
// (no timeout) to allow long-running watch operations for event-driven source updates.
func (cfg *Config) ClientGenerator() *SingletonClientGenerator {
cfg.clientGenOnce.Do(func() {
cfg.clientGen = &SingletonClientGenerator{
KubeConfig: cfg.KubeConfig,
APIServerURL: cfg.APIServerURL,
RequestTimeout: func() time.Duration {
if cfg.UpdateEvents {
return 0
}
return cfg.RequestTimeout
}(),
}
})
return cfg.clientGen
}

// ClientGenerator provides clients for various Kubernetes APIs and external services.
// This interface abstracts client creation and enables dependency injection for testing.
// It uses the singleton pattern to ensure only one instance of each client is created
Expand Down Expand Up @@ -251,9 +288,9 @@ func (p *SingletonClientGenerator) OpenShiftClient() (openshift.Interface, error
}

// ByNames returns multiple Sources given multiple names.
func ByNames(ctx context.Context, p ClientGenerator, names []string, cfg *Config) ([]Source, error) {
sources := []Source{}
for _, name := range names {
func ByNames(ctx context.Context, cfg *Config, p ClientGenerator) ([]Source, error) {
sources := make([]Source, 0, len(cfg.sources))
for _, name := range cfg.sources {
source, err := BuildWithConfig(ctx, name, p, cfg)
if err != nil {
return nil, err
Expand Down
84 changes: 71 additions & 13 deletions source/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"context"
"errors"
"testing"
"time"

openshift "github.com/openshift/client-go/route/clientset/versioned"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
istioclient "istio.io/client-go/pkg/clientset/versioned"
Expand Down Expand Up @@ -158,29 +160,35 @@ func (suite *ByNamesTestSuite) TestAllInitialized() {
}: "IngressRouteUDPList",
}), nil)

sources, err := ByNames(context.TODO(), mockClientGenerator, []string{
ss := []string{
types.Service, types.Ingress, types.IstioGateway, types.ContourHTTPProxy,
types.KongTCPIngress, types.F5VirtualServer, types.F5TransportServer, types.TraefikProxy, types.Fake,
}, &Config{})
}
sources, err := ByNames(context.TODO(), &Config{
sources: ss,
}, mockClientGenerator)
suite.NoError(err, "should not generate errors")
suite.Len(sources, 9, "should generate all nine sources")
}

func (suite *ByNamesTestSuite) TestOnlyFake() {
mockClientGenerator := new(MockClientGenerator)
mockClientGenerator.On("KubeClient").Return(fakeKube.NewSimpleClientset(), nil)
mockClientGenerator.On("KubeClient").Return(fakeKube.NewClientset(), nil)

sources, err := ByNames(context.TODO(), mockClientGenerator, []string{types.Fake}, &Config{})
sources, err := ByNames(context.TODO(), &Config{
sources: []string{types.Fake},
}, mockClientGenerator)
suite.NoError(err, "should not generate errors")
suite.Len(sources, 1, "should generate fake source")
suite.Nil(mockClientGenerator.kubeClient, "client should not be created")
}

func (suite *ByNamesTestSuite) TestSourceNotFound() {
mockClientGenerator := new(MockClientGenerator)
mockClientGenerator.On("KubeClient").Return(fakeKube.NewSimpleClientset(), nil)

sources, err := ByNames(context.TODO(), mockClientGenerator, []string{"foo"}, &Config{})
mockClientGenerator.On("KubeClient").Return(fakeKube.NewClientset(), nil)
sources, err := ByNames(context.TODO(), &Config{
sources: []string{"foo"},
}, mockClientGenerator)
suite.Equal(err, ErrSourceNotFound, "should return source not found")
suite.Empty(sources, "should not returns any source")
}
Expand All @@ -189,14 +197,16 @@ func (suite *ByNamesTestSuite) TestKubeClientFails() {
mockClientGenerator := new(MockClientGenerator)
mockClientGenerator.On("KubeClient").Return(nil, errors.New("foo"))

sourcesDependentOnKubeClient := []string{
sourceUnderTest := []string{
types.Node, types.Service, types.Ingress, types.Pod, types.IstioGateway, types.IstioVirtualService,
types.AmbassadorHost, types.GlooProxy, types.TraefikProxy, types.CRD, types.KongTCPIngress,
types.F5VirtualServer, types.F5TransportServer,
}

for _, source := range sourcesDependentOnKubeClient {
_, err := ByNames(context.TODO(), mockClientGenerator, []string{source}, &Config{})
for _, source := range sourceUnderTest {
_, err := ByNames(context.TODO(), &Config{
sources: []string{source},
}, mockClientGenerator)
suite.Error(err, source+" should return an error if kubernetes client cannot be created")
}
}
Expand All @@ -210,14 +220,16 @@ func (suite *ByNamesTestSuite) TestIstioClientFails() {
sourcesDependentOnIstioClient := []string{types.IstioGateway, types.IstioVirtualService}

for _, source := range sourcesDependentOnIstioClient {
_, err := ByNames(context.TODO(), mockClientGenerator, []string{source}, &Config{})
_, err := ByNames(context.TODO(), &Config{
sources: []string{source},
}, mockClientGenerator)
suite.Error(err, source+" should return an error if istio client cannot be created")
}
}

func (suite *ByNamesTestSuite) TestDynamicKubernetesClientFails() {
mockClientGenerator := new(MockClientGenerator)
mockClientGenerator.On("KubeClient").Return(fakeKube.NewSimpleClientset(), nil)
mockClientGenerator.On("KubeClient").Return(fakeKube.NewClientset(), nil)
mockClientGenerator.On("IstioClient").Return(istiofake.NewSimpleClientset(), nil)
mockClientGenerator.On("DynamicKubernetesClient").Return(nil, errors.New("foo"))

Expand All @@ -227,7 +239,9 @@ func (suite *ByNamesTestSuite) TestDynamicKubernetesClientFails() {
}

for _, source := range sourcesDependentOnDynamicKubernetesClient {
_, err := ByNames(context.TODO(), mockClientGenerator, []string{source}, &Config{})
_, err := ByNames(context.TODO(), &Config{
sources: []string{source},
}, mockClientGenerator)
suite.Error(err, source+" should return an error if dynamic kubernetes client cannot be created")
}
}
Expand Down Expand Up @@ -266,3 +280,47 @@ func TestBuildWithConfig_InvalidSource(t *testing.T) {
t.Errorf("expected ErrSourceNotFound, got: %v", err)
}
}

func TestConfig_ClientGenerator(t *testing.T) {
cfg := &Config{
KubeConfig: "/path/to/kubeconfig",
APIServerURL: "https://api.example.com",
RequestTimeout: 30 * time.Second,
UpdateEvents: false,
}

gen := cfg.ClientGenerator()

assert.Equal(t, "/path/to/kubeconfig", gen.KubeConfig)
assert.Equal(t, "https://api.example.com", gen.APIServerURL)
assert.Equal(t, 30*time.Second, gen.RequestTimeout)
}

func TestConfig_ClientGenerator_UpdateEvents(t *testing.T) {
cfg := &Config{
KubeConfig: "/path/to/kubeconfig",
APIServerURL: "https://api.example.com",
RequestTimeout: 30 * time.Second,
UpdateEvents: true, // Special case
}

gen := cfg.ClientGenerator()

assert.Equal(t, time.Duration(0), gen.RequestTimeout, "UpdateEvents should set timeout to 0")
}

func TestConfig_ClientGenerator_Caching(t *testing.T) {
cfg := &Config{
KubeConfig: "/path/to/kubeconfig",
APIServerURL: "https://api.example.com",
RequestTimeout: 30 * time.Second,
UpdateEvents: false,
}

// Call ClientGenerator twice
gen1 := cfg.ClientGenerator()
gen2 := cfg.ClientGenerator()

// Should return the same instance (cached)
assert.Same(t, gen1, gen2, "ClientGenerator should return the same cached instance")
}
Loading