diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index 781514d52712..335d4e80320f 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -7,6 +7,8 @@ package azureblobstorage import ( "context" "fmt" + "slices" + "sort" "sync" azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" @@ -190,41 +192,19 @@ func (s *scheduler) fetchBlobPager(batchSize int32) *azruntime.Pager[azblob.List // moveToLastSeenJob, moves to the latest job position past the last seen job // Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { - var latestJobs []*job - jobsToReturn := make([]*job, 0) - counter := 0 - flag := false - ignore := false - - for _, job := range jobs { - switch { - case job.timestamp().After(s.state.checkpoint().LatestEntryTime): - latestJobs = append(latestJobs, job) - case job.name() == s.state.checkpoint().BlobName: - flag = true - case job.name() > s.state.checkpoint().BlobName: - flag = true - counter-- - case job.name() <= s.state.checkpoint().BlobName && (!ignore): - ignore = true - } - counter++ - } - - if flag && (counter < len(jobs)-1) { - jobsToReturn = jobs[counter+1:] - } else if !flag && !ignore { - jobsToReturn = jobs - } - - // in a senario where there are some jobs which have a greater timestamp - // but lesser alphanumeric order and some jobs have greater alphanumeric order - // than the current checkpoint blob name, then we append the latest jobs - if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 { - jobsToReturn = append(latestJobs, jobsToReturn...) - } + cp := s.state.checkpoint() + jobs = slices.DeleteFunc(jobs, func(j *job) bool { + return !(j.timestamp().After(cp.LatestEntryTime) || j.name() > cp.BlobName) + }) - return jobsToReturn + // In a scenario where there are some jobs which have a greater timestamp + // but lesser lexicographic order and some jobs have greater lexicographic order + // than the current checkpoint blob name, we then sort around the pivot checkpoint + // timestamp. + sort.SliceStable(jobs, func(i, _ int) bool { + return jobs[i].timestamp().After(cp.LatestEntryTime) + }) + return jobs } func (s *scheduler) isFileSelected(name string) bool {