diff --git a/docs/reference-config.yaml b/docs/reference-config.yaml index 501eee5..e449f63 100644 --- a/docs/reference-config.yaml +++ b/docs/reference-config.yaml @@ -30,6 +30,12 @@ kafka: caFilepath: "" certFilepath: "" keyFilepath: "" + # base64 encoded tls CA, cannot be set if 'caFilepath' is set + ca: "" + # base64 encoded tls cert, cannot be set if 'certFilepath' is set + cert: "" + # base64 encoded tls key, cannot be set if 'keyFilepath' is set + key: "" passphrase: "" insecureSkipTlsVerify: false diff --git a/kafka/client_config_helper.go b/kafka/client_config_helper.go index 9977165..71f96fb 100644 --- a/kafka/client_config_helper.go +++ b/kafka/client_config_helper.go @@ -114,10 +114,14 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { var caCertPool *x509.CertPool if cfg.TLS.Enabled { // Root CA - if cfg.TLS.CaFilepath != "" { - ca, err := ioutil.ReadFile(cfg.TLS.CaFilepath) - if err != nil { - return nil, err + if cfg.TLS.CaFilepath != "" || len(cfg.TLS.Ca) > 0 { + ca := cfg.TLS.Ca + if cfg.TLS.CaFilepath != "" { + caBytes, err := ioutil.ReadFile(cfg.TLS.CaFilepath) + if err != nil { + return nil, fmt.Errorf("failed to load ca cert: %w", err) + } + ca = caBytes } caCertPool = x509.NewCertPool() isSuccessful := caCertPool.AppendCertsFromPEM(ca) @@ -128,16 +132,26 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { // If configured load TLS cert & key - Mutual TLS var certificates []tls.Certificate - if cfg.TLS.CertFilepath != "" && cfg.TLS.KeyFilepath != "" { + hasCertFile := cfg.TLS.CertFilepath != "" || len(cfg.TLS.Cert) > 0 + hasKeyFile := cfg.TLS.KeyFilepath != "" || len(cfg.TLS.Key) > 0 + if hasCertFile || hasKeyFile { + cert := cfg.TLS.Cert + privateKey := cfg.TLS.Key // 1. Read certificates - cert, err := ioutil.ReadFile(cfg.TLS.CertFilepath) - if err != nil { - return nil, fmt.Errorf("failed to TLS certificate: %w", err) + if cfg.TLS.CertFilepath != "" { + certBytes, err := ioutil.ReadFile(cfg.TLS.CertFilepath) + if err != nil { + return nil, fmt.Errorf("failed to TLS certificate: %w", err) + } + cert = certBytes } - privateKey, err := ioutil.ReadFile(cfg.TLS.KeyFilepath) - if err != nil { - return nil, fmt.Errorf("failed to read TLS key: %w", err) + if cfg.TLS.KeyFilepath != "" { + keyBytes, err := ioutil.ReadFile(cfg.TLS.KeyFilepath) + if err != nil { + return nil, fmt.Errorf("failed to read TLS key: %w", err) + } + privateKey = keyBytes } // 2. Check if private key needs to be decrypted. Decrypt it if passphrase is given, otherwise return error diff --git a/kafka/config_tls.go b/kafka/config_tls.go index 4c0657c..6a9b455 100644 --- a/kafka/config_tls.go +++ b/kafka/config_tls.go @@ -1,11 +1,16 @@ package kafka +import "fmt" + // TLSConfig to connect to Kafka via TLS type TLSConfig struct { Enabled bool `koanf:"enabled"` CaFilepath string `koanf:"caFilepath"` CertFilepath string `koanf:"certFilepath"` KeyFilepath string `koanf:"keyFilepath"` + Ca []byte `koanf:"ca"` + Cert []byte `koanf:"cert"` + Key []byte `koanf:"key"` Passphrase string `koanf:"passphrase"` InsecureSkipTLSVerify bool `koanf:"insecureSkipTlsVerify"` } @@ -15,5 +20,15 @@ func (c *TLSConfig) SetDefaults() { } func (c *TLSConfig) Validate() error { + if len(c.CaFilepath) > 0 && len(c.Ca) > 0 { + return fmt.Errorf("config keys 'caFilepath' and 'ca' are both set. only one can be used at the same time") + } + if len(c.CertFilepath) > 0 && len(c.Cert) > 0 { + return fmt.Errorf("config keys 'certFilepath' and 'cert' are both set. only one can be used at the same time") + } + + if len(c.KeyFilepath) > 0 && len(c.Key) > 0 { + return fmt.Errorf("config keys 'keyFilepath' and 'key' are both set. only one can be used at the same time") + } return nil }