Skip to content

Commit

Permalink
feat: Add support for Kafka async producer. Fixes #216 (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Aug 9, 2021
1 parent 86835e1 commit 00c3178
Show file tree
Hide file tree
Showing 21 changed files with 611 additions and 311 deletions.
751 changes: 479 additions & 272 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions api/v1alpha1/get_pod_req.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package v1alpha1

import (
"time"

corev1 "k8s.io/api/core/v1"
)

type GetPodSpecReq struct {
ClusterName string `protobuf:"bytes,9,opt,name=clusterName"`
PipelineName string `protobuf:"bytes,1,opt,name=pipelineName"`
Namespace string `protobuf:"bytes,2,opt,name=namespace"`
Replica int32 `protobuf:"varint,3,opt,name=replica"`
ImageFormat string `protobuf:"bytes,4,opt,name=imageFormat"`
RunnerImage string `protobuf:"bytes,5,opt,name=runnerImage"`
PullPolicy corev1.PullPolicy `protobuf:"bytes,6,opt,name=pullPolicy,casttype=k8s.io/api/core/v1.PullPolicy"`
UpdateInterval time.Duration `protobuf:"varint,7,opt,name=updateInterval,casttype=time.Duration"`
StepStatus StepStatus `protobuf:"bytes,8,opt,name=stepStatus"`
}
6 changes: 6 additions & 0 deletions api/v1alpha1/kafka_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package v1alpha1

type KafkaSink struct {
Kafka `json:",inline" protobuf:"bytes,1,opt,name=kafka"`
Async bool `json:"async,omitempty" protobuf:"varint,2,opt,name=async"`
}
14 changes: 7 additions & 7 deletions api/v1alpha1/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package v1alpha1

type Sink struct {
// +kubebuilder:default=default
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
STAN *STAN `json:"stan,omitempty" protobuf:"bytes,2,opt,name=stan"`
Kafka *Kafka `json:"kafka,omitempty" protobuf:"bytes,3,opt,name=kafka"`
Log *Log `json:"log,omitempty" protobuf:"bytes,4,opt,name=log"`
HTTP *HTTPSink `json:"http,omitempty" protobuf:"bytes,5,opt,name=http"`
S3 *S3Sink `json:"s3,omitempty" protobuf:"bytes,6,opt,name=s3"`
DB *DBSink `json:"db,omitempty" protobuf:"bytes,7,opt,name=db"`
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
STAN *STAN `json:"stan,omitempty" protobuf:"bytes,2,opt,name=stan"`
Kafka *KafkaSink `json:"kafka,omitempty" protobuf:"bytes,3,opt,name=kafka"`
Log *Log `json:"log,omitempty" protobuf:"bytes,4,opt,name=log"`
HTTP *HTTPSink `json:"http,omitempty" protobuf:"bytes,5,opt,name=http"`
S3 *S3Sink `json:"s3,omitempty" protobuf:"bytes,6,opt,name=s3"`
DB *DBSink `json:"db,omitempty" protobuf:"bytes,7,opt,name=db"`
}
14 changes: 0 additions & 14 deletions api/v1alpha1/step_spec.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package v1alpha1

import (
"time"

corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -44,18 +42,6 @@ type StepSpec struct {
Tolerations []corev1.Toleration `json:"tolerations,omitempty" protobuf:"bytes,19,rep,name=tolerations"`
}

type GetPodSpecReq struct {
ClusterName string `protobuf:"bytes,9,opt,name=clusterName"`
PipelineName string `protobuf:"bytes,1,opt,name=pipelineName"`
Namespace string `protobuf:"bytes,2,opt,name=namespace"`
Replica int32 `protobuf:"varint,3,opt,name=replica"`
ImageFormat string `protobuf:"bytes,4,opt,name=imageFormat"`
RunnerImage string `protobuf:"bytes,5,opt,name=runnerImage"`
PullPolicy corev1.PullPolicy `protobuf:"bytes,6,opt,name=pullPolicy,casttype=k8s.io/api/core/v1.PullPolicy"`
UpdateInterval time.Duration `protobuf:"varint,7,opt,name=updateInterval,casttype=time.Duration"`
StepStatus StepStatus `protobuf:"bytes,8,opt,name=stepStatus"`
}

func (in StepSpec) GetIn() *Interface {
if in.Container != nil {
return in.Container.GetIn()
Expand Down
18 changes: 17 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down Expand Up @@ -4797,6 +4799,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down
4 changes: 4 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down Expand Up @@ -4797,6 +4799,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down
4 changes: 4 additions & 0 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down Expand Up @@ -4797,6 +4799,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down
6 changes: 6 additions & 0 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ spec:
- golang1-16
- java16
- python3-9
- node16
type: string
source:
type: string
Expand Down Expand Up @@ -1295,6 +1296,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down Expand Up @@ -4198,6 +4201,7 @@ spec:
- golang1-16
- java16
- python3-9
- node16
type: string
source:
type: string
Expand Down Expand Up @@ -4795,6 +4799,8 @@ spec:
type: object
kafka:
properties:
async:
type: boolean
brokers:
items:
type: string
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func enrichSinks(ctx context.Context) error {
}
sink.STAN = x
} else if x := sink.Kafka; x != nil {
if err := enrichKafka(ctx, x); err != nil {
if err := enrichKafka(ctx, &x.Kafka); err != nil {
return err
}
sink.Kafka = x
Expand Down
45 changes: 36 additions & 9 deletions runner/sidecar/sink/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"io"

corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

Expand All @@ -11,27 +12,53 @@ import (
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
)

type producer interface {
SendMessage(msg *sarama.ProducerMessage) error
io.Closer
}

type asyncProducer struct{ sarama.AsyncProducer }

func (s asyncProducer) SendMessage(msg *sarama.ProducerMessage) error {
s.AsyncProducer.Input() <- msg
return nil
}

type syncProducer struct{ sarama.SyncProducer }

func (s syncProducer) SendMessage(msg *sarama.ProducerMessage) error {
_, _, err := s.SyncProducer.SendMessage(msg)
return err
}

type kafkaSink struct {
producer sarama.SyncProducer
producer producer
topic string
}

func New(ctx context.Context, secretInterface corev1.SecretInterface, x dfv1.Kafka) (sink.Interface, error) {
func New(ctx context.Context, secretInterface corev1.SecretInterface, x dfv1.KafkaSink) (sink.Interface, error) {
config, err := kafka.GetConfig(ctx, secretInterface, x.KafkaConfig)
if err != nil {
return nil, err
}
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(x.Brokers, config)
if err != nil {
return nil, err
if x.Async {
producer, err := sarama.NewAsyncProducer(x.Brokers, config)
if err != nil {
return nil, err
}
return kafkaSink{asyncProducer{producer}, x.Topic}, nil
} else {
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(x.Brokers, config)
if err != nil {
return nil, err
}
return kafkaSink{syncProducer{producer}, x.Topic}, nil
}
return kafkaSink{producer, x.Topic}, nil
}

func (h kafkaSink) Sink(msg []byte) error {
_, _, err := h.producer.SendMessage(&sarama.ProducerMessage{Value: sarama.ByteEncoder(msg), Topic: h.topic})
return err
return h.producer.SendMessage(&sarama.ProducerMessage{Value: sarama.ByteEncoder(msg), Topic: h.topic})
}

func (h kafkaSink) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion test/kafka-e2e/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestKafka(t *testing.T) {
Name: "main",
Cat: &Cat{},
Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic}}}},
Sinks: []Sink{{Kafka: &Kafka{Topic: sinkTopic}}},
Sinks: []Sink{{Kafka: &KafkaSink{Kafka: Kafka{Topic: sinkTopic}}}},
}},
},
})
Expand Down
2 changes: 1 addition & 1 deletion test/kafka-fmea/kafka_fmea_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestKafkaFMEA_PodDeletedDisruption(t *testing.T) {
Name: "main",
Cat: &Cat{},
Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic}}}},
Sinks: []Sink{{Kafka: &Kafka{Topic: sinkTopic}}},
Sinks: []Sink{{Kafka: &KafkaSink{Kafka: Kafka{Topic: sinkTopic}}}},
}},
},
})
Expand Down
5 changes: 4 additions & 1 deletion test/kafka-stress/kafka_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func TestKafkaSinkStress(t *testing.T) {
Cat: &Cat{},
Replicas: Params.Replicas,
Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic}}}},
Sinks: []Sink{{Kafka: &Kafka{Topic: sinkTopic}}, {Name: "log", Log: &Log{}}},
Sinks: []Sink{
{Kafka: &KafkaSink{Async: Params.Async, Kafka: Kafka{Topic: sinkTopic}}},
{Name: "log", Log: &Log{}},
},
}},
},
})
Expand Down
3 changes: 2 additions & 1 deletion test/kafka-stress/test-results.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"TestKafkaSinkStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 50,
"TestKafkaSinkStress/currentContext=docker-desktop,replicas=1,n=10000,async=true.tps": 400,
"TestKafkaSinkStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 250,
"TestKafkaSinkStress/replicas=1.tps": 100,
"TestKafkaSinkStress/replicas=2.tps": 250,
"TestKafkaSourceStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 550,
Expand Down
5 changes: 4 additions & 1 deletion test/stress/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package stress

import (
"log"
"os"
"time"

sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
Expand All @@ -13,12 +14,14 @@ var Params = struct {
N int
Replicas uint32
Timeout time.Duration
Async bool
}{
N: sharedutil.GetEnvInt("N", 10000),
Replicas: uint32(sharedutil.GetEnvInt("REPLICAS", 1)),
Timeout: sharedutil.GetEnvDuration("TIMEOUT", 3*time.Minute),
Async: os.Getenv("ASYNC") == "true",
}

func init() {
log.Printf("replicas=%d,n=%d\n", Params.Replicas, Params.N)
log.Printf("replicas=%d,n=%d,async=%v\n", Params.Replicas, Params.N, Params.Async)
}
6 changes: 5 additions & 1 deletion test/stress/tps.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ func StartTPSReporter(t *testing.T, step, prefix string, n int) (stopTPSLogger f
if start == nil || end == nil {
panic("failed to calculate start time or end time")
}
setTestResult(fmt.Sprintf("%s/currentContext=%s,replicas=%d,n=%d", t.Name(), currentContext, Params.Replicas, Params.N), "tps", roundToNearest50(value()))
textName := fmt.Sprintf("%s/currentContext=%s,replicas=%d,n=%d", t.Name(), currentContext, Params.Replicas, Params.N)
if Params.Async {
textName += ",async=true"
}
setTestResult(textName, "tps", roundToNearest50(value()))
}
}

Expand Down

0 comments on commit 00c3178

Please sign in to comment.