From 9db2b5b2e70d7d1d01eedb4192cf5e31049d3463 Mon Sep 17 00:00:00 2001 From: Tim Ross Date: Fri, 21 Nov 2025 14:31:15 -0500 Subject: [PATCH] Prevent flakiness in TestKube/Join The test was reliant on all the output of a session being replicated to all parties before a 5s timer closed the session. To make the test more resiliant it was rewritten such that it validates the expected output from one of the parties output, then exits the session, then when all parties have been closed, it examines the entire output of the other parties to validate their output. Fixes https://github.com/gravitational/teleport/issues/55044 --- integration/kube_integration_test.go | 68 ++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index db5958548286d..3e9a2ba8f1320 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -2530,31 +2530,73 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { // send a test message from the participant participantStdinW.Write([]byte("\ahi from peer\n\r")) - // lets type "echo hi" followed by "enter" and then "exit" + "enter": + // type "hi from term" followed by "enter" to broadcast data + // to all participants. term.Type("\ahi from term\n\r") - // Terminate the session after a moment to allow for the IO to reach the second client. - time.AfterFunc(5*time.Second, func() { - // send exit command to close the session - term.Type("exit 0\n\r\a") - }) + // validate that the output from both messages above is + // written to the participant stdout in the expected order. + require.NoError(t, waitForOutput(t.Context(), participantStdoutR, "hi from peer")) + require.NoError(t, waitForOutput(t.Context(), participantStdoutR, "hi from term")) + + // send exit command to close the session + term.Type("exit 0\n\r\a") // wait for all clients to finish require.NoError(t, group.Wait()) - // Verify peer. - participantOutput, err := io.ReadAll(participantStdoutR) - require.NoError(t, err) - require.Contains(t, string(participantOutput), "hi from term") - // Verify original session. require.Contains(t, out.String(), "hi from peer") // Verify observers. for _, capture := range observerCaptures { - assert.Contains(t, capture.String(), "hi from peer") - assert.Contains(t, capture.String(), "hi from term") + output := capture.String() + assert.Contains(t, output, "hi from peer") + assert.Contains(t, output, "hi from term") + } +} + +func waitForOutput(ctx context.Context, r ReaderWithDeadline, expected string) error { + var prev string + out := make([]byte, int64(len(expected)*3)) + for { + select { + case <-ctx.Done(): + return trace.BadParameter("timeout waiting on terminal for output: %v", expected) + default: + } + + // Forward the context deadline to the read deadline. + if deadline, ok := ctx.Deadline(); ok { + if err := r.SetReadDeadline(deadline); err != nil { + return trace.Wrap(err) + } + } + n, err := r.Read(out) + outStr := removeSpace(string(out[:n])) + + // Check for [expected] before checking the error, + // as it's valid for n > 0 even when there is an error. + // The [expected] is checked against the current and previous + // output to account for scenarios where the [expected] is split + // across two reads. While we try to prevent this by reading + // twice the length of [expected] there are no guarantees the + // whole thing will arrive in a single read. + if n > 0 && strings.Contains(prev+outStr, expected) { + return nil + } + if err != nil { + return trace.Wrap(err) + } + prev = outStr + } +} + +func removeSpace(in string) string { + for _, c := range []string{"\n", "\r", "\t"} { + in = strings.ReplaceAll(in, c, " ") } + return strings.TrimSpace(in) } // testKubeJoinWeb tests that joining an interactive exec session