Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions controller/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,13 @@ func Execute() {
go handleSigterm(cancel)

sCfg := source.NewSourceConfig(cfg)
endpointsSource, err := buildSource(ctx, sCfg)

eventEmitter, err := buildEventEmitter(ctx, cfg, sCfg)
if err != nil {
log.Fatal(err) // nolint: gocritic // exitAfterDefer
}

endpointsSource, err := buildSource(ctx, sCfg, eventEmitter)
if err != nil {
log.Fatal(err) // nolint: gocritic // exitAfterDefer
}
Expand All @@ -131,7 +137,7 @@ func Execute() {
os.Exit(0)
}

ctrl, err := buildController(ctx, cfg, sCfg, endpointsSource, prvdr, domainFilter)
ctrl, err := buildController(cfg, endpointsSource, prvdr, domainFilter, eventEmitter)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -360,12 +366,11 @@ func buildProvider(
}

func buildController(
ctx context.Context,
cfg *externaldns.Config,
sCfg *source.Config,
src source.Source,
p provider.Provider,
filter *endpoint.DomainFilter,
emitter events.EventEmitter,
) (*Controller, error) {
policy, ok := plan.Policies[cfg.Policy]
if !ok {
Expand All @@ -375,22 +380,6 @@ func buildController(
if err != nil {
return nil, err
}
eventsCfg := events.NewConfig(
events.WithEmitEvents(cfg.EmitEvents),
events.WithDryRun(cfg.DryRun))
var eventEmitter events.EventEmitter
if eventsCfg.IsEnabled() {
kubeClient, err := sCfg.ClientGenerator().KubeClient()
if err != nil {
return nil, err
}
eventCtrl, err := events.NewEventController(kubeClient.EventsV1(), eventsCfg)
if err != nil {
return nil, err
}
eventCtrl.Run(ctx)
eventEmitter = eventCtrl
}

return &Controller{
Source: src,
Expand All @@ -402,10 +391,31 @@ func buildController(
ExcludeRecordTypes: cfg.ExcludeDNSRecordTypes,
MinEventSyncInterval: cfg.MinEventSyncInterval,
TXTOwnerOld: cfg.TXTOwnerOld,
EventEmitter: eventEmitter,
EventEmitter: emitter,
}, nil
}

// buildEventEmitter creates a Kubernetes EventEmitter if event emission is enabled in the
// provided configuration. Returns nil (no-op) when events are not configured.
func buildEventEmitter(ctx context.Context, cfg *externaldns.Config, sCfg *source.Config) (events.EventEmitter, error) {
eventsCfg := events.NewConfig(
events.WithEmitEvents(cfg.EmitEvents),
events.WithDryRun(cfg.DryRun))
if !eventsCfg.IsEnabled() {
return nil, nil
}
kubeClient, err := sCfg.ClientGenerator().KubeClient()
if err != nil {
return nil, err
}
eventCtrl, err := events.NewEventController(kubeClient.EventsV1(), eventsCfg)
if err != nil {
return nil, err
}
eventCtrl.Run(ctx)
return eventCtrl, nil
}

// This function configures the logger format and level based on the provided configuration.
func configureLogger(cfg *externaldns.Config) {
if cfg.LogFormat == "json" {
Expand All @@ -421,7 +431,7 @@ func configureLogger(cfg *externaldns.Config) {
// buildSource creates and configures the source(s) for endpoint discovery based on the provided configuration.
// It initializes the source configuration, generates the required sources, and combines them into a single,
// deduplicated source. Returns the combined source or an error if source creation fails.
func buildSource(ctx context.Context, cfg *source.Config) (source.Source, error) {
func buildSource(ctx context.Context, cfg *source.Config, emitter events.EventEmitter) (source.Source, error) {
sources, err := source.ByNames(ctx, cfg, cfg.ClientGenerator())
if err != nil {
return nil, err
Expand All @@ -433,7 +443,8 @@ func buildSource(ctx context.Context, cfg *source.Config) (source.Source, error)
wrappers.WithTargetNetFilter(cfg.TargetNetFilter),
wrappers.WithExcludeTargetNets(cfg.ExcludeTargetNets),
wrappers.WithMinTTL(cfg.MinTTL),
wrappers.WithPreferAlias(cfg.PreferAlias))
wrappers.WithPreferAlias(cfg.PreferAlias),
wrappers.WithEventEmitter(emitter))
return wrappers.WrapSources(sources, opts)
}

Expand Down
6 changes: 3 additions & 3 deletions controller/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func TestBuildSourceWithWrappers(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := buildSource(t.Context(), source.NewSourceConfig(tt.cfg))
_, err := buildSource(t.Context(), source.NewSourceConfig(tt.cfg), nil)
require.NoError(t, err)
})
}
Expand Down Expand Up @@ -449,7 +449,7 @@ func TestControllerRunCancelContextStopsLoop(t *testing.T) {
sCfg := source.NewSourceConfig(cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
src, err := buildSource(ctx, sCfg)
src, err := buildSource(ctx, sCfg, nil)
require.NoError(t, err)
domainFilter := endpoint.NewDomainFilterWithOptions(
endpoint.WithDomainFilter(cfg.DomainFilter),
Expand All @@ -459,7 +459,7 @@ func TestControllerRunCancelContextStopsLoop(t *testing.T) {
)
p, err := buildProvider(ctx, cfg, domainFilter)
require.NoError(t, err)
ctrl, err := buildController(ctx, cfg, sCfg, src, p, domainFilter)
ctrl, err := buildController(cfg, src, p, domainFilter, nil)
require.NoError(t, err)

done := make(chan struct{})
Expand Down
24 changes: 21 additions & 3 deletions source/wrappers/dedupsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ import (
log "github.com/sirupsen/logrus"

"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/pkg/events"
"sigs.k8s.io/external-dns/source"
)

// dedupSource is a Source that removes duplicate endpoints from its wrapped source.
type dedupSource struct {
source source.Source
source source.Source
emitter events.EventEmitter
}

// NewDedupSource creates a new dedupSource wrapping the provided Source.
func NewDedupSource(source source.Source) source.Source {
return &dedupSource{source: source}
// emitter is optional (nil disables Kubernetes event emission).
func NewDedupSource(source source.Source, emitter events.EventEmitter) source.Source {
return &dedupSource{source: source, emitter: emitter}
}

// Endpoints collects endpoints from its wrapped source and returns them without duplicates.
Expand All @@ -47,6 +50,7 @@ func (ms *dedupSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, err
return nil, err
}

invalidEndpointsTotal.Gauge.Reset()
for _, ep := range endpoints {
if ep == nil {
continue
Expand All @@ -55,6 +59,10 @@ func (ms *dedupSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, err
// validate endpoint before normalization
if ok := ep.CheckEndpoint(); !ok {
log.Warnf("Skipping endpoint [%s:%s] due to invalid configuration [%s:%s]", ep.SetIdentifier, ep.DNSName, ep.RecordType, strings.Join(ep.Targets, ","))
invalidEndpointsTotal.AddWithLabels(1, ep.RecordType, endpointSource(ep))
if ms.emitter != nil {
ms.emitter.Add(events.NewEventFromEndpoint(ep, events.ActionFailed, events.RecordError))
}
continue
}

Expand All @@ -80,3 +88,13 @@ func (ms *dedupSource) AddEventHandler(ctx context.Context, handler func()) {
log.Debug("dedupSource: adding event handler")
ms.source.AddEventHandler(ctx, handler)
}

// endpointSource returns the source type from the endpoint's object reference,
// or "unknown" if the reference is not set. Sources that set RefObject will
// populate this with their name (e.g. "ingress", "service").
func endpointSource(ep *endpoint.Endpoint) string {
if ref := ep.RefObject(); ref != nil && ref.Source != "" {
return ref.Source
}
return "unknown"
}
95 changes: 91 additions & 4 deletions source/wrappers/dedupsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ import (
"context"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
logtest "sigs.k8s.io/external-dns/internal/testutils/log"
"sigs.k8s.io/external-dns/pkg/events"
"sigs.k8s.io/external-dns/pkg/events/fake"
"sigs.k8s.io/external-dns/source"
)

Expand Down Expand Up @@ -146,7 +152,7 @@ func testDedupEndpoints(t *testing.T) {
mockSource.On("Endpoints").Return(tc.endpoints, nil)

// Create our object under test and get the endpoints.
source := NewDedupSource(mockSource)
source := NewDedupSource(mockSource, nil)

endpoints, err := source.Endpoints(context.Background())
if err != nil {
Expand Down Expand Up @@ -178,7 +184,7 @@ func TestDedupSource_AddEventHandler(t *testing.T) {
t.Run(tt.title, func(t *testing.T) {
mockSource := testutils.NewMockSource()

src := NewDedupSource(mockSource)
src := NewDedupSource(mockSource, nil)
src.AddEventHandler(t.Context(), func() {})

mockSource.AssertNumberOfCalls(t, "AddEventHandler", tt.times)
Expand Down Expand Up @@ -303,7 +309,7 @@ func TestDedupEndpointsValidation(t *testing.T) {
mockSource := new(testutils.MockSource)
mockSource.On("Endpoints").Return(tt.endpoints, nil)

sr := NewDedupSource(mockSource)
sr := NewDedupSource(mockSource, nil)
endpoints, err := sr.Endpoints(context.Background())
require.NoError(t, err)

Expand Down Expand Up @@ -348,11 +354,92 @@ func TestDedupSource_WarnsOnInvalidEndpoint(t *testing.T) {
mockSource := new(testutils.MockSource)
mockSource.On("Endpoints").Return([]*endpoint.Endpoint{tt.endpoint}, nil)

src := NewDedupSource(mockSource)
src := NewDedupSource(mockSource, nil)
_, err := src.Endpoints(context.Background())
require.NoError(t, err)

logtest.TestHelperLogContains(tt.wantLogMsg, hook, t)
})
}
}

func TestDedupSource_InvalidEndpointIncrementsMetric(t *testing.T) {
tests := []struct {
name string
endpoint *endpoint.Endpoint
wantRecordType string
wantSourceType string
}{
{
name: "invalid SRV without RefObject reports unknown source_type",
endpoint: &endpoint.Endpoint{
DNSName: "example.org",
RecordType: endpoint.RecordTypeSRV,
Targets: endpoint.Targets{"10 mail.example.org"}, // missing weight/port — invalid
},
wantRecordType: "srv",
wantSourceType: "unknown",
},
{
name: "invalid MX without RefObject reports unknown source_type",
endpoint: &endpoint.Endpoint{
DNSName: "example.org",
RecordType: endpoint.RecordTypeMX,
Targets: endpoint.Targets{"mail.example.org"}, // missing priority — invalid
},
wantRecordType: "mx",
wantSourceType: "unknown",
},
{
name: "invalid MX with RefObject reports source_type from ref",
endpoint: endpoint.NewEndpoint("example.org", endpoint.RecordTypeMX, "mail.example.org").
WithRefObject(&events.ObjectReference{Source: "ingress"}),
wantRecordType: "mx",
wantSourceType: "ingress",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockSource := new(testutils.MockSource)
mockSource.On("Endpoints").Return([]*endpoint.Endpoint{tt.endpoint}, nil)

src := NewDedupSource(mockSource, nil)
_, err := src.Endpoints(context.Background())
require.NoError(t, err)

value := testutil.ToFloat64(invalidEndpointsTotal.Gauge.With(
prometheus.Labels{"record_type": tt.wantRecordType, "source_type": tt.wantSourceType},
))
assert.Equal(t, 1.0, value)

mockSource.AssertExpectations(t)
})
}
}

func TestDedupSource_InvalidEndpointEmitsEvent(t *testing.T) {
// An endpoint with a non-nil RefObject so that NewEventFromEndpoint produces a real event.
ref := &events.ObjectReference{
Kind: "Service",
Name: "my-svc",
Namespace: "default",
}
ep := endpoint.NewEndpointWithTTL("example.org", endpoint.RecordTypeMX, 300,
"mail.example.org", // missing priority — invalid MX
).WithRefObject(ref)

mockSource := new(testutils.MockSource)
mockSource.On("Endpoints").Return([]*endpoint.Endpoint{ep}, nil)

fakeEmitter := fake.NewFakeEventEmitter()

src := NewDedupSource(mockSource, fakeEmitter)
_, err := src.Endpoints(context.Background())
require.NoError(t, err)

fakeEmitter.AssertCalled(t, "Add", mock.MatchedBy(func(e events.Event) bool {
return e.Action() == events.ActionFailed && e.Reason() == events.RecordError
}))
mockSource.AssertExpectations(t)
}
38 changes: 38 additions & 0 deletions source/wrappers/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package wrappers

import (
"github.com/prometheus/client_golang/prometheus"

"sigs.k8s.io/external-dns/pkg/metrics"
)

var invalidEndpointsTotal = metrics.NewGaugedVectorOpts(
prometheus.GaugeOpts{
Subsystem: "source",
Name: "invalid_endpoints",
Help: "Number of endpoints currently rejected due to invalid configuration, partitioned by record type and source.",
},
[]string{"record_type", "source_type"},
)

// TODO: add metric for deduplicated records as well, partitioned by record type and source.

func init() {
metrics.RegisterMetric.MustRegister(invalidEndpointsTotal)
}
Loading