Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cross-Node service access when AntreaProxy is disabled #2318

Merged
merged 1 commit into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 32 additions & 33 deletions docs/design/ovs-pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,46 +337,16 @@ This table handles all "tracked" packets (all packets are moved to the tracked
state by the previous table, [ConntrackTable]). It serves the following
purposes:

* keeps track of connections initiated through the gateway port, i.e. for which
the first packet of the connection (SYN packet for TCP) was received through
the gateway. For all reply packets belonging to such connections we overwrite
the destination MAC to the local gateway MAC to ensure that they get forwarded
though the gateway port. This is required to handle the following cases:
- reply traffic for connections from a local Pod to a ClusterIP Service, which
are handled by kube-proxy and go through DNAT. In this case the destination
IP address of the reply traffic is the Pod which initiated the connection to
the Service (no SNAT by kube-proxy). We need to make sure that these packets
are sent back through the gateway so that the source IP can be rewritten to
the ClusterIP ("undo" DNAT). If we do not use connection tracking and do not
rewrite the destination MAC, reply traffic from the backend will go directly
to the originating Pod without going first through the gateway and
kube-proxy. This means that the reply traffic will arrive at the
originating Pod with the incorrect source IP (it will be set to the
backend's IP instead of the Service IP).
- when hair-pinning is involved, i.e. for connections between 2 local Pods and
for which NAT is performed. One example is a Pod accessing a NodePort
Service for which `externalTrafficPolicy` is set to `Local` using the local
Node's IP address, as there will be no SNAT for such traffic. Another
example could be `hostPort` support, depending on how the feature is
implemented.
* drop packets reported as invalid by conntrack

If you dump the flows for this table, you should see the following:

```text
1. table=31, priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff actions=goto_table:40
2. table=31, priority=200,ct_state=+inv+trk,ip actions=drop
3. table=31, priority=200,ct_state=-new+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:40
4. table=31, priority=0 actions=goto_table:40
1. table=31, priority=190,ct_state=+inv+trk,ip actions=drop
2. table=31, priority=0 actions=goto_table:40
```

Flows 1 and 3 implement the destination MAC rewrite described above. Note that
at this stage we have not committed any connection yet. We commit all
connections after enforcing Network Policies, in [ConntrackCommitTable]. This is
also when we set the `ct_mark` to `0x20` for connections initiated through the
gateway.

Flow 2 drops invalid traffic. All non-dropped traffic finally goes to the
Flow 1 drops invalid traffic. All non-dropped traffic finally goes to the
[DNATTable].

### DNATTable (40)
Expand Down Expand Up @@ -590,6 +560,35 @@ table=70, priority=200,ip,reg0=0x80000/0x80000,nw_dst=10.10.0.2 actions=mod_dl_s
table=70, priority=200,ip,reg0=0x80000/0x80000,nw_dst=10.10.0.1 actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:80
```

* All reply traffic of connections initiated through the gateway port, i.e. for
which the first packet of the connection (SYN packet for TCP) was received
through the gateway. Such packets can be identified by the packet's direction
in `ct_state` and the `ct_mark` value `0x20` which is committed in
[ConntrackCommitTable] when the first packet of the connection was handled.
A flow will overwrite the destination MAC to the local gateway MAC to ensure
that they get forwarded through the gateway port. This is required to handle
the following cases:
- reply traffic for connections from a local Pod to a ClusterIP Service, which
are handled by kube-proxy and go through DNAT. In this case the destination
IP address of the reply traffic is the Pod which initiated the connection to
the Service (no SNAT by kube-proxy). We need to make sure that these packets
are sent back through the gateway so that the source IP can be rewritten to
the ClusterIP ("undo" DNAT). If we do not use connection tracking and do not
rewrite the destination MAC, reply traffic from the backend will go directly
to the originating Pod without going first through the gateway and
kube-proxy. This means that the reply traffic will arrive at the originating
Pod with the incorrect source IP (it will be set to the backend's IP instead
of the Service IP).
- when hair-pinning is involved, i.e. connections between 2 local Pods, for
which NAT is performed. One example is a Pod accessing a NodePort Service
for which `externalTrafficPolicy` is set to `Local` using the local Node's
IP address, as there will be no SNAT for such traffic. Another example could
be `hostPort` support, depending on how the feature is implemented.

```text
table=70, priority=210,ct_state=+rpl+trk,ct_mark=0x20,ip actions=mod_dl_dst:e2:e5:a4:9b:1c:b1,goto_table:80
```

* All traffic destined to a remote Pod is forwarded through the appropriate
tunnel. This means that we install one flow for each peer Node, each one
matching the destination IP address of the packet against the Pod subnet for
Expand Down
1 change: 0 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,6 @@ func (c *client) InstallGatewayFlows() error {

// Add flow to ensure the liveness check packet could be forwarded correctly.
flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...)
flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...)
flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...)

if err := c.ofEntryOperations.AddAll(flows); err != nil {
Expand Down
65 changes: 22 additions & 43 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,18 +636,6 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
ctZone = CtZoneV6
}
flows = append(flows,
// If a connection was initiated through the gateway (i.e. has gatewayCTMark) and
// the packet is received on the gateway port, go to the next table directly. This
// is to bypass the flow which is installed by ctRewriteDstMACFlow in the same
// table, and which will rewrite the destination MAC address for traffic with the
// gatewayCTMark, but which is flowing in the opposite direction.
connectionTrackStateTable.BuildFlow(priorityHigh).MatchProtocol(proto).
MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}).
MatchCTMark(gatewayCTMark, nil).
MatchCTStateNew(false).MatchCTStateTrk(true).
Action().GotoTable(connectionTrackStateTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
// Connections initiated through the gateway are marked with gatewayCTMark.
connectionTrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(proto).
MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}).
Expand Down Expand Up @@ -895,36 +883,6 @@ func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, timeout uint16,
return flows
}

// ctRewriteDstMACFlow rewrites the destination MAC address with the local host gateway MAC if the
// packet is marked with gatewayCTMark but was not received on the host gateway. In other words, it
// rewrites the destination MAC address for reply traffic for connections which were initiated
// through the gateway, to ensure that this reply traffic gets forwarded correctly (back to the host
// network namespace, through the gateway). In particular, it is necessary in the following 2 cases:
// 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is
// disabled and kube-proxy is used). In this case the destination IP address of the reply traffic
// is the Pod which initiated the connection to the Service (no SNAT). We need to make sure that
// these packets are sent back through the gateway so that the source IP can be rewritten (Service
// backend IP -> Service ClusterIP).
// 2) when hair-pinning is involved, i.e. for connections between 2 local Pods belonging to this
// Node and for which NAT is performed. This applies regardless of whether AntreaProxy is enabled
// or not, and thus also applies to Windows Nodes (for which AntreaProxy is enabled by default).
// One example is a Pod accessing a NodePort Service for which externalTrafficPolicy is set to
// Local, using the local Node's IP address.
func (c *client) ctRewriteDstMACFlows(gatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
connectionTrackStateTable := c.pipeline[conntrackStateTable]
var flows []binding.Flow
for _, proto := range c.ipProtocols {
flows = append(flows, connectionTrackStateTable.BuildFlow(priorityNormal).MatchProtocol(proto).
MatchCTMark(gatewayCTMark, nil).
MatchCTStateNew(false).MatchCTStateTrk(true).
Action().SetDstMAC(gatewayMAC).
Action().GotoTable(connectionTrackStateTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
}
return flows
}

// serviceLBBypassFlows makes packets that belong to a tracked connection bypass
// service LB tables and enter egressRuleTable directly.
func (c *client) serviceLBBypassFlows(ipProtocol binding.Protocol) []binding.Flow {
Expand Down Expand Up @@ -1209,7 +1167,7 @@ func (c *client) l3FwdFlowRouteToGW(gwMAC net.HardwareAddr, category cookie.Cate
}

// l3FwdFlowToGateway generates the L3 forward flows to rewrite the destination MAC of the packets to the gateway interface
// MAC if the destination IP is the gateway IP.
// MAC if the destination IP is the gateway IP or the connection was initiated through the gateway interface.
func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
l3FwdTable := c.pipeline[l3ForwardingTable]
var flows []binding.Flow
Expand All @@ -1223,6 +1181,27 @@ func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC ne
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
}
// Rewrite the destination MAC address with the local host gateway MAC if the packet is in the reply direction and
// is marked with gatewayCTMark. This is for connections which were initiated through the gateway, to ensure that
// this reply traffic gets forwarded correctly (back to the host network namespace, through the gateway). In
// particular, it is necessary in the following 2 cases:
// 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is disabled and
// kube-proxy is used). In this case the destination IP address of the reply traffic is the Pod which initiated the
// connection to the Service (no SNAT). We need to make sure that these packets are sent back through the gateway
// so that the source IP can be rewritten (Service backend IP -> Service ClusterIP).
// 2) when hair-pinning is involved, i.e. connections between 2 local Pods, for which NAT is performed. This
// applies regardless of whether AntreaProxy is enabled or not, and thus also applies to Windows Nodes (for which
// AntreaProxy is enabled by default). One example is a Pod accessing a NodePort Service for which
// externalTrafficPolicy is set to Local, using the local Node's IP address.
for _, proto := range c.ipProtocols {
flows = append(flows, l3FwdTable.BuildFlow(priorityHigh).MatchProtocol(proto).
MatchCTMark(gatewayCTMark, nil).
MatchCTStateRpl(true).MatchCTStateTrk(true).
Action().SetDstMAC(localGatewayMAC).
Action().GotoTable(l3FwdTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
}
return flows
}

Expand Down
60 changes: 41 additions & 19 deletions test/e2e/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ package e2e

import (
"fmt"
"net"
"strings"
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
)

// TestClusterIPHostAccess tests traffic from host to ClusterIP Service.
func TestClusterIPHostAccess(t *testing.T) {
// TestClusterIP tests traffic from Nodes and Pods to ClusterIP Service.
func TestClusterIP(t *testing.T) {
// TODO: Support for dual-stack and IPv6-only clusters
skipIfIPv6Cluster(t)

Expand All @@ -35,35 +36,56 @@ func TestClusterIPHostAccess(t *testing.T) {
defer teardownTest(t, data)

svcName := "nginx"
node := nodeName(0)
svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, node)
serverPodNode := nodeName(0)
svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, serverPodNode)
defer cleanup()
t.Logf("%s Service is ready", svcName)

var linNode, winNode string
linNode = node
if len(clusterInfo.windowsNodes) != 0 {
idx := clusterInfo.windowsNodes[0]
winNode = clusterInfo.nodes[idx].name
}

curlSvc := func(node string) {
testFromNode := func(node string) {
// Retry is needed for rules to be installed by kube-proxy/antrea-proxy.
cmd := fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s:80", svc.Spec.ClusterIP)
rc, stdout, stderr, err := RunCommandOnNode(node, cmd)
if rc != 0 || err != nil {
t.Errorf("Error when running command '%s' on Node '%s', rc: %d, stdout: %s, stderr: %s, error: %v",
cmd, node, rc, stdout, stderr, err)
} else {
t.Logf("curl from Node '%s' succeeded", node)
}
}
t.Logf("Try to curl ClusterIP Service from a Linux host")
curlSvc(linNode)
if winNode != "" {
t.Logf("Try to curl Cluster IP Service from a Windows host")
curlSvc(winNode)

testFromPod := func(podName, nodeName string) {
require.NoError(t, data.createBusyboxPodOnNode(podName, nodeName))
defer data.deletePodAndWait(defaultTimeout, podName)
require.NoError(t, data.podWaitForRunning(defaultTimeout, podName, testNamespace))
err := data.runNetcatCommandFromTestPod(podName, svc.Spec.ClusterIP, 80)
require.NoError(t, err, "Pod %s should be able to connect %s, but was not able to connect", podName, net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprint(80)))
}

t.Run("ClusterIP", func(t *testing.T) {
t.Run("Same Linux Node can access the Service", func(t *testing.T) {
t.Parallel()
testFromNode(serverPodNode)
})
t.Run("Different Linux Node can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNumNodesLessThan(t, 2)
testFromNode(nodeName(1))
})
t.Run("Windows host can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNoWindowsNodes(t)
idx := clusterInfo.windowsNodes[0]
winNode := clusterInfo.nodes[idx].name
testFromNode(winNode)
})
t.Run("Linux Pod on same Node can access the Service", func(t *testing.T) {
t.Parallel()
testFromPod("client-on-same-node", serverPodNode)
})
t.Run("Linux Pod on different Node can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNumNodesLessThan(t, 2)
testFromPod("client-on-different-node", nodeName(1))
})
})
}

func (data *TestData) createClusterIPServiceAndBackendPods(t *testing.T, name string, node string) (*corev1.Service, func()) {
Expand Down
8 changes: 3 additions & 5 deletions test/integration/agent/openflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,11 +1072,11 @@ func prepareGatewayFlows(gwIPs []net.IP, gwMAC net.HardwareAddr, vMAC net.Hardwa
},
},
expectTableFlows{
uint8(31),
uint8(70),
[]*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,ct_state=-new+trk,ct_mark=0x20,%s", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:42", gwMAC.String()),
MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x20,%s", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:80", gwMAC.String()),
},
},
},
Expand Down Expand Up @@ -1170,7 +1170,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows {
}
if config.enableIPv4 {
table31Flows.flows = append(table31Flows.flows,
&ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff", ActStr: "goto_table:42"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ip", ActStr: "drop"},
)
table105Flows.flows = append(table105Flows.flows,
Expand All @@ -1184,7 +1183,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows {
}
if config.enableIPv6 {
table31Flows.flows = append(table31Flows.flows,
&ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ipv6,reg0=0x1/0xffff", ActStr: "goto_table:42"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ipv6", ActStr: "drop"},
)
table105Flows.flows = append(table105Flows.flows,
Expand Down