@@ -20,6 +20,7 @@ import (
2020 "bytes"
2121 "context"
2222 "crypto/rand"
23+ "crypto/tls"
2324 "encoding/json"
2425 "fmt"
2526 "io"
@@ -31,16 +32,18 @@ import (
3132 "reflect"
3233 "strings"
3334 "sync"
35+ "sync/atomic"
3436 "testing"
3537 "time"
3638
3739 gwebsocket "github.com/gorilla/websocket"
40+ "github.com/stretchr/testify/require"
3841
3942 v1 "k8s.io/api/core/v1"
4043 apierrors "k8s.io/apimachinery/pkg/api/errors"
4144 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42- "k8s.io/apimachinery/pkg/util/httpstream"
4345 "k8s.io/apimachinery/pkg/util/httpstream/wsstream"
46+ utilnettesting "k8s.io/apimachinery/pkg/util/net/testing"
4447 "k8s.io/apimachinery/pkg/util/remotecommand"
4548 "k8s.io/apimachinery/pkg/util/wait"
4649 "k8s.io/client-go/rest"
@@ -1342,38 +1345,110 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti
13421345 return wsStreams , nil
13431346}
13441347
1345- // See (https://github.com/kubernetes/kubernetes/issues/126134).
1346- func TestWebSocketClient_HTTPSProxyErrorExpected (t * testing.T ) {
1347- urlStr := "http://127.0.0.1/never-used" + "?" + "stdin=true" + "&" + "stdout=true"
1348- websocketLocation , err := url .Parse (urlStr )
1349- if err != nil {
1350- t .Fatalf ("Unable to parse WebSocket server URL: %s" , urlStr )
1351- }
1352- // proxy url with https scheme will trigger websocket dialing error.
1353- httpsProxyFunc := func (req * http.Request ) (* url.URL , error ) { return url .Parse ("https://127.0.0.1" ) }
1354- exec , err := NewWebSocketExecutor (& rest.Config {Host : websocketLocation .Host , Proxy : httpsProxyFunc }, "GET" , urlStr )
1355- if err != nil {
1356- t .Errorf ("unexpected error creating websocket executor: %v" , err )
1357- }
1358- var stdout bytes.Buffer
1359- options := & StreamOptions {
1360- Stdout : & stdout ,
1361- }
1362- errorChan := make (chan error )
1363- go func () {
1364- // Start the streaming on the WebSocket "exec" client.
1365- errorChan <- exec .StreamWithContext (context .Background (), * options )
1366- }()
1348+ func TestWebSocketClient_ProxySucceeds (t * testing.T ) {
1349+ // Validate websocket proxy succeeds for each of the enumerated schemes.
1350+ proxySchemes := []string {"http" , "https" }
1351+ for _ , proxyScheme := range proxySchemes {
1352+ // Create the proxy handler, keeping track of how many times it was called.
1353+ var proxyCalled atomic.Int64
1354+ proxyHandler := utilnettesting .NewHTTPProxyHandler (t , func (req * http.Request ) bool {
1355+ proxyCalled .Add (1 )
1356+ return true
1357+ })
1358+ defer proxyHandler .Wait ()
1359+ // Create/Start the proxy server, adding TLS functionality depending on scheme.
1360+ proxyServer := httptest .NewUnstartedServer (proxyHandler )
1361+ if proxyScheme == "https" {
1362+ cert , err := tls .X509KeyPair (localhostCert , localhostKey )
1363+ if err != nil {
1364+ t .Errorf ("https (valid hostname): proxy_test: %v" , err )
1365+ }
1366+ proxyServer .TLS = & tls.Config {Certificates : []tls.Certificate {cert }}
1367+ proxyServer .StartTLS ()
1368+ } else {
1369+ proxyServer .Start ()
1370+ }
1371+ defer proxyServer .Close () //nolint:errcheck
1372+ proxyLocation , err := url .Parse (proxyServer .URL )
1373+ require .NoError (t , err )
1374+ t .Logf ("Proxy URL: %s" , proxyLocation .String ())
13671375
1368- select {
1369- case <- time .After (wait .ForeverTestTimeout ):
1370- t .Fatalf ("expect stream to be closed after connection is closed." )
1371- case err := <- errorChan :
1372- if err == nil {
1373- t .Errorf ("expected error but received none" )
1376+ // Create fake WebSocket server. Copy received STDIN data back onto STDOUT stream.
1377+ websocketServer := httptest .NewServer (http .HandlerFunc (func (w http.ResponseWriter , req * http.Request ) {
1378+ conns , err := webSocketServerStreams (req , w , streamOptionsFromRequest (req ))
1379+ if err != nil {
1380+ t .Fatalf ("error on webSocketServerStreams: %v" , err )
1381+ }
1382+ defer conns .conn .Close () //nolint:errcheck
1383+ // Loopback the STDIN stream onto the STDOUT stream.
1384+ _ , err = io .Copy (conns .stdoutStream , conns .stdinStream )
1385+ if err != nil {
1386+ t .Fatalf ("error copying STDIN to STDOUT: %v" , err )
1387+ }
1388+ }))
1389+ defer websocketServer .Close () //nolint:errcheck
1390+
1391+ // Now create the WebSocket client (executor), and point it to the TLS proxy server.
1392+ // The proxy server should open a websocket connection to the fake websocket server.
1393+ websocketServer .URL = websocketServer .URL + "?" + "stdin=true" + "&" + "stdout=true"
1394+ websocketLocation , err := url .Parse (websocketServer .URL )
1395+ require .NoError (t , err )
1396+ clientConfig := & rest.Config {
1397+ Host : websocketLocation .Host ,
1398+ // Unused if "http" scheme.
1399+ TLSClientConfig : rest.TLSClientConfig {CAData : localhostCert },
1400+ Proxy : func (req * http.Request ) (* url.URL , error ) {
1401+ return proxyLocation , nil
1402+ },
1403+ }
1404+ exec , err := NewWebSocketExecutor (clientConfig , "GET" , websocketServer .URL )
1405+ require .NoError (t , err )
1406+
1407+ // Generate random data, and set it up to stream on STDIN. The data will be
1408+ // returned on the STDOUT buffer.
1409+ randomSize := 1024 * 1024
1410+ randomData := make ([]byte , randomSize )
1411+ if _ , err := rand .Read (randomData ); err != nil {
1412+ t .Errorf ("unexpected error reading random data: %v" , err )
1413+ }
1414+ var stdout bytes.Buffer
1415+ options := & StreamOptions {
1416+ Stdin : bytes .NewReader (randomData ),
1417+ Stdout : & stdout ,
1418+ }
1419+ errorChan := make (chan error )
1420+ go func () {
1421+ // Start the streaming on the WebSocket "exec" client.
1422+ errorChan <- exec .StreamWithContext (context .Background (), * options )
1423+ }()
1424+
1425+ select {
1426+ case <- time .After (wait .ForeverTestTimeout ):
1427+ t .Fatalf ("expect stream to be closed after connection is closed." )
1428+ case err := <- errorChan :
1429+ if err != nil {
1430+ t .Fatalf ("unexpected error: %v" , err )
1431+ }
1432+ // Validate remote command v5 protocol was negotiated.
1433+ streamExec := exec .(* wsStreamExecutor )
1434+ if remotecommand .StreamProtocolV5Name != streamExec .negotiated {
1435+ t .Fatalf ("expected remote command v5 protocol, got (%s)" , streamExec .negotiated )
1436+ }
1437+ }
1438+ data , err := io .ReadAll (bytes .NewReader (stdout .Bytes ()))
1439+ if err != nil {
1440+ t .Fatalf ("error reading the stream: %v" , err )
1441+ }
1442+ // Check the random data sent on STDIN was the same returned on STDOUT.
1443+ t .Logf ("comparing %d random bytes sent data versus received" , len (randomData ))
1444+ if ! bytes .Equal (randomData , data ) {
1445+ t .Errorf ("unexpected data received: %d sent: %d" , len (data ), len (randomData ))
1446+ } else {
1447+ t .Log ("success--random bytes are the same" )
13741448 }
1375- if ! httpstream .IsHTTPSProxyError (err ) {
1376- t .Errorf ("expected https proxy error, got (%s)" , err )
1449+ // Ensure the proxy was called once
1450+ if e , a := int64 (1 ), proxyCalled .Load (); e != a {
1451+ t .Errorf ("expected %d proxy call, got %d" , e , a )
13771452 }
13781453 }
13791454}
0 commit comments