
Commit ae81233

rphillips authored and jacobsee committed
UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627. However, the CPU manager reconcile loop now seems to flood the kubelet log with "reconcileState: skipping pod; pod is managed" warnings. Lower the verbosity of these log messages.

UPSTREAM: <carry>: set correctly static pods CPUs when workload partitioning is disabled

UPSTREAM: <carry>: Remove reserved CPUs from default set

Remove reserved CPUs from the default set when workload partitioning is enabled.

Co-Authored-By: Brent Rowsell <[email protected]>
Signed-off-by: Artyom Lukianov <[email protected]>
Signed-off-by: Don Penney <[email protected]>
OpenShift-Rebase-Source: b762ced
OpenShift-Rebase-Source: 63cf793
OpenShift-Rebase-Source: 32af64c

UPSTREAM: <carry>: add management support to kubelet

UPSTREAM: <carry>: OCPBUGS-29520: fix cpu manager default cpuset check in workload partitioned env
(this can be squashed to 04070bb UPSTREAM: <carry>: add management support to kubelet)

Workload partitioning makes the separation between reserved and workload cpus more strict. It is therefore expected that the reserved cpus are NOT part of the default cpuset, and the existing check was overzealous.

The first execution of kubelet after reboot never gets here, as the cpuset is computed on line 209. However, a kubelet restart without a reboot skips this code, recovers from the state file, and runs the check on line 220. This was uncovered by decoupling the cpu manager state file cleanup from kubelet restart, doing it only once at reboot as part of OCPBUGS-24366.

UPSTREAM: <carry>: add management workload check for guaranteed qos

When static pods have workload partitioning enabled, we should not alter their resources if they are Guaranteed QoS; this change adds a check for Guaranteed QoS.

Signed-off-by: ehila <[email protected]>

test: add unit tests for error states

Signed-off-by: ehila <[email protected]>

UPSTREAM: <carry>: OCPBUGS-52169: Workload partitioning of static init containers

The pod modification routine that prepares containers for Workload Partitioning quits early when it encounters a container with no resources specified. This causes a logical leak of resource capacity on the node, because the Pod is left untouched and still reports its full resource requests to the scheduler. It is, however, not using them, because the logic that moves the container to the management partition works just fine. The end result is lowered node capacity for scheduling.

Signed-off-by: Martin Sivak <[email protected]>
1 parent 038a2e0 commit ae81233

File tree

10 files changed
+1239 -17 lines changed

pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 6 additions & 0 deletions

@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	"k8s.io/utils/cpuset"
 )
@@ -411,13 +412,18 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 	failure = []reconciledContainer{}

 	m.removeStaleState()
+	workloadEnabled := managed.IsEnabled()
 	for _, pod := range m.activePods() {
 		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
 		if !ok {
 			klog.V(5).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
 			failure = append(failure, reconciledContainer{pod.Name, "", ""})
 			continue
 		}
+		if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled {
+			klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed (pod: %s)", pod.Name)
+			continue
+		}

 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)

pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 18 additions & 4 deletions

@@ -31,6 +31,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/utils/cpuset"
 )
@@ -216,6 +217,10 @@ func (p *staticPolicy) validateState(s state.State) error {
 		// state is empty initialize
 		s.SetDefaultCPUSet(allCPUs)
 		klog.InfoS("Static policy initialized", "defaultCPUSet", allCPUs)
+		if managed.IsEnabled() {
+			defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs)
+			s.SetDefaultCPUSet(defaultCpus)
+		}
 		return nil
 	}

@@ -229,7 +234,9 @@ func (p *staticPolicy) validateState(s state.State) error {
 				p.reservedCPUs.Intersection(tmpDefaultCPUset).String(), tmpDefaultCPUset.String())
 		}
 	} else {
-		if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
+		// 2. This only applies when managed mode is disabled. Active workload partitioning feature
+		// removes the reserved cpus from the default cpu mask on purpose.
+		if !managed.IsEnabled() && !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
 			return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
 				p.reservedCPUs.String(), tmpDefaultCPUset.String())
 		}
@@ -261,10 +268,17 @@ func (p *staticPolicy) validateState(s state.State) error {
 		}
 	}
 	totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
-	if !totalKnownCPUs.Equals(allCPUs) {
-		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
-			allCPUs.String(), totalKnownCPUs.String())
+	availableCPUs := p.topology.CPUDetails.CPUs()

+	// CPU (workload) partitioning removes reserved cpus
+	// from the default mask intentionally
+	if managed.IsEnabled() {
+		availableCPUs = availableCPUs.Difference(p.reservedCPUs)
+	}
+
+	if !totalKnownCPUs.Equals(availableCPUs) {
+		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
+			availableCPUs.String(), totalKnownCPUs.String())
 	}

 	return nil
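
To make the new validateState behaviour concrete, here is a minimal standalone sketch using the k8s.io/utils/cpuset package the policy already imports. The 12-CPU topology and the reserved cores 0 and 6 are assumptions borrowed from the test cases in the next file, not values taken from this hunk.

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	// All CPUs of an assumed dual-socket HT topology with 12 logical CPUs,
	// and a reserved set holding cores 0 and 6.
	allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
	reserved := cpuset.New(0, 6)

	// With workload partitioning enabled, the reserved CPUs are removed from
	// the default set on purpose, so the state must be validated against
	// allCPUs minus the reserved set rather than against allCPUs itself.
	defaultSet := allCPUs.Difference(reserved)
	fmt.Println(defaultSet.String()) // 1-5,7-11
}

This is the same set the tests below expect as expCSet when managementPartition is true.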

pkg/kubelet/cm/cpumanager/policy_static_test.go

Lines changed: 38 additions & 13 deletions

@@ -21,6 +21,8 @@ import (
 	"reflect"
 	"testing"

+	"k8s.io/kubernetes/pkg/kubelet/managed"
+
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -982,19 +984,20 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 // above test cases are without kubelet --reserved-cpus cmd option
 // the following tests are with --reserved-cpus configured
 type staticPolicyTestWithResvList struct {
-	description      string
-	topo             *topology.CPUTopology
-	numReservedCPUs  int
-	reserved         cpuset.CPUSet
-	cpuPolicyOptions map[string]string
-	stAssignments    state.ContainerCPUAssignments
-	stDefaultCPUSet  cpuset.CPUSet
-	pod              *v1.Pod
-	expErr           error
-	expNewErr        error
-	expCPUAlloc      bool
-	expCSet          cpuset.CPUSet
-	expUncoreCache   cpuset.CPUSet // represents the expected UncoreCacheIDs
+	description         string
+	topo                *topology.CPUTopology
+	numReservedCPUs     int
+	reserved            cpuset.CPUSet
+	cpuPolicyOptions    map[string]string
+	stAssignments       state.ContainerCPUAssignments
+	stDefaultCPUSet     cpuset.CPUSet
+	pod                 *v1.Pod
+	expErr              error
+	expNewErr           error
+	expCPUAlloc         bool
+	expCSet             cpuset.CPUSet
+	expUncoreCache      cpuset.CPUSet // represents the expected UncoreCacheIDs
+	managementPartition bool
 }

 func TestStaticPolicyStartWithResvList(t *testing.T) {
@@ -1046,9 +1049,31 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
 			expNewErr:       fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
 		},
+		{
+			description:         "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled",
+			topo:                topoDualSocketHT,
+			numReservedCPUs:     2,
+			stAssignments:       state.ContainerCPUAssignments{},
+			managementPartition: true,
+			expCSet:             cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
+		{
+			description:         "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled during recovery",
+			topo:                topoDualSocketHT,
+			numReservedCPUs:     2,
+			stAssignments:       state.ContainerCPUAssignments{},
+			stDefaultCPUSet:     cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+			managementPartition: true,
+			expCSet:             cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
+			wasManaged := managed.IsEnabled()
+			managed.TestOnlySetEnabled(testCase.managementPartition)
+			defer func() {
+				managed.TestOnlySetEnabled(wasManaged)
+			}()
 			p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions)
 			if !reflect.DeepEqual(err, testCase.expNewErr) {
 				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",

pkg/kubelet/cm/qos_container_manager_linux.go

Lines changed: 4 additions & 0 deletions

@@ -35,6 +35,7 @@ import (
 	"k8s.io/component-helpers/resource"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 )

 const (
@@ -174,6 +175,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
 	reuseReqs := make(v1.ResourceList, 4)
 	for i := range pods {
 		pod := pods[i]
+		if enabled, _, _ := managed.IsPodManaged(pod); enabled {
+			continue
+		}
 		qosClass := v1qos.GetPodQOS(pod)
 		if qosClass != v1.PodQOSBurstable {
 			// we only care about the burstable qos tier

pkg/kubelet/config/file.go

Lines changed: 11 additions & 0 deletions

@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/cache"
 	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	utilio "k8s.io/utils/io"
 )
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
 	if podErr != nil {
 		return pod, podErr
 	}
+	if managed.IsEnabled() {
+		if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
+			klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		} else if newPod != nil {
+			klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
+			pod = newPod
+		} else {
+			klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		}
+	}
 	return pod, nil
 }

pkg/kubelet/kubelet.go

Lines changed: 5 additions & 0 deletions

@@ -98,6 +98,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/logs"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
 	"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -701,6 +702,10 @@ func NewMainKubelet(ctx context.Context,

 	klet.runtimeService = kubeDeps.RemoteRuntimeService

+	if managed.IsEnabled() {
+		klog.InfoS("Pinned Workload Management Enabled")
+	}
+
 	if kubeDeps.KubeClient != nil {
 		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
 	}

pkg/kubelet/kubelet_node_status.go

Lines changed: 28 additions & 0 deletions

@@ -41,7 +41,9 @@ import (
 	kubeletapis "k8s.io/kubelet/pkg/apis"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 	"k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
 	taintutil "k8s.io/kubernetes/pkg/util/taints"
 	volutil "k8s.io/kubernetes/pkg/volume/util"
@@ -130,6 +132,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 	requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
 	requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
 	requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
+	if managed.IsEnabled() {
+		requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
+	}
 	if requiresUpdate {
 		if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
 			klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -140,6 +145,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 	return true
 }

+// addManagementNodeCapacity adds the managednode capacity to the node
+func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
+	updateDefaultResources(initialNode, existingNode)
+	machineInfo, err := kl.cadvisor.MachineInfo()
+	if err != nil {
+		klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
+		return false
+	}
+	cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
+	cpuRequestInMilli := cpuRequest.MilliValue()
+	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
+	managedResourceName := managed.GenerateResourceName("management")
+	if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
+		return false
+	}
+	existingNode.Status.Capacity[managedResourceName] = *newCPURequest
+	return true
+}
+
 // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
 func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
 	requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -370,6 +394,10 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
 		node.Spec.ProviderID = kl.providerID
 	}

+	if managed.IsEnabled() {
+		kl.addManagementNodeCapacity(node, node)
+	}
+
 	kl.setNodeStatus(ctx, node)

 	return node, nil
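
As a rough illustration of the arithmetic in addManagementNodeCapacity above: the advertised capacity is the machine's CPU capacity in millicores multiplied by 1000. The 8-CPU node below is an assumption for illustration only, and the resource name itself comes from managed.GenerateResourceName("management"), whose output format is not shown in this commit.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Pretend cadvisor reported 8 CPUs of capacity for this node.
	cpuRequest := resource.MustParse("8")
	cpuRequestInMilli := cpuRequest.MilliValue() // 8000

	// Same computation as addManagementNodeCapacity: millicores * 1000.
	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
	fmt.Println(newCPURequest.String()) // 8k
}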

pkg/kubelet/managed/cpu_shares.go

Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+package managed
+
+const (
+	// These limits are defined in the kernel:
+	// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
+	MinShares = 2
+	MaxShares = 262144
+
+	SharesPerCPU  = 1024
+	MilliCPUToCPU = 1000
+)
+
+// MilliCPUToShares converts the milliCPU to CFS shares.
+func MilliCPUToShares(milliCPU int64) uint64 {
+	if milliCPU == 0 {
+		// Docker converts zero milliCPU to unset, which maps to kernel default
+		// for unset: 1024. Return 2 here to really match kernel default for
+		// zero milliCPU.
+		return MinShares
+	}
+	// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
+	shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
+	if shares < MinShares {
+		return MinShares
+	}
+	if shares > MaxShares {
+		return MaxShares
+	}
+	return uint64(shares)
+}
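
A quick way to sanity-check the conversion is a standalone copy of the same formula; the real kubelet code calls managed.MilliCPUToShares, so this re-implementation is only for illustration.

package main

import "fmt"

const (
	minShares     = 2
	maxShares     = 262144
	sharesPerCPU  = 1024
	milliCPUToCPU = 1000
)

// milliCPUToShares mirrors managed.MilliCPUToShares from the hunk above.
func milliCPUToShares(milliCPU int64) uint64 {
	if milliCPU == 0 {
		return minShares
	}
	shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
	if shares < minShares {
		return minShares
	}
	if shares > maxShares {
		return maxShares
	}
	return uint64(shares)
}

func main() {
	fmt.Println(milliCPUToShares(0))       // 2, the kernel minimum
	fmt.Println(milliCPUToShares(1))       // 2, rounded up to the minimum
	fmt.Println(milliCPUToShares(500))     // 512
	fmt.Println(milliCPUToShares(4000))    // 4096
	fmt.Println(milliCPUToShares(1 << 40)) // 262144, clamped to the kernel maximum
}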
