Skip to content

Commit

Permalink
[Windows] Reconcile host-network Pods after agent is restarted
Browse files Browse the repository at this point in the history
This change supports reconciling host-network Pods on Windows, because
Kubernetes expects the CNI to manage such Pods as long as they are not using
host-process containers.

Antrea has received the CmdAdd request for such Pods when they are created, so
they should be included in the Pod reconcile list after agent is restarted.

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Jan 22, 2025
1 parent f8d2ead commit c3d9724
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 15 deletions.
4 changes: 0 additions & 4 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,6 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
var podWg sync.WaitGroup

for _, pod := range pods {
// Skip Pods for which we are not in charge of the networking.
if pod.Spec.HostNetwork {
continue
}
desiredPods.Insert(k8s.NamespacedName(pod.Namespace, pod.Name))
for _, podIP := range pod.Status.PodIPs {
desiredPodIPs.Insert(podIP.IP)
Expand Down
14 changes: 7 additions & 7 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/containernetworking/cni/pkg/version"
"github.com/containernetworking/plugins/pkg/ip"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -756,14 +755,15 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
// reconcile performs startup reconciliation for the CNI server. The CNI server is in charge of
// installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the
// K8s apiserver and replay the necessary flows.
// Pods are processed in reconcile as described in the following table:
// | Pod Type | Spec.HostNetwork | windowsOptions.hostProcess | OVS interface needed? | List Pods in reconcile |
// | Normal Pod (non-HostNetwork) | false | false or N/A | Yes | Yes |
// | Linux HostNetwork Pod | true | N/A | No | No |
// | Windows HostNetwork Pod | true | false | Yes | Yes |
// | Windows HostProcess Pod | true | true | No | Yes |
func (s *CNIServer) reconcile() error {
klog.InfoS("Starting reconciliation for CNI server")
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=" + s.nodeConfig.Name,
ResourceVersion: "0",
})
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), s.getPodsListOptions())
if err != nil {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/agent/cniserver/server_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

package cniserver

import current "github.com/containernetworking/cni/pkg/types/100"
import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// updateResultDNSConfig updates the DNS config from CNIConfig.
func updateResultDNSConfig(result *current.Result, cniConfig *CNIConfig) {
Expand Down Expand Up @@ -48,3 +53,13 @@ func validateRuntime(netNS string) error {
func (c *CNIConfig) getInfraContainer() string {
return c.ContainerId
}

// getPodsListOptions returns the ListOptions used to list the non-host-network Pods scheduled on the
// current Node. Host-network Pods are filtered out on Linux because their networking is not managed
// by the CNI, so they are excluded from the reconcile list.
func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
	return metav1.ListOptions{
		FieldSelector: fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", s.nodeConfig.Name),
		// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
		// the watch cache in kube-apiserver.
		ResourceVersion: "0",
	}
}
13 changes: 13 additions & 0 deletions pkg/agent/cniserver/server_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -98,3 +99,15 @@ func getInfraContainer(containerID, netNS string) string {
func (c *CNIConfig) getInfraContainer() string {
return getInfraContainer(c.ContainerId, c.Netns)
}

// getPodsListOptions returns the ListOptions used to list all Pods scheduled on the current Node.
// Unlike on Linux, host-network Pods are NOT filtered out on Windows, because they are also managed
// by antrea as long as "spec.securityContext.windowsOptions.hostProcess" is not configured.
func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
	return metav1.ListOptions{
		FieldSelector: fmt.Sprintf("spec.nodeName=%s", s.nodeConfig.Name),
		// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
		// the watch cache in kube-apiserver.
		ResourceVersion: "0",
	}
}
31 changes: 28 additions & 3 deletions pkg/agent/cniserver/server_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,30 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
utilip "antrea.io/antrea/pkg/util/ip"
)

var (
	containerMACStr = "23:34:56:23:22:45"
	dnsSearches     = []string{"a.b.c.d"}

	mockWinnet *winnettest.MockInterface

	// interfaceForHostNetworkPod is the stored interface configuration for a Windows host-network
	// Pod (pod2). TestReconcile adds it to the interface store and expects its flows to be
	// re-installed after agent restart, like any regular Pod interface.
	interfaceForHostNetworkPod = &interfacestore.InterfaceConfig{
		InterfaceName: "iface2",
		Type:          interfacestore.ContainerInterface,
		IPs:           []net.IP{net.ParseIP("1.1.1.2")},
		MAC:           utilip.MustParseMAC("00:11:22:33:44:02"),
		OVSPortConfig: &interfacestore.OVSPortConfig{
			PortUUID: generateUUID(),
			OFPort:   int32(4),
		},
		ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{
			PodName:      pod2.Name,
			PodNamespace: testPodNamespace,
			ContainerID:  generateUUID(),
		},
	}
)

func TestUpdateResultDNSConfig(t *testing.T) {
Expand Down Expand Up @@ -732,7 +749,7 @@ func TestReconcile(t *testing.T) {
cniServer := newCNIServer(t)
cniServer.routeClient = mockRoute
cniServer.kubeClient = kubeClient
for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} {
for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface, interfaceForHostNetworkPod} {
ifaceStore.AddInterface(containerIface)
}
waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh)
Expand All @@ -741,11 +758,19 @@ func TestReconcile(t *testing.T) {
go cniServer.podConfigurator.Run(stopCh)

// Re-install Pod1 flows
podFlowsInstalled := make(chan string, 2)
expReinstalledPodCount := 3
podFlowsInstalled := make(chan string, expReinstalledPodCount)
mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil).
Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
podFlowsInstalled <- interfaceName
}).Times(1)

// Re-install host-network Pod (Pod2) flows
mockOFClient.EXPECT().InstallPodFlows(interfaceForHostNetworkPod.InterfaceName, interfaceForHostNetworkPod.IPs, interfaceForHostNetworkPod.MAC, uint32(interfaceForHostNetworkPod.OFPort), uint16(0), nil).
Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
podFlowsInstalled <- interfaceName
}).Times(1)

// Uninstall Pod3 flows which is deleted.
mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1)
mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1)
Expand Down Expand Up @@ -778,7 +803,7 @@ func TestReconcile(t *testing.T) {
assert.NoError(t, err)
_, exists := ifaceStore.GetInterfaceByName("iface3")
assert.False(t, exists)
for i := 0; i < 2; i++ {
for i := 0; i < expReinstalledPodCount; i++ {
select {
case <-podFlowsInstalled:
case <-time.After(500 * time.Millisecond):
Expand Down
56 changes: 56 additions & 0 deletions test/e2e/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func TestConnectivity(t *testing.T) {
skipIfNumNodesLessThan(t, 2)
testPingLargeMTU(t, data)
})
t.Run("testWindowsPodConnectivityAfterAntreaRestart", func(t *testing.T) {
skipIfNoWindowsNodes(t)
testWindowsPodConnectivityAfterAntreaRestart(t, data)
})
}

func waitForPodIPs(t *testing.T, data *TestData, podInfos []PodInfo) map[string]*PodIPs {
Expand Down Expand Up @@ -121,6 +125,58 @@ func (data *TestData) runPingMesh(t *testing.T, podInfos []PodInfo, ctrname stri
}
}

// testWindowsPodConnectivityAfterAntreaRestart checks the Pods' connectivity after antrea-agent is
// restarted. This case runs only on Windows. The Pods in the test include both a host-network Pod
// and a non-host-network Pod, because CNI on Windows is also responsible for a host-network Pod's
// networking as long as it is not using host-process containers.
func testWindowsPodConnectivityAfterAntreaRestart(t *testing.T, data *TestData) {
	podInfos, cleanupFn := data.runPodsOnWindows(t)
	defer cleanupFn()

	// Establish a connectivity baseline before restarting the agent.
	data.runPingMesh(t, podInfos, toolboxContainerName, true)
	err := data.RestartAntreaAgentPods(defaultTimeout)
	assert.NoError(t, err)

	// The same ping mesh must still succeed after the agent has reconciled the Pods.
	data.runPingMesh(t, podInfos, toolboxContainerName, true)
}

// runPodsOnWindows creates the test Pods on the cluster's Windows Nodes and returns their PodInfos
// together with a cleanup function that deletes them. The first Pod (idx 0) uses the host network,
// the others do not, so callers can exercise both Pod types.
func (data *TestData) runPodsOnWindows(t *testing.T) ([]PodInfo, func()) {
	numPods := 2
	podInfos := make([]PodInfo, numPods)
	for idx := 0; idx < numPods; idx++ {
		podName := randName(fmt.Sprintf("test-pod-%d-", idx))
		// Spread Pods across the available Windows Nodes, falling back to the first Node
		// when there are more Pods than Nodes.
		nodeIdx := idx
		if nodeIdx >= len(clusterInfo.windowsNodes) {
			nodeIdx = 0
		}
		workerNode := workerNodeName(clusterInfo.windowsNodes[nodeIdx])

		podInfo := PodInfo{
			Name:      podName,
			OS:        clusterInfo.nodesOS[workerNode],
			NodeName:  workerNode,
			Namespace: data.testNamespace,
		}

		// Only the first Pod runs with the host network enabled.
		hostNetwork := idx == 0
		// Log per Pod: the original message claimed all numPods Pods were being created on
		// this Node on every iteration, which was misleading.
		t.Logf("Creating toolbox Pod '%s' on '%s'", podName, workerNode)
		if err := data.createToolboxPodOnNode(podInfo.Name, podInfo.Namespace, podInfo.NodeName, hostNetwork); err != nil {
			t.Fatalf("Error when creating toolbox test Pod '%s': %v", podName, err)
		}

		podInfos[idx] = podInfo
	}

	return podInfos, func() {
		for idx := range podInfos {
			deletePodWrapper(t, data, podInfos[idx].Namespace, podInfos[idx].Name)
		}
	}
}

func (data *TestData) testPodConnectivitySameNode(t *testing.T) {
numPods := 2 // can be increased
podInfos := make([]PodInfo, numPods)
Expand Down

0 comments on commit c3d9724

Please sign in to comment.