Skip to content

Commit cd89eaf

Browse files
authored
test: fix Test/GracefulStop by not removing activeStreams too aggresivelly (#2857)
Before this fix, stream is removed from activeStreams in finishStream, which happens when the service handler returns status, without waiting for the status to be sent by loopyWriter. If GracefulStop() is called in between, it will close the connection (because activeStreams is empty), which causes the RPC to fail with "transport is closing". This change moves the activeStreams cleanup into loopyWriter, after sending status on wire.
1 parent a1d4c28 commit cd89eaf

File tree

3 files changed

+87
-12
lines changed

3 files changed

+87
-12
lines changed

Diff for: internal/transport/http2_server.go

+8-12
Original file line numberDiff line numberDiff line change
@@ -1024,13 +1024,7 @@ func (t *http2Server) Close() error {
10241024
}
10251025

10261026
// deleteStream deletes the stream s from transport's active streams.
1027-
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState streamState) {
1028-
oldState = s.swapState(streamDone)
1029-
if oldState == streamDone {
1030-
// If the stream was already done, return.
1031-
return oldState
1032-
}
1033-
1027+
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
10341028
// In case stream sending and receiving are invoked in separate
10351029
// goroutines (e.g., bi-directional streaming), cancel needs to be
10361030
// called to interrupt the potential blocking on other goroutines.
@@ -1052,30 +1046,32 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState stream
10521046
atomic.AddInt64(&t.czData.streamsFailed, 1)
10531047
}
10541048
}
1055-
1056-
return oldState
10571049
}
10581050

10591051
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
10601052
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1061-
oldState := t.deleteStream(s, eosReceived)
1062-
// If the stream is already closed, then don't put trailing header to controlbuf.
1053+
oldState := s.swapState(streamDone)
10631054
if oldState == streamDone {
1055+
// If the stream was already done, return.
10641056
return
10651057
}
10661058

10671059
hdr.cleanup = &cleanupStream{
10681060
streamID: s.id,
10691061
rst: rst,
10701062
rstCode: rstCode,
1071-
onWrite: func() {},
1063+
onWrite: func() {
1064+
t.deleteStream(s, eosReceived)
1065+
},
10721066
}
10731067
t.controlBuf.put(hdr)
10741068
}
10751069

10761070
// closeStream clears the footprint of a stream when the stream is not needed any more.
10771071
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1072+
s.swapState(streamDone)
10781073
t.deleteStream(s, eosReceived)
1074+
10791075
t.controlBuf.put(&cleanupStream{
10801076
streamID: s.id,
10811077
rst: rst,

Diff for: test/end2end_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -5237,6 +5237,7 @@ type stubServer struct {
52375237
// A client connected to this service the test may use. Created in Start().
52385238
client testpb.TestServiceClient
52395239
cc *grpc.ClientConn
5240+
s *grpc.Server
52405241

52415242
addr string // address of listener
52425243

@@ -5274,6 +5275,7 @@ func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
52745275
testpb.RegisterTestServiceServer(s, ss)
52755276
go s.Serve(lis)
52765277
ss.cleanups = append(ss.cleanups, s.Stop)
5278+
ss.s = s
52775279

52785280
target := ss.r.Scheme() + ":///" + ss.addr
52795281

Diff for: test/stream_cleanup_test.go

+77
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ package test
2020

2121
import (
2222
"context"
23+
"io"
2324
"testing"
25+
"time"
2426

2527
"google.golang.org/grpc"
2628
"google.golang.org/grpc/codes"
@@ -55,3 +57,78 @@ func (s) TestStreamCleanup(t *testing.T) {
5557
t.Fatalf("should succeed, err: %v", err)
5658
}
5759
}
60+
61+
func (s) TestStreamCleanupAfterSendStatus(t *testing.T) {
62+
const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise
63+
const bodySize = 2 * initialWindowSize // Something that is not going to fit in a single window
64+
65+
serverReturnedStatus := make(chan struct{})
66+
67+
ss := &stubServer{
68+
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
69+
defer func() {
70+
close(serverReturnedStatus)
71+
}()
72+
return stream.Send(&testpb.StreamingOutputCallResponse{
73+
Payload: &testpb.Payload{
74+
Body: make([]byte, bodySize),
75+
},
76+
})
77+
},
78+
}
79+
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
80+
t.Fatalf("Error starting endpoint server: %v", err)
81+
}
82+
defer ss.Stop()
83+
84+
// This test makes sure we don't delete stream from server transport's
85+
// activeStreams list too aggressively.
86+
87+
// 1. Make a long living stream RPC. So server's activeStream list is not
88+
// empty.
89+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
90+
defer cancel()
91+
stream, err := ss.client.FullDuplexCall(ctx)
92+
if err != nil {
93+
t.Fatalf("FullDuplexCall= _, %v; want _, <nil>", err)
94+
}
95+
96+
// 2. Wait for service handler to return status.
97+
//
98+
// This will trigger a stream cleanup code, which will eventually remove
99+
// this stream from activeStream.
100+
//
101+
// But the stream removal won't happen because it's supposed to be done
102+
// after the status is sent by loopyWriter, and the status send is blocked
103+
// by flow control.
104+
<-serverReturnedStatus
105+
106+
// 3. GracefulStop (besides sending goaway) checks the number of
107+
// activeStreams.
108+
//
109+
// It will close the connection if there's no active streams. This won't
110+
// happen because of the pending stream. But if there's a bug in stream
111+
// cleanup that causes stream to be removed too aggressively, the connection
112+
// will be closd and the stream will be broken.
113+
gracefulStopDone := make(chan struct{})
114+
go func() {
115+
defer close(gracefulStopDone)
116+
ss.s.GracefulStop()
117+
}()
118+
119+
// 4. Make sure the stream is not broken.
120+
if _, err := stream.Recv(); err != nil {
121+
t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
122+
}
123+
if _, err := stream.Recv(); err != io.EOF {
124+
t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
125+
}
126+
127+
timer := time.NewTimer(time.Second)
128+
select {
129+
case <-gracefulStopDone:
130+
timer.Stop()
131+
case <-timer.C:
132+
t.Fatalf("s.GracefulStop() didn't finish without 1 second after the last RPC")
133+
}
134+
}

0 commit comments

Comments
 (0)