diff --git a/default_dial_option_server_option_test.go b/default_dial_option_server_option_test.go index 67ea9e80da8c..444a8ddeed13 100644 --- a/default_dial_option_server_option_test.go +++ b/default_dial_option_server_option_test.go @@ -146,6 +146,10 @@ func (s) TestJoinDialOption(t *testing.T) { if cc.dopts.copts.InitialWindowSize != initialWindowSize { t.Fatalf("Unexpected cc.dopts.copts.InitialWindowSize: %d != %d", cc.dopts.copts.InitialWindowSize, initialWindowSize) } + // Make sure static window size is not enabled when using WithInitialWindowSize. + if cc.dopts.copts.StaticWindowSize { + t.Fatalf("Unexpected cc.dopts.copts.StaticWindowSize: %t", cc.dopts.copts.StaticWindowSize) + } } // TestJoinServerOption tests the join server option. It configures a joined @@ -162,6 +166,10 @@ func (s) TestJoinServerOption(t *testing.T) { if s.opts.initialWindowSize != initialWindowSize { t.Fatalf("Unexpected s.opts.initialWindowSize: %d != %d", s.opts.initialWindowSize, initialWindowSize) } + // Make sure static window size is not enabled when using InitialWindowSize. + if s.opts.staticWindowSize { + t.Fatalf("Unexpected s.opts.staticWindowSize: %t", s.opts.staticWindowSize) + } } // funcTestHeaderListSizeDialOptionServerOption tests diff --git a/dialoptions.go b/dialoptions.go index 7a5ac2e7c494..0e724fb014cc 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -210,20 +210,29 @@ func WithReadBufferSize(s int) DialOption { // WithInitialWindowSize returns a DialOption which sets the value for initial // window size on a stream. The lower bound for window size is 64K and any value // smaller than that will be ignored. +// +// Deprecated: use [WithInitialStreamWindowSize] to set a stream window size +// without disabling dynamic flow control. Will be supported throughout 1.x. func WithInitialWindowSize(s int32) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.InitialWindowSize = s - o.copts.StaticWindowSize = true }) } +// WithInitialStreamWindowSize returns a DialOption which sets the value for +// initial window size on a stream without disabling dynamic flow control. The +// lower bound for window size is 64K and any value smaller than that will be +// ignored. +func WithInitialStreamWindowSize(s int32) DialOption { + return WithInitialWindowSize(s) +} + // WithInitialConnWindowSize returns a DialOption which sets the value for // initial window size on a connection. The lower bound for window size is 64K // and any value smaller than that will be ignored. func WithInitialConnWindowSize(s int32) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.InitialConnWindowSize = s - o.copts.StaticWindowSize = true }) } diff --git a/server.go b/server.go index 1b5cefe81715..6a88af8ab512 100644 --- a/server.go +++ b/server.go @@ -277,20 +277,29 @@ func ReadBufferSize(s int) ServerOption { } // InitialWindowSize returns a ServerOption that sets window size for stream. -// The lower bound for window size is 64K and any value smaller than that will be ignored. +// The lower bound for window size is 64K and any value smaller than that will +// be ignored. +// +// Deprecated: use [InitialStreamWindowSize] to set a stream window size without +// disabling dynamic flow control. Will be supported throughout 1.x. func InitialWindowSize(s int32) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.initialWindowSize = s - o.staticWindowSize = true }) } +// InitialStreamWindowSize returns a ServerOption that sets window size for +// stream without disabling dynamic flow control. The lower bound for window +// size is 64K and any value smaller than that will be ignored. +func InitialStreamWindowSize(s int32) ServerOption { + return InitialWindowSize(s) +} + // InitialConnWindowSize returns a ServerOption that sets window size for a connection. // The lower bound for window size is 64K and any value smaller than that will be ignored. func InitialConnWindowSize(s int32) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.initialConnWindowSize = s - o.staticWindowSize = true }) } diff --git a/test/channelz_test.go b/test/channelz_test.go index 93847126dbe4..bc50fbbe8658 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -1061,6 +1061,9 @@ func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) { e := tcpClearRREnv te := newTest(t, e) + // Disable BDP estimation to make sure the flow control is violated when a + // large message is sent. + te.isServerStaticWindow = true te.serverInitialWindowSize = 65536 // Avoid overflowing connection level flow control window, which will lead to // transport being closed. @@ -1146,6 +1149,8 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { e := tcpClearRREnv te := newTest(t, e) // disable BDP + te.isServerStaticWindow = true + te.clientStaticWindow = true te.serverInitialWindowSize = 65536 te.serverInitialConnWindowSize = 65536 te.clientInitialWindowSize = 65536 diff --git a/test/end2end_test.go b/test/end2end_test.go index 037ab1a8db83..4dea3ed2d097 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -490,6 +490,7 @@ type test struct { unaryServerInt grpc.UnaryServerInterceptor streamServerInt grpc.StreamServerInterceptor serverInitialWindowSize int32 + isServerStaticWindow bool serverInitialConnWindowSize int32 customServerOptions []grpc.ServerOption @@ -505,14 +506,16 @@ type test struct { // Used to test the new compressor registration API UseCompressor. clientUseCompression bool // clientNopCompression is set to create a compressor whose type is not supported. - clientNopCompression bool - unaryClientInt grpc.UnaryClientInterceptor - streamClientInt grpc.StreamClientInterceptor - clientInitialWindowSize int32 - clientInitialConnWindowSize int32 - perRPCCreds credentials.PerRPCCredentials - customDialOptions []grpc.DialOption - resolverScheme string + clientNopCompression bool + unaryClientInt grpc.UnaryClientInterceptor + streamClientInt grpc.StreamClientInterceptor + clientInitialWindowSize int32 + clientUseInitialStreamWindowSize bool + clientInitialConnWindowSize int32 + clientStaticWindow bool + perRPCCreds credentials.PerRPCCredentials + customDialOptions []grpc.DialOption + resolverScheme string // These are set once startServer is called. The common case is to have // only one testServer. @@ -606,11 +609,21 @@ func (te *test) listenAndServe(ts testgrpc.TestServiceServer, listen func(networ sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler)) } if te.serverInitialWindowSize > 0 { - sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize)) + if te.isServerStaticWindow { + sopts = append(sopts, grpc.StaticStreamWindowSize(te.serverInitialWindowSize)) + } else { + sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize)) + } } + if te.serverInitialConnWindowSize > 0 { - sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize)) + if te.isServerStaticWindow { + sopts = append(sopts, grpc.StaticConnWindowSize(te.serverInitialConnWindowSize)) + } else { + sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize)) + } } + la := ":0" if te.e.network == "unix" { la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano()) @@ -817,10 +830,21 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer))) } if te.clientInitialWindowSize > 0 { - opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) + if te.clientStaticWindow { + opts = append(opts, grpc.WithStaticStreamWindowSize(te.clientInitialWindowSize)) + } else if te.clientUseInitialStreamWindowSize { + opts = append(opts, grpc.WithInitialStreamWindowSize(te.clientInitialWindowSize)) + } else { + opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) + } } + if te.clientInitialConnWindowSize > 0 { - opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) + if te.clientStaticWindow { + opts = append(opts, grpc.WithStaticConnWindowSize(te.clientInitialConnWindowSize)) + } else { + opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) + } } if te.perRPCCreds != nil { opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds)) @@ -5418,18 +5442,23 @@ func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) { } type windowSizeConfig struct { - serverStream int32 - serverConn int32 - clientStream int32 - clientConn int32 + serverStream int32 + serverConn int32 + clientStream int32 + clientConn int32 + serverStaticWindow bool + clientStaticWindow bool + clientUseInitialStreamWindowSize bool } func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { wc := windowSizeConfig{ - serverStream: 8 * 1024 * 1024, - serverConn: 12 * 1024 * 1024, - clientStream: 6 * 1024 * 1024, - clientConn: 8 * 1024 * 1024, + serverStream: 8 * 1024 * 1024, + serverConn: 12 * 1024 * 1024, + clientStream: 6 * 1024 * 1024, + clientConn: 8 * 1024 * 1024, + serverStaticWindow: true, + clientStaticWindow: true, } for _, e := range listTestEnv() { testConfigurableWindowSize(t, e, wc) @@ -5448,12 +5477,43 @@ func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) { } } +func (s) TestConfigurableInitialStreamWindowSizeWithLargeWindow(t *testing.T) { + wc := windowSizeConfig{ + serverStream: 8 * 1024 * 1024, + serverConn: 12 * 1024 * 1024, + clientStream: 6 * 1024 * 1024, + clientConn: 8 * 1024 * 1024, + serverStaticWindow: true, + clientStaticWindow: true, + clientUseInitialStreamWindowSize: true, + } + for _, e := range listTestEnv() { + testConfigurableWindowSize(t, e, wc) + } +} + +func (s) TestConfigurableInitialStreamWindowSizeWithSmallWindow(t *testing.T) { + wc := windowSizeConfig{ + serverStream: 1, + serverConn: 1, + clientStream: 1, + clientConn: 1, + clientUseInitialStreamWindowSize: true, + } + for _, e := range listTestEnv() { + testConfigurableWindowSize(t, e, wc) + } +} + func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { te := newTest(t, e) te.serverInitialWindowSize = wc.serverStream te.serverInitialConnWindowSize = wc.serverConn te.clientInitialWindowSize = wc.clientStream te.clientInitialConnWindowSize = wc.clientConn + te.isServerStaticWindow = wc.serverStaticWindow + te.clientStaticWindow = wc.clientStaticWindow + te.clientUseInitialStreamWindowSize = wc.clientUseInitialStreamWindowSize te.startServer(&testServer{security: e.security}) defer te.tearDown() diff --git a/test/stream_cleanup_test.go b/test/stream_cleanup_test.go index 600fb1fadc21..7d41cde3b6ba 100644 --- a/test/stream_cleanup_test.go +++ b/test/stream_cleanup_test.go @@ -48,7 +48,9 @@ func (s) TestStreamCleanup(t *testing.T) { return &testpb.Empty{}, nil }, } - if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil { + + // Disable BDP to ensure the message size exceeds window size. + if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() @@ -81,7 +83,9 @@ func (s) TestStreamCleanupAfterSendStatus(t *testing.T) { }) }, } - if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil { + + // Disable BDP to ensure the message size exceeds window size. + if err := ss.Start(nil, grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop()