Skip to content

Commit

Permalink
Fix cross-Node service access when AntreaProxy is disabled
Browse files Browse the repository at this point in the history
Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Jun 28, 2021
1 parent 2d19196 commit f61b5a6
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 68 deletions.
1 change: 0 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,6 @@ func (c *client) InstallGatewayFlows() error {

// Add flow to ensure the liveness check packet could be forwarded correctly.
flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...)
flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...)
flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...)

if err := c.ofEntryOperations.AddAll(flows); err != nil {
Expand Down
67 changes: 24 additions & 43 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,18 +636,6 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
ctZone = CtZoneV6
}
flows = append(flows,
// If a connection was initiated through the gateway (i.e. has gatewayCTMark) and
// the packet is received on the gateway port, go to the next table directly. This
// is to bypass the flow which is installed by ctRewriteDstMACFlows in the same
// table, and which will rewrite the destination MAC address for traffic with the
// gatewayCTMark, but which is flowing in the opposite direction.
connectionTrackStateTable.BuildFlow(priorityHigh).MatchProtocol(proto).
MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}).
MatchCTMark(gatewayCTMark, nil).
MatchCTStateNew(false).MatchCTStateTrk(true).
Action().GotoTable(connectionTrackStateTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
// Connections initiated through the gateway are marked with gatewayCTMark.
connectionTrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(proto).
MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}).
Expand Down Expand Up @@ -895,36 +883,6 @@ func (c *client) traceflowNetworkPolicyFlows(dataplaneTag uint8, timeout uint16,
return flows
}

// ctRewriteDstMACFlows returns one conntrackStateTable flow per enabled IP protocol. Each flow
// matches tracked, non-new packets that carry gatewayCTMark, sets their destination MAC to
// gatewayMAC and sends them on to the next table.
//
// The intent is to steer reply traffic of connections that were initiated through the gateway
// back to the host network namespace via the gateway. This is needed in two situations:
//  1. Replies for connections from a local Pod to a ClusterIP Service when AntreaProxy is disabled
//     and kube-proxy is used. The reply is addressed to the Pod that opened the connection (no
//     SNAT), so it has to go back through the gateway for the source IP to be rewritten
//     (Service backend IP -> Service ClusterIP).
//  2. Hair-pinned connections, i.e. connections between 2 local Pods of this Node for which NAT is
//     performed. This holds whether or not AntreaProxy is enabled, and therefore also on Windows
//     Nodes (where AntreaProxy is enabled by default). An example is a Pod accessing a NodePort
//     Service with externalTrafficPolicy set to Local through the local Node's IP address.
func (c *client) ctRewriteDstMACFlows(gatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
	stateTable := c.pipeline[conntrackStateTable]
	var result []binding.Flow
	for _, ipProto := range c.ipProtocols {
		// Match side: established (tracked, not new) packets marked with gatewayCTMark.
		matcher := stateTable.BuildFlow(priorityNormal).
			MatchProtocol(ipProto).
			MatchCTMark(gatewayCTMark, nil).
			MatchCTStateNew(false).
			MatchCTStateTrk(true)
		// Action side: rewrite the destination MAC, then continue in the next table.
		rewriteFlow := matcher.
			Action().SetDstMAC(gatewayMAC).
			Action().GotoTable(stateTable.GetNext()).
			Cookie(c.cookieAllocator.Request(category).Raw()).
			Done()
		result = append(result, rewriteFlow)
	}
	return result
}

// serviceLBBypassFlows makes packets that belong to a tracked connection bypass
// service LB tables and enter egressRuleTable directly.
func (c *client) serviceLBBypassFlows(ipProtocol binding.Protocol) []binding.Flow {
Expand Down Expand Up @@ -1209,7 +1167,7 @@ func (c *client) l3FwdFlowRouteToGW(gwMAC net.HardwareAddr, category cookie.Cate
}

// l3FwdFlowToGateway generates the L3 forward flows to rewrite the destination MAC of the packets to the gateway interface
// MAC if the destination IP is the gateway IP.
// MAC if the destination IP is the gateway IP or the connection was initiated through the gateway interface.
func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
l3FwdTable := c.pipeline[l3ForwardingTable]
var flows []binding.Flow
Expand All @@ -1223,6 +1181,29 @@ func (c *client) l3FwdFlowToGateway(localGatewayIPs []net.IP, localGatewayMAC ne
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
}
// Rewrites the destination MAC address with the local host gateway MAC if the packet is in the reply direction and
// is marked with gatewayCTMark. In other words, it rewrites the destination MAC address for reply traffic for
// connections which were initiated through the gateway, to ensure that this reply traffic gets forwarded correctly
// (back to the host network namespace, through the gateway). In particular, it is necessary in the following 2 cases:
// 1) reply traffic for connections from a local Pod to a ClusterIP Service (when AntreaProxy is disabled and
// kube-proxy is used). In this case the destination IP address of the reply traffic
// is the Pod which initiated the connection to the Service (no SNAT). We need to make sure that
// these packets are sent back through the gateway so that the source IP can be rewritten (Service
// backend IP -> Service ClusterIP).
// 2) when hair-pinning is involved, i.e. for connections between 2 local Pods belonging to this
// Node and for which NAT is performed. This applies regardless of whether AntreaProxy is enabled
// or not, and thus also applies to Windows Nodes (for which AntreaProxy is enabled by default).
// One example is a Pod accessing a NodePort Service for which externalTrafficPolicy is set to
// Local, using the local Node's IP address.
for _, proto := range c.ipProtocols {
flows = append(flows, l3FwdTable.BuildFlow(priorityHigh).MatchProtocol(proto).
MatchCTMark(gatewayCTMark, nil).
MatchCTStateRpl(true).MatchCTStateTrk(true).
Action().SetDstMAC(localGatewayMAC).
Action().GotoTable(l3FwdTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
}
return flows
}

Expand Down
61 changes: 42 additions & 19 deletions test/e2e/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ package e2e

import (
"fmt"
"net"
"strings"
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
)

// TestClusterIPHostAccess tests traffic from host to ClusterIP Service.
func TestClusterIPHostAccess(t *testing.T) {
// TestClusterIP tests traffic from Nodes and Pods to ClusterIP Service.
func TestClusterIP(t *testing.T) {
// TODO: Support for dual-stack and IPv6-only clusters
skipIfIPv6Cluster(t)

Expand All @@ -35,35 +36,57 @@ func TestClusterIPHostAccess(t *testing.T) {
defer teardownTest(t, data)

svcName := "nginx"
node := nodeName(0)
svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, node)
serverPodNode := nodeName(0)
svc, cleanup := data.createClusterIPServiceAndBackendPods(t, svcName, serverPodNode)
defer cleanup()
t.Logf("%s Service is ready", svcName)

var linNode, winNode string
linNode = node
if len(clusterInfo.windowsNodes) != 0 {
idx := clusterInfo.windowsNodes[0]
winNode = clusterInfo.nodes[idx].name
}

curlSvc := func(node string) {
testFromNode := func(node string) {
// Retry is needed for rules to be installed by kube-proxy/antrea-proxy.
cmd := fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s:80", svc.Spec.ClusterIP)
rc, stdout, stderr, err := RunCommandOnNode(node, cmd)
if rc != 0 || err != nil {
t.Errorf("Error when running command '%s' on Node '%s', rc: %d, stdout: %s, stderr: %s, error: %v",
cmd, node, rc, stdout, stderr, err)
} else {
t.Logf("curl from Node '%s' succeeded", node)
}
}
t.Logf("Try to curl ClusterIP Service from a Linux host")
curlSvc(linNode)
if winNode != "" {
t.Logf("Try to curl Cluster IP Service from a Windows host")
curlSvc(winNode)

testFromPod := func(podName, nodeName string) {
require.NoError(t, data.createBusyboxPodOnNode(podName, nodeName))
defer data.deletePodAndWait(defaultTimeout, podName)
require.NoError(t, data.podWaitForRunning(defaultTimeout, podName, testNamespace))
if err := data.runNetcatCommandFromTestPod(podName, svc.Spec.ClusterIP, 80); err != nil {
t.Fatalf("Pod %s should be able to connect %s, but was not able to connect", podName, net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprint(80)))
}
}

t.Run("ClusterIP", func(t *testing.T) {
t.Run("Same Linux Node can access the Service", func(t *testing.T) {
t.Parallel()
testFromNode(serverPodNode)
})
t.Run("Different Linux Node can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNumNodesLessThan(t, 2)
testFromNode(nodeName(1))
})
t.Run("Windows host can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNoWindowsNodes(t)
idx := clusterInfo.windowsNodes[0]
winNode := clusterInfo.nodes[idx].name
testFromNode(winNode)
})
t.Run("Linux Pod on same Node can access the Service", func(t *testing.T) {
t.Parallel()
testFromPod("client-on-same-node", serverPodNode)
})
t.Run("Linux Pod on different Node can access the Service", func(t *testing.T) {
t.Parallel()
skipIfNumNodesLessThan(t, 2)
testFromPod("client-on-different-node", nodeName(1))
})
})
}

func (data *TestData) createClusterIPServiceAndBackendPods(t *testing.T, name string, node string) (*corev1.Service, func()) {
Expand Down
8 changes: 3 additions & 5 deletions test/integration/agent/openflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,11 +1072,11 @@ func prepareGatewayFlows(gwIPs []net.IP, gwMAC net.HardwareAddr, vMAC net.Hardwa
},
},
expectTableFlows{
uint8(31),
uint8(70),
[]*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,ct_state=-new+trk,ct_mark=0x20,%s", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:42", gwMAC.String()),
MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x20,%s", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:80", gwMAC.String()),
},
},
},
Expand Down Expand Up @@ -1170,7 +1170,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows {
}
if config.enableIPv4 {
table31Flows.flows = append(table31Flows.flows,
&ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ip,reg0=0x1/0xffff", ActStr: "goto_table:42"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ip", ActStr: "drop"},
)
table105Flows.flows = append(table105Flows.flows,
Expand All @@ -1184,7 +1183,6 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows {
}
if config.enableIPv6 {
table31Flows.flows = append(table31Flows.flows,
&ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_state=-new+trk,ct_mark=0x20,ipv6,reg0=0x1/0xffff", ActStr: "goto_table:42"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ipv6", ActStr: "drop"},
)
table105Flows.flows = append(table105Flows.flows,
Expand Down

0 comments on commit f61b5a6

Please sign in to comment.