Skip to content

Commit

Permalink
Scheduler: refactor floating resources to use internaltypes (#4047)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert Smith <[email protected]>
  • Loading branch information
robertdavidsmith authored Nov 14, 2024
1 parent 5a1cc78 commit 5051d79
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 74 deletions.
92 changes: 52 additions & 40 deletions internal/scheduler/floatingresources/floating_resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,55 @@ import (
"github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type FloatingResourceTypes struct {
zeroFloatingResources schedulerobjects.ResourceList
pools map[string]*floatingResourcePool
rlFactory *internaltypes.ResourceListFactory
floatingResourceLimitsByPool map[string]internaltypes.ResourceList
}

type floatingResourcePool struct {
totalResources schedulerobjects.ResourceList
func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
err := validate(config)
if err != nil {
return nil, err
}

floatingResourceLimitsByPool := map[string]internaltypes.ResourceList{}
for _, fr := range config {
for _, poolConfig := range fr.Pools {
floatingResourceLimitsByPool[poolConfig.Name] = floatingResourceLimitsByPool[poolConfig.Name].Add(
rlFactory.FromNodeProto(map[string]resource.Quantity{fr.Name: poolConfig.Quantity}),
)
}
}

return &FloatingResourceTypes{
floatingResourceLimitsByPool: floatingResourceLimitsByPool,
}, nil
}

func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))}
func validate(config []configuration.FloatingResourceConfig) error {
floatingResourceNamesSeen := map[string]bool{}
for _, c := range config {
if _, exists := zeroFloatingResources.Resources[c.Name]; exists {
return nil, fmt.Errorf("duplicate floating resource %s", c.Name)
if _, exists := floatingResourceNamesSeen[c.Name]; exists {
return fmt.Errorf("duplicate floating resource %s", c.Name)
}
zeroFloatingResources.Resources[c.Name] = resource.Quantity{}
floatingResourceNamesSeen[c.Name] = true
}

pools := map[string]*floatingResourcePool{}
for _, fr := range config {
poolNamesSeen := map[string]bool{}
for _, poolConfig := range fr.Pools {
pool, exists := pools[poolConfig.Name]
if !exists {
pool = &floatingResourcePool{
totalResources: zeroFloatingResources.DeepCopy(),
}
pools[poolConfig.Name] = pool
}
existing := pool.totalResources.Resources[fr.Name]
if existing.Cmp(resource.Quantity{}) != 0 {
return nil, fmt.Errorf("duplicate floating resource %s for pool %s", fr.Name, poolConfig.Name)
if _, exists := poolNamesSeen[poolConfig.Name]; exists {
return fmt.Errorf("floating resource %s has duplicate pool %s", fr.Name, poolConfig.Name)
}
pool.totalResources.Resources[fr.Name] = poolConfig.Quantity.DeepCopy()
poolNamesSeen[poolConfig.Name] = true
}
}

return &FloatingResourceTypes{
zeroFloatingResources: zeroFloatingResources,
pools: pools,
rlFactory: rlFactory,
}, nil
return nil
}

func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) {
available := frt.GetTotalAvailableForPoolInternalTypes(poolName)
available := frt.GetTotalAvailableForPool(poolName)
if available.AllZero() {
return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName)
}
Expand All @@ -72,26 +72,38 @@ func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated intern
}

func (frt *FloatingResourceTypes) AllPools() []string {
result := maps.Keys(frt.pools)
result := maps.Keys(frt.floatingResourceLimitsByPool)
slices.Sort(result)
return result
}

func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) schedulerobjects.ResourceList {
pool, exists := frt.pools[poolName]
if !exists {
return frt.zeroFloatingResources.DeepCopy()
func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) internaltypes.ResourceList {
limits, ok := frt.floatingResourceLimitsByPool[poolName]
if !ok {
return internaltypes.ResourceList{}
}
return pool.totalResources.DeepCopy()
return limits
}

func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList {
return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources)
func (frt *FloatingResourceTypes) GetTotalAvailableForPoolAsMap(poolName string) map[string]resource.Quantity {
limits := frt.GetTotalAvailableForPool(poolName)
result := map[string]resource.Quantity{}
for _, res := range limits.GetResources() {
if res.Type != internaltypes.Floating {
continue
}
result[res.Name] = res.Value
}
return result
}

func (frt *FloatingResourceTypes) SummaryString() string {
if len(frt.zeroFloatingResources.Resources) == 0 {
if len(frt.floatingResourceLimitsByPool) == 0 {
return "none"
}
return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ")
poolSummaries := []string{}
for _, poolName := range frt.AllPools() {
poolSummaries = append(poolSummaries, fmt.Sprintf("%s: (%s)", poolName, frt.floatingResourceLimitsByPool[poolName]))
}
return strings.Join(poolSummaries, " ")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,86 @@ func TestAllPools(t *testing.T) {
assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools())
}

func TestGetTotalAvailableForPool(t *testing.T) {
sut := makeSut(t, makeRlFactory())
zero := resource.Quantity{}
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources)
func TestNewFloatingResourceTypes_ErrorsOnDuplicateFloatingResource(t *testing.T) {
cfg := []configuration.FloatingResourceConfig{
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "cpu",
Quantity: resource.MustParse("200"),
},
},
},
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "gpu",
Quantity: resource.MustParse("300"),
},
},
},
}

frt, err := NewFloatingResourceTypes(cfg, makeRlFactory())
assert.Nil(t, frt)
assert.NotNil(t, err)
}

func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) {
func TestNewFloatingResourceTypes_ErrorsOnDuplicatePool(t *testing.T) {
cfg := []configuration.FloatingResourceConfig{
{
Name: "floating-resource-1",
Pools: []configuration.FloatingResourcePoolConfig{
{
Name: "cpu",
Quantity: resource.MustParse("200"),
}, {
Name: "cpu",
Quantity: resource.MustParse("200"),
},
},
},
}

frt, err := NewFloatingResourceTypes(cfg, makeRlFactory())
assert.Nil(t, frt)
assert.NotNil(t, err)
}

func TestGetTotalAvailableForPool(t *testing.T) {
sut := makeSut(t, makeRlFactory())

cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu")
cpuPool := sut.GetTotalAvailableForPool("cpu")
assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2"))

gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu")
gpuPool := sut.GetTotalAvailableForPool("gpu")
assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2"))

notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value")
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2"))
notFound := sut.GetTotalAvailableForPool("some-invalid-value")
assert.True(t, notFound.IsEmpty())
}

func TestGetTotalAvailableForPoolAsMap(t *testing.T) {
sut := makeSut(t, makeRlFactory())

cpuPool := sut.GetTotalAvailableForPoolAsMap("cpu")
assert.Equal(t, map[string]resource.Quantity{
"floating-resource-1": *resource.NewMilliQuantity(200000, resource.DecimalSI),
"floating-resource-2": *resource.NewMilliQuantity(300000, resource.DecimalSI),
}, cpuPool)

gpuPool := sut.GetTotalAvailableForPoolAsMap("gpu")
assert.Equal(t, map[string]resource.Quantity{
"floating-resource-1": *resource.NewMilliQuantity(100000, resource.DecimalSI),
"floating-resource-2": *resource.NewMilliQuantity(0, resource.DecimalSI),
}, gpuPool)

notFound := sut.GetTotalAvailableForPoolAsMap("some-invalid-value")
assert.Equal(t, map[string]resource.Quantity{}, notFound)
}

func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) {
Expand Down
20 changes: 11 additions & 9 deletions internal/scheduler/internaltypes/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type ResourceList struct {
}

type Resource struct {
Name string
Value int64
Scale k8sResource.Scale
Type ResourceType
Name string
RawValue int64
Value k8sResource.Quantity
Scale k8sResource.Scale
Type ResourceType
}

func (rl ResourceList) Equal(other ResourceList) bool {
Expand Down Expand Up @@ -87,7 +88,7 @@ func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Q
return k8sResource.Quantity{}
}

return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index])
return *rl.asQuantity(index)
}

func (rl ResourceList) GetResources() []Resource {
Expand All @@ -98,10 +99,11 @@ func (rl ResourceList) GetResources() []Resource {
result := make([]Resource, len(rl.resources))
for i, q := range rl.resources {
result[i] = Resource{
Name: rl.factory.indexToName[i],
Value: q,
Scale: rl.factory.scales[i],
Type: rl.factory.types[i],
Name: rl.factory.indexToName[i],
RawValue: q,
Value: *rl.asQuantity(i),
Scale: rl.factory.scales[i],
Type: rl.factory.types[i],
}
}
return result
Expand Down
17 changes: 11 additions & 6 deletions internal/scheduler/internaltypes/resource_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,18 @@ func TestGetResources(t *testing.T) {
a := testResourceList(factory, "1", "1Gi")

expected := []Resource{
{Name: "memory", Value: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes},
{Name: "ephemeral-storage", Value: 0, Scale: k8sResource.Scale(0), Type: Kubernetes},
{Name: "cpu", Value: 1000, Scale: k8sResource.Milli, Type: Kubernetes},
{Name: "nvidia.com/gpu", Value: 0, Scale: k8sResource.Milli, Type: Kubernetes},
{Name: "external-storage-connections", Value: 0, Scale: 0, Type: Floating},
{Name: "external-storage-bytes", Value: 0, Scale: 0, Type: Floating},
{Name: "memory", RawValue: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes},
{Name: "ephemeral-storage", RawValue: 0, Scale: k8sResource.Scale(0), Type: Kubernetes},
{Name: "cpu", RawValue: 1000, Scale: k8sResource.Milli, Type: Kubernetes},
{Name: "nvidia.com/gpu", RawValue: 0, Scale: k8sResource.Milli, Type: Kubernetes},
{Name: "external-storage-connections", RawValue: 0, Scale: 0, Type: Floating},
{Name: "external-storage-bytes", RawValue: 0, Scale: 0, Type: Floating},
}

for i, r := range expected {
expected[i].Value = *k8sResource.NewScaledQuantity(r.RawValue, r.Scale)
}

assert.Equal(t, expected, a.GetResources())
}

Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
}

for _, pool := range c.floatingResourceTypes.AllPools() {
totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool)
totalFloatingResources := schedulerobjects.ResourceList{Resources: c.floatingResourceTypes.GetTotalAvailableForPoolAsMap(pool)}
clusterKey := clusterMetricKey{
cluster: "floating",
pool: pool,
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/metrics/cycle_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult)
m.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount))

for _, r := range s.EvictedResources.GetResources() {
m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.Value))
m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestEvictOversubscribed(t *testing.T) {
for nodeId, node := range result.AffectedNodesById {
for _, p := range priorities {
for _, r := range node.AllocatableByPriority[p].GetResources() {
assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, nodeId)
assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, nodeId)
}
}
}
Expand Down Expand Up @@ -2202,7 +2202,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
for node := it.NextNode(); node != nil; node = it.NextNode() {
for _, p := range priorities {
for _, r := range node.AllocatableByPriority[p].GetResources() {
assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, node.GetId())
assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, node.GetId())
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/scheduling/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (l *FairSchedulingAlgo) Schedule(
ctx.Infof("Scheduling on pool %s with capacity %s %s",
pool,
fsctx.nodeDb.TotalKubernetesResources().String(),
l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name).String(),
l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name).String(),
)

start := time.Now()
Expand Down Expand Up @@ -277,7 +277,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
}

totalResources := nodeDb.TotalKubernetesResources()
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name))
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name))

schedulingContext, err := l.constructSchedulingContext(
pool.Name,
Expand Down Expand Up @@ -528,7 +528,7 @@ func (l *FairSchedulingAlgo) SchedulePool(
pool string,
) (*SchedulerResult, *schedulercontext.SchedulingContext, error) {
totalResources := fsctx.nodeDb.TotalKubernetesResources()
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool))
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool))

constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, l.schedulingConfig, maps.Values(fsctx.queues))

Expand Down

0 comments on commit 5051d79

Please sign in to comment.