diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index 6c29d467e21..ddaa26aeb3f 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -95,6 +95,9 @@ type Configuration struct { // FairSharing controls the Fair Sharing semantics across the cluster. FairSharing *FairSharing `json:"fairSharing,omitempty"` + // admissionFairSharing indicates the configuration of FairSharing with the `AdmissionTime` mode on. + AdmissionFairSharing *AdmissionFairSharing `json:"admissionFairSharing,omitempty"` + // Resources provides additional configuration options for handling the resources. Resources *Resources `json:"resources,omitempty"` @@ -472,3 +475,18 @@ type FairSharing struct { // The default strategy is ["LessThanOrEqualToFinalShare", "LessThanInitialShare"]. PreemptionStrategies []PreemptionStrategy `json:"preemptionStrategies,omitempty"` } + +type AdmissionFairSharing struct { + // usageHalfLifeTime indicates the time after which the current usage will decay by half. + // If set to 0, usage will be reset to 0 immediately. + UsageHalfLifeTime metav1.Duration `json:"usageHalfLifeTime,omitempty"` + + // usageSamplingInterval indicates how often Kueue updates consumedResources in FairSharingStatus. + // Defaults to 5min. + UsageSamplingInterval metav1.Duration `json:"usageSamplingInterval,omitempty"` + + // resourceWeights assigns weights to resources, which are then used to calculate the LocalQueue's + // resource usage and order Workloads. + // Each weight defaults to 1. + ResourceWeights map[corev1.ResourceName]float64 `json:"resourceWeights,omitempty"` +} diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go index 924bb6f28c2..7feb5734b75 100644 --- a/apis/config/v1beta1/defaults.go +++ b/apis/config/v1beta1/defaults.go @@ -210,6 +210,11 @@ func SetDefaults_Configuration(cfg *Configuration) { if fs := cfg.FairSharing; fs != nil && fs.Enable && len(fs.PreemptionStrategies) == 0 { fs.PreemptionStrategies = []PreemptionStrategy{LessThanOrEqualToFinalShare, LessThanInitialShare} } + if afs := cfg.AdmissionFairSharing; afs != nil { + if afs.UsageSamplingInterval.Duration == 0 { + afs.UsageSamplingInterval = metav1.Duration{Duration: 5 * time.Minute} + } + } if cfg.Resources != nil { for idx := range cfg.Resources.Transformations { diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go index 92f28d9c2b0..cf24392827f 100644 --- a/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/apis/config/v1beta1/zz_generated.deepcopy.go @@ -28,6 +28,30 @@ import ( timex "time" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdmissionFairSharing) DeepCopyInto(out *AdmissionFairSharing) { + *out = *in + out.UsageHalfLifeTime = in.UsageHalfLifeTime + out.UsageSamplingInterval = in.UsageSamplingInterval + if in.ResourceWeights != nil { + in, out := &in.ResourceWeights, &out.ResourceWeights + *out = make(map[corev1.ResourceName]float64, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdmissionFairSharing. +func (in *AdmissionFairSharing) DeepCopy() *AdmissionFairSharing { + if in == nil { + return nil + } + out := new(AdmissionFairSharing) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out.
in must be non-nil. func (in *ClientConnection) DeepCopyInto(out *ClientConnection) { *out = *in @@ -118,6 +142,11 @@ func (in *Configuration) DeepCopyInto(out *Configuration) { *out = new(FairSharing) (*in).DeepCopyInto(*out) } + if in.AdmissionFairSharing != nil { + in, out := &in.AdmissionFairSharing, &out.AdmissionFairSharing + *out = new(AdmissionFairSharing) + (*in).DeepCopyInto(*out) + } if in.Resources != nil { in, out := &in.Resources, &out.Resources *out = new(Resources) diff --git a/apis/kueue/v1alpha1/zz_generated.deepcopy.go b/apis/kueue/v1alpha1/zz_generated.deepcopy.go index 201a05c7d75..a44e38a74f2 100644 --- a/apis/kueue/v1alpha1/zz_generated.deepcopy.go +++ b/apis/kueue/v1alpha1/zz_generated.deepcopy.go @@ -117,7 +117,7 @@ func (in *CohortStatus) DeepCopyInto(out *CohortStatus) { if in.FairSharing != nil { in, out := &in.FairSharing, &out.FairSharing *out = new(v1beta1.FairSharingStatus) - **out = **in + (*in).DeepCopyInto(*out) } } diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go index 71a6690a53b..a4076ce88a2 100644 --- a/apis/kueue/v1beta1/clusterqueue_types.go +++ b/apis/kueue/v1beta1/clusterqueue_types.go @@ -137,6 +137,10 @@ type ClusterQueueSpec struct { // if FairSharing is enabled in the Kueue configuration. // +optional FairSharing *FairSharing `json:"fairSharing,omitempty"` + + // admissionScope indicates whether the ClusterQueue uses Admission Fair Sharing. + // +optional + AdmissionScope *AdmissionScope `json:"admissionScope,omitempty"` } // AdmissionChecksStrategy defines a strategy for a AdmissionCheck. diff --git a/apis/kueue/v1beta1/fairsharing_types.go b/apis/kueue/v1beta1/fairsharing_types.go index ccbcb907da4..c37bfe44c23 100644 --- a/apis/kueue/v1beta1/fairsharing_types.go +++ b/apis/kueue/v1beta1/fairsharing_types.go @@ -16,7 +16,11 @@ limitations under the License. package v1beta1 -import "k8s.io/apimachinery/pkg/api/resource" +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) // FairSharing contains the properties of the ClusterQueue or Cohort, // when participating in FairSharing. @@ -49,4 +53,31 @@ type FairSharingStatus struct { // weight of zero and is borrowing, this will return // 9223372036854775807, the maximum possible share value. WeightedShare int64 `json:"weightedShare"` + + // admissionFairSharingStatus represents information relevant to Admission Fair Sharing. + AdmissionFairSharingStatus *AdmissionFairSharingStatus `json:"admissionFairSharingStatus,omitempty"` } + +type AdmissionFairSharingStatus struct { + // ConsumedResources represents the aggregated usage of resources over time, + // with decaying function applied. + // The value is populated if usage consumption functionality is enabled in Kueue config. + ConsumedResources corev1.ResourceList `json:"consumedResources,omitempty"` + + // LastUpdate is the time when share and consumed resources were updated. + LastUpdate metav1.Time `json:"lastUpdate,omitempty"` +} + +type AdmissionScope struct { + AdmissionMode AdmissionMode `json:"admissionMode,omitempty"` +} + +type AdmissionMode string + +const ( + // UsageBasedAdmissionFairSharing enables AdmissionFairSharing based on usage, with QueuingStrategy as defined in the CQ.
+ UsageBasedAdmissionFairSharing AdmissionMode = "UsageBasedAdmissionFairSharing" + + // AdmissionFairSharing is disabled for this CQ + NoAdmissionFairSharing AdmissionMode = "NoAdmissionFairSharing" +) diff --git a/apis/kueue/v1beta1/localqueue_types.go b/apis/kueue/v1beta1/localqueue_types.go index 1d11d167bfd..388dbdb77aa 100644 --- a/apis/kueue/v1beta1/localqueue_types.go +++ b/apis/kueue/v1beta1/localqueue_types.go @@ -48,6 +48,12 @@ type LocalQueueSpec struct { // +kubebuilder:validation:Enum=None;Hold;HoldAndDrain // +kubebuilder:default="None" StopPolicy *StopPolicy `json:"stopPolicy,omitempty"` + + // fairSharing defines the properties of the LocalQueue when + // participating in AdmissionFairSharing. The values are only relevant + // if AdmissionFairSharing is enabled in the Kueue configuration. + // +optional + FairSharing *FairSharing `json:"fairSharing,omitempty"` } type LocalQueueFlavorStatus struct { @@ -149,6 +155,10 @@ type LocalQueueStatus struct { // +kubebuilder:validation:MaxItems=16 // +optional Flavors []LocalQueueFlavorStatus `json:"flavors,omitempty"` + + // FairSharing contains the information about the current status of fair sharing. + // +optional + FairSharing *FairSharingStatus `json:"fairSharing,omitempty"` } const ( diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index ce07893883c..183ec9d0a2b 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -234,6 +234,44 @@ func (in *AdmissionChecksStrategy) DeepCopy() *AdmissionChecksStrategy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdmissionFairSharingStatus) DeepCopyInto(out *AdmissionFairSharingStatus) { + *out = *in + if in.ConsumedResources != nil { + in, out := &in.ConsumedResources, &out.ConsumedResources + *out = make(corev1.ResourceList, len(*in)) + for key, val := range *in { + (*out)[key] = val.DeepCopy() + } + } + in.LastUpdate.DeepCopyInto(&out.LastUpdate) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdmissionFairSharingStatus. +func (in *AdmissionFairSharingStatus) DeepCopy() *AdmissionFairSharingStatus { + if in == nil { + return nil + } + out := new(AdmissionFairSharingStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdmissionScope) DeepCopyInto(out *AdmissionScope) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdmissionScope. +func (in *AdmissionScope) DeepCopy() *AdmissionScope { + if in == nil { + return nil + } + out := new(AdmissionScope) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BorrowWithinCohort) DeepCopyInto(out *BorrowWithinCohort) { *out = *in @@ -414,6 +452,11 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) { *out = new(FairSharing) (*in).DeepCopyInto(*out) } + if in.AdmissionScope != nil { + in, out := &in.AdmissionScope, &out.AdmissionScope + *out = new(AdmissionScope) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueueSpec. 
@@ -458,7 +501,7 @@ func (in *ClusterQueueStatus) DeepCopyInto(out *ClusterQueueStatus) { if in.FairSharing != nil { in, out := &in.FairSharing, &out.FairSharing *out = new(FairSharingStatus) - **out = **in + (*in).DeepCopyInto(*out) } } @@ -495,6 +538,11 @@ func (in *FairSharing) DeepCopy() *FairSharing { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FairSharingStatus) DeepCopyInto(out *FairSharingStatus) { *out = *in + if in.AdmissionFairSharingStatus != nil { + in, out := &in.AdmissionFairSharingStatus, &out.AdmissionFairSharingStatus + *out = new(AdmissionFairSharingStatus) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FairSharingStatus. @@ -725,6 +773,11 @@ func (in *LocalQueueSpec) DeepCopyInto(out *LocalQueueSpec) { *out = new(StopPolicy) **out = **in } + if in.FairSharing != nil { + in, out := &in.FairSharing, &out.FairSharing + *out = new(FairSharing) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueueSpec. @@ -768,6 +821,11 @@ func (in *LocalQueueStatus) DeepCopyInto(out *LocalQueueStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.FairSharing != nil { + in, out := &in.FairSharing, &out.FairSharing + *out = new(FairSharingStatus) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueueStatus. diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml index 0d696cdd870..874ef8b7b12 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml @@ -115,6 +115,13 @@ spec: type: object type: array type: object + admissionScope: + description: admissionScope indicates whether ClusterQueue uses the + Admission Fair Sharing + properties: + admissionMode: + type: string + type: object cohort: description: |- cohort that this ClusterQueue belongs to. CQs that belong to the @@ -590,6 +597,28 @@ spec: when participating in Fair Sharing. This is recorded only when Fair Sharing is enabled in the Kueue configuration. properties: + admissionFairSharingStatus: + description: admissionFairSharingStatus represents information + relevant to the Admission Fair Sharing + properties: + consumedResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + ConsumedResources represents the aggregated usage of resources over time, + with decaying function applied. + The value is populated if usage consumption functionality is enabled in Kueue config. + type: object + lastUpdate: + description: LastUpdate is the time when share and consumed + resources were updated. + format: date-time + type: string + type: object weightedShare: description: |- WeightedShare represents the maximum of the ratios of usage diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_cohorts.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_cohorts.yaml index 873176e192c..b0cacc6f259 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_cohorts.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_cohorts.yaml @@ -252,6 +252,28 @@ spec: when participating in Fair Sharing. 
The is recorded only when Fair Sharing is enabled in the Kueue configuration. properties: + admissionFairSharingStatus: + description: admissionFairSharingStatus represents information + relevant to the Admission Fair Sharing + properties: + consumedResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + ConsumedResources represents the aggregated usage of resources over time, + with decaying function applied. + The value is populated if usage consumption functionality is enabled in Kueue config. + type: object + lastUpdate: + description: LastUpdate is the time when share and consumed + resources were updated. + format: date-time + type: string + type: object weightedShare: description: |- WeightedShare represents the maximum of the ratios of usage diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml index 35e96fd8423..087b38174e1 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml @@ -80,6 +80,31 @@ spec: x-kubernetes-validations: - message: field is immutable rule: self == oldSelf + fairSharing: + description: |- + fairSharing defines the properties of the LocalQueue when + participating in AdmissionFairSharing. The values are only relevant + if AdmissionFairSharing is enabled in the Kueue configuration. + properties: + weight: + anyOf: + - type: integer + - type: string + default: 1 + description: |- + weight gives a comparative advantage to this ClusterQueue + or Cohort when competing for unused resources in the + Cohort. The share is based on the dominant resource usage + above nominal quotas for each resource, divided by the + weight. Admission prioritizes scheduling workloads from + ClusterQueues and Cohorts with the lowest share and + preempting workloads from the ClusterQueues and Cohorts + with the highest share. A zero weight implies infinite + share value, meaning that this Node will always be at + disadvantage against other ClusterQueues and Cohorts. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object stopPolicy: default: None description: |- @@ -168,6 +193,46 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + fairSharing: + description: FairSharing contains the information about the current + status of fair sharing. + properties: + admissionFairSharingStatus: + description: admissionFairSharingStatus represents information + relevant to the Admission Fair Sharing + properties: + consumedResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + ConsumedResources represents the aggregated usage of resources over time, + with decaying function applied. + The value is populated if usage consumption functionality is enabled in Kueue config. + type: object + lastUpdate: + description: LastUpdate is the time when share and consumed + resources were updated. 
+ format: date-time + type: string + type: object + weightedShare: + description: |- + WeightedShare represents the maximum of the ratios of usage + above nominal quota to the lendable resources in the + Cohort, among all the resources provided by the Node, and + divided by the weight. If zero, it means that the usage of + the Node is below the nominal quota. If the Node has a + weight of zero and is borrowing, this will return + 9223372036854775807, the maximum possible share value. + format: int64 + type: integer + required: + - weightedShare + type: object flavorUsage: description: |- flavorsUsage are the used quotas, by flavor currently in use by the diff --git a/client-go/applyconfiguration/kueue/v1beta1/admissionfairsharingstatus.go b/client-go/applyconfiguration/kueue/v1beta1/admissionfairsharingstatus.go new file mode 100644 index 00000000000..5eb784ce6f9 --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/admissionfairsharingstatus.go @@ -0,0 +1,52 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// AdmissionFairSharingStatusApplyConfiguration represents a declarative configuration of the AdmissionFairSharingStatus type for use +// with apply. +type AdmissionFairSharingStatusApplyConfiguration struct { + ConsumedResources *v1.ResourceList `json:"consumedResources,omitempty"` + LastUpdate *metav1.Time `json:"lastUpdate,omitempty"` +} + +// AdmissionFairSharingStatusApplyConfiguration constructs a declarative configuration of the AdmissionFairSharingStatus type for use with +// apply. +func AdmissionFairSharingStatus() *AdmissionFairSharingStatusApplyConfiguration { + return &AdmissionFairSharingStatusApplyConfiguration{} +} + +// WithConsumedResources sets the ConsumedResources field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ConsumedResources field is set to the value of the last call. +func (b *AdmissionFairSharingStatusApplyConfiguration) WithConsumedResources(value v1.ResourceList) *AdmissionFairSharingStatusApplyConfiguration { + b.ConsumedResources = &value + return b +} + +// WithLastUpdate sets the LastUpdate field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the LastUpdate field is set to the value of the last call. 
+func (b *AdmissionFairSharingStatusApplyConfiguration) WithLastUpdate(value metav1.Time) *AdmissionFairSharingStatusApplyConfiguration { + b.LastUpdate = &value + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/admissionscope.go b/client-go/applyconfiguration/kueue/v1beta1/admissionscope.go new file mode 100644 index 00000000000..6d23affc278 --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/admissionscope.go @@ -0,0 +1,42 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +import ( + kueuev1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + +// AdmissionScopeApplyConfiguration represents a declarative configuration of the AdmissionScope type for use +// with apply. +type AdmissionScopeApplyConfiguration struct { + AdmissionMode *kueuev1beta1.AdmissionMode `json:"admissionMode,omitempty"` +} + +// AdmissionScopeApplyConfiguration constructs a declarative configuration of the AdmissionScope type for use with +// apply. +func AdmissionScope() *AdmissionScopeApplyConfiguration { + return &AdmissionScopeApplyConfiguration{} +} + +// WithAdmissionMode sets the AdmissionMode field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the AdmissionMode field is set to the value of the last call. +func (b *AdmissionScopeApplyConfiguration) WithAdmissionMode(value kueuev1beta1.AdmissionMode) *AdmissionScopeApplyConfiguration { + b.AdmissionMode = &value + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go index 41a155d831d..62a02cc5ba4 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go @@ -35,6 +35,7 @@ type ClusterQueueSpecApplyConfiguration struct { AdmissionChecksStrategy *AdmissionChecksStrategyApplyConfiguration `json:"admissionChecksStrategy,omitempty"` StopPolicy *kueuev1beta1.StopPolicy `json:"stopPolicy,omitempty"` FairSharing *FairSharingApplyConfiguration `json:"fairSharing,omitempty"` + AdmissionScope *AdmissionScopeApplyConfiguration `json:"admissionScope,omitempty"` } // ClusterQueueSpecApplyConfiguration constructs a declarative configuration of the ClusterQueueSpec type for use with @@ -129,3 +130,11 @@ func (b *ClusterQueueSpecApplyConfiguration) WithFairSharing(value *FairSharingA b.FairSharing = value return b } + +// WithAdmissionScope sets the AdmissionScope field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the AdmissionScope field is set to the value of the last call. 
+func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionScope(value *AdmissionScopeApplyConfiguration) *ClusterQueueSpecApplyConfiguration { + b.AdmissionScope = value + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/fairsharingstatus.go b/client-go/applyconfiguration/kueue/v1beta1/fairsharingstatus.go index da3a5b0004f..1d716ce2e14 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/fairsharingstatus.go +++ b/client-go/applyconfiguration/kueue/v1beta1/fairsharingstatus.go @@ -20,7 +20,8 @@ package v1beta1 // FairSharingStatusApplyConfiguration represents a declarative configuration of the FairSharingStatus type for use // with apply. type FairSharingStatusApplyConfiguration struct { - WeightedShare *int64 `json:"weightedShare,omitempty"` + WeightedShare *int64 `json:"weightedShare,omitempty"` + AdmissionFairSharingStatus *AdmissionFairSharingStatusApplyConfiguration `json:"admissionFairSharingStatus,omitempty"` } // FairSharingStatusApplyConfiguration constructs a declarative configuration of the FairSharingStatus type for use with @@ -36,3 +37,11 @@ func (b *FairSharingStatusApplyConfiguration) WithWeightedShare(value int64) *Fa b.WeightedShare = &value return b } + +// WithAdmissionFairSharingStatus sets the AdmissionFairSharingStatus field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the AdmissionFairSharingStatus field is set to the value of the last call. +func (b *FairSharingStatusApplyConfiguration) WithAdmissionFairSharingStatus(value *AdmissionFairSharingStatusApplyConfiguration) *FairSharingStatusApplyConfiguration { + b.AdmissionFairSharingStatus = value + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go index 8af741e9aed..c65b91f2bda 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go +++ b/client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go @@ -26,6 +26,7 @@ import ( type LocalQueueSpecApplyConfiguration struct { ClusterQueue *kueuev1beta1.ClusterQueueReference `json:"clusterQueue,omitempty"` StopPolicy *kueuev1beta1.StopPolicy `json:"stopPolicy,omitempty"` + FairSharing *FairSharingApplyConfiguration `json:"fairSharing,omitempty"` } // LocalQueueSpecApplyConfiguration constructs a declarative configuration of the LocalQueueSpec type for use with @@ -49,3 +50,11 @@ func (b *LocalQueueSpecApplyConfiguration) WithStopPolicy(value kueuev1beta1.Sto b.StopPolicy = &value return b } + +// WithFairSharing sets the FairSharing field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the FairSharing field is set to the value of the last call. 
+func (b *LocalQueueSpecApplyConfiguration) WithFairSharing(value *FairSharingApplyConfiguration) *LocalQueueSpecApplyConfiguration { + b.FairSharing = value + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/localqueuestatus.go b/client-go/applyconfiguration/kueue/v1beta1/localqueuestatus.go index 16f7ec661a7..727702d827a 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/localqueuestatus.go +++ b/client-go/applyconfiguration/kueue/v1beta1/localqueuestatus.go @@ -31,6 +31,7 @@ type LocalQueueStatusApplyConfiguration struct { FlavorsReservation []LocalQueueFlavorUsageApplyConfiguration `json:"flavorsReservation,omitempty"` FlavorUsage []LocalQueueFlavorUsageApplyConfiguration `json:"flavorUsage,omitempty"` Flavors []LocalQueueFlavorStatusApplyConfiguration `json:"flavors,omitempty"` + FairSharing *FairSharingStatusApplyConfiguration `json:"fairSharing,omitempty"` } // LocalQueueStatusApplyConfiguration constructs a declarative configuration of the LocalQueueStatus type for use with @@ -114,3 +115,11 @@ func (b *LocalQueueStatusApplyConfiguration) WithFlavors(values ...*LocalQueueFl } return b } + +// WithFairSharing sets the FairSharing field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the FairSharing field is set to the value of the last call. +func (b *LocalQueueStatusApplyConfiguration) WithFairSharing(value *FairSharingStatusApplyConfiguration) *LocalQueueStatusApplyConfiguration { + b.FairSharing = value + return b +} diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index 9b153cd165f..8e20e0dc743 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -59,6 +59,10 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &kueuev1beta1.AdmissionCheckStatusApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckStrategyRule"): return &kueuev1beta1.AdmissionCheckStrategyRuleApplyConfiguration{} + case v1beta1.SchemeGroupVersion.WithKind("AdmissionFairSharingStatus"): + return &kueuev1beta1.AdmissionFairSharingStatusApplyConfiguration{} + case v1beta1.SchemeGroupVersion.WithKind("AdmissionScope"): + return &kueuev1beta1.AdmissionScopeApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("BorrowWithinCohort"): return &kueuev1beta1.BorrowWithinCohortApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("ClusterQueue"): diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index e132a090c84..5afd52b49db 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -218,6 +218,9 @@ func main() { if cfg.FairSharing != nil { cacheOptions = append(cacheOptions, cache.WithFairSharing(cfg.FairSharing.Enable)) } + if cfg.AdmissionFairSharing != nil { + queueOptions = append(queueOptions, queue.WithAdmissionFairSharing(cfg.AdmissionFairSharing)) + } cCache := cache.New(mgr.GetClient(), cacheOptions...) queues := queue.NewManager(mgr.GetClient(), cCache, queueOptions...) 
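The admissionFairSharing settings introduced above (usageHalfLifeTime, usageSamplingInterval) drive an exponential decay of each LocalQueue's consumed resources: the LocalQueue reconciler further down in this diff blends the previously recorded ConsumedResources with the currently admitted usage, weighting the new sample by alpha = 1 - 0.5^(elapsed/halfLife) (see reconcileConsumedUsage). A minimal, self-contained sketch of that computation; function and variable names here are illustrative and not part of the Kueue code:

package main

import (
	"fmt"
	"math"
	"time"
)

// decayedUsage blends the previously recorded usage with the currently
// admitted usage, weighting the new sample by alpha = 1 - 0.5^(elapsed/halfLife).
// A zero half-life resets the accumulated usage immediately.
func decayedUsage(oldUsage, newUsage float64, elapsed, halfLife time.Duration) float64 {
	if halfLife == 0 {
		return 0
	}
	alpha := 1.0 - math.Pow(0.5, elapsed.Seconds()/halfLife.Seconds())
	return (1-alpha)*oldUsage + alpha*newUsage
}

func main() {
	// With a 10-minute half-life, a 5-minute sampling interval, 8 CPUs of
	// previously consumed usage and 4 CPUs currently admitted, the blended
	// value is ~6.83 CPUs; the reconciler tests below expect 6827m once the
	// arithmetic is done on milli-quantities.
	fmt.Println(decayedUsage(8, 4, 5*time.Minute, 10*time.Minute))
}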
diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index 6d98229eced..e5d39f68d4d 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -100,6 +100,13 @@ spec: type: object type: array type: object + admissionScope: + description: admissionScope indicates whether ClusterQueue uses the + Admission Fair Sharing + properties: + admissionMode: + type: string + type: object cohort: description: |- cohort that this ClusterQueue belongs to. CQs that belong to the @@ -575,6 +582,28 @@ spec: when participating in Fair Sharing. This is recorded only when Fair Sharing is enabled in the Kueue configuration. properties: + admissionFairSharingStatus: + description: admissionFairSharingStatus represents information + relevant to the Admission Fair Sharing + properties: + consumedResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + ConsumedResources represents the aggregated usage of resources over time, + with decaying function applied. + The value is populated if usage consumption functionality is enabled in Kueue config. + type: object + lastUpdate: + description: LastUpdate is the time when share and consumed + resources were updated. + format: date-time + type: string + type: object weightedShare: description: |- WeightedShare represents the maximum of the ratios of usage diff --git a/config/components/crd/bases/kueue.x-k8s.io_cohorts.yaml b/config/components/crd/bases/kueue.x-k8s.io_cohorts.yaml index ac89defd6ae..812c377a56c 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_cohorts.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_cohorts.yaml @@ -237,6 +237,28 @@ spec: when participating in Fair Sharing. The is recorded only when Fair Sharing is enabled in the Kueue configuration. properties: + admissionFairSharingStatus: + description: admissionFairSharingStatus represents information + relevant to the Admission Fair Sharing + properties: + consumedResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + ConsumedResources represents the aggregated usage of resources over time, + with decaying function applied. + The value is populated if usage consumption functionality is enabled in Kueue config. + type: object + lastUpdate: + description: LastUpdate is the time when share and consumed + resources were updated. + format: date-time + type: string + type: object weightedShare: description: |- WeightedShare represents the maximum of the ratios of usage diff --git a/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml index c0ff3951c5d..79d8db5c203 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml @@ -65,6 +65,31 @@ spec: x-kubernetes-validations: - message: field is immutable rule: self == oldSelf + fairSharing: + description: |- + fairSharing defines the properties of the LocalQueue when + participating in AdmissionFairSharing. 
The values are only relevant + if AdmissionFairSharing is enabled in the Kueue configuration. + properties: + weight: + anyOf: + - type: integer + - type: string + default: 1 + description: |- + weight gives a comparative advantage to this ClusterQueue + or Cohort when competing for unused resources in the + Cohort. The share is based on the dominant resource usage + above nominal quotas for each resource, divided by the + weight. Admission prioritizes scheduling workloads from + ClusterQueues and Cohorts with the lowest share and + preempting workloads from the ClusterQueues and Cohorts + with the highest share. A zero weight implies infinite + share value, meaning that this Node will always be at + disadvantage against other ClusterQueues and Cohorts. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object stopPolicy: default: None description: |- @@ -153,6 +178,46 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + fairSharing: + description: FairSharing contains the information about the current + status of fair sharing. + properties: + admissionFairSharingStatus: + description: admissionFairSharingStatus represents information + relevant to the Admission Fair Sharing + properties: + consumedResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + ConsumedResources represents the aggregated usage of resources over time, + with decaying function applied. + The value is populated if usage consumption functionality is enabled in Kueue config. + type: object + lastUpdate: + description: LastUpdate is the time when share and consumed + resources were updated. + format: date-time + type: string + type: object + weightedShare: + description: |- + WeightedShare represents the maximum of the ratios of usage + above nominal quota to the lendable resources in the + Cohort, among all the resources provided by the Node, and + divided by the weight. If zero, it means that the usage of + the Node is below the nominal quota. If the Node has a + weight of zero and is borrowing, this will return + 9223372036854775807, the maximum possible share value. 
+ format: int64 + type: integer + required: + - weightedShare + type: object flavorUsage: description: |- flavorsUsage are the used quotas, by flavor currently in use by the diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index a92572f1fcc..96cf81ae3b7 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -501,6 +501,19 @@ func (c *Cache) DeleteLocalQueue(q *kueue.LocalQueue) { cq.deleteLocalQueue(q) } +func (c *Cache) GetCacheLocalQueue(cqName kueue.ClusterQueueReference, lq *kueue.LocalQueue) (*LocalQueue, error) { + c.Lock() + defer c.Unlock() + cq := c.hm.ClusterQueue(cqName) + if cq == nil { + return nil, ErrCqNotFound + } + if cacheLq, ok := cq.localQueues[queueKey(lq)]; ok { + return cacheLq, nil + } + return nil, errQNotFound +} + func (c *Cache) UpdateLocalQueue(oldQ, newQ *kueue.LocalQueue) error { if oldQ.Spec.ClusterQueue == newQ.Spec.ClusterQueue { return nil diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 3f1295b3d6c..e150e52fea5 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "github.com/google/go-cmp/cmp" @@ -2736,7 +2737,7 @@ func TestCacheQueueOperations(t *testing.T) { cacheQueues[qKey] = cacheQ } } - if diff := cmp.Diff(tc.wantLocalQueues, cacheQueues, cmp.AllowUnexported(LocalQueue{}), cmpopts.EquateEmpty()); diff != "" { + if diff := cmp.Diff(tc.wantLocalQueues, cacheQueues, cmp.AllowUnexported(LocalQueue{}), cmpopts.EquateEmpty(), cmpopts.IgnoreTypes(sync.RWMutex{})); diff != "" { t.Errorf("Unexpected localQueues (-want,+got):\n%s", diff) } }) diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index c571f5341ce..fc69749f25d 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -485,7 +485,7 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) { updateFlavorUsage(frUsage, lq.totalReserved, m) lq.reservingWorkloads += int(m) if admitted { - updateFlavorUsage(frUsage, lq.admittedUsage, m) + lq.UpdateAdmittedUsage(frUsage, m) lq.admittedWorkloads += int(m) } if features.Enabled(features.LocalQueueMetrics) { @@ -529,7 +529,7 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error { updateFlavorUsage(frq, qImpl.totalReserved, 1) qImpl.reservingWorkloads++ if workload.IsAdmitted(wl.Obj) { - updateFlavorUsage(frq, qImpl.admittedUsage, 1) + qImpl.UpdateAdmittedUsage(frq, 1) qImpl.admittedWorkloads++ } } @@ -566,6 +566,8 @@ func (c *clusterQueue) flavorInUse(flavor kueue.ResourceFlavorReference) bool { func (q *LocalQueue) resetFlavorsAndResources(cqUsage resources.FlavorResourceQuantities, cqAdmittedUsage resources.FlavorResourceQuantities) { // Clean up removed flavors or resources. + q.Lock() + defer q.Unlock() q.totalReserved = resetUsage(q.totalReserved, cqUsage) q.admittedUsage = resetUsage(q.admittedUsage, cqAdmittedUsage) } diff --git a/pkg/cache/localqueue.go b/pkg/cache/localqueue.go index 5c1ad1d7720..0faab60c597 100644 --- a/pkg/cache/localqueue.go +++ b/pkg/cache/localqueue.go @@ -17,14 +17,31 @@ limitations under the License. 
package cache import ( + "sync" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/resources" ) type LocalQueue struct { + sync.RWMutex key queue.LocalQueueReference reservingWorkloads int admittedWorkloads int totalReserved resources.FlavorResourceQuantities admittedUsage resources.FlavorResourceQuantities } + +func (lq *LocalQueue) GetAdmittedUsage() corev1.ResourceList { + lq.RLock() + defer lq.RUnlock() + return lq.admittedUsage.FlattenFlavors().ToResourceList() +} + +func (lq *LocalQueue) UpdateAdmittedUsage(usage resources.FlavorResourceQuantities, op int64) { + lq.Lock() + defer lq.Unlock() + updateFlavorUsage(usage, lq.admittedUsage, op) +} diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index 5658abed3e9..3647d00d3d5 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -43,7 +43,8 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache if err := acRec.SetupWithManager(mgr, cfg); err != nil { return "AdmissionCheck", err } - qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc) + qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc, + WithAdmissionFairSharingConfig(cfg.AdmissionFairSharing)) if err := qRec.SetupWithManager(mgr, cfg); err != nil { return "LocalQueue", err } diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index f55fa3f9e53..6b68815aee5 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -18,8 +18,10 @@ package core import ( "context" + "math" "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -27,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/clock" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -60,13 +63,39 @@ const ( clusterQueueIsInactiveReason = "ClusterQueueIsInactive" ) +type LocalQueueReconcilerOptions struct { + admissionFSConfig *config.AdmissionFairSharing + clock clock.Clock +} + +// LocalQueueReconcilerOption configures the reconciler. 
+type LocalQueueReconcilerOption func(*LocalQueueReconcilerOptions) + +func WithAdmissionFairSharingConfig(cfg *config.AdmissionFairSharing) LocalQueueReconcilerOption { + return func(o *LocalQueueReconcilerOptions) { + o.admissionFSConfig = cfg + } +} + +func WithClock(c clock.Clock) LocalQueueReconcilerOption { + return func(o *LocalQueueReconcilerOptions) { + o.clock = c + } +} + +var defaultLQOptions = LocalQueueReconcilerOptions{ + clock: realClock, +} + // LocalQueueReconciler reconciles a LocalQueue object type LocalQueueReconciler struct { - client client.Client - log logr.Logger - queues *queue.Manager - cache *cache.Cache - wlUpdateCh chan event.GenericEvent + client client.Client + log logr.Logger + queues *queue.Manager + cache *cache.Cache + wlUpdateCh chan event.GenericEvent + admissionFSConfig *config.AdmissionFairSharing + clock clock.Clock } var _ reconcile.Reconciler = (*LocalQueueReconciler)(nil) @@ -76,13 +105,20 @@ func NewLocalQueueReconciler( client client.Client, queues *queue.Manager, cache *cache.Cache, + opts ...LocalQueueReconcilerOption, ) *LocalQueueReconciler { + options := defaultLQOptions + for _, opt := range opts { + opt(&options) + } return &LocalQueueReconciler{ - log: ctrl.Log.WithName("localqueue-reconciler"), - queues: queues, - cache: cache, - client: client, - wlUpdateCh: make(chan event.GenericEvent, updateChBuffer), + log: ctrl.Log.WithName("localqueue-reconciler"), + queues: queues, + cache: cache, + client: client, + wlUpdateCh: make(chan event.GenericEvent, updateChBuffer), + admissionFSConfig: options.admissionFSConfig, + clock: options.clock, } } @@ -120,19 +156,36 @@ func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) } var cq kueue.ClusterQueue - err := r.client.Get(ctx, client.ObjectKey{Name: string(queueObj.Spec.ClusterQueue)}, &cq) - if err != nil { + if err := r.client.Get(ctx, client.ObjectKey{Name: string(queueObj.Spec.ClusterQueue)}, &cq); err != nil { if apierrors.IsNotFound(err) { err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, "ClusterQueueDoesNotExist", clusterQueueIsInactiveMsg) } return ctrl.Result{}, client.IgnoreNotFound(err) } if meta.IsStatusConditionTrue(cq.Status.Conditions, kueue.ClusterQueueActive) { - err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionTrue, "Ready", "Can submit new workloads to localQueue") + if err := r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionTrue, "Ready", "Can submit new workloads to localQueue"); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + } else { + err := r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, clusterQueueIsInactiveReason, clusterQueueIsInactiveMsg) return ctrl.Result{}, client.IgnoreNotFound(err) } - err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, clusterQueueIsInactiveReason, clusterQueueIsInactiveMsg) - return ctrl.Result{}, client.IgnoreNotFound(err) + + if r.admissionFSConfig != nil && features.Enabled(features.AdmissionFairSharing) { + updated := r.initializeAdmissionFsStatus(ctx, &queueObj) + sinceLastUpdate := r.clock.Now().Sub(queueObj.Status.FairSharing.AdmissionFairSharingStatus.LastUpdate.Time) + if interval := r.admissionFSConfig.UsageSamplingInterval.Duration; !updated && sinceLastUpdate < interval { + return ctrl.Result{RequeueAfter: interval - sinceLastUpdate}, nil + } + if err := r.reconcileConsumedUsage(ctx, &queueObj, queueObj.Spec.ClusterQueue); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + if err := 
r.queues.HeapifyClusterQueue(&cq, queueObj.Name); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{RequeueAfter: r.admissionFSConfig.UsageSamplingInterval.Duration}, nil + } + return ctrl.Result{}, nil } func (r *LocalQueueReconciler) Create(e event.TypedCreateEvent[*kueue.LocalQueue]) bool { @@ -204,6 +257,49 @@ func (r *LocalQueueReconciler) Update(e event.TypedUpdateEvent[*kueue.LocalQueue return true } +func (r *LocalQueueReconciler) initializeAdmissionFsStatus(ctx context.Context, lq *kueue.LocalQueue) bool { + if lq.Status.FairSharing == nil { + lq.Status.FairSharing = &kueue.FairSharingStatus{} + } + if lq.Status.FairSharing.AdmissionFairSharingStatus == nil { + lq.Status.FairSharing.AdmissionFairSharingStatus = &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(r.clock.Now()), + } + return true + } + return false +} + +func (r *LocalQueueReconciler) reconcileConsumedUsage(ctx context.Context, lq *kueue.LocalQueue, cqName kueue.ClusterQueueReference) error { + halfLifeTime := r.admissionFSConfig.UsageHalfLifeTime.Seconds() + + // reset usage to 0 if halfLife is 0 + if halfLifeTime == 0 { + return r.updateAdmissionFsStatus(ctx, lq, corev1.ResourceList{}) + } + cacheLq, err := r.cache.GetCacheLocalQueue(cqName, lq) + if err != nil { + return err + } + // calculate alpha rate + oldUsage := lq.Status.FairSharing.AdmissionFairSharingStatus.ConsumedResources + newUsage := cacheLq.GetAdmittedUsage() + timeSinceLastUpdate := r.clock.Now().Sub(lq.Status.FairSharing.AdmissionFairSharingStatus.LastUpdate.Time).Seconds() + alpha := 1.0 - math.Pow(0.5, timeSinceLastUpdate/halfLifeTime) + // calculate weighted average of old and new usage + scaledNewUsage := resource.MulByFloat(newUsage, alpha) + scaledOldUsage := resource.MulByFloat(oldUsage, 1-alpha) + sum := resource.MergeResourceListKeepSum(scaledOldUsage, scaledNewUsage) + // update status + return r.updateAdmissionFsStatus(ctx, lq, sum) +} + +func (r *LocalQueueReconciler) updateAdmissionFsStatus(ctx context.Context, lq *kueue.LocalQueue, consumedResources corev1.ResourceList) error { + lq.Status.FairSharing.AdmissionFairSharingStatus.ConsumedResources = consumedResources + lq.Status.FairSharing.AdmissionFairSharingStatus.LastUpdate = metav1.NewTime(r.clock.Now()) + return r.client.Status().Update(ctx, lq) +} + func localQueueReferenceFromLocalQueue(lq *kueue.LocalQueue) metrics.LocalQueueReference { return metrics.LocalQueueReference{ Name: kueue.LocalQueueName(lq.Name), diff --git a/pkg/controller/core/localqueue_controller_test.go b/pkg/controller/core/localqueue_controller_test.go index 001c7bb4925..67c2ae21844 100644 --- a/pkg/controller/core/localqueue_controller_test.go +++ b/pkg/controller/core/localqueue_controller_test.go @@ -19,28 +19,39 @@ package core import ( "context" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testingclock "k8s.io/utils/clock/testing" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" "sigs.k8s.io/controller-runtime/pkg/reconcile" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/queue" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" "sigs.k8s.io/kueue/test/util" ) func 
TestLocalQueueReconcile(t *testing.T) { + clock := testingclock.NewFakeClock(time.Now().Truncate(time.Second)) cases := map[string]struct { - clusterQueue *kueue.ClusterQueue - localQueue *kueue.LocalQueue - wantLocalQueue *kueue.LocalQueue - wantError error + clusterQueue *kueue.ClusterQueue + localQueue *kueue.LocalQueue + wantLocalQueue *kueue.LocalQueue + wantError error + afsConfig *config.AdmissionFairSharing + runningWls []kueue.Workload + wantRequeueAfter *time.Duration }{ "local queue with Hold StopPolicy": { clusterQueue: utiltesting.MakeClusterQueue("test-cluster-queue"). @@ -112,6 +123,380 @@ func TestLocalQueueReconcile(t *testing.T) { Obj(), wantError: nil, }, + "local queue decaying usage decays if there is no running workloads": { + clusterQueue: utiltesting.MakeClusterQueue("cq"). + Active(metav1.ConditionTrue). + Obj(), + localQueue: utiltesting.MakeLocalQueue("test-queue", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-5 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("8")}, + }, + }). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("test-queue", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("4"), + }, + }, + }). + Obj(), + afsConfig: &config.AdmissionFairSharing{ + UsageHalfLifeTime: metav1.Duration{Duration: 5 * time.Minute}, + UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + "local queue decaying usage sums the previous state and running workloads": { + clusterQueue: utiltesting.MakeClusterQueue("cq"). + Active(metav1.ConditionTrue). + Obj(), + localQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-5 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("8")}, + }, + }). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + ReservingWorkloads(1). + AdmittedWorkloads(1). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("6"), + }, + }, + }). + Obj(), + runningWls: []kueue.Workload{ + *utiltesting.MakeWorkload("wl", "default"). + Queue("lq"). + Request(corev1.ResourceCPU, "4"). + SimpleReserveQuota("cq", "rf", clock.Now()). + Admitted(true). 
+ Obj(), + }, + afsConfig: &config.AdmissionFairSharing{ + UsageHalfLifeTime: metav1.Duration{Duration: 5 * time.Minute}, + UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + "local queue decaying usage sums the usage from different flavors and resources": { + clusterQueue: utiltesting.MakeClusterQueue("cq"). + Active(metav1.ConditionTrue). + Obj(), + localQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-5 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("8"), + "GPU": resource.MustParse("16"), + }, + }, + }). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + ReservingWorkloads(3). + AdmittedWorkloads(3). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("6"), + "GPU": resource.MustParse("10"), + }, + }, + }). + Obj(), + runningWls: []kueue.Workload{ + *utiltesting.MakeWorkload("wl-1", "default"). + Queue("lq"). + Request(corev1.ResourceCPU, "2"). + SimpleReserveQuota("cq", "rf-1", clock.Now()). + Admitted(true). + Obj(), + *utiltesting.MakeWorkload("wl-2", "default"). + Queue("lq"). + Request(corev1.ResourceCPU, "2"). + SimpleReserveQuota("cq", "rf-2", clock.Now()). + Admitted(true). + Obj(), + *utiltesting.MakeWorkload("wl-3", "default"). + Queue("lq"). + Request("GPU", "4"). + SimpleReserveQuota("cq", "rf-3", clock.Now()). + Admitted(true). + Obj(), + }, + afsConfig: &config.AdmissionFairSharing{ + UsageHalfLifeTime: metav1.Duration{Duration: 5 * time.Minute}, + UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + "local queue decaying usage sums the previous state and running workloads half time twice larger than sampling": { + clusterQueue: utiltesting.MakeClusterQueue("cq"). + Active(metav1.ConditionTrue). + Obj(), + localQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-5 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("8")}, + }, + }). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + ReservingWorkloads(1). + AdmittedWorkloads(1). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("6827m"), + }, + }, + }). + Obj(), + runningWls: []kueue.Workload{ + *utiltesting.MakeWorkload("wl", "default"). + Queue("lq"). + Request(corev1.ResourceCPU, "4"). 
+ SimpleReserveQuota("cq", "rf", clock.Now()). + Admitted(true). + Obj(), + }, + afsConfig: &config.AdmissionFairSharing{ + UsageHalfLifeTime: metav1.Duration{Duration: 10 * time.Minute}, + UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + "local queue decaying usage sums the previous state and running workloads with long half time": { + clusterQueue: utiltesting.MakeClusterQueue("cq"). + Active(metav1.ConditionTrue). + Obj(), + localQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-5 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("8")}, + }, + }). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("7980m"), + }, + }, + }). + Obj(), + afsConfig: &config.AdmissionFairSharing{ + UsageHalfLifeTime: metav1.Duration{Duration: 24 * time.Hour}, + UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + "local queue decaying usage sums the previous state and running GPU workloads half time twice larger than sampling": { + clusterQueue: utiltesting.MakeClusterQueue("cq"). + Active(metav1.ConditionTrue). + Obj(), + localQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-5 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + "GPU": resource.MustParse("8")}, + }, + }). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + ReservingWorkloads(1). + AdmittedWorkloads(1). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + "GPU": resource.MustParse("6827m"), + }, + }, + }). + Obj(), + runningWls: []kueue.Workload{ + *utiltesting.MakeWorkload("wl", "default"). + Queue("lq"). + Request("GPU", "4"). + SimpleReserveQuota("cq", "rf", clock.Now()). + Admitted(true). + Obj(), + }, + afsConfig: &config.AdmissionFairSharing{ + UsageHalfLifeTime: metav1.Duration{Duration: 10 * time.Minute}, + UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + "local queue decaying usage resets to 0 when half life is 0": { + clusterQueue: utiltesting.MakeClusterQueue("cq"). + Active(metav1.ConditionTrue). + Obj(), + localQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). 
+ FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-5 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + "GPU": resource.MustParse("8")}, + }, + }). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("lq", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + ReservingWorkloads(1). + AdmittedWorkloads(1). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{}, + }). + Obj(), + runningWls: []kueue.Workload{ + *utiltesting.MakeWorkload("wl", "default"). + Queue("lq"). + Request("GPU", "4"). + SimpleReserveQuota("cq", "rf", clock.Now()). + Admitted(true). + Obj(), + }, + afsConfig: &config.AdmissionFairSharing{ + UsageHalfLifeTime: metav1.Duration{Duration: 0 * time.Minute}, + UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, + "local queue decaying usage is not reconciled if not enough time has passed": { + clusterQueue: utiltesting.MakeClusterQueue("cq"). + Active(metav1.ConditionTrue). + Obj(), + localQueue: utiltesting.MakeLocalQueue("test-queue", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-4 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("8")}, + }, + }). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("test-queue", "default"). + ClusterQueue("cq"). + Active(metav1.ConditionTrue). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + LastUpdate: metav1.NewTime(clock.Now().Add(-4 * time.Minute)), + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("8")}, + }, + }). + Obj(), + wantRequeueAfter: ptr.To(time.Minute), + afsConfig: &config.AdmissionFairSharing{ + UsageHalfLifeTime: metav1.Duration{Duration: 5 * time.Minute}, + UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, + }, + }, } for name, tc := range cases { @@ -120,25 +505,43 @@ func TestLocalQueueReconcile(t *testing.T) { tc.clusterQueue, tc.localQueue, } + features.SetFeatureGateDuringTest(t, features.AdmissionFairSharing, true) cl := utiltesting.NewClientBuilder(). WithObjects(objs...). WithStatusSubresource(objs...). WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}). 
Build() + ctxWithLogger, _ := utiltesting.ContextWithLog(t) cqCache := cache.New(cl) + if err := cqCache.AddClusterQueue(ctxWithLogger, tc.clusterQueue); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _ = cqCache.AddLocalQueue(tc.localQueue) + for _, wl := range tc.runningWls { + cqCache.AddOrUpdateWorkload(&wl) + } qManager := queue.NewManager(cl, cqCache) - ctxWithLogger, _ := utiltesting.ContextWithLog(t) + if err := qManager.AddClusterQueue(ctxWithLogger, tc.clusterQueue); err != nil { + t.Fatalf("Unexpected error: %v", err) + } _ = qManager.AddLocalQueue(ctxWithLogger, tc.localQueue) - reconciler := NewLocalQueueReconciler(cl, qManager, cqCache) + reconciler := NewLocalQueueReconciler(cl, qManager, cqCache, + WithClock(clock), + WithAdmissionFairSharingConfig(tc.afsConfig)) ctx, ctxCancel := context.WithCancel(ctxWithLogger) defer ctxCancel() - _, gotError := reconciler.Reconcile( + result, gotError := reconciler.Reconcile( ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.localQueue)}, ) + if tc.wantRequeueAfter != nil { + if diff := cmp.Diff(*tc.wantRequeueAfter, result.RequeueAfter); diff != "" { + t.Errorf("unexpected reconcile requeue after (-want/+got):\n%s", diff) + } + } if diff := cmp.Diff(tc.wantError, gotError); diff != "" { t.Errorf("unexpected reconcile error (-want/+got):\n%s", diff) @@ -156,6 +559,7 @@ func TestLocalQueueReconcile(t *testing.T) { cmpopts.EquateEmpty(), util.IgnoreConditionTimestamps, util.IgnoreObjectMetaResourceVersion, + cmpopts.IgnoreFields(kueue.AdmissionFairSharingStatus{}, "LastUpdate"), } if diff := cmp.Diff(tc.wantLocalQueue, gotLocalQueue, cmpOpts...); diff != "" { t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 2c9ca6c1b65..622ce26db36 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -157,6 +157,12 @@ const ( // // Enable hierarchical cohorts HierarchicalCohorts featuregate.Feature = "HierarchicalCohorts" + + // owner: @pbundyra + // kep: https://github.com/kubernetes-sigs/kueue/tree/main/keps/4136-admission-fair-sharing + // + // Enable admission fair sharing + AdmissionFairSharing featuregate.Feature = "AdmissionFairSharing" ) func init() { @@ -243,6 +249,9 @@ var defaultVersionedFeatureGates = map[featuregate.Feature]featuregate.Versioned HierarchicalCohorts: { {Version: version.MustParse("0.11"), Default: true, PreRelease: featuregate.Beta}, }, + AdmissionFairSharing: { + {Version: version.MustParse("0.12"), Default: false, PreRelease: featuregate.Alpha}, + }, } func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) { diff --git a/pkg/queue/cluster_queue.go b/pkg/queue/cluster_queue.go index 944a75259a0..5ac08be6fd7 100644 --- a/pkg/queue/cluster_queue.go +++ b/pkg/queue/cluster_queue.go @@ -28,9 +28,12 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/clock" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/hierarchy" "sigs.k8s.io/kueue/pkg/util/heap" utilpriority "sigs.k8s.io/kueue/pkg/util/priority" @@ -89,8 +92,9 @@ func workloadKey(i *workload.Info) string { return workload.Key(i.Obj) } -func newClusterQueue(cq *kueue.ClusterQueue, wo workload.Ordering) (*ClusterQueue, error) { - cqImpl := newClusterQueueImpl(wo, realClock) +func 
newClusterQueue(ctx context.Context, client client.Client, cq *kueue.ClusterQueue, wo workload.Ordering, afsConfig *config.AdmissionFairSharing) (*ClusterQueue, error) { + enableAdmissionFs, fsResWeights := afsResourceWeights(cq, afsConfig) + cqImpl := newClusterQueueImpl(ctx, client, wo, realClock, fsResWeights, enableAdmissionFs) err := cqImpl.Update(cq) if err != nil { return nil, err @@ -98,8 +102,17 @@ func newClusterQueue(cq *kueue.ClusterQueue, wo workload.Ordering) (*ClusterQueu return cqImpl, nil } -func newClusterQueueImpl(wo workload.Ordering, clock clock.Clock) *ClusterQueue { - lessFunc := queueOrderingFunc(wo) +func afsResourceWeights(cq *kueue.ClusterQueue, afsConfig *config.AdmissionFairSharing) (bool, map[corev1.ResourceName]float64) { + enableAdmissionFs, fsResWeights := false, make(map[corev1.ResourceName]float64) + if afsConfig != nil && cq.Spec.AdmissionScope != nil && cq.Spec.AdmissionScope.AdmissionMode == kueue.UsageBasedAdmissionFairSharing && features.Enabled(features.AdmissionFairSharing) { + enableAdmissionFs = true + fsResWeights = afsConfig.ResourceWeights + } + return enableAdmissionFs, fsResWeights +} + +func newClusterQueueImpl(ctx context.Context, client client.Client, wo workload.Ordering, clock clock.Clock, fsResWeights map[corev1.ResourceName]float64, enableAdmissionFs bool) *ClusterQueue { + lessFunc := queueOrderingFunc(ctx, client, wo, fsResWeights, enableAdmissionFs) return &ClusterQueue{ heap: *heap.New(workloadKey, lessFunc), inadmissibleWorkloads: make(map[string]*workload.Info), @@ -171,6 +184,16 @@ func (c *ClusterQueue) PushOrUpdate(wInfo *workload.Info) { c.heap.PushOrUpdate(wInfo) } +func (c *ClusterQueue) Heapify(lqName string) { + c.rwm.Lock() + defer c.rwm.Unlock() + for _, wl := range c.heap.List() { + if string(wl.Obj.Spec.QueueName) == lqName { + c.heap.PushOrUpdate(wl) + } + } +} + // backoffWaitingTimeExpired returns true if the current time is after the requeueAt // and Requeued condition not present or equal True. func (c *ClusterQueue) backoffWaitingTimeExpired(wInfo *workload.Info) bool { @@ -410,8 +433,25 @@ func (c *ClusterQueue) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueR // to sort workloads. The function sorts workloads based on their priority. // When priorities are equal, it uses the workload's creation or eviction // time. 
-func queueOrderingFunc(wo workload.Ordering) func(a, b *workload.Info) bool { +func queueOrderingFunc(ctx context.Context, c client.Client, wo workload.Ordering, fsResWeights map[corev1.ResourceName]float64, enableAdmissionFs bool) func(a, b *workload.Info) bool { + log := ctrl.LoggerFrom(ctx) return func(a, b *workload.Info) bool { + if enableAdmissionFs { + lqAUsage, errA := a.LqUsage(ctx, c, fsResWeights) + lqBUsage, errB := b.LqUsage(ctx, c, fsResWeights) + switch { + case errA != nil: + log.V(2).Error(errA, "Error determining LocalQueue usage") + case errB != nil: + log.V(2).Error(errB, "Error determining LocalQueue usage") + default: + log.V(3).Info("Resource usage from LocalQueue", "LocalQueue", a.Obj.Spec.QueueName, "Usage", lqAUsage) + log.V(3).Info("Resource usage from LocalQueue", "LocalQueue", b.Obj.Spec.QueueName, "Usage", lqBUsage) + if lqAUsage != lqBUsage { + return lqAUsage < lqBUsage + } + } + } p1 := utilpriority.Priority(a.Obj) p2 := utilpriority.Priority(b.Obj) diff --git a/pkg/queue/cluster_queue_test.go b/pkg/queue/cluster_queue_test.go index a6b566c80c7..3f68625bb15 100644 --- a/pkg/queue/cluster_queue_test.go +++ b/pkg/queue/cluster_queue_test.go @@ -17,12 +17,14 @@ limitations under the License. package queue import ( + "context" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" testingclock "k8s.io/utils/clock/testing" @@ -30,6 +32,7 @@ import ( config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/features" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" "sigs.k8s.io/kueue/pkg/workload" ) @@ -38,6 +41,10 @@ const ( defaultNamespace = "default" ) +const ( + resourceGPU corev1.ResourceName = "example.com/gpu" +) + const ( lowPriority int32 = 0 highPriority int32 = 1000 @@ -149,7 +156,7 @@ func Test_PushOrUpdate(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - cq := newClusterQueueImpl(defaultOrdering, fakeClock) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, fakeClock, nil, false) if cq.Pending() != 0 { t.Error("ClusterQueue should be empty") @@ -178,7 +185,7 @@ func Test_PushOrUpdate(t *testing.T) { func Test_Pop(t *testing.T) { now := time.Now() - cq := newClusterQueueImpl(defaultOrdering, testingclock.NewFakeClock(now)) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, testingclock.NewFakeClock(now), nil, false) wl1 := workload.NewInfo(utiltesting.MakeWorkload("workload-1", defaultNamespace).Creation(now).Obj()) wl2 := workload.NewInfo(utiltesting.MakeWorkload("workload-2", defaultNamespace).Creation(now.Add(time.Second)).Obj()) if cq.Pop() != nil { @@ -200,7 +207,7 @@ func Test_Pop(t *testing.T) { } func Test_Delete(t *testing.T) { - cq := newClusterQueueImpl(defaultOrdering, testingclock.NewFakeClock(time.Now())) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, testingclock.NewFakeClock(time.Now()), nil, false) wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj() cq.PushOrUpdate(workload.NewInfo(wl1)) @@ -221,7 +228,7 @@ func Test_Delete(t *testing.T) { } func Test_Info(t *testing.T) { - cq := newClusterQueueImpl(defaultOrdering, testingclock.NewFakeClock(time.Now())) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, 
testingclock.NewFakeClock(time.Now()), nil, false) wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() if info := cq.Info(workload.Key(wl)); info != nil { t.Error("Workload should not exist") @@ -233,7 +240,7 @@ func Test_Info(t *testing.T) { } func Test_AddFromLocalQueue(t *testing.T) { - cq := newClusterQueueImpl(defaultOrdering, testingclock.NewFakeClock(time.Now())) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, testingclock.NewFakeClock(time.Now()), nil, false) wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() queue := &LocalQueue{ items: map[string]*workload.Info{ @@ -251,7 +258,7 @@ func Test_AddFromLocalQueue(t *testing.T) { } func Test_DeleteFromLocalQueue(t *testing.T) { - cq := newClusterQueueImpl(defaultOrdering, testingclock.NewFakeClock(time.Now())) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, testingclock.NewFakeClock(time.Now()), nil, false) q := utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj() qImpl := newLocalQueue(q) wl1 := utiltesting.MakeWorkload("wl1", "").Queue(kueue.LocalQueueName(q.Name)).Obj() @@ -406,7 +413,7 @@ func TestClusterQueueImpl(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - cq := newClusterQueueImpl(defaultOrdering, fakeClock) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, fakeClock, nil, false) err := cq.Update(utiltesting.MakeClusterQueue("cq"). NamespaceSelector(&metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -459,7 +466,7 @@ func TestClusterQueueImpl(t *testing.T) { } func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) { - cq := newClusterQueueImpl(defaultOrdering, testingclock.NewFakeClock(time.Now())) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, testingclock.NewFakeClock(time.Now()), nil, false) cq.namespaceSelector = labels.Everything() wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() cl := utiltesting.NewFakeClient(wl, utiltesting.MakeNamespace(defaultNamespace)) @@ -543,7 +550,7 @@ func TestBackoffWaitingTimeExpired(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - cq := newClusterQueueImpl(defaultOrdering, fakeClock) + cq := newClusterQueueImpl(t.Context(), nil, defaultOrdering, fakeClock, nil, false) got := cq.backoffWaitingTimeExpired(tc.workloadInfo) if tc.want != got { t.Errorf("Unexpected result from backoffWaitingTimeExpired\nwant: %v\ngot: %v\n", tc.want, got) @@ -600,14 +607,14 @@ func TestBestEffortFIFORequeueIfNotPresent(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - cq, _ := newClusterQueue( + cq, _ := newClusterQueue(t.Context(), nil, &kueue.ClusterQueue{ Spec: kueue.ClusterQueueSpec{ QueueingStrategy: kueue.BestEffortFIFO, }, }, workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp}, - ) + nil) wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() info := workload.NewInfo(wl) info.LastAssignment = tc.lastAssignment @@ -628,7 +635,7 @@ func TestBestEffortFIFORequeueIfNotPresent(t *testing.T) { } func TestFIFOClusterQueue(t *testing.T) { - q, err := newClusterQueue( + q, err := newClusterQueue(t.Context(), nil, &kueue.ClusterQueue{ Spec: kueue.ClusterQueueSpec{ QueueingStrategy: kueue.StrictFIFO, @@ -636,7 +643,7 @@ func TestFIFOClusterQueue(t *testing.T) { }, workload.Ordering{ PodsReadyRequeuingTimestamp: config.EvictionTimestamp, - }) + }, nil) if err != nil { t.Fatalf("Failed creating ClusterQueue %v", err) } @@ 
-833,13 +840,14 @@ func TestStrictFIFO(t *testing.T) { // The default ordering: tt.workloadOrdering = &workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp} } - q, err := newClusterQueue( + q, err := newClusterQueue(t.Context(), nil, &kueue.ClusterQueue{ Spec: kueue.ClusterQueueSpec{ QueueingStrategy: kueue.StrictFIFO, }, }, - *tt.workloadOrdering) + *tt.workloadOrdering, + nil) if err != nil { t.Fatalf("Failed creating ClusterQueue %v", err) } @@ -875,14 +883,14 @@ func TestStrictFIFORequeueIfNotPresent(t *testing.T) { for reason, test := range tests { t.Run(string(reason), func(t *testing.T) { - cq, _ := newClusterQueue( + cq, _ := newClusterQueue(t.Context(), nil, &kueue.ClusterQueue{ Spec: kueue.ClusterQueueSpec{ QueueingStrategy: kueue.StrictFIFO, }, }, workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp}, - ) + nil) wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() if ok := cq.RequeueIfNotPresent(workload.NewInfo(wl), reason); !ok { t.Error("failed to requeue nonexistent workload") @@ -899,3 +907,221 @@ func TestStrictFIFORequeueIfNotPresent(t *testing.T) { }) } } + +func TestFsAdmission(t *testing.T) { + wlCmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + cmpopts.IgnoreFields(metav1.Condition{}, "ObservedGeneration", "LastTransitionTime"), + } + + cases := map[string]struct { + cq *kueue.ClusterQueue + lqs []kueue.LocalQueue + afsConfig *config.AdmissionFairSharing + wls []kueue.Workload + wantWl kueue.Workload + }{ + "workloads are ordered by LQ usage, instead of priorities": { + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionMode(kueue.UsageBasedAdmissionFairSharing). + Obj(), + lqs: []kueue.LocalQueue{ + *utiltesting.MakeLocalQueue("lqA", "default"). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("2"), + }, + }, + }, + ). + Obj(), + *utiltesting.MakeLocalQueue("lqB", "default"). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + }, + ).Obj(), + }, + afsConfig: &config.AdmissionFairSharing{}, + wls: []kueue.Workload{ + *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + *utiltesting.MakeWorkload("wlB-low", "default").Queue("lqB").Priority(1).Obj(), + }, + wantWl: *utiltesting.MakeWorkload("wlB-low", "default").Queue("lqB").Priority(1).Obj(), + }, + "workloads are ordered by LQ usage with respect to resource weights": { + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionMode(kueue.UsageBasedAdmissionFairSharing). + Obj(), + lqs: []kueue.LocalQueue{ + *utiltesting.MakeLocalQueue("lqA", "default"). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("1"), + resourceGPU: resource.MustParse("10"), + }, + }, + }, + ). 
+ Obj(), + *utiltesting.MakeLocalQueue("lqB", "default"). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("1000"), + resourceGPU: resource.MustParse("1"), + }, + }, + }, + ).Obj(), + }, + afsConfig: &config.AdmissionFairSharing{ + ResourceWeights: map[corev1.ResourceName]float64{ + corev1.ResourceCPU: 0, + resourceGPU: 1, + }, + }, + wls: []kueue.Workload{ + *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + *utiltesting.MakeWorkload("wlB-low", "default").Queue("lqB").Priority(1).Obj(), + }, + wantWl: *utiltesting.MakeWorkload("wlB-low", "default").Queue("lqB").Priority(1).Obj(), + }, + "workloads are ordered by LQ usage with respect to LQs' fair sharing weights": { + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionMode(kueue.UsageBasedAdmissionFairSharing). + Obj(), + lqs: []kueue.LocalQueue{ + *utiltesting.MakeLocalQueue("lqA", "default"). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + }, + }, + ). + Obj(), + *utiltesting.MakeLocalQueue("lqB", "default"). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("2")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("6"), + }, + }, + }, + ).Obj(), + }, + afsConfig: &config.AdmissionFairSharing{}, + wls: []kueue.Workload{ + *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + *utiltesting.MakeWorkload("wlB-low", "default").Queue("lqB").Priority(1).Obj(), + }, + wantWl: *utiltesting.MakeWorkload("wlB-low", "default").Queue("lqB").Priority(1).Obj(), + }, + "workloads with the same LQ usage are ordered by priority": { + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionMode(kueue.UsageBasedAdmissionFairSharing). + Obj(), + lqs: []kueue.LocalQueue{ + *utiltesting.MakeLocalQueue("lqA", "default"). + FairSharing(&kueue.FairSharing{ + Weight: ptr.To(resource.MustParse("1")), + }). + FairSharingStatus( + &kueue.FairSharingStatus{ + AdmissionFairSharingStatus: &kueue.AdmissionFairSharingStatus{ + ConsumedResources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + }, + }, + ).Obj(), + }, + afsConfig: &config.AdmissionFairSharing{}, + wls: []kueue.Workload{ + *utiltesting.MakeWorkload("wlA-low", "default").Queue("lqA").Priority(1).Obj(), + *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + }, + wantWl: *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + }, + "workloads with NoFairSharing CQ are ordered by priority": { + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionMode(kueue.NoAdmissionFairSharing). 
+ Obj(), + lqs: []kueue.LocalQueue{ + *utiltesting.MakeLocalQueue("lqA", "default").Obj(), + }, + afsConfig: &config.AdmissionFairSharing{}, + wls: []kueue.Workload{ + *utiltesting.MakeWorkload("wlA-low", "default").Queue("lqA").Priority(1).Obj(), + *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + }, + wantWl: *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + }, + "workloads with no FS config are ordered by priority": { + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionMode(kueue.NoAdmissionFairSharing). + Obj(), + lqs: []kueue.LocalQueue{ + *utiltesting.MakeLocalQueue("lqA", "default").Obj(), + }, + wls: []kueue.Workload{ + *utiltesting.MakeWorkload("wlA-low", "default").Queue("lqA").Priority(1).Obj(), + *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + }, + wantWl: *utiltesting.MakeWorkload("wlA-high", "default").Queue("lqA").Priority(2).Obj(), + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.AdmissionFairSharing, true) + builder := utiltesting.NewClientBuilder() + for _, lq := range tc.lqs { + builder = builder.WithObjects(&lq) + } + client := builder.Build() + ctx := context.Background() + + cq, _ := newClusterQueue(ctx, client, tc.cq, defaultOrdering, tc.afsConfig) + for _, wl := range tc.wls { + cq.PushOrUpdate(workload.NewInfo(&wl)) + } + + gotWl := cq.Pop() + if diff := cmp.Diff(tc.wantWl, *gotWl.Obj, wlCmpOpts...); diff != "" { + t.Errorf("Unexpected workloads on top of the heap (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index 5c8582b746b..43afbb37160 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -47,6 +47,7 @@ var ( type options struct { podsReadyRequeuingTimestamp config.RequeuingTimestamp workloadInfoOptions []workload.InfoOption + admissionFairSharing *config.AdmissionFairSharing } // Option configures the manager. @@ -57,6 +58,14 @@ var defaultOptions = options{ workloadInfoOptions: []workload.InfoOption{}, } +func WithAdmissionFairSharing(cfg *config.AdmissionFairSharing) Option { + return func(o *options) { + if features.Enabled(features.AdmissionFairSharing) { + o.admissionFairSharing = cfg + } + } +} + // WithPodsReadyRequeuingTimestamp sets the timestamp that is used for ordering // workloads that have been requeued due to the PodsReady condition. 
func WithPodsReadyRequeuingTimestamp(ts config.RequeuingTimestamp) Option { @@ -101,6 +110,8 @@ type Manager struct { hm hierarchy.Manager[*ClusterQueue, *cohort] topologyUpdateWatchers []TopologyUpdateWatcher + + admissionFairSharingConfig *config.AdmissionFairSharing } func NewManager(client client.Client, checker StatusChecker, opts ...Option) *Manager { @@ -120,7 +131,8 @@ func NewManager(client client.Client, checker StatusChecker, opts ...Option) *Ma workloadInfoOptions: options.workloadInfoOptions, hm: hierarchy.NewManager[*ClusterQueue, *cohort](newCohort), - topologyUpdateWatchers: make([]TopologyUpdateWatcher, 0), + topologyUpdateWatchers: make([]TopologyUpdateWatcher, 0), + admissionFairSharingConfig: options.admissionFairSharing, } m.cond.L = &m.RWMutex return m @@ -162,7 +174,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e return errClusterQueueAlreadyExists } - cqImpl, err := newClusterQueue(cq, m.workloadOrdering) + cqImpl, err := newClusterQueue(ctx, m.client, cq, m.workloadOrdering, m.admissionFairSharingConfig) if err != nil { return err } @@ -236,6 +248,17 @@ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue return nil } +func (m *Manager) HeapifyClusterQueue(cq *kueue.ClusterQueue, lqName string) error { + m.Lock() + defer m.Unlock() + cqImpl := m.hm.ClusterQueue(kueue.ClusterQueueReference(cq.Name)) + if cqImpl == nil { + return ErrClusterQueueDoesNotExist + } + cqImpl.Heapify(lqName) + return nil +} + func (m *Manager) DeleteClusterQueue(cq *kueue.ClusterQueue) { m.Lock() defer m.Unlock() diff --git a/pkg/resources/resource.go b/pkg/resources/resource.go index af0e691f31c..6a0342123fe 100644 --- a/pkg/resources/resource.go +++ b/pkg/resources/resource.go @@ -43,3 +43,11 @@ func (q FlavorResourceQuantities) MarshalJSON() ([]byte, error) { } return json.Marshal(temp) } + +func (frq FlavorResourceQuantities) FlattenFlavors() Requests { + result := Requests{} + for key, val := range frq { + result[key.Resource] += val + } + return result +} diff --git a/pkg/util/resource/resource.go b/pkg/util/resource/resource.go index 2f0782aec6e..16c28aad897 100644 --- a/pkg/util/resource/resource.go +++ b/pkg/util/resource/resource.go @@ -103,3 +103,14 @@ func QuantityToFloat(q *resource.Quantity) float64 { } return float64(q.MilliValue()) / 1000 } + +// MulByFloat multiplies every element in q by f. +// Leverages k8s.io/apimachinery/pkg/api/resource package to provide precision to 3 decimal places +func MulByFloat(q corev1.ResourceList, f float64) corev1.ResourceList { + ret := q.DeepCopy() + for k, v := range ret { + scaledV := float64(v.MilliValue()) * f + ret[k] = *resource.NewMilliQuantity(int64(scaledV), resource.DecimalSI) + } + return ret +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 7c03659aa7c..62e60524e04 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -659,12 +659,24 @@ func (q *LocalQueueWrapper) StopPolicy(p kueue.StopPolicy) *LocalQueueWrapper { return q } +// FairSharing sets the fair sharing config. +func (q *LocalQueueWrapper) FairSharing(fs *kueue.FairSharing) *LocalQueueWrapper { + q.Spec.FairSharing = fs + return q +} + // PendingWorkloads updates the pendingWorkloads in status. func (q *LocalQueueWrapper) PendingWorkloads(n int32) *LocalQueueWrapper { q.Status.PendingWorkloads = n return q } +// ReservingWorkloads updates the reservingWorkloads in status. 
+func (q *LocalQueueWrapper) ReservingWorkloads(n int32) *LocalQueueWrapper { + q.Status.ReservingWorkloads = n + return q +} + // AdmittedWorkloads updates the admittedWorkloads in status. func (q *LocalQueueWrapper) AdmittedWorkloads(n int32) *LocalQueueWrapper { q.Status.AdmittedWorkloads = n @@ -683,6 +695,22 @@ func (q *LocalQueueWrapper) Condition(conditionType string, status metav1.Condit return q } +func (q *LocalQueueWrapper) Active(status metav1.ConditionStatus) *LocalQueueWrapper { + apimeta.SetStatusCondition(&q.Status.Conditions, metav1.Condition{ + Type: kueue.LocalQueueActive, + Status: status, + Reason: "Ready", + Message: "Can submit new workloads to localQueue", + }) + return q +} + +// FairSharingStatus updates the fairSharing field in status. +func (q *LocalQueueWrapper) FairSharingStatus(status *kueue.FairSharingStatus) *LocalQueueWrapper { + q.Status.FairSharing = status + return q +} + // Generation sets the generation of the LocalQueue. func (q *LocalQueueWrapper) Generation(num int64) *LocalQueueWrapper { q.ObjectMeta.Generation = num @@ -771,6 +799,24 @@ func (c *ClusterQueueWrapper) AdmissionCheckStrategy(acs ...kueue.AdmissionCheck return c } +func (c *ClusterQueueWrapper) AdmissionMode(am kueue.AdmissionMode) *ClusterQueueWrapper { + if c.Spec.AdmissionScope == nil { + c.Spec.AdmissionScope = &kueue.AdmissionScope{} + } + c.Spec.AdmissionScope.AdmissionMode = am + return c +} + +func (c *ClusterQueueWrapper) Active(status metav1.ConditionStatus) *ClusterQueueWrapper { + apimeta.SetStatusCondition(&c.Status.Conditions, metav1.Condition{ + Type: kueue.ClusterQueueActive, + Status: status, + Reason: "By test", + Message: "by test", + }) + return c +} + // GeneratedName sets the prefix for the server to generate unique name. // No name should be given in the MakeClusterQueue for the GeneratedName to work. func (c *ClusterQueueWrapper) GeneratedName(name string) *ClusterQueueWrapper { diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 4a87a52123a..238146356f0 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -282,6 +282,27 @@ func dropExcludedResources(input corev1.ResourceList, excludedPrefixes []string) return res } +func (i *Info) LqUsage(ctx context.Context, c client.Client, resWeights map[corev1.ResourceName]float64) (float64, error) { + var lq kueue.LocalQueue + lqKey := client.ObjectKey{Namespace: i.Obj.Namespace, Name: string(i.Obj.Spec.QueueName)} + if err := c.Get(ctx, lqKey, &lq); err != nil { + return 0, err + } + usage := 0.0 + for resName, resVal := range lq.Status.FairSharing.AdmissionFairSharingStatus.ConsumedResources { + weight, found := resWeights[resName] + if !found { + weight = 1 + } + usage += weight * resVal.AsApproximateFloat64() + } + // divide usage by the LocalQueue's fair sharing weight; if no weight was defined, the default weight of 1 applies and usage is left unchanged + if lq.Spec.FairSharing != nil && lq.Spec.FairSharing.Weight != nil { + usage /= lq.Spec.FairSharing.Weight.AsApproximateFloat64() + } + return usage, nil +} + // IsUsingTAS returns information if the workload is using TAS func (i *Info) IsUsingTAS() bool { return slices.ContainsFunc(i.TotalRequests, diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md index 009c9242e82..9cdedb9bba2 100644 --- a/site/content/en/docs/reference/kueue-config.v1beta1.md +++ b/site/content/en/docs/reference/kueue-config.v1beta1.md @@ -15,6 +15,47 @@ description: Generated API reference documentation for Kueue Configuration.
+## `AdmissionFairSharing` {#AdmissionFairSharing} + + +**Appears in:** + + + + +
| Field | Description |
|---|---|
usageHalfLifeTime [Required]+ k8s.io/apimachinery/pkg/apis/meta/v1.Duration
+ |
+
+ usageHalfLifeTime indicates the time after which the current usage will decay by a half +If set to 0, usage will be reset to 0 immediately. + |
+
usageSamplingInterval [Required]+ k8s.io/apimachinery/pkg/apis/meta/v1.Duration
+ |
+
+ usageSamplingInterval indicates how often Kueue updates consumedResources in FairSharingStatus +Defaults to 5min. + |
+
resourceWeights [Required]+ map[ResourceName]float64
+ |
+
+ resourceWeights assigns weights to resources which then are used to calculate LocalQueue's +resource usage and order Workloads. +Defaults to 1. + |
+
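The `usageHalfLifeTime` and `usageSamplingInterval` fields above interact through an exponential decay of the recorded usage. As a rough, non-authoritative illustration, the sketch below reproduces the values expected by the decaying-usage test cases earlier in this diff; the `decay` helper is hypothetical and not Kueue code, and it assumes each term is truncated to milli-units the way `MulByFloat` does.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// decay sketches the arithmetic the test fixtures appear to assume: the
// previously recorded usage is scaled by 2^(-interval/halfLife) and the usage
// of currently running workloads is blended in with the complementary factor.
// Both terms are truncated to integer milli-units, mirroring
// resource.NewMilliQuantity in MulByFloat.
func decay(prevMilli, runningMilli int64, interval, halfLife time.Duration) int64 {
	if halfLife == 0 {
		return 0 // usageHalfLifeTime: 0 resets usage immediately
	}
	alpha := math.Pow(0.5, interval.Seconds()/halfLife.Seconds())
	return int64(float64(prevMilli)*alpha) + int64(float64(runningMilli)*(1-alpha))
}

func main() {
	// Previous CPU usage 8, one running workload requesting 4 CPU,
	// half-life 10m, sampling interval 5m -> 6827m, as in the test fixture.
	fmt.Println(decay(8000, 4000, 5*time.Minute, 10*time.Minute))
	// Half-life 24h with no running workloads -> 7980m.
	fmt.Println(decay(8000, 0, 5*time.Minute, 24*time.Hour))
}
```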
FairSharing controls the Fair Sharing semantics across the cluster.
+admissionFairSharing [Required]AdmissionFairSharing
+admissionFairSharing indicates configuration of FairSharing with the AdmissionTime mode on
resources [Required]Resources
| Field | Description |
|---|---|
consumedResources [Required]+ k8s.io/api/core/v1.ResourceList
+ |
+
+ ConsumedResources represents the aggregated usage of resources over time, +with decaying function applied. +The value is populated if usage consumption functionality is enabled in Kueue config. + |
+
lastUpdate [Required]+ k8s.io/apimachinery/pkg/apis/meta/v1.Time
+ |
+
+ LastUpdate is the time when share and consumed resources were updated. + |
+
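To make the role of `consumedResources` concrete, here is a hedged sketch of the ranking arithmetic that the new `Info.LqUsage` helper and the `TestFsAdmission` cases in this diff appear to use; the `weightedLqUsage` function below is illustrative only and not part of Kueue.

```go
package main

import "fmt"

// weightedLqUsage multiplies each consumed resource by its weight from the
// AdmissionFairSharing config (defaulting to 1 when a resource has no weight),
// sums the results, and divides by the LocalQueue's fair-sharing weight when
// one is set. Lower usage means the queue's workloads are considered first.
func weightedLqUsage(consumed, resWeights map[string]float64, lqWeight float64) float64 {
	usage := 0.0
	for res, val := range consumed {
		w, ok := resWeights[res]
		if !ok {
			w = 1
		}
		usage += w * val
	}
	if lqWeight > 0 {
		usage /= lqWeight
	}
	return usage
}

func main() {
	// Mirrors the "fair sharing weights" case in TestFsAdmission:
	// lqA consumed cpu=10 with weight 1, lqB consumed cpu=6 with weight 2.
	lqA := weightedLqUsage(map[string]float64{"cpu": 10}, nil, 1)
	lqB := weightedLqUsage(map[string]float64{"cpu": 6}, nil, 2)
	// lqB's pending workload is popped first despite its lower priority.
	fmt.Println(lqA, lqB, lqB < lqA) // 10 3 true
}
```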
| Field | Description |
|---|---|
admissionMode [Required]+ AdmissionMode
+ |
++ No description provided. | +
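The `admissionMode` field carries no generated description. Assuming the constants added in this diff (`UsageBasedAdmissionFairSharing`, `NoAdmissionFairSharing`) are the intended values, a minimal sketch of opting a ClusterQueue into usage-based ordering looks like this:

```go
package main

import (
	"fmt"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

func main() {
	// Setting admissionScope only marks the ClusterQueue; the
	// AdmissionFairSharing feature gate and the admissionFairSharing section
	// of the Kueue configuration must also be enabled for it to take effect.
	cq := kueue.ClusterQueue{
		Spec: kueue.ClusterQueueSpec{
			AdmissionScope: &kueue.AdmissionScope{
				AdmissionMode: kueue.UsageBasedAdmissionFairSharing,
			},
		},
	}
	fmt.Println(cq.Spec.AdmissionScope.AdmissionMode)
}
```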
admissionScopeAdmissionScope
+admissionScope indicates whether ClusterQueue uses the Admission Fair Sharing
+FairSharing contains the properties of the ClusterQueue or Cohort, when participating in FairSharing.
@@ -1088,6 +1165,8 @@ disadvantage against other ClusterQueues and Cohorts. - [ClusterQueueStatus](#kueue-x-k8s-io-v1beta1-ClusterQueueStatus) +- [LocalQueueStatus](#kueue-x-k8s-io-v1beta1-LocalQueueStatus) +FairSharingStatus contains the information about the current status of Fair Sharing.
@@ -1110,6 +1189,13 @@ weight of zero and is borrowing, this will return 9223372036854775807, the maximum possible share value. +admissionFairSharingStatus [Required]AdmissionFairSharingStatus
+admissionFairSharingStatus represents information relevant to the Admission Fair Sharing
+fairSharingFairSharing
+fairSharing defines the properties of the LocalQueue when +participating in AdmissionFairSharing. The values are only relevant +if AdmissionFairSharing is enabled in the Kueue configuration.
+flavors lists all currently available ResourceFlavors in specified ClusterQueue.
+fairSharingFairSharingStatus
+FairSharing contains the information about the current status of fair sharing.
+
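Putting the pieces together, here is a hedged wiring sketch built only from options that appear in this diff (`queue.WithAdmissionFairSharing` and the queue manager constructor); the helper name and package are hypothetical, and the client and cache are assumed to be constructed elsewhere (for example via `cache.New`, as the tests do).

```go
package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	config "sigs.k8s.io/kueue/apis/config/v1beta1"
	"sigs.k8s.io/kueue/pkg/cache"
	"sigs.k8s.io/kueue/pkg/queue"
)

// newQueueManagerWithAFS threads the AdmissionFairSharing configuration into
// the queue manager. WithAdmissionFairSharing is a no-op unless the
// AdmissionFairSharing feature gate is enabled, and only ClusterQueues whose
// admissionScope requests UsageBasedAdmissionFairSharing order workloads by
// LocalQueue usage.
func newQueueManagerWithAFS(cl client.Client, cqCache *cache.Cache) *queue.Manager {
	afs := &config.AdmissionFairSharing{
		UsageHalfLifeTime:     metav1.Duration{Duration: 10 * time.Minute},
		UsageSamplingInterval: metav1.Duration{Duration: 5 * time.Minute}, // same value SetDefaults_Configuration applies
	}
	return queue.NewManager(cl, cqCache, queue.WithAdmissionFairSharing(afs))
}
```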