Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 68 additions & 48 deletions source/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"

Expand Down Expand Up @@ -188,6 +189,7 @@ func (cfg *Config) ClientGenerator() *SingletonClientGenerator {
// - IstioClient: Istio service mesh client
// - DynamicKubernetesClient: Dynamic client for custom resources
// - OpenShiftClient: OpenShift-specific client for Route resources
// - RESTConfig: Instrumented REST config for creating custom clients
//
// The singleton behavior is implemented in SingletonClientGenerator which uses
// sync.Once to guarantee single initialization of each client type.
Expand All @@ -197,6 +199,7 @@ type ClientGenerator interface {
IstioClient() (istioclient.Interface, error)
DynamicKubernetesClient() (dynamic.Interface, error)
OpenShiftClient() (openshift.Interface, error)
RESTConfig() (*rest.Config, error)
}

// SingletonClientGenerator stores provider clients and guarantees that only one instance of each client
Expand All @@ -210,15 +213,39 @@ type ClientGenerator interface {
//
// Configuration: Clients are configured using KubeConfig, APIServerURL, and RequestTimeout
// which are set during SingletonClientGenerator initialization.
//
// TODO: Fix error handling pattern in client methods. Current implementation has a bug where
// errors are only returned on the first call due to sync.Once behavior. If initialization fails
// on the first call, subsequent calls return (nil, nil) instead of (nil, originalError), which
// can lead to nil pointer dereferences. Solution: Store error in a field alongside the client,
// similar to how the client itself is stored. Example:
//
// type SingletonClientGenerator struct {
// restConfig *rest.Config
// restConfigErr error // Store error persistently
// restConfigOnce sync.Once
// }
//
// func (p *SingletonClientGenerator) RESTConfig() (*rest.Config, error) {
// p.restConfigOnce.Do(func() {
// p.restConfig, p.restConfigErr = kubeclient.InstrumentedRESTConfig(...)
// })
// return p.restConfig, p.restConfigErr // Return stored error
// }
//
// This pattern should be applied to all client methods: KubeClient, GatewayClient,
// DynamicKubernetesClient, OpenShiftClient, and RESTConfig.
type SingletonClientGenerator struct {
KubeConfig string
APIServerURL string
RequestTimeout time.Duration
restConfig *rest.Config
kubeClient kubernetes.Interface
gatewayClient gateway.Interface
istioClient *istioclient.Clientset
dynKubeClient dynamic.Interface
openshiftClient openshift.Interface
restConfigOnce sync.Once
kubeOnce sync.Once
gatewayOnce sync.Once
istioOnce sync.Once
Expand All @@ -235,28 +262,35 @@ func (p *SingletonClientGenerator) KubeClient() (kubernetes.Interface, error) {
return p.kubeClient, err
}

// RESTConfig generates an instrumented REST config if it was not created before.
// The config includes request timeout handling and metrics instrumentation.
// This is useful for sources that need to create custom clients (e.g., controller-runtime clients).
func (p *SingletonClientGenerator) RESTConfig() (*rest.Config, error) {
var err error
p.restConfigOnce.Do(func() {
p.restConfig, err = kubeclient.InstrumentedRESTConfig(p.KubeConfig, p.APIServerURL, p.RequestTimeout)
})
return p.restConfig, err
}

// GatewayClient generates a gateway client if it was not created before
func (p *SingletonClientGenerator) GatewayClient() (gateway.Interface, error) {
var err error
p.gatewayOnce.Do(func() {
p.gatewayClient, err = newGatewayClient(p.KubeConfig, p.APIServerURL, p.RequestTimeout)
var config *rest.Config
config, err = p.RESTConfig()
if err != nil {
return
}
p.gatewayClient, err = gateway.NewForConfig(config)
if err != nil {
return
}
log.Infof("Created GatewayAPI client %s", config.Host)
})
return p.gatewayClient, err
}

func newGatewayClient(kubeConfig, apiServerURL string, requestTimeout time.Duration) (gateway.Interface, error) {
config, err := kubeclient.InstrumentedRESTConfig(kubeConfig, apiServerURL, requestTimeout)
if err != nil {
return nil, err
}
client, err := gateway.NewForConfig(config)
if err != nil {
return nil, err
}
log.Infof("Created GatewayAPI client %s", config.Host)
return client, nil
}

// IstioClient generates an istio go client if it was not created before
func (p *SingletonClientGenerator) IstioClient() (istioclient.Interface, error) {
var err error
Expand All @@ -270,7 +304,16 @@ func (p *SingletonClientGenerator) IstioClient() (istioclient.Interface, error)
func (p *SingletonClientGenerator) DynamicKubernetesClient() (dynamic.Interface, error) {
var err error
p.dynCliOnce.Do(func() {
p.dynKubeClient, err = NewDynamicKubernetesClient(p.KubeConfig, p.APIServerURL, p.RequestTimeout)
var config *rest.Config
config, err = p.RESTConfig()
if err != nil {
return
}
p.dynKubeClient, err = dynamic.NewForConfig(config)
if err != nil {
return
}
log.Infof("Created Dynamic Kubernetes client %s", config.Host)
})
return p.dynKubeClient, err
}
Expand All @@ -279,7 +322,16 @@ func (p *SingletonClientGenerator) DynamicKubernetesClient() (dynamic.Interface,
func (p *SingletonClientGenerator) OpenShiftClient() (openshift.Interface, error) {
var err error
p.openshiftOnce.Do(func() {
p.openshiftClient, err = NewOpenShiftClient(p.KubeConfig, p.APIServerURL, p.RequestTimeout)
var config *rest.Config
config, err = p.RESTConfig()
if err != nil {
return
}
p.openshiftClient, err = openshift.NewForConfig(config)
if err != nil {
return
}
log.Infof("Created OpenShift client %s", config.Host)
})
return p.openshiftClient, err
}
Expand Down Expand Up @@ -630,35 +682,3 @@ func NewIstioClient(kubeConfig string, apiServerURL string) (*istioclient.Client

return ic, nil
}

// NewDynamicKubernetesClient returns a new Dynamic Kubernetes client object. It takes a Config and
// uses APIServerURL and KubeConfig attributes to connect to the cluster. If
// KubeConfig isn't provided it defaults to using the recommended default.
func NewDynamicKubernetesClient(kubeConfig, apiServerURL string, requestTimeout time.Duration) (dynamic.Interface, error) {
config, err := kubeclient.InstrumentedRESTConfig(kubeConfig, apiServerURL, requestTimeout)
if err != nil {
return nil, err
}
client, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
log.Infof("Created Dynamic Kubernetes client %s", config.Host)
return client, nil
}

// NewOpenShiftClient returns a new Openshift client object. It takes a Config and
// uses APIServerURL and KubeConfig attributes to connect to the cluster. If
// KubeConfig isn't provided it defaults to using the recommended default.
func NewOpenShiftClient(kubeConfig, apiServerURL string, requestTimeout time.Duration) (*openshift.Clientset, error) {
config, err := kubeclient.InstrumentedRESTConfig(kubeConfig, apiServerURL, requestTimeout)
if err != nil {
return nil, err
}
client, err := openshift.NewForConfig(config)
if err != nil {
return nil, err
}
log.Infof("Created OpenShift client %s", config.Host)
return client, nil
}
146 changes: 146 additions & 0 deletions source/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
openshift "github.com/openshift/client-go/route/clientset/versioned"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
istioclient "istio.io/client-go/pkg/clientset/versioned"
istiofake "istio.io/client-go/pkg/clientset/versioned/fake"
Expand All @@ -35,6 +36,7 @@ import (
fakeDynamic "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes"
fakeKube "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"sigs.k8s.io/external-dns/source/types"
gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
)
Expand Down Expand Up @@ -93,6 +95,14 @@ func (m *MockClientGenerator) OpenShiftClient() (openshift.Interface, error) {
return nil, args.Error(1)
}

func (m *MockClientGenerator) RESTConfig() (*rest.Config, error) {
args := m.Called()
if args.Error(1) == nil {
return args.Get(0).(*rest.Config), nil
}
return nil, args.Error(1)
}

type ByNamesTestSuite struct {
suite.Suite
}
Expand Down Expand Up @@ -266,6 +276,7 @@ func (m *minimalMockClientGenerator) DynamicKubernetesClient() (dynamic.Interfac
func (m *minimalMockClientGenerator) OpenShiftClient() (openshift.Interface, error) {
return nil, errMock
}
func (m *minimalMockClientGenerator) RESTConfig() (*rest.Config, error) { return nil, errMock }

func TestBuildWithConfig_InvalidSource(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -324,3 +335,138 @@ func TestConfig_ClientGenerator_Caching(t *testing.T) {
// Should return the same instance (cached)
assert.Same(t, gen1, gen2, "ClientGenerator should return the same cached instance")
}

// TestSingletonClientGenerator_RESTConfig_TimeoutPropagation verifies timeout configuration
func TestSingletonClientGenerator_RESTConfig_TimeoutPropagation(t *testing.T) {
testCases := []struct {
name string
requestTimeout time.Duration
}{
{
name: "30 second timeout",
requestTimeout: 30 * time.Second,
},
{
name: "60 second timeout",
requestTimeout: 60 * time.Second,
},
{
name: "zero timeout (for watches)",
requestTimeout: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gen := &SingletonClientGenerator{
KubeConfig: "",
APIServerURL: "",
RequestTimeout: tc.requestTimeout,
}

// Verify the generator was configured with correct timeout
assert.Equal(t, tc.requestTimeout, gen.RequestTimeout,
"SingletonClientGenerator should have the configured RequestTimeout")

config, err := gen.RESTConfig()

// Even if config creation failed, verify the timeout was set in generator
assert.Equal(t, tc.requestTimeout, gen.RequestTimeout,
"RequestTimeout should remain unchanged after RESTConfig() call")

// If config was successfully created, verify timeout propagated correctly
if err == nil {
require.NotNil(t, config, "Config should not be nil when error is nil")
assert.Equal(t, tc.requestTimeout, config.Timeout,
"REST config should have timeout matching RequestTimeout field")
}
})
}
}

// TestConfig_ClientGenerator_RESTConfig_Integration verifies Config → ClientGenerator → RESTConfig flow
func TestConfig_ClientGenerator_RESTConfig_Integration(t *testing.T) {
t.Run("normal timeout is propagated", func(t *testing.T) {
cfg := &Config{
KubeConfig: "",
APIServerURL: "",
RequestTimeout: 45 * time.Second,
UpdateEvents: false,
}

gen := cfg.ClientGenerator()

// Verify ClientGenerator has correct timeout
assert.Equal(t, 45*time.Second, gen.RequestTimeout,
"ClientGenerator should have the configured RequestTimeout")

config, err := gen.RESTConfig()

// Even if config creation fails, the timeout setting should be correct
assert.Equal(t, 45*time.Second, gen.RequestTimeout,
"RequestTimeout should remain 45s after RESTConfig() call")

if err == nil {
require.NotNil(t, config, "Config should not be nil when error is nil")
assert.Equal(t, 45*time.Second, config.Timeout,
"RESTConfig should propagate the timeout")
}
})

t.Run("UpdateEvents sets timeout to zero", func(t *testing.T) {
cfg := &Config{
KubeConfig: "",
APIServerURL: "",
RequestTimeout: 45 * time.Second,
UpdateEvents: true, // Should override to 0
}

gen := cfg.ClientGenerator()

// When UpdateEvents=true, ClientGenerator sets timeout to 0 (for long-running watches)
assert.Equal(t, time.Duration(0), gen.RequestTimeout,
"ClientGenerator should have zero timeout when UpdateEvents=true")

config, err := gen.RESTConfig()

// Verify the timeout is 0, regardless of whether config was created
assert.Equal(t, time.Duration(0), gen.RequestTimeout,
"RequestTimeout should remain 0 after RESTConfig() call")

if err == nil {
require.NotNil(t, config, "Config should not be nil when error is nil")
assert.Equal(t, time.Duration(0), config.Timeout,
"RESTConfig should have zero timeout for watch operations")
}
})
}

// TestSingletonClientGenerator_RESTConfig_SharedAcrossClients verifies singleton is shared
func TestSingletonClientGenerator_RESTConfig_SharedAcrossClients(t *testing.T) {
gen := &SingletonClientGenerator{
KubeConfig: "",
APIServerURL: "",
RequestTimeout: 30 * time.Second,
}

// Get REST config multiple times
restConfig1, err1 := gen.RESTConfig()
restConfig2, err2 := gen.RESTConfig()
restConfig3, err3 := gen.RESTConfig()

// Verify singleton behavior - all should return same instance
assert.Same(t, restConfig1, restConfig2, "RESTConfig should return same instance on second call")
assert.Same(t, restConfig1, restConfig3, "RESTConfig should return same instance on third call")

// Verify the internal field matches
assert.Same(t, restConfig1, gen.restConfig,
"Internal restConfig field should match returned value")

// Verify first call had error (no valid kubeconfig)
assert.Error(t, err1, "First call should return error when kubeconfig is invalid")

// Due to sync.Once bug, subsequent calls won't return the error
// This is documented in the TODO comment on SingletonClientGenerator
require.NoError(t, err2, "Second call does not return error due to sync.Once bug")
require.NoError(t, err3, "Third call does not return error due to sync.Once bug")
}
Loading