diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4f8ad4deddbe..7a0f8d91e93f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -172,7 +172,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909] - Add backup and delete for AWS S3 polling mode feature back. {pull}41071[41071] - Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] - +- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142] *Heartbeat* diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index 45e7585c14ef..ef1bebd083d9 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -6,6 +6,7 @@ package gcs import ( "context" + "errors" "fmt" "slices" "sort" @@ -212,7 +213,16 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job { if !jobMap[name] { obj, err := s.bucket.Object(name).Attrs(ctx) if err != nil { - s.log.Errorf("adding failed job %s to job list caused an error: %v", name, err) + if errors.Is(err, storage.ErrObjectNotExist) { + // if the object is not found in the bucket, then remove it from the failed job list + s.state.deleteFailedJob(name) + s.log.Debugf("scheduler: failed job %s not found in bucket %s", name, s.src.BucketName) + } else { + // if there is an error while validating the object, + // then update the failed job retry count and work towards natural removal + s.state.updateFailedJobs(name) + s.log.Errorf("scheduler: adding failed job %s to job list caused an error: %v", name, err) + } continue } diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go index 59f79fce471c..ea04edcae908 100644 --- a/x-pack/filebeat/input/gcs/state.go +++ b/x-pack/filebeat/input/gcs/state.go @@ -79,6 +79,14 @@ func (s *state) updateFailedJobs(jobName string) { s.mu.Unlock() } +// deleteFailedJob, deletes a failed job from the failedJobs map +// this is used when a job no longer exists in the bucket or gets expired +func (s *state) deleteFailedJob(jobName string) { + s.mu.Lock() + delete(s.cp.FailedJobs, jobName) + s.mu.Unlock() +} + // setCheckpoint, sets checkpoint from source to current state instance // If for some reason the current state is empty, assigns new states as // a fail safe mechanism