Skip to content

Commit

Permalink
[fix] Enable Kafka TLS when TLS auth is specified (#2107)
Browse files Browse the repository at this point in the history
* [fix] Enable Kafka TLS when TLS auth is specified

Signed-off-by: Pavol Loffay <[email protected]>

* Use TLS when tls.enabled=true

Signed-off-by: Pavol Loffay <[email protected]>

* Log deprecation message

Signed-off-by: Pavol Loffay <[email protected]>

* Fix fmt and lint

Signed-off-by: Pavol Loffay <[email protected]>

* Remove deprecated TLS enabled flag

Signed-off-by: Pavol Loffay <[email protected]>

* Review comments

Signed-off-by: Pavol Loffay <[email protected]>

* Add tls case back

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
yurishkuro authored and pavolloffay committed Mar 9, 2020
1 parent bf48066 commit ee79ced
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 2 deletions.
44 changes: 44 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package app

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/kafka/auth"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand Down Expand Up @@ -49,6 +53,46 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
}

func TestTLSFlags(t *testing.T) {
kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}
tests := []struct {
flags []string
expected auth.AuthenticationConfig
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb},
},
{
flags: []string{"--kafka.consumer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb},
},
{
flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
},
{
flags: []string{"--kafka.consumer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
},
{
flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
},
}

for _, test := range tests {
t.Run(fmt.Sprintf("%s", test.flags), func(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags(test.flags)
require.NoError(t, err)
o.InitFromViper(v)
assert.Equal(t, test.expected, o.AuthenticationConfig)
})
}
}

func TestFlagDefaults(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
Expand Down
13 changes: 11 additions & 2 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,20 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config
if strings.Trim(authentication, " ") == "" {
authentication = none
}
if config.Authentication == tls || config.TLS.Enabled {
err := setTLSConfiguration(&config.TLS, saramaConfig)
if err != nil {
return err
}
}
switch authentication {
case none:
return nil
case tls:
return nil
case kerberos:
setKerberosConfiguration(&config.Kerberos, saramaConfig)
return nil
case tls:
return setTLSConfiguration(&config.TLS, saramaConfig)
case plaintext:
setPlainTextConfiguration(&config.PlainText, saramaConfig)
return nil
Expand All @@ -85,6 +91,9 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.
}

config.TLS = tlsClientConfig.InitFromViper(v)
if config.Authentication == tls {
config.TLS.Enabled = true
}

config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName)
config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword)
Expand Down
44 changes: 44 additions & 0 deletions plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package kafka

import (
"fmt"
"testing"
"time"

Expand All @@ -23,6 +24,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/kafka/auth"
)

func TestOptionsWithFlags(t *testing.T) {
Expand Down Expand Up @@ -164,3 +167,44 @@ func TestRequiredAcksFailures(t *testing.T) {
_, err := getRequiredAcks("test")
assert.Error(t, err)
}

func TestTLSFlags(t *testing.T) {
kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}
tests := []struct {
flags []string
expected auth.AuthenticationConfig
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb},
},
{
flags: []string{"--kafka.producer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb},
},
{
flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
},
{
flags: []string{"--kafka.producer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
},
{
flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
},
}

for _, test := range tests {
t.Run(fmt.Sprintf("%s", test.flags), func(t *testing.T) {
o := &Options{}
v, command := config.Viperize(o.AddFlags)
err := command.ParseFlags(test.flags)
require.NoError(t, err)
o.InitFromViper(v)
assert.Equal(t, test.expected, o.config.AuthenticationConfig)

})
}
}

0 comments on commit ee79ced

Please sign in to comment.