Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions x-pack/agent/_meta/agent.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,11 @@ retry:
# With 30s delay and 3 retries: 30, 60, 120s
# Default is false
exponential: false

monitoring:
# enabled turns on monitoring of running processes
enabled: false
# enables log monitoring
logs: false
# enables metrics monitoring
metrics: false
8 changes: 8 additions & 0 deletions x-pack/agent/_meta/agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,11 @@ retry:
# With 30s delay and 3 retries: 30, 60, 120s
# Default is false
exponential: false

monitoring:
# enabled turns on monitoring of running processes
enabled: false
# enables log monitoring
logs: false
# enables metrics monitoring
metrics: false
8 changes: 8 additions & 0 deletions x-pack/agent/_meta/common.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,11 @@ retry:
# With 30s delay and 3 retries: 30, 60, 120s
# Default is false
exponential: false

monitoring:
# enabled turns on monitoring of running processes
enabled: false
# enables log monitoring
logs: false
# enables metrics monitoring
metrics: false
8 changes: 8 additions & 0 deletions x-pack/agent/_meta/common.reference.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,11 @@ retry:
# With 30s delay and 3 retries: 30, 60, 120s
# Default is false
exponential: false

monitoring:
# enabled turns on monitoring of running processes
enabled: false
# enables log monitoring
logs: false
# enables metrics monitoring
metrics: false
8 changes: 8 additions & 0 deletions x-pack/agent/agent.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,11 @@ retry:
# With 30s delay and 3 retries: 30, 60, 120s
# Default is false
exponential: false

monitoring:
# enabled turns on monitoring of running processes
enabled: false
# enables log monitoring
logs: false
# enables metrics monitoring
metrics: false
8 changes: 8 additions & 0 deletions x-pack/agent/agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,11 @@ retry:
# With 30s delay and 3 retries: 30, 60, 120s
# Default is false
exponential: false

monitoring:
# enabled turns on monitoring of running processes
enabled: false
# enables log monitoring
logs: false
# enables metrics monitoring
metrics: false
8 changes: 8 additions & 0 deletions x-pack/agent/agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,11 @@ retry:
# With 30s delay and 3 retries: 30, 60, 120s
# Default is false
exponential: false

monitoring:
# enabled turns on monitoring of running processes
enabled: false
# enables log monitoring
logs: false
# enables metrics monitoring
metrics: false
13 changes: 12 additions & 1 deletion x-pack/agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"github.com/elastic/beats/x-pack/agent/pkg/core/logger"
)

func emitter(log *logger.Logger, router *router) emitterFunc {
type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error)

func emitter(log *logger.Logger, router *router, decorators ...decoratorFunc) emitterFunc {
return func(files []string) error {
c, err := config.LoadFiles(files...)
if err != nil {
Expand All @@ -41,6 +43,15 @@ func emitter(log *logger.Logger, router *router) emitterFunc {
return err
}

for _, decorator := range decorators {
for outputType, ptr := range programsToRun {
programsToRun[outputType], err = decorator(outputType, ast, ptr)
if err != nil {
return err
}
}
}

return router.Dispatch(ast.HashStr(), programsToRun)
}
}
2 changes: 1 addition & 1 deletion x-pack/agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newLocal(
}

discover := discoverer(pathConfigFile, c.Path)
emit := emitter(log, router)
emit := emitter(log, router, injectMonitoring)

var cfgSource source
if !c.Reload.Enabled {
Expand Down
70 changes: 70 additions & 0 deletions x-pack/agent/pkg/agent/application/monitoring_decorator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"github.com/elastic/beats/x-pack/agent/pkg/agent/program"
"github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler"
)

const (
monitoringName = "FLEET_MONITORING"
programsKey = "programs"
monitoringKey = "monitoring"
monitoringOutputKey = "monitoring.elasticsearch"
enabledKey = "monitoring.enabled"
outputKey = "output"
outputsKey = "outputs"
typeKey = "type"
)

func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun []program.Program) ([]program.Program, error) {
var err error
monitoringProgram := program.Program{
Spec: program.Spec{
Name: monitoringName,
Cmd: monitoringName,
},
}

var config map[string]interface{}

if _, found := transpiler.Lookup(rootAst, monitoringKey); !found {
config = make(map[string]interface{})
config[enabledKey] = false
} else {
ast := rootAst.Clone()
if err := getMonitoringRule(outputGroup).Apply(ast); err != nil {
return programsToRun, err
}

config, err = ast.Map()
if err != nil {
return programsToRun, err
}

programList := make([]string, 0, len(programsToRun))
for _, p := range programsToRun {
programList = append(programList, p.Spec.Cmd)
}
// making program list part of the config
// so it will get regenerated with every change
config[programsKey] = programList
}

monitoringProgram.Config, err = transpiler.NewAST(config)
if err != nil {
return programsToRun, err
}

return append(programsToRun, monitoringProgram), nil
}

func getMonitoringRule(outputName string) *transpiler.RuleList {
return transpiler.NewRuleList(
transpiler.Copy(monitoringOutputKey, outputKey),
transpiler.Filter(monitoringKey, programsKey, outputKey),
)
}
183 changes: 183 additions & 0 deletions x-pack/agent/pkg/agent/application/monitoring_decorator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"testing"

"github.com/elastic/beats/x-pack/agent/pkg/agent/program"
"github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler"
)

func TestMonitoringInjection(t *testing.T) {
ast, err := transpiler.NewAST(inputConfigMap)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(ast)
if err != nil {
t.Fatal(err)
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := injectMonitoring(group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
}

if programsCount == len(newPtr) {
t.Errorf("incorrect programs to run count, expected: %d, got %d", programsCount+1, len(newPtr))
continue GROUPLOOP
}

for _, p := range newPtr {
if p.Spec.Name != monitoringName {
continue
}

cm, err := p.Config.Map()
if err != nil {
t.Error(err)
continue GROUPLOOP
}

outputCfg, found := cm[outputKey]
if !found {
t.Errorf("output not found for '%s'", group)
continue GROUPLOOP
}

outputMap, ok := outputCfg.(map[string]interface{})
if !ok {
t.Errorf("output is not a map for '%s'", group)
continue GROUPLOOP
}

esCfg, found := outputMap["elasticsearch"]
if !found {
t.Errorf("elasticsearch output not found for '%s'", group)
continue GROUPLOOP
}

esMap, ok := esCfg.(map[string]interface{})
if !ok {
t.Errorf("output.elasticsearch is not a map for '%s'", group)
continue GROUPLOOP
}

if uname, found := esMap["username"]; !found {
t.Errorf("output.elasticsearch.username output not found for '%s'", group)
continue GROUPLOOP
} else if uname != "monitoring-uname" {
t.Errorf("output.elasticsearch.username has incorrect value expected '%s', got '%s for %s", "monitoring-uname", uname, group)
continue GROUPLOOP
}
}
}
}

var inputConfigMap = map[string]interface{}{
"monitoring": map[string]interface{}{
"enabled": true,
"logs": true,
"metrics": true,
"elasticsearch": map[string]interface{}{
"index_name": "general",
"pass": "xxx",
"url": "xxxxx",
"username": "monitoring-uname",
},
},
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"index_name": "general",
"pass": "xxx",
"type": "elasticsearch",
"url": "xxxxx",
"username": "xxx",
},
"infosec1": map[string]interface{}{
"pass": "xxx",
"spool": map[string]interface{}{
"file": "${path.data}/spool.dat",
},
"type": "elasticsearch",
"url": "xxxxx",
"username": "xxx",
},
},
"streams": []interface{}{
map[string]interface{}{
"type": "log",
"path": "/xxxx",
"processors": []interface{}{
map[string]interface{}{
"dissect": map[string]interface{}{
"tokenizer": "---",
},
},
},
"output": map[string]interface{}{
"override": map[string]interface{}{
"index_name": "my_service_logs",
"ingest_pipeline": "process_logs",
},
},
},
map[string]interface{}{
"type": "metric/system",
"username": "xxxx",
"pass": "yyy",
"output": map[string]interface{}{
"index_name": "mysql_metrics",
"use_output": "infosec1",
},
},
},
}

// const inputConfig = `outputs:
// default:
// index_name: general
// pass: xxx
// type: es
// url: xxxxx
// username: xxx
// infosec1:
// pass: xxx
// spool:
// file: "${path.data}/spool.dat"
// type: es
// url: xxxxx
// username: xxx
// streams:
// -
// output:
// override:
// index_name: my_service_logs
// ingest_pipeline: process_logs
// path: /xxxx
// processors:
// -
// dissect:
// tokenizer: "---"
// type: log
// -
// output:
// index_name: mysql_access_logs
// path: /xxxx
// type: log
// -
// output:
// index_name: mysql_metrics
// use_output: infosec1
// pass: yyy
// type: metrics/system
// username: xxxx
// `
Loading