3 changes: 3 additions & 0 deletions .golangci.yml
@@ -30,6 +30,9 @@ issues:
- text: "imported and not used"
linters:
- typecheck
- text: "previous case"
linters:
- typecheck
# From mage we are printing to the console to ourselves
- path: (.*magefile.go|.*dev-tools/mage/.*)
linters: forbidigo
155 changes: 94 additions & 61 deletions metricbeat/module/kubernetes/util/kubernetes.go
@@ -33,7 +33,7 @@ import (
k8sclientmeta "k8s.io/client-go/metadata"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
k8sresource "k8s.io/apimachinery/pkg/api/resource"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
@@ -101,7 +101,8 @@ type metaWatcher struct {

metricsetsUsing []string // list of metricsets using this shared watcher(e.g. pod, container, state_pod)

enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
metricsRepo *MetricsRepo // used to update container metrics derived from metadata, like resource limits

nodeScope bool // whether this watcher should watch for resources in current node or in whole cluster
restartWatcher kubernetes.Watcher // whether this watcher needs a restart. Only relevant in leader nodes due to metricsets with different nodescope(pod, state_pod)
@@ -311,6 +312,7 @@ func createWatcher(
client k8sclient.Interface,
metadataClient k8sclientmeta.Interface,
resourceWatchers *Watchers,
metricsRepo *MetricsRepo,
namespace string,
extraWatcher bool) (bool, error) {

@@ -388,22 +390,73 @@
watcher: watcher,
started: false, // not started yet
enrichers: make(map[string]*enricher),
metricsRepo: metricsRepo,
metricsetsUsing: make([]string, 0),
restartWatcher: nil,
nodeScope: nodeScope,
}
resourceWatchers.metaWatchersMap[resourceName] = resourceMetaWatcher

// Add event handlers to the watcher. They invalidate the enricher cache and keep the metrics repo in sync with the watched resources.
addEventHandlerToWatcher(resourceMetaWatcher, resourceWatchers)
addEventHandlersToWatcher(resourceMetaWatcher, resourceWatchers)

return true, nil
}

// addEventHandlerToWatcher adds an event handler to the watcher that invalidates the cache of enrichers attached
// to the watcher.
func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watchers) {
notifyFunc := func(obj interface{}) {
// addEventHandlersToWatcher adds event handlers to the watcher that invalidate the cache of enrichers attached
// to the watcher and update container and node metrics on Pod and Node change events.
func addEventHandlersToWatcher(
metaWatcher *metaWatcher,
resourceWatchers *Watchers,
) {
containerMetricsUpdateFunc := func(pod *kubernetes.Pod) {
nodeStore, _ := metaWatcher.metricsRepo.AddNodeStore(pod.Spec.NodeName)
podId := NewPodId(pod.Namespace, pod.Name)
podStore, _ := nodeStore.AddPodStore(podId)

for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
metrics := NewContainerMetrics()

if cpu, ok := container.Resources.Limits["cpu"]; ok {
if q, err := k8sresource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := container.Resources.Limits["memory"]; ok {
if q, err := k8sresource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryLimit = NewFloat64Metric(float64(q.Value()))
}
}

containerStore, _ := podStore.AddContainerStore(container.Name)
containerStore.SetContainerMetrics(metrics)
}
}

containerMetricsDeleteFunc := func(pod *kubernetes.Pod) {
podId := NewPodId(pod.Namespace, pod.Name)
nodeStore := metaWatcher.metricsRepo.GetNodeStore(pod.Spec.NodeName)
nodeStore.DeletePodStore(podId)
}

nodeMetricsUpdateFunc := func(node *kubernetes.Node) {
nodeName := node.GetObjectMeta().GetName()
metrics := NewNodeMetrics()
if cpu, ok := node.Status.Capacity["cpu"]; ok {
if q, err := k8sresource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := node.Status.Capacity["memory"]; ok {
if q, err := k8sresource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
}
}
nodeStore, _ := metaWatcher.metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(metrics)
}

clearMetadataCacheFunc := func(obj interface{}) {
enrichers := make(map[string]*enricher, len(metaWatcher.enrichers))

resourceWatchers.lock.Lock()
@@ -420,10 +473,35 @@ func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watche
enricher.Unlock()
}
}

metaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {}, // do nothing
UpdateFunc: notifyFunc,
DeleteFunc: notifyFunc,
AddFunc: func(obj interface{}) {
switch res := obj.(type) {
case *kubernetes.Pod:
containerMetricsUpdateFunc(res)
case *kubernetes.Node:
nodeMetricsUpdateFunc(res)
}
},
UpdateFunc: func(obj interface{}) {
clearMetadataCacheFunc(obj)
switch res := obj.(type) {
case *kubernetes.Pod:
containerMetricsUpdateFunc(res)
case *kubernetes.Node:
nodeMetricsUpdateFunc(res)
}
},
DeleteFunc: func(obj interface{}) {
clearMetadataCacheFunc(obj)
switch res := obj.(type) {
case *kubernetes.Pod:
containerMetricsDeleteFunc(res)
case *kubernetes.Node:
nodeName := res.GetObjectMeta().GetName()
metaWatcher.metricsRepo.DeleteNodeStore(nodeName)
}
},
})
}
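For reference, the handlers above derive metric values from Kubernetes resource quantities via `MilliValue()` and `Value()`. Below is a minimal, standalone sketch of that conversion; it assumes only the `k8s.io/apimachinery/pkg/api/resource` package already imported by this change, and the literal quantities are illustrative, not taken from the PR:

```go
package main

import (
	"fmt"

	k8sresource "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// A "500m" CPU limit is half a core: MilliValue() returns 500 millicores,
	// so dividing by 1000 yields cores, as in the handler code.
	cpu := k8sresource.MustParse("500m")
	fmt.Println(float64(cpu.MilliValue()) / 1000) // 0.5 (cores)

	// A "256Mi" memory limit is reported in bytes by Value().
	mem := k8sresource.MustParse("256Mi")
	fmt.Println(float64(mem.Value())) // 2.68435456e+08 (bytes)
}
```

The handlers call `ParseQuantity` on the quantity's `String()` form and only set the metric when parsing succeeds, so a malformed limit simply leaves the corresponding metric unset.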

@@ -481,6 +559,7 @@ func createAllWatchers(
config *kubernetesConfig,
log *logp.Logger,
resourceWatchers *Watchers,
metricsRepo *MetricsRepo,
) error {
res := getResource(resourceName)
if res == nil {
@@ -494,7 +573,7 @@
// Create the main watcher for the given resource.
// For example pod metricset's main watcher will be pod watcher.
// If it fails, we return an error, so we can stop the extra watchers from creating.
created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, metricsRepo, config.Namespace, false)
if err != nil {
return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
} else if created {
@@ -509,7 +588,7 @@
for _, extra := range extraWatchers {
extraRes := getResource(extra)
if extraRes != nil {
created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, config.Namespace, true)
created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, metricsRepo, config.Namespace, true)
if err != nil {
log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err)
} else {
@@ -654,7 +733,7 @@ func NewResourceMetadataEnricher(
metricsetName := base.Name()
resourceName := getResourceName(metricsetName)
// Create all watchers needed for this metricset
err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers, metricsRepo)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
@@ -680,20 +759,13 @@
// It is responsible for generating the metadata for a detected resource by executing the metadata generators Generate method.
// It is a common handler for all resource watchers. The kind of resource(e.g. pod or deployment) is checked inside the function.
// It returns a map of a resource identifier(i.e. namespace-resource_name) as key and the metadata as value.
updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen, metricsRepo)
updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen)

// deleteFunc to be used as the resource watcher's delete handler.
// The deleteFunc is executed when a watcher is triggered for a resource deletion(e.g. pod deleted).
// It returns the identifier of the resource.
deleteFunc := func(r kubernetes.Resource) []string {
accessor, _ := meta.Accessor(r)

switch r := r.(type) {
case *kubernetes.Node:
nodeName := r.GetObjectMeta().GetName()
metricsRepo.DeleteNodeStore(nodeName)
}

id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
@@ -772,7 +844,7 @@ func NewContainerMetadataEnricher(

metricsetName := base.Name()

err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers, metricsRepo)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
@@ -802,27 +874,8 @@
mapStatuses(pod.Status.ContainerStatuses)
mapStatuses(pod.Status.InitContainerStatuses)

nodeStore, _ := metricsRepo.AddNodeStore(pod.Spec.NodeName)
podId := NewPodId(pod.Namespace, pod.Name)
podStore, _ := nodeStore.AddPodStore(podId)

for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
cmeta := mapstr.M{}
metrics := NewContainerMetrics()

if cpu, ok := container.Resources.Limits["cpu"]; ok {
if q, err := resource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := container.Resources.Limits["memory"]; ok {
if q, err := resource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryLimit = NewFloat64Metric(float64(q.Value()))
}
}

containerStore, _ := podStore.AddContainerStore(container.Name)
containerStore.SetContainerMetrics(metrics)

if s, ok := statuses[container.Name]; ok {
// Extracting id and runtime ECS fields from ContainerID
@@ -849,9 +902,6 @@
if !ok {
base.Logger().Debugf("Error while casting event: %s", ok)
}
podId := NewPodId(pod.Namespace, pod.Name)
nodeStore := metricsRepo.GetNodeStore(pod.Spec.NodeName)
nodeStore.DeletePodStore(podId)

for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
id := join(pod.ObjectMeta.GetNamespace(), pod.GetObjectMeta().GetName(), container.Name)
@@ -1217,7 +1267,6 @@ func getEventMetadataFunc(
logger *logp.Logger,
generalMetaGen *metadata.Resource,
specificMetaGen metadata.MetaGen,
metricsRepo *MetricsRepo,
) func(r kubernetes.Resource) map[string]mapstr.M {
return func(r kubernetes.Resource) map[string]mapstr.M {
accessor, accErr := meta.Accessor(r)
@@ -1233,23 +1282,7 @@
switch r := r.(type) {
case *kubernetes.Pod:
return map[string]mapstr.M{id: specificMetaGen.Generate(r)}

case *kubernetes.Node:
nodeName := r.GetObjectMeta().GetName()
metrics := NewNodeMetrics()
if cpu, ok := r.Status.Capacity["cpu"]; ok {
if q, err := resource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := r.Status.Capacity["memory"]; ok {
if q, err := resource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
}
}
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(metrics)

return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
case *kubernetes.Deployment:
return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}