diff --git a/controller/execute.go b/controller/execute.go index 45da78dbc5..395b3177fd 100644 --- a/controller/execute.go +++ b/controller/execute.go @@ -109,7 +109,13 @@ func Execute() { go handleSigterm(cancel) sCfg := source.NewSourceConfig(cfg) - endpointsSource, err := buildSource(ctx, sCfg) + + eventEmitter, err := buildEventEmitter(ctx, cfg, sCfg) + if err != nil { + log.Fatal(err) // nolint: gocritic // exitAfterDefer + } + + endpointsSource, err := buildSource(ctx, sCfg, eventEmitter) if err != nil { log.Fatal(err) // nolint: gocritic // exitAfterDefer } @@ -131,7 +137,7 @@ func Execute() { os.Exit(0) } - ctrl, err := buildController(ctx, cfg, sCfg, endpointsSource, prvdr, domainFilter) + ctrl, err := buildController(cfg, endpointsSource, prvdr, domainFilter, eventEmitter) if err != nil { log.Fatal(err) } @@ -360,12 +366,11 @@ func buildProvider( } func buildController( - ctx context.Context, cfg *externaldns.Config, - sCfg *source.Config, src source.Source, p provider.Provider, filter *endpoint.DomainFilter, + emitter events.EventEmitter, ) (*Controller, error) { policy, ok := plan.Policies[cfg.Policy] if !ok { @@ -375,22 +380,6 @@ func buildController( if err != nil { return nil, err } - eventsCfg := events.NewConfig( - events.WithEmitEvents(cfg.EmitEvents), - events.WithDryRun(cfg.DryRun)) - var eventEmitter events.EventEmitter - if eventsCfg.IsEnabled() { - kubeClient, err := sCfg.ClientGenerator().KubeClient() - if err != nil { - return nil, err - } - eventCtrl, err := events.NewEventController(kubeClient.EventsV1(), eventsCfg) - if err != nil { - return nil, err - } - eventCtrl.Run(ctx) - eventEmitter = eventCtrl - } return &Controller{ Source: src, @@ -402,10 +391,31 @@ func buildController( ExcludeRecordTypes: cfg.ExcludeDNSRecordTypes, MinEventSyncInterval: cfg.MinEventSyncInterval, TXTOwnerOld: cfg.TXTOwnerOld, - EventEmitter: eventEmitter, + EventEmitter: emitter, }, nil } +// buildEventEmitter creates a Kubernetes EventEmitter if event emission is enabled in the +// provided configuration. Returns nil (no-op) when events are not configured. +func buildEventEmitter(ctx context.Context, cfg *externaldns.Config, sCfg *source.Config) (events.EventEmitter, error) { + eventsCfg := events.NewConfig( + events.WithEmitEvents(cfg.EmitEvents), + events.WithDryRun(cfg.DryRun)) + if !eventsCfg.IsEnabled() { + return nil, nil + } + kubeClient, err := sCfg.ClientGenerator().KubeClient() + if err != nil { + return nil, err + } + eventCtrl, err := events.NewEventController(kubeClient.EventsV1(), eventsCfg) + if err != nil { + return nil, err + } + eventCtrl.Run(ctx) + return eventCtrl, nil +} + // This function configures the logger format and level based on the provided configuration. func configureLogger(cfg *externaldns.Config) { if cfg.LogFormat == "json" { @@ -421,7 +431,7 @@ func configureLogger(cfg *externaldns.Config) { // buildSource creates and configures the source(s) for endpoint discovery based on the provided configuration. // It initializes the source configuration, generates the required sources, and combines them into a single, // deduplicated source. Returns the combined source or an error if source creation fails. -func buildSource(ctx context.Context, cfg *source.Config) (source.Source, error) { +func buildSource(ctx context.Context, cfg *source.Config, emitter events.EventEmitter) (source.Source, error) { sources, err := source.ByNames(ctx, cfg, cfg.ClientGenerator()) if err != nil { return nil, err @@ -433,7 +443,8 @@ func buildSource(ctx context.Context, cfg *source.Config) (source.Source, error) wrappers.WithTargetNetFilter(cfg.TargetNetFilter), wrappers.WithExcludeTargetNets(cfg.ExcludeTargetNets), wrappers.WithMinTTL(cfg.MinTTL), - wrappers.WithPreferAlias(cfg.PreferAlias)) + wrappers.WithPreferAlias(cfg.PreferAlias), + wrappers.WithEventEmitter(emitter)) return wrappers.WrapSources(sources, opts) } diff --git a/controller/execute_test.go b/controller/execute_test.go index 4d7399fcbd..a67f61c35f 100644 --- a/controller/execute_test.go +++ b/controller/execute_test.go @@ -268,7 +268,7 @@ func TestBuildSourceWithWrappers(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := buildSource(t.Context(), source.NewSourceConfig(tt.cfg)) + _, err := buildSource(t.Context(), source.NewSourceConfig(tt.cfg), nil) require.NoError(t, err) }) } @@ -449,7 +449,7 @@ func TestControllerRunCancelContextStopsLoop(t *testing.T) { sCfg := source.NewSourceConfig(cfg) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - src, err := buildSource(ctx, sCfg) + src, err := buildSource(ctx, sCfg, nil) require.NoError(t, err) domainFilter := endpoint.NewDomainFilterWithOptions( endpoint.WithDomainFilter(cfg.DomainFilter), @@ -459,7 +459,7 @@ func TestControllerRunCancelContextStopsLoop(t *testing.T) { ) p, err := buildProvider(ctx, cfg, domainFilter) require.NoError(t, err) - ctrl, err := buildController(ctx, cfg, sCfg, src, p, domainFilter) + ctrl, err := buildController(cfg, src, p, domainFilter, nil) require.NoError(t, err) done := make(chan struct{}) diff --git a/source/wrappers/dedupsource.go b/source/wrappers/dedupsource.go index 11c4295049..24fb06fa75 100644 --- a/source/wrappers/dedupsource.go +++ b/source/wrappers/dedupsource.go @@ -23,17 +23,20 @@ import ( log "github.com/sirupsen/logrus" "sigs.k8s.io/external-dns/endpoint" + "sigs.k8s.io/external-dns/pkg/events" "sigs.k8s.io/external-dns/source" ) // dedupSource is a Source that removes duplicate endpoints from its wrapped source. type dedupSource struct { - source source.Source + source source.Source + emitter events.EventEmitter } // NewDedupSource creates a new dedupSource wrapping the provided Source. -func NewDedupSource(source source.Source) source.Source { - return &dedupSource{source: source} +// emitter is optional (nil disables Kubernetes event emission). +func NewDedupSource(source source.Source, emitter events.EventEmitter) source.Source { + return &dedupSource{source: source, emitter: emitter} } // Endpoints collects endpoints from its wrapped source and returns them without duplicates. @@ -47,6 +50,7 @@ func (ms *dedupSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, err return nil, err } + invalidEndpointsTotal.Gauge.Reset() for _, ep := range endpoints { if ep == nil { continue @@ -55,6 +59,10 @@ func (ms *dedupSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, err // validate endpoint before normalization if ok := ep.CheckEndpoint(); !ok { log.Warnf("Skipping endpoint [%s:%s] due to invalid configuration [%s:%s]", ep.SetIdentifier, ep.DNSName, ep.RecordType, strings.Join(ep.Targets, ",")) + invalidEndpointsTotal.AddWithLabels(1, ep.RecordType, endpointSource(ep)) + if ms.emitter != nil { + ms.emitter.Add(events.NewEventFromEndpoint(ep, events.ActionFailed, events.RecordError)) + } continue } @@ -80,3 +88,13 @@ func (ms *dedupSource) AddEventHandler(ctx context.Context, handler func()) { log.Debug("dedupSource: adding event handler") ms.source.AddEventHandler(ctx, handler) } + +// endpointSource returns the source type from the endpoint's object reference, +// or "unknown" if the reference is not set. Sources that set RefObject will +// populate this with their name (e.g. "ingress", "service"). +func endpointSource(ep *endpoint.Endpoint) string { + if ref := ep.RefObject(); ref != nil && ref.Source != "" { + return ref.Source + } + return "unknown" +} diff --git a/source/wrappers/dedupsource_test.go b/source/wrappers/dedupsource_test.go index ad680465d4..fec5539260 100644 --- a/source/wrappers/dedupsource_test.go +++ b/source/wrappers/dedupsource_test.go @@ -20,11 +20,17 @@ import ( "context" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "sigs.k8s.io/external-dns/endpoint" "sigs.k8s.io/external-dns/internal/testutils" logtest "sigs.k8s.io/external-dns/internal/testutils/log" + "sigs.k8s.io/external-dns/pkg/events" + "sigs.k8s.io/external-dns/pkg/events/fake" "sigs.k8s.io/external-dns/source" ) @@ -146,7 +152,7 @@ func testDedupEndpoints(t *testing.T) { mockSource.On("Endpoints").Return(tc.endpoints, nil) // Create our object under test and get the endpoints. - source := NewDedupSource(mockSource) + source := NewDedupSource(mockSource, nil) endpoints, err := source.Endpoints(context.Background()) if err != nil { @@ -178,7 +184,7 @@ func TestDedupSource_AddEventHandler(t *testing.T) { t.Run(tt.title, func(t *testing.T) { mockSource := testutils.NewMockSource() - src := NewDedupSource(mockSource) + src := NewDedupSource(mockSource, nil) src.AddEventHandler(t.Context(), func() {}) mockSource.AssertNumberOfCalls(t, "AddEventHandler", tt.times) @@ -303,7 +309,7 @@ func TestDedupEndpointsValidation(t *testing.T) { mockSource := new(testutils.MockSource) mockSource.On("Endpoints").Return(tt.endpoints, nil) - sr := NewDedupSource(mockSource) + sr := NewDedupSource(mockSource, nil) endpoints, err := sr.Endpoints(context.Background()) require.NoError(t, err) @@ -348,7 +354,7 @@ func TestDedupSource_WarnsOnInvalidEndpoint(t *testing.T) { mockSource := new(testutils.MockSource) mockSource.On("Endpoints").Return([]*endpoint.Endpoint{tt.endpoint}, nil) - src := NewDedupSource(mockSource) + src := NewDedupSource(mockSource, nil) _, err := src.Endpoints(context.Background()) require.NoError(t, err) @@ -356,3 +362,84 @@ func TestDedupSource_WarnsOnInvalidEndpoint(t *testing.T) { }) } } + +func TestDedupSource_InvalidEndpointIncrementsMetric(t *testing.T) { + tests := []struct { + name string + endpoint *endpoint.Endpoint + wantRecordType string + wantSourceType string + }{ + { + name: "invalid SRV without RefObject reports unknown source_type", + endpoint: &endpoint.Endpoint{ + DNSName: "example.org", + RecordType: endpoint.RecordTypeSRV, + Targets: endpoint.Targets{"10 mail.example.org"}, // missing weight/port — invalid + }, + wantRecordType: "srv", + wantSourceType: "unknown", + }, + { + name: "invalid MX without RefObject reports unknown source_type", + endpoint: &endpoint.Endpoint{ + DNSName: "example.org", + RecordType: endpoint.RecordTypeMX, + Targets: endpoint.Targets{"mail.example.org"}, // missing priority — invalid + }, + wantRecordType: "mx", + wantSourceType: "unknown", + }, + { + name: "invalid MX with RefObject reports source_type from ref", + endpoint: endpoint.NewEndpoint("example.org", endpoint.RecordTypeMX, "mail.example.org"). + WithRefObject(&events.ObjectReference{Source: "ingress"}), + wantRecordType: "mx", + wantSourceType: "ingress", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockSource := new(testutils.MockSource) + mockSource.On("Endpoints").Return([]*endpoint.Endpoint{tt.endpoint}, nil) + + src := NewDedupSource(mockSource, nil) + _, err := src.Endpoints(context.Background()) + require.NoError(t, err) + + value := testutil.ToFloat64(invalidEndpointsTotal.Gauge.With( + prometheus.Labels{"record_type": tt.wantRecordType, "source_type": tt.wantSourceType}, + )) + assert.Equal(t, 1.0, value) + + mockSource.AssertExpectations(t) + }) + } +} + +func TestDedupSource_InvalidEndpointEmitsEvent(t *testing.T) { + // An endpoint with a non-nil RefObject so that NewEventFromEndpoint produces a real event. + ref := &events.ObjectReference{ + Kind: "Service", + Name: "my-svc", + Namespace: "default", + } + ep := endpoint.NewEndpointWithTTL("example.org", endpoint.RecordTypeMX, 300, + "mail.example.org", // missing priority — invalid MX + ).WithRefObject(ref) + + mockSource := new(testutils.MockSource) + mockSource.On("Endpoints").Return([]*endpoint.Endpoint{ep}, nil) + + fakeEmitter := fake.NewFakeEventEmitter() + + src := NewDedupSource(mockSource, fakeEmitter) + _, err := src.Endpoints(context.Background()) + require.NoError(t, err) + + fakeEmitter.AssertCalled(t, "Add", mock.MatchedBy(func(e events.Event) bool { + return e.Action() == events.ActionFailed && e.Reason() == events.RecordError + })) + mockSource.AssertExpectations(t) +} diff --git a/source/wrappers/metrics.go b/source/wrappers/metrics.go new file mode 100644 index 0000000000..7dc8fdc1a0 --- /dev/null +++ b/source/wrappers/metrics.go @@ -0,0 +1,38 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrappers + +import ( + "github.com/prometheus/client_golang/prometheus" + + "sigs.k8s.io/external-dns/pkg/metrics" +) + +var invalidEndpointsTotal = metrics.NewGaugedVectorOpts( + prometheus.GaugeOpts{ + Subsystem: "source", + Name: "invalid_endpoints", + Help: "Number of endpoints currently rejected due to invalid configuration, partitioned by record type and source.", + }, + []string{"record_type", "source_type"}, +) + +// TODO: add metric for deduplicated records as well, partitioned by record type and source. + +func init() { + metrics.RegisterMetric.MustRegister(invalidEndpointsTotal) +} diff --git a/source/wrappers/types.go b/source/wrappers/types.go index 68ca13f286..9ed47b3d36 100644 --- a/source/wrappers/types.go +++ b/source/wrappers/types.go @@ -21,6 +21,7 @@ import ( "time" "sigs.k8s.io/external-dns/endpoint" + "sigs.k8s.io/external-dns/pkg/events" "sigs.k8s.io/external-dns/source" ) @@ -32,6 +33,7 @@ type Config struct { excludeTargetNets []string minTTL time.Duration preferAlias bool + eventEmitter events.EventEmitter sourceWrappers map[string]bool // map of source wrappers, e.g. "targetfilter", "nat64" } @@ -87,6 +89,12 @@ func WithPreferAlias(enabled bool) Option { } } +func WithEventEmitter(e events.EventEmitter) Option { + return func(o *Config) { + o.eventEmitter = e + } +} + // addSourceWrapper registers a source wrapper by name in the Config. // It initializes the sourceWrappers map if it is nil. func (o *Config) addSourceWrapper(name string) { @@ -112,7 +120,7 @@ func WrapSources( sources []source.Source, opts *Config, ) (source.Source, error) { - combinedSource := NewDedupSource(NewMultiSource(sources, opts.defaultTargets, opts.forceDefaultTargets)) + combinedSource := NewDedupSource(NewMultiSource(sources, opts.defaultTargets, opts.forceDefaultTargets), opts.eventEmitter) opts.addSourceWrapper("dedup") if len(opts.nat64Networks) > 0 { var err error