Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions x-pack/agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ func emitter(log *logger.Logger, router *router) emitterFunc {
return err
}

grouped := map[routingKey][]program.Program{
defautlRK: programsToRun,
}

return router.Dispatch(ast.HashStr(), grouped)
return router.Dispatch(ast.HashStr(), programsToRun)
}
}
174 changes: 172 additions & 2 deletions x-pack/agent/pkg/agent/program/program.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
package program

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler"
"github.com/elastic/beats/x-pack/agent/pkg/boolexp"
)
Expand Down Expand Up @@ -45,10 +48,27 @@ func (p *Program) Configuration() map[string]interface{} {

// Programs take a Tree representation of the main configuration and apply all the different
// programs rules and generate individual configuration from the rules.
func Programs(singleConfig *transpiler.AST) ([]Program, error) {
func Programs(singleConfig *transpiler.AST) (map[string][]Program, error) {
grouped, err := groupByOutputs(singleConfig)
if err != nil {
return nil, errors.Wrap(err, "fail to extract program configuration")
}

groupedPrograms := make(map[string][]Program)
for k, config := range grouped {
programs, err := detectPrograms(config)
if err != nil {
return nil, errors.Wrap(err, "fail to generate program configuration")
}
groupedPrograms[k] = programs
}

return groupedPrograms, nil
}

func detectPrograms(singleConfig *transpiler.AST) ([]Program, error) {
programs := make([]Program, 0)
for _, spec := range Supported {
// TODO: better error handling here.
specificAST := singleConfig.Clone()
err := spec.Rules.Apply(specificAST)
if err != nil {
Expand Down Expand Up @@ -80,6 +100,7 @@ func Programs(singleConfig *transpiler.AST) ([]Program, error) {
programs = append(programs, program)
}
return programs, nil

}

// KnownProgramNames returns a list of runnable programs by the agent.
Expand All @@ -90,3 +111,152 @@ func KnownProgramNames() []string {
}
return names
}

func groupByOutputs(single *transpiler.AST) (map[string]*transpiler.AST, error) {
const (
outputsKey = "outputs"
outputKey = "output"
streamsKey = "streams"
typeKey = "type"
)

if _, found := transpiler.Select(single, outputsKey); !found {
return nil, errors.New("invalid configuration missing outputs configuration")
}

// Normalize using an intermediate map.
normMap, err := single.Map()
if err != nil {
return nil, errors.Wrap(err, "could not read configuration")
}

// Recreates multiple configuration grouped by the name of the outputs.
// Each configuration will be started into his own operator with the same name as the output.
grouped := make(map[string]map[string]interface{})

m, ok := normMap[outputsKey]
if !ok {
return nil, errors.New("fail to received a list of configured outputs")
}

out, ok := m.(map[string]interface{})
if !ok {
return nil, fmt.Errorf(
"invalid outputs configuration received, expecting a map not a %T",
m,
)
}

for k, v := range out {
outputsOptions, ok := v.(map[string]interface{})
if !ok {
return nil, errors.New("invalid type for output configuration block")
}

t, ok := outputsOptions[typeKey]
if !ok {
return nil, fmt.Errorf("missing output type named output %s", k)
}

n, ok := t.(string)
if !ok {
return nil, fmt.Errorf("invalid type received %T and expecting a string", t)
}

delete(outputsOptions, typeKey)

// Propagate global configuration to each individual configuration.
clone := cloneMap(normMap)
delete(clone, outputsKey)
clone[outputKey] = map[string]interface{}{n: v}
clone[streamsKey] = make([]map[string]interface{}, 0)

grouped[k] = clone
}

s, ok := normMap[streamsKey]
if !ok {
return nil, errors.New("no streams are configured")
}

list, ok := s.([]interface{})
if !ok {
return nil, errors.New("fail to receive a list of configured streams")
}

for _, item := range list {
stream, ok := item.(map[string]interface{})
if !ok {
return nil, fmt.Errorf(
"invalid type for stream expecting a map of options and received %T",
item,
)
}
targetName := findOutputName(stream)

// Do we have configuration for that specific outputs if not we fail to load the configuration.
config, ok := grouped[targetName]
if !ok {
return nil, fmt.Errorf("unknown configuration output with name %s", targetName)
}

streams := config[streamsKey].([]map[string]interface{})
streams = append(streams, stream)

config[streamsKey] = streams
grouped[targetName] = config
}

transpiled := make(map[string]*transpiler.AST)

for name, group := range grouped {
if len(group[streamsKey].([]map[string]interface{})) == 0 {
continue
}

ast, err := transpiler.NewAST(group)
if err != nil {
return nil, errors.Wrapf(err, "fail to generate configuration for output name %s", name)
}

transpiled[name] = ast
}

return transpiled, nil
}

func findOutputName(m map[string]interface{}) string {
const (
defaultOutputName = "default"
outputKey = "output"
useOutputKey = "use_output"
)

output, ok := m[outputKey]
if !ok {
return defaultOutputName
}

o := output.(map[string]interface{})

name, ok := o[useOutputKey]
if !ok {
return defaultOutputName
}

return name.(string)
}

func cloneMap(m map[string]interface{}) map[string]interface{} {
newMap := make(map[string]interface{})
for k, v := range m {
sV, ok := v.(map[string]interface{})
if ok {
newMap[k] = cloneMap(sV)
continue
}
newMap[k] = v
}

return newMap
}
Loading