Apply close_timeout also when output is blocked #3511

Merged
merged 1 commit into elastic:master from close_timeout-enhancement on Feb 7, 2017

Conversation

ruflin
Member

@ruflin ruflin commented Feb 1, 2017

Currently `close_timeout` does not apply when the output is blocked. This PR changes the behavior of `close_timeout` to also close the file handler when the output is blocked.

It is important to note that this closes the file handler but NOT the harvester. This matters because closing the harvester requires a state update that sets `state.Finished=true`. If the harvester were closed without this update, processing would not continue when the output becomes available again.

Previously the internal state of a harvester was updated when the event was created. This could lead to a missing event if the event was not sent but the state update went through. This is now prevented by overwriting the internal state only after the event was successfully sent.

The done channels of the prospector and harvester are renamed to make it obvious which one belongs to what: `h.done` -> `h.prospectorDone`, `h.harvestDone` -> `h.done`. As the harvester channel is closed by the `stop` method in all cases, `h.done` is sufficient in most places.

This PR does not solve the problem related to reloading and stopping a harvester mentioned in elastic#3511 (comment). This will be done in a follow-up PR.
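
To illustrate the intended behavior, here is a minimal hypothetical sketch (the stripped-down `harvester` type and `monitorClose` are illustrative names, not the actual filebeat code): on `close_timeout` only the file handle is released, while the harvester itself keeps running so it can still publish its final state update once the output unblocks.

```go
package main

import (
	"log"
	"os"
	"time"
)

// harvester is a stripped-down stand-in for filebeat's Harvester.
type harvester struct {
	file         *os.File
	closeTimeout time.Duration
	done         chan struct{} // closed when the harvester is stopped
}

// monitorClose releases only the file handle once close_timeout is reached
// (or the harvester is stopped). It deliberately does NOT stop the harvester,
// so the final state update (state.Finished=true) can still be sent once the
// output becomes available again.
func (h *harvester) monitorClose() {
	var timeout <-chan time.Time
	if h.closeTimeout > 0 {
		timeout = time.After(h.closeTimeout)
	}

	select {
	case <-timeout:
		log.Println("close_timeout reached, closing file handle only")
	case <-h.done:
		log.Println("harvester stopped, closing file handle")
	}
	h.file.Close()
}

func main() {
	f, _ := os.Open(os.DevNull)
	h := &harvester{file: f, closeTimeout: 100 * time.Millisecond, done: make(chan struct{})}
	h.monitorClose() // returns after close_timeout, since done is never closed here
}
```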

@ruflin ruflin added the discuss, Filebeat and in progress labels Feb 1, 2017
@ruflin
Member Author

ruflin commented Feb 3, 2017

In a call with Steffen we came to the following conclusions:

  • We will also use `close_timeout` for closing the channel; we don't want to introduce an additional config option.
  • `close_inactive` could also be used, but that would mean the default config could potentially lead to data loss.
  • In general we hope to have, in the long term, a solution based on the output / pipeline side, so this does not have to be handled in the harvester.
  • It is important that the harvesters only close the file but stay open, as otherwise we would end up with unfinished states.
  • The current code already contains something of a bug: `h.done` could be closed and then checked again in the last state update (see the sketch after this list). This had no effect so far because we were always shutting down filebeat, but with reloading it could mean that some files are not read further. This must be fixed.
  • If filebeat is stopped while the output is blocked and the harvester is in the closing phase, this can lead to duplicated data on restart.
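
To make the `h.done` point concrete, a small hypothetical sketch (not the actual filebeat code) of how the final state update can be dropped silently when the done channel is already closed and the output is blocked:

```go
package main

import "fmt"

type state struct {
	Source   string
	Finished bool
}

// sendFinalState tries to publish the final state update, but gives up if the
// done channel is already closed.
func sendFinalState(states chan<- state, done <-chan struct{}, s state) bool {
	s.Finished = true
	select {
	case states <- s:
		return true
	case <-done:
		// done was already closed: the final state update is silently
		// dropped. With reloading (instead of a full shutdown) this means
		// the file would never be marked finished and not be read further.
		return false
	}
}

func main() {
	states := make(chan state) // unbuffered and nobody reading: output blocked
	done := make(chan struct{})
	close(done) // simulate: done was already closed before the last update

	ok := sendFinalState(states, done, state{Source: "/var/log/example.log"})
	fmt.Println("final state delivered:", ok)
}
```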

@ruflin ruflin force-pushed the close_timeout-enhancement branch 2 times, most recently from b703d4a to d2b0036 on February 6, 2017 11:27
@ruflin ruflin changed the title from "Prototype: Introduce close_send_timeout" to "Apply close_timeout also when output is blocked" Feb 6, 2017
@ruflin ruflin requested a review from urso February 6, 2017 11:28
@ruflin ruflin added the review label and removed the discuss and in progress labels Feb 6, 2017
@ruflin
Member Author

ruflin commented Feb 6, 2017

UPDATE: Problem should be fixed.

This causes a race condition on Windows. Not sure yet where exactly it is happening:

==================
WARNING: DATA RACE
Read at 0x00c0421102b8 by goroutine 40:
  runtime.convT2E()
      c:/go1.7.4/src/runtime/iface.go:155 +0x0
  github.com/elastic/beats/filebeat/harvester.(*Harvester).Harvest.func1()
      github.com/elastic/beats/filebeat/harvester/_obj/log.go:83 +0x1a9

Previous write at 0x00c0421102b8 by goroutine 39:
  github.com/elastic/beats/filebeat/harvester.(*Harvester).Harvest()
      github.com/elastic/beats/filebeat/harvester/_obj/log.go:177 +0x6b0
  github.com/elastic/beats/filebeat/prospector.(*Prospector).startHarvester.func1()
      github.com/elastic/beats/filebeat/prospector/_obj/prospector.go:300 +0xad

Goroutine 40 (running) created at:
  github.com/elastic/beats/filebeat/harvester.(*Harvester).Harvest()
      github.com/elastic/beats/filebeat/harvester/_obj/log.go:95 +0x1eb
  github.com/elastic/beats/filebeat/prospector.(*Prospector).startHarvester.func1()
      github.com/elastic/beats/filebeat/prospector/_obj/prospector.go:300 +0xad

Goroutine 39 (running) created at:
  github.com/elastic/beats/filebeat/prospector.(*Prospector).startHarvester()
      github.com/elastic/beats/filebeat/prospector/_obj/prospector.go:301 +0x485
  github.com/elastic/beats/filebeat/prospector.(*ProspectorLog).scan()
      github.com/elastic/beats/filebeat/prospector/_obj/prospector_log.go:281 +0x83d
  github.com/elastic/beats/filebeat/prospector.(*ProspectorLog).Run()
      github.com/elastic/beats/filebeat/prospector/_obj/prospector_log.go:93 +0x11e
  github.com/elastic/beats/filebeat/prospector.(*Prospector).Run()
      github.com/elastic/beats/filebeat/prospector/_obj/prospector.go:177 +0xfe
  github.com/elastic/beats/filebeat/prospector.(*Prospector).Start.func1()
      github.com/elastic/beats/filebeat/prospector/_obj/prospector.go:145 +0x8a
==================
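
For illustration, a minimal hypothetical program (not the filebeat code itself) that triggers the same kind of report under `go run -race`: one goroutine formats a log message from `h.state.Source` while another goroutine writes `h.state`:

```go
package main

import (
	"fmt"
	"sync"
)

type fileState struct{ Source string }

type harvester struct{ state fileState }

func main() {
	h := &harvester{state: fileState{Source: "/var/log/a.log"}}

	var wg sync.WaitGroup
	wg.Add(2)

	// Reader: builds a log message from the state (like the close_timeout path).
	go func() {
		defer wg.Done()
		_ = fmt.Sprintf("Closing harvester because close_timeout was reached: %s", h.state.Source)
	}()

	// Writer: the harvest loop updating the state concurrently.
	go func() {
		defer wg.Done()
		h.state = fileState{Source: "/var/log/a.log"}
	}()

	wg.Wait()
}
```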

@ruflin ruflin added the v5.3.0 label Feb 6, 2017
@@ -39,6 +40,8 @@ type Harvester struct {
 	fileReader      *LogFile
 	encodingFactory encoding.EncodingFactory
 	encoding        encoding.Encoding
+	once            sync.Once

It should be clear that once is used to protect prospectorDone from multiple goroutines potentially doing close(prospectorDone). Hmm... is it actually used for prospectorDone?

Just seeing that it protects the done channel. Sync tools and the items they protect should always stand together.

Member Author


Yes, once protects done from being closed more than once. Moved it one line down.
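
For reference, a short sketch of the pattern under discussion (hypothetical names, not the exact filebeat code): `sync.Once` guarantees the done channel is closed at most once, even if `Stop` is called from several goroutines:

```go
package main

import "sync"

type harvester struct {
	done chan struct{}
	once sync.Once // protects done from being closed twice
}

// Stop is safe to call multiple times; without once, a second close(h.done)
// would panic.
func (h *harvester) Stop() {
	h.once.Do(func() {
		close(h.done)
	})
}

func main() {
	h := &harvester{done: make(chan struct{})}
	h.Stop()
	h.Stop() // no panic thanks to sync.Once
	<-h.done // done is closed, so this returns immediately
}
```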

 	if h.config.CloseTimeout > 0 {
 		closeTimeout = time.After(h.config.CloseTimeout)
 	}

 	select {
 	// Applies when timeout is reached
 	case <-closeTimeout:
-		logp.Info("Closing harvester because close_timeout was reached: %s", h.state.Source)
+		logp.Info("Closing harvester because close_timeout was reached.")

I see how access to h.state.Source can race. Consider printing the last state in a defer statement when Harvest returns (but after the workers have finished).

Member Author


There is already a log message in the closing part that includes state.Source, so I don't think it is necessary here.

Member

@andrewkroh andrewkroh left a comment


Looks like prospectorDone isn't closed anywhere.

@ruflin
Member Author

ruflin commented Feb 6, 2017

@andrewkroh prospectorDone is the p.done channel in the prospector and is closed on prospector.go:200. I'm actually hoping to get rid of the prospectorDone channel in the long run.
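
Roughly, the relationship looks like this (a hypothetical sketch with illustrative names, not the exact filebeat code): the prospector owns a done channel, hands it to every harvester it starts as `prospectorDone`, and closes it when the prospector stops, signalling all of its harvesters at once:

```go
package main

import (
	"fmt"
	"sync"
)

type harvester struct {
	prospectorDone <-chan struct{} // the prospector's p.done channel
}

func (h *harvester) run(wg *sync.WaitGroup) {
	defer wg.Done()
	<-h.prospectorDone // block until the prospector shuts down
	fmt.Println("harvester: prospector stopped, shutting down")
}

type prospector struct {
	done chan struct{}
	wg   sync.WaitGroup
}

func (p *prospector) startHarvester() {
	h := &harvester{prospectorDone: p.done}
	p.wg.Add(1)
	go h.run(&p.wg)
}

func (p *prospector) stop() {
	close(p.done) // the single place where p.done is closed
	p.wg.Wait()
}

func main() {
	p := &prospector{done: make(chan struct{})}
	p.startHarvester()
	p.startHarvester()
	p.stop()
}
```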

@ruflin
Member Author

ruflin commented Feb 6, 2017

@andrewkroh @urso new version pushed.

@urso urso merged commit 72ff178 into elastic:master Feb 7, 2017