1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -370,6 +370,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added Cisco Meraki module {pull}40836[40836]
- Added Palo Alto Networks module {pull}40686[40686]
- Restore docker.network.in.* and docker.network.out.* fields in docker module {pull}40968[40968]
- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41289[41289]

*Metricbeat*

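The changelog entry above ("Only watch metadata for ReplicaSets") refers to switching the ReplicaSet watcher to a metadata-only informer: the client caches *metav1.PartialObjectMetadata objects instead of full ReplicaSet objects, so spec and status are never kept in memory. The sketch below shows only the underlying client-go pattern; it is illustrative, uses standard client-go APIs (rest, metadata, metadatainformer), and none of its wiring or names come from this PR or from the Beats wrapper (kubernetes.NewNamedMetadataWatcher) introduced in the Go file that follows.

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/metadata/metadatainformer"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Assumes an in-cluster environment; out-of-cluster setups would use clientcmd instead.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	metaClient, err := metadata.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Metadata-only informer for apps/v1 ReplicaSets: the API server serves
	// PartialObjectMetadata, so spec and status are never transferred or cached.
	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
	factory := metadatainformer.NewSharedInformerFactory(metaClient, 10*time.Minute)
	informer := factory.ForResource(gvr).Informer()

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			m := obj.(*metav1.PartialObjectMetadata)
			fmt.Printf("replicaset added: %s/%s labels=%v\n", m.Namespace, m.Name, m.Labels)
		},
	})

	stop := make(chan struct{})
	factory.Start(stop)
	factory.WaitForCacheSync(stop)
	select {} // block; a real consumer would tie this to its own lifecycle
}
```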
187 changes: 126 additions & 61 deletions metricbeat/module/kubernetes/util/kubernetes.go
@@ -25,7 +25,12 @@ import (
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime/schema"

k8sclient "k8s.io/client-go/kubernetes"
k8sclientmeta "k8s.io/client-go/metadata"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
@@ -304,6 +309,7 @@ func createWatcher(
resource kubernetes.Resource,
options kubernetes.WatchOptions,
client k8sclient.Interface,
metadataClient k8sclientmeta.Interface,
resourceWatchers *Watchers,
namespace string,
extraWatcher bool) (bool, error) {
@@ -355,9 +361,27 @@
if isNamespaced(resourceName) {
options.Namespace = namespace
}
watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
var (
watcher kubernetes.Watcher
err error
)
switch resource.(type) {
// use a metadata informer for ReplicaSets, as we only need their metadata
case *kubernetes.ReplicaSet:
watcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
options,
nil,
transformReplicaSetMetadata,
)
default:
watcher, err = kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
}
if err != nil {
return false, err
return false, fmt.Errorf("error creating watcher for %T: %w", resource, err)
}

resourceMetaWatcher = &metaWatcher{
@@ -450,6 +474,7 @@ func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourc
// createAllWatchers creates all the watchers required by a metricset
func createAllWatchers(
client k8sclient.Interface,
metadataClient k8sclientmeta.Interface,
metricsetName string,
resourceName string,
nodeScope bool,
@@ -469,7 +494,7 @@
// Create the main watcher for the given resource.
// For example pod metricset's main watcher will be pod watcher.
// If it fails, we return an error, so we can stop the extra watchers from creating.
created, err := createWatcher(resourceName, res, *options, client, resourceWatchers, config.Namespace, false)
created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
if err != nil {
return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
} else if created {
@@ -484,7 +509,7 @@
for _, extra := range extraWatchers {
extraRes := getResource(extra)
if extraRes != nil {
created, err = createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace, true)
created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, config.Namespace, true)
if err != nil {
log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err)
} else {
@@ -620,11 +645,16 @@ func NewResourceMetadataEnricher(
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}

metricsetName := base.Name()
resourceName := getResourceName(metricsetName)
// Create all watchers needed for this metricset
err = createAllWatchers(client, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
@@ -650,61 +680,7 @@
// It is responsible for generating the metadata for a detected resource by executing the metadata generator's Generate method.
// It is a common handler for all resource watchers. The kind of resource (e.g. pod or deployment) is checked inside the function.
// It returns a map with a resource identifier (i.e. namespace-resource_name) as key and the metadata as value.
updateFunc := func(r kubernetes.Resource) map[string]mapstr.M {
accessor, _ := meta.Accessor(r)
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = join(namespace, id)
}

switch r := r.(type) {
case *kubernetes.Pod:
return map[string]mapstr.M{id: specificMetaGen.Generate(r)}

case *kubernetes.Node:
nodeName := r.GetObjectMeta().GetName()
metrics := NewNodeMetrics()
if cpu, ok := r.Status.Capacity["cpu"]; ok {
if q, err := resource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := r.Status.Capacity["memory"]; ok {
if q, err := resource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
}
}
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(metrics)

return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
case *kubernetes.Deployment:
return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}
case *kubernetes.Job:
return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)}
case *kubernetes.CronJob:
return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)}
case *kubernetes.Service:
return map[string]mapstr.M{id: specificMetaGen.Generate(r)}
case *kubernetes.StatefulSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)}
case *kubernetes.Namespace:
return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)}
case *kubernetes.ReplicaSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)}
case *kubernetes.DaemonSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)}
case *kubernetes.PersistentVolume:
return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)}
case *kubernetes.PersistentVolumeClaim:
return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)}
case *kubernetes.StorageClass:
return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)}
default:
return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)}
}
}
updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen, metricsRepo)

// deleteFunc to be used as the resource watcher's delete handler.
// The deleteFunc is executed when a watcher is triggered for a resource deletion (e.g. pod deleted).
@@ -788,10 +764,15 @@ func NewContainerMetadataEnricher(
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}

metricsetName := base.Name()

err = createAllWatchers(client, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
@@ -1213,3 +1194,87 @@ func AddClusterECSMeta(base mb.BaseMetricSet) mapstr.M {
}
return ecsClusterMeta
}

// transformReplicaSetMetadata ensures that the PartialObjectMetadata resources we get from a metadata watcher
// can be correctly interpreted by the update function returned by getEventMetadataFunc.
// This really just involves adding missing type information.
func transformReplicaSetMetadata(obj interface{}) (interface{}, error) {
old, ok := obj.(*metav1.PartialObjectMetadata)
if !ok {
return nil, fmt.Errorf("obj of type %T neither a ReplicaSet nor a PartialObjectMetadata", obj)
}
old.TypeMeta = metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
}
return old, nil
}
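A small test-style sketch of the transform above, assuming this package's existing imports of metav1 and schema plus the standard testing package; the ReplicaSet name is made up and the test itself is not part of this PR:

```go
func TestTransformReplicaSetMetadata_restoresTypeMeta(t *testing.T) {
	// The metadata watcher hands us objects without TypeMeta; the transform fills it in.
	in := &metav1.PartialObjectMetadata{
		ObjectMeta: metav1.ObjectMeta{Name: "web-5d78f9c8b6", Namespace: "default"}, // made-up name
	}

	out, err := transformReplicaSetMetadata(in)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	got := out.(*metav1.PartialObjectMetadata).GetObjectKind().GroupVersionKind()
	want := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}
	if got != want {
		t.Fatalf("got %v, want %v", got, want)
	}
}
```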

// getEventMetadataFunc returns a function that takes a kubernetes Resource as an argument and returns metadata
// that can directly be used for event enrichment.
// This function is intended to be used as the resource watchers' add and update handler.
func getEventMetadataFunc(
logger *logp.Logger,
generalMetaGen *metadata.Resource,
specificMetaGen metadata.MetaGen,
metricsRepo *MetricsRepo,
) func(r kubernetes.Resource) map[string]mapstr.M {
return func(r kubernetes.Resource) map[string]mapstr.M {
accessor, accErr := meta.Accessor(r)
if accErr != nil {
logger.Errorf("Error creating accessor: %s", accErr)
}
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = join(namespace, id)
}

switch r := r.(type) {
case *kubernetes.Pod:
return map[string]mapstr.M{id: specificMetaGen.Generate(r)}

case *kubernetes.Node:
nodeName := r.GetObjectMeta().GetName()
metrics := NewNodeMetrics()
if cpu, ok := r.Status.Capacity["cpu"]; ok {
if q, err := resource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := r.Status.Capacity["memory"]; ok {
if q, err := resource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
}
}
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(metrics)

return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
case *kubernetes.Deployment:
return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}
case *kubernetes.Job:
return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)}
case *kubernetes.CronJob:
return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)}
case *kubernetes.Service:
return map[string]mapstr.M{id: specificMetaGen.Generate(r)}
case *kubernetes.StatefulSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)}
case *kubernetes.Namespace:
return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)}
case *kubernetes.ReplicaSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)}
case *kubernetes.DaemonSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)}
case *kubernetes.PersistentVolume:
return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)}
case *kubernetes.PersistentVolumeClaim:
return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)}
case *kubernetes.StorageClass:
return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)}
default:
return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)}
}
}
}
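Tying the two new helpers together: because transformReplicaSetMetadata restores TypeMeta, a ReplicaSet delivered by the metadata watcher arrives as a *metav1.PartialObjectMetadata, matches no dedicated case in the switch above, and falls through to the default branch, where GetObjectKind().GroupVersionKind().Kind yields "ReplicaSet". A hedged sketch of that flow, assuming this file's existing imports; the wrapper function and the object name are illustrative, not PR code:

```go
// Illustrative wrapper, not part of this PR; parameter names mirror those used above.
func exampleReplicaSetUpdate(
	logger *logp.Logger,
	generalMetaGen *metadata.Resource,
	specificMetaGen metadata.MetaGen,
	metricsRepo *MetricsRepo,
) map[string]mapstr.M {
	update := getEventMetadataFunc(logger, generalMetaGen, specificMetaGen, metricsRepo)

	// A ReplicaSet as the metadata watcher delivers it, after transformReplicaSetMetadata
	// has restored the TypeMeta. The object name is made up.
	rs := &metav1.PartialObjectMetadata{
		TypeMeta:   metav1.TypeMeta{APIVersion: "apps/v1", Kind: "ReplicaSet"},
		ObjectMeta: metav1.ObjectMeta{Name: "web-5d78f9c8b6", Namespace: "default"},
	}

	// No case matches *metav1.PartialObjectMetadata, so the default branch runs:
	// generalMetaGen.Generate("ReplicaSet", rs), keyed by "default/web-5d78f9c8b6".
	return update(rs)
}
```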