diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 1400a290e6f..7e7970a176d 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -619,7 +619,6 @@ func (c *client) InstallGatewayFlows() error { // Add flow to ensure the liveness check packet could be forwarded correctly. flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...) - flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...) flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...) if err := c.ofEntryOperations.AddAll(flows); err != nil { diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index c64cfb35941..e6034745f58 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -636,18 +636,6 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow { ctZone = CtZoneV6 } flows = append(flows, - // If a connection was initiated through the gateway (i.e. has gatewayCTMark) and - // the packet is received on the gateway port, go to the next table directly. This - // is to bypass the flow which is installed by ctRewriteDstMACFlow in the same - // table, and which will rewrite the destination MAC address for traffic with the - // gatewayCTMark, but which is flowing in the opposite direction. - connectionTrackStateTable.BuildFlow(priorityHigh).MatchProtocol(proto). - MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}). - MatchCTMark(gatewayCTMark, nil). - MatchCTStateNew(false).MatchCTStateTrk(true). - Action().GotoTable(connectionTrackStateTable.GetNext()). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done(), // Connections initiated through the gateway are marked with gatewayCTMark. connectionTrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(proto). MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}). 
@@ -895,36 +883,6 @@ func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, timeout uint16, return flows } -// ctRewriteDstMACFlow rewrites the destination MAC address with the local host gateway MAC if the -// packet is marked with gatewayCTMark but was not received on the host gateway. In other words, it -// rewrites the destination MAC address for reply traffic for connections which were initiated -// through the gateway, to ensure that this reply traffic gets forwarded correctly (back to the host -// network namespace, through the gateway). In particular, it is necessary in the following 2 cases: -// 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is -// disabled and kube-proxy is used). In this case the destination IP address of the reply traffic -// is the Pod which initiated the connection to the Service (no SNAT). We need to make sure that -// these packets are sent back through the gateway so that the source IP can be rewritten (Service -// backend IP -> Service ClusterIP). -// 2) when hair-pinning is involved, i.e. for connections between 2 local Pods belonging to this -// Node and for which NAT is performed. This applies regardless of whether AntreaProxy is enabled -// or not, and thus also applies to Windows Nodes (for which AntreaProxy is enabled by default). -// One example is a Pod accessing a NodePort Service for which externalTrafficPolicy is set to -// Local, using the local Node's IP address. -func (c *client) ctRewriteDstMACFlows(gatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow { - connectionTrackStateTable := c.pipeline[conntrackStateTable] - var flows []binding.Flow - for _, proto := range c.ipProtocols { - flows = append(flows, connectionTrackStateTable.BuildFlow(priorityNormal).MatchProtocol(proto). - MatchCTMark(gatewayCTMark, nil). - MatchCTStateNew(false).MatchCTStateTrk(true). - Action().SetDstMAC(gatewayMAC). 
- Action().GotoTable(connectionTrackStateTable.GetNext()). - Cookie(c.cookieAllocator.Request(category).Raw()). - Done()) - } - return flows -} - // serviceLBBypassFlows makes packets that belong to a tracked connection bypass // service LB tables and enter egressRuleTable directly. func (c *client) serviceLBBypassFlows(ipProtocol binding.Protocol) []binding.Flow { @@ -1209,7 +1167,7 @@ func (c *client) l3FwdFlowRouteToGW(gwMAC net.HardwareAddr, category cookie.Cate } // l3FwdFlowToGateway generates the L3 forward flows to rewrite the destination MAC of the packets to the gateway interface -// MAC if the destination IP is the gateway IP. +// MAC if the destination IP is the gateway IP or the connection was initiated through the gateway interface. func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow { l3FwdTable := c.pipeline[l3ForwardingTable] var flows []binding.Flow @@ -1223,6 +1181,29 @@ func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC ne Cookie(c.cookieAllocator.Request(category).Raw()). Done()) } + // Rewrites the destination MAC address with the local host gateway MAC if the packet is in the reply direction and + // is marked with gatewayCTMark. In other words, it rewrites the destination MAC address for reply traffic for + // connections which were initiated through the gateway, to ensure that this reply traffic gets forwarded correctly + // (back to the host network namespace, through the gateway). In particular, it is necessary in the following 2 cases: + // 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is disabled and + // kube-proxy is used). In this case the destination IP address of the reply traffic + // is the Pod which initiated the connection to the Service (no SNAT). 
We need to make sure that + // these packets are sent back through the gateway so that the source IP can be rewritten (Service + // backend IP -> Service ClusterIP). + // 2) when hair-pinning is involved, i.e. for connections between 2 local Pods belonging to this + // Node and for which NAT is performed. This applies regardless of whether AntreaProxy is enabled + // or not, and thus also applies to Windows Nodes (for which AntreaProxy is enabled by default). + // One example is a Pod accessing a NodePort Service for which externalTrafficPolicy is set to + // Local, using the local Node's IP address. + for _, proto := range c.ipProtocols { + flows = append(flows, l3FwdTable.BuildFlow(priorityHigh).MatchProtocol(proto). + MatchCTMark(gatewayCTMark, nil). + MatchCTStateRpl(true).MatchCTStateTrk(true). + Action().SetDstMAC(localGatewayMAC). + Action().GotoTable(l3FwdTable.GetNext()). + Cookie(c.cookieAllocator.Request(category).Raw()). + Done()) + } return flows } diff --git a/test/e2e/service_test.go b/test/e2e/service_test.go index 0a3eb14bf3c..2d58cb5d3c7 100644 --- a/test/e2e/service_test.go +++ b/test/e2e/service_test.go @@ -16,6 +16,7 @@ package e2e import ( "fmt" + "net" "strings" "testing" @@ -23,8 +24,8 @@ import ( corev1 "k8s.io/api/core/v1" ) -// TestClusterIPHostAccess tests traffic from host to ClusterIP Service. -func TestClusterIPHostAccess(t *testing.T) { +// TestClusterIP tests traffic from Nodes and Pods to ClusterIP Service. 
+func TestClusterIP(t *testing.T) { // TODO: Support for dual-stack and IPv6-only clusters skipIfIPv6Cluster(t) @@ -35,35 +36,59 @@ func TestClusterIPHostAccess(t *testing.T) { defer teardownTest(t, data) svcName := "nginx" - node := nodeName(0) - svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, node) + serverPodNode := nodeName(0) + svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, serverPodNode) defer cleanup() t.Logf("%s Service is ready", svcName) - var linNode, winNode string - linNode = node - if len(clusterInfo.windowsNodes) != 0 { - idx := clusterInfo.windowsNodes[0] - winNode = clusterInfo.nodes[idx].name - } - - curlSvc := func(node string) { + // testFromNode curls the ClusterIP from the given Node. It takes the calling (sub)test's t so that failures are reported against that subtest rather than the parent test. + testFromNode := func(t *testing.T, node string) { // Retry is needed for rules to be installed by kube-proxy/antrea-proxy. cmd := fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s:80", svc.Spec.ClusterIP) rc, stdout, stderr, err := RunCommandOnNode(node, cmd) if rc != 0 || err != nil { t.Errorf("Error when running command '%s' on Node '%s', rc: %d, stdout: %s, stderr: %s, error: %v", cmd, node, rc, stdout, stderr, err) - } else { - t.Logf("curl from Node '%s' succeeded", node) } } - t.Logf("Try to curl ClusterIP Service from a Linux host") - curlSvc(linNode) - if winNode != "" { - t.Logf("Try to curl Cluster IP Service from a Windows host") - curlSvc(winNode) + + // testFromPod creates a busybox Pod on the given Node and connects to the ClusterIP from it. It takes the calling (sub)test's t because require/Fatalf call FailNow, which must run on the goroutine of the test it belongs to. + testFromPod := func(t *testing.T, podName, nodeName string) { + require.NoError(t, data.createBusyboxPodOnNode(podName, nodeName)) + defer data.deletePodAndWait(defaultTimeout, podName) + require.NoError(t, data.podWaitForRunning(defaultTimeout, podName, testNamespace)) + if err := data.runNetcatCommandFromTestPod(podName, svc.Spec.ClusterIP, 80); err != nil { + t.Fatalf("Pod %s should be able to connect %s, but was not able to connect", podName, net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprint(80))) + } } + + t.Run("ClusterIP", func(t *testing.T) { + t.Run("Same Linux Node can access the Service", func(t *testing.T) { +
t.Parallel() + testFromNode(t, serverPodNode) + }) + t.Run("Different Linux Node can access the Service", func(t *testing.T) { + t.Parallel() + skipIfNumNodesLessThan(t, 2) + testFromNode(t, nodeName(1)) + }) + t.Run("Windows host can access the Service", func(t *testing.T) { + t.Parallel() + skipIfNoWindowsNodes(t) + idx := clusterInfo.windowsNodes[0] + winNode := clusterInfo.nodes[idx].name + testFromNode(t, winNode) + }) + t.Run("Linux Pod on same Node can access the Service", func(t *testing.T) { + t.Parallel() + testFromPod(t, "client-on-same-node", serverPodNode) + }) + t.Run("Linux Pod on different Node can access the Service", func(t *testing.T) { + t.Parallel() + skipIfNumNodesLessThan(t, 2) + testFromPod(t, "client-on-different-node", nodeName(1)) + }) + }) } func (data *TestData) createClusterIPServiceAndBackendPods(t *testing.T, name string, node string) (*corev1.Service, func()) { diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 74ff7e16a02..b82b57e191d 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -1072,11 +1072,11 @@ func prepareGatewayFlows(gwIPs []net.IP, gwMAC net.HardwareAddr, vMAC net.Hardwa }, }, expectTableFlows{ - uint8(31), + uint8(70), []*ofTestUtils.ExpectFlow{ { - MatchStr: fmt.Sprintf("priority=200,ct_state=-new+trk,ct_mark=0x20,%s", ipProtoStr), - ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:42", gwMAC.String()), + MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x20,%s", ipProtoStr), + ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:80", gwMAC.String()), }, }, }, @@ -1170,7 +1170,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows { } if config.enableIPv4 { table31Flows.flows = append(table31Flows.flows, - &ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff", ActStr: "goto_table:42"}, &ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ip",
ActStr: "drop"}, ) table105Flows.flows = append(table105Flows.flows, @@ -1184,7 +1183,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows { } if config.enableIPv6 { table31Flows.flows = append(table31Flows.flows, - &ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ipv6,reg0=0x1/0xffff", ActStr: "goto_table:42"}, &ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ipv6", ActStr: "drop"}, ) table105Flows.flows = append(table105Flows.flows,