Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion x-pack/agent/pkg/agent/transpiler/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,12 @@ func Insert(a *AST, node Node, to Selector) error {
return fmt.Errorf("expecting Key and received %T", current)
}

d.value = &Dict{[]Node{node}}
switch node.(type) {
case *List:
d.value = node
default:
d.value = &Dict{[]Node{node}}
}
return nil
}

Expand Down
95 changes: 70 additions & 25 deletions x-pack/agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func (r *RuleList) MarshalYAML() (interface{}, error) {
name = "filter_values"
case *FilterValuesWithRegexpRule:
name = "filter_values_with_regexp"
case *ExtractListItemRule:
name = "extract_list_items"

default:
return nil, fmt.Errorf("unknown rule of type %T", rule)
}
Expand Down Expand Up @@ -110,46 +113,28 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
switch name {
case "copy":
r = &CopyRule{}
if err := unpack(fields, r); err != nil {
return err
}
case "rename":
r = &RenameRule{}
if err := unpack(fields, r); err != nil {
return err
}
case "translate":
r = &TranslateRule{}
if err := unpack(fields, r); err != nil {
return err
}
case "translate_with_regexp":
r = &TranslateWithRegexpRule{}
if err := unpack(fields, r); err != nil {
return err
}
case "map":
r = &MapRule{}
if err := unpack(fields, r); err != nil {
return err
}
case "filter":
r = &FilterRule{}
if err := unpack(fields, r); err != nil {
return err
}
case "filter_values":
r = &FilterValuesRule{}
if err := unpack(fields, r); err != nil {
return err
}
case "filter_values_with_regexp":
r = &FilterValuesWithRegexpRule{}
if err := unpack(fields, r); err != nil {
return err
}
case "extract_list_items":
r = &ExtractListItemRule{}
default:
return fmt.Errorf("unkown rule of type %s", name)
return fmt.Errorf("unknown rule of type %s", name)
}

if err := unpack(fields, r); err != nil {
return err
}

rules = append(rules, r)
Expand All @@ -158,6 +143,66 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

// ExtractListItemRule extract items with specified name from a list of maps.
// The result is store in a new array.
// Example:
// Source: {items: []List{ map{"key": "val1"}, map{"key", "val2"} } }
// extract-list-item -path:items -item:key -to:keys
// result:
// {items: []List{ map{"key": "val1"}, map{"key", "val2"} }, keys: []List {"val1", "val2"} }
type ExtractListItemRule struct {
Path Selector
Item string
To string
}

// Apply extracts items from array.
func (r *ExtractListItemRule) Apply(ast *AST) error {
node, found := Lookup(ast, r.Path)
if !found {
return nil
}

nodeVal := node.Value()
if nodeVal == nil {
return nil
}

l, isList := nodeVal.(*List)
if !isList {
return nil
}

newList := &List{
value: make([]Node, 0, len(l.value)),
}

for _, n := range l.value {
in, found := n.Find(r.Item)
if !found {
continue
}

vn, ok := in.Value().(Node)
if !ok {
continue
}

newList.value = append(newList.value, vn.Clone())
}

return Insert(ast, newList, r.To)
}

// ExtractListItem creates a ExtractListItemRule
func ExtractListItem(path Selector, item, target string) *ExtractListItemRule {
Comment thread
michalpristas marked this conversation as resolved.
return &ExtractListItemRule{
Path: path,
Item: item,
To: target,
}
}

// RenameRule takes a selectors and will rename the last path of a Selector to a new name.
type RenameRule struct {
From Selector
Expand Down
53 changes: 53 additions & 0 deletions x-pack/agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,54 @@ func TestRules(t *testing.T) {
expectedYAML string
rule Rule
}{

"extract items from array": {
givenYAML: `
streams:
- name: MySQL error log
input:
type: file
path: /var/log/mysql/error.log
- name: MySQL access log
input:
type: file
path: /var/log/mysql/access.log
- name: MySQL metrics
input:
type: mysql
host: localhost
port: 3306
`,
expectedYAML: `
streams:
- name: MySQL error log
input:
type: file
path: /var/log/mysql/error.log
- name: MySQL access log
input:
type: file
path: /var/log/mysql/access.log
- name: MySQL metrics
input:
type: mysql
host: localhost
port: 3306
inputs:
- type: file
path: /var/log/mysql/error.log
- type: file
path: /var/log/mysql/access.log
- type: mysql
host: localhost
port: 3306
`,
rule: &RuleList{
Rules: []Rule{
ExtractListItem("streams", "input", "inputs"),
},
},
},
"two level rename": {
givenYAML: `
output:
Expand Down Expand Up @@ -391,6 +439,7 @@ func TestSerialization(t *testing.T) {
Filter("f1", "f2"),
FilterValues("select-v", "key-v", "v1", "v2"),
FilterValuesWithRegexp("inputs", "type", regexp.MustCompile("^metric/.*")),
ExtractListItem("path.p", "item", "target"),
)

y := `- rename:
Expand Down Expand Up @@ -431,6 +480,10 @@ func TestSerialization(t *testing.T) {
key: type
re: ^metric/.*
selector: inputs
- extract_list_items:
path: path.p
item: item
to: target
`

t.Run("serialize_rules", func(t *testing.T) {
Expand Down