Fix cross-Node service access when AntreaProxy is disabled
When AntreaProxy is disabled, if the reply traffic of a connection that
has been processed by kube-proxy's iptables/ipvs rules is received from
the tunnel interface, its destination MAC would be rewritten twice,
because the packets would have both gatewayCTMark and macRewriteMark
set. The second rewrite would overwrite the first and cause the packets
to be delivered to the destination Pod directly, without reverse NAT
being performed in the host netns.

This patch fixes it by making the pipeline rewrite the destination MAC
at most once. It moves the gatewayCTMark-related MAC rewriting flow to
l3ForwardingTable, so that all L3 forwarding decisions are made
uniformly in the same table. It also consolidates the two
gatewayCTMark-related flows into one that matches the direction of the
traffic, which ensures the flow doesn't apply to traffic received from
the gateway interface.

Signed-off-by: Quan Tian <[email protected]>
tnqn committed Jun 28, 2021
1 parent 2d19196 commit 08732ce
Showing 5 changed files with 96 additions and 94 deletions.
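In short, the two table-31 flows that could each rewrite the destination MAC are replaced by a single reply-direction flow in the L3 forwarding table. The flows below are copied from the examples in this diff (the MAC address is the sample gateway MAC used throughout):

```text
# Before: ConntrackStateTable (table 31) matched on ct_mark alone
table=31, priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff actions=goto_table:40
table=31, priority=200,ct_state=-new+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:40

# After: L3ForwardingTable (table 70) rewrites only for reply (+rpl) traffic
table=70, priority=210,ct_state=+rpl+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:80
```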
56 changes: 30 additions & 26 deletions docs/design/ovs-pipeline.md
@@ -337,37 +337,13 @@ This table handles all "tracked" packets (all packets are moved to the tracked
state by the previous table, [ConntrackTable]). It serves the following
purposes:

* keeps track of connections initiated through the gateway port, i.e. for which
the first packet of the connection (SYN packet for TCP) was received through
the gateway. For all reply packets belonging to such connections we overwrite
the destination MAC to the local gateway MAC to ensure that they get forwarded
through the gateway port. This is required to handle the following cases:
- reply traffic for connections from a local Pod to a ClusterIP Service, which
are handled by kube-proxy and go through DNAT. In this case the destination
IP address of the reply traffic is the Pod which initiated the connection to
the Service (no SNAT by kube-proxy). We need to make sure that these packets
are sent back through the gateway so that the source IP can be rewritten to
the ClusterIP ("undo" DNAT). If we do not use connection tracking and do not
rewrite the destination MAC, reply traffic from the backend will go directly
to the originating Pod without going first through the gateway and
kube-proxy. This means that the reply traffic will arrive at the
originating Pod with the incorrect source IP (it will be set to the
backend's IP instead of the Service IP).
- when hair-pinning is involved, i.e. for connections between 2 local Pods and
for which NAT is performed. One example is a Pod accessing a NodePort
Service for which `externalTrafficPolicy` is set to `Local` using the local
Node's IP address, as there will be no SNAT for such traffic. Another
example could be `hostPort` support, depending on how the feature is
implemented.
* drop packets reported as invalid by conntrack

If you dump the flows for this table, you should see the following:

```text
1. table=31, priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff actions=goto_table:40
2. table=31, priority=200,ct_state=+inv+trk,ip actions=drop
3. table=31, priority=200,ct_state=-new+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:40
4. table=31, priority=0 actions=goto_table:40
1. table=31, priority=190,ct_state=+inv+trk,ip actions=drop
2. table=31, priority=0 actions=goto_table:40
```

Flows 1 and 3 implement the destination MAC rewrite described above. Note that
@@ -568,6 +544,34 @@ traffic to the next table EgressMetricsTable, then ([L3ForwardingTable]).

This is the L3 routing table. It implements the following functionality:

* Route reply traffic of connections initiated through the gateway port, i.e.
for which the first packet of the connection (SYN packet for TCP) was received
through the gateway. For all reply packets belonging to such connections we
overwrite the destination MAC to the local gateway MAC to ensure that they get
forwarded through the gateway port. This is required to handle the following
cases:
- reply traffic for connections from a local Pod to a ClusterIP Service, which
are handled by kube-proxy and go through DNAT. In this case the destination
IP address of the reply traffic is the Pod which initiated the connection to
the Service (no SNAT by kube-proxy). We need to make sure that these packets
are sent back through the gateway so that the source IP can be rewritten to
the ClusterIP ("undo" DNAT). If we do not use connection tracking and do not
rewrite the destination MAC, reply traffic from the backend will go directly
to the originating Pod without going first through the gateway and
kube-proxy. This means that the reply traffic will arrive at the originating
Pod with the incorrect source IP (it will be set to the backend's IP instead
of the Service IP).
- when hair-pinning is involved, i.e. for connections between 2 local Pods and
for which NAT is performed. One example is a Pod accessing a NodePort
Service for which `externalTrafficPolicy` is set to `Local` using the local
Node's IP address, as there will be no SNAT for such traffic. Another
example could be `hostPort` support, depending on how the feature is
implemented.

```text
table=70, priority=210,ct_state=+rpl+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:80
```
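To verify the flow on a Node, one can dump this table on the OVS bridge (a sketch, assuming Antrea's default bridge name `br-int` and `ovs-ofctl` run inside the antrea-ovs container of the antrea-agent Pod):

```text
ovs-ofctl dump-flows br-int table=70 | grep 'ct_mark=0x20'
```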

* Tunnelled traffic coming-in from a peer Node and destined to a local Pod is
directly forwarded to the Pod. This requires setting the source MAC to the MAC
of the local gateway interface and setting the destination MAC to the Pod's
1 change: 0 additions & 1 deletion pkg/agent/openflow/client.go
@@ -619,7 +619,6 @@ func (c *client) InstallGatewayFlows() error {

// Add flow to ensure the liveness check packet could be forwarded correctly.
flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...)
flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...)
flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...)

if err := c.ofEntryOperations.AddAll(flows); err != nil {
65 changes: 22 additions & 43 deletions pkg/agent/openflow/pipeline.go
@@ -636,18 +636,6 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
ctZone = CtZoneV6
}
flows = append(flows,
// If a connection was initiated through the gateway (i.e. has gatewayCTMark) and
// the packet is received on the gateway port, go to the next table directly. This
// is to bypass the flow which is installed by ctRewriteDstMACFlow in the same
// table, and which will rewrite the destination MAC address for traffic with the
// gatewayCTMark, but which is flowing in the opposite direction.
connectionTrackStateTable.BuildFlow(priorityHigh).MatchProtocol(proto).
MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}).
MatchCTMark(gatewayCTMark, nil).
MatchCTStateNew(false).MatchCTStateTrk(true).
Action().GotoTable(connectionTrackStateTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
// Connections initiated through the gateway are marked with gatewayCTMark.
connectionTrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(proto).
MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}).
@@ -895,36 +883,6 @@ func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, timeout uint16,
return flows
}

// ctRewriteDstMACFlow rewrites the destination MAC address with the local host gateway MAC if the
// packet is marked with gatewayCTMark but was not received on the host gateway. In other words, it
// rewrites the destination MAC address for reply traffic for connections which were initiated
// through the gateway, to ensure that this reply traffic gets forwarded correctly (back to the host
// network namespace, through the gateway). In particular, it is necessary in the following 2 cases:
// 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is
// disabled and kube-proxy is used). In this case the destination IP address of the reply traffic
// is the Pod which initiated the connection to the Service (no SNAT). We need to make sure that
// these packets are sent back through the gateway so that the source IP can be rewritten (Service
// backend IP -> Service ClusterIP).
// 2) when hair-pinning is involved, i.e. for connections between 2 local Pods belonging to this
// Node and for which NAT is performed. This applies regardless of whether AntreaProxy is enabled
// or not, and thus also applies to Windows Nodes (for which AntreaProxy is enabled by default).
// One example is a Pod accessing a NodePort Service for which externalTrafficPolicy is set to
// Local, using the local Node's IP address.
func (c *client) ctRewriteDstMACFlows(gatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
connectionTrackStateTable := c.pipeline[conntrackStateTable]
var flows []binding.Flow
for _, proto := range c.ipProtocols {
flows = append(flows, connectionTrackStateTable.BuildFlow(priorityNormal).MatchProtocol(proto).
MatchCTMark(gatewayCTMark, nil).
MatchCTStateNew(false).MatchCTStateTrk(true).
Action().SetDstMAC(gatewayMAC).
Action().GotoTable(connectionTrackStateTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
}
return flows
}

// serviceLBBypassFlows makes packets that belong to a tracked connection bypass
// service LB tables and enter egressRuleTable directly.
func (c *client) serviceLBBypassFlows(ipProtocol binding.Protocol) []binding.Flow {
@@ -1209,7 +1167,7 @@ func (c *client) l3FwdFlowRouteToGW(gwMAC net.HardwareAddr, category cookie.Cate
}

// l3FwdFlowToGateway generates the L3 forward flows to rewrite the destination MAC of the packets to the gateway interface
// MAC if the destination IP is the gateway IP.
// MAC if the destination IP is the gateway IP or the connection was initiated through the gateway interface.
func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
l3FwdTable := c.pipeline[l3ForwardingTable]
var flows []binding.Flow
@@ -1223,6 +1181,27 @@ func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC ne
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
}
// Rewrite the destination MAC address with the local host gateway MAC if the packet is in the reply direction and
// is marked with gatewayCTMark. This is for connections which were initiated through the gateway, to ensure that
// this reply traffic gets forwarded correctly (back to the host network namespace, through the gateway). In
// particular, it is necessary in the following 2 cases:
// 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is disabled and
// kube-proxy is used). In this case the destination IP address of the reply traffic is the Pod which initiated the
// connection to the Service (no SNAT). We need to make sure that these packets are sent back through the gateway
// so that the source IP can be rewritten (Service backend IP -> Service ClusterIP).
// 2) when hair-pinning is involved, i.e. for connections between 2 local Pods belonging to this Node and for which
// NAT is performed. This applies regardless of whether AntreaProxy is enabled or not, and thus also applies to
// Windows Nodes (for which AntreaProxy is enabled by default). One example is a Pod accessing a NodePort Service
// for which externalTrafficPolicy is set to Local, using the local Node's IP address.
for _, proto := range c.ipProtocols {
flows = append(flows, l3FwdTable.BuildFlow(priorityHigh).MatchProtocol(proto).
MatchCTMark(gatewayCTMark, nil).
MatchCTStateRpl(true).MatchCTStateTrk(true).
Action().SetDstMAC(localGatewayMAC).
Action().GotoTable(l3FwdTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
}
return flows
}
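The direction-sensitive match is what makes the old bypass flow unnecessary: `ct_state=-new+trk` matches tracked packets in both directions of a connection, so the old pipeline needed an extra higher-priority flow to exempt traffic received from the gateway, while `ct_state=+rpl` matches only packets that conntrack classifies as replies. A side-by-side sketch of the matches (taken from the flows in this diff):

```text
old: ct_state=-new+trk,ct_mark=0x20,reg0=0x1/0xffff  (bypass: traffic from the gateway, no rewrite)
old: ct_state=-new+trk,ct_mark=0x20                  (rewrite: both directions, hence the bypass above)
new: ct_state=+rpl+trk,ct_mark=0x20                  (rewrite: reply direction only, no bypass needed)
```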

60 changes: 41 additions & 19 deletions test/e2e/service_test.go
@@ -16,15 +16,16 @@ package e2e

import (
"fmt"
"net"
"strings"
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
)

// TestClusterIPHostAccess tests traffic from host to ClusterIP Service.
func TestClusterIPHostAccess(t *testing.T) {
// TestClusterIP tests traffic from Nodes and Pods to ClusterIP Service.
func TestClusterIP(t *testing.T) {
// TODO: Support for dual-stack and IPv6-only clusters
skipIfIPv6Cluster(t)

@@ -35,35 +36,56 @@ func TestClusterIPHostAccess(t *testing.T) {
defer teardownTest(t, data)

svcName := "nginx"
node := nodeName(0)
svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, node)
serverPodNode := nodeName(0)
svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, serverPodNode)
defer cleanup()
t.Logf("%s Service is ready", svcName)

var linNode, winNode string
linNode = node
if len(clusterInfo.windowsNodes) != 0 {
idx := clusterInfo.windowsNodes[0]
winNode = clusterInfo.nodes[idx].name
}

curlSvc := func(node string) {
testFromNode := func(node string) {
// Retry is needed for rules to be installed by kube-proxy/antrea-proxy.
cmd := fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s:80", svc.Spec.ClusterIP)
rc, stdout, stderr, err := RunCommandOnNode(node, cmd)
if rc != 0 || err != nil {
t.Errorf("Error when running command '%s' on Node '%s', rc: %d, stdout: %s, stderr: %s, error: %v",
cmd, node, rc, stdout, stderr, err)
} else {
t.Logf("curl from Node '%s' succeeded", node)
}
}
t.Logf("Try to curl ClusterIP Service from a Linux host")
curlSvc(linNode)
if winNode != "" {
t.Logf("Try to curl Cluster IP Service from a Windows host")
curlSvc(winNode)

testFromPod := func(podName, nodeName string) {
require.NoError(t, data.createBusyboxPodOnNode(podName, nodeName))
defer data.deletePodAndWait(defaultTimeout, podName)
require.NoError(t, data.podWaitForRunning(defaultTimeout, podName, testNamespace))
err := data.runNetcatCommandFromTestPod(podName, svc.Spec.ClusterIP, 80)
require.NoError(t, err, "Pod %s should be able to connect %s, but was not able to connect", podName, net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprint(80)))
}

t.Run("ClusterIP", func(t *testing.T) {
t.Run("Same Linux Node can access the Service", func(t *testing.T) {
t.Parallel()
testFromNode(serverPodNode)
})
t.Run("Different Linux Node can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNumNodesLessThan(t, 2)
testFromNode(nodeName(1))
})
t.Run("Windows host can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNoWindowsNodes(t)
idx := clusterInfo.windowsNodes[0]
winNode := clusterInfo.nodes[idx].name
testFromNode(winNode)
})
t.Run("Linux Pod on same Node can access the Service", func(t *testing.T) {
t.Parallel()
testFromPod("client-on-same-node", serverPodNode)
})
t.Run("Linux Pod on different Node can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNumNodesLessThan(t, 2)
testFromPod("client-on-different-node", nodeName(1))
})
})
}
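To run just this test, something like the following should work (a sketch; it assumes a reachable test cluster configured as described in the repository's e2e test documentation, and that the command is run from the repository root):

```text
go test -v -timeout=30m -run TestClusterIP ./test/e2e
```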

func (data *TestData) createClusterIPServiceAndBackendPods(t *testing.T, name string, node string) (*corev1.Service, func()) {
8 changes: 3 additions & 5 deletions test/integration/agent/openflow_test.go
@@ -1072,11 +1072,11 @@ func prepareGatewayFlows(gwIPs []net.IP, gwMAC net.HardwareAddr, vMAC net.Hardwa
},
},
expectTableFlows{
uint8(31),
uint8(70),
[]*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,ct_state=-new+trk,ct_mark=0x20,%s", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:42", gwMAC.String()),
MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x20,%s", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:80", gwMAC.String()),
},
},
},
@@ -1170,7 +1167,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows {
}
if config.enableIPv4 {
table31Flows.flows = append(table31Flows.flows,
&ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff", ActStr: "goto_table:42"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ip", ActStr: "drop"},
)
table105Flows.flows = append(table105Flows.flows,
@@ -1184,7 +1183,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows {
}
if config.enableIPv6 {
table31Flows.flows = append(table31Flows.flows,
&ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ipv6,reg0=0x1/0xffff", ActStr: "goto_table:42"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ipv6", ActStr: "drop"},
)
table105Flows.flows = append(table105Flows.flows,
