@@ -25,6 +25,7 @@ import (
2525 "fmt"
2626 "io"
2727 "net"
28+ "net/http"
2829 "os/exec"
2930 "regexp"
3031 "strconv"
@@ -35,6 +36,9 @@ import (
3536 "golang.org/x/net/websocket"
3637 v1 "k8s.io/api/core/v1"
3738 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39+ "k8s.io/apimachinery/pkg/util/intstr"
40+ utilnet "k8s.io/apimachinery/pkg/util/net"
41+ "k8s.io/apimachinery/pkg/util/rand"
3842 "k8s.io/apimachinery/pkg/util/wait"
3943 "k8s.io/kubernetes/test/e2e/framework"
4044 e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
@@ -123,6 +127,71 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi
123127 }
124128}
125129
130+ func pfNeverReadRequestBodyPod () * v1.Pod {
131+ return & v1.Pod {
132+ ObjectMeta : metav1.ObjectMeta {
133+ Name : "issue-74551" ,
134+ },
135+ Spec : v1.PodSpec {
136+ RestartPolicy : v1 .RestartPolicyNever ,
137+ Containers : []v1.Container {
138+ {
139+ Name : "server" ,
140+ Image : imageutils .GetE2EImage (imageutils .Agnhost ),
141+ Args : []string {
142+ "netexec" ,
143+ "--http-port=80" ,
144+ },
145+ ReadinessProbe : & v1.Probe {
146+ ProbeHandler : v1.ProbeHandler {
147+ HTTPGet : & v1.HTTPGetAction {
148+ Path : "/healthz" ,
149+ Port : intstr.IntOrString {
150+ IntVal : int32 (80 ),
151+ },
152+ Scheme : v1 .URISchemeHTTP ,
153+ },
154+ },
155+ InitialDelaySeconds : 5 ,
156+ TimeoutSeconds : 60 ,
157+ PeriodSeconds : 1 ,
158+ },
159+ },
160+ },
161+ },
162+ }
163+ }
164+
165+ func testWebServerPod () * v1.Pod {
166+ return & v1.Pod {
167+ ObjectMeta : metav1.ObjectMeta {
168+ Name : podName ,
169+ Labels : map [string ]string {"name" : podName },
170+ },
171+ Spec : v1.PodSpec {
172+ Containers : []v1.Container {
173+ {
174+ Name : "testwebserver" ,
175+ Image : imageutils .GetE2EImage (imageutils .Agnhost ),
176+ Args : []string {"test-webserver" },
177+ Ports : []v1.ContainerPort {{ContainerPort : int32 (80 )}},
178+ ReadinessProbe : & v1.Probe {
179+ ProbeHandler : v1.ProbeHandler {
180+ HTTPGet : & v1.HTTPGetAction {
181+ Path : "/" ,
182+ Port : intstr .FromInt32 (int32 (80 )),
183+ },
184+ },
185+ InitialDelaySeconds : 5 ,
186+ TimeoutSeconds : 3 ,
187+ FailureThreshold : 10 ,
188+ },
189+ },
190+ },
191+ },
192+ }
193+ }
194+
126195// WaitForTerminatedContainer waits till a given container be terminated for a given pod.
127196func WaitForTerminatedContainer (ctx context.Context , f * framework.Framework , pod * v1.Pod , containerName string ) error {
128197 return e2epod .WaitForPodCondition (ctx , f .ClientSet , f .Namespace .Name , pod .Name , "container terminated" , framework .PodStartTimeout , func (pod * v1.Pod ) (bool , error ) {
@@ -493,6 +562,110 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() {
493562 doTestOverWebSockets (ctx , "localhost" , f )
494563 })
495564 })
565+
566+ ginkgo .Describe ("with a pod being removed" , func () {
567+ ginkgo .It ("should stop port-forwarding" , func (ctx context.Context ) {
568+ ginkgo .By ("Creating the target pod" )
569+ pod := pfNeverReadRequestBodyPod ()
570+ _ , err := f .ClientSet .CoreV1 ().Pods (f .Namespace .Name ).Create (ctx , pod , metav1.CreateOptions {})
571+ framework .ExpectNoError (err , "couldn't create pod" )
572+
573+ err = e2epod .WaitTimeoutForPodReadyInNamespace (ctx , f .ClientSet , pod .Name , f .Namespace .Name , framework .PodStartTimeout )
574+ framework .ExpectNoError (err , "pod did not start running" )
575+
576+ ginkgo .By ("Running 'kubectl port-forward'" )
577+ cmd := runPortForward (f .Namespace .Name , pod .Name , 80 )
578+ defer cmd .Stop ()
579+
580+ ginkgo .By ("Running port-forward client" )
581+ reqChan := make (chan bool )
582+ errorChan := make (chan error )
583+ go func () {
584+ defer ginkgo .GinkgoRecover ()
585+
586+ // try to mock a big request, which should take some time
587+ for sentBodySize := 0 ; sentBodySize < 1024 * 1024 * 1024 ; {
588+ size := rand .Intn (4 * 1024 * 1024 )
589+ url := fmt .Sprintf ("http://localhost:%d/header" , cmd .port )
590+ _ , err := post (url , strings .NewReader (strings .Repeat ("x" , size )), nil )
591+ if err != nil {
592+ errorChan <- err
593+ }
594+ ginkgo .By (fmt .Sprintf ("Sent %d chunk of data" , sentBodySize ))
595+ if sentBodySize == 0 {
596+ close (reqChan )
597+ }
598+ sentBodySize += size
599+ }
600+ }()
601+
602+ ginkgo .By ("Remove the forwarded pod after the first client request" )
603+ <- reqChan
604+ e2epod .DeletePodOrFail (ctx , f .ClientSet , f .Namespace .Name , pod .Name )
605+
606+ ginkgo .By ("Wait for client being interrupted" )
607+ select {
608+ case err = <- errorChan :
609+ case <- time .After (e2epod .DefaultPodDeletionTimeout ):
610+ }
611+
612+ ginkgo .By ("Check the client error" )
613+ gomega .Expect (err ).To (gomega .HaveOccurred ())
614+ gomega .Expect (err .Error ()).To (gomega .Or (gomega .ContainSubstring ("connection reset by peer" ), gomega .ContainSubstring ("EOF" )))
615+
616+ ginkgo .By ("Check kubectl port-forward exit code" )
617+ gomega .Expect (cmd .cmd .ProcessState .ExitCode ()).To (gomega .BeNumerically ("<" , 0 ), "kubectl port-forward should finish with non-zero exit code" )
618+ })
619+ })
620+
621+ ginkgo .Describe ("Shutdown client connection while the remote stream is writing data to the port-forward connection" , func () {
622+ ginkgo .It ("port-forward should keep working after detect broken connection" , func (ctx context.Context ) {
623+ ginkgo .By ("Creating the target pod" )
624+ pod := testWebServerPod ()
625+ _ , err := f .ClientSet .CoreV1 ().Pods (f .Namespace .Name ).Create (ctx , pod , metav1.CreateOptions {})
626+ framework .ExpectNoError (err , "couldn't create pod" )
627+
628+ err = e2epod .WaitTimeoutForPodReadyInNamespace (ctx , f .ClientSet , pod .Name , f .Namespace .Name , framework .PodStartTimeout )
629+ framework .ExpectNoError (err , "pod did not start running" )
630+
631+ ginkgo .By ("Running 'kubectl port-forward'" )
632+ cmd := runPortForward (f .Namespace .Name , pod .Name , 80 )
633+ defer cmd .Stop ()
634+
635+ ginkgo .By ("Send a http request to verify port-forward working" )
636+ client := http.Client {
637+ Timeout : 10 * time .Second ,
638+ }
639+ resp , err := client .Get (fmt .Sprintf ("http://127.0.0.1:%d/" , cmd .port ))
640+ framework .ExpectNoError (err , "couldn't get http response from port-forward" )
641+ gomega .Expect (resp .StatusCode ).To (gomega .Equal (http .StatusOK ), "unexpected status code" )
642+
643+ ginkgo .By ("Dialing the local port" )
644+ conn , err := net .Dial ("tcp" , fmt .Sprintf ("127.0.0.1:%d" , cmd .port ))
645+ framework .ExpectNoError (err , "couldn't connect to port %d" , cmd .port )
646+
647+ // use raw tcp connection to emulate client close connection without reading response
648+ ginkgo .By ("Request agnhost binary file (40MB+)" )
649+ requestLines := []string {"GET /agnhost HTTP/1.1" , "Host: localhost" , "" }
650+ for _ , line := range requestLines {
651+ _ , err := conn .Write (append ([]byte (line ), []byte ("\r \n " )... ))
652+ framework .ExpectNoError (err , "couldn't write http request to local connection" )
653+ }
654+
655+ ginkgo .By ("Read only one byte from the connection" )
656+ _ , err = conn .Read (make ([]byte , 1 ))
657+ framework .ExpectNoError (err , "couldn't read from the local connection" )
658+
659+ ginkgo .By ("Close client connection without reading remain data" )
660+ err = conn .Close ()
661+ framework .ExpectNoError (err , "couldn't close local connection" )
662+
663+ ginkgo .By ("Send another http request through port-forward again" )
664+ resp , err = client .Get (fmt .Sprintf ("http://127.0.0.1:%d/" , cmd .port ))
665+ framework .ExpectNoError (err , "couldn't get http response from port-forward" )
666+ gomega .Expect (resp .StatusCode ).To (gomega .Equal (http .StatusOK ), "unexpected status code" )
667+ })
668+ })
496669})
497670
498671func wsRead (conn * websocket.Conn ) (byte , []byte , error ) {
@@ -521,3 +694,24 @@ func wsWrite(conn *websocket.Conn, channel byte, data []byte) error {
521694 err := websocket .Message .Send (conn , frame )
522695 return err
523696}
697+
698+ func post (url string , reader io.Reader , transport * http.Transport ) (string , error ) {
699+ if transport == nil {
700+ transport = utilnet .SetTransportDefaults (& http.Transport {})
701+ }
702+ client := & http.Client {Transport : transport }
703+ req , err := http .NewRequest (http .MethodPost , url , reader )
704+ if err != nil {
705+ return "" , err
706+ }
707+ resp , err := client .Do (req )
708+ if err != nil {
709+ return "" , err
710+ }
711+ defer resp .Body .Close () //nolint: errcheck
712+ body , err := io .ReadAll (resp .Body )
713+ if err != nil {
714+ return "" , err
715+ }
716+ return string (body ), nil
717+ }
0 commit comments