Skip to content

Commit eb55fa5

Browse files
authored
resolverWrapper: remove the watcher goroutine (#2446)
1 parent a612bb6 commit eb55fa5

File tree

2 files changed

+20
-59
lines changed

2 files changed

+20
-59
lines changed

Diff for: clientconn.go

+9-14
Original file line numberDiff line numberDiff line change
@@ -288,19 +288,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
288288
}
289289

290290
// Build the resolver.
291-
cc.resolverWrapper, err = newCCResolverWrapper(cc)
291+
rWrapper, err := newCCResolverWrapper(cc)
292292
if err != nil {
293293
return nil, fmt.Errorf("failed to build resolver: %v", err)
294294
}
295-
// Start the resolver wrapper goroutine after resolverWrapper is created.
296-
//
297-
// If the goroutine is started before resolverWrapper is ready, the
298-
// following may happen: The goroutine sends updates to cc. cc forwards
299-
// those to balancer. Balancer creates new addrConn. addrConn fails to
300-
// connect, and calls resolveNow(). resolveNow() tries to use the non-ready
301-
// resolverWrapper.
302-
cc.resolverWrapper.start()
303295

296+
cc.mu.Lock()
297+
cc.resolverWrapper = rWrapper
298+
cc.mu.Unlock()
304299
// A blocking dial blocks until the clientConn is ready.
305300
if cc.dopts.block {
306301
for {
@@ -389,13 +384,13 @@ type ClientConn struct {
389384
csMgr *connectivityStateManager
390385

391386
balancerBuildOpts balancer.BuildOptions
392-
resolverWrapper *ccResolverWrapper
393387
blockingpicker *pickerWrapper
394388

395-
mu sync.RWMutex
396-
sc ServiceConfig
397-
scRaw string
398-
conns map[*addrConn]struct{}
389+
mu sync.RWMutex
390+
resolverWrapper *ccResolverWrapper
391+
sc ServiceConfig
392+
scRaw string
393+
conns map[*addrConn]struct{}
399394
// Keepalive parameter can be updated if a GoAway is received.
400395
mkp keepalive.ClientParameters
401396
curBalancerName string

Diff for: resolver_conn_wrapper.go

+11-45
Original file line numberDiff line numberDiff line change
@@ -93,47 +93,6 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
9393
return ccr, nil
9494
}
9595

96-
func (ccr *ccResolverWrapper) start() {
97-
go ccr.watcher()
98-
}
99-
100-
// watcher processes address updates and service config updates sequentially.
101-
// Otherwise, we need to resolve possible races between address and service
102-
// config (e.g. they specify different balancer types).
103-
func (ccr *ccResolverWrapper) watcher() {
104-
for {
105-
select {
106-
case <-ccr.done:
107-
return
108-
default:
109-
}
110-
111-
select {
112-
case addrs := <-ccr.addrCh:
113-
select {
114-
case <-ccr.done:
115-
return
116-
default:
117-
}
118-
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
119-
if channelz.IsOn() {
120-
ccr.addChannelzTraceEvent(addrs)
121-
}
122-
ccr.cc.handleResolvedAddrs(addrs, nil)
123-
case sc := <-ccr.scCh:
124-
select {
125-
case <-ccr.done:
126-
return
127-
default:
128-
}
129-
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
130-
ccr.cc.handleServiceConfig(sc)
131-
case <-ccr.done:
132-
return
133-
}
134-
}
135-
}
136-
13796
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
13897
ccr.resolver.ResolveNow(o)
13998
}
@@ -146,20 +105,27 @@ func (ccr *ccResolverWrapper) close() {
146105
// NewAddress is called by the resolver implemenetion to send addresses to gRPC.
147106
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
148107
select {
149-
case <-ccr.addrCh:
108+
case <-ccr.done:
109+
return
150110
default:
151111
}
152-
ccr.addrCh <- addrs
112+
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
113+
if channelz.IsOn() {
114+
ccr.addChannelzTraceEvent(addrs)
115+
}
116+
ccr.cc.handleResolvedAddrs(addrs, nil)
153117
}
154118

155119
// NewServiceConfig is called by the resolver implemenetion to send service
156120
// configs to gRPC.
157121
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
158122
select {
159-
case <-ccr.scCh:
123+
case <-ccr.done:
124+
return
160125
default:
161126
}
162-
ccr.scCh <- sc
127+
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
128+
ccr.cc.handleServiceConfig(sc)
163129
}
164130

165131
func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) {

0 commit comments

Comments
 (0)