Skip to content

Commit

Permalink
Merge pull request #100953 from saschagrunert/automated-cherry-pick-o…
Browse files Browse the repository at this point in the history
…f-#99839-upstream-release-1.19

Automated cherry pick of #99839: Cleanup portforward streams after their usage

Kubernetes-commit: cc22cfabdfdeb5574fcd5310515a2986a2394cdf
  • Loading branch information
k8s-publishing-bot committed May 18, 2021
2 parents aea3c69 + 4d1ec35 commit 509bd57
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pkg/util/httpstream/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type Connection interface {
// SetIdleTimeout sets the amount of time the connection may remain idle before
// it is automatically closed.
SetIdleTimeout(timeout time.Duration)
// RemoveStreams can be used to remove a set of streams from the Connection.
RemoveStreams(streams ...Stream)
}

// Stream represents a bidirectional communications channel that is part of an
Expand Down
21 changes: 17 additions & 4 deletions pkg/util/httpstream/spdy/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// streams.
type connection struct {
conn *spdystream.Connection
streams []httpstream.Stream
streams map[uint32]httpstream.Stream
streamLock sync.Mutex
newStreamHandler httpstream.NewStreamHandler
}
Expand Down Expand Up @@ -64,7 +64,11 @@ func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHan
// will be invoked when the server receives a newly created stream from the
// client.
func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
c := &connection{conn: conn, newStreamHandler: newStreamHandler}
c := &connection{
conn: conn,
newStreamHandler: newStreamHandler,
streams: make(map[uint32]httpstream.Stream),
}
go conn.Serve(c.newSpdyStream)
return c
}
Expand All @@ -81,7 +85,7 @@ func (c *connection) Close() error {
// calling Reset instead of Close ensures that all streams are fully torn down
s.Reset()
}
c.streams = make([]httpstream.Stream, 0)
c.streams = make(map[uint32]httpstream.Stream, 0)
c.streamLock.Unlock()

// now that all streams are fully torn down, it's safe to call close on the underlying connection,
Expand All @@ -90,6 +94,15 @@ func (c *connection) Close() error {
return c.conn.Close()
}

// RemoveStreams can be used to removes a set of streams from the Connection.
func (c *connection) RemoveStreams(streams ...httpstream.Stream) {
c.streamLock.Lock()
for _, stream := range streams {
delete(c.streams, stream.Identifier())
}
c.streamLock.Unlock()
}

// CreateStream creates a new stream with the specified headers and registers
// it with the connection.
func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) {
Expand All @@ -109,7 +122,7 @@ func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error
// it owns.
func (c *connection) registerStream(s httpstream.Stream) {
c.streamLock.Lock()
c.streams = append(c.streams, s)
c.streams[s.Identifier()] = s
c.streamLock.Unlock()
}

Expand Down
38 changes: 38 additions & 0 deletions pkg/util/httpstream/spdy/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,41 @@ func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) {
}
}
}

type fakeStream struct{ id uint32 }

func (*fakeStream) Read(p []byte) (int, error) { return 0, nil }
func (*fakeStream) Write(p []byte) (int, error) { return 0, nil }
func (*fakeStream) Close() error { return nil }
func (*fakeStream) Reset() error { return nil }
func (*fakeStream) Headers() http.Header { return nil }
func (f *fakeStream) Identifier() uint32 { return f.id }

func TestConnectionRemoveStreams(t *testing.T) {
c := &connection{streams: make(map[uint32]httpstream.Stream)}
stream0 := &fakeStream{id: 0}
stream1 := &fakeStream{id: 1}
stream2 := &fakeStream{id: 2}

c.registerStream(stream0)
c.registerStream(stream1)

if len(c.streams) != 2 {
t.Fatalf("should have two streams, has %d", len(c.streams))
}

// not exists
c.RemoveStreams(stream2)

if len(c.streams) != 2 {
t.Fatalf("should have two streams, has %d", len(c.streams))
}

// remove all existing
c.RemoveStreams(stream0, stream1)

if len(c.streams) != 0 {
t.Fatalf("should not have any streams, has %d", len(c.streams))
}

}

0 comments on commit 509bd57

Please sign in to comment.