diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml
index ffc9f65c2f3..d3591db942a 100644
--- a/config/scheduler/config.yaml
+++ b/config/scheduler/config.yaml
@@ -115,4 +115,7 @@ scheduling:
   executorTimeout: "10m"
   maxUnacknowledgedJobsPerExecutor: 2500
   executorUpdateFrequency: "60s"
+  experimentalIndicativePricing:
+    basePrice: 100.0
+    basePriority: 500.0
 
diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go
index 4563a7dc24c..c1866157ede 100644
--- a/internal/scheduler/configuration/configuration.go
+++ b/internal/scheduler/configuration/configuration.go
@@ -239,7 +239,8 @@ type SchedulingConfig struct {
 	DefaultPoolSchedulePriority int
 	Pools                       []PoolConfig
 	// TODO: Remove this feature gate
-	EnableExecutorCordoning bool
+	EnableExecutorCordoning       bool
+	ExperimentalIndicativePricing ExperimentalIndicativePricing
 }
 
 const (
@@ -289,3 +290,8 @@ func (sc *SchedulingConfig) GetProtectedFractionOfFairShare(poolName string) flo
 	}
 	return sc.ProtectedFractionOfFairShare
 }
+
+type ExperimentalIndicativePricing struct {
+	BasePrice    float64
+	BasePriority float64
+}
diff --git a/internal/scheduler/scheduling/context/scheduling.go b/internal/scheduler/scheduling/context/scheduling.go
index 8e62158f65c..cfed7c1f0f3 100644
--- a/internal/scheduler/scheduling/context/scheduling.go
+++ b/internal/scheduler/scheduling/context/scheduling.go
@@ -2,6 +2,7 @@ package context
 
 import (
 	"fmt"
+	"math"
 	"strings"
 	"text/tabwriter"
 	"time"
@@ -14,6 +15,7 @@ import (
 
 	"github.com/armadaproject/armada/internal/common/armadaerrors"
 	armadamaps "github.com/armadaproject/armada/internal/common/maps"
+	"github.com/armadaproject/armada/internal/common/util"
 	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
 	"github.com/armadaproject/armada/internal/scheduler/jobdb"
 	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
@@ -143,22 +145,52 @@ func (sctx *SchedulingContext) GetQueue(queue string) (fairness.Queue, bool) {
 	return qctx, ok
 }
 
+type queueInfo struct {
+	queueName     string
+	adjustedShare float64
+	fairShare     float64
+	weight        float64
+	cappedShare   float64
+}
+
 // UpdateFairShares updates FairShare and AdjustedFairShare for every QueueSchedulingContext associated with the
 // SchedulingContext. This works by calculating a far share as queue_weight/sum_of_all_queue_weights and an
 // AdjustedFairShare by resharing any unused capacity (as determined by a queue's demand)
 func (sctx *SchedulingContext) UpdateFairShares() {
-	const maxIterations = 5
+	queueInfos := sctx.updateFairShares(sctx.QueueSchedulingContexts)
+	for _, q := range queueInfos {
+		qtx := sctx.QueueSchedulingContexts[q.queueName]
+		qtx.FairShare = q.fairShare
+		qtx.AdjustedFairShare = q.adjustedShare
+	}
+}
 
-	type queueInfo struct {
-		queueName     string
-		adjustedShare float64
-		fairShare     float64
-		weight        float64
-		cappedShare   float64
+// CalculateTheoreticalShare calculates the maximum potential AdjustedFairShare of a new queue at a given priority.
+func (sctx *SchedulingContext) CalculateTheoreticalShare(priority float64) float64 {
+	qctxs := maps.Clone(sctx.QueueSchedulingContexts)
+	queueName := util.NewULID()
+	qctxs[queueName] = &QueueSchedulingContext{
+		Queue:        queueName,
+		Weight:       1 / priority,
+		CappedDemand: sctx.TotalResources.Factory().MakeAllMax(), // Infinite demand
+	}
+	queueInfos := sctx.updateFairShares(qctxs)
+	for _, q := range queueInfos {
+		if q.queueName == queueName {
+			return q.adjustedShare
+		}
 	}
+	return math.NaN()
+}
+
+// updateFairShares calculates the fair share and adjusted fair share for every QueueSchedulingContext in qctxs and
+// returns the results rather than writing them back. The fair share is calculated as queue_weight/sum_of_all_queue_weights
+// and the adjusted fair share by resharing any unused capacity (as determined by a queue's demand).
+func (sctx *SchedulingContext) updateFairShares(qctxs map[string]*QueueSchedulingContext) []*queueInfo {
+	const maxIterations = 5
 
-	queueInfos := make([]*queueInfo, 0, len(sctx.QueueSchedulingContexts))
-	for queueName, qctx := range sctx.QueueSchedulingContexts {
+	queueInfos := make([]*queueInfo, 0, len(qctxs))
+	for queueName, qctx := range qctxs {
 		cappedShare := 1.0
 		if !sctx.TotalResources.AllZero() {
 			cappedShare = sctx.FairnessCostProvider.UnweightedCostFromAllocation(qctx.CappedDemand)
@@ -202,12 +234,7 @@
 			}
 		}
 	}
-
-	for _, q := range queueInfos {
-		qtx := sctx.QueueSchedulingContexts[q.queueName]
-		qtx.FairShare = q.fairShare
-		qtx.AdjustedFairShare = q.adjustedShare
-	}
+	return queueInfos
 }
 
 func (sctx *SchedulingContext) ReportString(verbosity int32) string {
diff --git a/internal/scheduler/scheduling/context/scheduling_test.go b/internal/scheduler/scheduling/context/scheduling_test.go
index 8031aa7540e..38461849b2f 100644
--- a/internal/scheduler/scheduling/context/scheduling_test.go
+++ b/internal/scheduler/scheduling/context/scheduling_test.go
@@ -198,6 +198,86 @@ func TestCalculateFairShares(t *testing.T) {
 	}
 }
 
+func TestCalculateTheoreticalShare(t *testing.T) {
+	oneCpu := cpu(1)
+	oneHundredCpu := cpu(100)
+	oneThousandCpu := cpu(1000)
+	tests := map[string]struct {
+		availableResources       internaltypes.ResourceList
+		queueCtxs                map[string]*QueueSchedulingContext
+		basePrice                float64
+		basePriority             float64
+		expectedTheoreticalShare float64
+	}{
+		"Cluster Empty": {
+			availableResources:       oneHundredCpu,
+			queueCtxs:                map[string]*QueueSchedulingContext{},
+			basePriority:             1.0,
+			expectedTheoreticalShare: 1.0,
+		},
+		"One user": {
+			availableResources: oneHundredCpu,
+			queueCtxs: map[string]*QueueSchedulingContext{
+				"queueA": {Weight: 1.0, Demand: oneThousandCpu},
+			},
+			basePriority:             1.0,
+			expectedTheoreticalShare: 0.5,
+		},
+		"Two users": {
+			availableResources: oneHundredCpu,
+			queueCtxs: map[string]*QueueSchedulingContext{
+				"queueA": {Weight: 1.0, Demand: oneThousandCpu},
+				"queueB": {Weight: 1.0, Demand: oneThousandCpu},
+			},
+			basePriority:             1.0,
+			expectedTheoreticalShare: 1.0 / 3,
+		},
+		"One user with lower priority": {
+			availableResources: oneHundredCpu,
+			queueCtxs: map[string]*QueueSchedulingContext{
+				"queueA": {Weight: 0.5, Demand: oneThousandCpu},
+			},
+			basePriority:             1.0,
+			expectedTheoreticalShare: 2.0 / 3,
+		},
+		"One user with higher priority": {
+			availableResources: oneHundredCpu,
+			queueCtxs: map[string]*QueueSchedulingContext{
+				"queueA": {Weight: 2, Demand: oneThousandCpu},
+			},
+			basePriority:             1.0,
+			expectedTheoreticalShare: 1.0 / 3,
+		},
+		"One user with a low demand": {
+			availableResources: oneHundredCpu,
+			queueCtxs: map[string]*QueueSchedulingContext{
+				"queueA": {Weight: 1.0, Demand: oneCpu},
+			},
+			basePriority:             1.0,
+			expectedTheoreticalShare: 0.99,
+		},
+	}
+	for name, tc := range tests {
+		t.Run(name, func(t *testing.T) {
+			fairnessCostProvider, err := fairness.NewDominantResourceFairness(tc.availableResources, configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"cpu"}})
+			require.NoError(t, err)
+			sctx := NewSchedulingContext(
+				"pool",
+				fairnessCostProvider,
+				nil,
+				tc.availableResources,
+			)
+			for qName, q := range tc.queueCtxs {
+				err = sctx.AddQueueSchedulingContext(
+					qName, q.Weight, map[string]internaltypes.ResourceList{}, q.Demand, q.Demand, nil)
+				require.NoError(t, err)
+			}
+			theoreticalShare := sctx.CalculateTheoreticalShare(tc.basePriority)
+			assert.InDelta(t, tc.expectedTheoreticalShare, theoreticalShare, 1e-6)
+		})
+	}
+}
+
 func TestCalculateFairnessError(t *testing.T) {
 	tests := map[string]struct {
 		availableResources internaltypes.ResourceList
diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go
index c14121fadf9..1b7db752320 100644
--- a/internal/scheduler/scheduling/scheduling_algo.go
+++ b/internal/scheduler/scheduling/scheduling_algo.go
@@ -575,9 +575,13 @@ func (l *FairSchedulingAlgo) SchedulePool(
 			WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), pool, priority)
 	}
 
+	// Set the spot price
 	if marketDriven {
 		fractionAllocated := fsctx.schedulingContext.FairnessCostProvider.UnweightedCostFromAllocation(fsctx.schedulingContext.Allocated)
-		price := l.calculateSpotPrice(maps.Keys(fsctx.nodeIdByJobId), result.ScheduledJobs, result.PreemptedJobs, fractionAllocated, fsctx.Txn)
+		price := l.calculateMarketDrivenSpotPrice(maps.Keys(fsctx.nodeIdByJobId), result.ScheduledJobs, result.PreemptedJobs, fractionAllocated, fsctx.Txn)
+		fsctx.schedulingContext.SpotPrice = price
+	} else {
+		price := l.calculateFairShareDrivenSpotPrice(fsctx.schedulingContext, l.schedulingConfig.ExperimentalIndicativePricing.BasePrice, l.schedulingConfig.ExperimentalIndicativePricing.BasePriority)
 		fsctx.schedulingContext.SpotPrice = price
 	}
 
@@ -710,7 +714,7 @@ func (l *FairSchedulingAlgo) filterLaggingExecutors(
 	return activeExecutors
 }
 
-func (l *FairSchedulingAlgo) calculateSpotPrice(initialRunningJobIds []string, scheduledJobs, preemptedJobs []*schedulercontext.JobSchedulingContext, fractionAllocated float64, txn *jobdb.Txn) float64 {
+func (l *FairSchedulingAlgo) calculateMarketDrivenSpotPrice(initialRunningJobIds []string, scheduledJobs, preemptedJobs []*schedulercontext.JobSchedulingContext, fractionAllocated float64, txn *jobdb.Txn) float64 {
 	// If we've allocated less that 95% of available resources then we don't charge.
 	// TODO: make this configurable
 	if fractionAllocated < 0.95 {
@@ -745,3 +749,20 @@ func (l *FairSchedulingAlgo) calculateSpotPrice(initialRunningJobIds []string, s
 	}
 	return minPrice
 }
+
+func (l *FairSchedulingAlgo) calculateFairShareDrivenSpotPrice(sctx *schedulercontext.SchedulingContext, basePrice float64, basePriority float64) float64 {
+	theoreticalShare := sctx.CalculateTheoreticalShare(basePriority)
+
+	// If you can get 50% or more then we don't charge
+	if theoreticalShare >= 0.5 {
+		return 0
+	}
+
+	// Linear interpolation between 50% and 10%
+	if theoreticalShare >= 0.1 {
+		return basePrice * (0.5 - theoreticalShare)
+	}
+
+	// Reciprocal growth below 10%
+	return basePrice * (0.1 / theoreticalShare)
+}
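
For illustration only: the price returned by the new calculateFairShareDrivenSpotPrice is a piecewise function of the theoretical share a hypothetical new queue could obtain. The standalone sketch below mirrors that curve using the default basePrice of 100.0 from config.yaml; the indicativePrice helper name is hypothetical and exists only for this example, it is not part of the change.

// indicative_price_sketch.go - illustrative mirror of calculateFairShareDrivenSpotPrice.
package main

import "fmt"

// indicativePrice reproduces the piecewise curve: free at a 50%+ obtainable share,
// a linear ramp between 50% and 10%, and reciprocal growth below 10%.
func indicativePrice(theoreticalShare, basePrice float64) float64 {
	switch {
	case theoreticalShare >= 0.5:
		return 0
	case theoreticalShare >= 0.1:
		return basePrice * (0.5 - theoreticalShare)
	default:
		return basePrice * (0.1 / theoreticalShare)
	}
}

func main() {
	// With basePrice=100.0 this prints prices 0, 10, 30, 40, 200 and 500.
	for _, share := range []float64{0.6, 0.4, 0.2, 0.1, 0.05, 0.02} {
		fmt.Printf("share=%.2f price=%.1f\n", share, indicativePrice(share, 100.0))
	}
}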