diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2974adb62394..e726118c510e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* TLS or Beats that accept connections over TLS and validate client certificates. {pull}14146[14146] +- Fix panic in the Logstash output when trying to send events to closed connection. {pull}15568[15568] *Auditbeat* diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 96374e192d08..967ae7d0f6c1 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -18,7 +18,9 @@ package logstash import ( + "errors" "net" + "sync" "time" "github.com/elastic/beats/libbeat/beat" @@ -37,6 +39,8 @@ type asyncClient struct { win *window connect func() error + + mutex sync.Mutex } type msgRef struct { @@ -113,7 +117,11 @@ func (c *asyncClient) Connect() error { } func (c *asyncClient) Close() error { + c.mutex.Lock() + defer c.mutex.Unlock() + logp.Debug("logstash", "close connection") + if c.client != nil { err := c.client.Close() c.client = nil @@ -197,12 +205,23 @@ func (c *asyncClient) publishWindowed( } func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { + client := c.getClient() + if client == nil { + return errors.New("connection closed") + } window := make([]interface{}, len(events)) for i := range events { window[i] = &events[i].Content } ref.count.Inc() - return c.client.Send(ref.callback, window) + return client.Send(ref.callback, window) +} + +func (c *asyncClient) getClient() *v2.AsyncClient { + c.mutex.Lock() + client := c.client + c.mutex.Unlock() + return client } func (r *msgRef) callback(seq uint32, err error) { diff --git a/testing/environments/snapshot.yml b/testing/environments/snapshot.yml index 3ee259b0400a..16c20266a6e6 100644 --- a/testing/environments/snapshot.yml +++ b/testing/environments/snapshot.yml @@ -17,7 +17,7 @@ services: - "indices.id_field_data.enabled=true" logstash: - image: docker.elastic.co/logstash/logstash:8.0.0-SNAPSHOT + image: docker.elastic.co/logstash/logstash@sha256:e01cf165142edf8d67485115b938c94deeda66153e9516aa2ce69ee417c5fc33 healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"] retries: 600