From 5e0fa765ba0c9c9b6c703dfa56705c3acfa056c3 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 18 Apr 2024 10:18:20 -0700 Subject: [PATCH] transport: make nextID accessed inside t.mu only (#7148) --- internal/transport/http2_client.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index ef461b68f835..3c63c706986d 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -114,11 +114,11 @@ type http2Client struct { streamQuota int64 streamsQuotaAvailable chan struct{} waitingStreams uint32 - nextID uint32 registeredCompressors string // Do not access controlBuf with mu held. mu sync.Mutex // guard the following variables + nextID uint32 state transportState activeStreams map[uint32]*Stream // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. @@ -808,22 +808,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, t.waitingStreams-- } t.streamQuota-- + + t.mu.Lock() + if t.state == draining || t.activeStreams == nil { // Can be niled from Close(). + t.mu.Unlock() + return false // Don't create a stream if the transport is already closed. + } + hdr.streamID = t.nextID t.nextID += 2 - // Drain client transport if nextID > MaxStreamID which signals gRPC that // the connection is closed and a new one must be created for subsequent RPCs. transportDrainRequired = t.nextID > MaxStreamID s.id = hdr.streamID s.fc = &inFlow{limit: uint32(t.initialWindowSize)} - t.mu.Lock() - if t.state == draining || t.activeStreams == nil { // Can be niled from Close(). - t.mu.Unlock() - return false // Don't create a stream if the transport is already closed. - } t.activeStreams[s.id] = s t.mu.Unlock() + if t.streamQuota > 0 && t.waitingStreams > 0 { select { case t.streamsQuotaAvailable <- struct{}{}: