Skip to content

Commit

Permalink
Add required family flag for conntrack IPv6 operation
Browse files Browse the repository at this point in the history
This change causes kube-proxy to supply the required "-f ipv6"
family flag whenever the conntrack utility is executed and the
associated service is using IPv6.

This change is required for IPv6-only operation.

Note that unit test coverage for the 2-line changes in
pkg/proxy/iptables/proxier.go and /pkg/proxy/ipvs/proxier.go will need
to be added after support for IPv6 service addresses is added to these
files. For pkg/proxy/iptables/proxier.go, this coverage will be added
either with PR kubernetes#48551.

fixes kubernetes#52027
  • Loading branch information
Dane LeBlanc committed Sep 17, 2017
1 parent f35e341 commit 26bd019
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 58 deletions.
3 changes: 2 additions & 1 deletion pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,8 @@ func (proxier *Proxier) syncProxyRules() {
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
// This only affects UDP connections, which are not common.
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port)
isIPv6 := svcInfo.clusterIP.To4() != nil
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6)
if err != nil {
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/proxy/ipvs/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
continue
}
if lp.Protocol == "udp" {
utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port)
isIPv6 := svcInfo.clusterIP.To4() != nil
utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6)
}
replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules.
Expand Down
26 changes: 21 additions & 5 deletions pkg/proxy/util/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package util

import (
"fmt"
"net"
"strconv"
"strings"

Expand All @@ -28,10 +29,23 @@ import (

const noConnectionToDelete = "0 flow entries have been deleted"

// DeleteServiceConnections uses the conntrack tool to delete the conntrack entries
func isIPv6(ip string) bool {
netIP := net.ParseIP(ip)
return netIP != nil && netIP.To4() == nil
}

func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
if isIPv6 {
parameters = append(parameters, "-f", "ipv6")
}
return parameters
}

// ClearUDPConntrackForIP uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the given service IP
func ClearUDPConntrackForIP(execer exec.Interface, ip string) error {
err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp")
parameters := parametersWithFamily(isIPv6(ip), "-D", "--orig-dst", ip, "-p", "udp")
err := ExecConntrackTool(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
Expand Down Expand Up @@ -60,11 +74,12 @@ func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
// The solution is clearing the conntrack. Known issues:
// https://github.com/docker/docker/issues/8795
// https://github.com/kubernetes/kubernetes/issues/31983
func ClearUDPConntrackForPort(execer exec.Interface, port int) error {
func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) error {
if port <= 0 {
return fmt.Errorf("Wrong port number. The port number must be greater than zero")
}
err := ExecConntrackTool(execer, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
parameters := parametersWithFamily(isIPv6, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
err := ExecConntrackTool(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
}
Expand All @@ -74,7 +89,8 @@ func ClearUDPConntrackForPort(execer exec.Interface, port int) error {
// ClearUDPConntrackForPeers uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the {origin, dest} IP pair.
func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error {
err := ExecConntrackTool(execer, "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp")
parameters := parametersWithFamily(isIPv6(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp")
err := ExecConntrackTool(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
Expand Down
122 changes: 71 additions & 51 deletions pkg/proxy/util/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,27 @@ package util

import (
"fmt"
"strconv"
"strings"
"testing"

"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
)

func familyParamStr(isIPv6 bool) string {
if isIPv6 {
return " -f ipv6"
}
return ""
}

func TestExecConntrackTool(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
},
}
Expand Down Expand Up @@ -81,45 +87,45 @@ func TestClearUDPConntrackForIP(t *testing.T) {
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}

testCases := [][]string{
{
"10.240.0.3",
"10.240.0.5",
},
{
"10.240.0.4",
},
testCases := []struct {
name string
ip string
}{
{"IPv4 success", "10.240.0.3"},
{"IPv4 success", "10.240.0.5"},
{"IPv4 simulated error", "10.240.0.4"},
{"IPv6 success", "2001:db8::10"},
}

svcCount := 0
for i := range testCases {
for _, ip := range testCases[i] {
if err := ClearUDPConntrackForIP(&fexec, ip); err != nil {
t.Errorf("Unexepected error: %v", err)
}
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1
for _, tc := range testCases {
if err := ClearUDPConntrackForIP(&fexec, tc.ip); err != nil {
t.Errorf("%s test case:, Unexpected error: %v", tc.name, err)
}
if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(isIPv6(tc.ip))
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
}
svcCount++
}
if svcCount != fexec.CommandCalls {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}

Expand All @@ -128,39 +134,44 @@ func TestClearUDPConntrackForPort(t *testing.T) {
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}

testCases := []string{
"8080",
"9090",
testCases := []struct {
name string
port int
isIPv6 bool
}{
{"IPv4, no error", 8080, false},
{"IPv4, simulated error", 9090, false},
{"IPv6, no error", 6666, true},
}
svcCount := 0
for i := range testCases {
portNum, _ := strconv.Atoi(testCases[i])
err := ClearUDPConntrackForPort(&fexec, portNum)
for _, tc := range testCases {
err := ClearUDPConntrackForPort(&fexec, tc.port, tc.isIPv6)
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Errorf("%s test case: Unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %s", testCases[i])
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d", tc.port) + familyParamStr(tc.isIPv6)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1

if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
}
svcCount++
}
if svcCount != fexec.CommandCalls {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}

Expand All @@ -169,46 +180,55 @@ func TestDeleteUDPConnections(t *testing.T) {
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}

testCases := []struct {
name string
origin string
dest string
}{
{
name: "IPv4 success",
origin: "1.2.3.4",
dest: "10.20.30.40",
},
{
name: "IPv4 simulated failure",
origin: "2.3.4.5",
dest: "20.30.40.50",
},
{
name: "IPv6 success",
origin: "fd00::600d:f00d",
dest: "2001:db8::5",
},
}
svcCount := 0
for i := range testCases {
err := ClearUDPConntrackForPeers(&fexec, testCases[i].origin, testCases[i].dest)
for i, tc := range testCases {
err := ClearUDPConntrackForPeers(&fexec, tc.origin, tc.dest)
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", testCases[i].origin, testCases[i].dest)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(isIPv6(tc.origin))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1

if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
}
svcCount++
}
if svcCount != fexec.CommandCalls {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}

0 comments on commit 26bd019

Please sign in to comment.