Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pkg/kubelet/apis/podresources/server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/types"
Expand All @@ -36,17 +37,21 @@ type v1PodResourcesServer struct {
cpusProvider CPUsProvider
memoryProvider MemoryProvider
dynamicResourcesProvider DynamicResourcesProvider
useActivePods bool
}

// NewV1PodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider
func NewV1PodResourcesServer(providers PodResourcesProviders) podresourcesv1.PodResourcesListerServer {
useActivePods := true
klog.InfoS("podresources", "method", "list", "useActivePods", useActivePods)
return &v1PodResourcesServer{
podsProvider: providers.Pods,
devicesProvider: providers.Devices,
cpusProvider: providers.Cpus,
memoryProvider: providers.Memory,
dynamicResourcesProvider: providers.DynamicResources,
useActivePods: useActivePods,
}
}

Expand All @@ -55,7 +60,13 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *podresourcesv1.Lis
metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc()
metrics.PodResourcesEndpointRequestsListCount.WithLabelValues("v1").Inc()

pods := p.podsProvider.GetPods()
var pods []*v1.Pod
if p.useActivePods {
pods = p.podsProvider.GetActivePods()
} else {
pods = p.podsProvider.GetPods()
}

podResources := make([]*podresourcesv1.PodResources, len(pods))
p.devicesProvider.UpdateAllocatedDevices()

Expand Down
157 changes: 157 additions & 0 deletions pkg/kubelet/apis/podresources/server_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package podresources
import (
"context"
"fmt"
"sort"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/mock"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -221,6 +223,7 @@ func TestListPodResourcesV1(t *testing.T) {
mockDynamicResourcesProvider := podresourcetest.NewMockDynamicResourcesProvider(t)

mockPodsProvider.EXPECT().GetPods().Return(tc.pods).Maybe()
mockPodsProvider.EXPECT().GetActivePods().Return(tc.pods).Maybe()
mockDevicesProvider.EXPECT().GetDevices(string(podUID), containerName).Return(tc.devices).Maybe()
mockCPUsProvider.EXPECT().GetCPUs(string(podUID), containerName).Return(tc.cpus).Maybe()
mockMemoryProvider.EXPECT().GetMemory(string(podUID), containerName).Return(tc.memory).Maybe()
Expand Down Expand Up @@ -249,6 +252,159 @@ func TestListPodResourcesV1(t *testing.T) {
}
}

func makePod(idx int) *v1.Pod {
podNamespace := "pod-namespace"
podName := fmt.Sprintf("pod-name-%d", idx)
podUID := types.UID(fmt.Sprintf("pod-uid-%d", idx))
containerName := fmt.Sprintf("container-name-%d", idx)
containers := []v1.Container{
{
Name: containerName,
},
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
UID: podUID,
},
Spec: v1.PodSpec{
Containers: containers,
},
}
}

func collectNamespacedNamesFromPods(pods []*v1.Pod) []string {
ret := make([]string, 0, len(pods))
for _, pod := range pods {
ret = append(ret, pod.Namespace+"/"+pod.Name)
}
sort.Strings(ret)
return ret
}

func collectNamespacedNamesFromPodResources(prs []*podresourcesapi.PodResources) []string {
ret := make([]string, 0, len(prs))
for _, pr := range prs {
ret = append(ret, pr.Namespace+"/"+pr.Name)
}
sort.Strings(ret)
return ret
}

func TestListPodResourcesUsesOnlyActivePodsV1(t *testing.T) {
numaID := int64(1)

// we abuse the fact that we don't care about the assignments,
// so we reuse the same for all pods which is actually wrong.
devs := []*podresourcesapi.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0"},
Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaID}}},
},
}

cpus := []int64{1, 9}

mems := []*podresourcesapi.ContainerMemory{
{
MemoryType: "memory",
Size_: 1073741824,
Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaID}}},
},
{
MemoryType: "hugepages-1Gi",
Size_: 1073741824,
Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaID}}},
},
}

for _, tc := range []struct {
desc string
pods []*v1.Pod
activePods []*v1.Pod
}{
{
desc: "no pods",
pods: []*v1.Pod{},
activePods: []*v1.Pod{},
},
{
desc: "no differences",
pods: []*v1.Pod{
makePod(1),
makePod(2),
makePod(3),
makePod(4),
makePod(5),
},
activePods: []*v1.Pod{
makePod(1),
makePod(2),
makePod(3),
makePod(4),
makePod(5),
},
},
{
desc: "some terminated pods",
pods: []*v1.Pod{
makePod(1),
makePod(2),
makePod(3),
makePod(4),
makePod(5),
makePod(6),
makePod(7),
},
activePods: []*v1.Pod{
makePod(1),
makePod(3),
makePod(4),
makePod(5),
makePod(6),
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
mockDevicesProvider := podresourcetest.NewMockDevicesProvider(t)
mockPodsProvider := podresourcetest.NewMockPodsProvider(t)
mockCPUsProvider := podresourcetest.NewMockCPUsProvider(t)
mockMemoryProvider := podresourcetest.NewMockMemoryProvider(t)
mockDynamicResourcesProvider := podresourcetest.NewMockDynamicResourcesProvider(t)

mockPodsProvider.EXPECT().GetPods().Return(tc.pods).Maybe()
mockPodsProvider.EXPECT().GetActivePods().Return(tc.activePods).Maybe()
mockDevicesProvider.EXPECT().GetDevices(mock.Anything, mock.Anything).Return(devs).Maybe()
mockCPUsProvider.EXPECT().GetCPUs(mock.Anything, mock.Anything).Return(cpus).Maybe()
mockMemoryProvider.EXPECT().GetMemory(mock.Anything, mock.Anything).Return(mems).Maybe()
mockDevicesProvider.EXPECT().UpdateAllocatedDevices().Return().Maybe()
mockCPUsProvider.EXPECT().GetAllocatableCPUs().Return([]int64{}).Maybe()
mockDevicesProvider.EXPECT().GetAllocatableDevices().Return([]*podresourcesapi.ContainerDevices{}).Maybe()
mockMemoryProvider.EXPECT().GetAllocatableMemory().Return([]*podresourcesapi.ContainerMemory{}).Maybe()

providers := PodResourcesProviders{
Pods: mockPodsProvider,
Devices: mockDevicesProvider,
Cpus: mockCPUsProvider,
Memory: mockMemoryProvider,
DynamicResources: mockDynamicResourcesProvider,
}
server := NewV1PodResourcesServer(providers)
resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
expectedNames := collectNamespacedNamesFromPods(tc.activePods)
gotNames := collectNamespacedNamesFromPodResources(resp.GetPodResources())
if diff := cmp.Diff(expectedNames, gotNames, cmpopts.EquateEmpty()); diff != "" {
t.Fatal(diff)
}
})
}
}

func TestListPodResourcesWithInitContainersV1(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesDynamicResources, true)

Expand Down Expand Up @@ -530,6 +686,7 @@ func TestListPodResourcesWithInitContainersV1(t *testing.T) {
mockDynamicResourcesProvider := podresourcetest.NewMockDynamicResourcesProvider(t)

mockPodsProvider.EXPECT().GetPods().Return(tc.pods).Maybe()
mockPodsProvider.EXPECT().GetActivePods().Return(tc.pods).Maybe()
tc.mockFunc(tc.pods, mockDevicesProvider, mockCPUsProvider, mockMemoryProvider, mockDynamicResourcesProvider)

providers := PodResourcesProviders{
Expand Down
47 changes: 47 additions & 0 deletions pkg/kubelet/apis/podresources/testing/pods_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/kubelet/apis/podresources/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type DevicesProvider interface {

// PodsProvider knows how to provide the pods admitted by the node
type PodsProvider interface {
GetActivePods() []*v1.Pod
GetPods() []*v1.Pod
GetPodByName(namespace, name string) (*v1.Pod, bool)
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2962,6 +2962,22 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint, tp trace.Tr
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, tp)
}

type kubeletPodsProvider struct {
kl *Kubelet
}

func (pp *kubeletPodsProvider) GetActivePods() []*v1.Pod {
return pp.kl.GetActivePods()
}

func (pp *kubeletPodsProvider) GetPods() []*v1.Pod {
return pp.kl.podManager.GetPods()
}

func (pp *kubeletPodsProvider) GetPodByName(namespace, name string) (*v1.Pod, bool) {
return pp.kl.podManager.GetPodByName(namespace, name)
}

// ListenAndServePodResources runs the kubelet podresources grpc service
func (kl *Kubelet) ListenAndServePodResources() {
endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
Expand All @@ -2971,7 +2987,7 @@ func (kl *Kubelet) ListenAndServePodResources() {
}

providers := podresources.PodResourcesProviders{
Pods: kl.podManager,
Pods: &kubeletPodsProvider{kl: kl},
Devices: kl.containerManager,
Cpus: kl.containerManager,
Memory: kl.containerManager,
Expand Down