Skip to content

Commit

Permalink
fix: clean up parallel for stan
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy committed Jun 10, 2021
1 parent b8e88fe commit e09e8e9
Showing 1 changed file with 21 additions and 29 deletions.
50 changes: 21 additions & 29 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,6 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
if !ok {
return 0, fmt.Errorf("Unrecognized last_seq: %v", o["last_seq"])
}
if err != nil {
return 0, err
}
subs, ok := o["subscriptions"]
if !ok {
return 0, fmt.Errorf("No suscriptions field found in the monitoring endpoint response")
Expand All @@ -434,9 +431,6 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
if !ok {
return 0, fmt.Errorf("Unrecognized last_sent: %v", s["last_sent"])
}
if err != nil {
return 0, err
}
if lastSent > maxLastSent {
maxLastSent = lastSent
}
Expand All @@ -446,29 +440,27 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {

// https://docs.nats.io/developing-with-nats-streaming/queues
queueName := fmt.Sprintf("%s-%s-source-%s", pipelineName, stepName, sourceName)
for i := 0; i < int(x.Parallel); i++ {
if sub, err := sc.QueueSubscribe(x.Subject, queueName, func(m *stan.Msg) {
_ = f(m.Data) // TODO we should decide what to do with errors here, currently we ignore them
}, stan.DurableName(queueName)); err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
} else {
beforeClosers = append(beforeClosers, func(ctx context.Context) error {
logger.Info("closing stan subscription", "source", sourceName)
return sub.Close()
})

if i == 0 && replica == 0 {
go wait.JitterUntil(func() {
// queueNameCombo := {durableName}:{queueGroup}
queueNameCombo := queueName + ":" + queueName
if pending, err := pendingMessages(x.Subject, queueNameCombo); err != nil {
logger.Error(err, "failed to get pending", "source", sourceName)
} else if pending >= 0 {
logger.Info("setting pending", "source", sourceName, "pending", pending)
withLock(func() { step.Status.SourceStatuses.SetPending(sourceName, uint64(pending)) })
}
}, updateInterval, 1.2, true, ctx.Done())
}
if sub, err := sc.QueueSubscribe(x.Subject, queueName, func(m *stan.Msg) {
_ = f(m.Data) // TODO we should decide what to do with errors here, currently we ignore them
}, stan.DurableName(queueName)); err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
} else {
beforeClosers = append(beforeClosers, func(ctx context.Context) error {
logger.Info("closing stan subscription", "source", sourceName)
return sub.Close()
})

if replica == 0 {
go wait.JitterUntil(func() {
// queueNameCombo := {durableName}:{queueGroup}
queueNameCombo := queueName + ":" + queueName
if pending, err := pendingMessages(x.Subject, queueNameCombo); err != nil {
logger.Error(err, "failed to get pending", "source", sourceName)
} else if pending >= 0 {
logger.Info("setting pending", "source", sourceName, "pending", pending)
withLock(func() { step.Status.SourceStatuses.SetPending(sourceName, uint64(pending)) })
}
}, updateInterval, 1.2, true, ctx.Done())
}
}
} else if x := source.Kafka; x != nil {
Expand Down

0 comments on commit e09e8e9

Please sign in to comment.