Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newLocker(b *Beat) *locker {
}
}

// lock attemps to acquire a lock on the data path for the currently-running
// lock attempts to acquire a lock on the data path for the currently-running
// Beat instance. If another Beats instance already has a lock on the same data path
// an ErrAlreadyLocked error is returned.
func (l *locker) lock() error {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,5 @@
- Add new --enroll-delay option for install and enroll commands. {pull}27118[27118]
- Add link to troubleshooting guide on fatal exits. {issue}26367[26367] {pull}27236[27236]
- Agent now adapts the beats queue size based on output settings. {issue}26638[26638] {pull}27429[27429]
- Support ephemeral containers in Kubernetes dynamic provider. {issue}#27020[#27020] {pull}27707[27707]
- Support ephemeral containers in Kubernetes dynamic provider. {issue}27020[#27020] {pull}27707[27707]
- Add complete k8s metadata through composable provider. {pull}27691[27691]
14 changes: 14 additions & 0 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package kubernetes
import (
"time"

"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand All @@ -25,6 +26,16 @@ type Config struct {

// Needed when resource is a Pod or Node
Node string `config:"node"`

AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`

LabelsDedot bool `config:"labels.dedot"`
AnnotationsDedot bool `config:"annotations.dedot"`

// Undocumented settings, to be deprecated in favor of `drop_fields` processor:
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
}

// Resources config section for resources' config blocks
Expand All @@ -44,6 +55,9 @@ func (c *Config) InitDefaults() {
c.CleanupTimeout = 60 * time.Second
c.SyncPeriod = 10 * time.Minute
c.Scope = "node"
c.IncludeCreatorMetadata = true
c.LabelsDedot = true
c.AnnotationsDedot = true
}

// Validate ensures correctness of config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,26 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}

return &dynamicProvider{logger, &cfg}, nil
}

// Run runs the kubernetes context provider.
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
if p.config.Resources.Pod.Enabled {
err := p.watchResource(comm, "pod", p.config)
err := p.watchResource(comm, "pod")
if err != nil {
return err
}
}
if p.config.Resources.Node.Enabled {
err := p.watchResource(comm, "node", p.config)
err := p.watchResource(comm, "node")
if err != nil {
return err
}
}
if p.config.Resources.Service.Enabled {
err := p.watchResource(comm, "service", p.config)
err := p.watchResource(comm, "service")
if err != nil {
return err
}
Expand All @@ -76,9 +77,8 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
// and starts watching for such resource's events.
func (p *dynamicProvider) watchResource(
comm composable.DynamicProviderComm,
resourceType string,
config *Config) error {
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
resourceType string) error {
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err)
Expand All @@ -93,24 +93,24 @@ func (p *dynamicProvider) watchResource(
p.logger.Debugf(
"Initializing Kubernetes watcher for resource %s using node: %v",
resourceType,
config.Node)
p.config.Node)
nd := &kubernetes.DiscoverKubernetesNodeParams{
ConfigHost: config.Node,
ConfigHost: p.config.Node,
Client: client,
IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
IsInCluster: kubernetes.IsInCluster(p.config.KubeConfig),
HostUtils: &kubernetes.DefaultDiscoveryUtils{},
}
config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd)
p.config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd)
if err != nil {
p.logger.Debugf("Kubernetes provider skipped, unable to discover node: %w", err)
return nil
}

} else {
config.Node = ""
p.config.Node = ""
}

watcher, err := p.newWatcher(resourceType, comm, client, config)
watcher, err := p.newWatcher(resourceType, comm, client)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
}
Expand All @@ -126,23 +126,22 @@ func (p *dynamicProvider) watchResource(
func (p *dynamicProvider) newWatcher(
resourceType string,
comm composable.DynamicProviderComm,
client k8s.Interface,
config *Config) (kubernetes.Watcher, error) {
client k8s.Interface) (kubernetes.Watcher, error) {
switch resourceType {
case "pod":
watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
case "node":
watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
case "service":
watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
Expand Down
61 changes: 37 additions & 24 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand All @@ -25,6 +26,7 @@ type node struct {
comm composable.DynamicProviderComm
scope string
config *Config
metagen metadata.MetaGen
}

type nodeData struct {
Expand All @@ -49,13 +51,25 @@ func NewNodeWatcher(
if err != nil {
return nil, errors.New(err, "couldn't create kubernetes watcher")
}
watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope, cfg})

rawConfig, err := common.NewConfigFrom(cfg)
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}
metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client)
watcher.AddEventHandler(&node{
logger,
cfg.CleanupTimeout,
comm,
scope,
cfg,
metaGen})

return watcher, nil
}

func (n *node) emitRunning(node *kubernetes.Node) {
data := generateNodeData(node, n.config)
data := generateNodeData(node, n.config, n.metagen)
if data == nil {
return
}
Expand Down Expand Up @@ -165,7 +179,7 @@ func isNodeReady(node *kubernetes.Node) bool {
return false
}

func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
func generateNodeData(node *kubernetes.Node, cfg *Config, kubeMetaGen metadata.MetaGen) *nodeData {
host := getAddress(node)

// If a node doesn't have an IP then dont monitor it
Expand All @@ -178,41 +192,40 @@ func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
return nil
}

//TODO: add metadata here too ie -> meta := n.metagen.Generate(node)
meta := kubeMetaGen.Generate(node)
kubemetaMap, err := meta.GetValue("kubernetes")
if err != nil {
return &nodeData{}
}

// Pass annotations to all events so that it can be used in templating and by annotation builders.
annotations := common.MapStr{}
for k, v := range node.GetObjectMeta().GetAnnotations() {
safemapstr.Put(annotations, k, v)
}

labels := common.MapStr{}
for k, v := range node.GetObjectMeta().GetLabels() {
// TODO: add dedoting option
safemapstr.Put(labels, k, v)
}
// k8sMapping includes only the metadata that fall under kubernetes.*
// and these are available as dynamic vars through the provider
k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone())

mapping := map[string]interface{}{
"node": map[string]interface{}{
"uid": string(node.GetUID()),
"name": node.GetName(),
"labels": labels,
"annotations": annotations,
"ip": host,
},
}
// add annotations to be discoverable by templates
k8sMapping["annotations"] = annotations

processors := []map[string]interface{}{
{
processors := []map[string]interface{}{}
// meta map includes metadata that go under kubernetes.*
// but also other ECS fields like orchestrator.*
for field, metaMap := range meta {
processor := map[string]interface{}{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
"fields": metaMap,
"target": field,
},
},
}
processors = append(processors, processor)
}
return &nodeData{
node: node,
mapping: mapping,
mapping: k8sMapping,
processors: processors,
}
}
100 changes: 83 additions & 17 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package kubernetes
import (
"testing"

"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -41,32 +43,96 @@ func TestGenerateNodeData(t *testing.T) {
},
}

data := generateNodeData(node, &Config{})
data := generateNodeData(node, &Config{}, &nodeMeta{})

mapping := map[string]interface{}{
"node": map[string]interface{}{
"node": common.MapStr{
"uid": string(node.GetUID()),
"name": node.GetName(),
"labels": common.MapStr{
"foo": "bar",
},
"annotations": common.MapStr{
"baz": "ban",
},
"ip": "node1",
"ip": "node1",
},
"annotations": common.MapStr{
"baz": "ban",
},
"labels": common.MapStr{
"foo": "bar",
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
processors := map[string]interface{}{
"orchestrator": common.MapStr{
"cluster": common.MapStr{
"name": "devcluster",
"url": "8.8.8.8:9090"},
}, "kubernetes": common.MapStr{
"labels": common.MapStr{"foo": "bar"},
"annotations": common.MapStr{"baz": "ban"},
"node": common.MapStr{
"ip": "node1",
"name": "testnode",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133"},
},
}

assert.Equal(t, node, data.node)
assert.Equal(t, mapping, data.mapping)
assert.Equal(t, processors, data.processors)
for _, v := range data.processors {
k := v["add_fields"].(map[string]interface{})
target := k["target"].(string)
fields := k["fields"]
assert.Equal(t, processors[target], fields)
}
}

type nodeMeta struct{}

// Generate generates node metadata from a resource object
// Metadata map is in the following form:
// {
// "kubernetes": {},
// "some.ecs.field": "asdf"
// }
// All Kubernetes fields that need to be stored under kubernetes. prefix are populated by
// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method
func (n *nodeMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr {
ecsFields := n.GenerateECS(obj)
meta := common.MapStr{
"kubernetes": n.GenerateK8s(obj, opts...),
}
meta.DeepUpdate(ecsFields)
return meta
}

// GenerateECS generates node ECS metadata from a resource object
func (n *nodeMeta) GenerateECS(obj kubernetes.Resource) common.MapStr {
return common.MapStr{
"orchestrator": common.MapStr{
"cluster": common.MapStr{
"name": "devcluster",
"url": "8.8.8.8:9090",
},
},
}
}

// GenerateK8s generates node metadata from a resource object
func (n *nodeMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr {
k8sNode := obj.(*kubernetes.Node)
return common.MapStr{
"node": common.MapStr{
"uid": string(k8sNode.GetUID()),
"name": k8sNode.GetName(),
"ip": "node1",
},
"labels": common.MapStr{
"foo": "bar",
},
"annotations": common.MapStr{
"baz": "ban",
},
}
}

// GenerateFromName generates node metadata from a node name
func (n *nodeMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr {
return nil
}
Loading