Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.

==== Bugfixes

- Do not start namespace and node watchers on metricbeat autodiscover if `add_resource_metadata` is disabled.{pull}37181[37181]
- Fix how Prometheus histograms are calculated when percentiles are provide.{pull}36537[36537]
- Stop using `mage:import` in community beats. This was ignoring the vendorized beats directory for some mage targets, using the code available in GOPATH, this causes inconsistencies and compilation problems if the version of the code in the GOPATH is different to the vendored one. Use of `mage:import` will continue to be unsupported in custom beats till beats is migrated to go modules, or mage supports vendored dependencies. {issue}13998[13998] {pull}14162[14162]
- Metricbeat module builders call host parser only once when instantiating light modules. {pull}20149[20149]
Expand Down
54 changes: 32 additions & 22 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type pod struct {
func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
logger := logp.NewLogger("autodiscover.pod")

var replicaSetWatcher, jobWatcher kubernetes.Watcher
var replicaSetWatcher, jobWatcher, nodeWatcher, namespaceWatcher kubernetes.Watcher

config := defaultConfig()
err := cfg.Unpack(&config)
Expand Down Expand Up @@ -96,40 +96,50 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu
return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Pod{}, err)
}

options := kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
Comment thread
ChrsMark marked this conversation as resolved.
Namespace: config.Namespace,
}

metaConf := config.AddResourceMetadata
nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil)
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)

var options kubernetes.WatchOptions
if metaConf.Node.Enabled() || config.Hints.Enabled() {
options = kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
}
nodeWatcher, err = kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil)
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}
}
namespaceWatcher, err := kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
if metaConf.Namespace.Enabled() || config.Hints.Enabled() {
options = kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
}
namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, options, nil)
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
}

// Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to
// Resource is Pod, so we need to create watchers for Replicasets and Jobs that it might belong to
// in order to be able to retrieve 2nd layer Owner metadata like in case of:
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
options = kubernetes.WatchOptions{
Copy link
Copy Markdown
Member

@ChrsMark ChrsMark Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestion since we touch this codebase: Would it make sense to use the same Namespace scope setting with the one used for the Pod watcher?

If we watch for Pods on specific Namespace then their parent Deployments and CronJobs will be on the same Namespace as well.

SyncTimeout: config.SyncPeriod,
}, nil)
}
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client,
&kubernetes.ReplicaSet{}, options, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
}
if metaConf.CronJob {
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
options = kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
}
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{},
options, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}
Expand All @@ -152,12 +162,12 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu

watcher.AddEventHandler(p)

if nodeWatcher != nil && (config.Hints.Enabled() || metaConf.Node.Enabled()) {
if nodeWatcher != nil {
updater := kubernetes.NewNodePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
nodeWatcher.AddEventHandler(updater)
}

if namespaceWatcher != nil && (config.Hints.Enabled() || metaConf.Namespace.Enabled()) {
if namespaceWatcher != nil {
updater := kubernetes.NewNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
namespaceWatcher.AddEventHandler(updater)
}
Expand Down
87 changes: 87 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1978,6 +1978,93 @@ func TestPod_EmitEvent(t *testing.T) {
}
}

func TestPodEventer_Namespace_Node_Watcher(t *testing.T) {
client := k8sfake.NewSimpleClientset()
uuid, err := uuid.NewV4()
if err != nil {
t.Fatal(err)
}

tests := []struct {
cfg mapstr.M
expectedNil bool
name string
msg string
}{
{
cfg: mapstr.M{
"resource": "pod",
"node": "node-1",
"add_resource_metadata": mapstr.M{
"namespace.enabled": false,
},
"hints.enabled": false,
"builders": []mapstr.M{
{
"mock": mapstr.M{},
},
},
},
expectedNil: true,
name: "add_resource_metadata.namespace disabled and hints disabled.",
msg: "Watcher should be nil.",
},
{
cfg: mapstr.M{
"resource": "pod",
"node": "node-1",
"add_resource_metadata": mapstr.M{
"namespace.enabled": false,
},
"hints.enabled": true,
},
expectedNil: false,
name: "add_resource_metadata.namespace disabled and hints enabled.",
msg: "Watcher should not be nil.",
},
{
cfg: mapstr.M{
"resource": "pod",
"node": "node-1",
"add_resource_metadata": mapstr.M{
"namespace.enabled": true,
},
"hints.enabled": false,
"builders": []mapstr.M{
{
"mock": mapstr.M{},
},
},
},
expectedNil: false,
name: "add_resource_metadata.namespace enabled and hints disabled.",
msg: "Watcher should not be nil.",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
config := conf.MustNewConfigFrom(&test.cfg)

eventer, err := NewPodEventer(uuid, config, client, nil)
if err != nil {
t.Fatal(err)
}

namespaceWatcher := eventer.(*pod).namespaceWatcher
nodeWatcher := eventer.(*pod).nodeWatcher

if test.expectedNil {
assert.Equalf(t, nil, namespaceWatcher, "Namespace "+test.msg)
assert.Equalf(t, nil, nodeWatcher, "Node "+test.msg)
} else {
assert.NotEqualf(t, nil, namespaceWatcher, "Namespace "+test.msg)
assert.NotEqualf(t, nil, nodeWatcher, "Node "+test.msg)
}
})
}
}

func TestNamespacePodUpdater(t *testing.T) {
pod := func(name, namespace string) *kubernetes.Pod {
return &kubernetes.Pod{
Expand Down
23 changes: 14 additions & 9 deletions libbeat/autodiscover/providers/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,24 @@ func NewServiceEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publis
return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Service{}, err)
}

metaConf := config.AddResourceMetadata

var namespaceMeta metadata.MetaGen
var options kubernetes.WatchOptions
var namespaceWatcher kubernetes.Watcher
if metaConf.Namespace.Enabled() || config.Hints.Enabled() {
options = kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
}
namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, options, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err)
}

metaConf := metadata.GetDefaultResourceMetadataConfig()
namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
}, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %w", &kubernetes.Namespace{}, err)
}
namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client)

namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client)
}

p := &service{
config: config,
Expand Down
84 changes: 84 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,90 @@ func TestEmitEvent_Service(t *testing.T) {
}
}

func TestServiceEventer_NamespaceWatcher(t *testing.T) {
client := k8sfake.NewSimpleClientset()
uuid, err := uuid.NewV4()
if err != nil {
t.Fatal(err)
}

tests := []struct {
cfg mapstr.M
expectedNil bool
name string
msg string
}{
{
cfg: mapstr.M{
"resource": "service",
"node": "node-1",
"add_resource_metadata": mapstr.M{
"namespace.enabled": false,
},
"hints.enabled": false,
"builders": []mapstr.M{
{
"mock": mapstr.M{},
},
},
},
expectedNil: true,
name: "add_resource_metadata.namespace disabled and hints disabled.",
msg: "Namespace watcher should be nil.",
},
{
cfg: mapstr.M{
"resource": "service",
"node": "node-1",
"add_resource_metadata": mapstr.M{
"namespace.enabled": false,
},
"hints.enabled": true,
},
expectedNil: false,
name: "add_resource_metadata.namespace disabled and hints enabled.",
msg: "Namespace watcher should not be nil.",
},
{
cfg: mapstr.M{
"resource": "service",
"node": "node-1",
"add_resource_metadata": mapstr.M{
"namespace.enabled": true,
},
"hints.enabled": false,
"builders": []mapstr.M{
{
"mock": mapstr.M{},
},
},
},
expectedNil: false,
name: "add_resource_metadata.namespace enabled and hints disabled.",
msg: "Namespace watcher should not be nil.",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
config := conf.MustNewConfigFrom(&test.cfg)

eventer, err := NewServiceEventer(uuid, config, client, nil)
if err != nil {
t.Fatal(err)
}

namespaceWatcher := eventer.(*service).namespaceWatcher

if test.expectedNil {
assert.Equalf(t, nil, namespaceWatcher, test.msg)
} else {
assert.NotEqualf(t, nil, namespaceWatcher, test.msg)
}
})
}
}

func NewMockServiceEventerManager(svc *service) EventManager {
em := &eventerManager{}
em.eventer = svc
Expand Down