Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orca: create ORCA producer for LB policies to use to receive OOB load reports #5669

Merged
merged 9 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,11 @@ var ErrBadResolverState = errors.New("bad resolver state")
// SubConn to create producers when needed.
type ProducerBuilder interface {
// Build creates a Producer. The first parameter is always a
// grpc.InvokerStreamer (a type to allow creating RPCs/streams on the
// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
// associated SubConn), but is declared as interface{} to avoid a
// dependency cycle. Should also return a close function that will be
// called when all references to the Producer have been given up.
Build(grpcInvokerStreamer interface{}) (p Producer, close func())
Build(grpcClientConnInterface interface{}) (p Producer, close func())
}

// A Producer is a type shared among potentially many consumers. It is
Expand Down
16 changes: 7 additions & 9 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*producerData)}
acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
Expand Down Expand Up @@ -364,7 +364,7 @@ func (ccb *ccBalancerWrapper) Target() string {
type acBalancerWrapper struct {
mu sync.Mutex
ac *addrConn
producers map[balancer.ProducerBuilder]*producerData
producers map[balancer.ProducerBuilder]*refCountedProducer
}

func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
Expand Down Expand Up @@ -444,12 +444,10 @@ func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args i
return cs.RecvMsg(reply)
}

// producerData stores a producer, a ref counting mechanism, and a close
// function to be called when the producer no longer has any references.
type producerData struct {
type refCountedProducer struct {
producer balancer.Producer
refs int
close func()
refs int // number of current refs to the producer
close func() // underlying producer's close function
}

func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
Expand All @@ -461,14 +459,14 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (
if pData == nil {
// Not found; create a new one and add it to the producers map.
p, close := pb.Build(acbw)
pData = &producerData{producer: p, close: close}
pData = &refCountedProducer{producer: p, close: close}
acbw.producers[pb] = pData
}
// Account for this new reference.
pData.refs++

// Return a cleanup function wrapped in a sync.Once to remove this
// reference and delete the producerData from the map if the total
// reference and delete the refCountedProducer from the map if the total
// reference count goes to zero.
unref := func() {
acbw.mu.Lock()
Expand Down
27 changes: 15 additions & 12 deletions orca/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ type OOBListener interface {

// OOBListenerOptions contains options to control how an OOBListener is called.
type OOBListenerOptions struct {
// How often to request the server to provide a load report. May be
// provided less frequently if the server requires a longer interval, or
// may be provided more frequently if another subscriber requests a shorter
// interval.
// ReportInterval specifies how often to request the server to provide a
// load report. May be provided less frequently if the server requires a
// longer interval, or may be provided more frequently if another
// subscriber requests a shorter interval.
ReportInterval time.Duration
}

// RegisterOOBListener registers an out-of-band load report listener on sc.
// Any OOBListener may only be registered once per subchannel at a time. The
easwars marked this conversation as resolved.
Show resolved Hide resolved
// returned stop function must be called when no longer needed.
// returned stop function must be called when no longer needed. Do not
// register a single OOBListener more than once per SubConn.
func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
pr, close := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*producer)
Expand All @@ -95,9 +96,11 @@ func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOpt
type producer struct {
client v3orcaservicegrpc.OpenRcaServiceClient

ctx context.Context
cancel func()
closed *grpcsync.Event // fired when closure completes
closed *grpcsync.Event // fired when closure completes
// backoff is called between stream attempts to determine how long to delay
// to avoid overloading a server experiencing problems. The attempt count
// is incremented when stream errors occur and is reset when the stream
// reports a result.
Comment on lines +98 to +100
Copy link
Contributor

@zasweq zasweq Oct 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the last part of this comment is correct/like this comment. First of all, reset has two semantical meanings for this field, one for the timer, one for the attempt count, which gets plumbed into Exponential Backoff to determine what the backoff timer gets reset to I'm assuming your reset refers to attempt count. Also, there is no linkage or even local variable named request count. I see backoffAttempt, which isn't a field here, but the int in this function variable. The attempt count is not reset when the stream reports a result. Right now, the attempt count is reset on either a. the minInterval has changed after processing the receive from the stream compared to what was the stream was configured with. b. If the second+ stream.Recv() returns an error, this also feels wrong, I will leave a comment there.

backoff func(int) time.Duration
zasweq marked this conversation as resolved.
Show resolved Hide resolved

mu sync.Mutex
Expand Down Expand Up @@ -130,9 +133,9 @@ func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
func (p *producer) minInterval() time.Duration {
p.mu.Lock()
defer p.mu.Unlock()
min := time.Duration(-1)
for t := range p.intervals {
if min == time.Duration(-1) || t < min {
var min time.Duration
for i, t := range p.intervals {
if t < min || i == 0 {
min = t
}
}
Expand Down Expand Up @@ -197,7 +200,7 @@ func (p *producer) runStream(ctx context.Context) (resetBackoff bool, err error)
ReportInterval: durationpb.New(interval),
})
if err != nil {
return resetBackoff, err
return false, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You liked this rather than using default value of bools more?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed due to an earlier comment from @easwars. I said in my reply there I didn't feel strongly either way. I still don't.

}

for {
Expand Down