diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ac6fa3a6e0b1..052724fe4ecf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -439,7 +439,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - Journald `include_matches.match` now accepts `+` to represent a logical disjunction (OR) {issue}40185[40185] {pull}42517[42517] - The journald input is now generally available. {pull}42107[42107] - Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442] -- Filestram take over now supports taking over states from other Filestream inputs and dynamic loading of inputs (autodiscover and Elastic-Agent). {issue}42472[42472] {issue}42884[42884] {pull}42624[42624] +- Filestream take over now supports taking over states from other Filestream inputs and dynamic loading of inputs (autodiscover and Elastic Agent). There is a new syntax for the configuration, but the previous one can still be used. {issue}42472[42472] {issue}42884[42884] {pull}42624[42624] - Add `etw` input fallback to attach an already existing session. {pull}42847[42847] - Update CEL mito extensions to v1.17.0. {pull}42851[42851] - Winlog input now can report its status to Elastic-Agent {pull}43089[43089] diff --git a/docs/reference/filebeat/filebeat-input-filestream.md b/docs/reference/filebeat/filebeat-input-filestream.md index 980bf6131995..0ce959023b86 100644 --- a/docs/reference/filebeat/filebeat-input-filestream.md +++ b/docs/reference/filebeat/filebeat-input-filestream.md @@ -354,13 +354,16 @@ The `take over` mode can work correctly only if the source (taken from) inputs a `take_over.enabled: true` requires the `filestream` to have a unique ID. :::: - This `take over` mode was created to enable smooth migration from deprecated `log` inputs to the new `filestream` inputs and to allow changing `filestream` input IDs without data re-ingestion. See [*Migrate `log` input configurations to `filestream`*](/reference/filebeat/migrate-to-filestream.md) for more details about the migration process. +The previous configuration format `take_over: true`, while +deprecated, is still supported to migrate state from the `log` input +to `filestream`. + ::::{warning} The `take over` mode is still in beta, however, it should be generally safe to use. :::: diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index c98e6e3acd1b..72da75092451 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -25,6 +25,7 @@ import ( "github.com/dustin/go-humanize" + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" "github.com/elastic/beats/v7/libbeat/common/match" "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" @@ -49,18 +50,15 @@ type config struct { IgnoreOlder time.Duration `config:"ignore_older"` IgnoreInactive ignoreInactiveType `config:"ignore_inactive"` Rotation *conf.Namespace `config:"rotation"` - TakeOver takeOverConfig `config:"take_over"` + // TakeOver is also independently parsed by InputManager.Create + // (see internal/input-logfile/manager.go). + TakeOver loginp.TakeOverConfig `config:"take_over"` // AllowIDDuplication is used by InputManager.Create // (see internal/input-logfile/manager.go). AllowIDDuplication bool `config:"allow_deprecated_id_duplication"` } -type takeOverConfig struct { - Enabled bool `config:"enabled"` - FromIDs []string `config:"from_ids"` -} - type closerConfig struct { OnStateChange stateChangeCloserConfig `config:"on_state_change"` Reader readerCloserConfig `config:"reader"` diff --git a/filebeat/input/filestream/config_test.go b/filebeat/input/filestream/config_test.go index a4cbe576fe96..93b9c1d5173d 100644 --- a/filebeat/input/filestream/config_test.go +++ b/filebeat/input/filestream/config_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest/observer" + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -40,7 +41,7 @@ func TestConfigValidate(t *testing.T) { t.Run("take_over requires ID", func(t *testing.T) { c := config{ Paths: []string{"/foo/bar"}, - TakeOver: takeOverConfig{Enabled: true}, + TakeOver: loginp.TakeOverConfig{Enabled: true}, } err := c.Validate() assert.Error(t, err, "take_over.enabled can only be true if ID is set") @@ -50,7 +51,7 @@ func TestConfigValidate(t *testing.T) { c := config{ Paths: []string{"/foo/bar"}, ID: "some id", - TakeOver: takeOverConfig{Enabled: true}, + TakeOver: loginp.TakeOverConfig{Enabled: true}, } err := c.Validate() assert.NoError(t, err) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 3e657b0a83ac..c4cca14b0dbb 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -64,7 +64,7 @@ type filestream struct { encodingFactory encoding.EncodingFactory closerConfig closerConfig parsers parser.Config - takeOver takeOverConfig + takeOver loginp.TakeOverConfig } // Plugin creates a new filestream input plugin for creating a stateful input. @@ -91,6 +91,8 @@ func configure(cfg *conf.C, log *logp.Logger) (loginp.Prospector, loginp.Harvest return nil, nil, err } + config.TakeOver.LogWarnings(log) + prospector, err := newProspector(config, log) if err != nil { return nil, nil, fmt.Errorf("cannot create prospector: %w", err) diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index 6e817c092a18..f7a7eb889c05 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -151,14 +151,11 @@ func (cim *InputManager) Create(config *conf.C) (inp v2.Input, retErr error) { settings := struct { // All those values are duplicated from the Filestream configuration - ID string `config:"id"` - CleanInactive time.Duration `config:"clean_inactive"` - HarvesterLimit uint64 `config:"harvester_limit"` - AllowIDDuplication bool `config:"allow_deprecated_id_duplication"` - TakeOver struct { - Enabled bool `config:"enabled"` - FromIDs []string `config:"from_ids"` - } `config:"take_over"` + ID string `config:"id"` + CleanInactive time.Duration `config:"clean_inactive"` + HarvesterLimit uint64 `config:"harvester_limit"` + AllowIDDuplication bool `config:"allow_deprecated_id_duplication"` + TakeOver TakeOverConfig `config:"take_over"` }{ CleanInactive: cim.DefaultCleanTimeout, } @@ -325,3 +322,60 @@ func (i *sourceIdentifier) ID(s Source) string { func (i *sourceIdentifier) MatchesInput(id string) bool { return strings.HasPrefix(id, i.prefix) } + +// TakeOverConfig is the configuration for the take over mode. +// It allows the Filestream input to take over states from the log +// input or other Filestream inputs +type TakeOverConfig struct { + Enabled bool `config:"enabled"` + // Filestream IDs to take over states + FromIDs []string `config:"from_ids"` + + // legacyFormat is set to true when `Unpack` detects + // the legacy configuration format. It is used by + // `LogWarnings` to log warnings + legacyFormat bool +} + +func (t *TakeOverConfig) Unpack(value any) error { + switch v := value.(type) { + case bool: + t.Enabled = v + t.legacyFormat = true + case map[string]any: + rawEnabled := v["enabled"] + enabled, ok := rawEnabled.(bool) + if !ok { + return fmt.Errorf("cannot parse '%[1]v' (type %[1]T) as bool", rawEnabled) + } + t.Enabled = enabled + + rawFromIDs, exists := v["from_ids"] + if !exists { + return nil + } + + fromIDs, ok := rawFromIDs.([]any) + if !ok { + return fmt.Errorf("cannot parse '%[1]v' (type %[1]T) as []any", rawFromIDs) + } + for _, el := range fromIDs { + strEl, ok := el.(string) + if !ok { + return fmt.Errorf("cannot parse '%[1]v' (type %[1]T) as string", el) + } + t.FromIDs = append(t.FromIDs, strEl) + } + + default: + return fmt.Errorf("cannot parse '%[1]v' (type %[1]T)", value) + } + + return nil +} + +func (t *TakeOverConfig) LogWarnings(logger *logp.Logger) { + if t.legacyFormat { + logger.Warn("using 'take_over: true' is deprecated, use the new format: 'take_over.enabled: true'") + } +} diff --git a/filebeat/input/filestream/internal/input-logfile/manager_test.go b/filebeat/input/filestream/internal/input-logfile/manager_test.go index e8b80fb59044..58e014dba0a0 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager_test.go +++ b/filebeat/input/filestream/internal/input-logfile/manager_test.go @@ -434,3 +434,89 @@ func newBufferLogger() (*logp.Logger, *bytes.Buffer) { })) return log, buf } + +func TestTakeOverConfigUnpack(t *testing.T) { + testCases := map[string]struct { + cfgYAML string + expected TakeOverConfig + expectErr bool + }{ + "legacy mode enabled": { + cfgYAML: `take_over: true`, + expected: TakeOverConfig{ + Enabled: true, + legacyFormat: true, + }, + }, + "legacy mode disabled": { + cfgYAML: `take_over: false`, + expected: TakeOverConfig{ + Enabled: false, + legacyFormat: true, + }, + }, + "new mode enabled": { + cfgYAML: ` +take_over: + enabled: true`, + expected: TakeOverConfig{ + Enabled: true, + }, + }, + "new mode disabled": { + cfgYAML: ` +take_over: + enabled: false`, + expected: TakeOverConfig{ + Enabled: false, + }, + }, + "new mode with IDs": { + cfgYAML: ` +take_over: + enabled: true + from_ids: ["foo", "bar"]`, + expected: TakeOverConfig{ + Enabled: true, + FromIDs: []string{"foo", "bar"}, + }, + }, + "take_over not defined": { + cfgYAML: "", + expectErr: false, + }, + "invalid new config": { + cfgYAML: "take_over.enabled: 42", + expectErr: true, + }, + "invalid from_ids elements ": { + cfgYAML: "take_over.from_ids: [\"foo\", 42]", + expectErr: true, + }, + "invalid from_ids type ": { + cfgYAML: "take_over.from_ids: false", + expectErr: true, + }, + "invalid legacy config": { + cfgYAML: "take_over: 42", + expectErr: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + cfg := config.MustNewConfigFrom(tc.cfgYAML) + outer := struct { + TakeOver TakeOverConfig `config:"take_over"` + }{} + err := cfg.Unpack(&outer) + if tc.expectErr { + require.Error(t, err, "Unpack must fail") + } else { + require.NoError(t, err, "Unpack must succeed") + } + + assert.Equal(t, tc.expected, outer.TakeOver, "TakeOverConfig was not parsed correctly") + }) + } +} diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 40856ca3c02d..dabcec62e0fd 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -81,7 +81,7 @@ type fileProspector struct { ignoreInactiveSince ignoreInactiveType cleanRemoved bool stateChangeCloser stateChangeCloserConfig - takeOver takeOverConfig + takeOver loginp.TakeOverConfig } func (p *fileProspector) Init( diff --git a/filebeat/input/v2/compat/compat_test.go b/filebeat/input/v2/compat/compat_test.go index 3995c85f185d..a8c2265f6779 100644 --- a/filebeat/input/v2/compat/compat_test.go +++ b/filebeat/input/v2/compat/compat_test.go @@ -110,7 +110,6 @@ paths: prospector: scanner: symlinks: true -take_over: true type: test `, inputID) diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go index 5b7102bddaf4..d4dadb3ef557 100644 --- a/filebeat/tests/integration/filestream_test.go +++ b/filebeat/tests/integration/filestream_test.go @@ -589,6 +589,11 @@ func TestFilestreamTakeOverFromFilestream(t *testing.T) { "expected-registry-happy-path.json"), "Entries in the registry are different from the expectation", ) + + deprecationLog := "using 'take_over: true' is deprecated, use the new format: 'take_over.enabled: true'" + if filebeat.LogContains(deprecationLog) { + t.Fatalf("deprecation log %q must not be present when using the new syntax", deprecationLog) + } } func TestFilestreamTakeOverFromLogInput(t *testing.T) { @@ -655,6 +660,11 @@ func TestFilestreamTakeOverFromLogInput(t *testing.T) { "expected-registry-happy-path-log-input.json"), "Entries in the registry are different from the expectation", ) + + deprecationLog := "using 'take_over: true' is deprecated, use the new format: 'take_over.enabled: true'" + if !filebeat.LogContains(deprecationLog) { + t.Fatalf("did not find the deprecation log %q", deprecationLog) + } } func requireRegistryEntryRemoved(t *testing.T, workDir, identity string) { diff --git a/filebeat/tests/integration/testdata/take-over/happy-path-log-input.yml b/filebeat/tests/integration/testdata/take-over/happy-path-log-input.yml index f8962abdabab..b3463a436152 100644 --- a/filebeat/tests/integration/testdata/take-over/happy-path-log-input.yml +++ b/filebeat/tests/integration/testdata/take-over/happy-path-log-input.yml @@ -4,8 +4,7 @@ filebeat.inputs: - {{.testdata}}/take-over/*.log {{ if .takeOver }} id: take-over-from-log-input - take_over: - enabled: true + take_over: true file_identity.fingerprint: ~ prospector: scanner: