From 7a3ff7e78a8ca68ec1012b3d2a7c4d1b611547b3 Mon Sep 17 00:00:00 2001 From: Daniel Mai Date: Tue, 13 Jul 2021 18:41:00 -0700 Subject: [PATCH 1/3] feat(cdc): Add superflag to enable TLS without CA or certs. This will attempt to connect to Kafka over TLS using the system certs. --- worker/server_state.go | 2 +- worker/sink_handler.go | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/worker/server_state.go b/worker/server_state.go index 5607903304d..1e6e5f12611 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -42,7 +42,7 @@ const ( BadgerDefaults = `compression=snappy; numgoroutines=8;` CacheDefaults = `size-mb=1024; percentage=0,65,35;` CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` + - `client_key=; sasl-mechanism=PLAIN;` + `client_key=; sasl-mechanism=PLAIN; tls=false;` GraphQLDefaults = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` + `lambda-url=;` LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` + diff --git a/worker/sink_handler.go b/worker/sink_handler.go index b43b261b7fa..b0f33f7c9f5 100644 --- a/worker/sink_handler.go +++ b/worker/sink_handler.go @@ -86,7 +86,17 @@ func newKafkaSink(config *z.SuperFlag) (Sink, error) { saramaConf.Producer.Return.Successes = true saramaConf.Producer.Return.Errors = true - if config.GetPath("ca-cert") != "" { + if config.GetBool("tls") && config.GetPath("ca-cert") != "" { + tlsCfg := &tls.Config{} + var pool *x509.CertPool + var err error + if pool, err = x509.SystemCertPool(); err != nil { + return nil, err + } + tlsCfg.RootCAs = pool + saramaConf.Net.TLS.Enable = true + saramaConf.Net.TLS.Config = tlsCfg + } else if config.GetPath("ca-cert") != "" { tlsCfg := &tls.Config{} var pool *x509.CertPool var err error From 15c8cbce42fc7db4afc9cc256948759e500eae5f Mon Sep 17 00:00:00 2001 From: Daniel Mai Date: Tue, 13 Jul 2021 18:50:18 -0700 Subject: [PATCH 2/3] Fix condition. --- worker/sink_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/sink_handler.go b/worker/sink_handler.go index b0f33f7c9f5..8fba744b6c6 100644 --- a/worker/sink_handler.go +++ b/worker/sink_handler.go @@ -86,7 +86,7 @@ func newKafkaSink(config *z.SuperFlag) (Sink, error) { saramaConf.Producer.Return.Successes = true saramaConf.Producer.Return.Errors = true - if config.GetBool("tls") && config.GetPath("ca-cert") != "" { + if config.GetBool("tls") && config.GetPath("ca-cert") == "" { tlsCfg := &tls.Config{} var pool *x509.CertPool var err error From b36f9b320e57e99405ae7c70985656511eda0ffb Mon Sep 17 00:00:00 2001 From: Daniel Mai Date: Wed, 14 Jul 2021 11:01:11 -0700 Subject: [PATCH 3/3] Add helper function TLSBaseConfig. Sets the min TLS version to v1.2 along with the minimum cipher suites. --- worker/sink_handler.go | 4 ++-- x/tls_helper.go | 42 ++++++++++++++++++++++++------------------ 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/worker/sink_handler.go b/worker/sink_handler.go index 8fba744b6c6..bc25783825e 100644 --- a/worker/sink_handler.go +++ b/worker/sink_handler.go @@ -87,7 +87,7 @@ func newKafkaSink(config *z.SuperFlag) (Sink, error) { saramaConf.Producer.Return.Errors = true if config.GetBool("tls") && config.GetPath("ca-cert") == "" { - tlsCfg := &tls.Config{} + tlsCfg := x.TLSBaseConfig() var pool *x509.CertPool var err error if pool, err = x509.SystemCertPool(); err != nil { @@ -97,7 +97,7 @@ func newKafkaSink(config *z.SuperFlag) (Sink, error) { saramaConf.Net.TLS.Enable = true saramaConf.Net.TLS.Config = tlsCfg } else if config.GetPath("ca-cert") != "" { - tlsCfg := &tls.Config{} + tlsCfg := x.TLSBaseConfig() var pool *x509.CertPool var err error if pool, err = x509.SystemCertPool(); err != nil { diff --git a/x/tls_helper.go b/x/tls_helper.go index 27536f8ff9a..df5e7722e1c 100644 --- a/x/tls_helper.go +++ b/x/tls_helper.go @@ -263,11 +263,34 @@ func setupClientAuth(authType string) (tls.ClientAuthType, error) { return tls.NoClientCert, nil } +// TLSBaseConfig returns a *tls.Config with the base set of security +// requirements (minimum TLS v1.2 and set of cipher suites) +func TLSBaseConfig() *tls.Config { + tlsCfg := new(tls.Config) + tlsCfg.MinVersion = tls.VersionTLS12 + tlsCfg.CipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + tls.TLS_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_RSA_WITH_AES_256_CBC_SHA, + } + return tlsCfg +} + // GenerateServerTLSConfig creates and returns a new *tls.Config with the // configuration provided. func GenerateServerTLSConfig(config *TLSHelperConfig) (tlsCfg *tls.Config, err error) { if config.CertRequired { - tlsCfg = new(tls.Config) + tlsCfg = TLSBaseConfig() cert, err := tls.LoadX509KeyPair(config.Cert, config.Key) if err != nil { return nil, err @@ -286,23 +309,6 @@ func GenerateServerTLSConfig(config *TLSHelperConfig) (tlsCfg *tls.Config, err e } tlsCfg.ClientAuth = auth - tlsCfg.MinVersion = tls.VersionTLS12 - tlsCfg.CipherSuites = []uint16{ - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, - tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, - tls.TLS_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_RSA_WITH_AES_256_CBC_SHA, - } - return tlsCfg, nil } return nil, nil