Skip to content

Commit

Permalink
fix(cc): re-creation of CC metrics topic (#976)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartam1 authored May 19, 2023
1 parent 039b608 commit 29541b9
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
2 changes: 1 addition & 1 deletion controllers/kafkacluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req
kafkamonitoring.New(r.Client, instance),
cruisecontrolmonitoring.New(r.Client, instance),
kafka.New(r.Client, r.DirectClient, instance, r.KafkaClientProvider),
cruisecontrol.New(r.Client, instance),
cruisecontrol.New(r.Client, instance, r.KafkaClientProvider),
}

for _, rec := range reconcilers {
Expand Down
7 changes: 5 additions & 2 deletions pkg/resources/cruisecontrol/cruisecontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
"github.com/banzaicloud/koperator/pkg/k8sutil"
"github.com/banzaicloud/koperator/pkg/kafkaclient"
"github.com/banzaicloud/koperator/pkg/resources"
certutil "github.com/banzaicloud/koperator/pkg/util/cert"
pkicommon "github.com/banzaicloud/koperator/pkg/util/pki"
Expand All @@ -54,6 +55,7 @@ type CapacityConfigAnnotation string
// Reconciler implements the Component Reconciler
type Reconciler struct {
resources.Reconciler
KafkaClientProvider kafkaclient.Provider
}

func ccLabelSelector(kafkaCluster string) map[string]string {
Expand All @@ -64,12 +66,13 @@ func ccLabelSelector(kafkaCluster string) map[string]string {
}

// New creates a new reconciler for CC
func New(client client.Client, cluster *v1beta1.KafkaCluster) *Reconciler {
func New(client client.Client, cluster *v1beta1.KafkaCluster, kafkaClientProvider kafkaclient.Provider) *Reconciler {
return &Reconciler{
Reconciler: resources.Reconciler{
Client: client,
KafkaCluster: cluster,
},
KafkaClientProvider: kafkaClientProvider,
}
}

Expand All @@ -90,7 +93,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
}

if r.KafkaCluster.Spec.CruiseControlConfig.CruiseControlEndpoint == "" {
genErr := generateCCTopic(r.KafkaCluster, r.Client, log.WithName("generateCCTopic"))
genErr := generateCCTopic(r.KafkaCluster, r.Client, r.KafkaClientProvider, log.WithName("generateCCTopic"))
if genErr != nil {
updateErr := k8sutil.UpdateCRStatus(r.Client, r.KafkaCluster, v1beta1.CruiseControlTopicNotReady, log)
return errors.Combine(genErr, updateErr)
Expand Down
26 changes: 25 additions & 1 deletion pkg/resources/cruisecontrol/topicManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
"github.com/banzaicloud/koperator/pkg/kafkaclient"
"github.com/banzaicloud/koperator/pkg/resources/templates"
"github.com/banzaicloud/koperator/pkg/webhooks"
properties "github.com/banzaicloud/koperator/properties/pkg"
)

const (
ccMetricTopicAutoCreate = "cruise.control.metrics.topic.auto.create"
ccMetricPropertyTopicName = "cruise.control.metrics.topic"
cruiseControlTopicFormat = "%s-cruise-control-topic"
cruiseControlTopicName = "__CruiseControlMetrics"
cruiseControlTopicPartitions = 12
Expand Down Expand Up @@ -71,7 +73,7 @@ func newCruiseControlTopic(cluster *v1beta1.KafkaCluster) *v1alpha1.KafkaTopic {
}
}

func generateCCTopic(cluster *v1beta1.KafkaCluster, client client.Client, log logr.Logger) error {
func generateCCTopic(cluster *v1beta1.KafkaCluster, client client.Client, kafkaClientProvider kafkaclient.Provider, log logr.Logger) error {
readOnlyConfigProperties, err := properties.NewFromString(cluster.Spec.ReadOnlyConfig)
if err != nil {
return errors.WrapIf(err, "could not parse broker config")
Expand All @@ -87,6 +89,28 @@ func generateCCTopic(cluster *v1beta1.KafkaCluster, client client.Client, log lo
return nil
}
}
// Handle that case when the topic is created by CC but later cruise.control.metrics.topic.auto.create config removed from readOnlyConfig,
// or its value changed to false. In that case we dont need to create CC Metrics topic
broker, close, err := kafkaClientProvider.NewFromCluster(client, cluster)
if err != nil {
return err
}
defer close()

// Check the override of the CC metrics topic name
ccTopicName := cruiseControlTopicName
if ccTopicNameProp, present := readOnlyConfigProperties.Get(ccMetricPropertyTopicName); present {
if ccTopicNameOverride := ccTopicNameProp.Value(); ccTopicNameOverride != "" {
ccTopicName = ccTopicNameOverride
}
}

if topic, err := broker.GetTopic(ccTopicName); err != nil {
return errorfactory.New(errorfactory.ResourceNotReady{}, err, fmt.Sprintf("failed to get kafka topic: %s", ccTopicName))
} else if topic != nil {
log.Info("CruiseControl topic has been created by CruiseControl")
return nil
}

existing := &v1alpha1.KafkaTopic{}
topic := newCruiseControlTopic(cluster)
Expand Down

0 comments on commit 29541b9

Please sign in to comment.