diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index a980d1cef32c..617ee8fdcfc1 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -219,7 +219,7 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { window[i] = &events[i].Content } ref.count.Inc() - return client.Send(ref.callback, window) + return client.Send(ref.customizedCallback(), window) } func (c *asyncClient) getClient() *v2.AsyncClient { @@ -229,7 +229,15 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } -func (r *msgRef) callback(seq uint32, err error) { +func (r *msgRef) customizedCallback() func(uint32, error) { + start := time.Now() + + return func(n uint32, err error) { + r.callback(start, n, err) + } +} + +func (r *msgRef) callback(start time.Time, n uint32, err error) { if err != nil { r.fail(seq, err) } else { @@ -243,6 +251,11 @@ func (r *msgRef) done(n uint32) { if r.win != nil { r.win.tryGrowWindow(r.batchSize) } + + // Report the latency for the batch of events + duration := time.Since(start) + r.client.observer.ReportLatency(duration) + r.dec() }