Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591]
- Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524]
- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]
- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125]

*Heartbeat*

Expand Down
52 changes: 26 additions & 26 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,37 +76,18 @@ func NewInput(
outlet channel.Connector,
context input.Context,
) (input.Input, error) {

// Note: underlying output.
// The input and harvester do have different requirements
// on the timings the outlets must be closed/unblocked.
// The outlet generated here is the underlying outlet, only closed
// once all workers have been shut down.
// For state updates and events, separate sub-outlets will be used.
out, err := outlet(cfg, context.DynamicFields)
if err != nil {
return nil, err
}

// stateOut will only be unblocked if the beat is shut down.
// otherwise it can block on a full publisher pipeline, so state updates
// can be forwarded correctly to the registrar.
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone)

meta := context.Meta
if len(meta) == 0 {
meta = nil
}

p := &Input{
config: defaultConfig,
cfg: cfg,
harvesters: harvester.NewRegistry(),
outlet: out,
stateOutlet: stateOut,
states: file.NewStates(),
done: context.Done,
meta: meta,
config: defaultConfig,
cfg: cfg,
harvesters: harvester.NewRegistry(),
states: file.NewStates(),
done: context.Done,
meta: meta,
}

if err := cfg.Unpack(&p.config); err != nil {
Expand All @@ -121,7 +102,7 @@ func NewInput(

// Create empty harvester to check if configs are fine
// TODO: Do config validation instead
_, err = p.createHarvester(file.State{}, nil)
_, err := p.createHarvester(file.State{}, nil)
if err != nil {
return nil, err
}
Expand All @@ -135,6 +116,25 @@ func NewInput(
return nil, err
}

// Note: underlying output.
// The input and harvester do have different requirements
// on the timings the outlets must be closed/unblocked.
// The outlet generated here is the underlying outlet, only closed
// once all workers have been shut down.
// For state updates and events, separate sub-outlets will be used.
out, err := outlet(cfg, context.DynamicFields)
if err != nil {
return nil, err
}

// stateOut will only be unblocked if the beat is shut down.
// otherwise it can block on a full publisher pipeline, so state updates
// can be forwarded correctly to the registrar.
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone)

p.outlet = out
p.stateOutlet = stateOut

logp.Info("Configured paths: %v", p.config.Paths)

return p, nil
Expand Down