Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Add kafkaSource saslType, e2e tests for auth (tls and sasl512) (#1670)
Browse files Browse the repository at this point in the history
* Add kafkaSource saslType, e2e tests for auth (tls and sasl512)

eventing-kafka#237 backport

* Add kafkasource SASL Type

* Add unit tests for NewConfig in source client

* wip, Add tests, lifecycle, e2e test

* Fix v1alpha1 conversion, tests

* Refactor tests

* Change kafkabinding sasltype json to 'Type'

* Rework test call structure

* Fix CoreV1 interface reference when copying secret to test ns
  • Loading branch information
lberk authored Nov 30, 2020
1 parent f268982 commit 88c73c7
Show file tree
Hide file tree
Showing 18 changed files with 577 additions and 36 deletions.
15 changes: 15 additions & 0 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type AdapterSASL struct {
Enable bool `envconfig:"KAFKA_NET_SASL_ENABLE" required:"false"`
User string `envconfig:"KAFKA_NET_SASL_USER" required:"false"`
Password string `envconfig:"KAFKA_NET_SASL_PASSWORD" required:"false"`
Type string `envconfig:"KAFKA_NET_SASL_TYPE" required:"false"`
}

type AdapterTLS struct {
Expand Down Expand Up @@ -65,8 +66,22 @@ func NewConfig(ctx context.Context) ([]string, *sarama.Config, error) {

if env.Net.SASL.Enable {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Handshake = true
cfg.Net.SASL.User = env.Net.SASL.User
cfg.Net.SASL.Password = env.Net.SASL.Password

// We default to plain sasl type
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext

if env.Net.SASL.Type == sarama.SASLTypeSCRAMSHA256 {
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
}

if env.Net.SASL.Type == sarama.SASLTypeSCRAMSHA512 {
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
}
}

if env.Net.TLS.Enable {
Expand Down
158 changes: 151 additions & 7 deletions kafka/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,160 @@ func generateCert(t *testing.T) (string, string) {
}

func TestNewConfig(t *testing.T) {
ctx := context.Background()

// Increasing coverage
_ = os.Setenv("KAFKA_BOOTSTRAP_SERVERS", "my-cluster-kafka-bootstrap.my-kafka-namespace:9092")
defaultBootstrapServer := "my-cluster-kafka-bootstrap.my-kafka-namespace:9092"
defaultSASLUser := "secret-user"
defaultSASLPassword := "super-seekrit-password"
testCases := map[string]struct {
env map[string]string
enabledTLS bool
enabledSASL bool
wantErr bool
saslMechanism string
bootstrapServer string
saslUser string
saslPassword string
}{
"Just bootstrap Server": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
},
bootstrapServer: defaultBootstrapServer,
},
"Incorrect bootstrap Server": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
},
bootstrapServer: "ImADoctorNotABootstrapServerJim!",
wantErr: true,
},
/*
TODO
"Multiple bootstrap servers": {
env: map[string]string{
servers, config, err := NewConfig(ctx)
},
require.NoError(t, err)
require.NotNil(t, config)
require.Equal(t, []string{"my-cluster-kafka-bootstrap.my-kafka-namespace:9092"}, servers)
},*/
"No Auth": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
},
enabledTLS: false,
enabledSASL: false,
bootstrapServer: defaultBootstrapServer,
},
"Defaulting to SASL-Plain Auth (none specified)": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
"KAFKA_NET_SASL_ENABLE": "true",
},
enabledSASL: true,
saslMechanism: sarama.SASLTypePlaintext,
bootstrapServer: defaultBootstrapServer,
},
"Only SASL-PLAIN Auth": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
"KAFKA_NET_SASL_ENABLE": "true",
"KAFKA_NET_SASL_USER": defaultSASLUser,
"KAFKA_NET_SASL_PASSWORD": defaultSASLPassword,
},
enabledSASL: true,
saslUser: defaultSASLUser,
saslPassword: defaultSASLPassword,
saslMechanism: sarama.SASLTypePlaintext,
bootstrapServer: defaultBootstrapServer,
},
"SASL-PLAIN Auth, Forgot User": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
"KAFKA_NET_SASL_ENABLE": "true",
"KAFKA_NET_SASL_PASSWORD": defaultSASLPassword,
},
enabledSASL: true,
wantErr: true,
saslUser: defaultSASLUser,
saslPassword: defaultSASLPassword,
saslMechanism: sarama.SASLTypePlaintext,
bootstrapServer: defaultBootstrapServer,
},
"SASL-PLAIN Auth, Forgot Password": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
"KAFKA_NET_SASL_ENABLE": "true",
"KAFKA_NET_SASL_USER": defaultSASLUser,
},
enabledSASL: true,
wantErr: true,
saslUser: defaultSASLUser,
saslPassword: defaultSASLPassword,
saslMechanism: sarama.SASLTypePlaintext,
bootstrapServer: defaultBootstrapServer,
},
"Only SASL-SCRAM-SHA-256 Auth": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
"KAFKA_NET_SASL_ENABLE": "true",
"KAFKA_NET_SASL_USER": defaultSASLUser,
"KAFKA_NET_SASL_PASSWORD": defaultSASLPassword,
"KAFKA_NET_SASL_TYPE": sarama.SASLTypeSCRAMSHA256,
},
enabledSASL: true,
saslUser: defaultSASLUser,
saslPassword: defaultSASLPassword,
saslMechanism: sarama.SASLTypeSCRAMSHA256,
bootstrapServer: defaultBootstrapServer,
},
"Only SASL-SCRAM-SHA-512 Auth": {
env: map[string]string{
"KAFKA_BOOTSTRAP_SERVERS": defaultBootstrapServer,
"KAFKA_NET_SASL_ENABLE": "true",
"KAFKA_NET_SASL_USER": defaultSASLUser,
"KAFKA_NET_SASL_PASSWORD": defaultSASLPassword,
"KAFKA_NET_SASL_TYPE": sarama.SASLTypeSCRAMSHA512,
},
enabledSASL: true,
saslUser: defaultSASLUser,
saslPassword: defaultSASLPassword,
saslMechanism: sarama.SASLTypeSCRAMSHA512,
bootstrapServer: defaultBootstrapServer,
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
for k, v := range tc.env {
_ = os.Setenv(k, v)
}
servers, config, err := NewConfig(context.Background())
if err != nil && tc.wantErr != true {
t.Fatal(err)
}
if servers[0] != tc.bootstrapServer && tc.wantErr != true {
t.Fatalf("Incorrect bootstrapServers, got: %s vs want: %s", servers[0], tc.bootstrapServer)
}
if tc.enabledSASL {
if tc.saslMechanism != string(config.Net.SASL.Mechanism) {
t.Fatalf("Incorrect SASL mechanism, got: %s vs want: %s", string(config.Net.SASL.Mechanism), tc.saslMechanism)
}

if config.Net.SASL.Enable != true {
t.Fatal("Incorrect SASL Configuration (not enabled)")
}

if config.Net.SASL.User != tc.saslUser && !tc.wantErr {
t.Fatalf("Incorrect SASL User, got: %s vs want: %s", config.Net.SASL.User, tc.saslUser)
}
if config.Net.SASL.Password != tc.saslPassword && !tc.wantErr {
t.Fatalf("Incorrect SASL Password, got: %s vs want: %s", config.Net.SASL.Password, tc.saslPassword)
}
}
require.NotNil(t, config)
for k := range tc.env {
_ = os.Unsetenv(k)
}
})
}
}

func TestAdminClient(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func (source *KafkaAuthSpec) ConvertTo(_ context.Context, obj apis.Convertible)
},
Password: bindingsv1beta1.SecretValueFromSource{
SecretKeyRef: source.Net.SASL.Password.SecretKeyRef},
Type: bindingsv1beta1.SecretValueFromSource{
SecretKeyRef: source.Net.SASL.Type.SecretKeyRef,
},
},
TLS: bindingsv1beta1.KafkaTLSSpec{
Enable: source.Net.TLS.Enable,
Expand Down Expand Up @@ -117,6 +120,9 @@ func (sink *KafkaAuthSpec) ConvertFrom(_ context.Context, obj apis.Convertible)
},
Password: SecretValueFromSource{
SecretKeyRef: source.Net.SASL.Password.SecretKeyRef},
Type: SecretValueFromSource{
SecretKeyRef: source.Net.SASL.Type.SecretKeyRef,
},
},
TLS: KafkaTLSSpec{
Enable: source.Net.TLS.Enable,
Expand Down
18 changes: 18 additions & 0 deletions kafka/source/pkg/apis/bindings/v1alpha1/kafka_conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ func TestKafkaBindingConversionRoundTripV1alpha1(t *testing.T) {
Optional: pointer.BoolPtr(true),
},
},
Type: SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "sasl-type-secret-local-obj-ref",
},
Key: "sasl-type-secret-key",
Optional: pointer.BoolPtr(true),
},
},
},
TLS: KafkaTLSSpec{
Enable: true,
Expand Down Expand Up @@ -248,6 +257,15 @@ func TestKafkaBindingConversionRoundTripV1beta1(t *testing.T) {
Optional: pointer.BoolPtr(true),
},
},
Type: v1beta1.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "sasl-type-secret-local-obj-ref",
},
Key: "sasl-type-secret-key",
Optional: pointer.BoolPtr(true),
},
},
},
TLS: v1beta1.KafkaTLSSpec{
Enable: false,
Expand Down
14 changes: 12 additions & 2 deletions kafka/source/pkg/apis/bindings/v1alpha1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef,
},
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_TYPE",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Type.SecretKeyRef,
},
})
}
if kfb.Spec.Net.TLS.Enable {
Expand Down Expand Up @@ -147,6 +152,11 @@ func (kfb *KafkaBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Password.SecretKeyRef,
},
}, corev1.EnvVar{
Name: "KAFKA_NET_SASL_TYPE",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: kfb.Spec.Net.SASL.Type.SecretKeyRef,
},
})
}
if kfb.Spec.Net.TLS.Enable {
Expand Down Expand Up @@ -184,7 +194,7 @@ func (kfb *KafkaBinding) Undo(ctx context.Context, ps *duckv1.WithPod) {
for j, ev := range c.Env {
switch ev.Name {
case "KAFKA_NET_TLS_ENABLE", "KAFKA_NET_TLS_CERT", "KAFKA_NET_TLS_KEY", "KAFKA_NET_TLS_CA_CERT",
"KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD",
"KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", "KAFKA_NET_SASL_TYPE",
"KAFKA_BOOTSTRAP_SERVERS":

continue
Expand All @@ -203,7 +213,7 @@ func (kfb *KafkaBinding) Undo(ctx context.Context, ps *duckv1.WithPod) {
for j, ev := range c.Env {
switch ev.Name {
case "KAFKA_NET_TLS_ENABLE", "KAFKA_NET_TLS_CERT", "KAFKA_NET_TLS_KEY", "KAFKA_NET_TLS_CA_CERT",
"KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD",
"KAFKA_NET_SASL_ENABLE", "KAFKA_NET_SASL_USER", "KAFKA_NET_SASL_PASSWORD", "KAFKA_NET_SASL_TYPE",
"KAFKA_BOOTSTRAP_SERVERS":
continue
default:
Expand Down
Loading

0 comments on commit 88c73c7

Please sign in to comment.