Skip to content

Commit 6bc7e55

Browse files
jsorianoChrsMark
andcommitted
Use indexers and matchers in config when defaults are enabled (#18818)
Before 7.7.0, indexers and matchers defined in `add_kubernetes_metadata` configuration were used even when defaults were not disabled. Revert to this behaviour and add tests to avoid changing it unexpectedly in the future. (cherry picked from commit 600998b) Co-authored-by: Chris Mark <[email protected]>
1 parent 408423d commit 6bc7e55

File tree

3 files changed

+126
-15
lines changed

3 files changed

+126
-15
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
117117
- Gives monitoring reporter hosts, if configured, total precedence over corresponding output hosts. {issue}17937[17937] {pull}17991[17991]
118118
- [Autodiscover] Check if runner is already running before starting again. {pull}18564[18564]
119119
- Fix `keystore add` hanging under Windows. {issue}18649[18649] {pull}18654[18654]
120+
- Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818]
120121

121122
*Auditbeat*
122123

libbeat/processors/add_kubernetes_metadata/kubernetes.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -90,24 +90,12 @@ func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool {
9090

9191
// New constructs a new add_kubernetes_metadata processor.
9292
func New(cfg *common.Config) (processors.Processor, error) {
93-
config := defaultKubernetesAnnotatorConfig()
94-
log := logp.NewLogger(selector).With("libbeat.processor", "add_kubernetes_metadata")
95-
96-
err := cfg.Unpack(&config)
93+
config, err := newProcessorConfig(cfg, Indexing)
9794
if err != nil {
98-
return nil, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err)
99-
}
100-
101-
//Load default indexer configs
102-
if config.DefaultIndexers.Enabled == true {
103-
config.Indexers = Indexing.GetDefaultIndexerConfigs()
104-
}
105-
106-
//Load default matcher configs
107-
if config.DefaultMatchers.Enabled == true {
108-
config.Matchers = Indexing.GetDefaultMatcherConfigs()
95+
return nil, err
10996
}
11097

98+
log := logp.NewLogger(selector).With("libbeat.processor", "add_kubernetes_metadata")
11199
processor := &kubernetesAnnotator{
112100
log: log,
113101
cache: newCache(config.CleanupTimeout),
@@ -121,6 +109,27 @@ func New(cfg *common.Config) (processors.Processor, error) {
121109
return processor, nil
122110
}
123111

112+
func newProcessorConfig(cfg *common.Config, register *Register) (kubeAnnotatorConfig, error) {
113+
config := defaultKubernetesAnnotatorConfig()
114+
115+
err := cfg.Unpack(&config)
116+
if err != nil {
117+
return config, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err)
118+
}
119+
120+
//Load and append default indexer configs
121+
if config.DefaultIndexers.Enabled {
122+
config.Indexers = append(config.Indexers, register.GetDefaultIndexerConfigs()...)
123+
}
124+
125+
//Load and append default matcher configs
126+
if config.DefaultMatchers.Enabled {
127+
config.Matchers = append(config.Matchers, register.GetDefaultMatcherConfigs()...)
128+
}
129+
130+
return config, nil
131+
}
132+
124133
func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Config) {
125134
k.initOnce.Do(func() {
126135
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)

libbeat/processors/add_kubernetes_metadata/kubernetes_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
2526

2627
"github.com/elastic/beats/v7/libbeat/beat"
2728
"github.com/elastic/beats/v7/libbeat/common"
@@ -128,3 +129,103 @@ func TestAnnotatorWithNoKubernetesAvailable(t *testing.T) {
128129

129130
assert.Equal(t, intialEventMap, event.Fields)
130131
}
132+
133+
// TestNewProcessorConfigDefaultIndexers validates the behaviour of default indexers and
134+
// matchers settings
135+
func TestNewProcessorConfigDefaultIndexers(t *testing.T) {
136+
emptyRegister := NewRegister()
137+
registerWithDefaults := NewRegister()
138+
registerWithDefaults.AddDefaultIndexerConfig("ip_port", *common.NewConfig())
139+
registerWithDefaults.AddDefaultMatcherConfig("field_format", *common.MustNewConfigFrom(map[string]interface{}{
140+
"format": "%{[destination.ip]}:%{[destination.port]}",
141+
}))
142+
143+
configWithIndexersAndMatchers := common.MustNewConfigFrom(map[string]interface{}{
144+
"indexers": []map[string]interface{}{
145+
{
146+
"container": map[string]interface{}{},
147+
},
148+
},
149+
"matchers": []map[string]interface{}{
150+
{
151+
"fields": map[string]interface{}{
152+
"lookup_fields": []string{"container.id"},
153+
},
154+
},
155+
},
156+
})
157+
configOverrideDefaults := common.MustNewConfigFrom(map[string]interface{}{
158+
"default_indexers.enabled": "false",
159+
"default_matchers.enabled": "false",
160+
})
161+
require.NoError(t, configOverrideDefaults.Merge(configWithIndexersAndMatchers))
162+
163+
cases := map[string]struct {
164+
register *Register
165+
config *common.Config
166+
expectedMatchers []string
167+
expectedIndexers []string
168+
}{
169+
"no matchers": {
170+
register: emptyRegister,
171+
config: common.NewConfig(),
172+
},
173+
"one configured indexer and matcher": {
174+
register: emptyRegister,
175+
config: configWithIndexersAndMatchers,
176+
expectedIndexers: []string{"container"},
177+
expectedMatchers: []string{"fields"},
178+
},
179+
"default indexers and matchers": {
180+
register: registerWithDefaults,
181+
config: common.NewConfig(),
182+
expectedIndexers: []string{"ip_port"},
183+
expectedMatchers: []string{"field_format"},
184+
},
185+
"default indexers and matchers, don't use indexers": {
186+
register: registerWithDefaults,
187+
config: common.MustNewConfigFrom(map[string]interface{}{
188+
"default_indexers.enabled": "false",
189+
}),
190+
expectedMatchers: []string{"field_format"},
191+
},
192+
"default indexers and matchers, don't use matchers": {
193+
register: registerWithDefaults,
194+
config: common.MustNewConfigFrom(map[string]interface{}{
195+
"default_matchers.enabled": "false",
196+
}),
197+
expectedIndexers: []string{"ip_port"},
198+
},
199+
"one configured indexer and matcher and defaults, configured should come first": {
200+
register: registerWithDefaults,
201+
config: configWithIndexersAndMatchers,
202+
expectedIndexers: []string{"container", "ip_port"},
203+
expectedMatchers: []string{"fields", "field_format"},
204+
},
205+
"override defaults": {
206+
register: registerWithDefaults,
207+
config: configOverrideDefaults,
208+
expectedIndexers: []string{"container"},
209+
expectedMatchers: []string{"fields"},
210+
},
211+
}
212+
213+
names := func(plugins PluginConfig) []string {
214+
var ns []string
215+
for _, plugin := range plugins {
216+
for name := range plugin {
217+
ns = append(ns, name)
218+
}
219+
}
220+
return ns
221+
}
222+
223+
for title, c := range cases {
224+
t.Run(title, func(t *testing.T) {
225+
config, err := newProcessorConfig(c.config, c.register)
226+
require.NoError(t, err)
227+
assert.Equal(t, c.expectedMatchers, names(config.Matchers), "expected matchers")
228+
assert.Equal(t, c.expectedIndexers, names(config.Indexers), "expected indexers")
229+
})
230+
}
231+
}

0 commit comments

Comments
 (0)