diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 4e650a193d16..1dfb220b1ed1 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -69,6 +69,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. ==== Bugfixes +- Do not start namespace and node watchers on metricbeat autodiscover if `add_resource_metadata` is disabled.{pull}37181[37181] - Fix how Prometheus histograms are calculated when percentiles are provide.{pull}36537[36537] - Stop using `mage:import` in community beats. This was ignoring the vendorized beats directory for some mage targets, using the code available in GOPATH, this causes inconsistencies and compilation problems if the version of the code in the GOPATH is different to the vendored one. Use of `mage:import` will continue to be unsupported in custom beats till beats is migrated to go modules, or mage supports vendored dependencies. {issue}13998[13998] {pull}14162[14162] - Metricbeat module builders call host parser only once when instantiating light modules. {pull}20149[20149] diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 31c7297a106e..508954948b2a 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -59,7 +59,7 @@ type pod struct { func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) { logger := logp.NewLogger("autodiscover.pod") - var replicaSetWatcher, jobWatcher kubernetes.Watcher + var replicaSetWatcher, jobWatcher, nodeWatcher, namespaceWatcher kubernetes.Watcher config := defaultConfig() err := cfg.Unpack(&config) @@ -96,40 +96,50 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Pod{}, err) } - options := kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Node: config.Node, - Namespace: config.Namespace, - } - metaConf := config.AddResourceMetadata - nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil) - if err != nil { - logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + + var options kubernetes.WatchOptions + if metaConf.Node.Enabled() || config.Hints.Enabled() { + options = kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Node: config.Node, + } + nodeWatcher, err = kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + } } - namespaceWatcher, err := kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - }, nil) - if err != nil { - logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + if metaConf.Namespace.Enabled() || config.Hints.Enabled() { + options = kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + } + namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, options, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } } - // Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to + // Resource is Pod, so we need to create watchers for Replicasets and Jobs that it might belong to // in order to be able to retrieve 2nd layer Owner metadata like in case of: // Deployment -> Replicaset -> Pod // CronJob -> job -> Pod if metaConf.Deployment { - replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ + options = kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, - }, nil) + } + replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, + &kubernetes.ReplicaSet{}, options, nil) if err != nil { logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err) } } if metaConf.CronJob { - jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{ + options = kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, - }, nil) + } + jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, + options, nil) if err != nil { logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err) } @@ -152,12 +162,12 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu watcher.AddEventHandler(p) - if nodeWatcher != nil && (config.Hints.Enabled() || metaConf.Node.Enabled()) { + if nodeWatcher != nil { updater := kubernetes.NewNodePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate) nodeWatcher.AddEventHandler(updater) } - if namespaceWatcher != nil && (config.Hints.Enabled() || metaConf.Namespace.Enabled()) { + if namespaceWatcher != nil { updater := kubernetes.NewNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate) namespaceWatcher.AddEventHandler(updater) } diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 4704dc6b8c75..0b350205c2fd 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -1978,6 +1978,93 @@ func TestPod_EmitEvent(t *testing.T) { } } +func TestPodEventer_Namespace_Node_Watcher(t *testing.T) { + client := k8sfake.NewSimpleClientset() + uuid, err := uuid.NewV4() + if err != nil { + t.Fatal(err) + } + + tests := []struct { + cfg mapstr.M + expectedNil bool + name string + msg string + }{ + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: true, + name: "add_resource_metadata.namespace disabled and hints disabled.", + msg: "Watcher should be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + }, + "hints.enabled": true, + }, + expectedNil: false, + name: "add_resource_metadata.namespace disabled and hints enabled.", + msg: "Watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "pod", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": true, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata.namespace enabled and hints disabled.", + msg: "Watcher should not be nil.", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + config := conf.MustNewConfigFrom(&test.cfg) + + eventer, err := NewPodEventer(uuid, config, client, nil) + if err != nil { + t.Fatal(err) + } + + namespaceWatcher := eventer.(*pod).namespaceWatcher + nodeWatcher := eventer.(*pod).nodeWatcher + + if test.expectedNil { + assert.Equalf(t, nil, namespaceWatcher, "Namespace "+test.msg) + assert.Equalf(t, nil, nodeWatcher, "Node "+test.msg) + } else { + assert.NotEqualf(t, nil, namespaceWatcher, "Namespace "+test.msg) + assert.NotEqualf(t, nil, nodeWatcher, "Node "+test.msg) + } + }) + } +} + func TestNamespacePodUpdater(t *testing.T) { pod := func(name, namespace string) *kubernetes.Pod { return &kubernetes.Pod{ diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index 5a0c6b3cc3f5..e1a28253a3f7 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -67,19 +67,24 @@ func NewServiceEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publis return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Service{}, err) } + metaConf := config.AddResourceMetadata + var namespaceMeta metadata.MetaGen + var options kubernetes.WatchOptions var namespaceWatcher kubernetes.Watcher + if metaConf.Namespace.Enabled() || config.Hints.Enabled() { + options = kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + } + namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, options, nil) + if err != nil { + return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err) + } - metaConf := metadata.GetDefaultResourceMetadataConfig() - namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Namespace: config.Namespace, - }, nil) - if err != nil { - return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err) - } + namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) - namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) + } p := &service{ config: config, diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go index 510ac6ebd0d4..4dff37ad1413 100644 --- a/libbeat/autodiscover/providers/kubernetes/service_test.go +++ b/libbeat/autodiscover/providers/kubernetes/service_test.go @@ -432,6 +432,90 @@ func TestEmitEvent_Service(t *testing.T) { } } +func TestServiceEventer_NamespaceWatcher(t *testing.T) { + client := k8sfake.NewSimpleClientset() + uuid, err := uuid.NewV4() + if err != nil { + t.Fatal(err) + } + + tests := []struct { + cfg mapstr.M + expectedNil bool + name string + msg string + }{ + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: true, + name: "add_resource_metadata.namespace disabled and hints disabled.", + msg: "Namespace watcher should be nil.", + }, + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": false, + }, + "hints.enabled": true, + }, + expectedNil: false, + name: "add_resource_metadata.namespace disabled and hints enabled.", + msg: "Namespace watcher should not be nil.", + }, + { + cfg: mapstr.M{ + "resource": "service", + "node": "node-1", + "add_resource_metadata": mapstr.M{ + "namespace.enabled": true, + }, + "hints.enabled": false, + "builders": []mapstr.M{ + { + "mock": mapstr.M{}, + }, + }, + }, + expectedNil: false, + name: "add_resource_metadata.namespace enabled and hints disabled.", + msg: "Namespace watcher should not be nil.", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + config := conf.MustNewConfigFrom(&test.cfg) + + eventer, err := NewServiceEventer(uuid, config, client, nil) + if err != nil { + t.Fatal(err) + } + + namespaceWatcher := eventer.(*service).namespaceWatcher + + if test.expectedNil { + assert.Equalf(t, nil, namespaceWatcher, test.msg) + } else { + assert.NotEqualf(t, nil, namespaceWatcher, test.msg) + } + }) + } +} + func NewMockServiceEventerManager(svc *service) EventManager { em := &eventerManager{} em.eventer = svc