
fix: bug where we were not getting kafka secret
alexec committed Jun 2, 2021
1 parent 06b78f7 commit 6156b57
Showing 2 changed files with 38 additions and 29 deletions.
examples/dataflow-kafka-default-secret.yaml (12 changes: 7 additions & 5 deletions)
@@ -7,13 +7,15 @@ metadata:
The secret must be named `dataflow-kafka-${name}`.
# Brokers as a comma-separated list
brokers: broker.a,broker.b
# Enable TLS
net.tls: ""
# Kafka version
version: "2.0.0"
[Learn about configuration](../docs/CONFIGURATION.md)
dataflow.argoproj.io/name: Default Kafka config
name: dataflow-kafka-default
stringData:
# Comma-separated list
brokers: kafka-0.broker:9092
# If you need TLS
# net.tls: ""
# If you need version
# version: "2.0.0"
runner/sidecar/sidecar.go (55 changes: 31 additions & 24 deletions)
@@ -13,6 +13,8 @@ import (
 	"sync"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+
 	"k8s.io/apimachinery/pkg/api/resource"
 
 	"github.com/Shopify/sarama"
@@ -192,16 +194,9 @@ func enrichSpec(ctx context.Context) error {
 					return err
 				}
 			} else {
-				x.NATSURL = dfv1.StringOr(x.NATSURL, string(secret.Data["natsUrl"]))
-				x.ClusterID = dfv1.StringOr(x.ClusterID, string(secret.Data["clusterId"]))
-				x.SubjectPrefix = dfv1.SubjectPrefixOr(x.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"]))
-			}
-			switch x.SubjectPrefix {
-			case dfv1.SubjectPrefixNamespaceName:
-				x.Subject = fmt.Sprintf("%s.%s", namespace, x.Subject)
-			case dfv1.SubjectPrefixNamespacedPipelineName:
-				x.Subject = fmt.Sprintf("%s.%s.%s", namespace, pipelineName, x.Subject)
+				stanFromSecret(x, secret)
 			}
+			subjectivefStan(x)
 			source.STAN = x
 		} else if x := source.Kafka; x != nil {
 			secret, err := secrets.Get(ctx, "dataflow-kafka-"+x.Name, metav1.GetOptions{})
@@ -210,7 +205,7 @@ func enrichSpec(ctx context.Context) error {
 					return err
 				}
 			} else {
-				x.Brokers = dfv1.StringsOr(x.Brokers, strings.Split(string(secret.Data["brokers"]), ","))
+				kafkaFromSecret(x, secret)
 			}
 			source.Kafka = x
 		}
@@ -225,16 +220,9 @@ func enrichSpec(ctx context.Context) error {
 					return err
 				}
 			} else {
-				s.NATSURL = dfv1.StringOr(s.NATSURL, string(secret.Data["natsUrl"]))
-				s.ClusterID = dfv1.StringOr(s.ClusterID, string(secret.Data["clusterId"]))
-				s.SubjectPrefix = dfv1.SubjectPrefixOr(s.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"]))
-			}
-			switch s.SubjectPrefix {
-			case dfv1.SubjectPrefixNamespaceName:
-				s.Subject = fmt.Sprintf("%s.%s", namespace, s.Subject)
-			case dfv1.SubjectPrefixNamespacedPipelineName:
-				s.Subject = fmt.Sprintf("%s.%s.%s", namespace, pipelineName, s.Subject)
+				stanFromSecret(s, secret)
 			}
+			subjectivefStan(s)
 			sink.STAN = s
 		} else if k := sink.Kafka; k != nil {
 			secret, err := secrets.Get(ctx, "dataflow-kafka-"+k.Name, metav1.GetOptions{})
@@ -243,11 +231,7 @@ func enrichSpec(ctx context.Context) error {
 					return err
 				}
 			} else {
-				k.Brokers = dfv1.StringsOr(k.Brokers, strings.Split(string(secret.Data["brokers"]), ","))
-				k.Version = dfv1.StringOr(k.Version, string(secret.Data["version"]))
-				if _, ok := secret.Data["net.tls"]; ok {
-					k.NET = &dfv1.KafkaNET{TLS: &dfv1.TLS{}}
-				}
+				kafkaFromSecret(k, secret)
 			}
			sink.Kafka = k
 		}
@@ -257,6 +241,29 @@ func enrichSpec(ctx context.Context) error {
 	return nil
 }
 
+func subjectivefStan(x *dfv1.STAN) {
+	switch x.SubjectPrefix {
+	case dfv1.SubjectPrefixNamespaceName:
+		x.Subject = fmt.Sprintf("%s.%s", namespace, x.Subject)
+	case dfv1.SubjectPrefixNamespacedPipelineName:
+		x.Subject = fmt.Sprintf("%s.%s.%s", namespace, pipelineName, x.Subject)
+	}
+}
+
+func stanFromSecret(s *dfv1.STAN, secret *corev1.Secret) {
+	s.NATSURL = dfv1.StringOr(s.NATSURL, string(secret.Data["natsUrl"]))
+	s.ClusterID = dfv1.StringOr(s.ClusterID, string(secret.Data["clusterId"]))
+	s.SubjectPrefix = dfv1.SubjectPrefixOr(s.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"]))
+}
+
+func kafkaFromSecret(k *dfv1.Kafka, secret *corev1.Secret) {
+	k.Brokers = dfv1.StringsOr(k.Brokers, strings.Split(string(secret.Data["brokers"]), ","))
+	k.Version = dfv1.StringOr(k.Version, string(secret.Data["version"]))
+	if _, ok := secret.Data["net.tls"]; ok {
+		k.NET = &dfv1.KafkaNET{TLS: &dfv1.TLS{}}
+	}
+}
+
 func connectSources(ctx context.Context, toMain func([]byte) error) error {
 	crn := cron.New(
 		cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)),
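The net effect of the change is that Kafka sources now get the same secret-based defaulting as Kafka sinks: brokers, version and TLS are read from the dataflow-kafka-${name} secret whenever they are not set explicitly on the spec. The sketch below illustrates that defaulting behaviour; the kafkaConfig struct and its fromSecret method are illustrative stand-ins for dfv1.Kafka and kafkaFromSecret, not code from this commit.

package main

import (
	"fmt"
	"strings"

	corev1 "k8s.io/api/core/v1"
)

// kafkaConfig is an illustrative stand-in for dfv1.Kafka.
type kafkaConfig struct {
	Brokers []string
	Version string
	TLS     bool
}

// fromSecret mirrors the defaulting done by kafkaFromSecret above:
// values already set on the spec win, the secret fills in the rest,
// and the mere presence of a "net.tls" key switches TLS on.
func (k *kafkaConfig) fromSecret(secret *corev1.Secret) {
	if len(k.Brokers) == 0 {
		k.Brokers = strings.Split(string(secret.Data["brokers"]), ",")
	}
	if k.Version == "" {
		k.Version = string(secret.Data["version"])
	}
	if _, ok := secret.Data["net.tls"]; ok {
		k.TLS = true
	}
}

func main() {
	// Shaped like the dataflow-kafka-default secret in the example above.
	secret := &corev1.Secret{Data: map[string][]byte{
		"brokers": []byte("kafka-0.broker:9092"),
		"version": []byte("2.0.0"),
	}}
	k := &kafkaConfig{Version: "2.7.0"} // explicitly set, so the secret must not override it
	k.fromSecret(secret)
	fmt.Println(k.Brokers, k.Version, k.TLS) // [kafka-0.broker:9092] 2.7.0 false
}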
