Skip to content

Commit b17a3cf

Browse files
Cherry-pick #18769 to 7.8: Guard against empty stream.datasource and namespace (#18806)
* [Ingest Manager] Guard against empty stream.datasource and namespace (#18769)
1 parent 90ace69 commit b17a3cf

File tree

12 files changed

+228
-6
lines changed

12 files changed

+228
-6
lines changed

x-pack/elastic-agent/CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
- Clean action store after enrolling to new configuration {pull}18656[18656]
4141
- Avoid watching monitor logs {pull}18723[18723]
4242
- Correctly report platform and family. {issue}18665[18665]
43+
- Guard against empty stream.datasource and namespace {pull}18769[18769]
4344

4445
==== New features
4546

x-pack/elastic-agent/pkg/agent/program/supported.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/elastic-agent/pkg/agent/program/testdata/constraints_config-filebeat.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ filebeat:
44
paths:
55
- /var/log/hello1.log
66
- /var/log/hello2.log
7+
dataset: generic
78
index: logs-generic-default
89
processors:
910
- add_fields:

x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ filebeat:
44
paths:
55
- /var/log/hello1.log
66
- /var/log/hello2.log
7+
dataset: generic
78
index: logs-generic-default
89
processors:
910
- add_fields:

x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ filebeat:
55
paths:
66
- /var/log/hello1.log
77
- /var/log/hello2.log
8+
dataset: generic
89
index: logs-generic-default
910
processors:
1011
- add_fields:

x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ filebeat:
55
- /var/log/hello1.log
66
- /var/log/hello2.log
77
index: logs-generic-default
8+
dataset: generic
89
vars:
910
var: value
1011
processors:

x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@ metricbeat:
1111
type: metrics
1212
dataset: docker.status
1313
namespace: default
14+
- module: docker
15+
metricsets: [info]
16+
index: metrics-generic-default
17+
hosts: ["http://127.0.0.1:8080"]
18+
processors:
19+
- add_fields:
20+
target: "stream"
21+
fields:
22+
type: metrics
23+
dataset: generic
24+
namespace: default
1425
- module: apache
1526
metricsets: [info]
1627
index: metrics-generic-testing

x-pack/elastic-agent/pkg/agent/program/testdata/single_config.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ datasources:
2626
streams:
2727
- metricset: status
2828
dataset: docker.status
29+
- metricset: info
30+
dataset: ""
2931
hosts: ["http://127.0.0.1:8080"]
3032
- type: logs
3133
streams:

x-pack/elastic-agent/pkg/agent/transpiler/rules.go

Lines changed: 110 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) {
7575
name = "make_array"
7676
case *RemoveKeyRule:
7777
name = "remove_key"
78-
78+
case *FixStreamRule:
79+
name = "fix_stream"
7980
default:
8081
return nil, fmt.Errorf("unknown rule of type %T", rule)
8182
}
@@ -153,6 +154,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
153154
r = &MakeArrayRule{}
154155
case "remove_key":
155156
r = &RemoveKeyRule{}
157+
case "fix_stream":
158+
r = &FixStreamRule{}
156159
default:
157160
return fmt.Errorf("unknown rule of type %s", name)
158161
}
@@ -345,6 +348,100 @@ func CopyAllToList(to, onMerge string, except ...string) *CopyAllToListRule {
345348
}
346349
}
347350

351+
// FixStreamRule fixes streams to contain default values
352+
// in case no value or invalid value are provided
353+
type FixStreamRule struct {
354+
}
355+
356+
// Apply stream fixes.
357+
func (r *FixStreamRule) Apply(ast *AST) error {
358+
const defaultNamespace = "default"
359+
const defaultDataset = "generic"
360+
361+
datasourcesNode, found := Lookup(ast, "datasources")
362+
if !found {
363+
return nil
364+
}
365+
366+
datasourcesList, ok := datasourcesNode.Value().(*List)
367+
if !ok {
368+
return nil
369+
}
370+
371+
for _, datasourceNode := range datasourcesList.value {
372+
nsNode, found := datasourceNode.Find("namespace")
373+
if found {
374+
nsKey, ok := nsNode.(*Key)
375+
if ok {
376+
if newNamespace := nsKey.value.String(); newNamespace == "" {
377+
nsKey.value = &StrVal{value: defaultNamespace}
378+
}
379+
}
380+
} else {
381+
datasourceMap, ok := datasourceNode.(*Dict)
382+
if !ok {
383+
continue
384+
}
385+
datasourceMap.value = append(datasourceMap.value, &Key{
386+
name: "namespace",
387+
value: &StrVal{value: defaultNamespace},
388+
})
389+
}
390+
391+
// get input
392+
inputNode, found := datasourceNode.Find("inputs")
393+
if !found {
394+
continue
395+
}
396+
397+
inputsList, ok := inputNode.Value().(*List)
398+
if !ok {
399+
continue
400+
}
401+
402+
for _, inputNode := range inputsList.value {
403+
streamsNode, ok := inputNode.Find("streams")
404+
if !ok {
405+
continue
406+
}
407+
408+
streamsList, ok := streamsNode.Value().(*List)
409+
if !ok {
410+
continue
411+
}
412+
413+
for _, streamNode := range streamsList.value {
414+
streamMap, ok := streamNode.(*Dict)
415+
if !ok {
416+
continue
417+
}
418+
419+
dsNode, found := streamNode.Find("dataset")
420+
if found {
421+
dsKey, ok := dsNode.(*Key)
422+
if ok {
423+
if newDataset := dsKey.value.String(); newDataset == "" {
424+
dsKey.value = &StrVal{value: defaultDataset}
425+
}
426+
}
427+
} else {
428+
streamMap.value = append(streamMap.value, &Key{
429+
name: "dataset",
430+
value: &StrVal{value: defaultDataset},
431+
})
432+
}
433+
}
434+
}
435+
}
436+
437+
return nil
438+
}
439+
440+
// FixStream creates a FixStreamRule
441+
func FixStream() *FixStreamRule {
442+
return &FixStreamRule{}
443+
}
444+
348445
// InjectIndexRule injects index to each input.
349446
// Index is in form {type}-{namespace}-{dataset-type}
350447
// type: is provided to the rule.
@@ -375,7 +472,9 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
375472
if found {
376473
nsKey, ok := nsNode.(*Key)
377474
if ok {
378-
namespace = nsKey.value.String()
475+
if newNamespace := nsKey.value.String(); newNamespace != "" {
476+
namespace = newNamespace
477+
}
379478
}
380479
}
381480

@@ -413,7 +512,9 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
413512
if found {
414513
dsKey, ok := dsNode.(*Key)
415514
if ok {
416-
dataset = dsKey.value.String()
515+
if newDataset := dsKey.value.String(); newDataset != "" {
516+
dataset = newDataset
517+
}
417518
}
418519

419520
}
@@ -464,7 +565,9 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
464565
if found {
465566
nsKey, ok := nsNode.(*Key)
466567
if ok {
467-
namespace = nsKey.value.String()
568+
if newNamespace := nsKey.value.String(); newNamespace != "" {
569+
namespace = newNamespace
570+
}
468571
}
469572
}
470573

@@ -502,7 +605,9 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
502605
if found {
503606
dsKey, ok := dsNode.(*Key)
504607
if ok {
505-
dataset = dsKey.value.String()
608+
if newDataset := dsKey.value.String(); newDataset != "" {
609+
dataset = newDataset
610+
}
506611
}
507612
}
508613

x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,86 @@ func TestRules(t *testing.T) {
2222
expectedYAML string
2323
rule Rule
2424
}{
25+
"fix streams": {
26+
givenYAML: `
27+
datasources:
28+
- name: All default
29+
inputs:
30+
- type: file
31+
streams:
32+
- paths: /var/log/mysql/error.log
33+
- name: Specified namespace
34+
namespace: nsns
35+
inputs:
36+
- type: file
37+
streams:
38+
- paths: /var/log/mysql/access.log
39+
- name: Specified dataset
40+
inputs:
41+
- type: file
42+
streams:
43+
- paths: /var/log/mysql/access.log
44+
dataset: dsds
45+
- name: All specified
46+
namespace: nsns
47+
inputs:
48+
- type: file
49+
streams:
50+
- paths: /var/log/mysql/access.log
51+
dataset: dsds
52+
- name: All specified with empty strings
53+
namespace: ""
54+
inputs:
55+
- type: file
56+
streams:
57+
- paths: /var/log/mysql/access.log
58+
dataset: ""
59+
`,
60+
expectedYAML: `
61+
datasources:
62+
- name: All default
63+
namespace: default
64+
inputs:
65+
- type: file
66+
streams:
67+
- paths: /var/log/mysql/error.log
68+
dataset: generic
69+
- name: Specified namespace
70+
namespace: nsns
71+
inputs:
72+
- type: file
73+
streams:
74+
- paths: /var/log/mysql/access.log
75+
dataset: generic
76+
- name: Specified dataset
77+
namespace: default
78+
inputs:
79+
- type: file
80+
streams:
81+
- paths: /var/log/mysql/access.log
82+
dataset: dsds
83+
- name: All specified
84+
namespace: nsns
85+
inputs:
86+
- type: file
87+
streams:
88+
- paths: /var/log/mysql/access.log
89+
dataset: dsds
90+
- name: All specified with empty strings
91+
namespace: default
92+
inputs:
93+
- type: file
94+
streams:
95+
- paths: /var/log/mysql/access.log
96+
dataset: generic
97+
`,
98+
rule: &RuleList{
99+
Rules: []Rule{
100+
FixStream(),
101+
},
102+
},
103+
},
104+
25105
"inject index": {
26106
givenYAML: `
27107
datasources:
@@ -49,6 +129,13 @@ datasources:
49129
streams:
50130
- paths: /var/log/mysql/access.log
51131
dataset: dsds
132+
- name: All specified with empty strings
133+
namespace: ""
134+
inputs:
135+
- type: file
136+
streams:
137+
- paths: /var/log/mysql/access.log
138+
dataset: ""
52139
`,
53140
expectedYAML: `
54141
datasources:
@@ -80,6 +167,14 @@ datasources:
80167
- paths: /var/log/mysql/access.log
81168
dataset: dsds
82169
index: mytype-dsds-nsns
170+
- name: All specified with empty strings
171+
namespace: ""
172+
inputs:
173+
- type: file
174+
streams:
175+
- paths: /var/log/mysql/access.log
176+
dataset: ""
177+
index: mytype-generic-default
83178
`,
84179
rule: &RuleList{
85180
Rules: []Rule{
@@ -564,6 +659,7 @@ func TestSerialization(t *testing.T) {
564659
InjectStreamProcessor("insert_after", "index-type"),
565660
CopyToList("t1", "t2", "insert_after"),
566661
CopyAllToList("t2", "insert_before", "a", "b"),
662+
FixStream(),
567663
)
568664

569665
y := `- rename:
@@ -623,6 +719,7 @@ func TestSerialization(t *testing.T) {
623719
- a
624720
- b
625721
on_conflict: insert_before
722+
- fix_stream: {}
626723
`
627724

628725
t.Run("serialize_rules", func(t *testing.T) {

0 commit comments

Comments
 (0)