Skip to content

Commit

Permalink
fix: prevent Kafka pending loop dieing on disconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jun 29, 2021
1 parent f289249 commit 778f24a
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,10 @@ func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f
if err != nil {
return err
}
adminClient, err := sarama.NewClusterAdmin(x.Brokers, config)
if err != nil {
return err
}
beforeClosers = append(beforeClosers, func(ctx context.Context) error {
logger.Info("closing kafka client", "source", sourceName)
return client.Close()
})
beforeClosers = append(beforeClosers, func(ctx context.Context) error {
logger.Info("closing kafka admin client", "source", sourceName)
return adminClient.Close()
})
groupName := pipelineName + "-" + stepName + "-source-" + sourceName + "-" + x.Topic
group, err := sarama.NewConsumerGroup(x.Brokers, groupName, config)
if err != nil {
Expand All @@ -161,13 +153,22 @@ func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f
return handler.Close()
})
if leadReplica() {
registerKafkaSetPendingHook(x, sourceName, client, adminClient, groupName)
registerKafkaSetPendingHook(x, sourceName, client, config, groupName)
}
return nil
}

func registerKafkaSetPendingHook(x *dfv1.Kafka, sourceName string, client sarama.Client, adminClient sarama.ClusterAdmin, groupName string) {
func registerKafkaSetPendingHook(x *dfv1.Kafka, sourceName string, client sarama.Client, config *sarama.Config, groupName string) {
prePatchHooks = append(prePatchHooks, func(ctx context.Context) error {
adminClient, err := sarama.NewClusterAdmin(x.Brokers, config)
if err != nil {
return err
}
defer func() {
if err := adminClient.Close(); err != nil {
logger.Error(err, "failed to close Kafka admin client", "source", sourceName)
}
}()
partitions, err := client.Partitions(x.Topic)
if err != nil {
return fmt.Errorf("failed to get partitions for %q: %w", sourceName, err)
Expand Down

0 comments on commit 778f24a

Please sign in to comment.