Skip to content

Commit

Permalink
feat: expose Kafka config (#414)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Oct 5, 2021
1 parent 1cebd30 commit fa7373b
Show file tree
Hide file tree
Showing 21 changed files with 1,164 additions and 348 deletions.
950 changes: 633 additions & 317 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions api/v1alpha1/kafka_offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package v1alpha1

// +kubebuilder:validation:Enum=First;Last
type KafkaOffset string

func (k KafkaOffset) GetAutoOffsetReset() string {
switch k {
case "First":
return "earliest"
default:
return "latest"
}
}
16 changes: 16 additions & 0 deletions api/v1alpha1/kafka_offset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestKafkaOffset_GetAutoOffsetReset(t *testing.T) {
t.Run("First", func(t *testing.T) {
assert.Equal(t, "earliest", KafkaOffset("First").GetAutoOffsetReset())
})
t.Run("Last", func(t *testing.T) {
assert.Equal(t, "latest", KafkaOffset("Last").GetAutoOffsetReset())
})
}
33 changes: 33 additions & 0 deletions api/v1alpha1/kafka_sink.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,39 @@
package v1alpha1

import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

type KafkaSink struct {
Kafka `json:",inline" protobuf:"bytes,1,opt,name=kafka"`
Async bool `json:"async,omitempty" protobuf:"varint,2,opt,name=async"`
// +kubebuilder:default="100Ki"
BatchSize *resource.Quantity `json:"batchSize,omitempty" protobuf:"bytes,3,opt,name=batchSize"`
// +kubebuilder:default="0ms"
Linger *metav1.Duration `json:"linger,omitempty" protobuf:"bytes,4,opt,name=linger"`
// +kubebuilder:default="lz4"
CompressionType string `json:"compressionType,omitempty" protobuf:"bytes,5,opt,name=compressionType"`
// +kubebuilder:default="all"
Acks *intstr.IntOrString `json:"acks,omitempty" protobuf:"bytes,6,opt,name=acks"`
}

func (m *KafkaSink) GetBatchSize() int {
return int(m.BatchSize.Value())
}

func (m *KafkaSink) GetLingerMs() int {
return int(m.Linger.Milliseconds())
}

func (m *KafkaSink) GetAcks() interface{} {
if m.Acks.Type == intstr.String {
return m.Acks.String()
}
return m.Acks.IntValue()
}

func (m *KafkaSink) GetMessageMaxBytes() int {
return int(m.Kafka.MaxMessageBytes)
}
46 changes: 46 additions & 0 deletions api/v1alpha1/kafka_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package v1alpha1

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

func TestKafkaSink_GetBatchSize(t *testing.T) {
v := resource.MustParse("1Ki")
s := KafkaSink{BatchSize: &v}
assert.Equal(t, 1024, s.GetBatchSize())
}

func TestKafkaSink_GetLingerMs(t *testing.T) {
s := KafkaSink{Linger: &metav1.Duration{Duration: time.Second}}
assert.Equal(t, 1000, s.GetLingerMs())
}

func TestKafkaSink_GetAcks(t *testing.T) {
t.Run("all", func(t *testing.T) {
v := intstr.FromString("all")
s := KafkaSink{Acks: &v}
assert.Equal(t, "all", s.GetAcks())
})
t.Run("1", func(t *testing.T) {
v := intstr.FromInt(1)
s := KafkaSink{Acks: &v}
assert.Equal(t, 1, s.GetAcks())
})
}

func TestKafkaSink_GetMessageMaxBytes(t *testing.T) {
s := KafkaSink{
Kafka: Kafka{
KafkaConfig: KafkaConfig{
MaxMessageBytes: 1,
},
},
}
assert.Equal(t, 1, s.GetMessageMaxBytes())
}
22 changes: 20 additions & 2 deletions api/v1alpha1/kafka_source.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
package v1alpha1

// +kubebuilder:validation:Enum=First;Last
type KafkaOffset string
import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type KafkaSource struct {
Kafka `json:",inline" protobuf:"bytes,1,opt,name=kafka"`
// +kubebuilder:default=Last
StartOffset KafkaOffset `json:"startOffset,omitempty" protobuf:"bytes,2,opt,name=startOffset,casttype=KafkaOffset"`
// +kubebuilder:default="100Ki"
FetchMin *resource.Quantity `json:"fetchMin,omitempty" protobuf:"bytes,3,opt,name=fetchMin"`
// +kubebuilder:default="500ms"
FetchWaitMax *metav1.Duration `json:"fetchWaitMax,omitempty" protobuf:"bytes,4,opt,name=fetchWaitMax"`
}

func (m *KafkaSource) GetAutoOffsetReset() string {
return m.StartOffset.GetAutoOffsetReset()
}

func (m *KafkaSource) GetFetchMinBytes() int {
return int(m.FetchMin.Value())
}

func (m *KafkaSource) GetFetchWaitMaxMs() int {
return int(m.FetchWaitMax.Milliseconds())
}
26 changes: 26 additions & 0 deletions api/v1alpha1/kafka_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package v1alpha1

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestKafkaSource_GetAutoOffsetReset(t *testing.T) {
source := KafkaSource{}
assert.Equal(t, "latest", source.GetAutoOffsetReset())
}

func TestKafkaSource_GetFetchMinBytes(t *testing.T) {
v := resource.MustParse("1Ki")
s := KafkaSource{FetchMin: &v}
assert.Equal(t, 1024, s.GetFetchMinBytes())
}

func TestKafkaSource_GetFetchWaitMaxMs(t *testing.T) {
s := KafkaSource{FetchWaitMax: &metav1.Duration{Duration: time.Second}}
assert.Equal(t, 1000, s.GetFetchWaitMaxMs())
}
26 changes: 26 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 58 additions & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1595,12 +1595,31 @@ spec:
type: object
kafka:
properties:
acks:
anyOf:
- type: integer
- type: string
default: all
x-kubernetes-int-or-string: true
async:
type: boolean
batchSize:
anyOf:
- type: integer
- type: string
default: 100Ki
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
brokers:
items:
type: string
type: array
compressionType:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -3541,6 +3560,16 @@ spec:
items:
type: string
type: array
fetchMin:
anyOf:
- type: integer
- type: string
default: 100Ki
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
fetchWaitMax:
default: 500ms
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -8651,12 +8680,31 @@ spec:
type: object
kafka:
properties:
acks:
anyOf:
- type: integer
- type: string
default: all
x-kubernetes-int-or-string: true
async:
type: boolean
batchSize:
anyOf:
- type: integer
- type: string
default: 100Ki
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
brokers:
items:
type: string
type: array
compressionType:
default: lz4
type: string
linger:
default: 0ms
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -10483,6 +10531,16 @@ spec:
items:
type: string
type: array
fetchMin:
anyOf:
- type: integer
- type: string
default: 100Ki
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
fetchWaitMax:
default: 500ms
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down
Loading

0 comments on commit fa7373b

Please sign in to comment.