Skip to content

Commit

Permalink
Add validation for replication factor
Browse files Browse the repository at this point in the history
  • Loading branch information
panyuenlau committed Jun 1, 2022
1 parent 5d099cc commit 344eedb
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 19 deletions.
12 changes: 9 additions & 3 deletions api/v1alpha1/kafkatopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const MinPartitions = 1
const (
MinPartitions = -1
MinReplicationFactor = -1
)

// KafkaTopicSpec defines the desired state of KafkaTopic
// +k8s:openapi-gen=true
type KafkaTopicSpec struct {
Name string `json:"name"`
// +kubebuilder:validation:Minimum=1
Partitions int32 `json:"partitions"`
// Partitions defines the desired number of partitions; must be positive, or -1 to signify using the broker's default
// +kubebuilder:validation:Minimum=-1
Partitions int32 `json:"partitions"`
// ReplicationFactor defines the desired replication factor; must be positive, or -1 to signify using the broker's default
// +kubebuilder:validation:Minimum=-1
ReplicationFactor int32 `json:"replicationFactor"`
Config map[string]string `json:"config,omitempty"`
ClusterRef ClusterReference `json:"clusterRef"`
Expand Down
7 changes: 6 additions & 1 deletion charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19348,11 +19348,16 @@ spec:
name:
type: string
partitions:
description: Partitions defines the desired number of partitions;
must be positive, or -1 to signify using the broker's default
format: int32
minimum: 1
minimum: -1
type: integer
replicationFactor:
description: ReplicationFactor defines the desired replication factor;
must be positive, or -1 to signify using the broker's default
format: int32
minimum: -1
type: integer
required:
- clusterRef
Expand Down
7 changes: 6 additions & 1 deletion config/base/crds/kafka.banzaicloud.io_kafkatopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ spec:
name:
type: string
partitions:
description: Partitions defines the desired number of partitions;
must be positive, or -1 to signify using the broker's default
format: int32
minimum: 1
minimum: -1
type: integer
replicationFactor:
description: ReplicationFactor defines the desired replication factor;
must be positive, or -1 to signify using the broker's default
format: int32
minimum: -1
type: integer
required:
- clusterRef
Expand Down
3 changes: 2 additions & 1 deletion config/samples/example-topic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ spec:
clusterRef:
name: kafka
name: example-topic
# valid partitions values: [1,...]
# valid partitions values: [1,...], or -1 to use the broker's default
partitions: 3
# valid repliaction factor values: [1, ...], or -1 to use the broker's default
replicationFactor: 2
config:
"retention.ms": "604800000"
Expand Down
33 changes: 28 additions & 5 deletions pkg/webhook/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,31 @@ func newRawTopic() []byte {
Namespace: "test-namespace",
}
topic.Spec.Partitions = 1
topic.Spec.ReplicationFactor = 1
out, _ := json.Marshal(topic)
return out
}

func newRawInvalidTopic() []byte {
func newRawTopicWithInvalidPartitions() []byte {
topic := &v1alpha1.KafkaTopic{}
topic.ObjectMeta = metav1.ObjectMeta{
Name: "test-topic",
Namespace: "test-namespace",
}
topic.Spec.Partitions = -2
topic.Spec.ReplicationFactor = 1
out, _ := json.Marshal(topic)
return out
}

func newRawTopicWithInvalidReplicationFactor() []byte {
topic := &v1alpha1.KafkaTopic{}
topic.ObjectMeta = metav1.ObjectMeta{
Name: "test-topic",
Namespace: "test-namespace",
}
topic.Spec.Partitions = 1
topic.Spec.ReplicationFactor = -2
out, _ := json.Marshal(topic)
return out
}
Expand Down Expand Up @@ -101,17 +116,25 @@ func TestValidate(t *testing.T) {
t.Error("Expected bad request, got:", res.Result.Reason)
}

req.Request.Object.Raw = newRawInvalidTopic()
req.Request.Object.Raw = newRawTopicWithInvalidPartitions()

if res := server.validate(req); res.Allowed {
if res = server.validate(req); res.Allowed {
t.Error("Expected not allowed, got allowed")
} else if res.Result.Reason != metav1.StatusReasonInvalid {
t.Error("Expected invalid due to invalid partitions, got:", res.Result.Reason)
}

req.Request.Object.Raw = newRawTopicWithInvalidReplicationFactor()

if res = server.validate(req); res.Allowed {
t.Error("Expected not allowed, got allowed")
} else if res.Result.Reason != metav1.StatusReasonInvalid {
t.Error("Expected invalid for no partitions, got:", res.Result.Reason)
t.Error("Expected invalid due to invalid replication factor, got:", res.Result.Reason)
}

req.Request.Object.Raw = newRawTopic()

if res := server.validate(req); res.Allowed {
if res = server.validate(req); res.Allowed {
t.Error("Expected not allowed, got allowed")
} else if res.Result.Reason != metav1.StatusReasonNotFound {
t.Error("Expected not found for no cluster, got:", res.Result.Reason)
Expand Down
21 changes: 15 additions & 6 deletions pkg/webhook/topic_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,29 @@ import (
)

const (
cantConnectErrorMsg = "Failed to connect to kafka cluster"
invalidReplicationFactorErrMsg = "Replication factor is larger than the number of nodes in the kafka cluster"
invalidPartitionsErrMsg = "Number of partitions is less than the minimum partitions number"
cantConnectErrorMsg = "Failed to connect to kafka cluster"
outOfRangeReplicationFactorErrMsg = "Replication factor must be larger than 0 (or set it to be -1 to use the broker's default)"
invalidReplicationFactorErrMsg = "Replication factor is larger than the number of nodes in the kafka cluster"
outOfRangePartitionsErrMsg = "Number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)"
)

func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1.AdmissionResponse {
ctx := context.Background()
log.Info(fmt.Sprintf("Doing pre-admission validation of kafka topic %s", topic.Spec.Name))

// First check if the kafkatopic is valid
if topic.Spec.Partitions < banzaicloudv1alpha1.MinPartitions {
log.Info(invalidPartitionsErrMsg)
if topic.Spec.Partitions < banzaicloudv1alpha1.MinPartitions || topic.Spec.Partitions == 0 {
log.Info(outOfRangePartitionsErrMsg)
return notAllowed(
fmt.Sprintf("KafkaTopic '%s' is invalid: %s - %d.", topic.Spec.Name, invalidPartitionsErrMsg, banzaicloudv1alpha1.MinPartitions),
fmt.Sprintf("KafkaTopic '%s' is invalid: %s.", topic.Spec.Name, outOfRangePartitionsErrMsg),
metav1.StatusReasonInvalid,
)
}

if topic.Spec.ReplicationFactor < banzaicloudv1alpha1.MinReplicationFactor || topic.Spec.ReplicationFactor == 0{
log.Info(outOfRangeReplicationFactorErrMsg)
return notAllowed(
fmt.Sprintf("KafkaTopic '%s' is invalid: %s.", topic.Spec.Name, outOfRangeReplicationFactorErrMsg),
metav1.StatusReasonInvalid,
)
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/webhook/topic_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newMockTopic() *v1alpha1.KafkaTopic {
Spec: v1alpha1.KafkaTopicSpec{
Name: "test-topic",
Partitions: 0,
ReplicationFactor: 1,
ReplicationFactor: 0,
ClusterRef: v1alpha1.ClusterReference{
Name: "test-cluster",
Namespace: "test-namespace",
Expand All @@ -70,14 +70,24 @@ func TestValidateTopic(t *testing.T) {
}
topic := newMockTopic()

// Test kafka topic with partitions number less than 1
// Test kafka topic with invalid partitions
res := server.validateKafkaTopic(topic)
if res.Allowed {
t.Error("Expected not allowed due to invalid partitions number, got allowed")
}

// set a valid partitions
topic.Spec.Partitions = 2

// Test kafka topic with invalid replication factor
res = server.validateKafkaTopic(topic)
if res.Allowed {
t.Error("Expected not allowed due to invalid replication factor, got allowed")
}

// set a valid replication factor
topic.Spec.ReplicationFactor = 1

// Test non-existent kafka cluster
res = server.validateKafkaTopic(topic)
if res.Result.Reason != metav1.StatusReasonNotFound {
Expand Down

0 comments on commit 344eedb

Please sign in to comment.