Skip to content

Commit

Permalink
Add agnhost for E2E test (#1981)
Browse files Browse the repository at this point in the history
* Add agnhost for E2E test

1. Change to use agnhost to test connectivity between Pods in E2E test.
2. Add UDP, SCTP test case in E2E test(OVS userspace conntrack doesn't support SCTP, SCTP case will be skipped).
3. Add one port listen to multi-protocol connections situation in E2E test.
  • Loading branch information
GraysonWu committed Mar 29, 2021
1 parent f4599c0 commit 85456c9
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 85 deletions.
6 changes: 3 additions & 3 deletions hack/netpol/pkg/utils/k8s_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (k *Kubernetes) Probe(ns1, pod1, ns2, pod2 string, port int) (bool, error)
"/bin/sh",
"-c",
// 3 tries, timeout is 1 second
fmt.Sprintf("for i in $(seq 1 3); do ncat -vz -w 1 %s %d && exit 0 || true; done; exit 1", toIP, port),
fmt.Sprintf("for i in $(seq 1 3); do /agnhost connect %s:%d --timeout=1s --protocol=tcp && exit 0 || true; done; exit 1", toIP, port),
}
// HACK: inferring container name as c80, c81, etc, for simplicity.
containerName := fmt.Sprintf("c%v", port)
Expand Down Expand Up @@ -205,9 +205,9 @@ func (k *Kubernetes) CreateOrUpdateDeployment(ns, deploymentName string, replica
return v1.Container{
Name: fmt.Sprintf("c%d", port),
ImagePullPolicy: v1.PullIfNotPresent,
Image: "antrea/netpol-test:latest",
Image: "k8s.gcr.io/e2e-test-images/agnhost:2.29",
// "-k" for persistent server
Command: []string{"ncat", "-lk", "-p", fmt.Sprintf("%d", port)},
Command: []string{"/agnhost", "serve-hostname", "--tcp", "--http=false", "--port", fmt.Sprintf("%d", port)},
SecurityContext: &v1.SecurityContext{},
Ports: []v1.ContainerPort{
{
Expand Down
138 changes: 80 additions & 58 deletions test/e2e/antreapolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ import (

// common for all tests.
var (
allPods []Pod
k8sUtils *KubernetesUtils
allTestList []*TestCase
pods, namespaces []string
podIPs map[string]string
p80, p81, p5000, p8080, p8081, p8082, p8085 int32
allPods []Pod
k8sUtils *KubernetesUtils
allTestList []*TestCase
pods, namespaces []string
podIPs map[string]string
p80, p81, p8080, p8081, p8082, p8085 int32
)

const (
Expand Down Expand Up @@ -101,7 +101,6 @@ type CustomProbe struct {
func initialize(t *testing.T, data *TestData) {
p80 = 80
p81 = 81
p5000 = 5000
p8080 = 8080
p8081 = 8081
p8082 = 8082
Expand Down Expand Up @@ -755,14 +754,17 @@ func testACNPPriorityOverrideDefaultDeny(t *testing.T) {
}

// testACNPAllowNoDefaultIsolation tests that no default isolation rules are created for Policies.
func testACNPAllowNoDefaultIsolation(t *testing.T) {
func testACNPAllowNoDefaultIsolation(t *testing.T, protocol v1.Protocol) {
if protocol == v1.ProtocolSCTP {
skipIfProviderIs(t, "kind", "OVS userspace conntrack does not have the SCTP support for now.")
}
builder := &ClusterNetworkPolicySpecBuilder{}
builder = builder.SetName("acnp-allow-x-ingress-y-egress-z").
SetPriority(1.1).
SetAppliedToGroup([]ACNPAppliedToSpec{{NSSelector: map[string]string{"ns": "x"}}})
builder.AddIngress(v1.ProtocolTCP, &p81, nil, nil, nil, nil, map[string]string{"ns": "y"},
builder.AddIngress(protocol, &p81, nil, nil, nil, nil, map[string]string{"ns": "y"},
nil, nil, nil, secv1alpha1.RuleActionAllow, "", "")
builder.AddEgress(v1.ProtocolTCP, &p81, nil, nil, nil, nil, map[string]string{"ns": "z"},
builder.AddEgress(protocol, &p81, nil, nil, nil, nil, map[string]string{"ns": "z"},
nil, nil, nil, secv1alpha1.RuleActionAllow, "", "")

reachability := NewReachability(allPods, Connected)
Expand All @@ -773,7 +775,7 @@ func testACNPAllowNoDefaultIsolation(t *testing.T) {
[]metav1.Object{builder.Get()},
nil,
[]int32{81},
v1.ProtocolTCP,
protocol,
0,
nil,
},
Expand All @@ -785,12 +787,15 @@ func testACNPAllowNoDefaultIsolation(t *testing.T) {
}

// testACNPDropEgress tests that a ACNP is able to drop egress traffic from pods labelled A to namespace Z.
func testACNPDropEgress(t *testing.T) {
func testACNPDropEgress(t *testing.T, protocol v1.Protocol) {
if protocol == v1.ProtocolSCTP {
skipIfProviderIs(t, "kind", "OVS userspace conntrack does not have the SCTP support for now.")
}
builder := &ClusterNetworkPolicySpecBuilder{}
builder = builder.SetName("acnp-deny-a-to-z-egress").
SetPriority(1.0).
SetAppliedToGroup([]ACNPAppliedToSpec{{PodSelector: map[string]string{"pod": "a"}}})
builder.AddEgress(v1.ProtocolTCP, &p80, nil, nil, nil, nil, map[string]string{"ns": "z"},
builder.AddEgress(protocol, &p80, nil, nil, nil, nil, map[string]string{"ns": "z"},
nil, nil, nil, secv1alpha1.RuleActionDrop, "", "")

reachability := NewReachability(allPods, Connected)
Expand All @@ -810,7 +815,7 @@ func testACNPDropEgress(t *testing.T) {
[]metav1.Object{builder.Get()},
nil,
[]int32{80},
v1.ProtocolTCP,
protocol,
0,
nil,
},
Expand All @@ -821,6 +826,55 @@ func testACNPDropEgress(t *testing.T) {
executeTests(t, testCase)
}

// testACNPNoEffectOnOtherProtocols tests that a ACNP which drops TCP traffic won't affect other protocols (e.g. UDP).
func testACNPNoEffectOnOtherProtocols(t *testing.T) {
builder := &ClusterNetworkPolicySpecBuilder{}
builder = builder.SetName("acnp-deny-a-to-z-ingress").
SetPriority(1.0).
SetAppliedToGroup([]ACNPAppliedToSpec{{PodSelector: map[string]string{"pod": "a"}}})
builder.AddIngress(v1.ProtocolTCP, &p80, nil, nil, nil, nil, map[string]string{"ns": "z"},
nil, nil, nil, secv1alpha1.RuleActionDrop, "", "")

reachability1 := NewReachability(allPods, Connected)
reachability1.Expect(Pod("z/a"), Pod("x/a"), Dropped)
reachability1.Expect(Pod("z/b"), Pod("x/a"), Dropped)
reachability1.Expect(Pod("z/c"), Pod("x/a"), Dropped)
reachability1.Expect(Pod("z/a"), Pod("y/a"), Dropped)
reachability1.Expect(Pod("z/b"), Pod("y/a"), Dropped)
reachability1.Expect(Pod("z/c"), Pod("y/a"), Dropped)
reachability1.Expect(Pod("z/b"), Pod("z/a"), Dropped)
reachability1.Expect(Pod("z/c"), Pod("z/a"), Dropped)

reachability2 := NewReachability(allPods, Connected)

testStep := []*TestStep{
{
"Port 80",
reachability1,
[]metav1.Object{builder.Get()},
nil,
[]int32{80},
v1.ProtocolTCP,
0,
nil,
},
{
"Port 80",
reachability2,
[]metav1.Object{builder.Get()},
nil,
[]int32{80},
v1.ProtocolUDP,
0,
nil,
},
}
testCase := []*TestCase{
{"ACNP Drop Ingress From All Pod:a to NS:z TCP Not UDP", testStep},
}
executeTests(t, testCase)
}

// testACNPAppliedToDenyXBtoCGWithYA tests traffic from X/B to ClusterGroup Y/A on named port 81 is dropped.
func testACNPAppliedToDenyXBtoCGWithYA(t *testing.T) {
cgName := "cg-pods-ya"
Expand Down Expand Up @@ -1649,12 +1703,12 @@ func testACNPRejectEgress(t *testing.T) {
}

// testACNPRejectIngress tests that a ACNP is able to reject egress traffic from pods labelled A to namespace Z.
func testACNPRejectIngress(t *testing.T) {
func testACNPRejectIngress(t *testing.T, protocol v1.Protocol) {
builder := &ClusterNetworkPolicySpecBuilder{}
builder = builder.SetName("acnp-reject-a-from-z-ingress").
SetPriority(1.0).
SetAppliedToGroup([]ACNPAppliedToSpec{{PodSelector: map[string]string{"pod": "a"}}})
builder.AddIngress(v1.ProtocolTCP, &p80, nil, nil, nil, nil, map[string]string{"ns": "z"},
builder.AddIngress(protocol, &p80, nil, nil, nil, nil, map[string]string{"ns": "z"},
nil, nil, nil, secv1alpha1.RuleActionReject, "", "")

reachability := NewReachability(allPods, Connected)
Expand All @@ -1674,7 +1728,7 @@ func testACNPRejectIngress(t *testing.T) {
[]metav1.Object{builder.Get()},
nil,
[]int32{80},
v1.ProtocolTCP,
protocol,
0,
nil,
},
Expand All @@ -1685,43 +1739,6 @@ func testACNPRejectIngress(t *testing.T) {
executeTests(t, testCase)
}

// testACNPRejectIngressUDP tests that a ACNP is able to reject egress traffic from pods labelled A to namespace Z.
func testACNPRejectIngressUDP(t *testing.T) {
builder := &ClusterNetworkPolicySpecBuilder{}
builder = builder.SetName("acnp-reject-a-from-z-ingress-udp").
SetPriority(1.0).
SetAppliedToGroup([]ACNPAppliedToSpec{{PodSelector: map[string]string{"pod": "a"}}})
builder.AddIngress(v1.ProtocolUDP, &p5000, nil, nil, nil, nil, map[string]string{"ns": "z"},
nil, nil, nil, secv1alpha1.RuleActionReject, "", "")

reachability := NewReachability(allPods, Connected)
reachability.Expect(Pod("z/a"), Pod("x/a"), Rejected)
reachability.Expect(Pod("z/b"), Pod("x/a"), Rejected)
reachability.Expect(Pod("z/c"), Pod("x/a"), Rejected)
reachability.Expect(Pod("z/a"), Pod("y/a"), Rejected)
reachability.Expect(Pod("z/b"), Pod("y/a"), Rejected)
reachability.Expect(Pod("z/c"), Pod("y/a"), Rejected)
reachability.Expect(Pod("z/b"), Pod("z/a"), Rejected)
reachability.Expect(Pod("z/c"), Pod("z/a"), Rejected)

testStep := []*TestStep{
{
"Port 5000",
reachability,
[]metav1.Object{builder.Get()},
nil,
[]int32{5000},
v1.ProtocolUDP,
0,
nil,
},
}
testCase := []*TestCase{
{"ACNP Reject UDP ingress from NS:z to All Pod:a", testStep},
}
executeTests(t, testCase)
}

// testANPPortRange tests the port range in a ANP can work.
func testANPPortRange(t *testing.T) {
builder := &AntreaNetworkPolicySpecBuilder{}
Expand Down Expand Up @@ -2317,12 +2334,17 @@ func TestAntreaPolicy(t *testing.T) {

t.Run("TestGroupNoK8sNP", func(t *testing.T) {
// testcases below do not depend on underlying default-deny K8s NetworkPolicies.
t.Run("Case=ACNPAllowNoDefaultIsolation", func(t *testing.T) { testACNPAllowNoDefaultIsolation(t) })
t.Run("Case=ACNPDropEgress", func(t *testing.T) { testACNPDropEgress(t) })
t.Run("Case=ACNPAllowNoDefaultIsolationTCP", func(t *testing.T) { testACNPAllowNoDefaultIsolation(t, v1.ProtocolTCP) })
t.Run("Case=ACNPAllowNoDefaultIsolationUDP", func(t *testing.T) { testACNPAllowNoDefaultIsolation(t, v1.ProtocolUDP) })
t.Run("Case=ACNPAllowNoDefaultIsolationSCTP", func(t *testing.T) { testACNPAllowNoDefaultIsolation(t, v1.ProtocolSCTP) })
t.Run("Case=ACNPDropEgress", func(t *testing.T) { testACNPDropEgress(t, v1.ProtocolTCP) })
t.Run("Case=ACNPDropEgressUDP", func(t *testing.T) { testACNPDropEgress(t, v1.ProtocolUDP) })
t.Run("Case=ACNPDropEgressSCTP", func(t *testing.T) { testACNPDropEgress(t, v1.ProtocolSCTP) })
t.Run("Case=ACNPPortRange", func(t *testing.T) { testACNPPortRange(t) })
t.Run("Case=ACNPRejectEgress", func(t *testing.T) { testACNPRejectEgress(t) })
t.Run("Case=ACNPRejectIngress", func(t *testing.T) { testACNPRejectIngress(t) })
t.Run("Case=ACNPRejectIngressUDP", func(t *testing.T) { testACNPRejectIngressUDP(t) })
t.Run("Case=ACNPRejectIngress", func(t *testing.T) { testACNPRejectIngress(t, v1.ProtocolTCP) })
t.Run("Case=ACNPRejectIngressUDP", func(t *testing.T) { testACNPRejectIngress(t, v1.ProtocolUDP) })
t.Run("Case=ACNPNoEffectOnOtherProtocols", func(t *testing.T) { testACNPNoEffectOnOtherProtocols(t) })
t.Run("Case=ACNPBaselinePolicy", func(t *testing.T) { testBaselineNamespaceIsolation(t) })
t.Run("Case=ACNPPrioirtyOverride", func(t *testing.T) { testACNPPriorityOverride(t) })
t.Run("Case=ACNPTierOverride", func(t *testing.T) { testACNPTierOverride(t) })
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,8 +937,8 @@ func (data *TestData) createServerPod(name string, portName string, portNum int3

// createCustomPod creates a Pod in given Namespace with custom labels.
func (data *TestData) createServerPodWithLabels(name, ns string, portNum int32, labels map[string]string) error {
cmd := []string{"ncat", "-lk", "-p", fmt.Sprintf("%d", portNum)}
image := "antrea/netpol-test:latest"
cmd := []string{"/agnhost", "serve-hostname", "--tcp", "--http=false", "--port", fmt.Sprintf("%d", portNum)}
image := "k8s.gcr.io/e2e-test-images/agnhost:2.29"
env := corev1.EnvVar{Name: fmt.Sprintf("SERVE_PORT_%d", portNum), Value: "foo"}
port := corev1.ContainerPort{ContainerPort: portNum}
containerName := fmt.Sprintf("c%v", portNum)
Expand Down
45 changes: 27 additions & 18 deletions test/e2e/k8s_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (k *KubernetesUtils) GetPodsByLabel(ns string, key string, val string) ([]v
}

// Probe execs into a Pod and checks its connectivity to another Pod. Of course it assumes
// that the target Pod is serving on the input port, and also that ncat is installed.
// that the target Pod is serving on the input port, and also that agnhost is installed.
func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protocol v1.Protocol) (PodConnectivityMark, error) {
fromPods, err := k.GetPodsByLabel(ns1, "pod", pod1)
if err != nil {
Expand Down Expand Up @@ -112,9 +112,11 @@ func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protoco
}
switch protocol {
case v1.ProtocolTCP:
cmd = append(cmd, fmt.Sprintf("for i in $(seq 1 3); do ncat -vz -w 1 %s %d && exit 0 || true; done; exit 1", toIP, port))
cmd = append(cmd, fmt.Sprintf("for i in $(seq 1 3); do /agnhost connect %s:%d --timeout=1s --protocol=tcp && exit 0 || true; done; exit 1", toIP, port))
case v1.ProtocolUDP:
cmd = append(cmd, fmt.Sprintf("for i in $(seq 1 3); do ncat -uvz -w 1 %s %d && exit 0 || true; done; exit 1", toIP, port))
cmd = append(cmd, fmt.Sprintf("for i in $(seq 1 3); do /agnhost connect %s:%d --timeout=1s --protocol=udp && exit 0 || true; done; exit 1", toIP, port))
case v1.ProtocolSCTP:
cmd = append(cmd, fmt.Sprintf("for i in $(seq 1 3); do /agnhost connect %s:%d --timeout=1s --protocol=sctp && exit 0 || true; done; exit 1", toIP, port))
}
// HACK: inferring container name as c80, c81, etc, for simplicity.
containerName := fmt.Sprintf("c%v", port)
Expand All @@ -124,12 +126,11 @@ func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protoco
// log this error as trace since may be an expected failure
log.Tracef("%s/%s -> %s/%s: error when running command: err - %v /// stdout - %s /// stderr - %s", ns1, pod1, ns2, pod2, err, stdout, stderr)
// do not return an error
if protocol == v1.ProtocolTCP && strings.Contains(stderr, TCPRejectProbeReturn) || protocol == v1.ProtocolUDP && strings.Contains(stderr, UDPRejectProbeReturn) {
return Rejected, nil
} else if strings.Contains(stderr, DropProbeReturn) {
if strings.Contains(stderr, "TIMEOUT") {
return Dropped, nil
} else {
return Rejected, nil
}
return Error, nil
}
return Connected, nil
}
Expand Down Expand Up @@ -162,19 +163,25 @@ func (k *KubernetesUtils) CreateOrUpdateDeployment(ns, deploymentName string, re
zero := int64(0)
log.Infof("Creating/updating Deployment '%s/%s'", ns, deploymentName)
makeContainerSpec := func(port int32, protocol v1.Protocol) v1.Container {
var command []string
var args []string
switch protocol {
case v1.ProtocolTCP:
command = []string{"ncat", "-lk", "-p", fmt.Sprintf("%d", port)}
args = []string{fmt.Sprintf("/agnhost serve-hostname --tcp --http=false --port=%d", port)}
case v1.ProtocolUDP:
command = []string{"socat", "-", fmt.Sprintf("udp-listen:%d,fork", port)}
args = []string{fmt.Sprintf("/agnhost serve-hostname --udp --http=false --port=%d", port)}
case v1.ProtocolSCTP:
args = []string{fmt.Sprintf("/agnhost porter")}
default:
args = []string{fmt.Sprintf("/agnhost serve-hostname --udp --http=false --port=%d & /agnhost serve-hostname --tcp --http=false --port=%d & /agnhost porter", port, port)}

}
return v1.Container{
Name: fmt.Sprintf("c%d", port),
ImagePullPolicy: v1.PullIfNotPresent,
Image: "antrea/netpol-test:latest",
// "-k" for persistent server
Command: command,
Image: "k8s.gcr.io/e2e-test-images/agnhost:2.29",
Env: []v1.EnvVar{{Name: fmt.Sprintf("SERVE_SCTP_PORT_%d", port), Value: "foo"}},
Command: []string{"/bin/bash", "-c"},
Args: args,
SecurityContext: &v1.SecurityContext{},
Ports: []v1.ContainerPort{
{
Expand Down Expand Up @@ -202,9 +209,8 @@ func (k *KubernetesUtils) CreateOrUpdateDeployment(ns, deploymentName string, re
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: &zero,
Containers: []v1.Container{
makeContainerSpec(80, v1.ProtocolTCP),
makeContainerSpec(81, v1.ProtocolTCP),
makeContainerSpec(5000, v1.ProtocolUDP),
makeContainerSpec(80, "ALL"),
makeContainerSpec(81, "ALL"),
makeContainerSpec(8080, v1.ProtocolTCP),
makeContainerSpec(8081, v1.ProtocolTCP),
makeContainerSpec(8082, v1.ProtocolTCP),
Expand Down Expand Up @@ -569,13 +575,16 @@ func (k *KubernetesUtils) waitForPodInNamespace(ns string, pod string) (*string,

func (k *KubernetesUtils) waitForHTTPServers(allPods []Pod) error {
const maxTries = 10
log.Infof("waiting for HTTP servers (ports 80, 81, 5000 and 8080:8085) to become ready")
log.Infof("waiting for HTTP servers (ports 80, 81 and 8080:8085) to become ready")
var wrong int
for i := 0; i < maxTries; i++ {
reachability := NewReachability(allPods, Connected)
k.Validate(allPods, reachability, 80, v1.ProtocolTCP)
k.Validate(allPods, reachability, 80, v1.ProtocolUDP)
k.Validate(allPods, reachability, 80, v1.ProtocolSCTP)
k.Validate(allPods, reachability, 81, v1.ProtocolTCP)
k.Validate(allPods, reachability, 5000, v1.ProtocolUDP)
k.Validate(allPods, reachability, 81, v1.ProtocolUDP)
k.Validate(allPods, reachability, 81, v1.ProtocolSCTP)
for j := 8080; j < 8086; j++ {
k.Validate(allPods, reachability, int32(j), v1.ProtocolTCP)
}
Expand Down
4 changes: 0 additions & 4 deletions test/e2e/reachability.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ const (
Error PodConnectivityMark = "Err"
Dropped PodConnectivityMark = "Drp"
Rejected PodConnectivityMark = "Rej"

TCPRejectProbeReturn string = "Connection refused."
UDPRejectProbeReturn string = "Host is unreachable."
DropProbeReturn string = "Operation timed out."
)

type Connectivity struct {
Expand Down

0 comments on commit 85456c9

Please sign in to comment.