Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions test/extended/networking/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,35 +742,40 @@ func applyEgressIPObject(oc *exutil.CLI, cloudNetworkClientset cloudnetwork.Inte
_, err := runOcWithRetry(oc.AsAdmin(), "apply", "-f", egressIPYamlPath)
o.Expect(err).NotTo(o.HaveOccurred())

framework.Logf(fmt.Sprintf("Waiting for CloudPrivateIPConfig creation for a maximum of %d seconds", timeout))
var exists bool
var isAssigned bool
o.Eventually(func() bool {
for eip := range egressIPSet {
exists, isAssigned, err = cloudPrivateIpConfigExists(oc, cloudNetworkClientset, eip)
o.Expect(err).NotTo(o.HaveOccurred())
if !exists {
framework.Logf("CloudPrivateIPConfig for %s not found.", eip)
return false
}
if !isAssigned {
framework.Logf("CloudPrivateIPConfig for %s not assigned.", eip)
return false
if cloudNetworkClientset != nil {
framework.Logf(fmt.Sprintf("Waiting for CloudPrivateIPConfig creation for a maximum of %d seconds", timeout))
var exists bool
var isAssigned bool
o.Eventually(func() bool {
for eip := range egressIPSet {
exists, isAssigned, err = cloudPrivateIpConfigExists(oc, cloudNetworkClientset, eip)
o.Expect(err).NotTo(o.HaveOccurred())
if !exists {
framework.Logf("CloudPrivateIPConfig for %s not found.", eip)
return false
}
if !isAssigned {
framework.Logf("CloudPrivateIPConfig for %s not assigned.", eip)
return false
}
}
}
framework.Logf("CloudPrivateIPConfigs for %v found.", egressIPSet)
return true
}, time.Duration(timeout)*time.Second, 5*time.Second).Should(o.BeTrue())
framework.Logf("CloudPrivateIPConfigs for %v found.", egressIPSet)
return true
}, time.Duration(timeout)*time.Second, 5*time.Second).Should(o.BeTrue())
}

framework.Logf(fmt.Sprintf("Waiting for EgressIP addresses inside status of EgressIP CR %s for a maximum of %d seconds", egressIPObjectName, timeout))
var hasIP bool
var nodeName string
o.Eventually(func() bool {
for eip := range egressIPSet {
hasIP, err = egressIPStatusHasIP(oc, egressIPObjectName, eip)
hasIP, nodeName, err = egressIPStatusHasIP(oc, egressIPObjectName, eip)
o.Expect(err).NotTo(o.HaveOccurred())
if !hasIP {
framework.Logf("EgressIP object %s does not have IP %s in its status field.", egressIPObjectName, eip)
return false
} else {
egressIPSet[eip] = nodeName
}
}
framework.Logf("Egress IP object %s does have all IPs for %v.", egressIPObjectName, egressIPSet)
Expand Down
112 changes: 57 additions & 55 deletions test/extended/networking/egressip_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func createPacketSnifferDaemonSet(oc *exutil.CLI, namespace string, scheduleOnHo
}

var ds *appsv1.DaemonSet
retries := 12
retries := 48
pollInterval := 5
for i := 0; i < retries; i++ {
// Get the DS
Expand All @@ -415,7 +415,9 @@ func createPacketSnifferDaemonSet(oc *exutil.CLI, namespace string, scheduleOnHo

// Check if NumberReady == DesiredNumberScheduled.
// In that case, simply return as all went well.
if ds.Status.NumberReady == ds.Status.DesiredNumberScheduled {
if ds.Status.NumberReady == ds.Status.DesiredNumberScheduled &&
ds.Status.CurrentNumberScheduled == ds.Status.DesiredNumberScheduled &&
ds.Status.DesiredNumberScheduled > 0 {
return ds, nil
}
// If no port conflict error was found, simply sleep for pollInterval and then
Expand All @@ -426,15 +428,14 @@ func createPacketSnifferDaemonSet(oc *exutil.CLI, namespace string, scheduleOnHo
// The DaemonSet is not ready, but this is not because of a port conflict.
// This shouldn't happen and other parts of the code will likely report this error
// as a CI failure.
return ds, fmt.Errorf("Daemonset still not ready after %d tries", retries)
return ds, fmt.Errorf("Daemonset still not ready after %d tries: ready=%d, scheduled=%d, desired=%d", retries, ds.Status.NumberReady, ds.Status.CurrentNumberScheduled, ds.Status.DesiredNumberScheduled)
}

const (
// The tcpCaptureScript runs tcpdump and extracts all GET request strings from the packets.
// The resulting lines will be something like:
// 10.128.2.15.36749 /f8f721fa-53c9-444f-bc96-69c7388fcb5a
tcpCaptureScript = `#!/bin/bash
tcpdump -nn -i %s -l -s 0 -A 'tcp and port %d' | awk '/IP / || /IP6 / {ip=$3} /GET \// {print ip, $2}'
// Parsed 05:38:34.307832 10.128.2.15.36749 /f8f721fa-53c9-444f-bc96-69c7388fcb5a
tcpCaptureScript = `tcpdump -nn -i %s -l -s 0 -A 'tcp and port %d' | awk 'match($0,/IP6?[[:space:]]+([0-9a-fA-F:\.]+[0-9a-fA-F])/,arr) {ts=$1; ip=arr[1]} $0 !~ /HTTP.*GET/ && match($0,/GET[[:space:]]+([^[:space:]]+)/,arr) {print "Parsed", ts, ip, arr[1]} // {print $0}'
`

// The udpCaptureScript runs tcpdump with option -xx and then decodes the hexadecimal information.
Expand All @@ -443,8 +444,8 @@ tcpdump -nn -i %s -l -s 0 -A 'tcp and port %d' | awk '/IP / || /IP6 / {ip=$3} /G
// that's captured).
// tshark would definitely be the better tool here, but that would introduce another dependency. Hence,
// decode the hexadecimal information and look for payload that is marked with 'START(.*)EOF$' and extract
// the '(.*)' part. The resulting lines will be `sourceIP + " " + z.group(1)`, hence something like:
// 10.128.2.15.36749 f8f721fa-53c9-444f-bc96-69c7388fcb5a
// the '(.*)' part. The resulting lines will be `"Parsed " + timestamp + " " + sourceIP + " " + z.group(1) + "_" + z.group(2)`, hence something like:
// Parsed 05:38:34.307832 10.128.2.15.36749 f8f721fa-53c9-444f-bc96-69c7388fcb5a_1
udpCaptureScript = `#!/bin/bash

cat <<'EOF' > capture-python.py
Expand All @@ -466,6 +467,7 @@ udpPayloadOffset = 0
# globals
fullHex = []
sourceIP = ""
timeStamp = ""

def decodePayload(hexArray):
payloadStr = ""
Expand All @@ -482,9 +484,9 @@ def printLine():
global fullHex
if sourceIP != "" and fullHex != []:
decodedPayload = decodePayload(fullHex)
z = re.search(r'START(.*)EOF$', decodedPayload)
z = re.search(r'START(.*)EOF_(\d+)', decodedPayload)
if z:
print(sourceIP + " " + z.group(1))
print("Parsed " + timeStamp + " " + sourceIP + " " + z.group(1) + "_" + z.group(2))
fullHex = []
sourceIP = ""

Expand All @@ -497,6 +499,7 @@ for line in sys.stdin:
printLine()
elif not re.match(r'^$', line):
printLine()
timeStamp = line.split()[0]
sourceIP = line.split()[sourceIPOffset]

printLine()
Expand Down Expand Up @@ -545,16 +548,6 @@ func createHostNetworkedPacketSnifferDaemonSet(clientset kubernetes.Interface, n
},
},
}
readinessProbe := &v1.Probe{
ProbeHandler: v1.ProbeHandler{
Exec: &v1.ExecAction{
Command: []string{
"echo",
"ready",
},
},
},
}
runAsUser := int64(0)
securityContext := &v1.SecurityContext{
RunAsUser: &runAsUser,
Expand Down Expand Up @@ -582,14 +575,19 @@ func createHostNetworkedPacketSnifferDaemonSet(clientset kubernetes.Interface, n
Labels: podLabels,
},
Spec: corev1.PodSpec{
Tolerations: []v1.Toleration{
{
Key: "node-role.kubernetes.io/master",
Effect: corev1.TaintEffectNoSchedule,
},
},
Affinity: &nodeAffinity,
HostNetwork: true,
Containers: []v1.Container{
{
Name: "tcpdump",
Image: networkPacketSnifferImage,
Command: podCommand,
ReadinessProbe: readinessProbe,
SecurityContext: securityContext,
TTY: true, // needed for immediate log propagation
Stdin: true, // needed for immediate log propagation
Expand Down Expand Up @@ -651,31 +649,33 @@ func scanPacketSnifferDaemonSetPodLogs(oc *exutil.CLI, ds *appsv1.DaemonSet, tar
scanner := bufio.NewScanner(buf)
for scanner.Scan() {
logLine := scanner.Text()
if strings.Contains(logLine, searchString) {
// Currently, it is not necessary to discriminate by protocol.
// a log line should look like this for http:
// 10.0.144.5.33226 /bed729aa-4e83-482d-a433-db798e569147
// a log line should look like this for udp:
// 10.0.144.5.33226 bed729aa-4e83-482d-a433-db798e569147
// Should it ever be necessary, the targetProtocol to this method (which is currently
// not used) serves this purpose.
framework.Logf("Found hit in log line: %s", logLine)
logLineExploded := strings.Fields(logLine)
if len(logLineExploded) != 2 {
return nil, fmt.Errorf("Unexpected logline content: %s", logLine)
}
ipAddressPortExploded := strings.Split(logLineExploded[0], ".")
if len(ipAddressPortExploded) == 2 {
// ipv6
ip = ipAddressPortExploded[0]
} else if len(ipAddressPortExploded) == 5 {
// ipv4
ip = strings.Join(ipAddressPortExploded[:len(ipAddressPortExploded)-1], ".")
} else {
return nil, fmt.Errorf("Unexpected logline content, invalid IP/Port: %s", logLine)
}
matchedIPs[ip]++

if !strings.HasPrefix(logLine, "Parsed") || !strings.Contains(logLine, searchString) {
continue
}
// Currently, it is not necessary to discriminate by protocol.
// a log line should look like this for http:
// 10.0.144.5.33226 /bed729aa-4e83-482d-a433-db798e569147
// a log line should look like this for udp:
// 10.0.144.5.33226 bed729aa-4e83-482d-a433-db798e569147
// Should it ever be necessary, the targetProtocol to this method (which is currently
// not used) serves this purpose.
framework.Logf("Found hit in log line for node %s: %s", pod.Spec.NodeName, logLine)
logLineExploded := strings.Fields(logLine)
if len(logLineExploded) != 4 {
return nil, fmt.Errorf("Unexpected logline content %s", logLine)
}
ipAddressPortExploded := strings.Split(logLineExploded[2], ".")
if len(ipAddressPortExploded) == 2 {
// ipv6
ip = ipAddressPortExploded[0]
} else if len(ipAddressPortExploded) == 5 {
// ipv4
ip = strings.Join(ipAddressPortExploded[:len(ipAddressPortExploded)-1], ".")
} else {
return nil, fmt.Errorf("Unexpected logline content, invalid IP/Port: %s", logLine)
}
matchedIPs[ip]++
}
}
return matchedIPs, nil
Expand Down Expand Up @@ -1271,26 +1271,28 @@ func sendProbesToHostPort(oc *exutil.CLI, proberPod *v1.Pod, url, targetProtocol
request := fmt.Sprintf("http://%s/dial?protocol=%s&host=%s&port=%d&request=%s", url, targetProtocol, targetHost, targetPort, randomIDStr)
var wg sync.WaitGroup
errChan := make(chan error, iterations)

for i := 0; i < iterations; i++ {
// Make sure that we don´t reuse the i variable when passing it to the go func.
i := i
interval := i
// Randomize the start time a little bit per go routine.
// Max of 250 ms * current iteration counter
n := rand.Intn(250) * i
framework.Logf("Sleeping for %d ms for iteration %d", n, i)
n := rand.Intn(250) * interval
framework.Logf("Sleeping for %d ms for iteration %d", n, interval)
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Duration(n) * time.Millisecond)
output, err := runOcWithRetry(oc.AsAdmin(), "exec", proberPod.Name, "--", "curl", "--max-time", "15", "-s", request)
output, err := runOcWithRetry(oc.AsAdmin(), "exec", proberPod.Name, "--", "curl", "--max-time", "15", "-s", fmt.Sprintf("%s_%d", request, i))
framework.Logf("Probed with output: %s", output)
// Report errors.
if err != nil {
errChan <- fmt.Errorf("Query failed. Request: %s, Output: %s, Error: %v", request, output, err)
}
return
}()
}
wg.Wait()
close(errChan) // Close the channel after all goroutines finish

// If the above yielded any errors, then append them to a list and report them.
if len(errChan) > 0 {
Expand Down Expand Up @@ -1729,21 +1731,21 @@ func cloudPrivateIpConfigExists(oc *exutil.CLI, cloudNetworkClientset cloudnetwo
}

// egressIPStatusHasIP returns if a given ip was found in a given EgressIP object's status field.
func egressIPStatusHasIP(oc *exutil.CLI, egressIPObjectName string, ip string) (bool, error) {
func egressIPStatusHasIP(oc *exutil.CLI, egressIPObjectName string, ip string) (bool, string, error) {
eip, err := getEgressIP(oc, egressIPObjectName)
if err != nil {
if errors.IsNotFound(err) {
return false, nil
return false, "", nil
}
return false, fmt.Errorf("Error looking up EgressIP %s, err: %v", egressIPObjectName, err)
return false, "", fmt.Errorf("Error looking up EgressIP %s, err: %v", egressIPObjectName, err)
}
for _, egressIPStatusItem := range eip.Status.Items {
if egressIPStatusItem.EgressIP == ip {
return true, nil
return true, egressIPStatusItem.Node, nil
}
}

return false, nil
return false, "", nil
}

// sdnNamespaceAddEgressIP adds EgressIP <egressip> to netnamespace <namespace>.
Expand Down
Loading