Skip to content

Commit

Permalink
config: add possibility to specify tls as bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
alenkacz committed May 22, 2022
1 parent 4fbec9a commit a066b27
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 11 deletions.
6 changes: 6 additions & 0 deletions docs/reference-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ kafka:
caFilepath: ""
certFilepath: ""
keyFilepath: ""
# base64 encoded tls CA, cannot be set if 'caFilepath' is set
ca: ""
# base64 encoded tls cert, cannot be set if 'certFilepath' is set
cert: ""
# base64 encoded tls key, cannot be set if 'keyFilepath' is set
key: ""
passphrase: ""
insecureSkipTlsVerify: false

Expand Down
36 changes: 25 additions & 11 deletions kafka/client_config_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,14 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
var caCertPool *x509.CertPool
if cfg.TLS.Enabled {
// Root CA
if cfg.TLS.CaFilepath != "" {
ca, err := ioutil.ReadFile(cfg.TLS.CaFilepath)
if err != nil {
return nil, err
if cfg.TLS.CaFilepath != "" || len(cfg.TLS.Ca) > 0 {
ca := cfg.TLS.Ca
if cfg.TLS.CaFilepath != "" {
caBytes, err := ioutil.ReadFile(cfg.TLS.CaFilepath)
if err != nil {
return nil, fmt.Errorf("failed to load ca cert: %w", err)
}
ca = caBytes
}
caCertPool = x509.NewCertPool()
isSuccessful := caCertPool.AppendCertsFromPEM(ca)
Expand All @@ -128,16 +132,26 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {

// If configured load TLS cert & key - Mutual TLS
var certificates []tls.Certificate
if cfg.TLS.CertFilepath != "" && cfg.TLS.KeyFilepath != "" {
hasCertFile := cfg.TLS.CertFilepath != "" || len(cfg.TLS.Cert) > 0
hasKeyFile := cfg.TLS.KeyFilepath != "" || len(cfg.TLS.Key) > 0
if hasCertFile || hasKeyFile {
cert := cfg.TLS.Cert
privateKey := cfg.TLS.Key
// 1. Read certificates
cert, err := ioutil.ReadFile(cfg.TLS.CertFilepath)
if err != nil {
return nil, fmt.Errorf("failed to TLS certificate: %w", err)
if cfg.TLS.CertFilepath != "" {
certBytes, err := ioutil.ReadFile(cfg.TLS.CertFilepath)
if err != nil {
return nil, fmt.Errorf("failed to TLS certificate: %w", err)
}
cert = certBytes
}

privateKey, err := ioutil.ReadFile(cfg.TLS.KeyFilepath)
if err != nil {
return nil, fmt.Errorf("failed to read TLS key: %w", err)
if cfg.TLS.KeyFilepath != "" {
keyBytes, err := ioutil.ReadFile(cfg.TLS.KeyFilepath)
if err != nil {
return nil, fmt.Errorf("failed to read TLS key: %w", err)
}
privateKey = keyBytes
}

// 2. Check if private key needs to be decrypted. Decrypt it if passphrase is given, otherwise return error
Expand Down
15 changes: 15 additions & 0 deletions kafka/config_tls.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package kafka

import "fmt"

// TLSConfig to connect to Kafka via TLS
type TLSConfig struct {
Enabled bool `koanf:"enabled"`
CaFilepath string `koanf:"caFilepath"`
CertFilepath string `koanf:"certFilepath"`
KeyFilepath string `koanf:"keyFilepath"`
Ca []byte `koanf:"ca"`
Cert []byte `koanf:"cert"`
Key []byte `koanf:"key"`
Passphrase string `koanf:"passphrase"`
InsecureSkipTLSVerify bool `koanf:"insecureSkipTlsVerify"`
}
Expand All @@ -15,5 +20,15 @@ func (c *TLSConfig) SetDefaults() {
}

func (c *TLSConfig) Validate() error {
if len(c.CaFilepath) > 0 && len(c.Ca) > 0 {
return fmt.Errorf("config keys 'caFilepath' and 'ca' are both set. only one can be used at the same time")
}
if len(c.CertFilepath) > 0 && len(c.Cert) > 0 {
return fmt.Errorf("config keys 'certFilepath' and 'cert' are both set. only one can be used at the same time")
}

if len(c.KeyFilepath) > 0 && len(c.Key) > 0 {
return fmt.Errorf("config keys 'keyFilepath' and 'key' are both set. only one can be used at the same time")
}
return nil
}

0 comments on commit a066b27

Please sign in to comment.