
Filebeat not resuming after EOF #878

Closed
matejzero opened this issue Jan 28, 2016 · 45 comments

@matejzero

Hello,

I'm having trouble with filebeat not resuming after EOF. My filebeat is sending logs to 2 logstash nodes in loadbalance mode. Every day, at a random time, it stops sending with an EOF error in the log file:

2016-01-27T17:15:01+01:00 INFO Events sent: 918
2016-01-27T17:15:01+01:00 INFO Registry file updated. 491 states written.
2016-01-27T17:15:06+01:00 INFO Events sent: 490
2016-01-27T17:15:06+01:00 INFO Registry file updated. 491 states written.
2016-01-27T17:15:18+01:00 INFO Error publishing events (retrying): EOF
2016-01-27T17:15:19+01:00 INFO Events sent: 865
2016-01-27T17:15:19+01:00 INFO Registry file updated. 491 states written.
2016-01-28T00:00:05+01:00 INFO Harvester started for file: /logs/server1/httpd/20160128-httpd_access.log
2016-01-28T00:00:05+01:00 INFO Harvester started for file: /logs/server2/radius/20160128-radius.log

If I look at logstash logs, I see this

{:timestamp=>"2016-01-27T16:04:26.772000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-01-27T17:15:18.720000+0100", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
{:timestamp=>"2016-01-27T17:15:18.720000+0100", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
{:timestamp=>"2016-01-27T17:15:18.815000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-01-27T17:15:18.816000+0100", :message=>"CircuitBreaker::Open", :name=>"Beats input", :level=>:warn}
{:timestamp=>"2016-01-27T17:15:18.817000+0100", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::OpenBreaker, :level=>:warn}
{:timestamp=>"2016-01-27T17:15:19.316000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}

The "pipeline is blocked" logs continue for another 20s and then stop. Logstash is still reading logs from redis, but filebeat never resumes. The only way to get logs flowing again is to restart filebeat.

Shouldn't filebeat resume sending logs? At least that is the idea I got on the filebeat IRC channel.

@urso

urso commented Jan 28, 2016

Which filebeat version are you using?

Can you share your filebeat configuration?

Can you run filebeat in debug mode by running filebeat with -v -d '*'?
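
For example (adjust the config path to match your install):

filebeat -c /etc/filebeat/filebeat.yml -v -d '*'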

@matejzero
Author

Yeah, sorry for the lack of information.

Filebeat version: 1.0.1

Configuration:

################### Filebeat Configuration #########################

filebeat:
  prospectors:
    -
      #
      # HTTP apache log files
      #
      paths:
        - "/logs/httpd/201601/*httpd_access.log"
      input_type: log
      fields:
        syslog_format: true
        http_proto: "http"
      fields_under_root: true
      document_type: httpd
    -
      #
      # HTTPS apache log files
      #
      paths:
        - "/logs/httpd/201601/*httpd_ssl_access.log"
      input_type: log
      fields:
        syslog_format: true
        http_proto: "https"
      fields_under_root: true
      document_type: httpd
    -
      #
      # Dovecot log files
      #
      paths:
        - "/logs/mail/201601/*mail.log"
      input_type: log
      fields:
        syslog_format: true
      fields_under_root: true
      document_type: dovecot
    -
      #
      # Postfix log files
      #
      paths:
        - "/logs/mail/201601/*mail.log"
      input_type: log
      fields:
        syslog_format: true
      fields_under_root: true
      document_type: postfix
    -
      #
      # Radius log files
      #
      paths:
        - "/logs/radius/201601/*radius.log"
      input_type: log
      fields:
        syslog_format: true
      fields_under_root: true
      document_type: radiusd
    -
      #
      # dhcpd log files
      #
      paths:
        - "/logs/dhcpd/201601/*dhcpd.log"
      input_type: log
      fields:
        syslog_format: true
      fields_under_root: true
      document_type: dhcpd

  registry_file: /var/lib/filebeat/registry
  spool_size: 4096
  #config_dir:

output:
  logstash:
    hosts: [ "logstash2:5055", "logstash1:5055" ]
    worker: 2
    loadbalance: true
    tls:
      certificate_authorities: ["/etc/pki/tls/certs/filebeat.crt"]
      certificate: "/etc/pki/tls/certs/filebeat.crt"
      certificate_key: "/etc/pki/tls/private/filebeat.key"

      # Controls whether the client verifies server certificates and host name.
      # If insecure is set to true, all server host names and certificates will be
      # accepted. In this mode TLS based connections are susceptible to
      # man-in-the-middle attacks. Use only for testing.
      #insecure: true

  # Debug output
  file:
    path: "/tmp/filebeat"
    filename: filebeat
    rotate_every_kb: 10000
    number_of_files: 2

shipper:
  name: filebeat

logging:
  to_syslog: false
  to_files: true
  files:
    path: /var/log/beats
    name: filebeat.log
    rotateeverybytes: 10485760 # = 10MB
    keepfiles: 5
  # Other available selectors are beat, publish, service
  #selectors: [ ]
  # Available log levels are: critical, error, warning, info, debug
  level: info

I did run it in debug mode (level: debug in the config file) and what I see in the logs is a bunch of:

2016-01-27T08:41:27+01:00 DBG  Try to publish %!s(int=200) events to logstash with window size %!s(int=1)
2016-01-27T08:41:27+01:00 DBG  %!s(int=0) events out of %!s(int=200) events sent to logstash. Continue sending ...

I can also run with -v -d '*' if that will give any other data.

Matej

@urso

urso commented Jan 28, 2016

The window size becomes 1 if filebeat has failed to send over a 'long' period of time. In this case we probe whether logstash is operative by sending only 1 event and waiting for an ACK, which obviously failed (0 events out of 200 sent).

Does this repeat, or does filebeat stop logging these messages at some point? There should be some INFO messages regarding EOF in between, right?

@urso

urso commented Jan 28, 2016

@matejzero there's an issue in 1.0.1 where the window size stops growing again once it has shrunk to 1, potentially affecting throughput. Maybe you want to test a 1.1 build.

output:
  logstash:
    hosts: [ "logstash2:5055", "logstash1:5055" ]
    worker: 2
    loadbalance: true
    tls:
      certificate_authorities: ["/etc/pki/tls/certs/filebeat.crt"]
      certificate: "/etc/pki/tls/certs/filebeat.crt"
      certificate_key: "/etc/pki/tls/private/filebeat.key"

This config generates 4 output workers (2 workers per logstash host). The certificate and certificate_key options are not required, since logstash does not yet support client authentication.
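
For example, a trimmed version of your output section should be enough (keeping the CA entry so the server certificate can still be verified):

output:
  logstash:
    hosts: [ "logstash2:5055", "logstash1:5055" ]
    worker: 2
    loadbalance: true
    tls:
      certificate_authorities: ["/etc/pki/tls/certs/filebeat.crt"]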

@matejzero
Author

This logging continues until I'm forced to restart filebeat (sometimes after 1h, sometimes after 20h). It never stops. As far as debug output and INFO messages about EOF go, I usually noticed the problem too late, when the log files had already been rotated. I will try to catch it next time.
If logging is set to 'info', then everything stops once it prints the EOF message.

I tried restarting logstash on both nodes (in case the beats input plugin had crashed), but filebeat did not resume sending. Logstash was still receiving events from redis in the meantime.

I will change the number of workers, just in case logstash really can't ingest all the data (although it should be able to; I'm mostly sending somewhere between 200-400 events/s to dual 8-core nodes).

I will also test out the 1.1 build.

@matejzero
Author

It hit EOF again today. It did not resume sending, but both logstash nodes were working (receiving events via redis, and I was able to telnet to the beats port on both logstash nodes).

I will upgrade to the 1.2 nightly build today and see how it goes.

@urso

urso commented Jan 29, 2016

Yeah, testing 1.2 might be helpful, as we've added some more debug output. The fact that the debug output never stops rules out the output being deadlocked. Still no idea why the reconnects fail.

Thanks for testing.

When testing 1.2 you can start filebeat with -httpprof localhost:6060. This opens an HTTP port exposing some debug information. It would be nice to get some stack traces via http://localhost:6060/debug/pprof/goroutine?debug=1. To be able to collect stack traces remotely, start filebeat with -httpprof :6060.
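
For example (the config path is just a placeholder):

filebeat -c /etc/filebeat/filebeat.yml -httpprof localhost:6060
curl -s 'http://localhost:6060/debug/pprof/goroutine?debug=1' > goroutines.txt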

@matejzero
Author

I've updated to 1.2 and so far it has been working for 3 days without a problem.

I will close the issue for now and reopen it if the problem reoccurs.

Thanks for the help!

@ruflin
Member

ruflin commented Feb 1, 2016

@matejzero Thanks. Let us know if you have any issues with 1.2.

@matejzero
Author

Will do.

@matejzero
Author

Hello,

the problem occurred again today and yesterday.

$ filebeat -version
filebeat version 1.2.0-SNAPSHOT (amd64) (it's a release from Jan 28 2016)

In log files, all I see is:

2016-02-04T07:52:16+01:00 DBG  Try to publish 1024 events to logstash with window size 1
2016-02-04T07:52:16+01:00 DBG  0 events out of 1024 events sent to logstash. Continue sending ...
2016-02-04T07:52:16+01:00 DBG  Try to publish 1024 events to logstash with window size 1
2016-02-04T07:52:16+01:00 DBG  0 events out of 1024 events sent to logstash. Continue sending ...

I did run it with -httpprof and here is the debug output:

# curl http://localhost:6060/debug/pprof/goroutine?debug=1
goroutine profile: total 83
1 @ 0x636ee8 0x636cc3 0x6325a4 0x67288e 0x672aa0 0x65117a 0x6529dd 0x65349e 0x650c3e 0x45e581
#   0x636ee8    runtime/pprof.writeRuntimeProfile+0xb8  /usr/local/go/src/runtime/pprof/pprof.go:545
#   0x636cc3    runtime/pprof.writeGoroutine+0x93   /usr/local/go/src/runtime/pprof/pprof.go:507
#   0x6325a4    runtime/pprof.(*Profile).WriteTo+0xd4   /usr/local/go/src/runtime/pprof/pprof.go:236
#   0x67288e    net/http/pprof.handler.ServeHTTP+0x37e  /usr/local/go/src/net/http/pprof/pprof.go:199
#   0x672aa0    net/http/pprof.Index+0x200      /usr/local/go/src/net/http/pprof/pprof.go:211
#   0x65117a    net/http.HandlerFunc.ServeHTTP+0x3a /usr/local/go/src/net/http/server.go:1422
#   0x6529dd    net/http.(*ServeMux).ServeHTTP+0x17d    /usr/local/go/src/net/http/server.go:1699
#   0x65349e    net/http.serverHandler.ServeHTTP+0x19e  /usr/local/go/src/net/http/server.go:1862
#   0x650c3e    net/http.(*conn).serve+0xbee        /usr/local/go/src/net/http/server.go:1361

1 @ 0x42d443 0x42d504 0x404082 0x403bcb 0x463642 0x401099 0x42d060 0x45e581
#   0x463642    github.com/elastic/beats/libbeat/beat.Run+0xc2  /go/src/github.com/elastic/beats/libbeat/beat/beat.go:136
#   0x401099    main.main+0x99                  /go/src/github.com/elastic/beats/filebeat/main.go:20
#   0x42d060    runtime.main+0x2b0              /usr/local/go/src/runtime/proc.go:111

1 @ 0x45e581

1 @ 0x42d443 0x42d504 0x42d248 0x45e581
#   0x42d443    runtime.gopark+0x163        /usr/local/go/src/runtime/proc.go:186
#   0x42d504    runtime.goparkunlock+0x54   /usr/local/go/src/runtime/proc.go:191
#   0x42d248    runtime.forcegchelper+0xb8  /usr/local/go/src/runtime/proc.go:152

1 @ 0x42d2d4 0x41d149 0x45e581
#   0x42d2d4    runtime.Gosched+0x14    /usr/local/go/src/runtime/proc.go:167
#   0x41d149    runtime.bgsweep+0xd9    /usr/local/go/src/runtime/mgcsweep.go:56

1 @ 0x42d443 0x42d504 0x414e6a 0x45e581
#   0x42d443    runtime.gopark+0x163        /usr/local/go/src/runtime/proc.go:186
#   0x42d504    runtime.goparkunlock+0x54   /usr/local/go/src/runtime/proc.go:191
#   0x414e6a    runtime.runfinq+0xaa        /usr/local/go/src/runtime/mfinal.go:154

1 @ 0x40e12e 0x440072 0x6314a8 0x45e581
#   0x6314a8    os/signal.loop+0x18 /usr/local/go/src/os/signal/signal_unix.go:22

24 @ 0x42d443 0x418557 0x45e581
#   0x42d443    runtime.gopark+0x163        /usr/local/go/src/runtime/proc.go:186
#   0x418557    runtime.gcBgMarkWorker+0xf7 /usr/local/go/src/runtime/mgc.go:1289

1 @ 0x42d443 0x42d504 0x404082 0x403bcb 0x46032c 0x4647ab 0x4637c3 0x464c7c 0x45e581
#   0x46032c    github.com/elastic/beats/filebeat/beat.(*Filebeat).Run+0x6dc    /go/src/github.com/elastic/beats/filebeat/beat/filebeat.go:99
#   0x4647ab    github.com/elastic/beats/libbeat/beat.(*Beat).Run+0x33b     /go/src/github.com/elastic/beats/libbeat/beat/beat.go:263
#   0x4637c3    github.com/elastic/beats/libbeat/beat.(*Beat).Start+0x113   /go/src/github.com/elastic/beats/libbeat/beat/beat.go:173
#   0x464c7c    github.com/elastic/beats/libbeat/beat.Run.func1+0x3c        /go/src/github.com/elastic/beats/libbeat/beat/beat.go:120

1 @ 0x42d443 0x43bbb4 0x43b112 0x6a8af6 0x45e581
#   0x6a8af6    github.com/elastic/beats/libbeat/outputs/mode.(*LoadBalancerMode).start.func1+0x3a6 /go/src/github.com/elastic/beats/libbeat/outputs/mode/balance.go:164

1 @ 0x42d443 0x42d504 0x448219 0x41a41e 0x40f215 0x40f8b2 0x448157 0x41a41e 0x40f215 0x40f8b2 0x448157 0x41a41e 0x40f215 0x40f8b2 0x448157 0x41a41e 0x40f215 0x40f8b2 0x448157 0x41a41e 0x40f215 0x40f8b2 0x448157 0x41a41e 0x40f215 0x40f8b2 0x448157 0x41a41e 0x40f215 0x40f8b2 0x448157 0x41a41e
#   0x448219    time.Sleep+0xf9         /usr/local/go/src/runtime/time.go:59
#   0x41a41e    runtime.gcAssistAlloc+0x23e /usr/local/go/src/runtime/mgcmark.go:295
#   0x40f215    runtime.mallocgc+0x535      /usr/local/go/src/runtime/malloc.go:711
#   0x40f8b2    runtime.newobject+0x42      /usr/local/go/src/runtime/malloc.go:760
#   0x448157    time.Sleep+0x37         /usr/local/go/src/runtime/time.go:53
#   0x41a41e    runtime.gcAssistAlloc+0x23e /usr/local/go/src/runtime/mgcmark.go:295
#   0x40f215    runtime.mallocgc+0x535      /usr/local/go/src/runtime/malloc.go:711
#   0x40f8b2    runtime.newobject+0x42      /usr/local/go/src/runtime/malloc.go:760
#   0x448157    time.Sleep+0x37         /usr/local/go/src/runtime/time.go:53
#   0x41a41e    runtime.gcAssistAlloc+0x23e /usr/local/go/src/runtime/mgcmark.go:295
#   0x40f215    runtime.mallocgc+0x535      /usr/local/go/src/runtime/malloc.go:711
#   0x40f8b2    runtime.newobject+0x42      /usr/local/go/src/runtime/malloc.go:760
#   0x448157    time.Sleep+0x37         /usr/local/go/src/runtime/time.go:53
#   0x41a41e    runtime.gcAssistAlloc+0x23e /usr/local/go/src/runtime/mgcmark.go:295
#   0x40f215    runtime.mallocgc+0x535      /usr/local/go/src/runtime/malloc.go:711
#   0x40f8b2    runtime.newobject+0x42      /usr/local/go/src/runtime/malloc.go:760
#   0x448157    time.Sleep+0x37         /usr/local/go/src/runtime/time.go:53
#   0x41a41e    runtime.gcAssistAlloc+0x23e /usr/local/go/src/runtime/mgcmark.go:295
#   0x40f215    runtime.mallocgc+0x535      /usr/local/go/src/runtime/malloc.go:711
#   0x40f8b2    runtime.newobject+0x42      /usr/local/go/src/runtime/malloc.go:760
#   0x448157    time.Sleep+0x37         /usr/local/go/src/runtime/time.go:53
#   0x41a41e    runtime.gcAssistAlloc+0x23e /usr/local/go/src/runtime/mgcmark.go:295
#   0x40f215    runtime.mallocgc+0x535      /usr/local/go/src/runtime/malloc.go:711
#   0x40f8b2    runtime.newobject+0x42      /usr/local/go/src/runtime/malloc.go:760
#   0x448157    time.Sleep+0x37         /usr/local/go/src/runtime/time.go:53
#   0x41a41e    runtime.gcAssistAlloc+0x23e /usr/local/go/src/runtime/mgcmark.go:295
#   0x40f215    runtime.mallocgc+0x535      /usr/local/go/src/runtime/malloc.go:711
#   0x40f8b2    runtime.newobject+0x42      /usr/local/go/src/runtime/malloc.go:760
#   0x448157    time.Sleep+0x37         /usr/local/go/src/runtime/time.go:53
#   0x41a41e    runtime.gcAssistAlloc+0x23e /usr/local/go/src/runtime/mgcmark.go:295

2 @ 0x42d443 0x43bbb4 0x43b112 0x4b0409 0x45e581
#   0x4b0409    github.com/elastic/beats/libbeat/publisher.(*messageWorker).run+0x449   /go/src/github.com/elastic/beats/libbeat/publisher/worker.go:61

32 @ 0x42d443 0x42d504 0x4034e0 0x402c13 0x5456c3 0x45e581
#   0x5456c3    github.com/elastic/beats/filebeat/harvester.(*Harvester).Harvest+0xce3  /go/src/github.com/elastic/beats/filebeat/harvester/log.go:178

1 @ 0x40e12e 0x44887e 0x45e581
#   0x40e12e    runtime.notetsleepg+0x4e    /usr/local/go/src/runtime/lock_futex.go:203
#   0x44887e    runtime.timerproc+0xde      /usr/local/go/src/runtime/time.go:209

1 @ 0x42d443 0x43bbb4 0x43b112 0x4ab2fa 0x45e581
#   0x4ab2fa    github.com/elastic/beats/libbeat/publisher.(*bulkWorker).run+0x3ca  /go/src/github.com/elastic/beats/libbeat/publisher/bulk.go:57

1 @ 0x42d443 0x42d504 0x41735b 0x45e581
#   0x42d443    runtime.gopark+0x163        /usr/local/go/src/runtime/proc.go:186
#   0x42d504    runtime.goparkunlock+0x54   /usr/local/go/src/runtime/proc.go:191
#   0x41735b    runtime.backgroundgc+0x8b   /usr/local/go/src/runtime/mgc.go:900

1 @ 0x42d443 0x42756e 0x426a30 0x584cca 0x584d36 0x5888bc 0x5a626d 0x654371 0x653773 0x653636 0x653ccf 0x4bd399 0x45e581
#   0x426a30    net.runtime_pollWait+0x60                   /usr/local/go/src/runtime/netpoll.go:157
#   0x584cca    net.(*pollDesc).Wait+0x3a                   /usr/local/go/src/net/fd_poll_runtime.go:73
#   0x584d36    net.(*pollDesc).WaitRead+0x36                   /usr/local/go/src/net/fd_poll_runtime.go:78
#   0x5888bc    net.(*netFD).accept+0x27c                   /usr/local/go/src/net/fd_unix.go:408
#   0x5a626d    net.(*TCPListener).AcceptTCP+0x4d               /usr/local/go/src/net/tcpsock_posix.go:254
#   0x654371    net/http.tcpKeepAliveListener.Accept+0x41           /usr/local/go/src/net/http/server.go:2135
#   0x653773    net/http.(*Server).Serve+0xb3                   /usr/local/go/src/net/http/server.go:1887
#   0x653636    net/http.(*Server).ListenAndServe+0x136             /usr/local/go/src/net/http/server.go:1877
#   0x653ccf    net/http.ListenAndServe+0x8f                    /usr/local/go/src/net/http/server.go:1967
#   0x4bd399    github.com/elastic/beats/libbeat/service.BeforeRun.func1+0x69   /go/src/github.com/elastic/beats/libbeat/service/service.go:72

1 @ 0x42d443 0x43bbb4 0x43b112 0x452813 0x45e581
#   0x42d443    runtime.gopark+0x163        /usr/local/go/src/runtime/proc.go:186
#   0x43bbb4    runtime.selectgoImpl+0xa64  /usr/local/go/src/runtime/select.go:392
#   0x43b112    runtime.selectgo+0x12       /usr/local/go/src/runtime/select.go:212
#   0x452813    runtime.ensureSigM.func1+0x353  /usr/local/go/src/runtime/signal1_unix.go:227

1 @ 0x42d443 0x42d504 0x404511 0x403bcb 0x4bd23a 0x45e581
#   0x4bd23a    github.com/elastic/beats/libbeat/service.HandleSignals.func1+0x3a   /go/src/github.com/elastic/beats/libbeat/service/service.go:29

1 @ 0x42d443 0x42d504 0x4034e0 0x402c13 0x461cc8 0x461908 0x461622 0x45e581
#   0x461cc8    github.com/elastic/beats/filebeat/beat.(*Spooler).flush+0x128   /go/src/github.com/elastic/beats/filebeat/beat/spooler.go:148
#   0x461908    github.com/elastic/beats/filebeat/beat.(*Spooler).queue+0x148   /go/src/github.com/elastic/beats/filebeat/beat/spooler.go:124
#   0x461622    github.com/elastic/beats/filebeat/beat.(*Spooler).run+0x392 /go/src/github.com/elastic/beats/filebeat/beat/spooler.go:85

1 @ 0x42d443 0x43bbb4 0x43b112 0x4892b4 0x45e581
#   0x4892b4    github.com/elastic/beats/filebeat/crawler.(*Registrar).Run+0x4d4    /go/src/github.com/elastic/beats/filebeat/crawler/registrar.go:88

6 @ 0x42d443 0x42d504 0x448219 0x484f3d 0x48afaa 0x483c8d 0x45e581
#   0x448219    time.Sleep+0xf9                             /usr/local/go/src/runtime/time.go:59
#   0x484f3d    github.com/elastic/beats/filebeat/crawler.ProspectorLog.Run+0x21d   /go/src/github.com/elastic/beats/filebeat/crawler/prospector_log.go:62
#   0x48afaa    github.com/elastic/beats/filebeat/crawler.(*ProspectorLog).Run+0xaa <autogenerated>:6
#   0x483c8d    github.com/elastic/beats/filebeat/crawler.(*Prospector).Run+0x1ad   /go/src/github.com/elastic/beats/filebeat/crawler/prospector.go:95

1 @ 0x42d443 0x42d504 0x404511 0x403bcb 0x4afc11 0x4af8eb 0x4ac0a7 0x4621a1 0x45e581
#   0x4afc11    github.com/elastic/beats/libbeat/publisher.(*syncPublisher).send+0x301      /go/src/github.com/elastic/beats/libbeat/publisher/sync.go:51
#   0x4af8eb    github.com/elastic/beats/libbeat/publisher.(*syncPublisher).PublishEvents+0xbb  /go/src/github.com/elastic/beats/libbeat/publisher/sync.go:29
#   0x4ac0a7    github.com/elastic/beats/libbeat/publisher.(*client).PublishEvents+0x197    /go/src/github.com/elastic/beats/libbeat/publisher/client.go:104
#   0x4621a1    github.com/elastic/beats/filebeat/beat.(*syncLogPublisher).Start.func1+0x3c1    /go/src/github.com/elastic/beats/filebeat/beat/publish.go:103

I can telnet to the logstash port without a problem:

# telnet logstash2 5055
Trying 193.2.18.183...
Connected to logstash2.arnes.si.

In logstash, I see the following errors around the time filebeat stopped sending logs:

:timestamp=>"2016-02-03T11:15:45.642000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-02-03T13:15:20.553000+0100", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
{:timestamp=>"2016-02-03T13:15:20.554000+0100", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
{:timestamp=>"2016-02-03T13:15:20.605000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-02-03T13:15:20.606000+0100", :message=>"CircuitBreaker::Open", :name=>"Beats input", :level=>:warn}
{:timestamp=>"2016-02-03T13:15:20.606000+0100", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::OpenBreaker, :level=>:warn}
{:timestamp=>"2016-02-03T13:15:21.105000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-02-03T13:15:21.606000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}

According to the grafana metrics, logs stopped flowing to logstash at around 13:16:00. The last error message in the logstash logs is at 13:15:50:

{:timestamp=>"2016-02-03T13:15:50.136000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}

Let me know if you need more information.

@urso

urso commented Feb 4, 2016

Thanks for the details. Very helpful, as I haven't been able to reproduce this problem myself yet.

Skimming the trace, it looks like a deadlock in the filebeat publisher, caused by a number of (potentially temporary) send failures while the logstash queues were clogged. I assume filebeat stops writing this message: Try to publish 1024 events to logstash with window size 1?

It would be interesting to take a second trace about 10 seconds later and diff the two to confirm we're really dealing with a deadlock.
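
Something along these lines would do, assuming filebeat is running with the -httpprof endpoint from above:

curl -s 'http://localhost:6060/debug/pprof/goroutine?debug=1' > trace-1.txt
sleep 10
curl -s 'http://localhost:6060/debug/pprof/goroutine?debug=1' > trace-2.txt
diff trace-1.txt trace-2.txt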

A similar error happened about 3 hours earlier (11:15:45); did filebeat recover in that case? Do we have some logs?

@matejzero
Author

Hey there,

filebeat never stops writing Try to publish 1024 events to logstash with window size 1. It just floods the logs (I have my logging set to debug), and if I don't discover the stall quickly enough, it overwrites all of my log files, so I can't get any useful info out of them.

Next time it hangs, I will take 2 traces so you will be able to do a diff.

According to the logstash logs, filebeat recovered after the first event.

So yesterday's scenario was the following:

  • I restarted filebeat at around 10:14
  • At 11:16, there was a stall, which can be seen in the logstash logs:
{:timestamp=>"2016-02-03T10:35:26.916000+0100", :message=>"Beats Input: Remote connection closed", :peer=>"193.2.18.167:33116", :exception=>#<Lumberjack::Beats::Connection::ConnectionClosed: Lumberjack::Beats::Connection::ConnectionClosed wrapping: EOFError, End of file reached>, :level=>:warn}
{:timestamp=>"2016-02-03T11:15:16.056000+0100", :message=>"CircuitBreaker::rescuing exceptions", :name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn}
{:timestamp=>"2016-02-03T11:15:16.057000+0100", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn}
{:timestamp=>"2016-02-03T11:15:21.114000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-02-03T11:15:21.115000+0100", :message=>"CircuitBreaker::Open", :name=>"Beats input", :level=>:warn}
{:timestamp=>"2016-02-03T11:15:21.115000+0100", :message=>"Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::OpenBreaker, :level=>:warn}
{:timestamp=>"2016-02-03T11:15:21.615000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
{:timestamp=>"2016-02-03T11:15:22.115000+0100", :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", :reconnect_backoff_sleep=>0.5, :level=>:warn}
  • Traffic resumed after 2016-02-03T11:15:45.642000+0100.
  • It stalled again at 2016-02-03T13:15:20.553000+0100; logstash kept printing Beats input: the pipeline is blocked, temporary refusing new connection. until 2016-02-03T13:15:50.136000+0100 and then stopped

Graph of the event:
[screenshot: 2016-02-04 11:46:14]

matejzero reopened this Feb 4, 2016
@urso

urso commented Feb 4, 2016

filebeat never stops writing Try to publish 1024 events to logstash with window size 1. It just floods the logs (I have my logging set to debug), and if I don't discover the stall quickly enough, it overwrites all of my log files, so I can't get any useful info out of them.

Hm... this is a sign that filebeat is actually not deadlocked, but failing to reconnect.

@urso

urso commented Feb 4, 2016

As you are running in debug mode (given you have the '*' or 'logstash' selector set), do you still see 'connect' and 'close connection' messages in your logs?
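
If the debug output is flooding your logs, you could also limit it to just the logstash selector in the logging section, e.g.:

logging:
  level: debug
  selectors: [ "logstash" ]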

@matejzero
Author

As far as selectors go, this is what I have in my config file:

  # Other available selectors are beat, publish, service
  #selectors: [ ]
  # Available log levels are: critical, error, warning, info, debug
  level: debug

I forgot to make a copy of the log files when I restarted filebeat, so I can't check whether there were connect or close connection messages in the logs. When I had the level set to info, the only thing I could see was the EOF error and nothing after that. But as far as I remember, I did not see any connect messages in the logs...

I also tried restarting logstash before restarting filebeat, just in case there was a problem with logstash's beats plugin, but even after that, filebeat did not reconnect. On the other hand, it always reconnects and starts sending data if I restart filebeat.

urso self-assigned this Feb 4, 2016
@urso

urso commented Feb 4, 2016

OK, thanks.

@ph
Contributor

ph commented Feb 4, 2016

@matejzero Can you tell me the version of logstash you are using and the version of the beats input?
You can get the version of the plugin you are currently using with bin/plugin list --verbose beats

I have seen concurrency errors in pre-2.1.2 versions of the plugin, but not causing the kind of deadlock you are currently seeing. The concurrency handling was revisited in 2.1.2 and it might solve your issue.
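
If it turns out you are running an older version, updating the plugin should be enough (invocation from memory; check bin/plugin --help):

bin/plugin update logstash-input-beats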

@ph
Contributor

ph commented Feb 4, 2016

Also what is your logstash configuration in that case?

@matejzero
Author

Logstash version 2.1.1

Beats plugin version:

$ bin/plugin list --verbose beats
logstash-input-beats (2.1.2)

I need some time to put the logstash configuration together since it's rather big and I need to remove some entries... Will post it later.

@ph
Contributor

ph commented Feb 4, 2016

@matejzero One more thing: can you take a thread dump when this situation happens? jstack <PID>
That way we will know for sure what the logstash threads are doing.

@matejzero
Author

A thread dump of logstash or filebeat?

@ph
Contributor

ph commented Feb 4, 2016

The thread dump of logstash.

@ph
Contributor

ph commented Feb 4, 2016

I also tried restarting logstash before restarting filebeat, just in case there was a problem with logstash's beats plugin, but even after that, filebeat did not reconnect. On the other hand, it always reconnects and starts sending data if I restart filebeat.

@urso ^ this is weird though, no?

@matejzero
Author

It happened again, and oddly, at the same time?!

According to the logstash logs, these are the times when I see slowdown messages in logstash:

  • 2016-02-04T11:15:15.616000+0100
  • 2016-02-04T13:15:15.718000+0100
  • 2016-02-04T14:48:40.716000+0100

I can't send you the output of httpprof, because I upgraded filebeat to 1.1.0 today and forgot to add the -httpprof parameter. I will update the init script.

As far as jstack goes, I tried getting a thread dump, but I got the following error:

$ jstack 894
894: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding

I tried with -F and got the following:

jstack -F 894
Attaching to process ID 894, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.71-b15
Deadlock Detection:

java.lang.RuntimeException: Unable to deduce type of thread from address 0x00007f7d90006000 (expected type JavaThread, CompilerThread, ServiceThread, JvmtiAgentThread, or SurrogateLockerThread)
    at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:169)
    at sun.jvm.hotspot.runtime.Threads.first(Threads.java:153)
    at sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(DeadlockDetector.java:149)
    at sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:56)
    at sun.jvm.hotspot.runtime.DeadlockDetector.print(DeadlockDetector.java:39)
    at sun.jvm.hotspot.tools.StackTrace.run(StackTrace.java:62)
    at sun.jvm.hotspot.tools.StackTrace.run(StackTrace.java:45)
    at sun.jvm.hotspot.tools.JStack.run(JStack.java:66)
    at sun.jvm.hotspot.tools.Tool.startInternal(Tool.java:260)
    at sun.jvm.hotspot.tools.Tool.start(Tool.java:223)
    at sun.jvm.hotspot.tools.Tool.execute(Tool.java:118)
    at sun.jvm.hotspot.tools.JStack.main(JStack.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at sun.tools.jstack.JStack.runJStackTool(JStack.java:140)
    at sun.tools.jstack.JStack.main(JStack.java:106)
Caused by: sun.jvm.hotspot.types.WrongTypeException: No suitable match for type of address 0x00007f7d90006000
    at sun.jvm.hotspot.runtime.InstanceConstructor.newWrongTypeException(InstanceConstructor.java:62)
    at sun.jvm.hotspot.runtime.VirtualConstructor.instantiateWrapperFor(VirtualConstructor.java:80)
    at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:165)
    ... 17 more
Can't print deadlocks:Unable to deduce type of thread from address 0x00007f7d90006000 (expected type JavaThread, CompilerThread, ServiceThread, JvmtiAgentThread, or SurrogateLockerThread)
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at sun.tools.jstack.JStack.runJStackTool(JStack.java:140)
    at sun.tools.jstack.JStack.main(JStack.java:106)
Caused by: java.lang.RuntimeException: Unable to deduce type of thread from address 0x00007f7d90006000 (expected type JavaThread, CompilerThread, ServiceThread, JvmtiAgentThread, or SurrogateLockerThread)
    at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:169)
    at sun.jvm.hotspot.runtime.Threads.first(Threads.java:153)
    at sun.jvm.hotspot.tools.StackTrace.run(StackTrace.java:75)
    at sun.jvm.hotspot.tools.StackTrace.run(StackTrace.java:45)
    at sun.jvm.hotspot.tools.JStack.run(JStack.java:66)
    at sun.jvm.hotspot.tools.Tool.startInternal(Tool.java:260)
    at sun.jvm.hotspot.tools.Tool.start(Tool.java:223)
    at sun.jvm.hotspot.tools.Tool.execute(Tool.java:118)
    at sun.jvm.hotspot.tools.JStack.main(JStack.java:92)
    ... 6 more
Caused by: sun.jvm.hotspot.types.WrongTypeException: No suitable match for type of address 0x00007f7d90006000
    at sun.jvm.hotspot.runtime.InstanceConstructor.newWrongTypeException(InstanceConstructor.java:62)
    at sun.jvm.hotspot.runtime.VirtualConstructor.instantiateWrapperFor(VirtualConstructor.java:80)
    at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(Threads.java:165)
    ... 14 more

@ph
Contributor

ph commented Feb 4, 2016

2016-02-04T11:15:15.616000+0100
2016-02-04T13:15:15.718000+0100
2016-02-04T14:48:40.716000+0100

At the same time? That is odd. Is there nothing scheduled on the box?
Which JVM are you running? OpenJDK? Which version?
If you restart logstash, does filebeat reconnect and continue sending data?
How is the CPU usage?

@ph
Contributor

ph commented Feb 4, 2016

Your config would still be useful here to narrow down the code involved in the problem.
I suspect logstash deadlocks on something and is just stuck; this is why you had to use jstack -F to get anything.

Is that the complete output from jstack?

@urso

urso commented Feb 5, 2016

Did some more tests with a filebeat->LS->ES setup. I made the pipeline in LS clog up by killing ES for a short period of time every so often. After a while I was able to reproduce this:

2016/02/05 02:02:00.157242 client.go:114: DBG  0 events out of 4096 events sent to logstash. Continue sending ...
2016/02/05 02:02:00.157284 client.go:136: DBG  Try to publish 4096 events to logstash with window size 32

No more INFO messages, and the window size keeps shrinking.

Checking with netstat, the filebeat->LS connections are still established, but no more data are transmitted. Interestingly (in my tests), new publish requests still seem to be sent to LS (at least no error is returned when writing to the socket), but no ACK is ever received before the timeout. Given many restarts plus bad timing, the logstash output module can in some cases get into this weird state, which is supposed to be broken by:

  1. eventually getting an error from the socket when sending/receiving (this did not happen for me)
  2. limiting the number of retries on timeout before closing the connection

Interestingly, restarting logstash did work for me (the connection was closed in filebeat due to the error code and new connections were created), but then I ran all my tests on localhost.

I think I found a bug in case 2 but need to do some more testing. It might explain why filebeat used to recover once, but not the second time.

Follow progress in PR #927. I started improving the unit tests and added a more aggressive connection-closing policy. A fix for the possible bug is not yet included, as I first want to try to reproduce it in a unit test.

urso added the bug and libbeat labels and removed the Filebeat label Feb 5, 2016
@matejzero
Author

@ph:
According to the logs, nothing. The sar crontab runs a few seconds before, but then again, it runs every 10s, so that can't be it.

JVM:

$ java -version
openjdk version "1.8.0_71"
OpenJDK Runtime Environment (build 1.8.0_71-b15)
OpenJDK 64-Bit Server VM (build 25.71-b15, mixed mode)

If I restart filebeat, it reconnects and starts sending data, but if I restart logstash, then nothing happens. I HAVE to restart filebeat.

I will put my config together when I get to work.

@urso
Interesting. In my case, restarting logstash did not help, but I will try again when it happens.
Next time it happens, I will take a look at the netstat output and run tcpdump on the traffic to see what's happening (sketch below).
What is interesting in my case is that the redis input is still working: logstash is receiving and processing redis events and saving them to elasticsearch.
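
Something along these lines (using the beats port 5055 from my config; flags may need tweaking):

# netstat -tnp | grep ':5055'
# tcpdump -i any -nn -w beats.pcap port 5055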

@matejzero
Author

Here is my config file:

input {
  lumberjack {
    port => 5044
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }

  redis {
    type => "amavis"
    host => "1.2.3.4"
    db => 1
    data_type => "list"
    key => "amavis-log"
    codec => json {}
  }

  beats {
    port => 5055
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }

  beats {
    port => 5056
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}

filter {

  # Count all events received by Logstash, before any work is done on the events
  metrics {
    meter => "events"
    add_tag => [ "metric-events_received", "parsed" ]
    flush_interval => 10
  }

  ###  Logs with the 'syslog_format' tag are parsed with the SYSLOG grok filter and only the message is saved to the 'message' field
  if [syslog_format] == "true" {

    ### Pull out syslog timestamp/host/program and leave only app log under 'message' field
    grok {
      patterns_dir => [ '/etc/logstash/patterns' ]
      match => [ 'message', '%{SYSLOGLINE}']
      overwrite => [ 'message' ]
      add_tag => [ '_grok_syslog_prefilter_success' ]
      tag_on_failure => []
    }

    ### Rename logsource to host and remove "syslog_format" field
    mutate {
      rename => [ "logsource", "host" ]
      remove_field => [ "syslog_format" ]
    }

    ### Set timestamp format
    date {
      match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }

    ### Set type according to 'program' field from syslog
    if [type] != "postfix" and [program] =~ /^postfix.*/ {
      mutate {
        replace => [ "type", "postfix" ]
      }
    } else if [type] != "dovecot" and [program] =~ /^dovecot.*/ {
      mutate {
        replace => [ "type", "dovecot" ]
      }
    } else if [program] =~ /^sqlgrey/ {
      mutate {
        replace => [ "type", "sqlgrey" ]
      }
    } else if [program] =~ /^clamd/ {
      mutate {
        replace => [ "type", "clamd" ]
      }
    } else if [program] =~ /^dccd/ {
      mutate {
        replace => [ "type", "dccd" ]
      }
    } else if [program] =~ /^radiusd/ {
      mutate {
        replace => [ "type", "radiusd" ]
      }
    } else if [program] =~ /^dhcpd/ {
      mutate {
        replace => [ "type", "dhcpd" ]
      }
    } else if [program] =~ /^httpd/ {
      mutate {
        replace => [ "type", "httpd" ]
      }
    }

  }

  else {

    mutate {
      add_tag => [ "_grok_syslog_prefilter_skipped" ]
    }

  }

  if [type] == "amavis" {

    # Amavis is sending time_unix in UNIX timestamp, match this field with @timestamp
    date {
      match => [ "time_unix", "UNIX" ]
    }

    # Add routing tags and remove unused fields
    mutate {
      add_tag      => [ "_grok_amavis_match", "save_to_elasticsearch" ]
      remove_field => [ "time_unix", "time_iso_week_date" ]
    }

  }

  if [type] == 'dovecot' {

    grok {
        patterns_dir => [ '/etc/logstash/patterns' ]
        match => [ 'message', '%{DOVECOT}']
        add_tag => [ 'save_to_elasticsearch', '_grok_dovecot_match' ]
        tag_on_failure => [ 'save_to_logfile', '_grok_dovecot_nomatch' ]
    }

    # Set crypto field to ssl,tls or none
    if [lip] {
        if [crypto] == 'secured' {
            mutate {
                replace => [ 'crypto', 'ssl' ]
            }
        } else if [crypto] == 'TLS' {
            mutate {
                lowercase => [ 'crypto' ]
            }
        } else {
            mutate {
                add_field => { 'crypto' => 'none' }
            }
        }
    }

    # Set tags for dovecot proxies
    if [proxy] {
      if [proxy_start] {
        mutate {
            add_tag => [ 'dovecot_proxy', 'proxying_started' ]
            remove_field => [ 'proxy', 'proxy_start' ]
        }
      } else if [conn_status] == 'disconnecting' {
        mutate {
            add_tag => [ 'dovecot_proxy', 'proxying_stopped' ]
            remove_field => [ 'proxy', 'conn_status' ]
        }
      }
    }

    # Get authentication status from [status_message] field
    # When client is disconnected
    if [conn_status] == 'Disconnected' {
        # Set auth_status field for disconnects
        if [status_message] == 'Logged out' {
            # Normal logout - imap(user): Disconnected: Logged out in=116 out=1003
            mutate {
               add_field => { 'auth_status' => 'loggedout' }
            }
        } else if [status_message] =~ /^for inactivity/ {
            # Inactivity - imap(user): Disconnected for inactivity in=223 out=1099
            mutate {
                add_field => { 'auth_status' => 'inactivity' }
            }
        } else if [status_message] == 'Disconnected' {
            # Disconnected - imap(user): Disconnected: Disconnected in=58 out=355013
            mutate {
                add_field => { 'auth_status' => 'disconnected' }
            }
        } else if [status_message] =~ /^Disconnected in IDLE/ {
            # IDLE - imap(user): Disconnected: Disconnected in IDLE in=476 out=4467
            mutate {
               add_field => { 'auth_status' => 'idle' }
            }
        } else if [status_message] =~ /^Disconnected in APPEND/ {
            # No auth attempts - imap(user): Disconnected: Disconnected in APPEND (1 msgs, 5 secs, 892928/1036013 bytes) in=127060...
            mutate {
               add_field => { 'auth_status' => 'append' }
            }
        }  else if [status_message] =~ /Internal error occurred\. Refer to server log for more information\./ {
            # No auth attempts - pop3(56tg): Error: Couldn't init INBOX: Internal error occurred. Refer
            mutate {
                add_field => { 'auth_status' => 'internalerror' }
            }
        } else if [status_message] =~ /auth failed, \d+ attempts( in \d+ secs)?/ {
            # Auth failed - pop3-login: Disconnected (auth failed, 1 attempts in 4 secs): user=<user>, method=PLAIN, ri
            mutate {
               add_field => { 'auth_status' => 'authfailed' }
            }
        } else if [status_message] =~ /no auth attempts( in \d+ secs)?/ {
            # No auth attempts - pop3-login: Aborted login (no auth attempts in 0 secs): user=<>, rip=4.3.4.3, lip=1
            mutate {
                add_field => { 'auth_status' => 'noauthattempt' }
            }
        } else if [status_message] =~ /password expired/ {
            # Password expired - pop3-login: Disconnected (password expired): user=<aaaaa>, method=PLAIN, rip=2
            mutate {
                add_field => { 'auth_status' => 'passexpired' }
            }
        } else if [status_message] =~ /Maximum number of connections from user\+IP exceeded/ {
            # Max number of conn exceeded - imap-login: Maximum number of connections from user+IP exceeded (mail_max_userip_connections=50): user=<username>, method=PLAI
            mutate {
                add_field => { 'auth_status' => 'max_conn_exceeded' }
            }
        } else if [status_message] =~ /disconnected while authenticating(, waited \d+ secs)?/ {
            # Disconnect while auth
            mutate {
               add_field => { 'auth_status' => 'authenticating' }
            }
        } else if [status_message] =~ /client didn\'t finish SASL auth, waited \d+ secs/ {
            # Disconnect while auth
            mutate {
                add_field => { 'auth_status' => 'authenticating' }
            }
        } else if [status_message] =~ /disconnected before greeting(, waited \d+ secs)?/ {
            # Disconnect before greeting
            mutate {
                add_field => { 'auth_status' => 'beforegreeting' }
            }
        } else {
            mutate {
                add_field => { 'auth_status' => 'other' }
            }
        }
    # When connection is closed
    } else if [conn_status] == 'Connection closed' {
        mutate {
            add_field => { 'auth_status' => 'connclosed' }
        }
    # When login is aborted
    } else if [conn_status] == 'Aborted login' {
        if [status_message] =~ /auth failed, \d+ attempts( in \d+ secs)?/ {
            # Auth failed - pop3-login: Aborted login (auth failed, 1 attempts in 4 secs): user=<user>, method=PLAIN, ri
            mutate {
                add_field => { 'auth_status' => 'authfailed' }
            }
        } else if [status_message] =~ /no auth attempts( in \d+ secs)?/ {
            # No auth attempts - pop3-login: Aborted login (no auth attempts in 0 secs): user=<>, rip=3.6.4.5, lip=1
            mutate {
                add_field => { 'auth_status' => 'noauthattempt' }
            }
        } else if [status_message] =~ /disconnected before greeting(, waited \d+ secs)?/ {
            # Disconnect before greeting
            mutate {
                add_field => { 'auth_status' => 'beforegreeting' }
            }
        } else {
            # Other statuses
            mutate {
                add_field => { 'auth_status' => 'other' }
            }
        }
    }

    # Tag lmtp logs and add [saved_location] field with save location
    if [proto] == 'lmtp' {
      if [status_message] =~ /(saved mail to|stored mail into mailbox)/ {
        mutate {
          add_tag => [ 'lmtp_saved' ]
        }
        # Add field if successfully saved
        if [status_message] =~ /INBOX/ {
              mutate {
                add_field => { 'saved_location' => 'inbox' }
              }
        } else if [status_message] =~ /mail\/spam/ {
              mutate {
                add_field => { 'saved_location' => 'spam' }
              }
        } else {
              mutate {
                add_field => { 'saved_location' => 'other' }
              }
        }
      } else if [status_message] =~ /save failed to/ {
        mutate {
          add_tag => [ 'lmtp_failed' ]
        }
      }
    }

    ### Add client's GeoData
    geoip {
        source => [ 'rip' ]
        fields => [ 'country_code2', 'country_name', 'latitude', 'longitude']
    }

    # Do some data type conversions
    mutate {
        convert => [
            # list of integer fields
            'mpid', 'integer',
            'bytes_in', 'integer',
            'bytes_out', 'integer'
        ]
    }

  }

  if ( [type] == "postfix" ) {

    # grok log lines by program name (listed alphabetically)
    if [program] =~ /^postfix.*\/anvil$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_ANVIL}" ]
            tag_on_failure => [ "_grok_postfix_anvil_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/bounce$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_BOUNCE}" ]
            tag_on_failure => [ "_grok_postfix_bounce_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/cleanup$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_CLEANUP}" ]
            tag_on_failure => [ "_grok_postfix_cleanup_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/dnsblog$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_DNSBLOG}" ]
            tag_on_failure => [ "_grok_postfix_dnsblog_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/local$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_LOCAL}" ]
            tag_on_failure => [ "_grok_postfix_local_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/master$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_MASTER}" ]
            tag_on_failure => [ "_grok_postfix_master_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/pickup$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_PICKUP}" ]
            tag_on_failure => [ "_grok_postfix_pickup_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/pipe$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_PIPE}" ]
            tag_on_failure => [ "_grok_postfix_pipe_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/postdrop$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_POSTDROP}" ]
            tag_on_failure => [ "_grok_postfix_postdrop_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/postscreen$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_POSTSCREEN}" ]
            tag_on_failure => [ "_grok_postfix_postscreen_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/qmgr$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_QMGR}" ]
            tag_on_failure => [ "_grok_postfix_qmgr_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/scache$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_SCACHE}" ]
            tag_on_failure => [ "_grok_postfix_scache_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/sendmail$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_SENDMAIL}" ]
            tag_on_failure => [ "_grok_postfix_sendmail_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if ( [program] =~ /^postfix.*\/smtp$/ ) or ( [program] =~ /^postfix.*\/lmtp$/ ) {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_SMTP}" ]
            tag_on_failure => [ "_grok_postfix_smtp_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/lmtp$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_LMTP}" ]
            tag_on_failure => [ "_grok_postfix_lmtp_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/smtpd$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_SMTPD}" ]
            tag_on_failure => [ "_grok_postfix_smtpd_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/tlsmgr$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_TLSMGR}" ]
            tag_on_failure => [ "_grok_postfix_tlsmgr_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/tlsproxy$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_TLSPROXY}" ]
            tag_on_failure => [ "_grok_postfix_tlsproxy_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/trivial-rewrite$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_TRIVIAL_REWRITE}" ]
            tag_on_failure => [ "_grok_postfix_trivial_rewrite_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/discard$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_DISCARD}" ]
            tag_on_failure => [ "_grok_postfix_discard_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*\/virtual$/ {
        grok {
            patterns_dir   => "/etc/logstash/patterns"
            match          => [ "message", "%{POSTFIX_VIRTUAL}" ]
            tag_on_failure => [ "_grok_postfix_virtual_nomatch" ]
            add_tag        => [ "_grok_postfix_success" ]
        }
    } else if [program] =~ /^postfix.*/ {
        mutate {
            add_tag => [ "_grok_postfix_nomatch" ]
        }
    }

    # process key-value data if it exists
    if [postfix_keyvalue_data] {
        kv {
            source       => "postfix_keyvalue_data"
            trim         => "<>,"
            prefix       => "postfix_"
            remove_field => [ "postfix_keyvalue_data" ]
        }

        # some post processing of key-value data
        if [postfix_client] {
            grok {
                patterns_dir   => "/etc/logstash/patterns"
                match          => ["postfix_client", "%{POSTFIX_CLIENT_INFO}"]
                tag_on_failure => [ "_grok_kv_postfix_client_nomatch" ]
                remove_field   => [ "postfix_client" ]
            }
        }
        if [postfix_relay] {
            grok {
                patterns_dir   => "/etc/logstash/patterns"
                match          => ["postfix_relay", "%{POSTFIX_RELAY_INFO}"]
                tag_on_failure => [ "_grok_kv_postfix_relay_nomatch" ]
                remove_field   => [ "postfix_relay" ]
            }
        }
        if [postfix_delays] {
            grok {
                patterns_dir   => "/etc/logstash/patterns"
                match          => ["postfix_delays", "%{POSTFIX_DELAYS}"]
                tag_on_failure => [ "_grok_kv_postfix_delays_nomatch" ]
                remove_field   => [ "postfix_delays" ]
            }
        }
    }

    # Do some data type conversions
    mutate {
        convert => [
            # list of integer fields
            "postfix_anvil_cache_size", "integer",
            "postfix_anvil_conn_count", "integer",
            "postfix_anvil_conn_rate", "integer",
            "postfix_client_port", "integer",
            "postfix_nrcpt", "integer",
            "postfix_postscreen_cache_dropped", "integer",
            "postfix_postscreen_cache_retained", "integer",
            "postfix_postscreen_dnsbl_rank", "integer",
            "postfix_relay_port", "integer",
            "postfix_server_port", "integer",
            "postfix_size", "integer",
            "postfix_status_code", "integer",
            "postfix_termination_signal", "integer",
            "postfix_uid", "integer",

            # list of float fields
            "postfix_delay", "float",
            "postfix_delay_before_qmgr", "float",
            "postfix_delay_conn_setup", "float",
            "postfix_delay_in_qmgr", "float",
            "postfix_delay_transmission", "float",
            "postfix_postscreen_violation_time", "float"
        ]
        # rename logsource field to host
        rename => [ "logsource", "host" ]
    }

    # Add client's GeoData
    geoip {
      source => [ "postfix_client_ip" ]
      fields => [ "country_code2", "country_name", "latitude", "longitude"]
    }


    # Route events to outputs: matched to Elasticsearch, unmatched to a log file
    if "_grok_postfix_success" in [tags] {
      mutate {
        add_tag => [ 'save_to_elasticsearch' ]
      }
    } else {
      mutate {
        add_tag => [ 'save_to_logfile' ]
      }
    }


  }

  if [type] == "radiusd" {

    grok {
      patterns_dir => [ '/etc/logstash/patterns' ]
      match => [ 'message', '%{RADIUS}']
      add_tag => [ 'save_to_elasticsearch', '_grok_radiusd_match' ]
      tag_on_failure => [ '_grok_radiusd_nomatch' ]
    }

    # Replace status 'Login OK' with 'ok'
    if [status] == "Login OK" {
      mutate {
        replace => [ "status", "ok" ]
      }
    }

    # Replace status 'Login incorrect' with 'failed'
    if [status] == "Login incorrect" {
      mutate {
        replace => [ "status", "failed" ]
      }
    }

    # If the message is not a login/logout message, set status to 'other' and send it to both elasticsearch and the log file
    if ![status] {
      mutate {
        add_field => { "status" => "other" }
        remove_tag => [ '_grok_radiusd_nomatch' ]
        add_tag => [ 'save_to_logfile', 'save_to_elasticsearch' ]
      }
    }

    # Add 'timestamp_utc' field, so AAI can show it on their web page
    mutate {
      add_field => {
        "timestamp_utc" => "%{@timestamp}"
      }
    }

  }

  if [type] == "dhcpd" {

    if [message] =~ /^client/ {
      grok {
        patterns_dir => [ '/etc/logstash/patterns' ]
        match => [ 'message', '%{DHCPD_AAI_CLIENT}']
        add_tag => [ 'save_to_elasticsearch', '_grok_dhcpd_client_match' ]
        tag_on_failure => [ 'save_to_logfile', '_grok_dhcpd_client_nomatch']
        add_field => { status => "ok"}
      }
    } else if [message] =~ /^DHCPDISCOVER/ {
      grok {
        patterns_dir => [ '/etc/logstash/patterns' ]
        match => [ 'message', '%{DHCPD_AAI_DISCOVER}']
        add_tag => [ 'save_to_elasticsearch', '_grok_dhcpd_discover_match' ]
        tag_on_failure => [ 'save_to_logfile', '_grok_dhcpd_discover_nomatch']
        add_field => { status => "error"}
      }
    } else if [message] =~ /^DHCPDECLINE/ {
      grok {
        patterns_dir => [ '/etc/logstash/patterns' ]
        match => [ 'message', '%{DHCPD_AAI_DECLINE}']
        add_tag => [ 'save_to_elasticsearch', '_grok_dhcpd_decline_match' ]
        tag_on_failure => [ 'save_to_logfile', '_grok_dhcpd_decline_nomatch']
        add_field => { status => "error"}
      }
    } else {
        mutate {
            add_tag => [ '_grok_dhcpd_nomatch', 'save_to_logfile' ]
        }
    }

    # Add 'timestamp_utc' field
    mutate {
      add_field => {
        "timestamp_utc" => "%{@timestamp}"
      }
    }

  }

  if [type] == "glassfish" {

    # Grok it
    grok {
      patterns_dir => [ '/etc/logstash/patterns' ]
      match => [ 'message', '%{GLASSFISH}']
      add_tag => [ '_grok_glassfish_match', 'save_to_elasticsearch' ]
      tag_on_failure => [ '_grok_glassfish_nomatch', 'save_to_logfile']
    }

    # Set host to server sending the logs
    mutate {
      replace => { "host" => "server1" }
    }

    # Possible date formats
    date {
        'match' => ['timestamp', 'ISO8601', 'YYYY-MM-dd HH:mm:ss,SSS' ]
    }

  }

  if [type] == "httpd" {

    ### Grok log files
    grok {
      patterns_dir => [ '/etc/logstash/patterns' ]
      match => [ 'message', '%{HTTPD}' ]
      add_tag => [ 'save_to_elasticsearch', '_grok_httpd_match' ]
      tag_on_failure => [ 'save_to_logfile', '_grok_httpd_nomatch']
    }

    ### If http_user_agent exists, add tag and apply plugin
    if [http_user_agent] != "-" and [http_user_agent] != "" {
       useragent {
                 add_tag => [ "UA" ]
                 source => "http_user_agent"
                 prefix => "UA-"
       }
    }

    ### Remove empty/useless fields
    if [http_user]        == "-"               { mutate { remove_field => "http_user" } }
    if [bytes]            == "-"               { mutate { remove_field => "bytes" } }
    if [http_method]      =~ "(HEAD|OPTIONS)"  { mutate { remove_field => "http_method" } }
    if [http_user_agent]  == "-"               { mutate { remove_field => "http_user_agent" } }
    if [http_referer]     == "-"               { mutate { remove_field => "http_referer" } }

    ### Remove UA fields that are not recognized
    #if "UA" in [tags] {
    #   if [UA-device] == "Other" { mutate { remove_field => "UA-device" } }
    #   if [UA-name]   == "Other" { mutate { remove_field => "UA-name" } }
    #   if [UA-os]     == "Other" { mutate { remove_field => "UA-os" } }
    #}

    ### Add GeoIP data
    geoip {
      source => [ "client_ip" ]
      fields => [ "country_code2", "country_name", "latitude", "longitude", "location" ]
    }

  }

# Add tags for outputs here

    # Tag all dovecot logs
    if [type] == "dovecot" {
        if [host] == "mailserver1" {
            mutate {
              add_tag => [ "statsd" ]
            }
        } else if [host] == "mailserver2" {
            mutate {
              add_tag => [ "statsd" ]
            }
        } else if [host] == "mailserver3" {
            mutate {
              add_tag => [ "statsd" ]
            }
        }

    }


    # count all events that are saved to ES
    if "save_to_elasticsearch" in [tags] {
        metrics {
            meter => "events"
            add_tag => "metric-output_es"
            flush_interval => 10
        }
    }

    # count all events that are saved to log files (not parsed)
    if "save_to_logfile" in [tags] {
        metrics {
            meter => "events"
            add_tag => "metric-output_logfile"
            flush_interval => 10
        }
    }

    # count all packetbeat packages
    if "packetbeat" in [tags] {
        metrics {
            meter => "events"
            add_tag => "metric-packetbeat"
            flush_interval => 10
        }
    }

}

output {

  ### AMAVIS statistics
  if [type] == "amavis" {

    # Content type: spam, clean, virus
    statsd {
      increment => [ "amavis.content.%{content_type}" ]
      namespace => 'servers'
      host => '1.2.3.4'
    }

    # Actions: DEFERED, REJECT, PASS
    statsd {
      increment => [ "amavis.action.%{action}" ]
      namespace => 'servers'
      host => '1.2.3.4'
    }

    # Elapsed time of events
    statsd {
      timing => [ "amavis.timeelapsed.amavis", "%{[elapsed][Amavis]}",
                  "amavis.timeelapsed.receiving", "%{[elapsed][Receiving]}",
                  "amavis.timeelapsed.sending", "%{[elapsed][Sending]}",
                  "amavis.timeelapsed.total", "%{[elapsed][Total]}" ]
      namespace => 'servers'
      host => '1.2.3.4'
    }

    # Not every event contains VirusCheck, SpamCheck and Decoding
    if [elapsed][Decoding] {
      statsd {
        timing => [ "amavis.timeelapsed.decoding", "%{[elapsed][Decoding]}" ]
        namespace => 'servers'
        host => '1.2.3.4'
      }
    }

    if [elapsed][VirusCheck] {
      statsd {
        timing => [ "amavis.timeelapsed.viruscheck", "%{[elapsed][VirusCheck]}" ]
        namespace => 'servers'
        host => '1.2.3.4'
      }
    }

    if [elapsed][SpamCheck] {
      statsd {
        timing => [ "amavis.timeelapsed.spamcheck", "%{[elapsed][SpamCheck]}" ]
        namespace => 'servers'
        host => '1.2.3.4'
      }
    }

  }


    ### POP3/IMAP logins count
    if ( [type] == "dovecot" ) and ( [status_message] == "Login" ) {
        statsd {
            increment => [ "dovecot.%{proto}-login" ]
            namespace => 'servers'
            host => '1.2.3.4'
            sender => '%{host}_domain_com'
        }
    }


    if [type] == "dovecot" and "statsd" in [tags] {

        ### Per protocol events
        if ( [proto] == "imap" ) or ( [proto] == "pop3" ) {
            statsd {
                increment => [ "dovecot.%{proto}.events" ]
                namespace => 'servers'
                host => '1.2.3.4'
                sender => '%{host}_domain_si'
            }
        }

        # Logins only
        if [conn_status] == 'Login' {
              # Count all successful logins, by method (plain, login) and by crypto (none, ssl, tls)
              statsd {
                increment => [  "dovecot.%{proto}.login.succeeded",
                                "dovecot.%{proto}.login.method.%{method}",
                                "dovecot.%{proto}.login.crypto.%{crypto}" ]
                namespace => 'servers'
                host => '1.2.3.4'
                sender => '%{host}_domain_si'
            }
        } else if ( [conn_status] == "Disconnected" ) or ( [conn_status] == "Connection closed" ) {
            # Count all disconnect events (total and per status)
            statsd {
                increment => [  "dovecot.%{proto}.disconnected_all",
                                "dovecot.%{proto}.disconnect_per_status.%{auth_status}" ]
                namespace => 'servers'
                host => '1.2.3.4'
                sender => '%{host}_domain_com'
            }

            # If successfully disconnected, count traffic
            if ( [proto] == "imap" ) and [bytes_in] {
                statsd {
                    count => [ "dovecot.%{proto}.bytes.in", "%{bytes_in}",
                               "dovecot.%{proto}.bytes.out", "%{bytes_out}" ]
                    namespace => 'servers'
                    host => '1.2.3.4'
                    sender => '%{host}_domain_com'
                }
            }
        } else if [conn_status] == "Aborted login" {
            # Count all aborted logins (total and per status)
            statsd {
                increment => [  "dovecot.%{proto}.aborted_all",
                                "dovecot.%{proto}.abort_per_status.%{auth_status}" ]
                namespace => 'servers'
                host => '1.2.3.4'
                sender => '%{host}_domain_com'
            }
        }

        if [proto] == "lmtp" {
            if "lmtp_saved" in [tags] {
                # Count all "saved to/failed" events and per location (inbox, spam, other)
                statsd {
                    increment => [  "dovecot.%{proto}.saved_ok",
                                    "dovecot.%{proto}.save.%{saved_location}" ]
                    namespace => 'servers'
                    host => '1.2.3.4'
                    sender => '%{host}_domain_com'
                }
            } else if "lmtp_failed" in [tags] {
                statsd {
                    increment => [  "dovecot.%{proto}.saved_failed" ]
                    namespace => 'servers'
                    host => '1.2.3.4'
                    sender => '%{host}_domain_com'
                }
            }
        }
    }

  if ('parsed' in [tags]) or ('_grok_postfix_success' in [tags]) or ('save_to_elasticsearch' in [tags]) {
    elasticsearch {
      hosts               => ["es1:9200", "es2:9200"]
      template_overwrite  => 'true'
      #template            => '/etc/logstash/logstash.template.json'
      template_name       => 'logstash'
      workers             => 1
    }
  }


  if  ( [type] != "amavis" ) and ( [type] != "apache" ) and ( [type] != "dovecot" ) and ( [type] != "roundcube" ) and ( [type] != "postfix" ) and ( [type] != "simplesaml" ) and ( [type] != "cisco" ) and ( [type] != "httpd" ) {
    stdout { codec => rubydebug }
  }

  # Save to log file if tag detected
  if "save_to_logfile" in [tags] {
    file {
      codec => json
      path => "/var/log/logstash/output-debug-%{host}-%{type}.log"
    }
  }

}

@ph
Contributor

ph commented Feb 5, 2016

@urso in your scenario, did you use the following configuration to make it crash:
generatorbeat -> beats input -> elasticsearch?

You said filebeat ends up in a weird state, and from your comment it sounds like restarting LS unblocks filebeat. But if you restart filebeat in this scenario, is logstash still in a weird state? We could have a bug on both ends of the pipe.

@urso

urso commented Feb 5, 2016

@ph no, I used filebeat, as it blocks the spooler when the output blocks, whereas generatorbeat does not use the guaranteed mode. I really have filebeat -> beats input plugin -> elasticsearch.

Restarting either filebeat or LS worked for me. Sometimes it took a while for LS to recover, though, like 2+ minutes until new connections were accepted.
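
For reference, that topology corresponds to a minimal filebeat 1.x config along these lines (the paths and host are placeholders, not the actual setup from this issue):

filebeat:
  prospectors:
    # harvest matching log files; events are shipped in guaranteed mode (the default)
    - paths:
        - /var/log/*.log
output:
  logstash:
    # a single logstash instance running the beats input plugin in front of elasticsearch
    hosts: ["localhost:5044"]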

@matejzero
Author

Next time the system hangs, I will restart logstash and wait for a longer period. I must admit that when I restarted logstash, I only waited half a minute before restarting filebeat. Had I waited longer, I might have gotten the same results as @urso.

@matejzero
Author

When filebeat locked up, I restarted logstash and checked netstat. There was no connection from filebeat to the logstash nodes, even after 15 minutes.

After restarting filebeat, the connection was back up right away.

@urso

urso commented Feb 9, 2016

@matejzero Thanks for all the testing. We found a bug in error recovery that makes the logstash output hang in an infinite loop if too many errors occurred, combined with some bad timing. I hope it's the source of the problems described here.

We're preparing a 1.1.1 release containing a fix for this particular error. Maybe you want to give it a try:

https://download.elastic.co/beats/filebeat/filebeat-1.1.1-SNAPSHOT-darwin.tgz
https://download.elastic.co/beats/filebeat/filebeat_1.1.1-SNAPSHOT_i386.deb
https://download.elastic.co/beats/filebeat/filebeat-1.1.1-SNAPSHOT-x86_64.rpm
https://download.elastic.co/beats/filebeat/filebeat-1.1.1-SNAPSHOT-windows.zip
https://download.elastic.co/beats/filebeat/filebeat_1.1.1-SNAPSHOT_amd64.deb
https://download.elastic.co/beats/filebeat/filebeat-1.1.1-SNAPSHOT-x86_64.tar.gz
https://download.elastic.co/beats/filebeat/filebeat-1.1.1-SNAPSHOT-i686.rpm
https://download.elastic.co/beats/filebeat/filebeat-1.1.1-SNAPSHOT-i686.tar.gz

The PR for the master branch contains quite a few more refactorings/changes and will hopefully be merged into master soon.

@matejzero
Author

I installed the snapshot and we will see. I'll report the results.

@ph
Contributor

ph commented Feb 10, 2016

Restarting either filebeat or LS worked for me. Sometimes it took a while for LS to recover, though, like 2+ minutes until new connections were accepted.

I will make the backoff time configurable.

@dragosrosculete

Hello,

I am experiencing the same error. It seems the logstash beats input plugin (at the receiver end) stops working when an ES node gets disconnected, and the flow doesn't resume; it stays stuck. It is really critical.

@matejzero
Author

It looks like you have a different problem than mine. If you restart logstash, does receiving resume, or do you have to restart filebeat?
If the latter, try the filebeat snapshot package that @urso linked. So far it is working OK on my end (but I still need to give it some time).

@urso

urso commented Feb 10, 2016

Actually, the bug we found can be triggered by the logstash plugin's internal timeout, given a socket close on the TCP layer that is not communicated correctly to filebeat (e.g. some firewall or NAT in between). If elasticsearch indexing takes too long (e.g. one 'long' pause due to garbage collection), the pipeline in logstash will hit the timeout error and disconnect filebeat. Besides breaking filebeat's reconnect (hopefully fixed), this has a negative effect on throughput too. Increasing congestion_threshold in logstash from its default of 5s to e.g. 30s might relieve the situation a little. In general, I'd set congestion_threshold > the elasticsearch timeout.

Logstash generates back-pressure internally: if the output is down or becomes unresponsive, the pipeline will block and logstash will disconnect filebeat. There's no option in filebeat to stop logstash from doing so.
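
For example, congestion_threshold is set on the beats input; a minimal sketch (the port is just an example, tune the value to your elasticsearch timeout):

input {
  beats {
    port => 5044
    # default is 5 (seconds); keep this above the elasticsearch output's timeout
    congestion_threshold => 30
  }
}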

@matejzero
Author

Is this fix merged into the 1.1.1 release?

@urso

urso commented Feb 11, 2016

Yes

@dragosrosculete

Hello,

I was still experiencing the problem even with Filebeat 1.1.1 in my last test.
I am on Linux, with ES 1.7.2, Logstash 1.5.2 as the collector, and filebeat on each server (sender). As soon as a node in ES gets disconnected, the flow stops and on logstash I see this error: :message=>"Beats input: the pipeline is blocked, temporary refusing new connection.", level=>:warn}.

With the new Filebeat update, after 10-20 seconds the error is no longer thrown, but the logs still don't start coming in.

I will do a more detailed test and give more information.

@sumanthakannantha

I am using filebeat version 1.2.3. We see this message frequently in the log:

Jul 9 04:08:36 c4t08458 /usr/bin/filebeat[24986]: balance.go:248: Error publishing events (retrying): EOF
Jul 9 04:08:55 c4t08458 /usr/bin/filebeat[24986]: publish.go:104: Events sent: 2048
Jul 9 04:08:55 c4t08458 /usr/bin/filebeat[24986]: registrar.go:162: Registry file updated. 2071 states written.

@ruflin
Member

ruflin commented Jul 11, 2016

@sumanthakannantha For questions, please open a topic here: https://discuss.elastic.co/c/beats/filebeat
