From 3c0d169b2fd111e313d9a76b62405f1795665edd Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Mon, 21 Jun 2021 14:35:42 -0700 Subject: [PATCH] fix: Kafka pending wrong when re-using pipeline --- runner/sidecar/sources.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/sidecar/sources.go b/runner/sidecar/sources.go index 6e2839c1..cbef05fe 100644 --- a/runner/sidecar/sources.go +++ b/runner/sidecar/sources.go @@ -141,7 +141,7 @@ func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f logger.Info("closing kafka admin client", "source", sourceName) return adminClient.Close() }) - groupName := pipelineName + "-" + stepName + groupName := pipelineName + "-" + stepName + "-source-" + sourceName + "-" + x.Topic group, err := sarama.NewConsumerGroup(x.Brokers, groupName, config) if err != nil { return fmt.Errorf("failed to create kafka consumer group: %w", err)