diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index a5991c9452..a850f1e5b3 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -22,7 +22,6 @@ import ( "math" "strings" "sync" - "time" "github.com/Shopify/sarama" "go.uber.org/zap" @@ -53,10 +52,16 @@ type Reconciler struct { Resolver *resolver.URIResolver - KafkaClusterAdmin sarama.ClusterAdmin - KafkaClusterAdminLock sync.RWMutex + // TODO these configurations should live in each Broker configuration, so that we don't assume each + // Broker object use the same Kafka cluster KafkaDefaultTopicDetails sarama.TopicDetail KafkaDefaultTopicDetailsLock sync.RWMutex + bootstrapServers []string + bootstrapServersLock sync.RWMutex + + // NewClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can + // mock the function used during the reconciliation loop. + NewClusterAdmin func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) Configs *Configs } @@ -300,26 +305,23 @@ func (r *Reconciler) ConfigMapUpdated(ctx context.Context) func(configMap *corev func (r *Reconciler) SetBootstrapServers(servers string) error { addrs := strings.Split(servers, ",") + r.bootstrapServersLock.Lock() + r.bootstrapServers = addrs + r.bootstrapServersLock.Unlock() + + return nil +} + +func (r *Reconciler) getKafkaClusterAdmin(bootstrapServers []string) (sarama.ClusterAdmin, error) { config := sarama.NewConfig() config.Version = sarama.MaxVersion - config.Net.KeepAlive = time.Second * 60 - config.Metadata.RefreshFrequency = time.Minute - kafkaClusterAdmin, err := NewClusterAdmin(addrs, config) + kafkaClusterAdmin, err := r.NewClusterAdmin(bootstrapServers, config) if err != nil { - return fmt.Errorf("failed to create kafka cluster admin: %w", err) + return nil, fmt.Errorf("failed to create cluster admin: %w", err) } - r.KafkaClusterAdminLock.Lock() - oldKafkaClusterAdmin := r.KafkaClusterAdmin - r.KafkaClusterAdmin = kafkaClusterAdmin - r.KafkaClusterAdminLock.Unlock() - - if oldKafkaClusterAdmin != nil { - _ = oldKafkaClusterAdmin.Close() - } - - return nil + return kafkaClusterAdmin, nil } func (r *Reconciler) SetDefaultTopicDetails(topicDetail sarama.TopicDetail) { diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index a6fa65e106..aeacf795aa 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -960,14 +960,6 @@ func useTable(t *testing.T, table TableTest, configs *Configs) { onDeleteTopicError = want.(error) } - clusterAdmin := &MockKafkaClusterAdmin{ - ExpectedTopicName: fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName), - ExpectedTopicDetail: defaultTopicDetail, - ErrorOnCreateTopic: onCreateTopicError, - ErrorOnDeleteTopic: onDeleteTopicError, - T: t, - } - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -977,7 +969,15 @@ func useTable(t *testing.T, table TableTest, configs *Configs) { DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, }, - KafkaClusterAdmin: clusterAdmin, + NewClusterAdmin: func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) { + return &MockKafkaClusterAdmin{ + ExpectedTopicName: fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName), + ExpectedTopicDetail: defaultTopicDetail, + ErrorOnCreateTopic: onCreateTopicError, + ErrorOnDeleteTopic: onDeleteTopicError, + T: t, + }, nil + }, KafkaDefaultTopicDetails: defaultTopicDetail, KafkaDefaultTopicDetailsLock: sync.RWMutex{}, Configs: configs, @@ -1018,10 +1018,6 @@ func useTable(t *testing.T, table TableTest, configs *Configs) { func TestConfigMapUpdate(t *testing.T) { - NewClusterAdmin = func(addrs []string, conf *sarama.Config) (sarama.ClusterAdmin, error) { - return MockKafkaClusterAdmin{}, nil - } - cm := corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: "cmname", @@ -1044,7 +1040,6 @@ func TestConfigMapUpdate(t *testing.T) { NumPartitions: 42, ReplicationFactor: 3, }) - assert.NotNil(t, reconciler.KafkaClusterAdmin) } func patchFinalizers() clientgotesting.PatchActionImpl { diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index f8672150cb..b30cbaa584 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -46,8 +46,6 @@ const ( DefaultReplicationFactor = 1 ) -var NewClusterAdmin = sarama.NewClusterAdmin - func NewController(ctx context.Context, watcher configmap.Watcher, configs *Configs) *controller.Impl { eventing.RegisterAlternateBrokerConditionSet(ConditionSet) @@ -61,6 +59,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *Conf DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, }, + NewClusterAdmin: sarama.NewClusterAdmin, KafkaDefaultTopicDetails: sarama.TopicDetail{ NumPartitions: DefaultNumPartitions, ReplicationFactor: DefaultReplicationFactor, diff --git a/control-plane/pkg/reconciler/broker/controller_test.go b/control-plane/pkg/reconciler/broker/controller_test.go index fe36b290bc..dc299faf65 100644 --- a/control-plane/pkg/reconciler/broker/controller_test.go +++ b/control-plane/pkg/reconciler/broker/controller_test.go @@ -19,7 +19,6 @@ package broker import ( "testing" - "github.com/Shopify/sarama" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -37,10 +36,6 @@ import ( func TestNewController(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) - NewClusterAdmin = func(addrs []string, conf *sarama.Config) (sarama.ClusterAdmin, error) { - return nil, nil - } - configs := &Configs{ EnvConfigs: EnvConfigs{ SystemNamespace: "cm", diff --git a/control-plane/pkg/reconciler/broker/topic.go b/control-plane/pkg/reconciler/broker/topic.go index 9c6dabe8c0..88225dee43 100644 --- a/control-plane/pkg/reconciler/broker/topic.go +++ b/control-plane/pkg/reconciler/broker/topic.go @@ -24,13 +24,18 @@ import ( ) func (r *Reconciler) CreateTopic(broker *eventing.Broker) (string, error) { + r.bootstrapServersLock.RLock() + defer r.bootstrapServersLock.RUnlock() + topic := Topic(broker) topicDetail := r.topicDetailFromBrokerConfig(broker) - r.KafkaClusterAdminLock.Lock() - defer r.KafkaClusterAdminLock.Unlock() + kafkaClusterAdmin, err := r.getKafkaClusterAdmin(r.bootstrapServers) + if err != nil { + return "", err + } - createTopicError := r.KafkaClusterAdmin.CreateTopic(topic, topicDetail, true) + createTopicError := kafkaClusterAdmin.CreateTopic(topic, topicDetail, true) if err, ok := createTopicError.(*sarama.TopicError); ok && err.Err == sarama.ErrTopicAlreadyExists { return topic, nil } @@ -39,13 +44,17 @@ func (r *Reconciler) CreateTopic(broker *eventing.Broker) (string, error) { } func (r *Reconciler) deleteTopic(broker *eventing.Broker) (string, error) { - - r.KafkaClusterAdminLock.RLock() - defer r.KafkaClusterAdminLock.RUnlock() + r.bootstrapServersLock.RLock() + defer r.bootstrapServersLock.RUnlock() topic := Topic(broker) - err := r.KafkaClusterAdmin.DeleteTopic(topic) + kafkaClusterAdmin, err := r.getKafkaClusterAdmin(r.bootstrapServers) + if err != nil { + return "", err + } + + err = kafkaClusterAdmin.DeleteTopic(topic) if sarama.ErrUnknownTopicOrPartition == err { return topic, nil } diff --git a/control-plane/pkg/reconciler/broker/topic_test.go b/control-plane/pkg/reconciler/broker/topic_test.go index effdb8e603..f63f5882b7 100644 --- a/control-plane/pkg/reconciler/broker/topic_test.go +++ b/control-plane/pkg/reconciler/broker/topic_test.go @@ -40,14 +40,16 @@ func TestCreateTopicTopicAlreadyExists(t *testing.T) { errMsg := "topic already exists" r := broker.Reconciler{ - KafkaClusterAdmin: reconcilertesting.MockKafkaClusterAdmin{ - ExpectedTopicName: topic, - ExpectedTopicDetail: sarama.TopicDetail{}, - ErrorOnCreateTopic: &sarama.TopicError{ - Err: sarama.ErrTopicAlreadyExists, - ErrMsg: &errMsg, - }, - T: t, + NewClusterAdmin: func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) { + return reconcilertesting.MockKafkaClusterAdmin{ + ExpectedTopicName: topic, + ExpectedTopicDetail: sarama.TopicDetail{}, + ErrorOnCreateTopic: &sarama.TopicError{ + Err: sarama.ErrTopicAlreadyExists, + ErrMsg: &errMsg, + }, + T: t, + }, nil }, }