diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index db5958548286d..3e9a2ba8f1320 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -2530,31 +2530,73 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) { // send a test message from the participant participantStdinW.Write([]byte("\ahi from peer\n\r")) - // lets type "echo hi" followed by "enter" and then "exit" + "enter": + // type "hi from term" followed by "enter" to broadcast data + // to all participants. term.Type("\ahi from term\n\r") - // Terminate the session after a moment to allow for the IO to reach the second client. - time.AfterFunc(5*time.Second, func() { - // send exit command to close the session - term.Type("exit 0\n\r\a") - }) + // validate that the output from both messages above is + // written to the participant stdout in the expected order. + require.NoError(t, waitForOutput(t.Context(), participantStdoutR, "hi from peer")) + require.NoError(t, waitForOutput(t.Context(), participantStdoutR, "hi from term")) + + // send exit command to close the session + term.Type("exit 0\n\r\a") // wait for all clients to finish require.NoError(t, group.Wait()) - // Verify peer. - participantOutput, err := io.ReadAll(participantStdoutR) - require.NoError(t, err) - require.Contains(t, string(participantOutput), "hi from term") - // Verify original session. require.Contains(t, out.String(), "hi from peer") // Verify observers. for _, capture := range observerCaptures { - assert.Contains(t, capture.String(), "hi from peer") - assert.Contains(t, capture.String(), "hi from term") + output := capture.String() + assert.Contains(t, output, "hi from peer") + assert.Contains(t, output, "hi from term") + } +} + +func waitForOutput(ctx context.Context, r ReaderWithDeadline, expected string) error { + var prev string + out := make([]byte, int64(len(expected)*3)) + for { + select { + case <-ctx.Done(): + return trace.BadParameter("timeout waiting on terminal for output: %v", expected) + default: + } + + // Forward the context deadline to the read deadline. + if deadline, ok := ctx.Deadline(); ok { + if err := r.SetReadDeadline(deadline); err != nil { + return trace.Wrap(err) + } + } + n, err := r.Read(out) + outStr := removeSpace(string(out[:n])) + + // Check for [expected] before checking the error, + // as it's valid for n > 0 even when there is an error. + // The [expected] is checked against the current and previous + // output to account for scenarios where the [expected] is split + // across two reads. While we try to prevent this by reading + // twice the length of [expected] there are no guarantees the + // whole thing will arrive in a single read. + if n > 0 && strings.Contains(prev+outStr, expected) { + return nil + } + if err != nil { + return trace.Wrap(err) + } + prev = outStr + } +} + +func removeSpace(in string) string { + for _, c := range []string{"\n", "\r", "\t"} { + in = strings.ReplaceAll(in, c, " ") } + return strings.TrimSpace(in) } // testKubeJoinWeb tests that joining an interactive exec session