-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce harvester_limit to limit number of harvesters #2417
Conversation
This limits the number of harvesters that are started per prospector. * Changelog entry added * Docs updated * Config file updated Closes elastic#2236
return fmt.Errorf("Harvester limit reached.") | ||
} | ||
|
||
atomic.AddUint64(&p.harvesterCounter, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if createHarvester fails counter is off. Either move counter right before p.wg.Add(1) or decrement counter on fail (+ explain why it must be decremented and can not be moved past this place).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a race here if startHarverster
can be called concurrently: two threads check the condition when the counter is equal to the limit-1, then they both increment it, so you get to limit+1. Not a big issue, but wouldn't things be simpler with a mutex?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right. Unfortunately atomic
does not provide an api to directly get the incremented counter value. Using a mutex for incrementing would be simpler. Otherwise one would need to use CompareAndSwap to atomically update and check counter still being valid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tsg Just checked the code again and startHarvester cannot be called concurrently. The atomic operation is needed because of the decrementing of the counter inside the go routine.
@@ -155,6 +158,11 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er | |||
} | |||
|
|||
func (p *Prospector) startHarvester(state file.State, offset int64) error { | |||
|
|||
if p.config.HarvesterLimit > 0 && atomic.LoadUint64(&p.harvesterCounter) >= p.config.HarvesterLimit { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should add a comment about atomic ops being safe, due to startHarvester not being executed concurrently and atomic ops only required for harvesters shutting down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment added in separate commit. Can be squashed.
LGTM. |
Hi, i tried with the command PS C:\Program Files\Filebeat> Start-Service filebeat but returning error as filebeat.ymlfilebeat.prospectors:
output.elasticsearch: Array of hosts to connect to.hosts: ["myipaddress:9200"] please help me in starting Beats. |
@shaiknaimuddin For questions please use discuss: https://discuss.elastic.co/c/beats/filebeat |
This limits the number of harvesters that are started per prospector.
Closes #2236