From 79b15701864e9050519c63b6e98974923fb93a72 Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Mon, 14 Apr 2025 23:23:21 -0400 Subject: [PATCH] Make EgressIP helper functions more reliable Parse tcpdump output based on regex instead of position. Send probe requests with a sequence number for debgging. Use ClusterIP instead of route as the prober target. Signed-off-by: Peng Liu (cherry picked from commit df3726412ac129487dd6ed67c550e33fcf5b51b9) --- test/extended/networking/egressip_helpers.go | 98 ++++++++++---------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/test/extended/networking/egressip_helpers.go b/test/extended/networking/egressip_helpers.go index e6da27d8c9f5..bbc1f0644fb6 100644 --- a/test/extended/networking/egressip_helpers.go +++ b/test/extended/networking/egressip_helpers.go @@ -404,7 +404,7 @@ func createPacketSnifferDaemonSet(oc *exutil.CLI, namespace string, scheduleOnHo } var ds *appsv1.DaemonSet - retries := 12 + retries := 48 pollInterval := 5 for i := 0; i < retries; i++ { // Get the DS @@ -426,15 +426,14 @@ func createPacketSnifferDaemonSet(oc *exutil.CLI, namespace string, scheduleOnHo // The DaemonSet is not ready, but this is not because of a port conflict. // This shouldn't happen and other parts of the code will likely report this error // as a CI failure. - return ds, fmt.Errorf("Daemonset still not ready after %d tries", retries) + return ds, fmt.Errorf("Daemonset still not ready after %d tries: ready=%d, scheduled=%d, desired=%d", retries, ds.Status.NumberReady, ds.Status.CurrentNumberScheduled, ds.Status.DesiredNumberScheduled) } const ( // The tcpCaptureScript runs tcpdump and extracts all GET request strings from the packets. // The resulting lines will be something like: - // 10.128.2.15.36749 /f8f721fa-53c9-444f-bc96-69c7388fcb5a - tcpCaptureScript = `#!/bin/bash -tcpdump -nne -i %s -l -s 0 'port %d and tcp[((tcp[12:1] & 0xf0) >> 2):4] = 0x47455420' | awk '{print $10 " " $(NF-1)}' + // Parsed 05:38:34.307832 10.128.2.15.36749 /f8f721fa-53c9-444f-bc96-69c7388fcb5a + tcpCaptureScript = `tcpdump -nn -i %s -l -s 0 -A 'tcp and port %d' | awk 'match($0,/IP6?[[:space:]]+([0-9a-fA-F:\.]+[0-9a-fA-F])/,arr) {ts=$1; ip=arr[1]} $0 !~ /HTTP.*GET/ && match($0,/GET[[:space:]]+([^[:space:]]+)/,arr) {print "Parsed", ts, ip, arr[1]} // {print $0}' ` // The udpCaptureScript runs tcpdump with option -xx and then decodes the hexadecimal information. @@ -443,8 +442,8 @@ tcpdump -nne -i %s -l -s 0 'port %d and tcp[((tcp[12:1] & 0xf0) >> 2):4] = 0x47 // that's captured). // tshark would definitely be the better tool here, but that would introduce another dependency. Hence, // decode the hexadecimal information and look for payload that is marked with 'START(.*)EOF$' and extract - // the '(.*)' part. The resulting lines will be `sourceIP + " " + z.group(1)`, hence something like: - // 10.128.2.15.36749 f8f721fa-53c9-444f-bc96-69c7388fcb5a + // the '(.*)' part. The resulting lines will be `"Parsed " + timestamp + " " + sourceIP + " " + z.group(1) + "_" + z.group(2)`, hence something like: + // Parsed 05:38:34.307832 10.128.2.15.36749 f8f721fa-53c9-444f-bc96-69c7388fcb5a_1 udpCaptureScript = `#!/bin/bash cat <<'EOF' > capture-python.py @@ -466,6 +465,7 @@ udpPayloadOffset = 0 # globals fullHex = [] sourceIP = "" +timeStamp = "" def decodePayload(hexArray): payloadStr = "" @@ -482,9 +482,9 @@ def printLine(): global fullHex if sourceIP != "" and fullHex != []: decodedPayload = decodePayload(fullHex) - z = re.search(r'START(.*)EOF$', decodedPayload) + z = re.search(r'START(.*)EOF_(\d+)', decodedPayload) if z: - print(sourceIP + " " + z.group(1)) + print("Parsed " + timeStamp + " " + sourceIP + " " + z.group(1) + "_" + z.group(2)) fullHex = [] sourceIP = "" @@ -497,6 +497,7 @@ for line in sys.stdin: printLine() elif not re.match(r'^$', line): printLine() + timeStamp = line.split()[0] sourceIP = line.split()[sourceIPOffset] printLine() @@ -545,16 +546,6 @@ func createHostNetworkedPacketSnifferDaemonSet(clientset kubernetes.Interface, n }, }, } - readinessProbe := &v1.Probe{ - ProbeHandler: v1.ProbeHandler{ - Exec: &v1.ExecAction{ - Command: []string{ - "echo", - "ready", - }, - }, - }, - } runAsUser := int64(0) securityContext := &v1.SecurityContext{ RunAsUser: &runAsUser, @@ -582,6 +573,12 @@ func createHostNetworkedPacketSnifferDaemonSet(clientset kubernetes.Interface, n Labels: podLabels, }, Spec: corev1.PodSpec{ + Tolerations: []v1.Toleration{ + { + Key: "node-role.kubernetes.io/master", + Effect: corev1.TaintEffectNoSchedule, + }, + }, Affinity: &nodeAffinity, HostNetwork: true, Containers: []v1.Container{ @@ -589,7 +586,6 @@ func createHostNetworkedPacketSnifferDaemonSet(clientset kubernetes.Interface, n Name: "tcpdump", Image: networkPacketSnifferImage, Command: podCommand, - ReadinessProbe: readinessProbe, SecurityContext: securityContext, TTY: true, // needed for immediate log propagation Stdin: true, // needed for immediate log propagation @@ -651,31 +647,33 @@ func scanPacketSnifferDaemonSetPodLogs(oc *exutil.CLI, ds *appsv1.DaemonSet, tar scanner := bufio.NewScanner(buf) for scanner.Scan() { logLine := scanner.Text() - if strings.Contains(logLine, searchString) { - // Currently, it is not necessary to discriminate by protocol. - // a log line should look like this for http: - // 10.0.144.5.33226 /bed729aa-4e83-482d-a433-db798e569147 - // a log line should look like this for udp: - // 10.0.144.5.33226 bed729aa-4e83-482d-a433-db798e569147 - // Should it ever be necessary, the targetProtocol to this method (which is currently - // not used) serves this purpose. - framework.Logf("Found hit in log line: %s", logLine) - logLineExploded := strings.Fields(logLine) - if len(logLineExploded) != 2 { - return nil, fmt.Errorf("Unexpected logline content: %s", logLine) - } - ipAddressPortExploded := strings.Split(logLineExploded[0], ".") - if len(ipAddressPortExploded) == 2 { - // ipv6 - ip = ipAddressPortExploded[0] - } else if len(ipAddressPortExploded) == 5 { - // ipv4 - ip = strings.Join(ipAddressPortExploded[:len(ipAddressPortExploded)-1], ".") - } else { - return nil, fmt.Errorf("Unexpected logline content, invalid IP/Port: %s", logLine) - } - matchedIPs[ip]++ + + if !strings.HasPrefix(logLine, "Parsed") || !strings.Contains(logLine, searchString) { + continue } + // Currently, it is not necessary to discriminate by protocol. + // a log line should look like this for http: + // 10.0.144.5.33226 /bed729aa-4e83-482d-a433-db798e569147 + // a log line should look like this for udp: + // 10.0.144.5.33226 bed729aa-4e83-482d-a433-db798e569147 + // Should it ever be necessary, the targetProtocol to this method (which is currently + // not used) serves this purpose. + framework.Logf("Found hit in log line for node %s: %s", pod.Spec.NodeName, logLine) + logLineExploded := strings.Fields(logLine) + if len(logLineExploded) != 4 { + return nil, fmt.Errorf("Unexpected logline content %s", logLine) + } + ipAddressPortExploded := strings.Split(logLineExploded[2], ".") + if len(ipAddressPortExploded) == 2 { + // ipv6 + ip = ipAddressPortExploded[0] + } else if len(ipAddressPortExploded) == 5 { + // ipv4 + ip = strings.Join(ipAddressPortExploded[:len(ipAddressPortExploded)-1], ".") + } else { + return nil, fmt.Errorf("Unexpected logline content, invalid IP/Port: %s", logLine) + } + matchedIPs[ip]++ } } return matchedIPs, nil @@ -1271,26 +1269,28 @@ func sendProbesToHostPort(oc *exutil.CLI, proberPod *v1.Pod, url, targetProtocol request := fmt.Sprintf("http://%s/dial?protocol=%s&host=%s&port=%d&request=%s", url, targetProtocol, targetHost, targetPort, randomIDStr) var wg sync.WaitGroup errChan := make(chan error, iterations) + for i := 0; i < iterations; i++ { // Make sure that we don“t reuse the i variable when passing it to the go func. - i := i + interval := i // Randomize the start time a little bit per go routine. // Max of 250 ms * current iteration counter - n := rand.Intn(250) * i - framework.Logf("Sleeping for %d ms for iteration %d", n, i) + n := rand.Intn(250) * interval + framework.Logf("Sleeping for %d ms for iteration %d", n, interval) wg.Add(1) go func() { defer wg.Done() time.Sleep(time.Duration(n) * time.Millisecond) - output, err := runOcWithRetry(oc.AsAdmin(), "exec", proberPod.Name, "--", "curl", "--max-time", "15", "-s", request) + output, err := runOcWithRetry(oc.AsAdmin(), "exec", proberPod.Name, "--", "curl", "--max-time", "15", "-s", fmt.Sprintf("%s_%d", request, i)) + framework.Logf("Probed with output: %s", output) // Report errors. if err != nil { errChan <- fmt.Errorf("Query failed. Request: %s, Output: %s, Error: %v", request, output, err) } - return }() } wg.Wait() + close(errChan) // Close the channel after all goroutines finish // If the above yielded any errors, then append them to a list and report them. if len(errChan) > 0 {