diff --git a/docs/design/ovs-pipeline.md b/docs/design/ovs-pipeline.md
index 0aa204c1fc4..95607f832df 100644
--- a/docs/design/ovs-pipeline.md
+++ b/docs/design/ovs-pipeline.md
@@ -337,46 +337,16 @@
 This table handles all "tracked" packets (all packets are moved to the tracked
 state by the previous table, [ConntrackTable]). It serves the following
 purposes:
 
-* keeps track of connections initiated through the gateway port, i.e. for which
-  the first packet of the connection (SYN packet for TCP) was received through
-  the gateway. For all reply packets belonging to such connections we overwrite
-  the destination MAC to the local gateway MAC to ensure that they get forwarded
-  though the gateway port. This is required to handle the following cases:
-  - reply traffic for connections from a local Pod to a ClusterIP Service, which
-    are handled by kube-proxy and go through DNAT. In this case the destination
-    IP address of the reply traffic is the Pod which initiated the connection to
-    the Service (no SNAT by kube-proxy). We need to make sure that these packets
-    are sent back through the gateway so that the source IP can be rewritten to
-    the ClusterIP ("undo" DNAT). If we do not use connection tracking and do not
-    rewrite the destination MAC, reply traffic from the backend will go directly
-    to the originating Pod without going first through the gateway and
-    kube-proxy. This means that the reply traffic will arrive at the
-    originating Pod with the incorrect source IP (it will be set to the
-    backend's IP instead of the Service IP).
-  - when hair-pinning is involved, i.e. for connections between 2 local Pods and
-    for which NAT is performed. One example is a Pod accessing a NodePort
-    Service for which `externalTrafficPolicy` is set to `Local` using the local
-    Node's IP address, as there will be no SNAT for such traffic. Another
-    example could be `hostPort` support, depending on how the feature is
-    implemented.
 * drop packets reported as invalid by conntrack
 
 If you dump the flows for this table, you should see the following:
 
 ```text
-1. table=31, priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff actions=goto_table:40
-2. table=31, priority=200,ct_state=+inv+trk,ip actions=drop
-3. table=31, priority=200,ct_state=-new+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:40
-4. table=31, priority=0 actions=goto_table:40
+1. table=31, priority=190,ct_state=+inv+trk,ip actions=drop
+2. table=31, priority=0 actions=goto_table:40
 ```
 
-Flows 1 and 3 implement the destination MAC rewrite described above. Note that
-at this stage we have not committed any connection yet. We commit all
-connections after enforcing Network Policies, in [ConntrackCommitTable]. This is
-also when we set the `ct_mark` to `0x20` for connections initiated through the
-gateway.
-
-Flow 2 drops invalid traffic. All non-dropped traffic finally goes to the
+Flow 1 drops invalid traffic. All non-dropped traffic finally goes to the
 [DNATTable].
 
 ### DNATTable (40)
@@ -590,6 +560,35 @@
 table=70, priority=200,ip,reg0=0x80000/0x80000,nw_dst=10.10.0.2 actions=mod_dl_s
 table=70, priority=200,ip,reg0=0x80000/0x80000,nw_dst=10.10.0.1 actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:80
 ```
 
+* All reply traffic of connections initiated through the gateway port, i.e. for
+  which the first packet of the connection (SYN packet for TCP) was received
+  through the gateway. Such packets can be identified by the packet's direction
+  in `ct_state` and by the `ct_mark` value `0x20`, which is committed in
+  [ConntrackCommitTable] when the first packet of the connection is handled.
+  A flow overwrites the destination MAC of such packets with the local gateway
+  MAC to ensure that they are forwarded through the gateway port. This is
+  required to handle the following cases:
+  - reply traffic for connections from a local Pod to a ClusterIP Service, which
+    are handled by kube-proxy and go through DNAT. In this case the destination
+    IP address of the reply traffic is the Pod which initiated the connection to
+    the Service (no SNAT by kube-proxy). We need to make sure that these packets
+    are sent back through the gateway so that the source IP can be rewritten to
+    the ClusterIP ("undo" DNAT). If we do not use connection tracking and do not
+    rewrite the destination MAC, reply traffic from the backend will go directly
+    to the originating Pod without going first through the gateway and
+    kube-proxy. This means that the reply traffic will arrive at the originating
+    Pod with the incorrect source IP (it will be set to the backend's IP instead
+    of the Service IP).
+  - when hair-pinning is involved, i.e. connections between 2 local Pods, for
+    which NAT is performed. One example is a Pod accessing a NodePort Service
+    for which `externalTrafficPolicy` is set to `Local` using the local Node's
+    IP address, as there will be no SNAT for such traffic. Another example could
+    be `hostPort` support, depending on how the feature is implemented.
+
+```text
+table=70, priority=210,ct_state=+rpl+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:80
+```
+
 * All traffic destined to a remote Pod is forwarded through the appropriate
   tunnel. This means that we install one flow for each peer Node, each one
   matching the destination IP address of the packet against the Pod subnet for
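For readers skimming the new table 70 flow above: `+trk` means the packet is tracked by conntrack, `+rpl` means conntrack classified it as belonging to the reply direction of its connection, and `ct_mark=0x20` only matches connections that were committed with the gateway mark. The following self-contained Go sketch (illustrative only, not part of this patch; the helper name and hard-coded dump line are hypothetical) shows how such a flow can be recognized in `ovs-ofctl dump-flows` output:

```go
package main

import (
	"fmt"
	"strings"
)

// isGatewayReplyRewrite reports whether one line of "ovs-ofctl dump-flows"
// output is the reply-direction gateway rewrite flow: it must match tracked
// reply packets (+rpl+trk) of connections committed with ct_mark=0x20, and
// rewrite the destination MAC (mod_dl_dst).
func isGatewayReplyRewrite(flow string) bool {
	return strings.Contains(flow, "ct_state=+rpl+trk") &&
		strings.Contains(flow, "ct_mark=0x20") &&
		strings.Contains(flow, "actions=mod_dl_dst:")
}

func main() {
	dump := "table=70, priority=210,ct_state=+rpl+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:80"
	fmt.Println(isGatewayReplyRewrite(dump)) // true
}
```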
diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go
index 1400a290e6f..7e7970a176d 100644
--- a/pkg/agent/openflow/client.go
+++ b/pkg/agent/openflow/client.go
@@ -619,7 +619,6 @@ func (c *client) InstallGatewayFlows() error {
 
 	// Add flow to ensure the liveness check packet could be forwarded correctly.
 	flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...)
-	flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...)
 	flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...)
 
 	if err := c.ofEntryOperations.AddAll(flows); err != nil {
diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go
index c64cfb35941..d2607340b4e 100644
--- a/pkg/agent/openflow/pipeline.go
+++ b/pkg/agent/openflow/pipeline.go
@@ -636,18 +636,6 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
 		ctZone = CtZoneV6
 	}
 	flows = append(flows,
-		// If a connection was initiated through the gateway (i.e. has gatewayCTMark) and
-		// the packet is received on the gateway port, go to the next table directly. This
-		// is to bypass the flow which is installed by ctRewriteDstMACFlow in the same
-		// table, and which will rewrite the destination MAC address for traffic with the
-		// gatewayCTMark, but which is flowing in the opposite direction.
-		connectionTrackStateTable.BuildFlow(priorityHigh).MatchProtocol(proto).
-			MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}).
-			MatchCTMark(gatewayCTMark, nil).
-			MatchCTStateNew(false).MatchCTStateTrk(true).
-			Action().GotoTable(connectionTrackStateTable.GetNext()).
-			Cookie(c.cookieAllocator.Request(category).Raw()).
-			Done(),
 		// Connections initiated through the gateway are marked with gatewayCTMark.
 		connectionTrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(proto).
 			MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}).
@@ -895,36 +883,6 @@ func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, timeout uint16,
 	return flows
 }
 
-// ctRewriteDstMACFlow rewrites the destination MAC address with the local host gateway MAC if the
-// packet is marked with gatewayCTMark but was not received on the host gateway. In other words, it
-// rewrites the destination MAC address for reply traffic for connections which were initiated
-// through the gateway, to ensure that this reply traffic gets forwarded correctly (back to the host
-// network namespace, through the gateway). In particular, it is necessary in the following 2 cases:
-// 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is
-// disabled and kube-proxy is used). In this case the destination IP address of the reply traffic
-// is the Pod which initiated the connection to the Service (no SNAT). We need to make sure that
-// these packets are sent back through the gateway so that the source IP can be rewritten (Service
-// backend IP -> Service ClusterIP).
-// 2) when hair-pinning is involved, i.e. for connections between 2 local Pods belonging to this
-// Node and for which NAT is performed. This applies regardless of whether AntreaProxy is enabled
-// or not, and thus also applies to Windows Nodes (for which AntreaProxy is enabled by default).
-// One example is a Pod accessing a NodePort Service for which externalTrafficPolicy is set to
-// Local, using the local Node's IP address.
-func (c *client) ctRewriteDstMACFlows(gatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
-	connectionTrackStateTable := c.pipeline[conntrackStateTable]
-	var flows []binding.Flow
-	for _, proto := range c.ipProtocols {
-		flows = append(flows, connectionTrackStateTable.BuildFlow(priorityNormal).MatchProtocol(proto).
-			MatchCTMark(gatewayCTMark, nil).
-			MatchCTStateNew(false).MatchCTStateTrk(true).
-			Action().SetDstMAC(gatewayMAC).
-			Action().GotoTable(connectionTrackStateTable.GetNext()).
-			Cookie(c.cookieAllocator.Request(category).Raw()).
-			Done())
-	}
-	return flows
-}
-
 // serviceLBBypassFlows makes packets that belong to a tracked connection bypass
 // service LB tables and enter egressRuleTable directly.
 func (c *client) serviceLBBypassFlows(ipProtocol binding.Protocol) []binding.Flow {
@@ -1209,7 +1167,7 @@ func (c *client) l3FwdFlowRouteToGW(gwMAC net.HardwareAddr, category cookie.Cate
 }
 
 // l3FwdFlowToGateway generates the L3 forward flows to rewrite the destination MAC of the packets to the gateway interface
-// MAC if the destination IP is the gateway IP.
+// MAC if the destination IP is the gateway IP or the connection was initiated through the gateway interface.
 func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
 	l3FwdTable := c.pipeline[l3ForwardingTable]
 	var flows []binding.Flow
@@ -1223,6 +1181,27 @@ func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC ne
 			Cookie(c.cookieAllocator.Request(category).Raw()).
 			Done())
 	}
+	// Rewrite the destination MAC address with the local host gateway MAC if the packet is in the reply direction and
+	// is marked with gatewayCTMark. This is for connections which were initiated through the gateway, to ensure that
+	// this reply traffic gets forwarded correctly (back to the host network namespace, through the gateway). In
+	// particular, it is necessary in the following 2 cases:
+	// 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is disabled and
+	// kube-proxy is used). In this case the destination IP address of the reply traffic is the Pod which initiated the
+	// connection to the Service (no SNAT). We need to make sure that these packets are sent back through the gateway
+	// so that the source IP can be rewritten (Service backend IP -> Service ClusterIP).
+	// 2) when hair-pinning is involved, i.e. connections between 2 local Pods, for which NAT is performed. This
+	// applies regardless of whether AntreaProxy is enabled or not, and thus also applies to Windows Nodes (for which
+	// AntreaProxy is enabled by default). One example is a Pod accessing a NodePort Service for which
+	// externalTrafficPolicy is set to Local, using the local Node's IP address.
+	for _, proto := range c.ipProtocols {
+		flows = append(flows, l3FwdTable.BuildFlow(priorityHigh).MatchProtocol(proto).
+			MatchCTMark(gatewayCTMark, nil).
+			MatchCTStateRpl(true).MatchCTStateTrk(true).
+			Action().SetDstMAC(localGatewayMAC).
+			Action().GotoTable(l3FwdTable.GetNext()).
+			Cookie(c.cookieAllocator.Request(category).Raw()).
+			Done())
+	}
 	return flows
 }
 
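The flows changed in pipeline.go above cooperate in two steps: the (unchanged) ConntrackCommitTable flow stamps `gatewayCTMark` on connections whose first packet entered through the gateway, and the new `priorityHigh` flow appended in `l3FwdFlowToGateway` rewrites the destination MAC only for tracked reply packets carrying that mark. Because the rewrite now matches `+rpl` explicitly in table 70, the old bypass flow in ConntrackStateTable (which protected gateway-originated traffic flowing in the request direction) is no longer needed and is removed. A toy model of this interaction in plain Go, under simplified assumptions (every name here is hypothetical and for illustration only):

```go
package main

import "fmt"

const (
	gatewayCTMark uint32 = 0x20                // ct_mark committed in ConntrackCommitTable
	gatewayMAC           = "e2:e5:a4:9b:1c:b1" // local gateway MAC
)

// packet is a drastically simplified stand-in for a packet plus its
// conntrack metadata.
type packet struct {
	fromGateway bool   // was the first packet of the connection received on the gateway port?
	reply       bool   // did conntrack classify this packet as +rpl?
	ctMark      uint32 // mark inherited from the committed connection
	dstMAC      string
}

// commit models the ConntrackCommitTable flow handling the first packet.
func commit(p *packet) {
	if p.fromGateway {
		p.ctMark = gatewayCTMark
	}
}

// l3Forward models the new priorityHigh flow in l3FwdFlowToGateway: only
// tracked reply packets of gateway-marked connections get the MAC rewrite.
func l3Forward(p *packet) {
	if p.reply && p.ctMark == gatewayCTMark {
		p.dstMAC = gatewayMAC // force the reply back through the gateway port
	}
}

func main() {
	syn := &packet{fromGateway: true}
	commit(syn) // connection is now marked with gatewayCTMark

	synAck := &packet{reply: true, ctMark: syn.ctMark, dstMAC: "aa:bb:cc:dd:ee:ff"}
	l3Forward(synAck)
	fmt.Println(synAck.dstMAC) // e2:e5:a4:9b:1c:b1
}
```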
diff --git a/test/e2e/service_test.go b/test/e2e/service_test.go
index 0a3eb14bf3c..12db7c8bd71 100644
--- a/test/e2e/service_test.go
+++ b/test/e2e/service_test.go
@@ -16,6 +16,7 @@ package e2e
 
 import (
 	"fmt"
+	"net"
 	"strings"
 	"testing"
 
@@ -23,8 +24,8 @@
 	corev1 "k8s.io/api/core/v1"
 )
 
-// TestClusterIPHostAccess tests traffic from host to ClusterIP Service.
-func TestClusterIPHostAccess(t *testing.T) {
+// TestClusterIP tests traffic from Nodes and Pods to ClusterIP Service.
+func TestClusterIP(t *testing.T) {
 	// TODO: Support for dual-stack and IPv6-only clusters
 	skipIfIPv6Cluster(t)
 
@@ -35,35 +36,56 @@
 	defer teardownTest(t, data)
 
 	svcName := "nginx"
-	node := nodeName(0)
-	svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, node)
+	serverPodNode := nodeName(0)
+	svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, serverPodNode)
 	defer cleanup()
 	t.Logf("%s Service is ready", svcName)
 
-	var linNode, winNode string
-	linNode = node
-	if len(clusterInfo.windowsNodes) != 0 {
-		idx := clusterInfo.windowsNodes[0]
-		winNode = clusterInfo.nodes[idx].name
-	}
-
-	curlSvc := func(node string) {
+	testFromNode := func(node string) {
 		// Retry is needed for rules to be installed by kube-proxy/antrea-proxy.
 		cmd := fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s:80", svc.Spec.ClusterIP)
 		rc, stdout, stderr, err := RunCommandOnNode(node, cmd)
 		if rc != 0 || err != nil {
 			t.Errorf("Error when running command '%s' on Node '%s', rc: %d, stdout: %s, stderr: %s, error: %v", cmd, node, rc, stdout, stderr, err)
-		} else {
-			t.Logf("curl from Node '%s' succeeded", node)
 		}
 	}
-	t.Logf("Try to curl ClusterIP Service from a Linux host")
-	curlSvc(linNode)
-	if winNode != "" {
-		t.Logf("Try to curl Cluster IP Service from a Windows host")
-		curlSvc(winNode)
+
+	testFromPod := func(podName, nodeName string) {
+		require.NoError(t, data.createBusyboxPodOnNode(podName, nodeName))
+		defer data.deletePodAndWait(defaultTimeout, podName)
+		require.NoError(t, data.podWaitForRunning(defaultTimeout, podName, testNamespace))
+		err := data.runNetcatCommandFromTestPod(podName, svc.Spec.ClusterIP, 80)
+		require.NoError(t, err, "Pod %s should be able to connect to %s, but was not able to connect", podName, net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprint(80)))
 	}
+
+	t.Run("ClusterIP", func(t *testing.T) {
+		t.Run("Same Linux Node can access the Service", func(t *testing.T) {
+			t.Parallel()
+			testFromNode(serverPodNode)
+		})
+		t.Run("Different Linux Node can access the Service", func(t *testing.T) {
+			t.Parallel()
+			skipIfNumNodesLessThan(t, 2)
+			testFromNode(nodeName(1))
+		})
+		t.Run("Windows host can access the Service", func(t *testing.T) {
+			t.Parallel()
+			skipIfNoWindowsNodes(t)
+			idx := clusterInfo.windowsNodes[0]
+			winNode := clusterInfo.nodes[idx].name
+			testFromNode(winNode)
+		})
+		t.Run("Linux Pod on same Node can access the Service", func(t *testing.T) {
+			t.Parallel()
+			testFromPod("client-on-same-node", serverPodNode)
+		})
+		t.Run("Linux Pod on different Node can access the Service", func(t *testing.T) {
+			t.Parallel()
+			skipIfNumNodesLessThan(t, 2)
+			testFromPod("client-on-different-node", nodeName(1))
+		})
+	})
 }
 
 func (data *TestData) createClusterIPServiceAndBackendPods(t *testing.T, name string, node string) (*corev1.Service, func()) {
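A side note on the new `"net"` import in the test above: `net.JoinHostPort` is used to build the address in the failure message because it adds the square brackets that IPv6 literals require, so the message stays well-formed if the test is later extended to IPv6 ClusterIPs. A quick stand-alone illustration (the addresses below are examples only):

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// IPv4 addresses are joined as-is.
	fmt.Println(net.JoinHostPort("10.96.0.10", "80")) // 10.96.0.10:80
	// IPv6 addresses are wrapped in brackets automatically.
	fmt.Println(net.JoinHostPort("fd00:10:96::a", "80")) // [fd00:10:96::a]:80
}
```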
diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go
index 74ff7e16a02..b82b57e191d 100644
--- a/test/integration/agent/openflow_test.go
+++ b/test/integration/agent/openflow_test.go
@@ -1072,11 +1072,11 @@ func prepareGatewayFlows(gwIPs []net.IP, gwMAC net.HardwareAddr, vMAC net.Hardwa
 			},
 		},
 		expectTableFlows{
-			uint8(31),
+			uint8(70),
 			[]*ofTestUtils.ExpectFlow{
 				{
-					MatchStr: fmt.Sprintf("priority=200,ct_state=-new+trk,ct_mark=0x20,%s", ipProtoStr),
-					ActStr:   fmt.Sprintf("set_field:%s->eth_dst,goto_table:42", gwMAC.String()),
+					MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x20,%s", ipProtoStr),
+					ActStr:   fmt.Sprintf("set_field:%s->eth_dst,goto_table:80", gwMAC.String()),
 				},
 			},
 		},
@@ -1170,7 +1170,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows {
 	}
 	if config.enableIPv4 {
 		table31Flows.flows = append(table31Flows.flows,
-			&ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff", ActStr: "goto_table:42"},
 			&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ip", ActStr: "drop"},
 		)
 		table105Flows.flows = append(table105Flows.flows,
@@ -1184,7 +1183,6 @@
 	}
 	if config.enableIPv6 {
 		table31Flows.flows = append(table31Flows.flows,
-			&ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ipv6,reg0=0x1/0xffff", ActStr: "goto_table:42"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ipv6", ActStr: "drop"}, ) table105Flows.flows = append(table105Flows.flows,