Skip to content

Commit

Permalink
Use errgroup to wait for subscribers in OpenInbound (#600)
Browse files Browse the repository at this point in the history
Signed-off-by: Ian Milligan <[email protected]>
  • Loading branch information
ian-mi authored Oct 14, 2020
1 parent 58f826d commit 7bec7ba
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 46 deletions.
1 change: 1 addition & 0 deletions protocol/pubsub/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
cloud.google.com/go/pubsub v1.3.1
github.com/cloudevents/sdk-go/v2 v2.0.0
github.com/google/go-cmp v0.4.1
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
google.golang.org/api v0.24.0
google.golang.org/grpc v1.29.1
)
53 changes: 7 additions & 46 deletions protocol/pubsub/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package pubsub

import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"

"cloud.google.com/go/pubsub"
"github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal"
"github.com/cloudevents/sdk-go/v2/binding"
cecontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/cloudevents/sdk-go/v2/protocol"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -188,54 +187,16 @@ func (t *Protocol) startSubscriber(ctx context.Context, sub subscriptionWithTopi
}

func (t *Protocol) OpenInbound(ctx context.Context) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
n := len(t.subscriptions)

// Make the channels for quit and errors.
quit := make(chan struct{}, n)
errc := make(chan error, n)

eg, ctx := errgroup.WithContext(ctx)
// Start up each subscription.
for _, sub := range t.subscriptions {
go func(ctx context.Context, sub subscriptionWithTopic) {
err := t.startSubscriber(cctx, sub)
if err != nil {
errc <- err
} else {
quit <- struct{}{}
}
}(ctx, sub)
}

// Collect errors and done calls until we have n of them.
errs := []string(nil)
for success := 0; success < n; success++ {
var err error
select {
case <-ctx.Done(): // Block for parent context to finish.
success--
case err = <-errc: // Collect errors
case <-quit:
}
if cancel != nil {
// Stop all other subscriptions.
cancel()
cancel = nil
}
if err != nil {
errs = append(errs, err.Error())
}
}

close(quit)
close(errc)

if errs == nil {
return nil
ctx, sub := ctx, sub
eg.Go(func() error {
return t.startSubscriber(ctx, sub)
})
}

return errors.New(strings.Join(errs, "\n"))
return eg.Wait()
}

// Close implements Closer.Close
Expand Down

0 comments on commit 7bec7ba

Please sign in to comment.