Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Journald `include_matches.match` now accepts `+` to represent a logical disjunction (OR) {issue}40185[40185] {pull}42517[42517]
- The journald input is now generally available. {pull}42107[42107]
- Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442]
- Filestram take over now supports taking over states from other Filestream inputs and dynamic loading of inputs (autodiscover and Elastic-Agent). {issue}42472[42472] {issue}42884[42884] {pull}42624[42624]
- Filestream take over now supports taking over states from other Filestream inputs and dynamic loading of inputs (autodiscover and Elastic Agent). There is a new syntax for the configuration, but the previous one can still be used. {issue}42472[42472] {issue}42884[42884] {pull}42624[42624]
- Add `etw` input fallback to attach an already existing session. {pull}42847[42847]
- Update CEL mito extensions to v1.17.0. {pull}42851[42851]
- Winlog input now can report its status to Elastic-Agent {pull}43089[43089]
Expand Down
5 changes: 4 additions & 1 deletion docs/reference/filebeat/filebeat-input-filestream.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,16 @@ The `take over` mode can work correctly only if the source (taken from) inputs a
`take_over.enabled: true` requires the `filestream` to have a unique ID.
::::


This `take over` mode was created to enable smooth migration from
deprecated `log` inputs to the new `filestream` inputs and to allow
changing `filestream` input IDs without data re-ingestion.

See [*Migrate `log` input configurations to `filestream`*](/reference/filebeat/migrate-to-filestream.md) for more details about the migration process.

The previous configuration format `take_over: true`, while
deprecated, is still supported to migrate state from the `log` input
to `filestream`.

::::{warning}
The `take over` mode is still in beta, however, it should be generally safe to use.
::::
Expand Down
10 changes: 4 additions & 6 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/dustin/go-humanize"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
Expand All @@ -49,18 +50,15 @@ type config struct {
IgnoreOlder time.Duration `config:"ignore_older"`
IgnoreInactive ignoreInactiveType `config:"ignore_inactive"`
Rotation *conf.Namespace `config:"rotation"`
TakeOver takeOverConfig `config:"take_over"`

// TakeOver is also independently parsed by InputManager.Create
// (see internal/input-logfile/manager.go).
TakeOver loginp.TakeOverConfig `config:"take_over"`
// AllowIDDuplication is used by InputManager.Create
// (see internal/input-logfile/manager.go).
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
}

type takeOverConfig struct {
Enabled bool `config:"enabled"`
FromIDs []string `config:"from_ids"`
}

type closerConfig struct {
OnStateChange stateChangeCloserConfig `config:"on_state_change"`
Reader readerCloserConfig `config:"reader"`
Expand Down
5 changes: 3 additions & 2 deletions filebeat/input/filestream/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest/observer"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -40,7 +41,7 @@
t.Run("take_over requires ID", func(t *testing.T) {
c := config{
Paths: []string{"/foo/bar"},
TakeOver: takeOverConfig{Enabled: true},
TakeOver: loginp.TakeOverConfig{Enabled: true},
}
err := c.Validate()
assert.Error(t, err, "take_over.enabled can only be true if ID is set")
Expand All @@ -50,7 +51,7 @@
c := config{
Paths: []string{"/foo/bar"},
ID: "some id",
TakeOver: takeOverConfig{Enabled: true},
TakeOver: loginp.TakeOverConfig{Enabled: true},
}
err := c.Validate()
assert.NoError(t, err)
Expand Down Expand Up @@ -228,7 +229,7 @@
require.NoError(t, err, "could not create input configuration")
inputs = append(inputs, cfg)
}
err := logp.DevelopmentSetup(logp.ToObserverOutput())

Check failure on line 232 in filebeat/input/filestream/config_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: logp.DevelopmentSetup is deprecated: Prefer using localized loggers. Use logp.NewDevelopmentLogger. (staticcheck)
require.NoError(t, err, "could not setup log for development")

err = ValidateInputIDs(inputs, logp.L())
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
encodingFactory encoding.EncodingFactory
closerConfig closerConfig
parsers parser.Config
takeOver takeOverConfig
takeOver loginp.TakeOverConfig
}

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand All @@ -91,6 +91,8 @@
return nil, nil, err
}

config.TakeOver.LogWarnings(log)

prospector, err := newProspector(config, log)
if err != nil {
return nil, nil, fmt.Errorf("cannot create prospector: %w", err)
Expand Down Expand Up @@ -393,7 +395,7 @@
continue
}

metrics.BytesProcessed.Add(uint64(message.Bytes))

Check failure on line 398 in filebeat/input/filestream/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

G115: integer overflow conversion int -> uint64 (gosec)

// add "take_over" tag if `take_over` is set to true
if inp.takeOver.Enabled {
Expand Down
70 changes: 62 additions & 8 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
if err != nil {
store.Release()
cim.shutdown()
return fmt.Errorf("Can not start registry cleanup process: %w", err)

Check failure on line 134 in filebeat/input/filestream/internal/input-logfile/manager.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not be capitalized (staticcheck)
}

return nil
Expand All @@ -151,14 +151,11 @@

settings := struct {
// All those values are duplicated from the Filestream configuration
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
HarvesterLimit uint64 `config:"harvester_limit"`
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
TakeOver struct {
Enabled bool `config:"enabled"`
FromIDs []string `config:"from_ids"`
} `config:"take_over"`
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
HarvesterLimit uint64 `config:"harvester_limit"`
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
TakeOver TakeOverConfig `config:"take_over"`
}{
CleanInactive: cim.DefaultCleanTimeout,
}
Expand Down Expand Up @@ -325,3 +322,60 @@
func (i *sourceIdentifier) MatchesInput(id string) bool {
return strings.HasPrefix(id, i.prefix)
}

// TakeOverConfig is the configuration for the take over mode.
// It allows the Filestream input to take over states from the log
// input or other Filestream inputs
type TakeOverConfig struct {
Enabled bool `config:"enabled"`
// Filestream IDs to take over states
FromIDs []string `config:"from_ids"`

// legacyFormat is set to true when `Unpack` detects
// the legacy configuration format. It is used by
// `LogWarnings` to log warnings
legacyFormat bool
}

func (t *TakeOverConfig) Unpack(value any) error {
switch v := value.(type) {
case bool:
t.Enabled = v
t.legacyFormat = true
case map[string]any:
rawEnabled := v["enabled"]
enabled, ok := rawEnabled.(bool)
if !ok {
return fmt.Errorf("cannot parse '%[1]v' (type %[1]T) as bool", rawEnabled)
}
t.Enabled = enabled

rawFromIDs, exists := v["from_ids"]
if !exists {
return nil
}

fromIDs, ok := rawFromIDs.([]any)
if !ok {
return fmt.Errorf("cannot parse '%[1]v' (type %[1]T) as []any", rawFromIDs)
}
for _, el := range fromIDs {
strEl, ok := el.(string)
if !ok {
return fmt.Errorf("cannot parse '%[1]v' (type %[1]T) as string", el)
}
t.FromIDs = append(t.FromIDs, strEl)
}

default:
return fmt.Errorf("cannot parse '%[1]v' (type %[1]T)", value)
}

return nil
}

func (t *TakeOverConfig) LogWarnings(logger *logp.Logger) {
if t.legacyFormat {
logger.Warn("using 'take_over: true' is deprecated, use the new format: 'take_over.enabled: true'")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,89 @@ func newBufferLogger() (*logp.Logger, *bytes.Buffer) {
}))
return log, buf
}

func TestTakeOverConfigUnpack(t *testing.T) {
testCases := map[string]struct {
cfgYAML string
expected TakeOverConfig
expectErr bool
}{
"legacy mode enabled": {
cfgYAML: `take_over: true`,
expected: TakeOverConfig{
Enabled: true,
legacyFormat: true,
},
},
"legacy mode disabled": {
cfgYAML: `take_over: false`,
expected: TakeOverConfig{
Enabled: false,
legacyFormat: true,
},
},
"new mode enabled": {
cfgYAML: `
take_over:
enabled: true`,
expected: TakeOverConfig{
Enabled: true,
},
},
"new mode disabled": {
cfgYAML: `
take_over:
enabled: false`,
expected: TakeOverConfig{
Enabled: false,
},
},
"new mode with IDs": {
cfgYAML: `
take_over:
enabled: true
from_ids: ["foo", "bar"]`,
expected: TakeOverConfig{
Enabled: true,
FromIDs: []string{"foo", "bar"},
},
},
"take_over not defined": {
cfgYAML: "",
expectErr: false,
},
"invalid new config": {
cfgYAML: "take_over.enabled: 42",
expectErr: true,
},
"invalid from_ids elements ": {
cfgYAML: "take_over.from_ids: [\"foo\", 42]",
expectErr: true,
},
"invalid from_ids type ": {
cfgYAML: "take_over.from_ids: false",
expectErr: true,
},
"invalid legacy config": {
cfgYAML: "take_over: 42",
expectErr: true,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
cfg := config.MustNewConfigFrom(tc.cfgYAML)
outer := struct {
TakeOver TakeOverConfig `config:"take_over"`
}{}
err := cfg.Unpack(&outer)
if tc.expectErr {
require.Error(t, err, "Unpack must fail")
} else {
require.NoError(t, err, "Unpack must succeed")
}

assert.Equal(t, tc.expected, outer.TakeOver, "TakeOverConfig was not parsed correctly")
})
}
}
2 changes: 1 addition & 1 deletion filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
ignoreInactiveSince ignoreInactiveType
cleanRemoved bool
stateChangeCloser stateChangeCloserConfig
takeOver takeOverConfig
takeOver loginp.TakeOverConfig
}

func (p *fileProspector) Init(
Expand Down Expand Up @@ -152,7 +152,7 @@
// - The old identifier is neither native nor path
oldIdentifierName := fm.IdentifierName
if oldIdentifierName == identifierName ||
!(oldIdentifierName == nativeName || oldIdentifierName == pathName) {

Check failure on line 155 in filebeat/input/filestream/prospector.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

QF1001: could apply De Morgan's law (staticcheck)
return "", nil
}

Expand Down Expand Up @@ -328,7 +328,7 @@
) {
switch event.Op {
case loginp.OpCreate, loginp.OpWrite:
if event.Op == loginp.OpCreate {

Check failure on line 331 in filebeat/input/filestream/prospector.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

QF1003: could use tagged switch on event.Op (staticcheck)
log.Debugf("A new file %s has been found", event.NewPath)

err := updater.UpdateMetadata(src, fileMeta{Source: event.NewPath, IdentifierName: p.identifier.Name()})
Expand Down
1 change: 0 additions & 1 deletion filebeat/input/v2/compat/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ paths:
prospector:
scanner:
symlinks: true
take_over: true
type: test
`, inputID)

Expand Down
10 changes: 10 additions & 0 deletions filebeat/tests/integration/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,11 @@
"expected-registry-happy-path.json"),
"Entries in the registry are different from the expectation",
)

deprecationLog := "using 'take_over: true' is deprecated, use the new format: 'take_over.enabled: true'"
if filebeat.LogContains(deprecationLog) {
t.Fatalf("deprecation log %q must not be present when using the new syntax", deprecationLog)
}
}

func TestFilestreamTakeOverFromLogInput(t *testing.T) {
Expand Down Expand Up @@ -655,6 +660,11 @@
"expected-registry-happy-path-log-input.json"),
"Entries in the registry are different from the expectation",
)

deprecationLog := "using 'take_over: true' is deprecated, use the new format: 'take_over.enabled: true'"
if !filebeat.LogContains(deprecationLog) {
t.Fatalf("did not find the deprecation log %q", deprecationLog)
}
}

func requireRegistryEntryRemoved(t *testing.T, workDir, identity string) {
Expand Down Expand Up @@ -731,7 +741,7 @@
func waitForEOF(t *testing.T, filebeat *integration.BeatProc, files []string) {
for _, path := range files {
if runtime.GOOS == "windows" {
path = strings.Replace(path, `\`, `\\`, -1)

Check failure on line 744 in filebeat/tests/integration/filestream_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

QF1004: could use strings.ReplaceAll instead (staticcheck)
}
eofMsg := fmt.Sprintf("End of file reached: %s; Backoff now.", path)

Expand All @@ -750,7 +760,7 @@
func waitForDidnotChange(t *testing.T, filebeat *integration.BeatProc, files []string) {
for _, path := range files {
if runtime.GOOS == "windows" {
path = strings.Replace(path, `\`, `\\`, -1)

Check failure on line 763 in filebeat/tests/integration/filestream_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

QF1004: could use strings.ReplaceAll instead (staticcheck)
}
eofMsg := fmt.Sprintf("File didn't change: %s", path)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ filebeat.inputs:
- {{.testdata}}/take-over/*.log
{{ if .takeOver }}
id: take-over-from-log-input
take_over:
enabled: true
take_over: true
file_identity.fingerprint: ~
prospector:
scanner:
Expand Down
Loading