diff --git a/x-pack/agent/_meta/agent.docker.yml b/x-pack/agent/_meta/agent.docker.yml index c7fc460203a3..d976cc96d044 100644 --- a/x-pack/agent/_meta/agent.docker.yml +++ b/x-pack/agent/_meta/agent.docker.yml @@ -108,3 +108,11 @@ retry: # With 30s delay and 3 retries: 30, 60, 120s # Default is false exponential: false + +monitoring: + # enabled turns on monitoring of running processes + enabled: false + # enables log monitoring + logs: false + # enables metrics monitoring + metrics: false diff --git a/x-pack/agent/_meta/agent.yml b/x-pack/agent/_meta/agent.yml index e008fa37b030..e329aaf2146a 100644 --- a/x-pack/agent/_meta/agent.yml +++ b/x-pack/agent/_meta/agent.yml @@ -108,3 +108,11 @@ retry: # With 30s delay and 3 retries: 30, 60, 120s # Default is false exponential: false + +monitoring: + # enabled turns on monitoring of running processes + enabled: false + # enables log monitoring + logs: false + # enables metrics monitoring + metrics: false diff --git a/x-pack/agent/_meta/common.p2.yml b/x-pack/agent/_meta/common.p2.yml index ed87712da640..3a845f2e028d 100644 --- a/x-pack/agent/_meta/common.p2.yml +++ b/x-pack/agent/_meta/common.p2.yml @@ -104,3 +104,11 @@ retry: # With 30s delay and 3 retries: 30, 60, 120s # Default is false exponential: false + +monitoring: + # enabled turns on monitoring of running processes + enabled: false + # enables log monitoring + logs: false + # enables metrics monitoring + metrics: false diff --git a/x-pack/agent/_meta/common.reference.p2.yml b/x-pack/agent/_meta/common.reference.p2.yml index 50c7efab08d3..df6daae3f66c 100644 --- a/x-pack/agent/_meta/common.reference.p2.yml +++ b/x-pack/agent/_meta/common.reference.p2.yml @@ -107,3 +107,11 @@ retry: # With 30s delay and 3 retries: 30, 60, 120s # Default is false exponential: false + +monitoring: + # enabled turns on monitoring of running processes + enabled: false + # enables log monitoring + logs: false + # enables metrics monitoring + metrics: false diff --git a/x-pack/agent/agent.docker.yml b/x-pack/agent/agent.docker.yml index c7fc460203a3..d976cc96d044 100644 --- a/x-pack/agent/agent.docker.yml +++ b/x-pack/agent/agent.docker.yml @@ -108,3 +108,11 @@ retry: # With 30s delay and 3 retries: 30, 60, 120s # Default is false exponential: false + +monitoring: + # enabled turns on monitoring of running processes + enabled: false + # enables log monitoring + logs: false + # enables metrics monitoring + metrics: false diff --git a/x-pack/agent/agent.reference.yml b/x-pack/agent/agent.reference.yml index 2b69e06e342f..b88858929024 100644 --- a/x-pack/agent/agent.reference.yml +++ b/x-pack/agent/agent.reference.yml @@ -112,3 +112,11 @@ retry: # With 30s delay and 3 retries: 30, 60, 120s # Default is false exponential: false + +monitoring: + # enabled turns on monitoring of running processes + enabled: false + # enables log monitoring + logs: false + # enables metrics monitoring + metrics: false diff --git a/x-pack/agent/agent.yml b/x-pack/agent/agent.yml index d8947f9b9653..16ca44a173a6 100644 --- a/x-pack/agent/agent.yml +++ b/x-pack/agent/agent.yml @@ -109,3 +109,11 @@ retry: # With 30s delay and 3 retries: 30, 60, 120s # Default is false exponential: false + +monitoring: + # enabled turns on monitoring of running processes + enabled: false + # enables log monitoring + logs: false + # enables metrics monitoring + metrics: false diff --git a/x-pack/agent/pkg/agent/application/emitter.go b/x-pack/agent/pkg/agent/application/emitter.go index 4ec9abcbf151..fabeec4b694b 100644 --- a/x-pack/agent/pkg/agent/application/emitter.go +++ b/x-pack/agent/pkg/agent/application/emitter.go @@ -15,7 +15,9 @@ import ( "github.com/elastic/beats/x-pack/agent/pkg/core/logger" ) -func emitter(log *logger.Logger, router *router) emitterFunc { +type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error) + +func emitter(log *logger.Logger, router *router, decorators ...decoratorFunc) emitterFunc { return func(files []string) error { c, err := config.LoadFiles(files...) if err != nil { @@ -41,6 +43,15 @@ func emitter(log *logger.Logger, router *router) emitterFunc { return err } + for _, decorator := range decorators { + for outputType, ptr := range programsToRun { + programsToRun[outputType], err = decorator(outputType, ast, ptr) + if err != nil { + return err + } + } + } + return router.Dispatch(ast.HashStr(), programsToRun) } } diff --git a/x-pack/agent/pkg/agent/application/local_mode.go b/x-pack/agent/pkg/agent/application/local_mode.go index 938d10486785..33808903fa53 100644 --- a/x-pack/agent/pkg/agent/application/local_mode.go +++ b/x-pack/agent/pkg/agent/application/local_mode.go @@ -58,7 +58,7 @@ func newLocal( } discover := discoverer(pathConfigFile, c.Path) - emit := emitter(log, router) + emit := emitter(log, router, injectMonitoring) var cfgSource source if !c.Reload.Enabled { diff --git a/x-pack/agent/pkg/agent/application/monitoring_decorator.go b/x-pack/agent/pkg/agent/application/monitoring_decorator.go new file mode 100644 index 000000000000..a008e5a0a2e5 --- /dev/null +++ b/x-pack/agent/pkg/agent/application/monitoring_decorator.go @@ -0,0 +1,70 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "github.com/elastic/beats/x-pack/agent/pkg/agent/program" + "github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler" +) + +const ( + monitoringName = "FLEET_MONITORING" + programsKey = "programs" + monitoringKey = "monitoring" + monitoringOutputKey = "monitoring.elasticsearch" + enabledKey = "monitoring.enabled" + outputKey = "output" + outputsKey = "outputs" + typeKey = "type" +) + +func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun []program.Program) ([]program.Program, error) { + var err error + monitoringProgram := program.Program{ + Spec: program.Spec{ + Name: monitoringName, + Cmd: monitoringName, + }, + } + + var config map[string]interface{} + + if _, found := transpiler.Lookup(rootAst, monitoringKey); !found { + config = make(map[string]interface{}) + config[enabledKey] = false + } else { + ast := rootAst.Clone() + if err := getMonitoringRule(outputGroup).Apply(ast); err != nil { + return programsToRun, err + } + + config, err = ast.Map() + if err != nil { + return programsToRun, err + } + + programList := make([]string, 0, len(programsToRun)) + for _, p := range programsToRun { + programList = append(programList, p.Spec.Cmd) + } + // making program list part of the config + // so it will get regenerated with every change + config[programsKey] = programList + } + + monitoringProgram.Config, err = transpiler.NewAST(config) + if err != nil { + return programsToRun, err + } + + return append(programsToRun, monitoringProgram), nil +} + +func getMonitoringRule(outputName string) *transpiler.RuleList { + return transpiler.NewRuleList( + transpiler.Copy(monitoringOutputKey, outputKey), + transpiler.Filter(monitoringKey, programsKey, outputKey), + ) +} diff --git a/x-pack/agent/pkg/agent/application/monitoring_decorator_test.go b/x-pack/agent/pkg/agent/application/monitoring_decorator_test.go new file mode 100644 index 000000000000..32935eca8c9d --- /dev/null +++ b/x-pack/agent/pkg/agent/application/monitoring_decorator_test.go @@ -0,0 +1,183 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "testing" + + "github.com/elastic/beats/x-pack/agent/pkg/agent/program" + "github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler" +) + +func TestMonitoringInjection(t *testing.T) { + ast, err := transpiler.NewAST(inputConfigMap) + if err != nil { + t.Fatal(err) + } + + programsToRun, err := program.Programs(ast) + if err != nil { + t.Fatal(err) + } + +GROUPLOOP: + for group, ptr := range programsToRun { + programsCount := len(ptr) + newPtr, err := injectMonitoring(group, ast, ptr) + if err != nil { + t.Error(err) + continue GROUPLOOP + } + + if programsCount == len(newPtr) { + t.Errorf("incorrect programs to run count, expected: %d, got %d", programsCount+1, len(newPtr)) + continue GROUPLOOP + } + + for _, p := range newPtr { + if p.Spec.Name != monitoringName { + continue + } + + cm, err := p.Config.Map() + if err != nil { + t.Error(err) + continue GROUPLOOP + } + + outputCfg, found := cm[outputKey] + if !found { + t.Errorf("output not found for '%s'", group) + continue GROUPLOOP + } + + outputMap, ok := outputCfg.(map[string]interface{}) + if !ok { + t.Errorf("output is not a map for '%s'", group) + continue GROUPLOOP + } + + esCfg, found := outputMap["elasticsearch"] + if !found { + t.Errorf("elasticsearch output not found for '%s'", group) + continue GROUPLOOP + } + + esMap, ok := esCfg.(map[string]interface{}) + if !ok { + t.Errorf("output.elasticsearch is not a map for '%s'", group) + continue GROUPLOOP + } + + if uname, found := esMap["username"]; !found { + t.Errorf("output.elasticsearch.username output not found for '%s'", group) + continue GROUPLOOP + } else if uname != "monitoring-uname" { + t.Errorf("output.elasticsearch.username has incorrect value expected '%s', got '%s for %s", "monitoring-uname", uname, group) + continue GROUPLOOP + } + } + } +} + +var inputConfigMap = map[string]interface{}{ + "monitoring": map[string]interface{}{ + "enabled": true, + "logs": true, + "metrics": true, + "elasticsearch": map[string]interface{}{ + "index_name": "general", + "pass": "xxx", + "url": "xxxxx", + "username": "monitoring-uname", + }, + }, + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "index_name": "general", + "pass": "xxx", + "type": "elasticsearch", + "url": "xxxxx", + "username": "xxx", + }, + "infosec1": map[string]interface{}{ + "pass": "xxx", + "spool": map[string]interface{}{ + "file": "${path.data}/spool.dat", + }, + "type": "elasticsearch", + "url": "xxxxx", + "username": "xxx", + }, + }, + "streams": []interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/xxxx", + "processors": []interface{}{ + map[string]interface{}{ + "dissect": map[string]interface{}{ + "tokenizer": "---", + }, + }, + }, + "output": map[string]interface{}{ + "override": map[string]interface{}{ + "index_name": "my_service_logs", + "ingest_pipeline": "process_logs", + }, + }, + }, + map[string]interface{}{ + "type": "metric/system", + "username": "xxxx", + "pass": "yyy", + "output": map[string]interface{}{ + "index_name": "mysql_metrics", + "use_output": "infosec1", + }, + }, + }, +} + +// const inputConfig = `outputs: +// default: +// index_name: general +// pass: xxx +// type: es +// url: xxxxx +// username: xxx +// infosec1: +// pass: xxx +// spool: +// file: "${path.data}/spool.dat" +// type: es +// url: xxxxx +// username: xxx +// streams: +// - +// output: +// override: +// index_name: my_service_logs +// ingest_pipeline: process_logs +// path: /xxxx +// processors: +// - +// dissect: +// tokenizer: "---" +// type: log +// - +// output: +// index_name: mysql_access_logs +// path: /xxxx +// type: log +// - +// output: +// index_name: mysql_metrics +// use_output: infosec1 +// pass: yyy +// type: metrics/system +// username: xxxx +// ` diff --git a/x-pack/agent/pkg/agent/application/stream.go b/x-pack/agent/pkg/agent/application/stream.go index 37e0cef229e8..ddb6ac1d8c4c 100644 --- a/x-pack/agent/pkg/agent/application/stream.go +++ b/x-pack/agent/pkg/agent/application/stream.go @@ -56,7 +56,7 @@ func (b *operatorStream) Execute(cfg *configRequest) error { func streamFactory(cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (stream, error) { return func(log *logger.Logger, id routingKey) (stream, error) { // new operator per stream to isolate processes without using tags - operator, err := newOperator(log, cfg, r) + operator, err := newOperator(log, id, cfg, r) if err != nil { return nil, err } @@ -68,7 +68,7 @@ func streamFactory(cfg *config.Config, client sender, r reporter) func(*logger.L } } -func newOperator(log *logger.Logger, config *config.Config, r reporter) (*operation.Operator, error) { +func newOperator(log *logger.Logger, id routingKey, config *config.Config, r reporter) (*operation.Operator, error) { operatorConfig := &operatorCfg.Config{} if err := config.Unpack(&operatorConfig); err != nil { return nil, err @@ -87,6 +87,7 @@ func newOperator(log *logger.Logger, config *config.Config, r reporter) (*operat return operation.NewOperator( log, + id, config, fetcher, installer, diff --git a/x-pack/agent/pkg/agent/errors/types.go b/x-pack/agent/pkg/agent/errors/types.go index 1b78eb2b0c73..3d105631232b 100644 --- a/x-pack/agent/pkg/agent/errors/types.go +++ b/x-pack/agent/pkg/agent/errors/types.go @@ -42,4 +42,7 @@ var readableTypes = map[ErrorType]string{ TypeConfig: "CONFIG", TypePath: "PATH", TypeApplicationCrash: "CRASH", + TypeNetwork: "NETWORK", + TypeFilesystem: "FILESYSTEM", + TypeSecurity: "SECURITY", } diff --git a/x-pack/agent/pkg/agent/operation/config/config.go b/x-pack/agent/pkg/agent/operation/config/config.go index 4fb556108101..3f141c1d020a 100644 --- a/x-pack/agent/pkg/agent/operation/config/config.go +++ b/x-pack/agent/pkg/agent/operation/config/config.go @@ -6,6 +6,7 @@ package config import ( "github.com/elastic/beats/x-pack/agent/pkg/artifact" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/process" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/retry" ) @@ -16,4 +17,6 @@ type Config struct { RetryConfig *retry.Config `yaml:"retry" config:"retry"` DownloadConfig *artifact.Config `yaml:"download" config:"download"` + + MonitoringConfig *monitoring.Config `yaml:"monitoring" config:"monitoring"` } diff --git a/x-pack/agent/pkg/agent/operation/monitoring.go b/x-pack/agent/pkg/agent/operation/monitoring.go new file mode 100644 index 000000000000..767703bb3895 --- /dev/null +++ b/x-pack/agent/pkg/agent/operation/monitoring.go @@ -0,0 +1,265 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package operation + +import ( + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + + "github.com/elastic/beats/x-pack/agent/pkg/agent/configrequest" + "github.com/elastic/beats/x-pack/agent/pkg/core/logger" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring" +) + +const ( + monitoringName = "FLEET_MONITORING" + monitoringKey = "monitoring" + outputKey = "output" + monitoringEnabledSubkey = "enabled" +) + +func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { + cfg, err := getConfigFromStep(s) + if err != nil { + return errors.Wrap(err, "operator.handleStartSidecar failed to retrieve config from step") + } + + // if monitoring is disabled and running stop it + if isEnabled := isMonitoringEnabled(o.logger, cfg); !isEnabled { + if o.isMonitoring { + o.logger.Info("operator.handleStartSidecar: monitoring is running and disabled, proceeding to stop") + return o.handleStopSidecar(s) + } + + o.logger.Info("operator.handleStartSidecar: monitoring is not running and disabled, no action taken") + return nil + } + + o.isMonitoring = true + + for _, step := range o.getMonitoringSteps(s) { + p, cfg, err := getProgramFromStepWithTags(step, monitoringTags()) + if err != nil { + return errors.Wrap(err, "operator.handleStartSidecar failed to create program") + } + + // best effort on starting monitoring, if no hosts provided stop and spare resources + if step.ID == configrequest.StepRemove { + if err := o.stop(p); err != nil { + result = multierror.Append(err, err) + } + } else { + if err := o.start(p, cfg); err != nil { + result = multierror.Append(err, err) + } + } + } + + return result +} + +func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) { + for _, step := range o.getMonitoringSteps(s) { + p, _, err := getProgramFromStepWithTags(step, monitoringTags()) + if err != nil { + return errors.Wrap(err, "operator.handleStopSidecar failed to create program") + } + + if err := o.stop(p); err != nil { + result = multierror.Append(err, err) + } + } + + // if result != nil then something might be still running, setting isMonitoring to false + // will prevent tearing it down in a future + if result == nil { + o.isMonitoring = false + } + + return result +} + +func monitoringTags() map[app.Tag]string { + return map[app.Tag]string{ + app.TagSidecar: "true", + } +} + +func isMonitoringEnabled(logger *logger.Logger, cfg map[string]interface{}) bool { + monitoringVal, found := cfg[monitoringKey] + if !found { + logger.Error("operator.isMonitoringEnabled: monitoring not found in config") + return false + } + + monitoringMap, ok := monitoringVal.(map[string]interface{}) + if !ok { + logger.Error("operator.isMonitoringEnabled: monitoring not a map") + return false + } + + enabledVal, found := monitoringMap[monitoringEnabledSubkey] + if !found { + logger.Infof("operator.isMonitoringEnabled: monitoring.enabled key not found: %v", monitoringMap) + return false + } + + enabled, ok := enabledVal.(bool) + + return enabled && ok +} + +func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.Step { + // get output + config, err := getConfigFromStep(step) + if err != nil { + o.logger.Error("operator.getMonitoringSteps: getting config from step failed: %v", err) + return nil + } + + outputIface, found := config[outputKey] + if !found { + o.logger.Errorf("operator.getMonitoringSteps: monitoring configuration not found for sidecar of type %s", step.Process) + return nil + } + + outputMap, ok := outputIface.(map[string]interface{}) + if !ok { + o.logger.Error("operator.getMonitoringSteps: monitoring config is not a map") + return nil + } + + output, found := outputMap["elasticsearch"] + if !found { + o.logger.Error("operator.getMonitoringSteps: monitoring is missing an elasticsearch output configuration configuration for sidecar of type: %s", step.Process) + return nil + } + + return o.generateMonitoringSteps(o.config.MonitoringConfig, step.Version, output) +} + +func (o *Operator) generateMonitoringSteps(cfg *monitoring.Config, version string, output interface{}) []configrequest.Step { + var steps []configrequest.Step + + if cfg.MonitorLogs { + fbConfig, any := o.getMonitoringFilebeatConfig(output) + stepID := configrequest.StepRun + if !any { + stepID = configrequest.StepRemove + } + filebeatStep := configrequest.Step{ + ID: stepID, + Version: version, + Process: "filebeat", + Meta: map[string]interface{}{ + configrequest.MetaConfigKey: fbConfig, + }, + } + + steps = append(steps, filebeatStep) + } + + if cfg.MonitorMetrics { + mbConfig, any := o.getMonitoringMetricbeatConfig(output) + stepID := configrequest.StepRun + if !any { + stepID = configrequest.StepRemove + } + + metricbeatStep := configrequest.Step{ + ID: stepID, + Version: version, + Process: "metricbeat", + Meta: map[string]interface{}{ + configrequest.MetaConfigKey: mbConfig, + }, + } + + steps = append(steps, metricbeatStep) + } + + return steps +} + +func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]interface{}, bool) { + paths := o.getLogFilePaths() + if len(paths) == 0 { + return nil, false + } + + result := map[string]interface{}{ + "filebeat": map[string]interface{}{ + "inputs": []interface{}{ + map[string]interface{}{ + "type": "log", + "paths": paths, + }, + }, + }, + "output": map[string]interface{}{ + "elasticsearch": output, + }, + } + + return result, true +} + +func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string]interface{}, bool) { + hosts := o.getMetricbeatEndpoints() + if len(hosts) == 0 { + return nil, false + } + + result := map[string]interface{}{ + "metricbeat": map[string]interface{}{ + "modules": []interface{}{ + map[string]interface{}{ + "module": "beat", + "metricsets": []string{"stats", "state"}, + "period": "10s", + "hosts": hosts, + }, + }, + }, + "output": map[string]interface{}{ + "elasticsearch": output, + }, + } + + return result, true +} + +func (o *Operator) getLogFilePaths() []string { + var paths []string + + o.appsLock.Lock() + defer o.appsLock.Unlock() + + for _, a := range o.apps { + logPath := a.Monitor().LogPath() + if logPath != "" { + paths = append(paths, logPath) + } + } + + return paths +} + +func (o *Operator) getMetricbeatEndpoints() []string { + var endpoints []string + + o.appsLock.Lock() + defer o.appsLock.Unlock() + + for _, a := range o.apps { + metricEndpoint := a.Monitor().MetricsPathPrefixed() + if metricEndpoint != "" { + endpoints = append(endpoints, metricEndpoint) + } + } + + return endpoints +} diff --git a/x-pack/agent/pkg/agent/operation/monitoring_test.go b/x-pack/agent/pkg/agent/operation/monitoring_test.go new file mode 100644 index 000000000000..8becab188c38 --- /dev/null +++ b/x-pack/agent/pkg/agent/operation/monitoring_test.go @@ -0,0 +1,146 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package operation + +import ( + "testing" + "time" + + "github.com/elastic/beats/x-pack/agent/pkg/agent/configrequest" + operatorCfg "github.com/elastic/beats/x-pack/agent/pkg/agent/operation/config" + "github.com/elastic/beats/x-pack/agent/pkg/agent/stateresolver" + "github.com/elastic/beats/x-pack/agent/pkg/artifact" + "github.com/elastic/beats/x-pack/agent/pkg/config" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring/beats" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/process" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/retry" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/state" +) + +func TestGenerateSteps(t *testing.T) { + const sampleOutput = "sample-output" + operator, _ := getMonitorableTestOperator(t, "tests/scripts") + + type testCase struct { + Name string + Config *monitoring.Config + ExpectedSteps int + FilebeatStep bool + MetricbeatStep bool + } + + testCases := []testCase{ + testCase{"NO monitoring", &monitoring.Config{MonitorLogs: false, MonitorMetrics: false}, 0, false, false}, + testCase{"FB monitoring", &monitoring.Config{MonitorLogs: true, MonitorMetrics: false}, 1, true, false}, + testCase{"MB monitoring", &monitoring.Config{MonitorLogs: false, MonitorMetrics: true}, 1, false, true}, + testCase{"ALL monitoring", &monitoring.Config{MonitorLogs: true, MonitorMetrics: true}, 2, true, true}, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + steps := operator.generateMonitoringSteps(tc.Config, "8.0", sampleOutput) + if actualSteps := len(steps); actualSteps != tc.ExpectedSteps { + t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps) + } + + var fbFound, mbFound bool + for _, s := range steps { + // Filebeat step check + if s.Process == "filebeat" { + fbFound = true + checkStep(t, "filebeat", sampleOutput, s) + } + + // Metricbeat step check + if s.Process == "metricbeat" { + mbFound = true + checkStep(t, "metricbeat", sampleOutput, s) + } + } + + if tc.FilebeatStep != fbFound { + t.Fatalf("Steps for filebeat do not match. Was expected: %v, Was found: %v", tc.FilebeatStep, fbFound) + } + + if tc.MetricbeatStep != mbFound { + t.Fatalf("Steps for metricbeat do not match. Was expected: %v, Was found: %v", tc.MetricbeatStep, mbFound) + } + }) + } +} + +func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s configrequest.Step) { + if meta := s.Meta[configrequest.MetaConfigKey]; meta != nil { + mapstr, ok := meta.(map[string]interface{}) + if !ok { + t.Fatalf("no meta config for %s step", stepName) + } + + esOut, ok := mapstr["output"].(map[string]interface{}) + if !ok { + t.Fatalf("output not found for %s step", stepName) + } + + if actualOutput := esOut["elasticsearch"]; actualOutput != expectedOutput { + t.Fatalf("output for %s step does not match. expected: %v, got %v", stepName, expectedOutput, actualOutput) + } + } +} + +func getMonitorableTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg.Config) { + operatorConfig := &operatorCfg.Config{ + RetryConfig: &retry.Config{ + Enabled: true, + RetriesCount: 2, + Delay: 3 * time.Second, + MaxDelay: 10 * time.Second, + }, + ProcessConfig: &process.Config{}, + DownloadConfig: &artifact.Config{ + InstallPath: installPath, + OperatingSystem: "darwin", + }, + MonitoringConfig: &monitoring.Config{ + MonitorMetrics: true, + }, + } + + cfg, err := config.NewConfigFrom(operatorConfig) + if err != nil { + t.Fatal(err) + } + + l := getLogger() + + fetcher := &DummyDownloader{} + installer := &DummyInstaller{} + + stateResolver, err := stateresolver.NewStateResolver(l) + if err != nil { + t.Fatal(err) + } + + operator, err := NewOperator(l, "p1", cfg, fetcher, installer, stateResolver, nil) + if err != nil { + t.Fatal(err) + } + + monitor := beats.NewMonitor("dummmy", "p1234", &artifact.Config{OperatingSystem: "linux", InstallPath: "/install/path"}, true, true) + operator.apps["dummy"] = &testMonitorableApp{monitor: monitor} + + return operator, operatorConfig +} + +type testMonitorableApp struct { + monitor *beats.Monitor +} + +func (*testMonitorableApp) Name() string { return "" } +func (*testMonitorableApp) Start(cfg map[string]interface{}) error { return nil } +func (*testMonitorableApp) Stop() {} +func (*testMonitorableApp) Configure(config map[string]interface{}) error { return nil } +func (*testMonitorableApp) State() state.State { return state.State{} } +func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor } diff --git a/x-pack/agent/pkg/agent/operation/operation.go b/x-pack/agent/pkg/agent/operation/operation.go index 4178eaff3f9c..f709bc4f3e20 100644 --- a/x-pack/agent/pkg/agent/operation/operation.go +++ b/x-pack/agent/pkg/agent/operation/operation.go @@ -6,6 +6,7 @@ package operation import ( "github.com/elastic/beats/x-pack/agent/pkg/artifact" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/state" ) @@ -32,6 +33,7 @@ type Application interface { Stop() Configure(config map[string]interface{}) error State() state.State + Monitor() monitoring.Monitor } // Descriptor defines a program which needs to be run. diff --git a/x-pack/agent/pkg/agent/operation/operator.go b/x-pack/agent/pkg/agent/operation/operator.go index 1eba3497b66d..1f72e3c65582 100644 --- a/x-pack/agent/pkg/agent/operation/operator.go +++ b/x-pack/agent/pkg/agent/operation/operator.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/beats/x-pack/agent/pkg/config" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/state" rconfig "github.com/elastic/beats/x-pack/agent/pkg/core/remoteconfig/grpc" ) @@ -29,11 +30,13 @@ import ( // Enables running sidecars for processes. // TODO: implement retry strategies type Operator struct { + pipelineID string logger *logger.Logger config *operatorCfg.Config handlers map[string]handleFunc stateResolver *stateresolver.StateResolver eventProcessor callbackHooks + isMonitoring bool apps map[string]Application appsLock sync.Mutex @@ -47,6 +50,7 @@ type Operator struct { // Based on backed up collection it prepares clients, watchers... on init func NewOperator( logger *logger.Logger, + pipelineID string, config *config.Config, fetcher download.Downloader, installer install.Installer, @@ -62,12 +66,20 @@ func NewOperator( return nil, fmt.Errorf("artifacts configuration not provided") } + if operatorConfig.MonitoringConfig == nil { + operatorConfig.MonitoringConfig = &monitoring.Config{ + MonitorLogs: false, + MonitorMetrics: false, + } + } + if eventProcessor == nil { eventProcessor = &noopCallbackHooks{} } operator := &Operator{ config: operatorConfig, + pipelineID: pipelineID, logger: logger, downloader: fetcher, installer: installer, @@ -213,7 +225,27 @@ func (o *Operator) getApp(p Descriptor) (Application, error) { return nil, fmt.Errorf("descriptor is not an app.Specifier") } - a := app.NewApplication(p.ID(), p.BinaryName(), specifier, factory, o.config, o.logger, o.eventProcessor.OnFailing) + monitor := monitoring.NewMonitor(isMonitorable(p), p.BinaryName(), o.pipelineID, o.config.DownloadConfig, o.config.MonitoringConfig.MonitorLogs, o.config.MonitoringConfig.MonitorMetrics) + + a, err := app.NewApplication(p.ID(), p.BinaryName(), o.pipelineID, specifier, factory, o.config, o.logger, o.eventProcessor.OnFailing, monitor) + if err != nil { + return nil, err + } + o.apps[id] = a return a, nil } + +func isMonitorable(descriptor Descriptor) bool { + type taggable interface { + Tags() map[app.Tag]string + } + + if taggable, ok := descriptor.(taggable); ok { + tags := taggable.Tags() + _, isSidecar := tags[app.TagSidecar] + return !isSidecar // everything is monitorable except sidecar + } + + return false +} diff --git a/x-pack/agent/pkg/agent/operation/operator_handlers.go b/x-pack/agent/pkg/agent/operation/operator_handlers.go index c6c3ed2f2ffa..6495a6affba8 100644 --- a/x-pack/agent/pkg/agent/operation/operator_handlers.go +++ b/x-pack/agent/pkg/agent/operation/operator_handlers.go @@ -27,6 +27,10 @@ func (o *Operator) initHandlerMap() { } func (o *Operator) handleRun(step configrequest.Step) error { + if step.Process == monitoringName { + return o.handleStartSidecar(step) + } + p, cfg, err := getProgramFromStep(step) if err != nil { return errors.Wrap(err, "operator.handleStart failed to create program") @@ -36,6 +40,10 @@ func (o *Operator) handleRun(step configrequest.Step) error { } func (o *Operator) handleRemove(step configrequest.Step) error { + if step.Process == monitoringName { + return o.handleStopSidecar(step) + } + p, _, err := getProgramFromStep(step) if err != nil { return errors.Wrap(err, "operator.handleStart failed to create program") @@ -44,27 +52,30 @@ func (o *Operator) handleRemove(step configrequest.Step) error { return o.stop(p) } -func (o *Operator) handleStartSidecar(step configrequest.Step) error { - // TODO: add support for monitoring - return nil +func getProgramFromStep(step configrequest.Step) (Descriptor, map[string]interface{}, error) { + return getProgramFromStepWithTags(step, nil) } -func (o *Operator) handleStopSidecar(step configrequest.Step) error { - // TODO: add support for monitoring - return nil +func getProgramFromStepWithTags(step configrequest.Step, tags map[app.Tag]string) (Descriptor, map[string]interface{}, error) { + config, err := getConfigFromStep(step) + if err != nil { + return nil, nil, err + } + + p := app.NewDescriptor(step.Process, step.Version, tags) + return p, config, nil } -func getProgramFromStep(step configrequest.Step) (Descriptor, map[string]interface{}, error) { +func getConfigFromStep(step configrequest.Step) (map[string]interface{}, error) { metConfig, ok := step.Meta[configrequest.MetaConfigKey] if !ok { - return nil, nil, fmt.Errorf("step: %s, no config in metadata", step.ID) + return nil, fmt.Errorf("step: %s, no config in metadata", step.ID) } config, ok := metConfig.(map[string]interface{}) if !ok { - return nil, nil, fmt.Errorf("step: %s, program config is in invalid format", step.ID) + return nil, fmt.Errorf("step: %s, program config is in invalid format", step.ID) } - p := app.NewDescriptor(step.Process, step.Version, nil) - return p, config, nil + return config, nil } diff --git a/x-pack/agent/pkg/agent/operation/operator_test.go b/x-pack/agent/pkg/agent/operation/operator_test.go index 5d6c644d73ad..a4c4a91fd340 100644 --- a/x-pack/agent/pkg/agent/operation/operator_test.go +++ b/x-pack/agent/pkg/agent/operation/operator_test.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/x-pack/agent/pkg/config" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/process" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/retry" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/state" @@ -355,6 +356,9 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg. InstallPath: installPath, OperatingSystem: "darwin", }, + MonitoringConfig: &monitoring.Config{ + MonitorMetrics: false, + }, } cfg, err := config.NewConfigFrom(operatorConfig) @@ -372,7 +376,7 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg. t.Fatal(err) } - operator, err := NewOperator(l, cfg, fetcher, installer, stateResolver, nil) + operator, err := NewOperator(l, "p1", cfg, fetcher, installer, stateResolver, nil) if err != nil { t.Fatal(err) } diff --git a/x-pack/agent/pkg/core/plugin/app/app.go b/x-pack/agent/pkg/core/plugin/app/app.go index 356cbec8c35f..541073d9f1b7 100644 --- a/x-pack/agent/pkg/core/plugin/app/app.go +++ b/x-pack/agent/pkg/core/plugin/app/app.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/beats/x-pack/agent/pkg/agent/operation/config" "github.com/elastic/beats/x-pack/agent/pkg/artifact" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/process" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/retry" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/state" @@ -38,6 +39,7 @@ type ReportFailureFunc func(string, error) type Application struct { id string name string + pipelineID string spec Specifier state state.State grpcClient remoteconfig.Client @@ -45,6 +47,11 @@ type Application struct { limiter *tokenbucket.Bucket failureReporter ReportFailureFunc + uid int + gid int + + monitor monitoring.Monitor + processConfig *process.Config downloadConfig *artifact.Config retryConfig *retry.Config @@ -54,13 +61,34 @@ type Application struct { appLock sync.Mutex } +// ArgsDecorator decorates arguments before calling an application +type ArgsDecorator func([]string) []string + // NewApplication creates a new instance of an applications. It will not automatically start // the application. -func NewApplication(id, appName string, spec Specifier, factory remoteconfig.ConnectionCreator, cfg *config.Config, logger *logger.Logger, failureReporter ReportFailureFunc) *Application { +func NewApplication(id, appName, pipelineID string, + spec Specifier, + factory remoteconfig.ConnectionCreator, + cfg *config.Config, + logger *logger.Logger, + failureReporter ReportFailureFunc, + monitor monitoring.Monitor) (*Application, error) { + + s, err := spec.Spec(cfg.DownloadConfig) + if err != nil { + return nil, err + } + + uid, gid, err := getUserGroup(s) + if err != nil { + return nil, err + } + b, _ := tokenbucket.NewTokenBucket(3, 3, 1*time.Second) return &Application{ id: id, name: appName, + pipelineID: pipelineID, spec: spec, clientFactory: factory, processConfig: cfg.ProcessConfig, @@ -69,7 +97,15 @@ func NewApplication(id, appName string, spec Specifier, factory remoteconfig.Con logger: logger, limiter: b, failureReporter: failureReporter, - } + monitor: monitor, + uid: uid, + gid: gid, + }, nil +} + +// Monitor returns monitoring handler of this app. +func (a *Application) Monitor() monitoring.Monitor { + return a.monitor } // Name returns application name @@ -100,6 +136,9 @@ func (a *Application) Stop() { // ignoring error: not critical os.Remove(filePath) } + + // cleanup drops + a.monitor.Cleanup() } } @@ -139,6 +178,8 @@ func (a *Application) watch(proc *os.Process, cfg map[string]interface{}) { } func (a *Application) reportCrash() { + a.monitor.Cleanup() + // TODO: reporting crash if a.failureReporter != nil { crashError := errors.New( diff --git a/x-pack/agent/pkg/core/plugin/app/monitoring/beats/beats_monitor.go b/x-pack/agent/pkg/core/plugin/app/monitoring/beats/beats_monitor.go new file mode 100644 index 000000000000..936c6d474795 --- /dev/null +++ b/x-pack/agent/pkg/core/plugin/app/monitoring/beats/beats_monitor.go @@ -0,0 +1,199 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beats + +import ( + "net/url" + "os" + "path/filepath" + "strings" + "unicode" + + "github.com/elastic/beats/x-pack/agent/pkg/artifact" +) + +const httpPlusPrefix = "http+" + +// Monitor is a monitoring interface providing information about the way +// how beat is monitored +type Monitor struct { + pipelineID string + + process string + monitoringEndpoint string + loggingPath string + + monitorLogs bool + monitorMetrics bool +} + +// NewMonitor creates a beats monitor. +func NewMonitor(process, pipelineID string, downloadConfig *artifact.Config, monitorLogs, monitorMetrics bool) *Monitor { + var monitoringEndpoint, loggingPath string + + if monitorMetrics { + monitoringEndpoint = getMonitoringEndpoint(process, downloadConfig.OS(), pipelineID) + } + if monitorLogs { + loggingPath = getLoggingFileDirectory(downloadConfig.InstallPath, downloadConfig.OS(), pipelineID) + } + + return &Monitor{ + pipelineID: pipelineID, + process: process, + monitoringEndpoint: monitoringEndpoint, + loggingPath: loggingPath, + monitorLogs: monitorLogs, + monitorMetrics: monitorMetrics, + } +} + +// EnrichArgs enriches arguments provided to application, in order to enable +// monitoring +func (b *Monitor) EnrichArgs(args []string) []string { + appendix := make([]string, 0, 7) + + if b.monitoringEndpoint != "" { + appendix = append(appendix, + "-E", "http.enabled=true", + "-E", "http.host="+b.monitoringEndpoint, + ) + } + + if b.loggingPath != "" { + appendix = append(appendix, + "-E", "logging.files.path="+b.loggingPath, + "-E", "logging.files.name="+b.process, + "-E", "logging.files.keepfiles=7", + "-E", "logging.files.permission=0644", + "-E", "logging.files.interval=1h", + ) + } + + return append(args, appendix...) +} + +// Cleanup removes +func (b *Monitor) Cleanup() error { + // do not cleanup logs, they might not be all processed + drop := b.monitoringDrop() + if drop == "" { + return nil + } + + return os.RemoveAll(drop) +} + +// Prepare executes steps in order for monitoring to work correctly +func (b *Monitor) Prepare(uid, gid int) error { + drops := []string{b.loggingPath} + if drop := b.monitoringDrop(); drop != "" { + drops = append(drops, drop) + } + + for _, drop := range drops { + if drop == "" { + continue + } + + _, err := os.Stat(drop) + if err != nil { + if !os.IsNotExist(err) { + return err + } + + // create + if err := os.MkdirAll(drop, 0775); err != nil { + return err + } + } + + if err := os.Chown(drop, uid, gid); err != nil { + return err + } + } + + return nil +} + +// LogPath describes a path where application stores logs. Empty if +// application is not monitorable +func (b *Monitor) LogPath() string { + if !b.monitorLogs { + return "" + } + + return b.loggingPath +} + +// MetricsPath describes a location where application exposes metrics +// collectable by metricbeat. +func (b *Monitor) MetricsPath() string { + if !b.monitorMetrics { + return "" + } + + return b.monitoringEndpoint +} + +// MetricsPathPrefixed return metrics path prefixed with http+ prefix. +func (b *Monitor) MetricsPathPrefixed() string { + return httpPlusPrefix + b.MetricsPath() +} + +func (b *Monitor) monitoringDrop() string { + return monitoringDrop(b.monitoringEndpoint) +} + +func monitoringDrop(path string) (drop string) { + defer func() { + if drop != "" { + drop = filepath.Dir(drop) + } + }() + + if strings.Contains(path, "localhost") { + return "" + } + + if strings.HasPrefix(path, httpPlusPrefix) { + path = strings.TrimPrefix(path, httpPlusPrefix) + } + + // npipe is virtual without a drop + if isNpipe(path) { + return "" + } + + if isWindowsPath(path) { + return path + } + + u, _ := url.Parse(path) + if u == nil || (u.Scheme != "" && u.Scheme != "file" && u.Scheme != "unix") { + return "" + } + + if u.Scheme == "file" { + return strings.TrimPrefix(path, "file://") + } + + if u.Scheme == "unix" { + return strings.TrimPrefix(path, "unix://") + } + + return path +} + +func isNpipe(path string) bool { + return strings.HasPrefix(path, "npipe") || strings.HasPrefix(path, `\\.\pipe\`) +} + +func isWindowsPath(path string) bool { + if len(path) < 4 { + return false + } + return unicode.IsLetter(rune(path[0])) && path[1] == ':' +} diff --git a/x-pack/agent/pkg/core/plugin/app/monitoring/beats/drop_test.go b/x-pack/agent/pkg/core/plugin/app/monitoring/beats/drop_test.go new file mode 100644 index 000000000000..f9f04dd71b21 --- /dev/null +++ b/x-pack/agent/pkg/core/plugin/app/monitoring/beats/drop_test.go @@ -0,0 +1,42 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beats + +import ( + "testing" +) + +type testCase struct { + Endpoint string + Drop string +} + +func TestMonitoringDrops(t *testing.T) { + cases := []testCase{ + testCase{`/var/lib/drop/abc.sock`, "/var/lib/drop"}, + testCase{`npipe://drop`, ""}, + testCase{`http+npipe://drop`, ""}, + testCase{`\\.\pipe\drop`, ""}, + testCase{`unix:///var/lib/drop/abc.sock`, "/var/lib/drop"}, + testCase{`http+unix:///var/lib/drop/abc.sock`, "/var/lib/drop"}, + testCase{`file:///var/lib/drop/abc.sock`, "/var/lib/drop"}, + testCase{`http://localhost/stats`, ""}, + testCase{`localhost/stats`, ""}, + testCase{`http://localhost:8080/stats`, ""}, + testCase{`localhost:8080/stats`, ""}, + testCase{`http://1.2.3.4/stats`, ""}, + testCase{`http://1.2.3.4:5678/stats`, ""}, + testCase{`1.2.3.4:5678/stats`, ""}, + testCase{`http://hithere.com:5678/stats`, ""}, + testCase{`hithere.com:5678/stats`, ""}, + } + + for _, c := range cases { + drop := monitoringDrop(c.Endpoint) + if drop != c.Drop { + t.Errorf("Case[%s]: Expected '%s', got '%s'", c.Endpoint, c.Drop, drop) + } + } +} diff --git a/x-pack/agent/pkg/core/plugin/app/monitoring/beats/monitoring.go b/x-pack/agent/pkg/core/plugin/app/monitoring/beats/monitoring.go new file mode 100644 index 000000000000..a69581b3ff95 --- /dev/null +++ b/x-pack/agent/pkg/core/plugin/app/monitoring/beats/monitoring.go @@ -0,0 +1,42 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beats + +import ( + "fmt" + "path/filepath" +) + +const ( + // args: pipeline name, application name + logFileFormat = "/var/log/elastic-agent/%s/%s" + // args: install path, pipeline name, application name + logFileFormatWin = "%s\\logs\\elastic-agent\\%s\\%s" + + // args: pipeline name, application name + mbEndpointFileFormat = "unix:///var/run/elastic-agent/%s/%s/%s.sock" + // args: pipeline name, application name + mbEndpointFileFormatWin = `npipe:///%s-%s` +) + +func getMonitoringEndpoint(program, operatingSystem, pipelineID string) string { + if operatingSystem == "windows" { + return fmt.Sprintf(mbEndpointFileFormatWin, pipelineID, program) + } + + return fmt.Sprintf(mbEndpointFileFormat, pipelineID, program, program) +} + +func getLoggingFile(program, operatingSystem, installPath, pipelineID string) string { + if operatingSystem == "windows" { + return fmt.Sprintf(logFileFormatWin, installPath, pipelineID, program) + } + + return fmt.Sprintf(logFileFormat, pipelineID, program) +} + +func getLoggingFileDirectory(installPath, operatingSystem, pipelineID string) string { + return filepath.Base(getLoggingFile("program", operatingSystem, installPath, pipelineID)) +} diff --git a/x-pack/agent/pkg/core/plugin/app/monitoring/config.go b/x-pack/agent/pkg/core/plugin/app/monitoring/config.go new file mode 100644 index 000000000000..c5e2eba6a57b --- /dev/null +++ b/x-pack/agent/pkg/core/plugin/app/monitoring/config.go @@ -0,0 +1,11 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package monitoring + +// Config describes a configuration of a monitoring +type Config struct { + MonitorLogs bool `yaml:"logs" config:"logs"` + MonitorMetrics bool `yaml:"metrics" config:"metrics"` +} diff --git a/x-pack/agent/pkg/core/plugin/app/monitoring/monitor.go b/x-pack/agent/pkg/core/plugin/app/monitoring/monitor.go new file mode 100644 index 000000000000..b228b4f28acc --- /dev/null +++ b/x-pack/agent/pkg/core/plugin/app/monitoring/monitor.go @@ -0,0 +1,32 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package monitoring + +import ( + "github.com/elastic/beats/x-pack/agent/pkg/artifact" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring/beats" + "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/app/monitoring/noop" +) + +// Monitor is a monitoring interface providing information about the way +// how application is monitored +type Monitor interface { + EnrichArgs([]string) []string + Prepare(uid, gid int) error + Cleanup() error + LogPath() string + MetricsPath() string + MetricsPathPrefixed() string +} + +// NewMonitor creates a monitor based on a process configuration. +func NewMonitor(isMonitorable bool, process, pipelineID string, downloadConfig *artifact.Config, monitorLogs, monitorMetrics bool) Monitor { + if !isMonitorable { + return noop.NewMonitor() + } + + // so far we support only beats monitoring + return beats.NewMonitor(process, pipelineID, downloadConfig, monitorLogs, monitorMetrics) +} diff --git a/x-pack/agent/pkg/core/plugin/app/monitoring/noop/noop_monitor.go b/x-pack/agent/pkg/core/plugin/app/monitoring/noop/noop_monitor.go new file mode 100644 index 000000000000..93e8c2c46dcf --- /dev/null +++ b/x-pack/agent/pkg/core/plugin/app/monitoring/noop/noop_monitor.go @@ -0,0 +1,48 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package noop + +// Monitor is a monitoring interface providing information about the way +// how beat is monitored +type Monitor struct { +} + +// NewMonitor creates a beats monitor. +func NewMonitor() *Monitor { + return &Monitor{} +} + +// EnrichArgs enriches arguments provided to application, in order to enable +// monitoring +func (b *Monitor) EnrichArgs(args []string) []string { + return args +} + +// Cleanup cleans up all drops. +func (b *Monitor) Cleanup() error { + return nil +} + +// Prepare executes steps in order for monitoring to work correctly +func (b *Monitor) Prepare(uid, gid int) error { + return nil +} + +// LogPath describes a path where application stores logs. Empty if +// application is not monitorable +func (b *Monitor) LogPath() string { + return "" +} + +// MetricsPath describes a location where application exposes metrics +// collectable by metricbeat. +func (b *Monitor) MetricsPath() string { + return "" +} + +// MetricsPathPrefixed return metrics path prefixed with http+ prefix. +func (b *Monitor) MetricsPathPrefixed() string { + return "" +} diff --git a/x-pack/agent/pkg/core/plugin/app/start.go b/x-pack/agent/pkg/core/plugin/app/start.go index 058d5247fa2f..9f1f4b00ec58 100644 --- a/x-pack/agent/pkg/core/plugin/app/start.go +++ b/x-pack/agent/pkg/core/plugin/app/start.go @@ -47,14 +47,13 @@ func (a *Application) Start(cfg map[string]interface{}) (err error) { } }() - spec, err := a.spec.Spec(a.downloadConfig) - if err != nil { - return errors.New(err, errors.TypeFilesystem) + if err := a.monitor.Prepare(a.uid, a.gid); err != nil { + return err } - uid, gid, err := getUserGroup(spec) + spec, err := a.spec.Spec(a.downloadConfig) if err != nil { - return errors.New(err, errors.TypeConfig) + return errors.New(err, errors.TypeFilesystem) } if err := a.configureByFile(&spec, cfg); err != nil { @@ -75,12 +74,19 @@ func (a *Application) Start(cfg map[string]interface{}) (err error) { a.limiter.Add() } + spec.Args = a.monitor.EnrichArgs(spec.Args) + + // specify beat name to avoid data lock conflicts + // as for https://github.com/elastic/beats/pull/14030 more than one instance + // of the beat with same data path fails to start + spec.Args = injectDataPath(spec.Args, a.pipelineID, a.id) + a.state.ProcessInfo, err = process.Start( a.logger, spec.BinaryPath, a.processConfig, - uid, - gid, + a.uid, + a.gid, processCreds, spec.Args...) if err != nil { @@ -99,6 +105,16 @@ func (a *Application) Start(cfg map[string]interface{}) (err error) { return nil } +func injectDataPath(args []string, pipelineID, id string) []string { + wd := "" + if w, err := os.Getwd(); err == nil { + wd = w + } + + dataPath := filepath.Join(wd, "data", pipelineID, id) + return append(args, "-E", "path.data="+dataPath) +} + func generateCA(configurable string) (*authority.CertificateAuthority, error) { if !isGrpcConfigurable(configurable) { return nil, nil @@ -180,6 +196,11 @@ func (a *Application) configureByFile(spec *ProcessSpec, config map[string]inter } defer f.Close() + // change owner + if err := os.Chown(filePath, a.uid, a.gid); err != nil { + return err + } + encoder := yaml.NewEncoder(f) if err := encoder.Encode(config); err != nil { return errors.New(err, errors.TypeFilesystem) diff --git a/x-pack/agent/pkg/core/plugin/app/tag.go b/x-pack/agent/pkg/core/plugin/app/tag.go index 6fe3fb128535..8b719031b812 100644 --- a/x-pack/agent/pkg/core/plugin/app/tag.go +++ b/x-pack/agent/pkg/core/plugin/app/tag.go @@ -8,11 +8,5 @@ package app // to a process. type Tag string -// TagSidecarOf tags a sidecar process and identifies -// a process which is sidecard-ed. -// Example: -// - p1: filebeat - watches apache logs -// - p2: metricbeat - waches p1 -// p1.Tags = [] -// p2.Tags = [{ "sidecar-of": "filebeat" }] -const TagSidecarOf = "sidecar-of" +// TagSidecar tags a sidecar process +const TagSidecar = "sidecar"