
feat(block-scheduler): introduce job lease and requeue expired jobs #15560

Merged: 2 commits, Dec 30, 2024
11 changes: 11 additions & 0 deletions docs/sources/shared/configuration.md
@@ -236,6 +236,17 @@ block_scheduler:
# CLI flag: -block-scheduler.max-jobs-planned-per-interval
[max_jobs_planned_per_interval: <int> | default = 100]

job_queue:
# Interval to check for expired job leases
# CLI flag: -jobqueue.lease-expiry-check-interval
[lease_expiry_check_interval: <duration> | default = 1m]

# Duration after which a job lease is considered expired if the scheduler
# receives no updates from builders about the job. Expired jobs are
# re-enqueued
# CLI flag: -jobqueue.lease-duration
[lease_duration: <duration> | default = 10m]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
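For reference, here is a minimal sketch of overriding the new settings in YAML. It assumes `job_queue` nests under `block_scheduler`, as the `yaml:"job_queue"` tag added to the scheduler Config later in this diff suggests; the values are illustrative, not recommendations:

block_scheduler:
  job_queue:
    # Scan for expired leases every 30 seconds.
    lease_expiry_check_interval: 30s
    # Requeue a job if builders have not reported on it for 5 minutes.
    lease_duration: 5m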
91 changes: 83 additions & 8 deletions pkg/blockbuilder/scheduler/queue.go
@@ -1,6 +1,9 @@
package scheduler

import (
"context"
"errors"
"flag"
"fmt"
"sync"
"time"
@@ -21,6 +24,7 @@ const (
// JobWithMetadata wraps a job with additional metadata for tracking its lifecycle
type JobWithMetadata struct {
*types.Job

Priority int
Status types.JobStatus
StartTime time.Time
@@ -60,21 +64,36 @@ func newJobQueueMetrics(r prometheus.Registerer) *jobQueueMetrics {
}
}

type JobQueueConfig struct {
LeaseExpiryCheckInterval time.Duration `yaml:"lease_expiry_check_interval"`
LeaseDuration time.Duration `yaml:"lease_duration"`
}

func (cfg *JobQueueConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LeaseExpiryCheckInterval, "jobqueue.lease-expiry-check-interval", 1*time.Minute, "Interval to check for expired job leases")
f.DurationVar(&cfg.LeaseDuration, "jobqueue.lease-duration", 10*time.Minute, "Duration after which a job lease is considered expired if the scheduler receives no updates from builders about the job. Expired jobs are re-enqueued")
}

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
logger log.Logger
cfg JobQueueConfig

mu sync.RWMutex
pending *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*JobWithMetadata // Jobs currently being processed
completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
metrics *jobQueueMetrics
mu sync.RWMutex

logger log.Logger
metrics *jobQueueMetrics
}

// NewJobQueue creates a new job queue instance
func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue {
func NewJobQueue(cfg JobQueueConfig, logger log.Logger, reg prometheus.Registerer) *JobQueue {
return &JobQueue{
cfg: cfg,
logger: logger,

pending: NewPriorityQueue(
func(a, b *JobWithMetadata) bool {
return a.Priority > b.Priority // Higher priority first
@@ -88,6 +107,49 @@ func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue {
}
}

func (q *JobQueue) RunLeaseExpiryChecker(ctx context.Context) {
ticker := time.NewTicker(q.cfg.LeaseExpiryCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
level.Debug(q.logger).Log("msg", "checking for expired job leases")
if err := q.requeueExpiredJobs(); err != nil {
level.Error(q.logger).Log("msg", "failed to requeue expired jobs", "err", err)
}
case <-ctx.Done():
return
}
}
}

func (q *JobQueue) requeueExpiredJobs() error {
q.mu.Lock()
defer q.mu.Unlock()

var multiErr error
for id, job := range q.inProgress {
if time.Since(job.UpdateTime) > q.cfg.LeaseDuration {
level.Warn(q.logger).Log("msg", "job lease expired. requeuing", "job", id, "update_time", job.UpdateTime, "now", time.Now())

// complete the job with expired status and re-enqueue
delete(q.inProgress, id)
q.metrics.inProgress.Dec()

job.Status = types.JobStatusExpired
q.addToCompletedBuffer(job)

if err := q.enqueueLockLess(job.Job, job.Priority); err != nil {
level.Error(q.logger).Log("msg", "failed to requeue expired job", "job", id, "err", err)
multiErr = errors.Join(multiErr, err)
}
}
}

return multiErr
}

// Exists checks if a job exists in any state and returns its status
func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
@@ -126,8 +188,12 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
q.mu.Lock()
defer q.mu.Unlock()

return q.enqueueLockLess(job, priority)
}

func (q *JobQueue) enqueueLockLess(job *types.Job, priority int) error {
// Check if job already exists
if status, exists := q.statusMap[job.ID()]; exists {
if status, exists := q.statusMap[job.ID()]; exists && status != types.JobStatusExpired {
return fmt.Errorf("job %s already exists with status %v", job.ID(), status)
}

@@ -203,20 +269,29 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id)
}
q.metrics.pending.Dec()
case types.JobStatusComplete:
level.Info(q.logger).Log("msg", "job is already complete, ignoring", "job", id)
return
default:
level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status)
return
}

jobMeta.Status = status
jobMeta.UpdateTime = time.Now()

// add it to the completed buffer, removing any evicted job from the statusMap
q.addToCompletedBuffer(jobMeta)
}

// add it to the completed buffer, removing any evicted job from the statusMap
func (q *JobQueue) addToCompletedBuffer(jobMeta *JobWithMetadata) {
removal, evicted := q.completed.Push(jobMeta)
if evicted {
delete(q.statusMap, removal.ID())
}
q.statusMap[id] = status
q.metrics.completed.WithLabelValues(status.String()).Inc()

q.statusMap[jobMeta.ID()] = jobMeta.Status
q.metrics.completed.WithLabelValues(jobMeta.Status.String()).Inc()
}

// SyncJob registers a job as in-progress or updates its UpdateTime if already in progress
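Taken together, the queue changes work as follows: builders keep a job's lease alive by syncing it (which bumps UpdateTime), and the new background loop requeues any in-progress job that has gone quiet for longer than the configured lease. Below is a minimal, hypothetical sketch of how a caller might wire this up, using only the exported API visible in this diff; the package main scaffolding, import paths, and literal values are assumptions:

// lease_sketch.go: illustrative only, not part of this PR.
package main

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	scheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler"
	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

func main() {
	cfg := scheduler.JobQueueConfig{
		LeaseExpiryCheckInterval: time.Minute,      // how often in-progress jobs are scanned
		LeaseDuration:            10 * time.Minute, // max silence from builders before requeue
	}
	q := scheduler.NewJobQueue(cfg, log.NewNopLogger(), prometheus.NewRegistry())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Scans in-progress jobs every LeaseExpiryCheckInterval; jobs whose UpdateTime
	// is older than LeaseDuration are marked expired and pushed back onto pending.
	go q.RunLeaseExpiryChecker(ctx)

	// Enqueue a job as usual. If its lease later expires, enqueueing the same job ID
	// is allowed again, because an expired status no longer blocks Enqueue.
	job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
	if err := q.Enqueue(job, 1); err != nil {
		panic(err)
	}
}

In the scheduler itself this wiring is the `go s.queue.RunLeaseExpiryChecker(ctx)` call added in scheduler.go further down.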
121 changes: 114 additions & 7 deletions pkg/blockbuilder/scheduler/queue_test.go
@@ -10,9 +10,11 @@ import (
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

var testQueueCfg = JobQueueConfig{}

func TestJobQueue_SyncJob(t *testing.T) {
t.Run("non-existent to in-progress", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
jobID := job.ID()

@@ -29,7 +31,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
})

t.Run("pending to in-progress", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with pending job
@@ -52,7 +54,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
})

t.Run("already in-progress", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// First sync to put in in-progress
@@ -73,7 +75,7 @@ func TestJobQueue_SyncJob(t *testing.T) {

func TestJobQueue_MarkComplete(t *testing.T) {
t.Run("in-progress to complete", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with in-progress job
@@ -103,7 +105,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("pending to complete", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with pending job
@@ -130,7 +132,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("non-existent job", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
logger := &testLogger{t: t}
q.logger = logger

@@ -139,7 +141,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("already completed job", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
logger := &testLogger{t: t}
q.logger = logger

@@ -153,6 +155,111 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})
}

func TestJobQueue_Enqueue(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)

job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

beforeComplete := time.Now()
err := q.Enqueue(job, 1)
afterComplete := time.Now()
require.NoError(t, err)

status, ok := q.Exists(job)
require.True(t, ok, "job should exist")
require.Equal(t, types.JobStatusPending, status)

// Verify job in pending queue
foundJob, ok := q.pending.Lookup(job.ID())
require.True(t, ok, "job should be in pending queue")
require.Equal(t, job, foundJob.Job)
require.Equal(t, 1, foundJob.Priority)
require.True(t, foundJob.StartTime.IsZero())
require.True(t, foundJob.UpdateTime.After(beforeComplete) || foundJob.UpdateTime.Equal(beforeComplete))
require.True(t, foundJob.UpdateTime.Before(afterComplete) || foundJob.UpdateTime.Equal(afterComplete))

// allow enqueueing of job with same ID if expired
job2 := types.NewJob(2, types.Offsets{Min: 100, Max: 200})
q.statusMap[job2.ID()] = types.JobStatusExpired

err = q.Enqueue(job2, 2)
require.NoError(t, err)

status, ok = q.Exists(job2)
require.True(t, ok, "job should exist")
require.Equal(t, types.JobStatusPending, status)

// Verify job2 in pending queue
foundJob, ok = q.pending.Lookup(job2.ID())
require.True(t, ok, "job2 should be in pending queue")
require.Equal(t, job2, foundJob.Job)
require.Equal(t, 2, foundJob.Priority)

// do not allow enqueueing of job with same ID if not expired
job3 := types.NewJob(3, types.Offsets{Min: 120, Max: 230})
q.statusMap[job3.ID()] = types.JobStatusInProgress

err = q.Enqueue(job3, DefaultPriority)
require.Error(t, err)
}

func TestJobQueue_RequeueExpiredJobs(t *testing.T) {
q := NewJobQueue(JobQueueConfig{
LeaseDuration: 5 * time.Minute,
}, log.NewNopLogger(), nil)

job1 := &JobWithMetadata{
Job: types.NewJob(1, types.Offsets{Min: 100, Max: 200}),
Priority: 1,
Status: types.JobStatusInProgress,
StartTime: time.Now().Add(-time.Hour),
UpdateTime: time.Now().Add(-time.Minute),
}
// expired job
job2 := &JobWithMetadata{
Job: types.NewJob(2, types.Offsets{Min: 300, Max: 400}),
Priority: 2,
Status: types.JobStatusInProgress,
StartTime: time.Now().Add(-time.Hour),
UpdateTime: time.Now().Add(-6 * time.Minute),
}

q.inProgress[job1.ID()] = job1
q.inProgress[job2.ID()] = job2
q.statusMap[job1.ID()] = types.JobStatusInProgress
q.statusMap[job2.ID()] = types.JobStatusInProgress

beforeRequeue := time.Now()
err := q.requeueExpiredJobs()
require.NoError(t, err)

status, ok := q.statusMap[job1.ID()]
require.True(t, ok)
require.Equal(t, types.JobStatusInProgress, status)

got, ok := q.inProgress[job1.ID()]
require.True(t, ok)
require.Equal(t, job1, got)

status, ok = q.statusMap[job2.ID()]
require.True(t, ok)
require.Equal(t, types.JobStatusPending, status)

got, ok = q.pending.Lookup(job2.ID())
require.True(t, ok)
require.Equal(t, job2.Job, got.Job)
require.Equal(t, types.JobStatusPending, got.Status)
require.Equal(t, job2.Priority, got.Priority)
require.True(t, got.StartTime.IsZero())
require.True(t, got.UpdateTime.After(beforeRequeue) || got.UpdateTime.Equal(beforeRequeue))

require.Equal(t, 1, q.completed.Len())
got, ok = q.completed.Pop()
require.True(t, ok)
job2.Status = types.JobStatusExpired
require.Equal(t, job2, got)
}

// testLogger implements log.Logger for testing
type testLogger struct {
t *testing.T
16 changes: 10 additions & 6 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -25,12 +25,13 @@
)

type Config struct {
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
TargetRecordCount int64 `yaml:"target_record_count"`
MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
TargetRecordCount int64 `yaml:"target_record_count"`
MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
JobQueueConfig JobQueueConfig `yaml:"job_queue"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@@ -61,6 +62,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
100,
"Maximum number of jobs that the planner can return.",
)
cfg.JobQueueConfig.RegisterFlags(f)
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -128,6 +130,8 @@ func (s *BlockScheduler) running(ctx context.Context) error {
level.Error(s.logger).Log("msg", "failed to schedule jobs", "err", err)
}

go s.queue.RunLeaseExpiryChecker(ctx)

ticker := time.NewTicker(s.cfg.Interval)
for {
select {
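Note: `RegisterFlagsWithPrefix` registers the scheduler's other flags under the `-block-scheduler.` prefix, but `JobQueueConfig.RegisterFlags` hard-codes its own `jobqueue.` prefix, so the CLI equivalents of the YAML settings documented above are:

-jobqueue.lease-expiry-check-interval=1m
-jobqueue.lease-duration=10m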
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/scheduler_test.go
@@ -41,7 +41,7 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error {
}

func newTestEnv(builderID string) (*testEnv, error) {
queue := NewJobQueue(log.NewNopLogger(), nil)
queue := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
mockOffsetMgr := &mockOffsetManager{
topic: "test-topic",
consumerGroup: "test-group",