Add spot price for fair-share scheduled pools (#301) #4079

Merged · 1 commit · Dec 10, 2024
3 changes: 3 additions & 0 deletions config/scheduler/config.yaml
@@ -115,4 +115,7 @@ scheduling:
executorTimeout: "10m"
maxUnacknowledgedJobsPerExecutor: 2500
executorUpdateFrequency: "60s"
experimentalIndicativePricing:
basePrice: 100.0
basePriority: 500.0

8 changes: 7 additions & 1 deletion internal/scheduler/configuration/configuration.go
@@ -239,7 +239,8 @@ type SchedulingConfig struct {
DefaultPoolSchedulePriority int
Pools []PoolConfig
// TODO: Remove this feature gate
EnableExecutorCordoning bool
EnableExecutorCordoning bool
ExperimentalIndicativePricing ExperimentalIndicativePricing
}

const (
@@ -289,3 +290,8 @@ func (sc *SchedulingConfig) GetProtectedFractionOfFairShare(poolName string) flo
}
return sc.ProtectedFractionOfFairShare
}

type ExperimentalIndicativePricing struct {
BasePrice float64
BasePriority float64
}
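
These two fields mirror the `experimentalIndicativePricing` block added to config/scheduler/config.yaml above (defaults 100.0 and 500.0): `BasePriority` is the priority passed to `CalculateTheoreticalShare` when probing what a hypothetical new queue could obtain, and `BasePrice` scales the resulting indicative spot price in `calculateFairShareDrivenSpotPrice` further down in this diff.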
57 changes: 42 additions & 15 deletions internal/scheduler/scheduling/context/scheduling.go
@@ -2,6 +2,7 @@ package context

import (
"fmt"
"math"
"strings"
"text/tabwriter"
"time"
@@ -14,6 +15,7 @@ import (

"github.com/armadaproject/armada/internal/common/armadaerrors"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
@@ -143,22 +145,52 @@ func (sctx *SchedulingContext) GetQueue(queue string) (fairness.Queue, bool) {
return qctx, ok
}

type queueInfo struct {
queueName string
adjustedShare float64
fairShare float64
weight float64
cappedShare float64
}

// UpdateFairShares updates FairShare and AdjustedFairShare for every QueueSchedulingContext associated with the
// SchedulingContext. This works by calculating a fair share as queue_weight/sum_of_all_queue_weights and an
// AdjustedFairShare by resharing any unused capacity (as determined by a queue's demand).
func (sctx *SchedulingContext) UpdateFairShares() {
const maxIterations = 5
queueInfos := sctx.updateFairShares(sctx.QueueSchedulingContexts)
for _, q := range queueInfos {
qtx := sctx.QueueSchedulingContexts[q.queueName]
qtx.FairShare = q.fairShare
qtx.AdjustedFairShare = q.adjustedShare
}
}
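
To make the resharing step concrete, here is a minimal self-contained sketch of the same idea — not the PR's implementation; `fairShares`, `queue` and `demandShare` are illustrative names, and demands are expressed as fractions of total pool capacity rather than as ResourceLists:

```go
package main

import "fmt"

type queue struct {
	weight      float64
	demandShare float64 // demand as a fraction of total pool capacity
}

// fairShares starts from weight/sum_of_weights, caps each queue at its demand,
// and reshares the surplus among the remaining queues over a few iterations.
func fairShares(queues map[string]queue, maxIterations int) map[string]float64 {
	shares := map[string]float64{}
	active := map[string]queue{}
	for name, q := range queues {
		active[name] = q
	}
	unallocated := 1.0
	for i := 0; i < maxIterations && len(active) > 0 && unallocated > 1e-9; i++ {
		totalWeight := 0.0
		for _, q := range active {
			totalWeight += q.weight
		}
		given := 0.0
		for name, q := range active {
			proposed := shares[name] + unallocated*q.weight/totalWeight
			if proposed >= q.demandShare {
				// Capped by demand: take only what the queue needs and drop it from
				// the active set; the leftover is reshared in the next iteration.
				given += q.demandShare - shares[name]
				shares[name] = q.demandShare
				delete(active, name)
			} else {
				given += proposed - shares[name]
				shares[name] = proposed
			}
		}
		unallocated -= given
	}
	return shares
}

func main() {
	fmt.Println(fairShares(map[string]queue{
		"queueA": {weight: 1, demandShare: 0.05}, // small demand, capped early
		"queueB": {weight: 1, demandShare: 10},   // effectively unlimited demand
	}, 5))
	// A naive equal-weight split would be 0.5 / 0.5; resharing gives queueA its
	// 0.05 and hands the surplus to queueB: map[queueA:0.05 queueB:0.95]
}
```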

type queueInfo struct {
queueName string
adjustedShare float64
fairShare float64
weight float64
cappedShare float64
// CalculateTheoreticalShare calculates the maximum potential AdjustedFairShare of a new queue at a given priority.
func (sctx *SchedulingContext) CalculateTheoreticalShare(priority float64) float64 {
qctxs := maps.Clone(sctx.QueueSchedulingContexts)
queueName := util.NewULID()
qctxs[queueName] = &QueueSchedulingContext{
Queue: queueName,
Weight: 1 / priority,
CappedDemand: sctx.TotalResources.Factory().MakeAllMax(), // Infinite demand
}
queueInfos := sctx.updateFairShares(qctxs)
for _, q := range queueInfos {
if q.queueName == queueName {
return q.adjustedShare
}
}
return math.NaN()
}
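
Concretely, the probe queue is given weight 1/priority and effectively unlimited demand, so against a single existing queue of weight w that also wants everything, its adjusted share works out to (1/priority) / (w + 1/priority). At a priority of 1 that is 0.5 for w = 1, 2/3 for w = 0.5 and 1/3 for w = 2, matching the cases in TestCalculateTheoreticalShare below; and if the existing queue only demands 1 of 100 CPUs, the probe can take the remaining 0.99.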

// updateFairShares calculates fair shares for the supplied QueueSchedulingContexts. This works by calculating a
// fair share as queue_weight/sum_of_all_queue_weights and an adjusted share by resharing any unused capacity
// (as determined by a queue's demand).
func (sctx *SchedulingContext) updateFairShares(qctxs map[string]*QueueSchedulingContext) []*queueInfo {
const maxIterations = 5

queueInfos := make([]*queueInfo, 0, len(sctx.QueueSchedulingContexts))
for queueName, qctx := range sctx.QueueSchedulingContexts {
queueInfos := make([]*queueInfo, 0, len(qctxs))
for queueName, qctx := range qctxs {
cappedShare := 1.0
if !sctx.TotalResources.AllZero() {
cappedShare = sctx.FairnessCostProvider.UnweightedCostFromAllocation(qctx.CappedDemand)
@@ -202,12 +234,7 @@ }
}
}
}

for _, q := range queueInfos {
qtx := sctx.QueueSchedulingContexts[q.queueName]
qtx.FairShare = q.fairShare
qtx.AdjustedFairShare = q.adjustedShare
}
return queueInfos
}

func (sctx *SchedulingContext) ReportString(verbosity int32) string {
80 changes: 80 additions & 0 deletions internal/scheduler/scheduling/context/scheduling_test.go
@@ -198,6 +198,86 @@ func TestCalculateFairShares(t *testing.T) {
}
}

func TestCalculateTheoreticalShare(t *testing.T) {
oneCpu := cpu(1)
oneHundredCpu := cpu(100)
oneThousandCpu := cpu(1000)
tests := map[string]struct {
availableResources internaltypes.ResourceList
queueCtxs map[string]*QueueSchedulingContext
basePrice float64
basePriority float64
expectedTheoreticalShare float64
}{
"Cluster Empty": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{},
basePriority: 1.0,
expectedTheoreticalShare: 1.0,
},
"One user": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneThousandCpu},
},
basePriority: 1.0,
expectedTheoreticalShare: 0.5,
},
"Two users": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneThousandCpu},
"queueB": {Weight: 1.0, Demand: oneThousandCpu},
},
basePriority: 1.0,
expectedTheoreticalShare: 1.0 / 3,
},
"One user with lower priority": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 0.5, Demand: oneThousandCpu},
},
basePriority: 1.0,
expectedTheoreticalShare: 2.0 / 3,
},
"One user with higher priority": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 2, Demand: oneThousandCpu},
},
basePriority: 1.0,
expectedTheoreticalShare: 1.0 / 3,
},
"One user with a low demand": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneCpu},
},
basePriority: 1.0,
expectedTheoreticalShare: 0.99,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
fairnessCostProvider, err := fairness.NewDominantResourceFairness(tc.availableResources, configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"cpu"}})
require.NoError(t, err)
sctx := NewSchedulingContext(
"pool",
fairnessCostProvider,
nil,
tc.availableResources,
)
for qName, q := range tc.queueCtxs {
err = sctx.AddQueueSchedulingContext(
qName, q.Weight, map[string]internaltypes.ResourceList{}, q.Demand, q.Demand, nil)
require.NoError(t, err)
}
theoreticalShare := sctx.CalculateTheoreticalShare(tc.basePriority)
assert.InDelta(t, tc.expectedTheoreticalShare, theoreticalShare, 1e-6)
})
}
}

func TestCalculateFairnessError(t *testing.T) {
tests := map[string]struct {
availableResources internaltypes.ResourceList
25 changes: 23 additions & 2 deletions internal/scheduler/scheduling/scheduling_algo.go
@@ -575,9 +575,13 @@ func (l *FairSchedulingAlgo) SchedulePool(
WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), pool, priority)
}

// set spot price
if marketDriven {
fractionAllocated := fsctx.schedulingContext.FairnessCostProvider.UnweightedCostFromAllocation(fsctx.schedulingContext.Allocated)
price := l.calculateSpotPrice(maps.Keys(fsctx.nodeIdByJobId), result.ScheduledJobs, result.PreemptedJobs, fractionAllocated, fsctx.Txn)
price := l.calculateMarketDrivenSpotPrice(maps.Keys(fsctx.nodeIdByJobId), result.ScheduledJobs, result.PreemptedJobs, fractionAllocated, fsctx.Txn)
fsctx.schedulingContext.SpotPrice = price
} else {
price := l.calculateFairShareDrivenSpotPrice(fsctx.schedulingContext, l.schedulingConfig.ExperimentalIndicativePricing.BasePrice, l.schedulingConfig.ExperimentalIndicativePricing.BasePriority)
fsctx.schedulingContext.SpotPrice = price
}

@@ -710,7 +714,7 @@ func (l *FairSchedulingAlgo) filterLaggingExecutors(
return activeExecutors
}

func (l *FairSchedulingAlgo) calculateSpotPrice(initialRunningJobIds []string, scheduledJobs, preemptedJobs []*schedulercontext.JobSchedulingContext, fractionAllocated float64, txn *jobdb.Txn) float64 {
func (l *FairSchedulingAlgo) calculateMarketDrivenSpotPrice(initialRunningJobIds []string, scheduledJobs, preemptedJobs []*schedulercontext.JobSchedulingContext, fractionAllocated float64, txn *jobdb.Txn) float64 {
// If we've allocated less than 95% of available resources then we don't charge.
// TODO: make this configurable
if fractionAllocated < 0.95 {
@@ -745,3 +749,20 @@ }
}
return minPrice
}

func (l *FairSchedulingAlgo) calculateFairShareDrivenSpotPrice(sctx *schedulercontext.SchedulingContext, basePrice float64, basePriority float64) float64 {
theoreticalShare := sctx.CalculateTheoreticalShare(basePriority)

// If you can get 50% or more, we don't charge
if theoreticalShare >= 0.5 {
return 0
}

// Linear interpolation between 50% and 10%
if theoreticalShare >= 0.1 {
return basePrice * (0.5 - theoreticalShare)
}

// Reciprocal growth below 10%
return basePrice * (0.1 / theoreticalShare)
}
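
For a sense of scale, a standalone sketch (not part of the PR) that evaluates the same piecewise curve with the basePrice of 100.0 set in config/scheduler/config.yaml:

```go
package main

import "fmt"

// indicativePrice mirrors calculateFairShareDrivenSpotPrice's curve:
// free at >= 50% theoretical share, linear down to 10%, reciprocal below that.
func indicativePrice(theoreticalShare, basePrice float64) float64 {
	if theoreticalShare >= 0.5 {
		return 0 // a new queue could still get half the pool: free
	}
	if theoreticalShare >= 0.1 {
		return basePrice * (0.5 - theoreticalShare) // linear between 50% and 10%
	}
	return basePrice * (0.1 / theoreticalShare) // reciprocal growth below 10%
}

func main() {
	for _, share := range []float64{0.6, 0.5, 0.3, 0.1, 0.05, 0.01} {
		fmt.Printf("theoretical share %.2f -> price %.1f\n", share, indicativePrice(share, 100.0))
	}
	// Prices: 0, 0, 20, 40, 200, 1000. Note the curve is discontinuous at 0.1:
	// the linear branch ends at 40 while the reciprocal branch starts near 100.
}
```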