diff --git a/x-pack/agent/pkg/agent/transpiler/ast.go b/x-pack/agent/pkg/agent/transpiler/ast.go index 6305dcbc0237..f7170a916421 100644 --- a/x-pack/agent/pkg/agent/transpiler/ast.go +++ b/x-pack/agent/pkg/agent/transpiler/ast.go @@ -694,7 +694,12 @@ func Insert(a *AST, node Node, to Selector) error { return fmt.Errorf("expecting Key and received %T", current) } - d.value = &Dict{[]Node{node}} + switch node.(type) { + case *List: + d.value = node + default: + d.value = &Dict{[]Node{node}} + } return nil } diff --git a/x-pack/agent/pkg/agent/transpiler/rules.go b/x-pack/agent/pkg/agent/transpiler/rules.go index 82dabadf31e5..713477a21d1c 100644 --- a/x-pack/agent/pkg/agent/transpiler/rules.go +++ b/x-pack/agent/pkg/agent/transpiler/rules.go @@ -61,6 +61,9 @@ func (r *RuleList) MarshalYAML() (interface{}, error) { name = "filter_values" case *FilterValuesWithRegexpRule: name = "filter_values_with_regexp" + case *ExtractListItemRule: + name = "extract_list_items" + default: return nil, fmt.Errorf("unknown rule of type %T", rule) } @@ -110,46 +113,28 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error { switch name { case "copy": r = &CopyRule{} - if err := unpack(fields, r); err != nil { - return err - } case "rename": r = &RenameRule{} - if err := unpack(fields, r); err != nil { - return err - } case "translate": r = &TranslateRule{} - if err := unpack(fields, r); err != nil { - return err - } case "translate_with_regexp": r = &TranslateWithRegexpRule{} - if err := unpack(fields, r); err != nil { - return err - } case "map": r = &MapRule{} - if err := unpack(fields, r); err != nil { - return err - } case "filter": r = &FilterRule{} - if err := unpack(fields, r); err != nil { - return err - } case "filter_values": r = &FilterValuesRule{} - if err := unpack(fields, r); err != nil { - return err - } case "filter_values_with_regexp": r = &FilterValuesWithRegexpRule{} - if err := unpack(fields, r); err != nil { - return err - } + case "extract_list_items": + r = &ExtractListItemRule{} default: - return fmt.Errorf("unkown rule of type %s", name) + return fmt.Errorf("unknown rule of type %s", name) + } + + if err := unpack(fields, r); err != nil { + return err } rules = append(rules, r) @@ -158,6 +143,66 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } +// ExtractListItemRule extract items with specified name from a list of maps. +// The result is store in a new array. +// Example: +// Source: {items: []List{ map{"key": "val1"}, map{"key", "val2"} } } +// extract-list-item -path:items -item:key -to:keys +// result: +// {items: []List{ map{"key": "val1"}, map{"key", "val2"} }, keys: []List {"val1", "val2"} } +type ExtractListItemRule struct { + Path Selector + Item string + To string +} + +// Apply extracts items from array. +func (r *ExtractListItemRule) Apply(ast *AST) error { + node, found := Lookup(ast, r.Path) + if !found { + return nil + } + + nodeVal := node.Value() + if nodeVal == nil { + return nil + } + + l, isList := nodeVal.(*List) + if !isList { + return nil + } + + newList := &List{ + value: make([]Node, 0, len(l.value)), + } + + for _, n := range l.value { + in, found := n.Find(r.Item) + if !found { + continue + } + + vn, ok := in.Value().(Node) + if !ok { + continue + } + + newList.value = append(newList.value, vn.Clone()) + } + + return Insert(ast, newList, r.To) +} + +// ExtractListItem creates a ExtractListItemRule +func ExtractListItem(path Selector, item, target string) *ExtractListItemRule { + return &ExtractListItemRule{ + Path: path, + Item: item, + To: target, + } +} + // RenameRule takes a selectors and will rename the last path of a Selector to a new name. type RenameRule struct { From Selector diff --git a/x-pack/agent/pkg/agent/transpiler/rules_test.go b/x-pack/agent/pkg/agent/transpiler/rules_test.go index d549c253e7c3..9c2520ef5531 100644 --- a/x-pack/agent/pkg/agent/transpiler/rules_test.go +++ b/x-pack/agent/pkg/agent/transpiler/rules_test.go @@ -22,6 +22,54 @@ func TestRules(t *testing.T) { expectedYAML string rule Rule }{ + + "extract items from array": { + givenYAML: ` +streams: + - name: MySQL error log + input: + type: file + path: /var/log/mysql/error.log + - name: MySQL access log + input: + type: file + path: /var/log/mysql/access.log + - name: MySQL metrics + input: + type: mysql + host: localhost + port: 3306 +`, + expectedYAML: ` +streams: + - name: MySQL error log + input: + type: file + path: /var/log/mysql/error.log + - name: MySQL access log + input: + type: file + path: /var/log/mysql/access.log + - name: MySQL metrics + input: + type: mysql + host: localhost + port: 3306 +inputs: + - type: file + path: /var/log/mysql/error.log + - type: file + path: /var/log/mysql/access.log + - type: mysql + host: localhost + port: 3306 +`, + rule: &RuleList{ + Rules: []Rule{ + ExtractListItem("streams", "input", "inputs"), + }, + }, + }, "two level rename": { givenYAML: ` output: @@ -391,6 +439,7 @@ func TestSerialization(t *testing.T) { Filter("f1", "f2"), FilterValues("select-v", "key-v", "v1", "v2"), FilterValuesWithRegexp("inputs", "type", regexp.MustCompile("^metric/.*")), + ExtractListItem("path.p", "item", "target"), ) y := `- rename: @@ -431,6 +480,10 @@ func TestSerialization(t *testing.T) { key: type re: ^metric/.* selector: inputs +- extract_list_items: + path: path.p + item: item + to: target ` t.Run("serialize_rules", func(t *testing.T) {