diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index eaa08c50aa1..398dcde286e 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -299,16 +299,24 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { return err } - reader, err := h.Setup() + // State is directly updated and not through channel to make state update synchronous + err = p.updateState(input.NewEvent(state)) if err != nil { - return fmt.Errorf("Error setting up harvester: %s", err) + return err } - // State is directly updated and not through channel to make state update immediate - // State is only updated after setup is completed successfully - err = p.updateState(input.NewEvent(state)) + reader, err := h.Setup() if err != nil { - return err + // Set state to finished True again in case of setup failure to make sure + // file can be picked up again by a future harvester + state.Finished = true + + updateErr := p.updateState(input.NewEvent(state)) + // This should only happen in the case that filebeat is stopped + if updateErr != nil { + logp.Err("Error updating state: %v", updateErr) + } + return fmt.Errorf("Error setting up harvester: %s", err) } p.registry.start(h, reader)