diff --git a/x-pack/agent/pkg/agent/application/emitter.go b/x-pack/agent/pkg/agent/application/emitter.go index 2e5997f50fef..4ec9abcbf151 100644 --- a/x-pack/agent/pkg/agent/application/emitter.go +++ b/x-pack/agent/pkg/agent/application/emitter.go @@ -41,10 +41,6 @@ func emitter(log *logger.Logger, router *router) emitterFunc { return err } - grouped := map[routingKey][]program.Program{ - defautlRK: programsToRun, - } - - return router.Dispatch(ast.HashStr(), grouped) + return router.Dispatch(ast.HashStr(), programsToRun) } } diff --git a/x-pack/agent/pkg/agent/program/program.go b/x-pack/agent/pkg/agent/program/program.go index ad4443abb2c0..9266786967fc 100644 --- a/x-pack/agent/pkg/agent/program/program.go +++ b/x-pack/agent/pkg/agent/program/program.go @@ -5,8 +5,11 @@ package program import ( + "fmt" "strings" + "github.com/pkg/errors" + "github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler" "github.com/elastic/beats/x-pack/agent/pkg/boolexp" ) @@ -45,10 +48,27 @@ func (p *Program) Configuration() map[string]interface{} { // Programs take a Tree representation of the main configuration and apply all the different // programs rules and generate individual configuration from the rules. -func Programs(singleConfig *transpiler.AST) ([]Program, error) { +func Programs(singleConfig *transpiler.AST) (map[string][]Program, error) { + grouped, err := groupByOutputs(singleConfig) + if err != nil { + return nil, errors.Wrap(err, "fail to extract program configuration") + } + + groupedPrograms := make(map[string][]Program) + for k, config := range grouped { + programs, err := detectPrograms(config) + if err != nil { + return nil, errors.Wrap(err, "fail to generate program configuration") + } + groupedPrograms[k] = programs + } + + return groupedPrograms, nil +} + +func detectPrograms(singleConfig *transpiler.AST) ([]Program, error) { programs := make([]Program, 0) for _, spec := range Supported { - // TODO: better error handling here. specificAST := singleConfig.Clone() err := spec.Rules.Apply(specificAST) if err != nil { @@ -80,6 +100,7 @@ func Programs(singleConfig *transpiler.AST) ([]Program, error) { programs = append(programs, program) } return programs, nil + } // KnownProgramNames returns a list of runnable programs by the agent. @@ -90,3 +111,152 @@ func KnownProgramNames() []string { } return names } + +func groupByOutputs(single *transpiler.AST) (map[string]*transpiler.AST, error) { + const ( + outputsKey = "outputs" + outputKey = "output" + streamsKey = "streams" + typeKey = "type" + ) + + if _, found := transpiler.Select(single, outputsKey); !found { + return nil, errors.New("invalid configuration missing outputs configuration") + } + + // Normalize using an intermediate map. + normMap, err := single.Map() + if err != nil { + return nil, errors.Wrap(err, "could not read configuration") + } + + // Recreates multiple configuration grouped by the name of the outputs. + // Each configuration will be started into his own operator with the same name as the output. + grouped := make(map[string]map[string]interface{}) + + m, ok := normMap[outputsKey] + if !ok { + return nil, errors.New("fail to received a list of configured outputs") + } + + out, ok := m.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf( + "invalid outputs configuration received, expecting a map not a %T", + m, + ) + } + + for k, v := range out { + outputsOptions, ok := v.(map[string]interface{}) + if !ok { + return nil, errors.New("invalid type for output configuration block") + } + + t, ok := outputsOptions[typeKey] + if !ok { + return nil, fmt.Errorf("missing output type named output %s", k) + } + + n, ok := t.(string) + if !ok { + return nil, fmt.Errorf("invalid type received %T and expecting a string", t) + } + + delete(outputsOptions, typeKey) + + // Propagate global configuration to each individual configuration. + clone := cloneMap(normMap) + delete(clone, outputsKey) + clone[outputKey] = map[string]interface{}{n: v} + clone[streamsKey] = make([]map[string]interface{}, 0) + + grouped[k] = clone + } + + s, ok := normMap[streamsKey] + if !ok { + return nil, errors.New("no streams are configured") + } + + list, ok := s.([]interface{}) + if !ok { + return nil, errors.New("fail to receive a list of configured streams") + } + + for _, item := range list { + stream, ok := item.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf( + "invalid type for stream expecting a map of options and received %T", + item, + ) + } + targetName := findOutputName(stream) + + // Do we have configuration for that specific outputs if not we fail to load the configuration. + config, ok := grouped[targetName] + if !ok { + return nil, fmt.Errorf("unknown configuration output with name %s", targetName) + } + + streams := config[streamsKey].([]map[string]interface{}) + streams = append(streams, stream) + + config[streamsKey] = streams + grouped[targetName] = config + } + + transpiled := make(map[string]*transpiler.AST) + + for name, group := range grouped { + if len(group[streamsKey].([]map[string]interface{})) == 0 { + continue + } + + ast, err := transpiler.NewAST(group) + if err != nil { + return nil, errors.Wrapf(err, "fail to generate configuration for output name %s", name) + } + + transpiled[name] = ast + } + + return transpiled, nil +} + +func findOutputName(m map[string]interface{}) string { + const ( + defaultOutputName = "default" + outputKey = "output" + useOutputKey = "use_output" + ) + + output, ok := m[outputKey] + if !ok { + return defaultOutputName + } + + o := output.(map[string]interface{}) + + name, ok := o[useOutputKey] + if !ok { + return defaultOutputName + } + + return name.(string) +} + +func cloneMap(m map[string]interface{}) map[string]interface{} { + newMap := make(map[string]interface{}) + for k, v := range m { + sV, ok := v.(map[string]interface{}) + if ok { + newMap[k] = cloneMap(sV) + continue + } + newMap[k] = v + } + + return newMap +} diff --git a/x-pack/agent/pkg/agent/program/program_test.go b/x-pack/agent/pkg/agent/program/program_test.go new file mode 100644 index 000000000000..2929fa6354cd --- /dev/null +++ b/x-pack/agent/pkg/agent/program/program_test.go @@ -0,0 +1,555 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package program + +import ( + "io/ioutil" + "path/filepath" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" + + "github.com/elastic/beats/x-pack/agent/pkg/agent/internal/yamltest" + "github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler" +) + +func TestGroupBy(t *testing.T) { + t.Run("only named output", func(t *testing.T) { + sConfig := map[string]interface{}{ + "outputs": map[string]interface{}{ + "special": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + "infosec1": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "yyy", + "username": "anotherusername", + "password": "anotherpassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + "output": map[string]interface{}{ + "use_output": "special", + }, + }, + map[string]interface{}{ + "type": "metrics/system", + "output": map[string]interface{}{ + "use_output": "special", + }, + }, + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + "output": map[string]interface{}{ + "use_output": "infosec1", + "pipeline": "custompipeline", + "index_name": "myindex", + }, + }, + }, + } + + ast, err := transpiler.NewAST(sConfig) + require.NoError(t, err) + + grouped, err := groupByOutputs(ast) + require.NoError(t, err) + require.Equal(t, 2, len(grouped)) + + c1 := transpiler.MustNewAST(map[string]interface{}{ + "output": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + "output": map[string]interface{}{ + "use_output": "special", + }, + }, + map[string]interface{}{ + "type": "metrics/system", + "output": map[string]interface{}{ + "use_output": "special", + }, + }, + }, + }) + + c2, _ := transpiler.NewAST(map[string]interface{}{ + "output": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "yyy", + "username": "anotherusername", + "password": "anotherpassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + "output": map[string]interface{}{ + "use_output": "infosec1", + "pipeline": "custompipeline", + "index_name": "myindex", + }, + }, + }, + }) + + defaultConfig, ok := grouped["special"] + require.True(t, ok) + require.Equal(t, c1.Hash(), defaultConfig.Hash()) + + infosec1Config, ok := grouped["infosec1"] + + require.True(t, ok) + require.Equal(t, c2.Hash(), infosec1Config.Hash()) + }) + + t.Run("copy any top level configuration options to each configuration", func(t *testing.T) { + sConfig := map[string]interface{}{ + "monitoring": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "127.0.0.1", + }, + }, + "keystore": map[string]interface{}{ + "path": "${path.data}/keystore", + }, + "outputs": map[string]interface{}{ + "special": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + "infosec1": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "yyy", + "username": "anotherusername", + "password": "anotherpassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + "output": map[string]interface{}{ + "use_output": "special", + }, + }, + map[string]interface{}{ + "type": "metrics/system", + "output": map[string]interface{}{ + "use_output": "special", + }, + }, + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + "output": map[string]interface{}{ + "use_output": "infosec1", + "pipeline": "custompipeline", + "index_name": "myindex", + }, + }, + }, + } + + ast, err := transpiler.NewAST(sConfig) + require.NoError(t, err) + + grouped, err := groupByOutputs(ast) + require.NoError(t, err) + require.Equal(t, 2, len(grouped)) + + c1 := transpiler.MustNewAST(map[string]interface{}{ + "output": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + "output": map[string]interface{}{ + "use_output": "special", + }, + }, + map[string]interface{}{ + "type": "metrics/system", + "output": map[string]interface{}{ + "use_output": "special", + }, + }, + }, + "monitoring": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "127.0.0.1", + }, + }, + "keystore": map[string]interface{}{ + "path": "${path.data}/keystore", + }, + }) + + c2, _ := transpiler.NewAST(map[string]interface{}{ + "output": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "yyy", + "username": "anotherusername", + "password": "anotherpassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + "output": map[string]interface{}{ + "use_output": "infosec1", + "pipeline": "custompipeline", + "index_name": "myindex", + }, + }, + }, + "monitoring": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "127.0.0.1", + }, + }, + "keystore": map[string]interface{}{ + "path": "${path.data}/keystore", + }, + }) + + defaultConfig, ok := grouped["special"] + require.True(t, ok) + require.Equal(t, c1.Hash(), defaultConfig.Hash()) + + infosec1Config, ok := grouped["infosec1"] + + require.True(t, ok) + require.Equal(t, c2.Hash(), infosec1Config.Hash()) + }) + + t.Run("fail when the referenced named output doesn't exist", func(t *testing.T) { + sConfig := map[string]interface{}{ + "monitoring": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "localhost", + }, + }, + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + "infosec1": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "yyy", + "username": "anotherusername", + "password": "anotherpassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + }, + map[string]interface{}{ + "type": "metrics/system", + }, + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + "output": map[string]interface{}{ + "use_output": "donotexist", + "pipeline": "custompipeline", + "index_name": "myindex", + }, + }, + }, + } + + ast, err := transpiler.NewAST(sConfig) + require.NoError(t, err) + + _, err = groupByOutputs(ast) + require.Error(t, err) + }) + + t.Run("only default output", func(t *testing.T) { + sConfig := map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + "infosec1": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "yyy", + "username": "anotherusername", + "password": "anotherpassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + }, + map[string]interface{}{ + "type": "metrics/system", + }, + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + }, + }, + } + + ast, err := transpiler.NewAST(sConfig) + require.NoError(t, err) + + grouped, err := groupByOutputs(ast) + require.NoError(t, err) + require.Equal(t, 1, len(grouped)) + + c1 := transpiler.MustNewAST(map[string]interface{}{ + "output": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + }, + map[string]interface{}{ + "type": "metrics/system", + }, + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + }, + }, + }) + + defaultConfig, ok := grouped["default"] + require.True(t, ok) + require.Equal(t, c1.Hash(), defaultConfig.Hash()) + + _, ok = grouped["infosec1"] + + require.False(t, ok) + }) + + t.Run("default and named output", func(t *testing.T) { + sConfig := map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + "infosec1": map[string]interface{}{ + "type": "elasticsearch", + "hosts": "yyy", + "username": "anotherusername", + "password": "anotherpassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + }, + map[string]interface{}{ + "type": "metrics/system", + }, + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + "output": map[string]interface{}{ + "use_output": "infosec1", + "pipeline": "custompipeline", + "index_name": "myindex", + }, + }, + }, + } + + ast, err := transpiler.NewAST(sConfig) + require.NoError(t, err) + + grouped, err := groupByOutputs(ast) + require.NoError(t, err) + require.Equal(t, 2, len(grouped)) + + c1 := transpiler.MustNewAST(map[string]interface{}{ + "output": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "xxx", + "username": "myusername", + "password": "mypassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/hello.log", + }, + map[string]interface{}{ + "type": "metrics/system", + }, + }, + }) + + c2, _ := transpiler.NewAST(map[string]interface{}{ + "output": map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": "yyy", + "username": "anotherusername", + "password": "anotherpassword", + }, + }, + "streams": []map[string]interface{}{ + map[string]interface{}{ + "type": "log", + "path": "/var/log/infosec.log", + "output": map[string]interface{}{ + "use_output": "infosec1", + "pipeline": "custompipeline", + "index_name": "myindex", + }, + }, + }, + }) + + defaultConfig, ok := grouped["default"] + require.True(t, ok) + require.Equal(t, c1.Hash(), defaultConfig.Hash()) + + infosec1Config, ok := grouped["infosec1"] + + require.True(t, ok) + require.Equal(t, c2.Hash(), infosec1Config.Hash()) + }) +} + +func TestConfiguration(t *testing.T) { + testcases := map[string]struct { + programs []string + expected int + err bool + }{ + "single_config": { + programs: []string{"filebeat", "metricbeat"}, + expected: 2, + }, + "audit_config": { + programs: []string{"auditbeat"}, + expected: 1, + }, + "journal_config": { + programs: []string{"journalbeat"}, + expected: 1, + }, + "monitor_config": { + programs: []string{"heartbeat"}, + expected: 1, + }, + "enabled_true": { + programs: []string{"filebeat"}, + expected: 1, + }, + "enabled_false": { + expected: 0, + }, + "enabled_output_true": { + programs: []string{"filebeat"}, + expected: 1, + }, + "enabled_output_false": { + expected: 0, + }, + } + + for name, test := range testcases { + t.Run(name, func(t *testing.T) { + singleConfig, err := ioutil.ReadFile(filepath.Join("testdata", name+".yml")) + require.NoError(t, err) + + var m map[string]interface{} + err = yaml.Unmarshal(singleConfig, &m) + require.NoError(t, err) + + ast, err := transpiler.NewAST(m) + require.NoError(t, err) + + programs, err := Programs(ast) + if test.err { + require.Error(t, err) + return + } + require.NoError(t, err) + + require.Equal(t, 1, len(programs)) + + defPrograms, ok := programs["default"] + require.True(t, ok) + require.Equal(t, test.expected, len(defPrograms)) + + for _, program := range defPrograms { + programConfig, err := ioutil.ReadFile(filepath.Join( + "testdata", + name+"-"+strings.ToLower(program.Spec.Name)+".yml", + )) + + require.NoError(t, err) + var m map[string]interface{} + err = yamltest.FromYAML(programConfig, &m) + require.NoError(t, err) + + compareMap := &transpiler.MapVisitor{} + program.Config.Accept(compareMap) + + if !assert.True(t, cmp.Equal(m, compareMap.Content)) { + diff := cmp.Diff(m, compareMap.Content) + if diff != "" { + t.Errorf("%s-%s mismatch (-want +got):\n%s", name, program.Spec.Name, diff) + } + } + } + }) + } +} diff --git a/x-pack/agent/pkg/agent/program/spec_test.go b/x-pack/agent/pkg/agent/program/spec_test.go index 5f964ad9231f..4c76f7214272 100644 --- a/x-pack/agent/pkg/agent/program/spec_test.go +++ b/x-pack/agent/pkg/agent/program/spec_test.go @@ -6,106 +6,17 @@ package program import ( "io/ioutil" - "path/filepath" "regexp" "strings" "testing" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" - "github.com/elastic/beats/x-pack/agent/pkg/agent/internal/yamltest" "github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler" ) -func TestConfiguration(t *testing.T) { - testcases := map[string]struct { - programs []string - expected int - err bool - }{ - "single_config": { - programs: []string{"filebeat", "metricbeat"}, - expected: 2, - }, - "audit_config": { - programs: []string{"auditbeat"}, - expected: 1, - }, - "journal_config": { - programs: []string{"journalbeat"}, - expected: 1, - }, - "monitor_config": { - programs: []string{"heartbeat"}, - expected: 1, - }, - "enabled_true": { - programs: []string{"filebeat"}, - expected: 1, - }, - "enabled_false": { - expected: 0, - }, - "enabled_output_true": { - programs: []string{"filebeat"}, - expected: 1, - }, - "enabled_output_false": { - expected: 0, - }, - "multiple_output_true": { - err: true, - }, - } - - for name, test := range testcases { - t.Run(name, func(t *testing.T) { - singleConfig, err := ioutil.ReadFile(filepath.Join("testdata", name+".yml")) - require.NoError(t, err) - - var m map[string]interface{} - err = yaml.Unmarshal(singleConfig, &m) - require.NoError(t, err) - - ast, err := transpiler.NewAST(m) - require.NoError(t, err) - - programs, err := Programs(ast) - if test.err { - require.Error(t, err) - return - } - require.NoError(t, err) - require.Equal(t, test.expected, len(programs)) - - for _, program := range programs { - programConfig, err := ioutil.ReadFile(filepath.Join( - "testdata", - name+"-"+strings.ToLower(program.Spec.Name)+".yml", - )) - - require.NoError(t, err) - var m map[string]interface{} - err = yamltest.FromYAML(programConfig, &m) - require.NoError(t, err) - - compareMap := &transpiler.MapVisitor{} - program.Config.Accept(compareMap) - - if !assert.True(t, cmp.Equal(m, compareMap.Content)) { - diff := cmp.Diff(m, compareMap.Content) - if diff != "" { - t.Errorf("%s-%s mismatch (-want +got):\n%s", name, program.Spec.Name, diff) - } - } - } - }) - } -} - func TestSerialization(t *testing.T) { spec := Spec{ Name: "hello", diff --git a/x-pack/agent/pkg/agent/program/supported.go b/x-pack/agent/pkg/agent/program/supported.go index b07271fbb48d..e555b05884f9 100644 --- a/x-pack/agent/pkg/agent/program/supported.go +++ b/x-pack/agent/pkg/agent/program/supported.go @@ -17,7 +17,7 @@ func init() { // spec/heartbeat.yml // spec/journalbeat.yml // spec/metricbeat.yml - unpacked := packer.MustUnpack("eJzMVlGTokYQfs/P2OdUDobTOlN1D8IeA+hixHUY5o2ZcQEdhCiikMp/T42goutebus2VXnYqnVgmu7+vq+//uthmy/Yp3DHk4IuwuK3KhUPvz/Q1CzIcxZNMVqF2EuIbypG6hbkMYuC7tnaUSlE9STKChuSnFqeYGIAAv+gEuy+cCCU0BxUxOdiYQ37RjKMbEPfELzq25bXY3A+so1hZFuOmCT6ook9LcfJtj13BYVoyeGgmiR6Tte6yq2n0XimFwE+x9sH2Mu6z9uYp9wi+Xs80xWWmjsGDjGH6I3cmj8GTSV8zCJuiT2ZZufz2zxHxufN6Oo5zzmM+rYxPRiJEgVgsF88Z00+kFQUKH0bih2zkML2zT0Ov/RtaKoECiVIUcyHspdqzB7be5YeH2N23zF6/1ZrRfxeTP35VV1k7ZT0+V5dXjlJ9IICT6VwPnp9X09ZOigmiR5z38s5dESAp7vQ7+059ur2HYXWWUT9wYr7B8H2HU4ss1d4sro9M5RrTjX5FjYcqNzSVT4817QhWNTyLpnmWgjRbpLodoBdh0NUsCoSC7W4yu+ci6oORrNh6jTvTwJfFUzT4wDMMwe03zHY1ja4oNCsORRLBlDMUjdzqr3Mc81SmePTemwM1yEw0xB8O/5P4WDNNC9mIFqPptnXh18bTb0kYnFHUl4qtsQ/pjcKgLqaJHoa+gfRQJ9Xx3Tr7AMoPv8xSkuqWfoFovM5AsTvKWNAcgrnfRseSrLv3JN0wkTQ9bRkmrcK/c9923IVAsWu+13b0AVP0Y4bAyXQhlL2Sza8H4f73n6S6Cqxhje5HEpSDVYUuBuC7b4NvTIAhWDRdRwJxVhzBIGixpq7pRrfUsD6Urby7CYv+axkmqjlvUmi1wvsyjPZe4mJQrDzwlNzy/1LjxlAW+K7CtXsV1IOAepNEl1ZYF20uLWj8Kl/ofWlj+OZXnPo5fSCX6d2JeLQ3V/uXdfcSNTLWSdWJ/8l1fTeKeeT7O/JlXT4eKnbPvGuJpKbmleydbcGyQn0EXK9cP880lz+MltFThJEM2jWz1I32N0HvitGBi859vYcT4/SI/4hZpqXB5orAuwsw0bCTV8qtpW5OaCISVrETrWPHHAoieYqAZayXo1OUo0X4eae/c0gitn62v7C7tkH2x/1B7sQeyVLvuxG++/y7Q2LuOYjW6PtFfcsrwr8Xk2hqRA00ELsZVjqBaDP51jG1bzpcLmdI+m8b5u9goJeLrEdV9Gn+ePh+N4fyVt26LVx3mOJscS0Y4k/ZiVX+PwcNzPim9Wt1WGs5LYRpPa3OGaKtEhUM2guySxaUw0pso6Ga2gbYFcJfbeWcQIQHTl7tpFjLnwT+GQTzI683VLAaw7NOjRYbkRfz1ayzHabdSjuMTQdqCztxRReOcqfVEMVTc3tHVf5bydbNTh/+31o5yXHzi7wDy1yynEycYiq1/k5Xce7qfV8fpoUF2YA1GNykqXzEQexoMvsOGFm2BMUP2UOLmh43c+PQb1B+jyVfhT1dFFsEnYH9GcfKSwVyxbcJfWnfRuqgltOHoCGCBfpXxqJNRJTCwmmviF9WIjFzXZ4kjvBXhX6T//7zZy2vXnfWDqNR0e0Pe6OwtPm+2oUNsRVS2KhLZnKEeWWzFrdbsw3tTebdtNP90x2lqKd3EpvLZt2sX619Xe3dlS/QzQ33/+pMXkVa5y2/cA/Z+NH625XuUZY37Hxv3/5JwAA//82Hfvb") + unpacked := packer.MustUnpack("eJzUV9Fzo7YTfv/9GXn9dVoMZ/fozD0YcgiIgxucSEJvSHIAW4BrsAl0+r93AGODY18vTWZu+uCZREi73+5++6305022WbJf/B2Pcrr085/LWNz8dkNjIyePafCA4drHbkSQIemxk5PbNPD6a4k9ogBW8yDNLUA21HQFE6rsoZcRwc4zl4XkG2pJEBdLczrRo2lg6dqW4PXEMt0xA093lj4NLNMW80hbtrYf9rMoO6w7ggK44kAt55FWcdMWHhpVeiQFFBnFPEiDeh8DhuTfpgFT3JIgI2fFYT2BGcH3B7/T3DLd0kPjigJDIlBVfOymWLEFkeEndpu2PpufVnjYTeeRJi2xJg7nW5vx08QyxiFH7obrn9NZlG3635uYb9PAvv16N1toK6po4w4nSew9fXyNk4PPEwsYIwKE5MUw5NM6n6PwiMnUQg6C4R593MtHg7nk6EWwqotDClgMdx4aic5/8wOkpLI0zH8bs0SrNKBIXdd26hyfn+/w9zkwSxyJxTCk+P6A1d3PIy2nsttwo7ZDYpFxAMtjvWSYEeRIVLGrbm220NqcAlt4+OEYA1WgxEwoHWsIcrHEzVmhJzwl6NPE+hqGTBISQaPqToe/InmQpyMWjKWNpXtxu38cUgQrBowVWQRJ58cui8CWYeZhR/KRUxFklJ4cJLNpjZOXBLkbVrLM0vnWQ2TrLZq/MyrzigOj8nW20YMvX25+anvrORLLC63lxiIjqA3Vk0freaTFPnoRbfk3x1LOFlru4WPrdLT8x9Jz0w5p4mQedofl76j0uvy5h7WCYGvSp7Ola4LHcMd1tcE3j7SMyqx3rqYUlAkaSzPFkQgQuwYfcDe077emDyaCJg97Dpyibi1PmV6xA9fsNg04coszLLXvPQHqypdhLQlrKjtbgq0zOy97UqptqUZqRYGqUKAm80hr14ohrvrbTHHGTH7Zk6puT1GfSfrUxQoJqQlPOb4qTdck7ux8Te3GRydNbe5O/8M1mx5bo5crKejiP+21BQGi6tlqYylqGXH2zFx3/IlZrOYX8DatT3qcnC3qXNUt8nR+9sTT1xLU2PHRuODYfUvr97h/XNtz7Ba8JwW+DMdMcfcsfrrjcijoKg0WwKgW2BUU36c2zmkvhl2H411tr08TFtdyd99JwHe1fbj0t5dG6gLAkCXDker31z54pFKk7nzs7ln0eXdXnNfAmtS86+T7I7QGK3zDQfjMYpgQHBbfqT1lnRscj/Y0FjWu/d2t+vB7u+//V0fswc7bxmwoPGyfxqxpi0Nt7gbjsb/vCs+PuU2a3NUclk76qVWk5rTSfD9dQYb+38X1Pm9mx9zZP2TMrdLdNvHFJcbH6ojF45CCwbT7gyqwpLGRXZh4h0zazzw2Mo5g9Vo9hsxtK5eL5WMacFMU5OFQ8bPzs0U7QfwO0/RtCumfxfINlRzG93FKeZa3dzJoGM/HKOa/ZFC8zLcRu0CgRwQlFovVgSgrih4mFhgJbtobT24LcbrpXxjZoys3/QuEOdzuc4Ld0kf3/4WXyBHrWyTzo14InqwWy3PpPG/Mw4ugxem0Y+4brwvar/d16W0wsKI3Jlevr2aD69aAR+9rnDOMP+aF8df//g4AAP//Oi1UHQ==") for f, v := range unpacked { s, err := NewSpecFromBytes(v) diff --git a/x-pack/agent/pkg/agent/program/testdata/audit_config.yml b/x-pack/agent/pkg/agent/program/testdata/audit_config.yml index 03cafd93b461..fbc963c97354 100644 --- a/x-pack/agent/pkg/agent/program/testdata/audit_config.yml +++ b/x-pack/agent/pkg/agent/program/testdata/audit_config.yml @@ -1,4 +1,10 @@ -inputs: +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9200, 127.0.0.1:9300] + username: elastic + password: changeme +streams: - type: log/file ignore_older: 123s - type: audit/auditd @@ -20,8 +26,3 @@ management: host: "localhost" config: reload: 123 -output: - elasticsearch: - hosts: [127.0.0.1:9200, 127.0.0.1:9300] - username: elastic - password: changeme diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_false.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_false.yml index 7ab37e596134..08ef6b70cd9d 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_false.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_false.yml @@ -1,4 +1,4 @@ -inputs: +streams: - type: event/file enabled: false paths: @@ -8,8 +8,9 @@ management: host: "localhost" config: reload: 123 -output: - elasticsearch: +outputs: + default: + type: elasticsearch hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic password: changeme diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_output_false.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_output_false.yml index 143dc17eeb3a..94dc21d8bc8c 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_output_false.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_output_false.yml @@ -1,4 +1,4 @@ -inputs: +streams: - type: event/file paths: - /var/log/hello1.log @@ -7,8 +7,9 @@ management: host: "localhost" config: reload: 123 -output: - elasticsearch: +outputs: + default: + type: elasticsearch enabled: false hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_output_true.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_output_true.yml index d80e194c503a..9b3fb3c065aa 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_output_true.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_output_true.yml @@ -1,4 +1,4 @@ -inputs: +streams: - type: event/file paths: - /var/log/hello1.log @@ -7,8 +7,9 @@ management: host: "localhost" config: reload: 123 -output: - elasticsearch: +outputs: + default: + type: elasticsearch enabled: true hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_true.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_true.yml index c68bada60b94..ddbcb2929df5 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_true.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_true.yml @@ -1,4 +1,4 @@ -inputs: +streams: - type: event/file enabled: true paths: @@ -8,8 +8,9 @@ management: host: "localhost" config: reload: 123 -output: - elasticsearch: +outputs: + default: + type: elasticsearch hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic password: changeme diff --git a/x-pack/agent/pkg/agent/program/testdata/journal_config.yml b/x-pack/agent/pkg/agent/program/testdata/journal_config.yml index 7891b0ff7122..732ebab6fb2e 100644 --- a/x-pack/agent/pkg/agent/program/testdata/journal_config.yml +++ b/x-pack/agent/pkg/agent/program/testdata/journal_config.yml @@ -1,4 +1,4 @@ -inputs: +streams: - type: log/journal paths: [] backoff: 1s @@ -13,8 +13,9 @@ management: host: "localhost" config: reload: 123 -output: - elasticsearch: +outputs: + default: + type: elasticsearch hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic password: changeme diff --git a/x-pack/agent/pkg/agent/program/testdata/monitor_config-heartbeat.yml b/x-pack/agent/pkg/agent/program/testdata/monitor_config-heartbeat.yml index d9215783a71b..59feb7568854 100644 --- a/x-pack/agent/pkg/agent/program/testdata/monitor_config-heartbeat.yml +++ b/x-pack/agent/pkg/agent/program/testdata/monitor_config-heartbeat.yml @@ -1,5 +1,5 @@ heartbeat: - inputs: + monitors: - type: icmp name: icmp schedule: "*/5 * * * * * *" diff --git a/x-pack/agent/pkg/agent/program/testdata/monitor_config.yml b/x-pack/agent/pkg/agent/program/testdata/monitor_config.yml index f663f6c48dde..97e5d9d8980a 100644 --- a/x-pack/agent/pkg/agent/program/testdata/monitor_config.yml +++ b/x-pack/agent/pkg/agent/program/testdata/monitor_config.yml @@ -1,4 +1,4 @@ -inputs: +streams: - type: monitor/icmp name: icmp schedule: '*/5 * * * * * *' @@ -15,8 +15,9 @@ management: host: "localhost" config: reload: 123 -output: - elasticsearch: +outputs: + default: + type: elasticsearch hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic password: changeme diff --git a/x-pack/agent/pkg/agent/program/testdata/multiple_output_true.yml b/x-pack/agent/pkg/agent/program/testdata/multiple_output_true.yml deleted file mode 100644 index 15519f04ad8e..000000000000 --- a/x-pack/agent/pkg/agent/program/testdata/multiple_output_true.yml +++ /dev/null @@ -1,18 +0,0 @@ -inputs: - - type: event/file - paths: - - /var/log/hello1.log - - /var/log/hello2.log -management: - host: "localhost" -config: - reload: 123 -output: - elasticsearch: - enabled: true - hosts: [127.0.0.1:9200, 127.0.0.1:9300] - username: elastic - password: changeme - logstash: - enabled: true - hosts: [127.0.0.1:9200, 127.0.0.1:9300] diff --git a/x-pack/agent/pkg/agent/program/testdata/single_config.yml b/x-pack/agent/pkg/agent/program/testdata/single_config.yml index c89ce0b7a202..888a769b7f70 100644 --- a/x-pack/agent/pkg/agent/program/testdata/single_config.yml +++ b/x-pack/agent/pkg/agent/program/testdata/single_config.yml @@ -1,4 +1,4 @@ -inputs: +streams: - type: metric/docker setting: one - type: metric/apache @@ -11,8 +11,9 @@ management: host: "localhost" config: reload: 123 -output: - elasticsearch: +outputs: + default: + type: elasticsearch hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic password: changeme diff --git a/x-pack/agent/pkg/agent/transpiler/ast.go b/x-pack/agent/pkg/agent/transpiler/ast.go index 4b1f29036f1d..d0613e2b775c 100644 --- a/x-pack/agent/pkg/agent/transpiler/ast.go +++ b/x-pack/agent/pkg/agent/transpiler/ast.go @@ -66,11 +66,10 @@ type Dict struct { // Find takes a string which is a key and try to find the elements in the associated K/V. func (d *Dict) Find(key string) (Node, bool) { - i := sort.Search(len(d.value), func(i int) bool { - return d.value[i].(*Key).name >= key - }) - if i < len(d.value) && d.value[i].(*Key).name == key { - return d.value[i], true + for _, i := range d.value { + if i.(*Key).name == key { + return i, true + } } return nil, false } diff --git a/x-pack/agent/pkg/agent/transpiler/rules.go b/x-pack/agent/pkg/agent/transpiler/rules.go index f2a3f7457b36..9f8e497a38de 100644 --- a/x-pack/agent/pkg/agent/transpiler/rules.go +++ b/x-pack/agent/pkg/agent/transpiler/rules.go @@ -62,7 +62,7 @@ func (r *RuleList) MarshalYAML() (interface{}, error) { case *FilterValuesWithRegexpRule: name = "filter_values_with_regexp" default: - return nil, fmt.Errorf("unkown rule of type %T", rule) + return nil, fmt.Errorf("unknown rule of type %T", rule) } subdoc := map[string]Rule{ @@ -424,7 +424,7 @@ func (r *FilterRule) Apply(ast *AST) error { mergedAST := &AST{root: &Dict{}} var err error for _, selector := range r.Selectors { - newAST, ok := Select(ast, selector) + newAST, ok := Select(ast.Clone(), selector) if !ok { continue } diff --git a/x-pack/agent/spec/auditbeat.yml b/x-pack/agent/spec/auditbeat.yml index c73e2909f0f7..c07f4a8d1794 100644 --- a/x-pack/agent/spec/auditbeat.yml +++ b/x-pack/agent/spec/auditbeat.yml @@ -4,25 +4,25 @@ rules: - filter_values_with_regexp: key: type re: ^audit/.+ - selector: inputs + selector: streams - map: - path: inputs + path: streams rules: - translate_with_regexp: path: type re: ^audit/(.+) with: $1 - copy: - from: inputs + from: streams to: auditbeat - map: - path: auditbeat.inputs + path: auditbeat.streams rules: - rename: from: type to: module - rename: - from: auditbeat.inputs + from: auditbeat.streams to: modules - filter: selectors: diff --git a/x-pack/agent/spec/filebeat.yml b/x-pack/agent/spec/filebeat.yml index 86a6032c085e..6e86f5808392 100644 --- a/x-pack/agent/spec/filebeat.yml +++ b/x-pack/agent/spec/filebeat.yml @@ -2,7 +2,7 @@ name: Filebeat cmd: filebeat rules: - map: - path: inputs + path: streams rules: - translate: path: type @@ -15,7 +15,7 @@ rules: log/redis_slowlog: redis log/syslog: syslog - filter_values: - selector: inputs + selector: streams key: type values: - log @@ -26,8 +26,11 @@ rules: - redis - syslog - copy: - from: inputs + from: streams to: filebeat +- rename: + from: filebeat.streams + to: inputs - filter: selectors: - filebeat diff --git a/x-pack/agent/spec/heartbeat.yml b/x-pack/agent/spec/heartbeat.yml index c939ebf9ab18..14aaa1d3da47 100644 --- a/x-pack/agent/spec/heartbeat.yml +++ b/x-pack/agent/spec/heartbeat.yml @@ -4,21 +4,24 @@ rules: - filter_values_with_regexp: key: type re: ^monitor/.+ - selector: inputs + selector: streams - map: - path: inputs + path: streams rules: - translate_with_regexp: path: type re: ^monitor/(?P.+) with: $type - copy: - from: inputs + from: streams to: heartbeat +- rename: + from: heartbeat.streams + to: monitors - filter: selectors: - heartbeat - output - keystore -when: HasItems(%{[heartbeat.inputs]}) && HasNamespace('output', 'elasticsearch', 'redis', - 'kafka', 'logstash') +when: HasItems(%{[heartbeat.monitors]}) && HasNamespace('output', 'elasticsearch', + 'redis', 'kafka', 'logstash') diff --git a/x-pack/agent/spec/journalbeat.yml b/x-pack/agent/spec/journalbeat.yml index 40919e234015..6c5045b87bf8 100644 --- a/x-pack/agent/spec/journalbeat.yml +++ b/x-pack/agent/spec/journalbeat.yml @@ -2,13 +2,16 @@ name: Journalbeat cmd: journalbeat rules: - filter_values: - selector: inputs + selector: streams key: type values: - log/journal - copy: - from: inputs + from: streams to: journalbeat +- rename: + from: journalbeat.streams + to: inputs - filter: selectors: - journalbeat diff --git a/x-pack/agent/spec/metricbeat.yml b/x-pack/agent/spec/metricbeat.yml index a3a38a85d510..bc3a1eaba430 100644 --- a/x-pack/agent/spec/metricbeat.yml +++ b/x-pack/agent/spec/metricbeat.yml @@ -4,9 +4,9 @@ rules: - filter_values_with_regexp: key: type re: ^metric/.+ - selector: inputs + selector: streams - map: - path: inputs + path: streams rules: - translate_with_regexp: path: type @@ -16,10 +16,10 @@ rules: from: type to: module - copy: - from: inputs + from: streams to: metricbeat - rename: - from: metricbeat.inputs + from: metricbeat.streams to: modules - filter: selectors: