diff --git a/runner/sidecar/sources.go b/runner/sidecar/sources.go index 607654f8..3a21b302 100644 --- a/runner/sidecar/sources.go +++ b/runner/sidecar/sources.go @@ -129,18 +129,10 @@ func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f if err != nil { return err } - adminClient, err := sarama.NewClusterAdmin(x.Brokers, config) - if err != nil { - return err - } beforeClosers = append(beforeClosers, func(ctx context.Context) error { logger.Info("closing kafka client", "source", sourceName) return client.Close() }) - beforeClosers = append(beforeClosers, func(ctx context.Context) error { - logger.Info("closing kafka admin client", "source", sourceName) - return adminClient.Close() - }) groupName := pipelineName + "-" + stepName + "-source-" + sourceName + "-" + x.Topic group, err := sarama.NewConsumerGroup(x.Brokers, groupName, config) if err != nil { @@ -161,13 +153,22 @@ func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f return handler.Close() }) if leadReplica() { - registerKafkaSetPendingHook(x, sourceName, client, adminClient, groupName) + registerKafkaSetPendingHook(x, sourceName, client, config, groupName) } return nil } -func registerKafkaSetPendingHook(x *dfv1.Kafka, sourceName string, client sarama.Client, adminClient sarama.ClusterAdmin, groupName string) { +func registerKafkaSetPendingHook(x *dfv1.Kafka, sourceName string, client sarama.Client, config *sarama.Config, groupName string) { prePatchHooks = append(prePatchHooks, func(ctx context.Context) error { + adminClient, err := sarama.NewClusterAdmin(x.Brokers, config) + if err != nil { + return err + } + defer func() { + if err := adminClient.Close(); err != nil { + logger.Error(err, "failed to close Kafka admin client", "source", sourceName) + } + }() partitions, err := client.Partitions(x.Topic) if err != nil { return fmt.Errorf("failed to get partitions for %q: %w", sourceName, err)