From fc9b82c8c53a0d6c72cc0e9dff1c1ce4178fa07a Mon Sep 17 00:00:00 2001 From: William Easton Date: Tue, 4 Feb 2025 00:43:14 -0600 Subject: [PATCH 1/3] Add latency metrics for logstash async output (#42565) * Add latency metrics for logstash async output * Properly handle per-batch latency (cherry picked from commit accc5e147cdbd028f3bcb3c7a03340ea60f7469f) # Conflicts: # libbeat/outputs/logstash/async.go --- libbeat/outputs/logstash/async.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index a980d1cef32c..35d75c45cc0e 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -218,8 +218,14 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { for i := range events { window[i] = &events[i].Content } +<<<<<<< HEAD ref.count.Inc() return client.Send(ref.callback, window) +======= + ref.count.Add(1) + + return client.Send(ref.customizedCallback(), window) +>>>>>>> accc5e147 (Add latency metrics for logstash async output (#42565)) } func (c *asyncClient) getClient() *v2.AsyncClient { @@ -229,6 +235,7 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } +<<<<<<< HEAD func (r *msgRef) callback(seq uint32, err error) { if err != nil { r.fail(seq, err) @@ -238,11 +245,27 @@ func (r *msgRef) callback(seq uint32, err error) { } func (r *msgRef) done(n uint32) { +======= +func (r *msgRef) customizedCallback() func(uint32, error) { + start := time.Now() + + return func(n uint32, err error) { + r.callback(start, n, err) + } +} + +func (r *msgRef) callback(start time.Time, n uint32, err error) { +>>>>>>> accc5e147 (Add latency metrics for logstash async output (#42565)) r.client.observer.AckedEvents(int(n)) r.slice = r.slice[n:] if r.win != nil { r.win.tryGrowWindow(r.batchSize) } + + // Report the latency for the batch of events + duration := time.Since(start) + r.client.observer.ReportLatency(duration) + r.dec() } From f1e3068a0637deb212ce4ca9b3e68b234259723e Mon Sep 17 00:00:00 2001 From: William Easton Date: Tue, 4 Feb 2025 10:42:02 -0600 Subject: [PATCH 2/3] Update async.go for 8.17 version --- libbeat/outputs/logstash/async.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 35d75c45cc0e..ec245a968444 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -235,17 +235,6 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } -<<<<<<< HEAD -func (r *msgRef) callback(seq uint32, err error) { - if err != nil { - r.fail(seq, err) - } else { - r.done(seq) - } -} - -func (r *msgRef) done(n uint32) { -======= func (r *msgRef) customizedCallback() func(uint32, error) { start := time.Now() @@ -255,7 +244,14 @@ func (r *msgRef) customizedCallback() func(uint32, error) { } func (r *msgRef) callback(start time.Time, n uint32, err error) { ->>>>>>> accc5e147 (Add latency metrics for logstash async output (#42565)) + if err != nil { + r.fail(seq, err) + } else { + r.done(seq) + } +} + +func (r *msgRef) done(n uint32) { r.client.observer.AckedEvents(int(n)) r.slice = r.slice[n:] if r.win != nil { From b5172e7701acd6fd8488216797c17babf56de5c2 Mon Sep 17 00:00:00 2001 From: William Easton Date: Tue, 4 Feb 2025 10:42:54 -0600 Subject: [PATCH 3/3] Update async.go sendevents for 8.17 version --- libbeat/outputs/logstash/async.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index ec245a968444..617ee8fdcfc1 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -218,14 +218,8 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { for i := range events { window[i] = &events[i].Content } -<<<<<<< HEAD ref.count.Inc() - return client.Send(ref.callback, window) -======= - ref.count.Add(1) - return client.Send(ref.customizedCallback(), window) ->>>>>>> accc5e147 (Add latency metrics for logstash async output (#42565)) } func (c *asyncClient) getClient() *v2.AsyncClient {