diff --git a/examples/dataflow-kafka-default-secret.yaml b/examples/dataflow-kafka-default-secret.yaml index 6e184f8d..2edee5fd 100644 --- a/examples/dataflow-kafka-default-secret.yaml +++ b/examples/dataflow-kafka-default-secret.yaml @@ -7,13 +7,15 @@ metadata: The secret must be named `dataflow-kafka-${name}`. + # Brokers as a comma-separated list + brokers: broker.a,broker.b + # Enable TLS + net.tls: "" + # Kafka version + version: "2.0.0" + [Learn about configuration](../docs/CONFIGURATION.md) dataflow.argoproj.io/name: Default Kafka config name: dataflow-kafka-default stringData: - # Comma-separated list brokers: kafka-0.broker:9092 - # If you need TLS - # net.tls: "" - # If you need version - # version: "2.0.0" diff --git a/runner/sidecar/sidecar.go b/runner/sidecar/sidecar.go index ee49fffd..5a7df349 100644 --- a/runner/sidecar/sidecar.go +++ b/runner/sidecar/sidecar.go @@ -13,6 +13,8 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "github.com/Shopify/sarama" @@ -192,16 +194,9 @@ func enrichSpec(ctx context.Context) error { return err } } else { - x.NATSURL = dfv1.StringOr(x.NATSURL, string(secret.Data["natsUrl"])) - x.ClusterID = dfv1.StringOr(x.ClusterID, string(secret.Data["clusterId"])) - x.SubjectPrefix = dfv1.SubjectPrefixOr(x.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"])) - } - switch x.SubjectPrefix { - case dfv1.SubjectPrefixNamespaceName: - x.Subject = fmt.Sprintf("%s.%s", namespace, x.Subject) - case dfv1.SubjectPrefixNamespacedPipelineName: - x.Subject = fmt.Sprintf("%s.%s.%s", namespace, pipelineName, x.Subject) + stanFromSecret(x, secret) } + subjectivefStan(x) source.STAN = x } else if x := source.Kafka; x != nil { secret, err := secrets.Get(ctx, "dataflow-kafka-"+x.Name, metav1.GetOptions{}) @@ -210,7 +205,7 @@ func enrichSpec(ctx context.Context) error { return err } } else { - x.Brokers = dfv1.StringsOr(x.Brokers, strings.Split(string(secret.Data["brokers"]), ",")) + kafkaFromSecret(x, secret) } source.Kafka = x } @@ -225,16 +220,9 @@ func enrichSpec(ctx context.Context) error { return err } } else { - s.NATSURL = dfv1.StringOr(s.NATSURL, string(secret.Data["natsUrl"])) - s.ClusterID = dfv1.StringOr(s.ClusterID, string(secret.Data["clusterId"])) - s.SubjectPrefix = dfv1.SubjectPrefixOr(s.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"])) - } - switch s.SubjectPrefix { - case dfv1.SubjectPrefixNamespaceName: - s.Subject = fmt.Sprintf("%s.%s", namespace, s.Subject) - case dfv1.SubjectPrefixNamespacedPipelineName: - s.Subject = fmt.Sprintf("%s.%s.%s", namespace, pipelineName, s.Subject) + stanFromSecret(s, secret) } + subjectivefStan(s) sink.STAN = s } else if k := sink.Kafka; k != nil { secret, err := secrets.Get(ctx, "dataflow-kafka-"+k.Name, metav1.GetOptions{}) @@ -243,11 +231,7 @@ func enrichSpec(ctx context.Context) error { return err } } else { - k.Brokers = dfv1.StringsOr(k.Brokers, strings.Split(string(secret.Data["brokers"]), ",")) - k.Version = dfv1.StringOr(k.Version, string(secret.Data["version"])) - if _, ok := secret.Data["net.tls"]; ok { - k.NET = &dfv1.KafkaNET{TLS: &dfv1.TLS{}} - } + kafkaFromSecret(k, secret) } sink.Kafka = k } @@ -257,6 +241,29 @@ func enrichSpec(ctx context.Context) error { return nil } +func subjectivefStan(x *dfv1.STAN) { + switch x.SubjectPrefix { + case dfv1.SubjectPrefixNamespaceName: + x.Subject = fmt.Sprintf("%s.%s", namespace, x.Subject) + case dfv1.SubjectPrefixNamespacedPipelineName: + x.Subject = fmt.Sprintf("%s.%s.%s", namespace, pipelineName, x.Subject) + } +} + +func stanFromSecret(s *dfv1.STAN, secret *corev1.Secret) { + s.NATSURL = dfv1.StringOr(s.NATSURL, string(secret.Data["natsUrl"])) + s.ClusterID = dfv1.StringOr(s.ClusterID, string(secret.Data["clusterId"])) + s.SubjectPrefix = dfv1.SubjectPrefixOr(s.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"])) +} + +func kafkaFromSecret(k *dfv1.Kafka, secret *corev1.Secret) { + k.Brokers = dfv1.StringsOr(k.Brokers, strings.Split(string(secret.Data["brokers"]), ",")) + k.Version = dfv1.StringOr(k.Version, string(secret.Data["version"])) + if _, ok := secret.Data["net.tls"]; ok { + k.NET = &dfv1.KafkaNET{TLS: &dfv1.TLS{}} + } +} + func connectSources(ctx context.Context, toMain func([]byte) error) error { crn := cron.New( cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)),