diff --git a/api/v1alpha1/kafkatopic_types.go b/api/v1alpha1/kafkatopic_types.go index 261a86e45..8abb61fb9 100644 --- a/api/v1alpha1/kafkatopic_types.go +++ b/api/v1alpha1/kafkatopic_types.go @@ -18,14 +18,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const MinPartitions = 1 +const ( + MinPartitions = -1 + MinReplicationFactor = -1 +) // KafkaTopicSpec defines the desired state of KafkaTopic // +k8s:openapi-gen=true type KafkaTopicSpec struct { Name string `json:"name"` - // +kubebuilder:validation:Minimum=1 - Partitions int32 `json:"partitions"` + // Partitions defines the desired number of partitions; must be positive, or -1 to signify using the broker's default + // +kubebuilder:validation:Minimum=-1 + Partitions int32 `json:"partitions"` + // ReplicationFactor defines the desired replication factor; must be positive, or -1 to signify using the broker's default + // +kubebuilder:validation:Minimum=-1 ReplicationFactor int32 `json:"replicationFactor"` Config map[string]string `json:"config,omitempty"` ClusterRef ClusterReference `json:"clusterRef"` diff --git a/charts/kafka-operator/templates/crds.yaml b/charts/kafka-operator/templates/crds.yaml index ab3e49581..d76ad6c66 100644 --- a/charts/kafka-operator/templates/crds.yaml +++ b/charts/kafka-operator/templates/crds.yaml @@ -19348,11 +19348,16 @@ spec: name: type: string partitions: + description: Partitions defines the desired number of partitions; + must be positive, or -1 to signify using the broker's default format: int32 - minimum: 1 + minimum: -1 type: integer replicationFactor: + description: ReplicationFactor defines the desired replication factor; + must be positive, or -1 to signify using the broker's default format: int32 + minimum: -1 type: integer required: - clusterRef diff --git a/config/base/crds/kafka.banzaicloud.io_kafkatopics.yaml b/config/base/crds/kafka.banzaicloud.io_kafkatopics.yaml index 33c184b9a..cbcc93dfc 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkatopics.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkatopics.yaml @@ -53,11 +53,16 @@ spec: name: type: string partitions: + description: Partitions defines the desired number of partitions; + must be positive, or -1 to signify using the broker's default format: int32 - minimum: 1 + minimum: -1 type: integer replicationFactor: + description: ReplicationFactor defines the desired replication factor; + must be positive, or -1 to signify using the broker's default format: int32 + minimum: -1 type: integer required: - clusterRef diff --git a/config/samples/example-topic.yaml b/config/samples/example-topic.yaml index 763912959..e0ca6bc5f 100644 --- a/config/samples/example-topic.yaml +++ b/config/samples/example-topic.yaml @@ -7,8 +7,9 @@ spec: clusterRef: name: kafka name: example-topic - # valid partitions values: [1,...] + # valid partitions values: [1,...], or -1 to use the broker's default partitions: 3 + # valid repliaction factor values: [1, ...], or -1 to use the broker's default replicationFactor: 2 config: "retention.ms": "604800000" diff --git a/pkg/webhook/request_test.go b/pkg/webhook/request_test.go index 87ce162ea..9087c4dc2 100644 --- a/pkg/webhook/request_test.go +++ b/pkg/webhook/request_test.go @@ -38,16 +38,31 @@ func newRawTopic() []byte { Namespace: "test-namespace", } topic.Spec.Partitions = 1 + topic.Spec.ReplicationFactor = 1 out, _ := json.Marshal(topic) return out } -func newRawInvalidTopic() []byte { +func newRawTopicWithInvalidPartitions() []byte { topic := &v1alpha1.KafkaTopic{} topic.ObjectMeta = metav1.ObjectMeta{ Name: "test-topic", Namespace: "test-namespace", } + topic.Spec.Partitions = -2 + topic.Spec.ReplicationFactor = 1 + out, _ := json.Marshal(topic) + return out +} + +func newRawTopicWithInvalidReplicationFactor() []byte { + topic := &v1alpha1.KafkaTopic{} + topic.ObjectMeta = metav1.ObjectMeta{ + Name: "test-topic", + Namespace: "test-namespace", + } + topic.Spec.Partitions = 1 + topic.Spec.ReplicationFactor = -2 out, _ := json.Marshal(topic) return out } @@ -101,17 +116,25 @@ func TestValidate(t *testing.T) { t.Error("Expected bad request, got:", res.Result.Reason) } - req.Request.Object.Raw = newRawInvalidTopic() + req.Request.Object.Raw = newRawTopicWithInvalidPartitions() - if res := server.validate(req); res.Allowed { + if res = server.validate(req); res.Allowed { + t.Error("Expected not allowed, got allowed") + } else if res.Result.Reason != metav1.StatusReasonInvalid { + t.Error("Expected invalid due to invalid partitions, got:", res.Result.Reason) + } + + req.Request.Object.Raw = newRawTopicWithInvalidReplicationFactor() + + if res = server.validate(req); res.Allowed { t.Error("Expected not allowed, got allowed") } else if res.Result.Reason != metav1.StatusReasonInvalid { - t.Error("Expected invalid for no partitions, got:", res.Result.Reason) + t.Error("Expected invalid due to invalid replication factor, got:", res.Result.Reason) } req.Request.Object.Raw = newRawTopic() - if res := server.validate(req); res.Allowed { + if res = server.validate(req); res.Allowed { t.Error("Expected not allowed, got allowed") } else if res.Result.Reason != metav1.StatusReasonNotFound { t.Error("Expected not found for no cluster, got:", res.Result.Reason) diff --git a/pkg/webhook/topic_validator.go b/pkg/webhook/topic_validator.go index 0b6ca58a7..130e19953 100644 --- a/pkg/webhook/topic_validator.go +++ b/pkg/webhook/topic_validator.go @@ -32,9 +32,10 @@ import ( ) const ( - cantConnectErrorMsg = "Failed to connect to kafka cluster" - invalidReplicationFactorErrMsg = "Replication factor is larger than the number of nodes in the kafka cluster" - invalidPartitionsErrMsg = "Number of partitions is less than the minimum partitions number" + cantConnectErrorMsg = "Failed to connect to kafka cluster" + outOfRangeReplicationFactorErrMsg = "Replication factor must be larger than 0 (or set it to be -1 to use the broker's default)" + invalidReplicationFactorErrMsg = "Replication factor is larger than the number of nodes in the kafka cluster" + outOfRangePartitionsErrMsg = "Number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)" ) func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1.AdmissionResponse { @@ -42,10 +43,18 @@ func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic log.Info(fmt.Sprintf("Doing pre-admission validation of kafka topic %s", topic.Spec.Name)) // First check if the kafkatopic is valid - if topic.Spec.Partitions < banzaicloudv1alpha1.MinPartitions { - log.Info(invalidPartitionsErrMsg) + if topic.Spec.Partitions < banzaicloudv1alpha1.MinPartitions || topic.Spec.Partitions == 0 { + log.Info(outOfRangePartitionsErrMsg) return notAllowed( - fmt.Sprintf("KafkaTopic '%s' is invalid: %s - %d.", topic.Spec.Name, invalidPartitionsErrMsg, banzaicloudv1alpha1.MinPartitions), + fmt.Sprintf("KafkaTopic '%s' is invalid: %s.", topic.Spec.Name, outOfRangePartitionsErrMsg), + metav1.StatusReasonInvalid, + ) + } + + if topic.Spec.ReplicationFactor < banzaicloudv1alpha1.MinReplicationFactor || topic.Spec.ReplicationFactor == 0{ + log.Info(outOfRangeReplicationFactorErrMsg) + return notAllowed( + fmt.Sprintf("KafkaTopic '%s' is invalid: %s.", topic.Spec.Name, outOfRangeReplicationFactorErrMsg), metav1.StatusReasonInvalid, ) } diff --git a/pkg/webhook/topic_validator_test.go b/pkg/webhook/topic_validator_test.go index 6c7e8a84e..3d9378c0d 100644 --- a/pkg/webhook/topic_validator_test.go +++ b/pkg/webhook/topic_validator_test.go @@ -43,7 +43,7 @@ func newMockTopic() *v1alpha1.KafkaTopic { Spec: v1alpha1.KafkaTopicSpec{ Name: "test-topic", Partitions: 0, - ReplicationFactor: 1, + ReplicationFactor: 0, ClusterRef: v1alpha1.ClusterReference{ Name: "test-cluster", Namespace: "test-namespace", @@ -70,14 +70,24 @@ func TestValidateTopic(t *testing.T) { } topic := newMockTopic() - // Test kafka topic with partitions number less than 1 + // Test kafka topic with invalid partitions res := server.validateKafkaTopic(topic) if res.Allowed { t.Error("Expected not allowed due to invalid partitions number, got allowed") } + // set a valid partitions topic.Spec.Partitions = 2 + // Test kafka topic with invalid replication factor + res = server.validateKafkaTopic(topic) + if res.Allowed { + t.Error("Expected not allowed due to invalid replication factor, got allowed") + } + + // set a valid replication factor + topic.Spec.ReplicationFactor = 1 + // Test non-existent kafka cluster res = server.validateKafkaTopic(topic) if res.Result.Reason != metav1.StatusReasonNotFound {