Skip to content

Commit b933c46

Browse files
authored
Merge pull request kubernetes#133193 from aojea/deflake_TestStreamTranslator_WebSocketServerErrors
Deflake test stream translator web socket server errors
2 parents f142852 + f07dcd4 commit b933c46

File tree

2 files changed

+71
-63
lines changed

2 files changed

+71
-63
lines changed

staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,14 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
7373
// Creating SPDY executor, ensuring redirects are not followed.
7474
spdyRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{UpgradeTransport: h.Transport, PingPeriod: 5 * time.Second})
7575
if err != nil {
76-
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
7776
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
77+
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
7878
return
7979
}
8080
spdyExecutor, err := remotecommand.NewSPDYExecutorRejectRedirects(spdyRoundTripper, spdyRoundTripper, "POST", h.Location)
8181
if err != nil {
82-
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
8382
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
83+
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
8484
return
8585
}
8686

@@ -121,27 +121,27 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
121121
if err != nil {
122122
//nolint:errcheck // Ignore writeStatus returned error
123123
if statusErr, ok := err.(*apierrors.StatusError); ok {
124-
websocketStreams.writeStatus(statusErr)
125124
// Increment status code returned within status error.
126125
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(int(statusErr.Status().Code)))
126+
websocketStreams.writeStatus(statusErr)
127127
} else if exitErr, ok := err.(exec.CodeExitError); ok && exitErr.Exited() {
128-
websocketStreams.writeStatus(codeExitToStatusError(exitErr))
129128
// Returned an exit code from the container, so not an error in
130129
// stream translator--add StatusOK to metrics.
131130
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
131+
websocketStreams.writeStatus(codeExitToStatusError(exitErr))
132132
} else {
133-
websocketStreams.writeStatus(apierrors.NewInternalError(err))
134133
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
134+
websocketStreams.writeStatus(apierrors.NewInternalError(err))
135135
}
136136
return
137137
}
138138

139+
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
139140
// Write the success status back to the WebSocket client.
140141
//nolint:errcheck
141142
websocketStreams.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
142143
Status: metav1.StatusSuccess,
143144
}})
144-
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
145145
}
146146

147147
// translatorSizeQueue feeds the size events from the WebSocket

staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go

Lines changed: 65 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -755,90 +755,98 @@ func TestStreamTranslator_WebSocketServerErrors(t *testing.T) {
755755
t.Errorf("expected websocket bad handshake error, got (%s)", err)
756756
}
757757
}
758-
// Validate the streamtranslator metrics; should have one 500 failure.
758+
// Validate the streamtranslator metrics; should have one 400 failure.
759+
// Use polling to wait for the metric to be updated asynchronously.
759760
metricNames := []string{"apiserver_stream_translator_requests_total"}
760761
expected := `
761762
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
762763
# TYPE apiserver_stream_translator_requests_total counter
763764
apiserver_stream_translator_requests_total{code="400"} 1
764765
`
765-
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
766-
t.Fatal(err)
766+
if err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) {
767+
if testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...) == nil {
768+
return true, nil
769+
}
770+
return false, nil
771+
}); err != nil {
772+
t.Fatalf("Failed to observe metric after waiting 2 seconds: %v", err)
767773
}
768774
}
769775

770776
// TestStreamTranslator_BlockRedirects verifies that the StreamTranslator will *not* follow
771777
// redirects; it will thrown an error instead.
772778
func TestStreamTranslator_BlockRedirects(t *testing.T) {
773779
metrics.Register()
774-
metrics.ResetForTest()
775780
t.Cleanup(metrics.ResetForTest)
781+
776782
for _, statusCode := range []int{
777783
http.StatusMovedPermanently, // 301
778784
http.StatusFound, // 302
779785
http.StatusSeeOther, // 303
780786
http.StatusTemporaryRedirect, // 307
781787
http.StatusPermanentRedirect, // 308
782788
} {
783-
// Create upstream fake SPDY server which returns a redirect.
784-
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
785-
w.Header().Set("Location", "/")
786-
w.WriteHeader(statusCode)
787-
}))
788-
defer spdyServer.Close()
789-
spdyLocation, err := url.Parse(spdyServer.URL)
790-
if err != nil {
791-
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
792-
}
793-
spdyTransport, err := fakeTransport()
794-
if err != nil {
795-
t.Fatalf("Unexpected error creating transport: %v", err)
796-
}
797-
streams := Options{Stdout: true}
798-
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
799-
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
800-
streamTranslator.ServeHTTP(w, req)
801-
}))
802-
defer streamTranslatorServer.Close()
803-
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
804-
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
805-
if err != nil {
806-
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
807-
}
808-
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
809-
if err != nil {
810-
t.Errorf("unexpected error creating websocket executor: %v", err)
811-
}
812-
errorChan := make(chan error)
813-
go func() {
814-
// Start the streaming on the WebSocket "exec" client.
815-
// Should return "redirect not allowed" error.
816-
errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{})
817-
}()
818-
819-
select {
820-
case <-time.After(wait.ForeverTestTimeout):
821-
t.Fatalf("expect stream to be closed after connection is closed.")
822-
case err := <-errorChan:
823-
// Must return "redirect now allowed" error.
824-
if err == nil {
825-
t.Fatalf("expected error, but received none")
789+
t.Run(fmt.Sprintf("statusCode=%d", statusCode), func(t *testing.T) {
790+
metrics.ResetForTest()
791+
// Create upstream fake SPDY server which returns a redirect.
792+
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
793+
w.Header().Set("Location", "/")
794+
w.WriteHeader(statusCode)
795+
}))
796+
defer spdyServer.Close()
797+
spdyLocation, err := url.Parse(spdyServer.URL)
798+
if err != nil {
799+
t.Fatalf("Unable to parse spdy server URL: %s", spdyServer.URL)
826800
}
827-
if !strings.Contains(err.Error(), "redirect not allowed") {
828-
t.Errorf("expected redirect not allowed error, got (%s)", err)
801+
spdyTransport, err := fakeTransport()
802+
if err != nil {
803+
t.Fatalf("Unexpected error creating transport: %v", err)
829804
}
830-
}
831-
// Validate the streamtranslator metrics; should have one 500 failure each loop.
832-
metricNames := []string{"apiserver_stream_translator_requests_total"}
833-
expected := `
805+
streams := Options{Stdout: true}
806+
streamTranslator := NewStreamTranslatorHandler(spdyLocation, spdyTransport, 0, streams)
807+
streamTranslatorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
808+
streamTranslator.ServeHTTP(w, req)
809+
}))
810+
defer streamTranslatorServer.Close()
811+
// Now create the websocket client (executor), and point it to the "streamTranslatorServer".
812+
streamTranslatorLocation, err := url.Parse(streamTranslatorServer.URL)
813+
if err != nil {
814+
t.Fatalf("Unable to parse StreamTranslator server URL: %s", streamTranslatorServer.URL)
815+
}
816+
exec, err := remotecommand.NewWebSocketExecutor(&rest.Config{Host: streamTranslatorLocation.Host}, "GET", streamTranslatorServer.URL)
817+
if err != nil {
818+
t.Errorf("unexpected error creating websocket executor: %v", err)
819+
}
820+
errorChan := make(chan error)
821+
go func() {
822+
// Start the streaming on the WebSocket "exec" client.
823+
// Should return "redirect not allowed" error.
824+
errorChan <- exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{})
825+
}()
826+
827+
select {
828+
case <-time.After(wait.ForeverTestTimeout):
829+
t.Fatalf("expect stream to be closed after connection is closed.")
830+
case err := <-errorChan:
831+
// Must return "redirect now allowed" error.
832+
if err == nil {
833+
t.Fatalf("expected error, but received none")
834+
}
835+
if !strings.Contains(err.Error(), "redirect not allowed") {
836+
t.Errorf("expected redirect not allowed error, got (%s)", err)
837+
}
838+
}
839+
// Validate the streamtranslator metrics; should have one 500 failure each loop.
840+
metricNames := []string{"apiserver_stream_translator_requests_total"}
841+
expected := `
834842
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
835843
# TYPE apiserver_stream_translator_requests_total counter
836844
apiserver_stream_translator_requests_total{code="500"} 1
837845
`
838-
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
839-
t.Fatal(err)
840-
}
841-
metrics.ResetForTest() // Clear metrics each loop
846+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
847+
t.Fatal(err)
848+
}
849+
})
842850
}
843851
}
844852

0 commit comments

Comments
 (0)