diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 93be0d13f54..5659dbc4bc4 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -55,29 +55,30 @@ func TestOptionsWithFlags(t *testing.T) { func TestTLSFlags(t *testing.T) { kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} + plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"} tests := []struct { flags []string expected auth.AuthenticationConfig }{ { flags: []string{}, - expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=foo"}, - expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, } diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go index 4cf4c583b66..49e84f20941 100644 --- a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go +++ b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go @@ -77,7 +77,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter { if opts.Config.Authentication == "plaintext" { cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{ - Username: opts.Config.PlainText.UserName, + Username: opts.Config.PlainText.Username, Password: opts.Config.PlainText.Password, } } diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go index 912dc906de9..514b5e99a78 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go @@ -84,7 +84,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { if opts.Authentication == "plaintext" { cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{ - Username: opts.PlainText.UserName, + Username: opts.PlainText.Username, Password: opts.PlainText.Password, } } diff --git a/go.mod b/go.mod index 5c6299e7588..f099265a018 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/uber/jaeger-lib v2.4.0+incompatible github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c go.mongodb.org/mongo-driver v1.3.2 // indirect go.uber.org/atomic v1.6.0 go.uber.org/automaxprocs v1.3.0 diff --git a/go.sum b/go.sum index 5b1f9758605..b8ae769b040 100644 --- a/go.sum +++ b/go.sum @@ -589,8 +589,10 @@ github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 h1:Xim2mBRFdXzXmKRO github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index c9e42e0c0a5..a133b1a29de 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -68,7 +68,10 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config setKerberosConfiguration(&config.Kerberos, saramaConfig) return nil case plaintext: - setPlainTextConfiguration(&config.PlainText, saramaConfig) + err := setPlainTextConfiguration(&config.PlainText, saramaConfig) + if err != nil { + return err + } return nil default: return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication) @@ -81,7 +84,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName) config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm) config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab) - config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName) + config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUsername) config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword) config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig) config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab) @@ -97,6 +100,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. config.TLS.Enabled = true } - config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName) + config.PlainText.Username = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUsername) config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword) + config.PlainText.Mechanism = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextMechanism) } diff --git a/pkg/kafka/auth/options.go b/pkg/kafka/auth/options.go index e2e7edb4804..44e860ced72 100644 --- a/pkg/kafka/auth/options.go +++ b/pkg/kafka/auth/options.go @@ -30,7 +30,7 @@ const ( suffixKerberosServiceName = ".service-name" suffixKerberosRealm = ".realm" suffixKerberosUseKeyTab = ".use-keytab" - suffixKerberosUserName = ".username" + suffixKerberosUsername = ".username" suffixKerberosPassword = ".password" suffixKerberosConfig = ".config-file" suffixKerberosKeyTab = ".keytab-file" @@ -43,12 +43,14 @@ const ( defaultKerberosUsername = "" defaultKerberosKeyTab = "/etc/security/kafka.keytab" - plainTextPrefix = ".plaintext" - suffixPlainTextUserName = ".username" - suffixPlainTextPassword = ".password" + plainTextPrefix = ".plaintext" + suffixPlainTextUsername = ".username" + suffixPlainTextPassword = ".password" + suffixPlainTextMechanism = ".mechanism" - defaultPlainTextUserName = "" - defaultPlainTextPassword = "" + defaultPlainTextUsername = "" + defaultPlainTextPassword = "" + defaultPlainTextMechanism = "PLAIN" ) func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { @@ -65,7 +67,7 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { defaultKerberosPassword, "The Kerberos password used for authenticate with KDC") flagSet.String( - configPrefix+kerberosPrefix+suffixKerberosUserName, + configPrefix+kerberosPrefix+suffixKerberosUsername, defaultKerberosUsername, "The Kerberos username used for authenticate with KDC") flagSet.String( @@ -84,13 +86,17 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { func addPlainTextFlags(configPrefix string, flagSet *flag.FlagSet) { flagSet.String( - configPrefix+plainTextPrefix+suffixPlainTextUserName, - defaultPlainTextUserName, + configPrefix+plainTextPrefix+suffixPlainTextUsername, + defaultPlainTextUsername, "The plaintext Username for SASL/PLAIN authentication") flagSet.String( configPrefix+plainTextPrefix+suffixPlainTextPassword, defaultPlainTextPassword, "The plaintext Password for SASL/PLAIN authentication") + flagSet.String( + configPrefix+plainTextPrefix+suffixPlainTextMechanism, + defaultPlainTextMechanism, + "The plaintext Mechanism for SASL/PLAIN authentication, e.g. 'SCRAM-SHA-256' or 'SCRAM-SHA-512' or 'PLAIN'") } // AddFlags add configuration flags to a flagSet. diff --git a/pkg/kafka/auth/plaintext.go b/pkg/kafka/auth/plaintext.go index 277d5d3c58f..1bb95334ec7 100644 --- a/pkg/kafka/auth/plaintext.go +++ b/pkg/kafka/auth/plaintext.go @@ -15,17 +15,77 @@ package auth import ( + "crypto/sha256" + "crypto/sha512" + "fmt" + "hash" + "strings" + "github.com/Shopify/sarama" + "github.com/xdg/scram" ) +// scramClient is the client to use when the auth mechanism is SCRAM +type scramClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +// Begin prepares the client for the SCRAM exchange +// with the server with a user name and a password +func (x *scramClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +// Step steps client through the SCRAM exchange. It is +// called repeatedly until it errors or `Done` returns true. +func (x *scramClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +// Done should return true when the SCRAM conversation +// is over. +func (x *scramClient) Done() bool { + return x.ClientConversation.Done() +} + // PlainTextConfig describes the configuration properties needed for SASL/PLAIN with kafka type PlainTextConfig struct { - UserName string `mapstructure:"username"` - Password string `mapstructure:"password" json:"-"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + Mechanism string `mapstructure:"mechanism"` } -func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) { +var _ sarama.SCRAMClient = (*scramClient)(nil) + +func setPlainTextConfiguration(config *PlainTextConfig, saramaConfig *sarama.Config) error { saramaConfig.Net.SASL.Enable = true - saramaConfig.Net.SASL.User = config.UserName + saramaConfig.Net.SASL.User = config.Username saramaConfig.Net.SASL.Password = config.Password + switch strings.ToUpper(config.Mechanism) { + case "SCRAM-SHA-256": + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: func() hash.Hash { return sha256.New() }} + } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + case "SCRAM-SHA-512": + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: func() hash.Hash { return sha512.New() }} + } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + case "PLAIN": + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext + + default: + return fmt.Errorf("config plaintext.mechanism error: %s, only support 'SCRAM-SHA-256' or 'SCRAM-SHA-512' or 'PLAIN'", config.Mechanism) + + } + return nil } diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 5fa6f249301..f90b6040650 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -173,29 +173,30 @@ func TestRequiredAcksFailures(t *testing.T) { func TestTLSFlags(t *testing.T) { kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"} + plain := auth.PlainTextConfig{Username: "", Password: "", Mechanism: "PLAIN"} tests := []struct { flags []string expected auth.AuthenticationConfig }{ { flags: []string{}, - expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=foo"}, - expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb}, + expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"}, - expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, { flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"}, - expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}}, + expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain}, }, }