diff --git a/controllers/kafkacluster_controller.go b/controllers/kafkacluster_controller.go index 74bbabbae..4043145d8 100644 --- a/controllers/kafkacluster_controller.go +++ b/controllers/kafkacluster_controller.go @@ -119,7 +119,7 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req kafkamonitoring.New(r.Client, instance), cruisecontrolmonitoring.New(r.Client, instance), kafka.New(r.Client, r.DirectClient, instance, r.KafkaClientProvider), - cruisecontrol.New(r.Client, instance), + cruisecontrol.New(r.Client, instance, r.KafkaClientProvider), } for _, rec := range reconcilers { diff --git a/pkg/resources/cruisecontrol/cruisecontrol.go b/pkg/resources/cruisecontrol/cruisecontrol.go index 17c000eda..a1dbf370b 100644 --- a/pkg/resources/cruisecontrol/cruisecontrol.go +++ b/pkg/resources/cruisecontrol/cruisecontrol.go @@ -29,6 +29,7 @@ import ( "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" "github.com/banzaicloud/koperator/pkg/k8sutil" + "github.com/banzaicloud/koperator/pkg/kafkaclient" "github.com/banzaicloud/koperator/pkg/resources" certutil "github.com/banzaicloud/koperator/pkg/util/cert" pkicommon "github.com/banzaicloud/koperator/pkg/util/pki" @@ -54,6 +55,7 @@ type CapacityConfigAnnotation string // Reconciler implements the Component Reconciler type Reconciler struct { resources.Reconciler + KafkaClientProvider kafkaclient.Provider } func ccLabelSelector(kafkaCluster string) map[string]string { @@ -64,12 +66,13 @@ func ccLabelSelector(kafkaCluster string) map[string]string { } // New creates a new reconciler for CC -func New(client client.Client, cluster *v1beta1.KafkaCluster) *Reconciler { +func New(client client.Client, cluster *v1beta1.KafkaCluster, kafkaClientProvider kafkaclient.Provider) *Reconciler { return &Reconciler{ Reconciler: resources.Reconciler{ Client: client, KafkaCluster: cluster, }, + KafkaClientProvider: kafkaClientProvider, } } @@ -90,7 +93,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { } if r.KafkaCluster.Spec.CruiseControlConfig.CruiseControlEndpoint == "" { - genErr := generateCCTopic(r.KafkaCluster, r.Client, log.WithName("generateCCTopic")) + genErr := generateCCTopic(r.KafkaCluster, r.Client, r.KafkaClientProvider, log.WithName("generateCCTopic")) if genErr != nil { updateErr := k8sutil.UpdateCRStatus(r.Client, r.KafkaCluster, v1beta1.CruiseControlTopicNotReady, log) return errors.Combine(genErr, updateErr) diff --git a/pkg/resources/cruisecontrol/topicManager.go b/pkg/resources/cruisecontrol/topicManager.go index 9e38da9af..33773ebd5 100644 --- a/pkg/resources/cruisecontrol/topicManager.go +++ b/pkg/resources/cruisecontrol/topicManager.go @@ -27,6 +27,7 @@ import ( "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" + "github.com/banzaicloud/koperator/pkg/kafkaclient" "github.com/banzaicloud/koperator/pkg/resources/templates" "github.com/banzaicloud/koperator/pkg/webhooks" properties "github.com/banzaicloud/koperator/properties/pkg" @@ -34,6 +35,7 @@ import ( const ( ccMetricTopicAutoCreate = "cruise.control.metrics.topic.auto.create" + ccMetricPropertyTopicName = "cruise.control.metrics.topic" cruiseControlTopicFormat = "%s-cruise-control-topic" cruiseControlTopicName = "__CruiseControlMetrics" cruiseControlTopicPartitions = 12 @@ -71,7 +73,7 @@ func newCruiseControlTopic(cluster *v1beta1.KafkaCluster) *v1alpha1.KafkaTopic { } } -func generateCCTopic(cluster *v1beta1.KafkaCluster, client client.Client, log logr.Logger) error { +func generateCCTopic(cluster *v1beta1.KafkaCluster, client client.Client, kafkaClientProvider kafkaclient.Provider, log logr.Logger) error { readOnlyConfigProperties, err := properties.NewFromString(cluster.Spec.ReadOnlyConfig) if err != nil { return errors.WrapIf(err, "could not parse broker config") @@ -87,6 +89,28 @@ func generateCCTopic(cluster *v1beta1.KafkaCluster, client client.Client, log lo return nil } } + // Handle that case when the topic is created by CC but later cruise.control.metrics.topic.auto.create config removed from readOnlyConfig, + // or its value changed to false. In that case we dont need to create CC Metrics topic + broker, close, err := kafkaClientProvider.NewFromCluster(client, cluster) + if err != nil { + return err + } + defer close() + + // Check the override of the CC metrics topic name + ccTopicName := cruiseControlTopicName + if ccTopicNameProp, present := readOnlyConfigProperties.Get(ccMetricPropertyTopicName); present { + if ccTopicNameOverride := ccTopicNameProp.Value(); ccTopicNameOverride != "" { + ccTopicName = ccTopicNameOverride + } + } + + if topic, err := broker.GetTopic(ccTopicName); err != nil { + return errorfactory.New(errorfactory.ResourceNotReady{}, err, fmt.Sprintf("failed to get kafka topic: %s", ccTopicName)) + } else if topic != nil { + log.Info("CruiseControl topic has been created by CruiseControl") + return nil + } existing := &v1alpha1.KafkaTopic{} topic := newCruiseControlTopic(cluster)