Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions deploy/kubernetes/elastic-agent-standalone-kubernetes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
securityContext:
runAsUser: 0
resources:
Expand Down Expand Up @@ -649,6 +653,34 @@ rules:
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
namespace: kube-system
name: elastic-agent
subjects:
- kind: ServiceAccount
name: elastic-agent
namespace: kube-system
roleRef:
kind: Role
name: elastic-agent
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: elastic-agent
namespace: kube-system
labels:
k8s-app: elastic-agent
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs: ["get", "create", "update"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
securityContext:
runAsUser: 0
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,17 @@ roleRef:
kind: ClusterRole
name: elastic-agent
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
namespace: kube-system
name: elastic-agent
subjects:
- kind: ServiceAccount
name: elastic-agent
namespace: kube-system
roleRef:
kind: Role
name: elastic-agent
apiGroup: rbac.authorization.k8s.io
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,17 @@ rules:
- "/metrics"
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: elastic-agent
namespace: kube-system
labels:
k8s-app: elastic-agent
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs: ["get", "create", "update"]
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@
- Add k8s secrets provider for Agent {pull}24789[24789]
- Add STATE_PATH, CONFIG_PATH, LOGS_PATH to Elastic Agent docker image {pull}24817[24817]
- Add status subcommand {pull}24856[24856]
- Add leader_election provider for k8s {pull}24267[24267]
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetesleaderelection"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetessecrets"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// TODO review the need for this
// +build linux darwin windows

package kubernetesleaderelection

// Config for kubernetes_leaderelection provider
type Config struct {
KubeConfig string `config:"kube_config"`
// Name of the leaderelection lease
LeaderLease string `config:"leader_lease"`
}

// InitDefaults initializes the default values for the config.
func (c *Config) InitDefaults() {
c.LeaderLease = "elastic-agent-cluster-leader"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kubernetesleaderelection

import (
"context"
"os"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func init() {
composable.Providers.AddContextProvider("kubernetes_leaderelection", ContextProviderBuilder)
}

type contextProvider struct {
logger *logger.Logger
config *Config
comm corecomp.ContextProviderComm
leaderElection *leaderelection.LeaderElectionConfig
cancelLeaderElection context.CancelFunc
}

// ContextProviderBuilder builds the provider.
func ContextProviderBuilder(logger *logger.Logger, c *config.Config) (corecomp.ContextProvider, error) {
var cfg Config
if c == nil {
c = config.New()
}
err := c.Unpack(&cfg)
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}
return &contextProvider{logger, &cfg, nil, nil, nil}, nil
}

// Run runs the leaderelection provider.
func (p *contextProvider) Run(comm corecomp.ContextProviderComm) error {
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes leaderelection provider skipped, unable to connect: %s", err)
return nil
}

agentInfo, err := info.NewAgentInfo()
if err != nil {
return err
}
var id string
podName, found := os.LookupEnv("POD_NAME")
if found {
id = "elastic-agent-leader-" + podName
} else {
id = "elastic-agent-leader-" + agentInfo.AgentID()
}

ns, err := kubernetes.InClusterNamespace()
if err != nil {
ns = "default"
}
lease := metav1.ObjectMeta{
Name: p.config.LeaderLease,
Namespace: ns,
}
metaUID := lease.GetObjectMeta().GetUID()
p.leaderElection = &leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
LeaseMeta: lease,
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
},
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
p.logger.Debugf("leader election lock GAINED, id %v", id)
p.startLeading(string(metaUID))
},
OnStoppedLeading: func() {
p.logger.Debugf("leader election lock LOST, id %v", id)
p.stopLeading(string(metaUID))
},
},
}
ctx, cancel := context.WithCancel(context.TODO())
p.cancelLeaderElection = cancel
p.comm = comm
p.startLeaderElector(ctx)

return nil
}

// startLeaderElector starts a Leader Elector in the background with the provided config
func (p *contextProvider) startLeaderElector(ctx context.Context) {
le, err := leaderelection.NewLeaderElector(*p.leaderElection)
if err != nil {
p.logger.Errorf("error while creating Leader Elector: %v", err)
}
p.logger.Debugf("Starting Leader Elector")
go le.Run(ctx)
}

func (p *contextProvider) startLeading(metaUID string) {
mapping := map[string]interface{}{
"leader": true,
}

err := p.comm.Set(mapping)
if err != nil {
p.logger.Errorf("Failed updating leaderelection status to leader TRUE: %s", err)
}
}

func (p *contextProvider) stopLeading(metaUID string) {
mapping := map[string]interface{}{
"leader": false,
}

err := p.comm.Set(mapping)
if err != nil {
p.logger.Errorf("Failed updating leaderelection status to leader FALSE: %s", err)
}
}

// Stop signals the stop channel to force the leader election loop routine to stop.
func (p *contextProvider) Stop() {
if p.cancelLeaderElection != nil {
p.cancelLeaderElection()
}
}