Skip to content

Commit

Permalink
[internal-branch.go1.21-vendor] http2: limit maximum handler goroutin…
Browse files Browse the repository at this point in the history
…es to MaxConcurrentStreams

When the peer opens a new stream while we have MaxConcurrentStreams
handler goroutines running, defer starting a handler until one
of the existing handlers exits.

For golang/go#63417.
For golang/go#63427.
For CVE-2023-39325.

Change-Id: If0531e177b125700f3e24c5ebd24b1023098fa6d
Reviewed-on: https://team-review.git.corp.google.com/c/golang/go-private/+/2047391
Reviewed-by: Tatiana Bradley <[email protected]>
Run-TryBot: Damien Neil <[email protected]>
Reviewed-by: Ian Cottrell <[email protected]>
Reviewed-on: https://go-review.googlesource.com/c/net/+/534218
LUCI-TryBot-Result: Go LUCI <[email protected]>
Reviewed-by: Dmitri Shuralyov <[email protected]>
Auto-Submit: Dmitri Shuralyov <[email protected]>
Reviewed-by: Michael Pratt <[email protected]>
  • Loading branch information
neild authored and gopherbot committed Oct 10, 2023
1 parent 57553cb commit 695775c
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 2 deletions.
66 changes: 64 additions & 2 deletions http2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,11 @@ type serverConn struct {
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
curClientStreams uint32 // number of open streams initiated by the client
curPushedStreams uint32 // number of open streams initiated by server push
curHandlers uint32 // number of running handler goroutines
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
streams map[uint32]*stream
unstartedHandlers []unstartedHandler
initialStreamSendWindowSize int32
maxFrameSize int32
peerMaxHeaderListSize uint32 // zero means unknown (default)
Expand Down Expand Up @@ -981,6 +983,8 @@ func (sc *serverConn) serve() {
return
case gracefulShutdownMsg:
sc.startGracefulShutdownInternal()
case handlerDoneMsg:
sc.handlerDone()
default:
panic("unknown timer")
}
Expand Down Expand Up @@ -1028,6 +1032,7 @@ var (
idleTimerMsg = new(serverMessage)
shutdownTimerMsg = new(serverMessage)
gracefulShutdownMsg = new(serverMessage)
handlerDoneMsg = new(serverMessage)
)

func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
Expand Down Expand Up @@ -2025,8 +2030,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
}
}

go sc.runHandler(rw, req, handler)
return nil
return sc.scheduleHandler(id, rw, req, handler)
}

func (sc *serverConn) upgradeRequest(req *http.Request) {
Expand All @@ -2046,6 +2050,10 @@ func (sc *serverConn) upgradeRequest(req *http.Request) {
sc.conn.SetReadDeadline(time.Time{})
}

// This is the first request on the connection,
// so start the handler directly rather than going
// through scheduleHandler.
sc.curHandlers++
go sc.runHandler(rw, req, sc.handler.ServeHTTP)
}

Expand Down Expand Up @@ -2286,8 +2294,62 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response
return &responseWriter{rws: rws}
}

type unstartedHandler struct {
streamID uint32
rw *responseWriter
req *http.Request
handler func(http.ResponseWriter, *http.Request)
}

// scheduleHandler starts a handler goroutine,
// or schedules one to start as soon as an existing handler finishes.
func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error {
sc.serveG.check()
maxHandlers := sc.advMaxStreams
if sc.curHandlers < maxHandlers {
sc.curHandlers++
go sc.runHandler(rw, req, handler)
return nil
}
if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) {
return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
}
sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
streamID: streamID,
rw: rw,
req: req,
handler: handler,
})
return nil
}

func (sc *serverConn) handlerDone() {
sc.serveG.check()
sc.curHandlers--
i := 0
maxHandlers := sc.advMaxStreams
for ; i < len(sc.unstartedHandlers); i++ {
u := sc.unstartedHandlers[i]
if sc.streams[u.streamID] == nil {
// This stream was reset before its goroutine had a chance to start.
continue
}
if sc.curHandlers >= maxHandlers {
break
}
sc.curHandlers++
go sc.runHandler(u.rw, u.req, u.handler)
sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references
}
sc.unstartedHandlers = sc.unstartedHandlers[i:]
if len(sc.unstartedHandlers) == 0 {
sc.unstartedHandlers = nil
}
}

// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
defer sc.sendServeMsg(handlerDoneMsg)
didPanic := true
defer func() {
rw.rws.stream.cancelCtx()
Expand Down
113 changes: 113 additions & 0 deletions http2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4756,3 +4756,116 @@ func TestServerWriteDoesNotRetainBufferAfterServerClose(t *testing.T) {
st.ts.Config.Close()
<-donec
}

func TestServerMaxHandlerGoroutines(t *testing.T) {
const maxHandlers = 10
handlerc := make(chan chan bool)
donec := make(chan struct{})
defer close(donec)
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
stopc := make(chan bool, 1)
select {
case handlerc <- stopc:
case <-donec:
}
select {
case shouldPanic := <-stopc:
if shouldPanic {
panic(http.ErrAbortHandler)
}
case <-donec:
}
}, func(s *Server) {
s.MaxConcurrentStreams = maxHandlers
})
defer st.Close()

st.writePreface()
st.writeInitialSettings()
st.writeSettingsAck()

// Make maxHandlers concurrent requests.
// Reset them all, but only after the handler goroutines have started.
var stops []chan bool
streamID := uint32(1)
for i := 0; i < maxHandlers; i++ {
st.writeHeaders(HeadersFrameParam{
StreamID: streamID,
BlockFragment: st.encodeHeader(),
EndStream: true,
EndHeaders: true,
})
stops = append(stops, <-handlerc)
st.fr.WriteRSTStream(streamID, ErrCodeCancel)
streamID += 2
}

// Start another request, and immediately reset it.
st.writeHeaders(HeadersFrameParam{
StreamID: streamID,
BlockFragment: st.encodeHeader(),
EndStream: true,
EndHeaders: true,
})
st.fr.WriteRSTStream(streamID, ErrCodeCancel)
streamID += 2

// Start another two requests. Don't reset these.
for i := 0; i < 2; i++ {
st.writeHeaders(HeadersFrameParam{
StreamID: streamID,
BlockFragment: st.encodeHeader(),
EndStream: true,
EndHeaders: true,
})
streamID += 2
}

// The initial maxHandlers handlers are still executing,
// so the last two requests don't start any new handlers.
select {
case <-handlerc:
t.Errorf("handler unexpectedly started while maxHandlers are already running")
case <-time.After(1 * time.Millisecond):
}

// Tell two handlers to exit.
// The pending requests which weren't reset start handlers.
stops[0] <- false // normal exit
stops[1] <- true // panic
stops = stops[2:]
stops = append(stops, <-handlerc)
stops = append(stops, <-handlerc)

// Make a bunch more requests.
// Eventually, the server tells us to go away.
for i := 0; i < 5*maxHandlers; i++ {
st.writeHeaders(HeadersFrameParam{
StreamID: streamID,
BlockFragment: st.encodeHeader(),
EndStream: true,
EndHeaders: true,
})
st.fr.WriteRSTStream(streamID, ErrCodeCancel)
streamID += 2
}
Frames:
for {
f, err := st.readFrame()
if err != nil {
st.t.Fatal(err)
}
switch f := f.(type) {
case *GoAwayFrame:
if f.ErrCode != ErrCodeEnhanceYourCalm {
t.Errorf("err code = %v; want %v", f.ErrCode, ErrCodeEnhanceYourCalm)
}
break Frames
default:
}
}

for _, s := range stops {
close(s)
}
}

0 comments on commit 695775c

Please sign in to comment.