Skip to content

Commit

Permalink
[Windows] Reconcile host-network Pods after agent is restarted
Browse files Browse the repository at this point in the history
This change adds support for reconciling host-network Pods on Windows, because
Kubernetes expects the CNI to manage such Pods as long as they do not use
host-process containers.

Antrea receives a CmdAdd request for such Pods when they are created, so
they should be included in the Pod reconcile list after the agent is restarted.

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Jan 21, 2025
1 parent 71cd6e0 commit 1193dc7
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 15 deletions.
4 changes: 0 additions & 4 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,6 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
var podWg sync.WaitGroup

for _, pod := range pods {
// Skip Pods for which we are not in charge of the networking.
if pod.Spec.HostNetwork {
continue
}
desiredPods.Insert(k8s.NamespacedName(pod.Namespace, pod.Name))
for _, podIP := range pod.Status.PodIPs {
desiredPodIPs.Insert(podIP.IP)
Expand Down
8 changes: 1 addition & 7 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/containernetworking/cni/pkg/version"
"github.com/containernetworking/plugins/pkg/ip"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -758,12 +757,7 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
// K8s apiserver and replay the necessary flows.
func (s *CNIServer) reconcile() error {
klog.InfoS("Starting reconciliation for CNI server")
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=" + s.nodeConfig.Name,
ResourceVersion: "0",
})
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), s.getPodsListOptions())
if err != nil {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/agent/cniserver/server_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

package cniserver

import current "github.com/containernetworking/cni/pkg/types/100"
import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// updateResultDNSConfig updates the DNS config from CNIConfig.
func updateResultDNSConfig(result *current.Result, cniConfig *CNIConfig) {
Expand Down Expand Up @@ -48,3 +53,13 @@ func validateRuntime(netNS string) error {
// getInfraContainer reports the ContainerId carried by the CNI config as the
// infra container identifier.
func (c *CNIConfig) getInfraContainer() string {
	infraContainerID := c.ContainerId
	return infraContainerID
}

// getPodsListOptions builds the ListOptions used to list the Pods on the current Node,
// restricting the result to Pods that are not host-network ("spec.hostNetwork=false").
// NOTE(review): this relies on kube-apiserver accepting "spec.hostNetwork" as a Pod field
// selector — confirm against the minimum supported Kubernetes version.
func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
	selector := fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", s.nodeConfig.Name)
	return metav1.ListOptions{
		FieldSelector: selector,
		// ResourceVersion="0" is set for performance reasons: it ensures the request is
		// served from the watch cache in kube-apiserver.
		ResourceVersion: "0",
	}
}
13 changes: 13 additions & 0 deletions pkg/agent/cniserver/server_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -98,3 +99,15 @@ func getInfraContainer(containerID, netNS string) string {
// getInfraContainer resolves the infra container for this CNI config by handing the
// config's ContainerId and Netns to the package-level getInfraContainer helper.
func (c *CNIConfig) getInfraContainer() string {
	containerID, netNS := c.ContainerId, c.Netns
	return getInfraContainer(containerID, netNS)
}

// getPodsListOptions builds the ListOptions used to list the Pods running on the current Node.
// Host-network Pods are deliberately not filtered out on Windows, because they are also managed
// by antrea as long as "spec.SecurityContext.windowsOptions.hostProcess" is not configured.
func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
	// ResourceVersion="0" is set for performance reasons: it ensures the request is served
	// from the watch cache in kube-apiserver.
	opts := metav1.ListOptions{ResourceVersion: "0"}
	opts.FieldSelector = fmt.Sprintf("spec.nodeName=%s", s.nodeConfig.Name)
	return opts
}
31 changes: 28 additions & 3 deletions pkg/agent/cniserver/server_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,30 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
utilip "antrea.io/antrea/pkg/util/ip"
)

var (
// Fixed MAC address string and DNS search list used as test fixtures.
containerMACStr = "23:34:56:23:22:45"
dnsSearches = []string{"a.b.c.d"}

// mockWinnet holds the winnet mock interface.
// NOTE(review): presumably assigned during per-test setup — confirm against the test helpers.
mockWinnet *winnettest.MockInterface

// interfaceForHostNetworkPod is the container interface config recorded for pod2, which
// TestReconcile treats as a host-network Pod: the test adds it to the interface store and
// expects the Pod flows for this interface to be re-installed during reconciliation.
interfaceForHostNetworkPod = &interfacestore.InterfaceConfig{
InterfaceName: "iface2",
Type: interfacestore.ContainerInterface,
IPs: []net.IP{net.ParseIP("1.1.1.2")},
MAC: utilip.MustParseMAC("00:11:22:33:44:02"),
OVSPortConfig: &interfacestore.OVSPortConfig{
PortUUID: generateUUID(),
OFPort: int32(4),
},
ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{
PodName: pod2.Name,
PodNamespace: testPodNamespace,
ContainerID: generateUUID(),
},
}
)

func TestUpdateResultDNSConfig(t *testing.T) {
Expand Down Expand Up @@ -732,7 +749,7 @@ func TestReconcile(t *testing.T) {
cniServer := newCNIServer(t)
cniServer.routeClient = mockRoute
cniServer.kubeClient = kubeClient
for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} {
for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface, interfaceForHostNetworkPod} {
ifaceStore.AddInterface(containerIface)
}
waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh)
Expand All @@ -741,11 +758,19 @@ func TestReconcile(t *testing.T) {
go cniServer.podConfigurator.Run(stopCh)

// Re-install Pod1 flows
podFlowsInstalled := make(chan string, 2)
expReInstalledPodCount := 3
podFlowsInstalled := make(chan string, expReInstalledPodCount)
mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil).
Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
podFlowsInstalled <- interfaceName
}).Times(1)

// Re-install host-network Pod (Pod2) flows
mockOFClient.EXPECT().InstallPodFlows(interfaceForHostNetworkPod.InterfaceName, interfaceForHostNetworkPod.IPs, interfaceForHostNetworkPod.MAC, uint32(interfaceForHostNetworkPod.OFPort), uint16(0), nil).
Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
podFlowsInstalled <- interfaceName
}).Times(1)

// Uninstall Pod3 flows which is deleted.
mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1)
mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1)
Expand Down Expand Up @@ -778,7 +803,7 @@ func TestReconcile(t *testing.T) {
assert.NoError(t, err)
_, exists := ifaceStore.GetInterfaceByName("iface3")
assert.False(t, exists)
for i := 0; i < 2; i++ {
for i := 0; i < expReInstalledPodCount; i++ {
select {
case <-podFlowsInstalled:
case <-time.After(500 * time.Millisecond):
Expand Down

0 comments on commit 1193dc7

Please sign in to comment.