diff --git a/clientconn.go b/clientconn.go
index c0c2c9a76abf..09c9f1b4a933 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -262,9 +262,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
 	}()
 
 	// This creates the name resolver, load balancer, etc.
-	if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
-		return nil, err
+	if err := cc.exitIdleMode(); err != nil {
+		return nil, fmt.Errorf("failed to exit idle mode: %w", err)
 	}
+	cc.idlenessMgr.UnsafeSetNotIdle()
 
 	// Return now for non-blocking dials.
 	if !cc.dopts.block {
@@ -332,7 +333,7 @@ func (cc *ClientConn) addTraceEvent(msg string) {
 			Severity: channelz.CtInfo,
 		}
 	}
-	channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
+	channelz.AddTraceEvent(logger, cc.channelz, 1, ted)
 }
 
 type idler ClientConn
@@ -341,14 +342,17 @@ func (i *idler) EnterIdleMode() {
 	(*ClientConn)(i).enterIdleMode()
 }
 
-func (i *idler) ExitIdleMode() error {
-	return (*ClientConn)(i).exitIdleMode()
+func (i *idler) ExitIdleMode() {
+	// Ignore the error returned from this method, because from the perspective
+	// of the caller (idleness manager), the channel would have always moved out
+	// of IDLE by the time this method returns.
+	(*ClientConn)(i).exitIdleMode()
 }
 
 // exitIdleMode moves the channel out of idle mode by recreating the name
 // resolver and load balancer. This should never be called directly; use
 // cc.idlenessMgr.ExitIdleMode instead.
-func (cc *ClientConn) exitIdleMode() (err error) {
+func (cc *ClientConn) exitIdleMode() error {
 	cc.mu.Lock()
 	if cc.conns == nil {
 		cc.mu.Unlock()
@@ -356,11 +360,23 @@ func (cc *ClientConn) exitIdleMode() (err error) {
 	}
 	cc.mu.Unlock()
 
+	// Set state to CONNECTING before building the name resolver
+	// so the channel does not remain in IDLE.
+	cc.csMgr.updateState(connectivity.Connecting)
+
 	// This needs to be called without cc.mu because this builds a new resolver
 	// which might update state or report error inline, which would then need to
 	// acquire cc.mu.
 	if err := cc.resolverWrapper.start(); err != nil {
-		return err
+		// If resolver creation fails, treat it like an error reported by the
+		// resolver before any valid updates. Set the channel's state to
+		// TransientFailure, and set an erroring picker with the resolver build
+		// error, which will be returned as part of any subsequent RPCs.
+		logger.Warningf("Failed to start resolver: %v", err)
+		cc.csMgr.updateState(connectivity.TransientFailure)
+		cc.mu.Lock()
+		cc.updateResolverStateAndUnlock(resolver.State{}, err)
+		return fmt.Errorf("failed to start resolver: %w", err)
 	}
 
 	cc.addTraceEvent("exiting idle mode")
@@ -681,10 +697,8 @@ func (cc *ClientConn) GetState() connectivity.State {
 // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
 // release.
 func (cc *ClientConn) Connect() {
-	if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
-		cc.addTraceEvent(err.Error())
-		return
-	}
+	cc.idlenessMgr.ExitIdleMode()
+
 	// If the ClientConn was not in idle mode, we need to call ExitIdle on the
 	// LB policy so that connections can be created.
 	cc.mu.Lock()
@@ -735,8 +749,8 @@ func init() {
 	internal.EnterIdleModeForTesting = func(cc *ClientConn) {
 		cc.idlenessMgr.EnterIdleModeForTesting()
 	}
-	internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
-		return cc.idlenessMgr.ExitIdleMode()
+	internal.ExitIdleModeForTesting = func(cc *ClientConn) {
+		cc.idlenessMgr.ExitIdleMode()
 	}
 }
diff --git a/clientconn_parsed_target_test.go b/clientconn_parsed_target_test.go
index 00e8e283f63f..57ae3ad7020f 100644
--- a/clientconn_parsed_target_test.go
+++ b/clientconn_parsed_target_test.go
@@ -205,28 +205,31 @@ func (s) TestParsedTarget_Failure_WithoutCustomDialer(t *testing.T) {
 }
 
 func (s) TestParsedTarget_Failure_WithoutCustomDialer_WithNewClient(t *testing.T) {
-	targets := []string{
-		"",
-		"unix://a/b/c",
-		"unix://authority",
-		"unix-abstract://authority/a/b/c",
-		"unix-abstract://authority",
+	tests := []struct {
+		target        string
+		wantErrSubstr string
+	}{
+		{target: "", wantErrSubstr: "invalid target address"},
+		{target: "unix://a/b/c", wantErrSubstr: "invalid (non-empty) authority"},
+		{target: "unix://authority", wantErrSubstr: "invalid (non-empty) authority"},
+		{target: "unix-abstract://authority/a/b/c", wantErrSubstr: "invalid (non-empty) authority"},
+		{target: "unix-abstract://authority", wantErrSubstr: "invalid (non-empty) authority"},
 	}
 
-	for _, target := range targets {
-		t.Run(target, func(t *testing.T) {
+	for _, test := range tests {
+		t.Run(test.target, func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 			defer cancel()
-			cc, err := NewClient(target, WithTransportCredentials(insecure.NewCredentials()))
+			cc, err := NewClient(test.target, WithTransportCredentials(insecure.NewCredentials()))
 			if err != nil {
-				t.Fatalf("NewClient(%q) failed: %v", target, err)
+				t.Fatalf("NewClient(%q) failed: %v", test.target, err)
 			}
 			defer cc.Close()
 
-			const wantErrSubstr = "failed to exit idle mode"
 			if _, err := cc.NewStream(ctx, &StreamDesc{}, "/my.service.v1.MyService/UnaryCall"); err == nil {
-				t.Fatalf("NewStream() succeeded with target = %q, cc.parsedTarget = %+v, expected to fail", target, cc.parsedTarget)
-			} else if !strings.Contains(err.Error(), wantErrSubstr) {
-				t.Fatalf("NewStream() with target = %q returned unexpected error: got %v, want substring %q", target, err, wantErrSubstr)
+				t.Fatalf("NewStream() succeeded with target = %q, cc.parsedTarget = %+v, expected to fail", test.target, cc.parsedTarget)
+			} else if !strings.Contains(err.Error(), test.wantErrSubstr) {
+				t.Fatalf("NewStream() with target = %q returned unexpected error: got %v, want substring %q", test.target, err, test.wantErrSubstr)
 			}
 		})
 	}
diff --git a/dial_test.go b/dial_test.go
index cb86f330d5cf..7b74aac79fa6 100644
--- a/dial_test.go
+++ b/dial_test.go
@@ -20,6 +20,7 @@ package grpc
 
 import (
 	"context"
+	"fmt"
 	"net"
 	"strings"
 	"testing"
@@ -312,3 +313,36 @@ func (s) TestResolverAddressesWithTypedNilAttribute(t *testing.T) {
 type stringerVal struct{ s string }
 
 func (s stringerVal) String() string { return s.s }
+
+const errResolverBuilderScheme = "test-resolver-build-failure"
+
+// errResolverBuilder is a resolver builder that returns an error from its Build
+// method.
+type errResolverBuilder struct {
+	err error
+}
+
+func (b *errResolverBuilder) Build(resolver.Target, resolver.ClientConn, resolver.BuildOptions) (resolver.Resolver, error) {
+	return nil, b.err
+}
+
+func (b *errResolverBuilder) Scheme() string {
+	return errResolverBuilderScheme
+}
+
+// Tests that Dial returns an error if the resolver builder returns an error
+// from its Build method.
+func (s) TestDial_ResolverBuilder_Error(t *testing.T) {
+	resolverErr := fmt.Errorf("resolver builder error")
+	dopts := []DialOption{
+		WithTransportCredentials(insecure.NewCredentials()),
+		WithResolvers(&errResolverBuilder{err: resolverErr}),
+	}
+	_, err := Dial(errResolverBuilderScheme+":///test.server", dopts...)
+	if err == nil {
+		t.Fatalf("Dial() succeeded when it should have failed")
+	}
+	if !strings.Contains(err.Error(), resolverErr.Error()) {
+		t.Fatalf("Dial() failed with error %v, want %v", err, resolverErr)
+	}
+}
diff --git a/internal/idle/idle.go b/internal/idle/idle.go
index 2c13ee9dac75..d3cd24f80bd7 100644
--- a/internal/idle/idle.go
+++ b/internal/idle/idle.go
@@ -21,7 +21,6 @@ package idle
 
 import (
-	"fmt"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -33,15 +32,15 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
 	return time.AfterFunc(d, f)
 }
 
-// Enforcer is the functionality provided by grpc.ClientConn to enter
-// and exit from idle mode.
-type Enforcer interface {
-	ExitIdleMode() error
+// ClientConn is the functionality provided by grpc.ClientConn to enter and exit
+// from idle mode.
+type ClientConn interface {
+	ExitIdleMode()
 	EnterIdleMode()
 }
 
-// Manager implements idleness detection and calls the configured Enforcer to
-// enter/exit idle mode when appropriate. Must be created by NewManager.
+// Manager implements idleness detection and calls the ClientConn to enter/exit
+// idle mode when appropriate. Must be created by NewManager.
 type Manager struct {
 	// State accessed atomically.
 	lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
@@ -51,8 +50,8 @@ type Manager struct {
 	// Can be accessed without atomics or mutex since these are set at creation
 	// time and read-only after that.
-	enforcer Enforcer // Functionality provided by grpc.ClientConn.
-	timeout  time.Duration
+	cc      ClientConn // Functionality provided by grpc.ClientConn.
+	timeout time.Duration
 
 	// idleMu is used to guarantee mutual exclusion in two scenarios:
 	// - Opposing intentions:
@@ -72,9 +71,9 @@ type Manager struct {
 
 // NewManager creates a new idleness manager implementation for the
 // given idle timeout. It begins in idle mode.
-func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
+func NewManager(cc ClientConn, timeout time.Duration) *Manager {
 	return &Manager{
-		enforcer:         enforcer,
+		cc:               cc,
 		timeout:          timeout,
 		actuallyIdle:     true,
 		activeCallsCount: -math.MaxInt32,
@@ -127,7 +126,7 @@ func (m *Manager) handleIdleTimeout() {
 
 	// Now that we've checked that there has been no activity, attempt to enter
 	// idle mode, which is very likely to succeed.
-	if m.tryEnterIdleMode() {
+	if m.tryEnterIdleMode(true) {
 		// Successfully entered idle mode. No timer needed until we exit idle.
 		return
 	}
@@ -142,10 +141,13 @@
 // that, it performs a last minute check to ensure that no new RPC has come in,
 // making the channel active.
 //
+// checkActivity controls whether a check for RPC activity since the last
+// idle_timeout firing is performed.
+
 // Return value indicates whether or not the channel moved to idle mode.
 //
 // Holds idleMu which ensures mutual exclusion with exitIdleMode.
-func (m *Manager) tryEnterIdleMode() bool {
+func (m *Manager) tryEnterIdleMode(checkActivity bool) bool {
 	// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
 	// that the channel is either in idle mode or is trying to get there.
 	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
@@ -166,7 +168,7 @@ func (m *Manager) tryEnterIdleMode() bool {
 		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
 		return false
 	}
-	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
+	if checkActivity && atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
 		// A very short RPC could have come in (and also finished) after we
 		// checked for calls count and activity in handleIdleTimeout(), but
 		// before the CAS operation. So, we need to check for activity again.
@@ -177,44 +179,37 @@ func (m *Manager) tryEnterIdleMode() bool {
 	// No new RPCs have come in since we set the active calls count value to
 	// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
 	// unconditionally now.
-	m.enforcer.EnterIdleMode()
+	m.cc.EnterIdleMode()
 	m.actuallyIdle = true
 	return true
 }
 
 // EnterIdleModeForTesting instructs the channel to enter idle mode.
 func (m *Manager) EnterIdleModeForTesting() {
-	m.tryEnterIdleMode()
+	m.tryEnterIdleMode(false)
 }
 
 // OnCallBegin is invoked at the start of every RPC.
-func (m *Manager) OnCallBegin() error {
+func (m *Manager) OnCallBegin() {
 	if m.isClosed() {
-		return nil
+		return
 	}
 
 	if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
 		// Channel is not idle now. Set the activity bit and allow the call.
 		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
-		return nil
+		return
 	}
 
 	// Channel is either in idle mode or is in the process of moving to idle
 	// mode. Attempt to exit idle mode to allow this RPC.
-	if err := m.ExitIdleMode(); err != nil {
-		// Undo the increment to calls count, and return an error causing the
-		// RPC to fail.
-		atomic.AddInt32(&m.activeCallsCount, -1)
-		return err
-	}
-
+	m.ExitIdleMode()
 	atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
-	return nil
 }
 
-// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
+// ExitIdleMode instructs m to call the ClientConn's ExitIdleMode and update its
 // internal state.
-func (m *Manager) ExitIdleMode() error {
+func (m *Manager) ExitIdleMode() {
 	// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
 	m.idleMu.Lock()
 	defer m.idleMu.Unlock()
@@ -231,12 +226,10 @@
 		//   m.ExitIdleMode.
 		//
 		// In any case, there is nothing to do here.
-		return nil
+		return
 	}
 
-	if err := m.enforcer.ExitIdleMode(); err != nil {
-		return fmt.Errorf("failed to exit idle mode: %w", err)
-	}
+	m.cc.ExitIdleMode()
 
 	// Undo the idle entry process. This also respects any new RPC attempts.
 	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
@@ -244,7 +237,23 @@
 	// Start a new timer to fire after the configured idle timeout.
 	m.resetIdleTimerLocked(m.timeout)
-	return nil
+}
+
+// UnsafeSetNotIdle instructs the Manager to update its internal state to
+// reflect the reality that the channel is no longer in IDLE mode.
+//
+// N.B. This method is intended only for internal use by the gRPC client
+// when it exits IDLE mode **manually** from `Dial`. The callsite must ensure:
+//   - The channel was **actually in IDLE mode** immediately prior to the call.
+//   - There is **no concurrent activity** that could cause the channel to exit
+//     IDLE mode *naturally* at the same time.
+func (m *Manager) UnsafeSetNotIdle() {
+	m.idleMu.Lock()
+	defer m.idleMu.Unlock()
+
+	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
+	m.actuallyIdle = false
+	m.resetIdleTimerLocked(m.timeout)
 }
 
 // OnCallEnd is invoked at the end of every RPC.
diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go
index 3b606a9dbeda..27486c398742 100644
--- a/internal/idle/idle_e2e_test.go
+++ b/internal/idle/idle_e2e_test.go
@@ -23,7 +23,6 @@
 import (
 	"fmt"
 	"io"
 	"strings"
-	"sync"
 	"testing"
 	"time"
@@ -33,7 +32,6 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
-	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/balancer/stub"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpctest"
@@ -549,73 +547,3 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
 	// Verify that the ClientConn moves back to READY.
 	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
 }
-
-// runFunc runs f repeatedly until the context expires.
-func runFunc(ctx context.Context, f func()) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-time.After(10 * time.Millisecond):
-			f()
-		}
-	}
-}
-
-// Tests the scenario where there are concurrent calls to exit and enter idle
-// mode on the ClientConn. Verifies that there is no race under this scenario.
-func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) {
-	// Start a test backend and set the bootstrap state of the resolver to
-	// include this address. This will ensure that when the resolver is
-	// restarted when exiting idle, it will push the same address to grpc again.
-	r := manual.NewBuilderWithScheme("whatever")
-	backend := stubserver.StartTestService(t, nil)
-	defer backend.Stop()
-	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
-
-	// Create a ClientConn with a long idle_timeout. We will explicitly trigger
-	// entering and exiting IDLE mode from the test.
-	dopts := []grpc.DialOption{
-		grpc.WithTransportCredentials(insecure.NewCredentials()),
-		grpc.WithResolvers(r),
-		grpc.WithIdleTimeout(30 * time.Minute),
-		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"pick_first":{}}]}`),
-	}
-	cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
-	if err != nil {
-		t.Fatalf("grpc.NewClient() failed: %v", err)
-	}
-	defer cc.Close()
-
-	enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
-	enterIdleFunc := func() { enterIdle(cc) }
-	exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error)
-	exitIdleFunc := func() {
-		if err := exitIdle(cc); err != nil {
-			t.Errorf("Failed to exit idle mode: %v", err)
-		}
-	}
-	// Spawn goroutines that call methods on the ClientConn to enter and exit
-	// idle mode concurrently for one second.
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
-	defer cancel()
-	var wg sync.WaitGroup
-	wg.Add(4)
-	go func() {
-		runFunc(ctx, enterIdleFunc)
-		wg.Done()
-	}()
-	go func() {
-		runFunc(ctx, enterIdleFunc)
-		wg.Done()
-	}()
-	go func() {
-		runFunc(ctx, exitIdleFunc)
-		wg.Done()
-	}()
-	go func() {
-		runFunc(ctx, exitIdleFunc)
-		wg.Done()
-	}()
-	wg.Wait()
-}
diff --git a/internal/idle/idle_test.go b/internal/idle/idle_test.go
index c2645bb95c05..acc95ba6a0bc 100644
--- a/internal/idle/idle_test.go
+++ b/internal/idle/idle_test.go
@@ -48,10 +48,8 @@ type testEnforcer struct {
 	enterIdleCh chan struct{}
 }
 
-func (ti *testEnforcer) ExitIdleMode() error {
+func (ti *testEnforcer) ExitIdleMode() {
 	ti.exitIdleCh <- struct{}{}
-	return nil
-
 }
 
 func (ti *testEnforcer) EnterIdleMode() {
@@ -273,9 +271,7 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		// A call to OnCallBegin and OnCallEnd simulates an RPC.
 		go func() {
-			if err := mgr.OnCallBegin(); err != nil {
-				t.Errorf("OnCallBegin() failed: %v", err)
-			}
+			mgr.OnCallBegin()
 			mgr.OnCallEnd()
 		}()
 	}
@@ -316,19 +312,20 @@ type racyEnforcer struct {
 
 // ExitIdleMode sets the internal state to stateExitedIdle. We should only ever
 // exit idle when we are currently in idle.
-func (ri *racyEnforcer) ExitIdleMode() error {
+func (ri *racyEnforcer) ExitIdleMode() {
 	// Set only on the initial ExitIdleMode
 	if ri.started == false {
 		if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateInitial)) {
-			return fmt.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode")
+			ri.t.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode")
+			return
 		}
 		ri.started = true
-		return nil
+		return
 	}
 	if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) {
-		return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
+		ri.t.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
+		return
 	}
-	return nil
 }
 
 // EnterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start.
@@ -370,9 +367,7 @@ func (s) TestManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) {
 			// Wait for the configured idle timeout and simulate an RPC to
 			// race with the idle timeout timer callback.
 			<-time.After(defaultTestIdleTimeout / 50)
-			if err := mgr.OnCallBegin(); err != nil {
-				t.Errorf("OnCallBegin() failed: %v", err)
-			}
+			mgr.OnCallBegin()
 			atomic.StoreInt32((*int32)(&idlenessState), int32(stateActiveRPCs))
 			mgr.OnCallEnd()
 		}()
diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go
index 98fdbc6e1044..7b6cf0f91fc8 100644
--- a/resolver_balancer_ext_test.go
+++ b/resolver_balancer_ext_test.go
@@ -29,13 +29,19 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/balancer/stub"
 	"google.golang.org/grpc/internal/channelz"
+	"google.golang.org/grpc/internal/testutils"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/resolver/manual"
+	"google.golang.org/grpc/status"
+
+	testgrpc "google.golang.org/grpc/interop/grpc_testing"
+	testpb "google.golang.org/grpc/interop/grpc_testing"
 )
 
 // TestResolverBalancerInteraction tests:
@@ -127,6 +133,40 @@ func (s) TestResolverBuildFailure(t *testing.T) {
 	}
 }
 
+// Tests the case where the resolver reports an error to the channel before
+// reporting an update. Verifies that the channel eventually moves to
+// TransientFailure and subsequent RPCs return the error reported by the
+// resolver to the user.
+func (s) TestResolverReportError(t *testing.T) {
+	const resolverErr = "test resolver error"
+	r := manual.NewBuilderWithScheme("whatever")
+	r.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
+		cc.ReportError(errors.New(resolverErr))
+	}
+
+	cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
+	if err != nil {
+		t.Fatalf("Error creating client: %v", err)
+	}
+	defer cc.Close()
+	cc.Connect()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
+
+	client := testgrpc.NewTestServiceClient(cc)
+	for range 5 {
+		_, err = client.EmptyCall(ctx, &testpb.Empty{})
+		if code := status.Code(err); code != codes.Unavailable {
+			t.Fatalf("EmptyCall() = %v, want %v", err, codes.Unavailable)
+		}
+		if err == nil || !strings.Contains(err.Error(), resolverErr) {
+			t.Fatalf("EmptyCall() = %q, want %q", err, resolverErr)
+		}
+	}
+}
+
 // TestEnterIdleDuringResolverUpdateState tests a scenario that used to deadlock
 // while calling UpdateState at the same time as the resolver being closed while
 // the channel enters idle mode.
diff --git a/resolver_wrapper.go b/resolver_wrapper.go
index 80e16a327cd3..6e613764372b 100644
--- a/resolver_wrapper.go
+++ b/resolver_wrapper.go
@@ -69,6 +69,7 @@ func (ccr *ccResolverWrapper) start() error {
 	errCh := make(chan error)
 	ccr.serializer.TrySchedule(func(ctx context.Context) {
 		if ctx.Err() != nil {
+			errCh <- ctx.Err()
 			return
 		}
 		opts := resolver.BuildOptions{
diff --git a/stream.go b/stream.go
index ca87ff9776ef..995c2821bc0c 100644
--- a/stream.go
+++ b/stream.go
@@ -179,13 +179,41 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 
 var emptyMethodConfig = serviceconfig.MethodConfig{}
 
+// endOfClientStream performs cleanup actions required for both successful and
+// failed streams. This includes incrementing channelz stats and invoking all
+// registered OnFinish call options.
+func endOfClientStream(cc *ClientConn, err error, opts ...CallOption) {
+	if channelz.IsOn() {
+		if err != nil {
+			cc.incrCallsFailed()
+		} else {
+			cc.incrCallsSucceeded()
+		}
+	}
+
+	for _, o := range opts {
+		if o, ok := o.(OnFinishCallOption); ok {
+			o.OnFinish(err)
+		}
+	}
+}
+
 func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+	if channelz.IsOn() {
+		cc.incrCallsStarted()
+	}
+	defer func() {
+		if err != nil {
+			// Ensure cleanup when stream creation fails.
+			endOfClientStream(cc, err, opts...)
+		}
+	}()
+
 	// Start tracking the RPC for idleness purposes. This is where a stream is
 	// created for both streaming and unary RPCs, and hence is a good place to
 	// track active RPC count.
-	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
-		return nil, err
-	}
+	cc.idlenessMgr.OnCallBegin()
+
 	// Add a calloption, to decrement the active call count, that gets executed
 	// when the RPC completes.
 	opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
@@ -204,14 +232,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 			}
 		}
 	}
-	if channelz.IsOn() {
-		cc.incrCallsStarted()
-		defer func() {
-			if err != nil {
-				cc.incrCallsFailed()
-			}
-		}()
-	}
 	// Provide an opportunity for the first RPC to see the first service config
 	// provided by the resolver.
 	nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
@@ -1042,9 +1062,6 @@ func (cs *clientStream) finish(err error) {
 		return
 	}
 	cs.finished = true
-	for _, onFinish := range cs.callInfo.onFinish {
-		onFinish(err)
-	}
 	cs.commitAttemptLocked()
 	if cs.attempt != nil {
 		cs.attempt.finish(err)
@@ -1084,13 +1101,7 @@ func (cs *clientStream) finish(err error) {
 	if err == nil {
 		cs.retryThrottler.successfulRPC()
 	}
-	if channelz.IsOn() {
-		if err != nil {
-			cs.cc.incrCallsFailed()
-		} else {
-			cs.cc.incrCallsSucceeded()
-		}
-	}
+	endOfClientStream(cs.cc, err, cs.opts...)
 	cs.cancel()
 }
diff --git a/test/clientconn_state_transition_test.go b/test/clientconn_state_transition_test.go
index de4e5b3d6894..3d365bd8b378 100644
--- a/test/clientconn_state_transition_test.go
+++ b/test/clientconn_state_transition_test.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -31,6 +32,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/backoff"
 	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
@@ -40,6 +42,7 @@ import (
 	"google.golang.org/grpc/internal/testutils"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/resolver/manual"
+	"google.golang.org/grpc/status"
 
 	testgrpc "google.golang.org/grpc/interop/grpc_testing"
 	testpb "google.golang.org/grpc/interop/grpc_testing"
@@ -583,8 +586,6 @@ func (s) TestConnectivityStateSubscriber(t *testing.T) {
 // Test verifies that a channel starts off in IDLE and transitions to CONNECTING
 // when Connect() is called, and stays there when there are no resolver updates.
 func (s) TestStateTransitions_WithConnect_NoResolverUpdate(t *testing.T) {
-	t.Skip("The channel remains in IDLE until the LB policy updates the state to CONNECTING. This is a bug and the channel should transition to CONNECTING as soon as Connect() is called. See issue #7686.")
-
 	backend := stubserver.StartTestService(t, nil)
 	defer backend.Stop()
@@ -618,8 +619,6 @@
 // Test verifies that a channel starts off in IDLE and transitions to CONNECTING
 // when Connect() is called, and stays there when there are no resolver updates.
 func (s) TestStateTransitions_WithRPC_NoResolverUpdate(t *testing.T) {
-	t.Skip("The channel remains in IDLE until the LB policy updates the state to CONNECTING. This is a bug and the channel should transition to CONNECTING as soon as an RPC call is made. See issue #7686.")
-
 	backend := stubserver.StartTestService(t, nil)
 	defer backend.Stop()
@@ -641,8 +640,7 @@ func (s) TestStateTransitions_WithRPC_NoResolverUpdate(t *testing.T) {
 
 	// Make an RPC call to transition the channel to CONNECTING.
 	go func() {
-		_, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
-		if err == nil {
+		if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err == nil {
 			t.Errorf("Expected RPC to fail, but it succeeded")
 		}
 	}()
@@ -656,3 +654,179 @@
 	defer shortCancel()
 	testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Connecting)
 }
+
+const testResolverBuildFailureScheme = "test-resolver-build-failure"
+
+// testResolverBuilder is a resolver builder that fails the first time its
+// Build method is called, and succeeds thereafter.
+type testResolverBuilder struct {
+	logger interface {
+		Logf(format string, args ...any)
+	}
+	buildCalled bool
+	manualR     *manual.Resolver
+}
+
+func (b *testResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+	b.logger.Logf("testResolverBuilder: Build called with target: %v", target)
+	if !b.buildCalled {
+		b.buildCalled = true
+		b.logger.Logf("testResolverBuilder: returning build failure")
+		return nil, fmt.Errorf("simulated resolver build failure")
+	}
+	return b.manualR.Build(target, cc, opts)
+}
+
+func (b *testResolverBuilder) Scheme() string {
+	return testResolverBuildFailureScheme
+}
+
+// Tests for state transitions when the resolver initially fails to build.
+func (s) TestStateTransitions_ResolverBuildFailure(t *testing.T) {
+	tests := []struct {
+		name            string
+		exitIdleWithRPC bool
+	}{
+		{
+			name:            "exitIdleByConnecting",
+			exitIdleWithRPC: false,
+		},
+		{
+			name:            "exitIdleByRPC",
+			exitIdleWithRPC: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			mr := manual.NewBuilderWithScheme("whatever" + tt.name)
+			backend := stubserver.StartTestService(t, nil)
+			defer backend.Stop()
+			mr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+			dopts := []grpc.DialOption{
+				grpc.WithTransportCredentials(insecure.NewCredentials()),
+				grpc.WithResolvers(&testResolverBuilder{logger: t, manualR: mr}),
+			}
+
+			cc, err := grpc.NewClient(testResolverBuildFailureScheme+":///", dopts...)
+			if err != nil {
+				t.Fatalf("Failed to create new client: %v", err)
+			}
+			defer cc.Close()
+
+			// Ensure that the client is in IDLE before connecting.
+			if state := cc.GetState(); state != connectivity.Idle {
+				t.Fatalf("Expected initial state to be IDLE, got %v", state)
+			}
+
+			// Subscribe to state updates.
+			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+			defer cancel()
+			stateCh := make(chan connectivity.State, 1)
+			s := &funcConnectivityStateSubscriber{
+				onMsg: func(s connectivity.State) {
+					select {
+					case stateCh <- s:
+					case <-ctx.Done():
+					}
+				},
+			}
+			internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
+
+			if tt.exitIdleWithRPC {
+				// The first attempt to kick the channel is expected to return
+				// the resolver build error to the RPC.
+				const wantErr = "simulated resolver build failure"
+				for range 2 {
+					_, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
+					if code := status.Code(err); code != codes.Unavailable {
+						t.Fatalf("EmptyCall RPC failed with code %v, want %v", err, codes.Unavailable)
+					}
+					if err == nil || !strings.Contains(err.Error(), wantErr) {
+						t.Fatalf("EmptyCall RPC failed with error: %q, want %q", err, wantErr)
+					}
+				}
+			} else {
+				cc.Connect()
+			}
+
+			wantStates := []connectivity.State{
+				connectivity.Connecting,       // When channel exits IDLE for the first time.
+				connectivity.TransientFailure, // Resolver build failure.
+				connectivity.Idle,             // After idle timeout.
+				connectivity.Connecting,       // When channel exits IDLE again.
+				connectivity.Ready,            // Successful resolver build and connection to backend.
+			}
+			for _, wantState := range wantStates {
+				waitForState(ctx, t, stateCh, wantState)
+				switch wantState {
+				case connectivity.TransientFailure:
+					internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))(cc)
+				case connectivity.Idle:
+					if tt.exitIdleWithRPC {
+						if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
+							t.Fatalf("EmptyCall RPC failed: %v", err)
+						}
+					} else {
+						cc.Connect()
+					}
+				}
+			}
+		})
+	}
+}
+
+// Tests for state transitions when the resolver reports no addresses.
+func (s) TestStateTransitions_WithRPC_ResolverUpdateContainsNoAddresses(t *testing.T) {
+	mr := manual.NewBuilderWithScheme("e2e-test")
+	mr.InitialState(resolver.State{})
+	defer mr.Close()
+
+	cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Failed to create new client: %v", err)
+	}
+	defer cc.Close()
+
+	if state := cc.GetState(); state != connectivity.Idle {
+		t.Fatalf("Expected initial state to be IDLE, got %v", state)
+	}
+
+	// Subscribe to state updates.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	stateCh := make(chan connectivity.State, 1)
+	s := &funcConnectivityStateSubscriber{
+		onMsg: func(s connectivity.State) {
+			select {
+			case stateCh <- s:
+			case <-ctx.Done():
+			}
+		},
+	}
+	internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
+
+	// Make an RPC call to transition the channel to CONNECTING.
+	const wantErr = "name resolver error: produced zero addresses"
+	for range 2 {
+		_, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{})
+		if code := status.Code(err); code != codes.Unavailable {
+			t.Errorf("EmptyCall RPC failed with code %v, want %v", err, codes.Unavailable)
+		}
+		if err == nil || !strings.Contains(err.Error(), wantErr) {
+			t.Errorf("EmptyCall RPC failed with error: %q, want %q", err, wantErr)
+		}
+	}
+
+	wantStates := []connectivity.State{
+		connectivity.Connecting, // When channel exits IDLE for the first time.
+		connectivity.TransientFailure, // No endpoints from the resolver.
+		connectivity.Idle,             // After idle timeout.
+	}
+	for _, wantState := range wantStates {
+		waitForState(ctx, t, stateCh, wantState)
+		if wantState == connectivity.TransientFailure {
+			internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))(cc)
+		}
+	}
+}
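
Reviewer note (not part of the patch): a minimal sketch of how the reworked `internal/idle` API reads after this change. `fakeChannel` is an invented stand-in for the `grpc.ClientConn` side of the new `idle.ClientConn` interface; since `internal/idle` is only importable from inside the grpc module, treat this as illustrative rather than a drop-in program.

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/internal/idle"
)

// fakeChannel is an invented stand-in for the grpc.ClientConn side of the
// idle.ClientConn interface introduced by this patch.
type fakeChannel struct{}

// ExitIdleMode matches the new, error-free signature: failures while
// recreating the resolver/balancer are handled by the channel itself (it
// moves to TRANSIENT_FAILURE) rather than being reported to the manager.
func (c *fakeChannel) ExitIdleMode() { fmt.Println("channel: exiting idle mode") }

// EnterIdleMode is invoked by the manager once the idle timeout fires with no
// RPC activity.
func (c *fakeChannel) EnterIdleMode() { fmt.Println("channel: entering idle mode") }

func main() {
	// The manager starts in idle mode and arms its timer once the channel
	// exits idle.
	m := idle.NewManager(&fakeChannel{}, 30*time.Minute)

	// OnCallBegin no longer returns an error, so RPC call sites cannot fail
	// at this point anymore; OnCallEnd marks the RPC as finished.
	m.OnCallBegin()
	m.OnCallEnd()
}
```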
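Reviewer note (not part of the patch): a hedged, self-contained sketch of the user-visible behavior the new state-transition tests exercise, using only public API. The `failingBuilder` type, the `example-failing-scheme` scheme, and the target are made up for illustration; the claim that the build failure surfaces as a TRANSIENT_FAILURE state change (and as Unavailable errors on RPCs) reflects the behavior this patch introduces.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"
)

// failingBuilder simulates a resolver whose Build method always fails; the
// scheme name is invented for this example.
type failingBuilder struct{}

func (failingBuilder) Build(resolver.Target, resolver.ClientConn, resolver.BuildOptions) (resolver.Resolver, error) {
	return nil, errors.New("simulated resolver build failure")
}

func (failingBuilder) Scheme() string { return "example-failing-scheme" }

func main() {
	// With NewClient the channel starts in IDLE, so creation succeeds even
	// though the resolver cannot be built.
	cc, err := grpc.NewClient("example-failing-scheme:///srv",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithResolvers(failingBuilder{}))
	if err != nil {
		panic(err)
	}
	defer cc.Close()

	// Connect exits idle mode; with this patch the failed resolver build is
	// surfaced as a channel state change rather than as an error from Connect.
	cc.Connect()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	for s := cc.GetState(); s != connectivity.TransientFailure; s = cc.GetState() {
		if !cc.WaitForStateChange(ctx, s) {
			break // context expired before reaching TRANSIENT_FAILURE
		}
	}
	fmt.Println("final state:", cc.GetState())
}
```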
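Reviewer note (not part of the patch): a small sketch of the `OnFinish` call-option semantics that the `endOfClientStream` refactor consolidates. The address and method name are placeholders (nothing is listening there), and the program only illustrates that the callback runs for failed RPCs, including failures during stream creation, as well as for streams that complete.

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Placeholder target; the port is closed, so the RPC is expected to fail.
	cc, err := grpc.NewClient("passthrough:///127.0.0.1:1",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// With the endOfClientStream cleanup above, OnFinish call options are
	// invoked for failed RPCs (including failures while creating the stream)
	// as well as for streams that finish normally.
	err = cc.Invoke(ctx, "/example.Service/Method", nil, nil,
		grpc.OnFinish(func(err error) { log.Printf("OnFinish: %v", err) }))
	log.Printf("Invoke returned: %v", err)
}
```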