Skip to content

Commit

Permalink
fix: Kafka pending wrong when re-using pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jun 21, 2021
1 parent 9fefcb6 commit 3c0d169
Showing 1 changed file with 1 addition and 1 deletion.
2 changes: 1 addition & 1 deletion runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f
logger.Info("closing kafka admin client", "source", sourceName)
return adminClient.Close()
})
groupName := pipelineName + "-" + stepName
groupName := pipelineName + "-" + stepName + "-source-" + sourceName + "-" + x.Topic
group, err := sarama.NewConsumerGroup(x.Brokers, groupName, config)
if err != nil {
return fmt.Errorf("failed to create kafka consumer group: %w", err)
Expand Down

0 comments on commit 3c0d169

Please sign in to comment.