Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drops event after spooler is blocked for a certain time #3091

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions filebeat/spooler/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Spooler struct {
output Output // batch event output on flush
spool []*input.Event // Events being held by the Spooler.
wg sync.WaitGroup // WaitGroup used to control the shutdown.
skip bool
}

// Output spooler sends event to through Send method
Expand All @@ -31,6 +32,7 @@ type Output interface {
type spoolerConfig struct {
idleTimeout time.Duration // How often to flush the spooler if spoolSize is not reached.
spoolSize uint64 // Maximum number of events that are stored before a flush occurs.
dropAfter time.Duration
}

// New creates and returns a new Spooler. The returned Spooler must be
Expand All @@ -44,6 +46,7 @@ func New(
config: spoolerConfig{
idleTimeout: config.IdleTimeout,
spoolSize: config.SpoolSize,
dropAfter: 5 * time.Second,
},
output: out,
spool: make([]*input.Event, 0, config.SpoolSize),
Expand Down Expand Up @@ -131,15 +134,37 @@ func (s *Spooler) flush() int {
return 0
}

// clear buffer
defer func() {
s.spool = s.spool[:0]
}()

if s.skip {
logp.Debug("SSS", "%v events dropped.", count)
return count
}

// copy buffer
tmpCopy := make([]*input.Event, count)
copy(tmpCopy, s.spool)

// clear buffer
s.spool = s.spool[:0]
done := make(chan struct{})
go func() {
// send batched events to output
s.output.Send(tmpCopy)
close(done)
// TODO: Should be atomic operation
s.skip = false
}()

// send batched events to output
s.output.Send(tmpCopy)
if s.config.dropAfter > 0 {
select {
case <-done:
// Resets skip as soon as events can be sent again
case <-time.After(s.config.dropAfter):
s.skip = true
}
}

return count
}