From aafda4e48313f8b57c1dcb24b7bfa713ff1d3d27 Mon Sep 17 00:00:00 2001 From: Zac Bergquist Date: Fri, 11 Nov 2022 14:28:13 -0700 Subject: [PATCH] Deflake TestResizeTerminal This test made separate reads, each waiting for some conditions to be met. Due to timing, it was possible that the events expected in the second read got included in the first batch of reads and discarded. Fix this by reading in a single batch and waiting for all required conditions to be met. Closes #13607 --- lib/web/apiserver_test.go | 268 ++++++++++++++------------------------ 1 file changed, 95 insertions(+), 173 deletions(-) diff --git a/lib/web/apiserver_test.go b/lib/web/apiserver_test.go index 1a1ee4f1a38a5..3130fb2ef3dfa 100644 --- a/lib/web/apiserver_test.go +++ b/lib/web/apiserver_test.go @@ -1141,44 +1141,92 @@ func TestResizeTerminal(t *testing.T) { s := newWebSuite(t) sid := session.NewID() - // Create a new user "foo", open a terminal to a new session, and wait for - // it to be ready. + errs := make(chan error, 2) + readLoop := func(ctx context.Context, ws *websocket.Conn, ch chan<- *Envelope) { + for { + select { + case <-ctx.Done(): + return + default: + } + + typ, b, err := ws.ReadMessage() + if err != nil { + errs <- err + return + } + if typ != websocket.BinaryMessage { + errs <- trace.BadParameter("expected binary message, got %v", typ) + return + } + var envelope Envelope + if err := proto.Unmarshal(b, &envelope); err != nil { + errs <- trace.Wrap(err) + return + } + ch <- &envelope + } + } + + // Create a new user "foo", open a terminal to a new session pack1 := s.authPack(t, "foo") ws1, err := s.makeTerminal(t, pack1, withSessionID(sid)) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, ws1.Close()) }) - err = s.waitForRawEvent(ws1, 5*time.Second) - require.NoError(t, err) - // Create a new user "bar", open a terminal to the session created above, - // and wait for it to be ready. + // Create a new user "bar", open a terminal to the session created above pack2 := s.authPack(t, "bar") ws2, err := s.makeTerminal(t, pack2, withSessionID(sid)) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, ws2.Close()) }) - err = s.waitForRawEvent(ws2, 5*time.Second) - require.NoError(t, err) - // Look at the audit events for the first terminal. It should have two - // resize events from the second terminal (80x25 default then 100x100). Only - // the second terminal will get these because resize events are not sent - // back to the originator. - err = s.waitForResizeEvent(ws1, 5*time.Second) - require.NoError(t, err) - err = s.waitForResizeEvent(ws1, 5*time.Second) - require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + ws1Messages := make(chan *Envelope) + ws2Messages := make(chan *Envelope) + go readLoop(ctx, ws1, ws1Messages) + go readLoop(ctx, ws2, ws2Messages) + + // consume events from the first terminal + // we exect to see at least one raw event with PTY data (indicating terminal ready) + // and 2 resize events from the second user joining the session (one for the default + // size, and one for the manual resize request) + done := time.After(10 * time.Second) + t1ResizeEvents, t1RawEvents := 0, 0 +t1ready: + for { + select { + case <-done: + require.FailNow(t, "expected to receive 2 resize events (got %d) and at least 1 raw event (got %d)", t1ResizeEvents, t1RawEvents) + case err := <-errs: + require.NoError(t, err) + case e := <-ws1Messages: + if isResizeEventEnvelope(e) { + t1ResizeEvents++ + } + if e.GetType() == defaults.WebsocketRaw { + t1RawEvents++ + } + if t1ResizeEvents == 2 && t1RawEvents > 0 { + break t1ready + } + } + } - // Look at the stream events for the second terminal. We don't expect to see - // any resize events yet. It will timeout. - ws2Event := s.listenForResizeEvent(ws2) + // we should not expect to see a resize event on terminal 2, + // since they are not broadcasted back to the originator select { - case <-ws2Event: - t.Fatal("unexpected resize event") - case <-time.After(time.Second): + case e := <-ws2Messages: + if isResizeEventEnvelope(e) { + require.FailNow(t, "terminal 2 chould not have received a resize event") + } + case err := <-errs: + require.NoError(t, err) + case <-time.After(1 * time.Second): } - // Resize the second terminal. This should be reflected on the first terminal - // because resize events are not sent back to the originator. + // Resize the second terminal. This should be reflected only on the first terminal + // because resize events are sent to participants but not the originator.. params, err := session.NewTerminalParamsFromInt(300, 120) require.NoError(t, err) data, err := json.Marshal(events.EventFields{ @@ -1198,16 +1246,31 @@ func TestResizeTerminal(t *testing.T) { err = ws2.WriteMessage(websocket.BinaryMessage, envelopeBytes) require.NoError(t, err) - // This time the first terminal will see the resize event. - err = s.waitForResizeEvent(ws1, 5*time.Second) - require.NoError(t, err) + // the first terminal should see the resize event + done = time.After(5 * time.Second) + for { + select { + case <-done: + require.FailNow(t, "expected to receive a final resize event") + case err := <-errs: + require.NoError(t, err) + case e := <-ws1Messages: + if isResizeEventEnvelope(e) { + return + } + } + } +} - // The second terminal will not see any resize event. It will timeout. - select { - case <-ws2Event: - t.Fatal("unexpected resize event") - case <-time.After(time.Second): +func isResizeEventEnvelope(e *Envelope) bool { + if e.GetType() != defaults.WebsocketAudit { + return false + } + var ef events.EventFields + if err := json.Unmarshal([]byte(e.GetPayload()), &ef); err != nil { + return false } + return ef.GetType() == events.ResizeEvent } // TestTerminalPing tests that the server sends continuous ping control messages. @@ -4611,147 +4674,6 @@ func waitForOutput(stream *terminalStream, substr string) error { } } -func (s *WebSuite) waitForRawEvent(ws *websocket.Conn, timeout time.Duration) error { - timeoutContext, timeoutCancel := context.WithTimeout(s.ctx, timeout) - defer timeoutCancel() - - done := make(chan error, 1) - - go func() { - for { - ty, raw, err := ws.ReadMessage() - if err != nil { - done <- trace.Wrap(err) - return - } - - if ty != websocket.BinaryMessage { - done <- trace.BadParameter("expected binary message, got %v", ty) - return - } - - var envelope Envelope - err = proto.Unmarshal(raw, &envelope) - if err != nil { - done <- trace.Wrap(err) - return - } - - if envelope.GetType() == defaults.WebsocketRaw { - done <- nil - return - } - } - }() - - for { - select { - case <-timeoutContext.Done(): - return trace.BadParameter("timeout waiting for raw event") - case err := <-done: - return trace.Wrap(err) - } - } -} - -func (s *WebSuite) waitForResizeEvent(ws *websocket.Conn, timeout time.Duration) error { - timeoutContext, timeoutCancel := context.WithTimeout(s.ctx, timeout) - defer timeoutCancel() - - done := make(chan error, 1) - - go func() { - for { - ty, raw, err := ws.ReadMessage() - if err != nil { - done <- trace.Wrap(err) - return - } - - if ty != websocket.BinaryMessage { - done <- trace.BadParameter("expected binary message, got %v", ty) - return - } - - var envelope Envelope - err = proto.Unmarshal(raw, &envelope) - if err != nil { - done <- trace.Wrap(err) - return - } - - if envelope.GetType() != defaults.WebsocketAudit { - continue - } - - var e events.EventFields - err = json.Unmarshal([]byte(envelope.GetPayload()), &e) - if err != nil { - done <- trace.Wrap(err) - return - } - - if e.GetType() == events.ResizeEvent { - done <- nil - return - } - } - }() - - for { - select { - case <-timeoutContext.Done(): - return trace.BadParameter("timeout waiting for resize event") - case err := <-done: - return trace.Wrap(err) - } - } -} - -func (s *WebSuite) listenForResizeEvent(ws *websocket.Conn) chan struct{} { - ch := make(chan struct{}) - - go func() { - for { - ty, raw, err := ws.ReadMessage() - if err != nil { - close(ch) - return - } - - if ty != websocket.BinaryMessage { - close(ch) - return - } - - var envelope Envelope - err = proto.Unmarshal(raw, &envelope) - if err != nil { - close(ch) - return - } - - if envelope.GetType() != defaults.WebsocketAudit { - continue - } - - var e events.EventFields - err = json.Unmarshal([]byte(envelope.GetPayload()), &e) - if err != nil { - close(ch) - return - } - - if e.GetType() == events.ResizeEvent { - ch <- struct{}{} - return - } - } - }() - - return ch -} - func (s *WebSuite) clientNoRedirects(opts ...roundtrip.ClientParam) *client.WebClient { hclient := client.NewInsecureWebClient() hclient.CheckRedirect = func(req *http.Request, via []*http.Request) error {