Update CandidateGangIterator ordering (#292) (#4069)
* add basic defrag

* ARMADA-2970 Small simulator improvements

 - Log input files consistently
 - Fix log output to only occur every 5 seconds
   - Was bugged to log every round once 5 seconds had passed
 - Remove variance on gang jobs, so they finish at the same time

* Set 0 tailmean

* Update job ordering to order the largest job first

* Update so we schedule the larger gang first

* Remove unrelated changes

* Add test + fixes

* Fix comment

* Fix comment

* Add rollout flag + improved naming

* Pass arg in simulator

* Improve comment

---------

Co-authored-by: James Murkin <[email protected]>
Co-authored-by: chrismar503 <[email protected]>
3 people authored Nov 29, 2024
1 parent 12634a5 commit 500e08a
Showing 10 changed files with 237 additions and 40 deletions.
1 change: 1 addition & 0 deletions config/scheduler/config.yaml
@@ -73,6 +73,7 @@ scheduling:
resolution: "1"
disableScheduling: false
enableAssertions: false
enablePreferLargeJobOrdering: false
protectedFractionOfFairShare: 1.0
nodeIdLabel: "kubernetes.io/hostname"
priorityClasses:
4 changes: 4 additions & 0 deletions internal/scheduler/configuration/configuration.go
@@ -131,6 +131,10 @@ type SchedulingConfig struct {
DisableScheduling bool
// Set to true to enable scheduler assertions. This results in some performance loss.
EnableAssertions bool
// Experimental
// Set to true to prefer larger jobs when ordering candidate gangs in the candidate gang iterator.
// This results in larger jobs being placed earlier in the job scheduling order.
EnablePreferLargeJobOrdering bool
// Only queues allocated more than this fraction of their fair share are considered for preemption.
ProtectedFractionOfFairShare float64 `validate:"gte=0"`
// Armada adds a node selector term to every scheduled pod using this label with the node name as value.
20 changes: 12 additions & 8 deletions internal/scheduler/scheduling/preempting_queue_scheduler.go
@@ -30,6 +30,7 @@ type PreemptingQueueScheduler struct {
floatingResourceTypes *floatingresources.FloatingResourceTypes
protectedFractionOfFairShare float64
maxQueueLookBack uint
preferLargeJobOrdering bool
jobRepo JobRepository
nodeDb *nodedb.NodeDb
// Maps job ids to the id of the node the job is associated with.
@@ -46,6 +47,7 @@ func NewPreemptingQueueScheduler(
sctx *schedulercontext.SchedulingContext,
constraints schedulerconstraints.SchedulingConstraints,
floatingResourceTypes *floatingresources.FloatingResourceTypes,
preferLargeJobOrdering bool,
protectedFractionOfFairShare float64,
maxQueueLookBack uint,
jobRepo JobRepository,
@@ -72,6 +74,7 @@
constraints: constraints,
floatingResourceTypes: floatingResourceTypes,
protectedFractionOfFairShare: protectedFractionOfFairShare,
preferLargeJobOrdering: preferLargeJobOrdering,
maxQueueLookBack: maxQueueLookBack,
jobRepo: jobRepo,
nodeDb: nodeDb,
@@ -305,7 +308,7 @@ func (sch *PreemptingQueueScheduler) evict(ctx *armadacontext.Context, evictor *
if err := sch.nodeDb.Reset(); err != nil {
return nil, nil, err
}
if err := addEvictedJobsToNodeDb(ctx, sch.schedulingContext, sch.nodeDb, inMemoryJobRepo); err != nil {
if err := sch.addEvictedJobsToNodeDb(ctx, inMemoryJobRepo); err != nil {
return nil, nil, err
}
return result, inMemoryJobRepo, nil
@@ -477,22 +480,22 @@ func (q MinimalQueue) GetWeight() float64 {

// addEvictedJobsToNodeDb adds evicted jobs to the NodeDb.
// Needed to enable the nodeDb accounting for these when preempting.
func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.SchedulingContext, nodeDb *nodedb.NodeDb, inMemoryJobRepo *InMemoryJobRepository) error {
func (sch *PreemptingQueueScheduler) addEvictedJobsToNodeDb(_ *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository) error {
gangItByQueue := make(map[string]*QueuedGangIterator)
for _, qctx := range sctx.QueueSchedulingContexts {
for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
gangItByQueue[qctx.Queue] = NewQueuedGangIterator(
sctx,
sch.schedulingContext,
inMemoryJobRepo.GetJobIterator(qctx.Queue),
0,
false,
)
}
qr := NewMinimalQueueRepositoryFromSchedulingContext(sctx)
candidateGangIterator, err := NewCandidateGangIterator(sctx.Pool, qr, sctx.FairnessCostProvider, gangItByQueue, false)
qr := NewMinimalQueueRepositoryFromSchedulingContext(sch.schedulingContext)
candidateGangIterator, err := NewCandidateGangIterator(sch.schedulingContext.Pool, qr, sch.schedulingContext.FairnessCostProvider, gangItByQueue, false, sch.preferLargeJobOrdering)
if err != nil {
return err
}
txn := nodeDb.Txn(true)
txn := sch.nodeDb.Txn(true)
defer txn.Abort()
i := 0
for {
@@ -502,7 +505,7 @@
break
} else {
for _, jctx := range gctx.JobSchedulingContexts {
if err := nodeDb.AddEvictedJobSchedulingContextWithTxn(txn, i, jctx); err != nil {
if err := sch.nodeDb.AddEvictedJobSchedulingContextWithTxn(txn, i, jctx); err != nil {
return err
}
i++
@@ -547,6 +550,7 @@ func (sch *PreemptingQueueScheduler) schedule(
jobIteratorByQueue,
skipUnsuccessfulSchedulingKeyCheck,
considerPriorityCLassPriority,
sch.preferLargeJobOrdering,
sch.maxQueueLookBack,
)
if err != nil {
@@ -2063,6 +2063,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
sctx,
constraints,
testfixtures.TestEmptyFloatingResources,
true,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
tc.SchedulingConfig.MaxQueueLookback,
jobDbTxn,
@@ -2415,6 +2416,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
sctx,
constraints,
testfixtures.TestEmptyFloatingResources,
true,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
tc.SchedulingConfig.MaxQueueLookback,
jobDbTxn,
@@ -2477,6 +2479,7 @@
sctx,
constraints,
testfixtures.TestEmptyFloatingResources,
true,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
tc.SchedulingConfig.MaxQueueLookback,
jobDbTxn,
109 changes: 80 additions & 29 deletions internal/scheduler/scheduling/queue_scheduler.go
@@ -36,6 +36,7 @@ func NewQueueScheduler(
jobIteratorByQueue map[string]JobContextIterator,
skipUnsuccessfulSchedulingKeyCheck bool,
considerPriorityClassPriority bool,
prioritiseLargerJobs bool,
maxQueueLookBack uint,
) (*QueueScheduler, error) {
for queue := range jobIteratorByQueue {
@@ -51,7 +52,7 @@
for queue, it := range jobIteratorByQueue {
gangIteratorsByQueue[queue] = NewQueuedGangIterator(sctx, it, maxQueueLookBack, true)
}
candidateGangIterator, err := NewCandidateGangIterator(sctx.Pool, sctx, sctx.FairnessCostProvider, gangIteratorsByQueue, considerPriorityClassPriority)
candidateGangIterator, err := NewCandidateGangIterator(sctx.Pool, sctx, sctx.FairnessCostProvider, gangIteratorsByQueue, considerPriorityClassPriority, prioritiseLargerJobs)
if err != nil {
return nil, err
}
@@ -343,19 +344,22 @@ func NewCandidateGangIterator(
fairnessCostProvider fairness.FairnessCostProvider,
iteratorsByQueue map[string]*QueuedGangIterator,
considerPriority bool,
prioritiseLargerJobs bool,
) (*CandidateGangIterator, error) {
it := &CandidateGangIterator{
pool: pool,
queueRepository: queueRepository,
fairnessCostProvider: fairnessCostProvider,
onlyYieldEvictedByQueue: make(map[string]bool),
pq: QueueCandidateGangIteratorPQ{
considerPriority: considerPriority,
items: make([]*QueueCandidateGangIteratorItem, 0, len(iteratorsByQueue)),
considerPriority: considerPriority,
prioritiseLargerJobs: prioritiseLargerJobs,
items: make([]*QueueCandidateGangIteratorItem, 0, len(iteratorsByQueue)),
},
}
for queue, queueIt := range iteratorsByQueue {
if _, err := it.updateAndPushPQItem(it.newPQItem(queue, queueIt)); err != nil {
queueContext := queueIt.schedulingContext.QueueSchedulingContexts[queue]
if _, err := it.updateAndPushPQItem(it.newPQItem(queue, queueContext.AdjustedFairShare, queueIt)); err != nil {
return nil, err
}
}
@@ -406,13 +410,14 @@ func (it *CandidateGangIterator) Peek() (*schedulercontext.GangSchedulingContext
return nil, 0.0, nil
}
first := it.pq.items[0]
return first.gctx, first.queueCost, nil
return first.gctx, first.proposedQueueCost, nil
}

func (it *CandidateGangIterator) newPQItem(queue string, queueIt *QueuedGangIterator) *QueueCandidateGangIteratorItem {
func (it *CandidateGangIterator) newPQItem(queue string, queueFairShare float64, queueIt *QueuedGangIterator) *QueueCandidateGangIteratorItem {
return &QueueCandidateGangIteratorItem{
queue: queue,
it: queueIt,
queue: queue,
fairShare: queueFairShare,
it: queueIt,
}
}

@@ -435,7 +440,9 @@ func (it *CandidateGangIterator) updateAndPushPQItem(item *QueueCandidateGangIte

func (it *CandidateGangIterator) updatePQItem(item *QueueCandidateGangIteratorItem) error {
item.gctx = nil
item.queueCost = 0
item.proposedQueueCost = 0
item.currentQueueCost = 0
item.itemSize = 0
gctx, err := item.it.Peek()
if err != nil {
return err
@@ -444,11 +451,15 @@ func (it *CandidateGangIterator) updatePQItem(item *QueueCandidateGangIteratorIt
return nil
}
item.gctx = gctx
cost, err := it.queueCostWithGctx(gctx)
queue, err := it.getQueue(gctx)
if err != nil {
return err
}
item.queueCost = cost
item.proposedQueueCost = it.fairnessCostProvider.WeightedCostFromAllocation(queue.GetAllocation().Add(gctx.TotalResourceRequests), queue.GetWeight())
item.currentQueueCost = it.fairnessCostProvider.WeightedCostFromAllocation(queue.GetAllocation(), queue.GetWeight())
// We multiply here, as queue weights are a fraction,
// so for the same job size, jobs from highly weighted queues will look larger
item.itemSize = it.fairnessCostProvider.UnweightedCostFromAllocation(gctx.TotalResourceRequests) * queue.GetWeight()

// The PQItem needs to have a priority class priority for the whole gang. This may not be uniform as different
// Gang members may have been scheduled at different priorities due to home/away preemption. We therefore take the
@@ -473,24 +484,24 @@ func (it *CandidateGangIterator) updatePQItem(item *QueueCandidateGangIteratorIt
return nil
}

// queueCostWithGctx returns the cost associated with a queue if gctx were to be scheduled.
func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSchedulingContext) (float64, error) {
// getQueue returns the queue of the supplied gctx
func (it *CandidateGangIterator) getQueue(gctx *schedulercontext.GangSchedulingContext) (fairness.Queue, error) {
gangQueue := gctx.Queue
if len(gctx.JobSchedulingContexts) > 0 && !gctx.JobSchedulingContexts[0].IsHomeJob(it.pool) {
gangQueue = schedulercontext.CalculateAwayQueueName(gctx.Queue)
}
queue, ok := it.queueRepository.GetQueue(gangQueue)
if !ok {
return 0, errors.Errorf("unknown queue %s", gangQueue)
return nil, errors.Errorf("unknown queue %s", gangQueue)
}

return it.fairnessCostProvider.WeightedCostFromAllocation(queue.GetAllocation().Add(gctx.TotalResourceRequests), queue.GetWeight()), nil
return queue, nil
}

// QueueCandidateGangIteratorPQ is a priority queue used by CandidateGangIterator to determine from which queue to schedule the next job.
type QueueCandidateGangIteratorPQ struct {
considerPriority bool
items []*QueueCandidateGangIteratorItem
considerPriority bool
prioritiseLargerJobs bool
items []*QueueCandidateGangIteratorItem
}

type QueueCandidateGangIteratorItem struct {
@@ -501,9 +512,16 @@ type QueueCandidateGangIteratorItem struct {
// Most recent value produced by the iterator.
// Cached here to avoid repeating scheduling checks unnecessarily.
gctx *schedulercontext.GangSchedulingContext
// Cost associated with the queue if the topmost gang in the queue were to be scheduled.
// Used to order queues fairly.
queueCost float64
// Cost associated with the queue if the topmost gang in the queue were to be scheduled.
proposedQueueCost float64
// Current cost associated with the queue.
currentQueueCost float64
// The fairshare of the queue.
// Used to compare with proposedQueueCost to determine whether scheduling the next item would put the queue over its fairshare.
fairShare float64
// The size of the topmost gang.
// Used to determine which job is larger.
itemSize float64
priorityClassPriority int32
// The index of the item in the heap.
// maintained by the heap.Interface methods.
@@ -513,18 +531,51 @@ type QueueCandidateGangIteratorItem struct {
func (pq *QueueCandidateGangIteratorPQ) Len() int { return len(pq.items) }

func (pq *QueueCandidateGangIteratorPQ) Less(i, j int) bool {
// Consider priority class priority first
if pq.considerPriority && pq.items[i].priorityClassPriority != pq.items[j].priorityClassPriority {
return pq.items[i].priorityClassPriority > pq.items[j].priorityClassPriority
}
item1 := pq.items[i]
item2 := pq.items[j]

// Then queue cost
if pq.items[i].queueCost != pq.items[j].queueCost {
return pq.items[i].queueCost < pq.items[j].queueCost
// Consider priority class priority first
if pq.considerPriority && item1.priorityClassPriority != item2.priorityClassPriority {
return item1.priorityClassPriority > item2.priorityClassPriority
}

if pq.prioritiseLargerJobs {
if item1.proposedQueueCost <= item1.fairShare && item2.proposedQueueCost <= item2.fairShare {
// If adding the items results in neither queue exceeding its fairshare
// Take the largest job if the queues have equal current cost (which is the case if all jobs get evicted, or on an empty farm)
// The reason we prefer larger jobs is:
// - It reduces fragmentation - a typical strategy is to schedule larger jobs first as smaller jobs can fit in around larger jobs
// - It makes it easier for larger jobs to get on and helps to reduce the bias towards smaller jobs.
// Particularly helpful if users have a single large gang they want to get on, as they'll get considered first
if item1.currentQueueCost == item2.currentQueueCost && item1.itemSize != item2.itemSize {
return item1.itemSize > item2.itemSize
}
// Otherwise let whichever queue has the lowest current cost go first, regardless of job size
// This is so that:
// - We interleave smaller jobs and don't just schedule a queue of large jobs first until it hits its fairshare
// - We don't block queues with larger jobs from getting on, as they make a bigger step in cost than queues with smaller jobs
if item1.currentQueueCost != item2.currentQueueCost {
return item1.currentQueueCost < item2.currentQueueCost
}
} else if item1.proposedQueueCost > item1.fairShare && item2.proposedQueueCost > item2.fairShare {
// If adding the items results in both queues being above their fairshare
// take the item that results in the smallest amount over the fairshare
if item1.proposedQueueCost != item2.proposedQueueCost {
return item1.proposedQueueCost < item2.proposedQueueCost
}
} else if item1.proposedQueueCost <= item1.fairShare {
return true
} else if item2.proposedQueueCost <= item2.fairShare {
return false
}
} else {
if item1.proposedQueueCost != item2.proposedQueueCost {
return item1.proposedQueueCost < item2.proposedQueueCost
}
}

// Tie-break by queue name.
return pq.items[i].queue < pq.items[j].queue
return item1.queue < item2.queue
}

func (pq *QueueCandidateGangIteratorPQ) Swap(i, j int) {
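
To make the new ordering easier to follow, the comparison that prioritiseLargerJobs enables can be sketched in isolation. The item struct and lessPreferLarge function below are simplified stand-ins rather than Armada's actual QueueCandidateGangIteratorItem and Less implementation: priority-class ordering is omitted, and all values used in main are hypothetical.

package main

import "fmt"

// item is a simplified stand-in for QueueCandidateGangIteratorItem, holding only
// the fields the new comparison reads.
type item struct {
	queue             string
	proposedQueueCost float64 // queue cost if its topmost gang were scheduled
	currentQueueCost  float64 // queue cost as it stands now
	fairShare         float64 // the queue's adjusted fair share
	itemSize          float64 // weighted size of the topmost gang
}

// lessPreferLarge mirrors the prioritiseLargerJobs branch of
// QueueCandidateGangIteratorPQ.Less: it reports whether a should be scheduled
// before b.
func lessPreferLarge(a, b item) bool {
	aUnder := a.proposedQueueCost <= a.fairShare
	bUnder := b.proposedQueueCost <= b.fairShare
	switch {
	case aUnder && bUnder:
		// Neither queue would exceed its fairshare: prefer the larger gang when
		// the queues are otherwise tied, else let the cheaper queue go first.
		if a.currentQueueCost == b.currentQueueCost && a.itemSize != b.itemSize {
			return a.itemSize > b.itemSize
		}
		if a.currentQueueCost != b.currentQueueCost {
			return a.currentQueueCost < b.currentQueueCost
		}
	case !aUnder && !bUnder:
		// Both queues would exceed their fairshare: take the smaller overshoot.
		if a.proposedQueueCost != b.proposedQueueCost {
			return a.proposedQueueCost < b.proposedQueueCost
		}
	case aUnder:
		// Only a stays within its fairshare.
		return true
	default:
		// Only b stays within its fairshare.
		return false
	}
	// Tie-break by queue name, as in the real implementation.
	return a.queue < b.queue
}

func main() {
	// Hypothetical numbers: both queues are empty (current cost 0) and both stay
	// under a fairshare of 0.5, so the larger gang in queue B is scheduled first
	// even though it moves its queue's cost further.
	a := item{queue: "A", proposedQueueCost: 0.05, fairShare: 0.5, itemSize: 0.05}
	b := item{queue: "B", proposedQueueCost: 0.20, fairShare: 0.5, itemSize: 0.20}
	fmt.Println(lessPreferLarge(a, b)) // false: queue B's gang goes first

	// itemSize itself is the gang's unweighted cost multiplied by the queue
	// weight, so an identical gang looks larger when its queue is more highly
	// weighted (weights here are made up).
	unweightedCost := 0.10
	fmt.Println(unweightedCost*2.0, unweightedCost*0.5) // 0.2 vs 0.05
}

Because currentQueueCost, rather than proposedQueueCost, breaks ties between queues that both stay under their fairshare, queues submitting large gangs are not penalised for the bigger cost step each gang makes, while queues that would overshoot their fairshare still compete on the smallest overshoot.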
