Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909]
- Add backup and delete for AWS S3 polling mode feature back. {pull}41071[41071]
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]

- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142]

*Heartbeat*

Expand Down
12 changes: 11 additions & 1 deletion x-pack/filebeat/input/gcs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package gcs

import (
"context"
"errors"
"fmt"
"slices"
"sort"
Expand Down Expand Up @@ -212,7 +213,16 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job {
if !jobMap[name] {
obj, err := s.bucket.Object(name).Attrs(ctx)
if err != nil {
s.log.Errorf("adding failed job %s to job list caused an error: %v", name, err)
if errors.Is(err, storage.ErrObjectNotExist) {
// if the object is not found in the bucket, then remove it from the failed job list
s.state.deleteFailedJob(name)
s.log.Debugf("scheduler: failed job %s not found in bucket %s", name, s.src.BucketName)
} else {
// if there is an error while validating the object,
// then update the failed job retry count and work towards natural removal
s.state.updateFailedJobs(name)
s.log.Errorf("scheduler: adding failed job %s to job list caused an error: %v", name, err)
}
continue
}

Expand Down
8 changes: 8 additions & 0 deletions x-pack/filebeat/input/gcs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ func (s *state) updateFailedJobs(jobName string) {
s.mu.Unlock()
}

// deleteFailedJob, deletes a failed job from the failedJobs map
// this is used when a job no longer exists in the bucket or gets expired
func (s *state) deleteFailedJob(jobName string) {
s.mu.Lock()
delete(s.cp.FailedJobs, jobName)
s.mu.Unlock()
}

// setCheckpoint, sets checkpoint from source to current state instance
// If for some reason the current state is empty, assigns new states as
// a fail safe mechanism
Expand Down