Skip to content

Commit

Permalink
Fix close_timeout for new started harvesters (elastic#3715)
Browse files Browse the repository at this point in the history
If close_timeout is set, a file handler inside the harvester is closed after `close_timeout` is reached even if the output is blocked. For new which were found for harvesting during the output was blocked, it was possible that the Setup of the file happened but then the initial state could not be sent. So the harvester was never started, file handler was open but `close_timeout` this not apply. Setup and state update are now turned around. This has the affect, that in case of an error during the setup phase, the state must be revert again.

See elastic#3091 (comment)
(cherry picked from commit e13a7f2)
  • Loading branch information
ruflin committed Mar 3, 2017
1 parent 65e5005 commit 50a4b19
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,24 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
return err
}

reader, err := h.Setup()
// State is directly updated and not through channel to make state update synchronous
err = p.updateState(input.NewEvent(state))
if err != nil {
return fmt.Errorf("Error setting up harvester: %s", err)
return err
}

// State is directly updated and not through channel to make state update immediate
// State is only updated after setup is completed successfully
err = p.updateState(input.NewEvent(state))
reader, err := h.Setup()
if err != nil {
return err
// Set state to finished True again in case of setup failure to make sure
// file can be picked up again by a future harvester
state.Finished = true

updateErr := p.updateState(input.NewEvent(state))
// This should only happen in the case that filebeat is stopped
if updateErr != nil {
logp.Err("Error updating state: %v", updateErr)
}
return fmt.Errorf("Error setting up harvester: %s", err)
}

p.registry.start(h, reader)
Expand Down

0 comments on commit 50a4b19

Please sign in to comment.