
[Windows] Reconcile host-network Pods after agent is restarted #6944

Merged 1 commit on Feb 3, 2025
4 changes: 0 additions & 4 deletions pkg/agent/cniserver/pod_configuration.go
@@ -445,10 +445,6 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
var podWg sync.WaitGroup

for _, pod := range pods {
// Skip Pods for which we are not in charge of the networking.
if pod.Spec.HostNetwork {
continue
}
desiredPods.Insert(k8s.NamespacedName(pod.Namespace, pod.Name))
for _, podIP := range pod.Status.PodIPs {
desiredPodIPs.Insert(podIP.IP)
14 changes: 7 additions & 7 deletions pkg/agent/cniserver/server.go
@@ -29,7 +29,6 @@ import (
"github.com/containernetworking/cni/pkg/version"
"github.com/containernetworking/plugins/pkg/ip"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@@ -756,14 +755,15 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
// reconcile performs startup reconciliation for the CNI server. The CNI server is in charge of
// installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the
// K8s apiserver and replay the necessary flows.
// The Pods are processed in reconcile as below:
// | Pod Type                     | Spec.HostNetwork | windowsOptions.hostProcess | OVS interface needed? | List Pods in reconcile |
// | Normal Pod (non-HostNetwork) | false            | false or N/A               | Yes                   | Yes                    |
// | Linux HostNetwork Pod        | true             | N/A                        | No                    | No                     |
// | Windows HostNetwork Pod      | true             | false                      | Yes                   | Yes                    |
// | Windows HostProcess Pod      | true             | true                       | No                    | Yes                    |
func (s *CNIServer) reconcile() error {
klog.InfoS("Starting reconciliation for CNI server")
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=" + s.nodeConfig.Name,
ResourceVersion: "0",
})
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), s.getPodsListOptions())
if err != nil {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}
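For illustration only, the table above can be condensed into a small, self-contained sketch. The helper name `needsOVSInterface` and the `nodeOS` parameter are assumptions made for this example and are not part of the PR:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// needsOVSInterface mirrors the table above: Linux host-network Pods and
// Windows HostProcess Pods get no OVS interface; every other Pod does.
// This is a hypothetical helper, not code from the PR.
func needsOVSInterface(pod *corev1.Pod, nodeOS string) bool {
	if !pod.Spec.HostNetwork {
		// Normal (non-HostNetwork) Pod: always needs an OVS interface.
		return true
	}
	if nodeOS != "windows" {
		// Linux HostNetwork Pod: its networking is not managed by the CNI.
		return false
	}
	// Windows HostNetwork Pod: it needs an OVS interface unless it runs
	// HostProcess containers, which bypass the CNI entirely.
	sc := pod.Spec.SecurityContext
	hostProcess := sc != nil && sc.WindowsOptions != nil &&
		sc.WindowsOptions.HostProcess != nil && *sc.WindowsOptions.HostProcess
	return !hostProcess
}

func main() {
	hostProcess := true
	pod := &corev1.Pod{}
	pod.Spec.HostNetwork = true
	pod.Spec.SecurityContext = &corev1.PodSecurityContext{
		WindowsOptions: &corev1.WindowsSecurityContextOptions{HostProcess: &hostProcess},
	}
	fmt.Println(needsOVSInterface(pod, "windows")) // false: Windows HostProcess Pod
}
```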
17 changes: 16 additions & 1 deletion pkg/agent/cniserver/server_linux.go
@@ -14,7 +14,12 @@

package cniserver

import current "github.com/containernetworking/cni/pkg/types/100"
import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// updateResultDNSConfig updates the DNS config from CNIConfig.
func updateResultDNSConfig(result *current.Result, cniConfig *CNIConfig) {
@@ -48,3 +53,13 @@ func validateRuntime(netNS string) error {
func (c *CNIConfig) getInfraContainer() string {
return c.ContainerId
}

// getPodsListOptions returns the options to list the non-host-network Pods running on the current Node.
func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
return metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", s.nodeConfig.Name),
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
ResourceVersion: "0",
}
}
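For readers less familiar with client-go, the sketch below shows how these ListOptions would be consumed. The in-cluster config and the Node name are placeholders for this example, not how the agent actually builds its clientset:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// In-cluster config is an assumption for this sketch.
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	nodeName := "worker-1" // hypothetical Node name
	pods, err := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
		// Same selector as getPodsListOptions on Linux: local, non-host-network Pods.
		FieldSelector: fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", nodeName),
		// ResourceVersion="0" lets kube-apiserver serve the list from its watch
		// cache instead of etcd, trading strict freshness for performance.
		ResourceVersion: "0",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d Pods to reconcile\n", len(pods.Items))
}
```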
13 changes: 13 additions & 0 deletions pkg/agent/cniserver/server_windows.go
@@ -22,6 +22,7 @@ import (
"strings"

current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

@@ -98,3 +99,15 @@ func getInfraContainer(containerID, netNS string) string {
func (c *CNIConfig) getInfraContainer() string {
return getInfraContainer(c.ContainerId, c.Netns)
}

Review comment from @XinShuYang (Contributor), Jan 22, 2025:

Suggested change:
| Pod Type | Spec.HostNetwork | windowsOptions.hostProcess | Kubelet CMDADD | OVS interface needed? | Reconcile needed? |
| 1. Normal Pod (non-HostNetwork) | false | false / not set | Yes | Yes | Yes |
| 2. Windows HostNetwork Pod | true | false / not set | Yes | Yes | Yes |
| 3. Windows HostProcess Pod | (any) | true | No | No | No (no real network config) |

@wenyingd Can we add a table to the code comments to explain the different behaviors of Windows Pods for developers?

Reply from the author (Contributor): updated.

Review comment from a Contributor:

What will happen if "spec.SecurityContext.windowsOptions.hostProcess" is configured and the Pod is not filtered out?

Reply from the author (Contributor):

Nothing is performed, since we did not create OVS interfaces for such Pods.

Having had an offline sync with @tnqn, we will not apply any filters in the Windows reconcile logic other than the Node name (to get the local Pods) and ResourceVersion=0 (to avoid performance issues); this is a best effort to ensure that the reconciled Pod set covers all Pods that Antrea has handled.

// getPodsListOptions returns the options to list the Pods running on the current Node. Note, the host-network
// Pods are not filtered out on Windows because they are also managed by Antrea as long as
// "spec.SecurityContext.windowsOptions.hostProcess" is not configured.
func (s *CNIServer) getPodsListOptions() metav1.ListOptions {
return metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", s.nodeConfig.Name),
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
ResourceVersion: "0",
}
}
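To make the answer above concrete (a listed HostProcess Pod is a no-op because no OVS interface was ever created for it), here is a small standalone sketch; the map stands in for Antrea's interface store and the Pod names are made up:

```go
package main

import "fmt"

// On Windows, host-network Pods stay in the reconcile list, but a Pod that
// never got an OVS interface (e.g. a HostProcess Pod) is effectively a no-op
// because there is nothing to replay for it.
func main() {
	interfaceStore := map[string]string{
		"default/normal-pod":          "iface1",
		"default/windows-hostnet-pod": "iface2",
		// "default/hostprocess-pod" is intentionally absent: no OVS port was created.
	}
	listedPods := []string{"default/normal-pod", "default/windows-hostnet-pod", "default/hostprocess-pod"}

	for _, pod := range listedPods {
		iface, ok := interfaceStore[pod]
		if !ok {
			fmt.Printf("%s: no OVS interface, nothing to replay\n", pod)
			continue
		}
		fmt.Printf("%s: replaying flows for %s\n", pod, iface)
	}
}
```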
31 changes: 28 additions & 3 deletions pkg/agent/cniserver/server_windows_test.go
@@ -46,13 +46,30 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
utilip "antrea.io/antrea/pkg/util/ip"
)

var (
containerMACStr = "23:34:56:23:22:45"
dnsSearches = []string{"a.b.c.d"}

mockWinnet *winnettest.MockInterface

interfaceForHostNetworkPod = &interfacestore.InterfaceConfig{
InterfaceName: "iface2",
Type: interfacestore.ContainerInterface,
IPs: []net.IP{net.ParseIP("1.1.1.2")},
MAC: utilip.MustParseMAC("00:11:22:33:44:02"),
OVSPortConfig: &interfacestore.OVSPortConfig{
PortUUID: generateUUID(),
OFPort: int32(4),
},
ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{
PodName: pod2.Name,
PodNamespace: testPodNamespace,
ContainerID: generateUUID(),
},
}
)

func TestUpdateResultDNSConfig(t *testing.T) {
@@ -732,7 +749,7 @@ func TestReconcile(t *testing.T) {
cniServer := newCNIServer(t)
cniServer.routeClient = mockRoute
cniServer.kubeClient = kubeClient
for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} {
for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface, interfaceForHostNetworkPod} {
ifaceStore.AddInterface(containerIface)
}
waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh)
@@ -741,11 +758,19 @@
go cniServer.podConfigurator.Run(stopCh)

// Re-install Pod1 flows
podFlowsInstalled := make(chan string, 2)
expReinstalledPodCount := 3
podFlowsInstalled := make(chan string, expReinstalledPodCount)
mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil).
Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
podFlowsInstalled <- interfaceName
}).Times(1)

// Re-install host-network Pod (Pod2) flows
mockOFClient.EXPECT().InstallPodFlows(interfaceForHostNetworkPod.InterfaceName, interfaceForHostNetworkPod.IPs, interfaceForHostNetworkPod.MAC, uint32(interfaceForHostNetworkPod.OFPort), uint16(0), nil).
Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
podFlowsInstalled <- interfaceName
}).Times(1)

// Uninstall the flows of Pod3, which has been deleted.
mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1)
mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1)
@@ -778,7 +803,7 @@ func TestReconcile(t *testing.T) {
assert.NoError(t, err)
_, exists := ifaceStore.GetInterfaceByName("iface3")
assert.False(t, exists)
for i := 0; i < 2; i++ {
for i := 0; i < expReinstalledPodCount; i++ {
select {
case <-podFlowsInstalled:
case <-time.After(500 * time.Millisecond):
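The synchronization pattern used by TestReconcile above (each mocked InstallPodFlows call reports on a channel, and the test drains exactly expReinstalledPodCount events with a timeout) reduces to the following standalone sketch; the interface names are illustrative only:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const expReinstalledPodCount = 3
	podFlowsInstalled := make(chan string, expReinstalledPodCount)

	// Simulate the mocked InstallPodFlows callbacks reporting completion.
	for _, iface := range []string{"iface1", "iface2", "iface4"} {
		go func(name string) { podFlowsInstalled <- name }(iface)
	}

	// Drain exactly the expected number of events, failing on timeout.
	for i := 0; i < expReinstalledPodCount; i++ {
		select {
		case name := <-podFlowsInstalled:
			fmt.Println("flows installed for", name)
		case <-time.After(500 * time.Millisecond):
			fmt.Println("timed out waiting for Pod flow installation")
			return
		}
	}
}
```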
120 changes: 100 additions & 20 deletions test/e2e/connectivity_test.go
@@ -70,6 +70,10 @@ func TestConnectivity(t *testing.T) {
skipIfNumNodesLessThan(t, 2)
testPingLargeMTU(t, data)
})
t.Run("testWindowsPodConnectivityAfterAntreaRestart", func(t *testing.T) {
skipIfNoWindowsNodes(t)
testWindowsPodConnectivityAfterAntreaRestart(t, data)
})
}

func waitForPodIPs(t *testing.T, data *TestData, podInfos []PodInfo) map[string]*PodIPs {
@@ -121,6 +125,100 @@ func (data *TestData) runPingMesh(t *testing.T, podInfos []PodInfo, ctrname stri
}
}

Review comment from a Contributor:

I guess one issue with the test as currently written is that if there is a failure, it's not clear which of the 2 Pods may be responsible (assuming only 1 Pod is broken after the restart). Maybe we could have 1 Linux Pod as the client, and then use it to ping each Windows Pod independently? This way we could split this test into 2 subtests, one for the regular Pod and one for the hostNetwork Pod. Would that make sense?

Reply from the author (Contributor): Makes sense, updated.

// testWindowsPodConnectivityAfterAntreaRestart checks Pod connectivity after antrea-agent is restarted on Windows.
// We test both the generic Pod case and the host-network Pod case, because the CNI on Windows is also
// responsible for a host-network Pod's networking as long as it is not using host-process containers.
func testWindowsPodConnectivityAfterAntreaRestart(t *testing.T, data *TestData) {
linuxWorkerNode := clusterInfo.controlPlaneNodeName
linuxPodName := randName("test-pod-")
clientPod := PodInfo{
Name: linuxPodName,
Namespace: data.testNamespace,
NodeName: linuxWorkerNode,
OS: "linux",
}

t.Logf("Creating Linux Pod %s on Node '%s'", linuxPodName, linuxWorkerNode)
if err := data.createToolboxPodOnNode(clientPod.Name, clientPod.Namespace, clientPod.NodeName, false); err != nil {
t.Fatalf("Error when creating Pod '%s': %v", clientPod.Name, err)
}
defer deletePodWrapper(t, data, clientPod.Namespace, clientPod.Name)
Review comment from a Contributor:

Feel free to omit this; especially for a simple Linux Pod like this one, the Pod will be automatically deleted when the test Namespace is deleted at the end of the test.

Reply from the author (Contributor): I would keep this code, since I personally prefer to clean up the resources created specifically for this case after the test.

t.Run("testGenericPodConnectivity", func(t *testing.T) {
data.verifyWindowsPodConnectivity(t, clientPod, false)
})
t.Run("testHostNetworkPodConnectivity", func(t *testing.T) {
data.verifyWindowsPodConnectivity(t, clientPod, true)
})
}

func (data *TestData) dumpOVSFlows(t *testing.T, workerNode string) []string {
ovsOfctlCmd := "ovs-ofctl"
if clusterInfo.nodesOS[workerNode] == "windows" {
ovsOfctlCmd = `c:/openvswitch/usr/bin/ovs-ofctl.exe`
}
cmd := []string{ovsOfctlCmd, "dump-flows", defaultBridgeName, "--names"}
antreaPodName, err := data.getAntreaPodOnNode(workerNode)
if err != nil {
t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", workerNode, err)
}
stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
}
flows := make([]string, 0)
for _, flow := range strings.Split(stdout, "\n") {
flow = strings.TrimSpace(flow)
if flow == "" {
continue
}
flows = append(flows, flow)
}
t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", len(flows), defaultBridgeName, workerNode)
return flows
}

func (data *TestData) verifyWindowsPodConnectivity(t *testing.T, clientPod PodInfo, useHostNetwork bool) {
winPodName := randName("test-pod-")
winWorkerNode := workerNodeName(clusterInfo.windowsNodes[0])
winPod := PodInfo{
Name: winPodName,
Namespace: data.testNamespace,
NodeName: winWorkerNode,
OS: "windows",
}
t.Logf("Creating Windows Pod %s on Node '%s'", winPodName, winWorkerNode)
if err := data.createToolboxPodOnNode(winPod.Name, winPod.Namespace, winPod.NodeName, useHostNetwork); err != nil {
t.Fatalf("Error when creating Pod '%s': %v", winPodName, err)
}
defer deletePodWrapper(t, data, winPod.Namespace, winPod.Name)

testPodInfos := []PodInfo{clientPod, winPod}

// Verify Pod connectivity before agent restart
data.runPingMesh(t, testPodInfos, toolboxContainerName, true)

// Count the OVS flows.
initialOVSFlows := data.dumpOVSFlows(t, winWorkerNode)

// Restart Antrea agent Pods
err := data.RestartAntreaAgentPods(defaultTimeout)
assert.NoError(t, err)

// Wait until the agent completes reconciliation and OpenFlow replay.
err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) {
newOVSFlows := data.dumpOVSFlows(t, winWorkerNode)
if len(newOVSFlows) != len(initialOVSFlows) {
return false, nil
}
return true, nil
})
assert.NoErrorf(t, err, "The OpenFlow entries should be consistent after the Antrea agent restarts on Windows Node %s", winWorkerNode)

// Verify Pod connectivity after agent restart
data.runPingMesh(t, testPodInfos, toolboxContainerName, true)
}

func (data *TestData) testPodConnectivitySameNode(t *testing.T) {
numPods := 2 // can be increased
podInfos := make([]PodInfo, numPods)
@@ -411,24 +509,6 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", workerNode, antreaPodName)

dumpFlows := func() []string {
cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName, "--names"}
stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
if err != nil {
t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err)
}
flows := make([]string, 0)
for _, flow := range strings.Split(stdout, "\n") {
flow = strings.TrimSpace(flow)
if flow == "" {
continue
}
flows = append(flows, flow)
}
count := len(flows)
t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode)
return flows
}
dumpGroups := func() []string {
cmd := []string{"ovs-ofctl", "dump-groups", defaultBridgeName}
stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd)
@@ -449,7 +529,7 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
return groups
}

flows1, groups1 := dumpFlows(), dumpGroups()
flows1, groups1 := data.dumpOVSFlows(t, workerNode), dumpGroups()
numFlows1, numGroups1 := len(flows1), len(groups1)

// This is necessary because "ovs-ctl restart" saves and restores OpenFlow flows for the
@@ -486,7 +566,7 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) {
t.Logf("Running second ping mesh to check that flows have been restored")
data.runPingMesh(t, podInfos, toolboxContainerName, true)

flows2, groups2 := dumpFlows(), dumpGroups()
flows2, groups2 := data.dumpOVSFlows(t, workerNode), dumpGroups()
numFlows2, numGroups2 := len(flows2), len(groups2)
if !assert.Equal(t, numFlows1, numFlows2, "Mismatch in OVS flow count after flow replay") {
fmt.Println("Flows before replay:")