Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: rework resolver and balancer wrappers to avoid deadlock #6804

Merged
merged 21 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 100 additions & 179 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,13 @@
"google.golang.org/grpc/resolver"
)

type ccbMode int

const (
ccbModeActive = iota
ccbModeIdle
ccbModeClosed
ccbModeExitingIdle
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
// balancer.Balancer interface. The ClientConn is free to call these methods
// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
// to the Balancer happen synchronously and in order.
// to the Balancer happen in order by performing them in the serializer, without
// any mutexes held.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do hold ccb.mu in close(), and require that the caller hold cc.mu when calling into close(). Would it be better to have that spelled out here?

Copy link
Member Author

@dfawley dfawley Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is about how the calls into the plugin, themselves, are made without cc.mu held. Not that calls into ccb must not hold cc.mu.

Probably we should just document the functions. (Done, and it looks like the only one that matters either way is close, which needs cc.mu.)

//
// ccBalancerWrapper also implements the balancer.ClientConn interface and is
// passed to the Balancer implementations. It invokes unexported methods on the
Expand All @@ -57,84 +49,87 @@
type ccBalancerWrapper struct {
// The following fields are initialized when the wrapper is created and are
// read-only afterwards, and therefore can be accessed without a mutex.
cc *ClientConn
opts balancer.BuildOptions

// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
// mutually exclusive manner as they are scheduled in the serializer. Fields
// accessed *only* in these serializer callbacks, can therefore be accessed
// without a mutex.
balancer *gracefulswitch.Balancer
curBalancerName string

// mu guards access to the below fields. Access to the serializer and its
// cancel function needs to be mutex protected because they are overwritten
// when the wrapper exits idle mode.
mu sync.Mutex
serializer *grpcsync.CallbackSerializer // To serialize all outoing calls.
serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time.
mode ccbMode // Tracks the current mode of the wrapper.
cc *ClientConn
opts balancer.BuildOptions
balancer *gracefulswitch.Balancer
zasweq marked this conversation as resolved.
Show resolved Hide resolved
serializer *grpcsync.CallbackSerializer
zasweq marked this conversation as resolved.
Show resolved Hide resolved
serializerCancel context.CancelFunc

curBalancerName string // only accessed within the serializer

// The following field is protected by mu. Caller must take cc.mu before
// taking mu.
mu sync.Mutex
closed bool
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(cc.ctx)
ccb := &ccBalancerWrapper{
cc: cc,
opts: bopts,
mode: ccbModeIdle,
cc: cc,
opts: balancer.BuildOptions{
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
zasweq marked this conversation as resolved.
Show resolved Hide resolved
return ccb
}

// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.
// the underlying balancer. This is always executed from the serializer, so
// it is safe to call into the balancer here.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.mu.Lock()
errCh := make(chan error, 1)
// Here and everywhere else where Schedule() is called, it is done with the
// lock held. But the lock guards only the scheduling part. The actual
// callback is called asynchronously without the lock being held.
ok := ccb.serializer.Schedule(func(_ context.Context) {
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
errCh := make(chan error)
ok := ccb.serializer.Schedule(func(ctx context.Context) {
defer close(errCh)
if ctx.Err() != nil || ccb.balancer == nil {
return
}
err := ccb.balancer.UpdateClientConnState(*ccs)
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
}

Check warning on line 101 in balancer_wrapper.go

View check run for this annotation

Codecov / codecov/patch

balancer_wrapper.go#L100-L101

Added lines #L100 - L101 were not covered by tests
errCh <- err
})
if !ok {
// If we are unable to schedule a function with the serializer, it
// indicates that it has been closed. A serializer is only closed when
// the wrapper is closed or is in idle.
ccb.mu.Unlock()
return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
}
ccb.mu.Unlock()

// We get here only if the above call to Schedule succeeds, in which case it
// is guaranteed that the scheduled function will run. Therefore it is safe
// to block on this channel.
err := <-errCh
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
return nil

Check warning on line 105 in balancer_wrapper.go

View check run for this annotation

Codecov / codecov/patch

balancer_wrapper.go#L105

Added line #L105 was not covered by tests
}
return err
return <-errCh
zasweq marked this conversation as resolved.
Show resolved Hide resolved
}

// updateSubConnState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}
// Even though it is optional for balancers, gracefulswitch ensures
// opts.StateListener is set, so this cannot ever be nil.
// TODO: delete this comment when UpdateSubConnState is removed.
sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
})
ccb.mu.Unlock()
}

// resolverError is invoked by grpc to push a resolver error to the underlying
// balancer. The call to the balancer is executed from the serializer.
func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}

Check warning on line 130 in balancer_wrapper.go

View check run for this annotation

Codecov / codecov/patch

balancer_wrapper.go#L129-L130

Added lines #L129 - L130 were not covered by tests
ccb.balancer.ResolverError(err)
})
ccb.mu.Unlock()
}

// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
Expand All @@ -148,16 +143,17 @@
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}

Check warning on line 149 in balancer_wrapper.go

View check run for this annotation

Codecov / codecov/patch

balancer_wrapper.go#L148-L149

Added lines #L148 - L149 were not covered by tests
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
ccb.buildLoadBalancingPolicy(name)
})
ccb.mu.Unlock()
}

// buildLoadBalancingPolicy performs the following:
Expand Down Expand Up @@ -186,113 +182,41 @@

func (ccb *ccBalancerWrapper) close() {
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
ccb.closeBalancer(ccbModeClosed)
}

// enterIdleMode is invoked by grpc when the channel enters idle mode upon
// expiry of idle_timeout. This call blocks until the balancer is closed.
func (ccb *ccBalancerWrapper) enterIdleMode() {
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
ccb.closeBalancer(ccbModeIdle)
}

// closeBalancer is invoked when the channel is being closed or when it enters
// idle mode upon expiry of idle_timeout.
func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
ccb.mu.Lock()
if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
ccb.mu.Unlock()
return
}

ccb.mode = m
done := ccb.serializer.Done()
b := ccb.balancer
ok := ccb.serializer.Schedule(func(_ context.Context) {
// Close the serializer to ensure that no more calls from gRPC are sent
// to the balancer.
ccb.serializerCancel()
// Empty the current balancer name because we don't have a balancer
// anymore and also so that we act on the next call to switchTo by
// creating a new balancer specified by the new resolver.
ccb.curBalancerName = ""
})
if !ok {
ccb.mu.Unlock()
return
}
ccb.mu.Unlock()

// Give enqueued callbacks a chance to finish before closing the balancer.
<-done
b.Close()
}

// exitIdleMode is invoked by grpc when the channel exits idle mode either
// because of an RPC or because of an invocation of the Connect() API. This
// recreates the balancer that was closed previously when entering idle mode.
//
// If the channel is not in idle mode, we know for a fact that we are here as a
// result of the user calling the Connect() method on the ClientConn. In this
// case, we can simply forward the call to the underlying balancer, instructing
// it to reconnect to the backends.
func (ccb *ccBalancerWrapper) exitIdleMode() {
ccb.mu.Lock()
if ccb.mode == ccbModeClosed {
// Request to exit idle is a no-op when wrapper is already closed.
ccb.mu.Unlock()
return
}

if ccb.mode == ccbModeIdle {
// Recreate the serializer which was closed when we entered idle.
ctx, cancel := context.WithCancel(context.Background())
ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
ccb.serializerCancel = cancel
}

// The ClientConn guarantees that mutual exclusion between close() and
// exitIdleMode(), and since we just created a new serializer, we can be
// sure that the below function will be scheduled.
done := make(chan struct{})
ccb.serializer.Schedule(func(context.Context) {
defer close(done)

ccb.mu.Lock()
defer ccb.mu.Unlock()

if ccb.mode != ccbModeIdle {
ccb.balancer.ExitIdle()
if ccb.balancer == nil {
return
}

// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")

ccb.balancer.Close()
ccb.balancer = nil
})
ccb.mu.Unlock()

<-done
ccb.serializerCancel()
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
ccb.mu.Lock()
defer ccb.mu.Unlock()
return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
// exitIdle invokes the balancer's exitIdle method in the serializer.
func (ccb *ccBalancerWrapper) exitIdle() {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}

Check warning on line 200 in balancer_wrapper.go

View check run for this annotation

Codecov / codecov/patch

balancer_wrapper.go#L199-L200

Added lines #L199 - L200 were not covered by tests
ccb.balancer.ExitIdle()
})
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if ccb.isIdleOrClosed() {
return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
ccb.cc.mu.Lock()
defer ccb.cc.mu.Unlock()
zasweq marked this conversation as resolved.
Show resolved Hide resolved

ccb.mu.Lock()
if ccb.closed {
ccb.mu.Unlock()
return nil, errConnIdling

Check warning on line 212 in balancer_wrapper.go

View check run for this annotation

Codecov / codecov/patch

balancer_wrapper.go#L211-L212

Added lines #L211 - L212 were not covered by tests
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
ccb.mu.Unlock()

if len(addrs) == 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
if err != nil {
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
Expand All @@ -313,10 +237,6 @@
}

func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
if ccb.isIdleOrClosed() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we not want to check if the balancer is closed here? UpdateAddresses could lead to transport creation, right.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more relevant whether the SubConn/addrConn was closed and not whether the balancer was closed. And we do that check here:

if ac.state == connectivity.Shutdown ||

return
}

acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
Expand All @@ -325,25 +245,39 @@
}

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
if ccb.isIdleOrClosed() {
ccb.cc.mu.Lock()
defer ccb.cc.mu.Unlock()

ccb.mu.Lock()
if ccb.closed {
ccb.mu.Unlock()

Check warning on line 253 in balancer_wrapper.go

View check run for this annotation

Codecov / codecov/patch

balancer_wrapper.go#L253

Added line #L253 was not covered by tests
return
}

ccb.mu.Unlock()
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.

// Note that there is no need to check if the balancer wrapper was closed,
// as we know the graceful switch LB policy will not call cc if it has been
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// closed.
ccb.cc.pickerWrapper.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}

func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
if ccb.isIdleOrClosed() {
ccb.cc.mu.RLock()
defer ccb.cc.mu.RUnlock()

ccb.mu.Lock()
if ccb.closed {
ccb.mu.Unlock()

Check warning on line 276 in balancer_wrapper.go

View check run for this annotation

Codecov / codecov/patch

balancer_wrapper.go#L276

Added line #L276 was not covered by tests
return
}

ccb.cc.resolveNow(o)
ccb.mu.Unlock()
ccb.cc.resolveNowLocked(o)
}

func (ccb *ccBalancerWrapper) Target() string {
Expand Down Expand Up @@ -374,20 +308,7 @@
}

func (acbw *acBalancerWrapper) Shutdown() {
ccb := acbw.ccb
if ccb.isIdleOrClosed() {
// It it safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveAddrConn --> cc.removeAddrConn
return
}

ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
Expand Down
Loading
Loading