Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
[0.18] Backport: Adding global TLS/SASL for the KafkaChannel (#1667)
Browse files Browse the repository at this point in the history
* Porting the TLS/SASL support for the Kafka channel

Signed-off-by: Matthias Wessendorf <[email protected]>

* Adding source build tag, to differenciate between SRC and Channel

Signed-off-by: Matthias Wessendorf <[email protected]>
  • Loading branch information
matzew authored Nov 24, 2020
1 parent 3de76f3 commit f268982
Show file tree
Hide file tree
Showing 48 changed files with 5,951 additions and 16 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/slinkydeveloper/loadastic v0.0.0-20191203132749-9afe5a010a57
github.com/stretchr/testify v1.6.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.opencensus.io v0.22.5-0.20200716030834-3456e1d174b2
go.opentelemetry.io/otel v0.4.2 // indirect
go.uber.org/zap v1.15.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,9 @@ github.com/xanzy/go-gitlab v0.32.0 h1:tBm+OXv1t+KBsqlXkSDFz+YUjRM0GFsjpOWYOod3Eb
github.com/xanzy/go-gitlab v0.32.0/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug=
github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70=
github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
Expand Down
2 changes: 2 additions & 0 deletions kafka/channel/config/configmaps/kafka-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ data:
# Broker URL. Replace this with the URLs for your kafka cluster,
# which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
bootstrapServers: REPLACE_WITH_CLUSTER_URL
#authSecretName: name-of-your-secret-for-kafka-auth
#authSecretNamespace: namespace-of-your-secret-for-kafka-auth
1 change: 1 addition & 0 deletions kafka/channel/config/roles/controller-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ rules:
resources:
- services
- configmaps
- secrets
verbs:
- get
- list
Expand Down
1 change: 1 addition & 0 deletions kafka/channel/config/roles/dispatcher-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ rules:
- ""
resources:
- configmaps
- secrets
verbs:
- get
- list
Expand Down
9 changes: 9 additions & 0 deletions kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"sync/atomic"

source "knative.dev/eventing-contrib/kafka"

"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -89,6 +91,12 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
conf.Consumer.Return.Errors = true // Returns the errors in ConsumerGroup#Errors() https://godoc.org/github.com/Shopify/sarama#ConsumerGroup
conf.Producer.Return.Successes = true // Must be enabled for sync producer

// append the auth info:
err := source.UpdateSaramaConfigWithKafkaAuthConfig(conf, args.KafkaAuthConfig)
if err != nil {
return nil, fmt.Errorf("Error updating the Sarama Auth config: %w", err)
}

producer, err := sarama.NewSyncProducer(args.Brokers, conf)
if err != nil {
return nil, fmt.Errorf("unable to create kafka producer against Kafka bootstrap servers %v : %v", args.Brokers, err)
Expand Down Expand Up @@ -145,6 +153,7 @@ type KafkaDispatcherArgs struct {
KnCEConnectionArgs *kncloudevents.ConnectionArgs
ClientID string
Brokers []string
KafkaAuthConfig *utils.KafkaAuthConfig
TopicFunc TopicFunc
Logger *zap.SugaredLogger
}
Expand Down
10 changes: 9 additions & 1 deletion kafka/channel/pkg/reconciler/controller/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"fmt"

source "knative.dev/eventing-contrib/kafka"

"k8s.io/utils/pointer"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -111,6 +113,7 @@ type Reconciler struct {
dispatcherImage string

kafkaConfig *utils.KafkaConfig
kafkaAuthConfig *utils.KafkaAuthConfig
kafkaConfigError error
kafkaClientSet kafkaclientset.Interface

Expand Down Expand Up @@ -469,7 +472,7 @@ func (r *Reconciler) createClient(ctx context.Context, kc *v1beta1.KafkaChannel)
kafkaClusterAdmin := r.kafkaClusterAdmin
if kafkaClusterAdmin == nil {
var err error
kafkaClusterAdmin, err = resources.MakeClient(controllerAgentName, r.kafkaConfig.Brokers)
kafkaClusterAdmin, err = source.MakeAdminClient(controllerAgentName, r.kafkaAuthConfig, r.kafkaConfig.Brokers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -518,6 +521,11 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co
if err != nil {
logging.FromContext(ctx).Errorw("Error reading Kafka configuration", zap.Error(err))
}

if kafkaConfig.AuthSecretName != "" {
kafkaAuthConfig := utils.GetKafkaAuthData(ctx, kafkaConfig.AuthSecretName, kafkaConfig.AuthSecretNamespace)
r.kafkaAuthConfig = kafkaAuthConfig
}
// For now just override the previous config.
// Eventually the previous config should be snapshotted to delete Kafka topics
r.kafkaConfig = kafkaConfig
Expand Down
3 changes: 3 additions & 0 deletions kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
logger.Fatalw("Error loading kafka config", zap.Error(err))
}

kafkaAuthCfg := utils.GetKafkaAuthData(ctx, kafkaConfig.AuthSecretName, kafkaConfig.AuthSecretNamespace)

connectionArgs := &kncloudevents.ConnectionArgs{
MaxIdleConns: int(kafkaConfig.MaxIdleConns),
MaxIdleConnsPerHost: int(kafkaConfig.MaxIdleConnsPerHost),
Expand All @@ -98,6 +100,7 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
KnCEConnectionArgs: connectionArgs,
ClientID: "kafka-ch-dispatcher",
Brokers: kafkaConfig.Brokers,
KafkaAuthConfig: kafkaAuthCfg,
TopicFunc: utils.TopicName,
Logger: logger,
}
Expand Down
71 changes: 71 additions & 0 deletions kafka/channel/pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ limitations under the License.
package utils

import (
"context"
"errors"
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

Expand All @@ -29,9 +34,18 @@ import (

const (
BrokerConfigMapKey = "bootstrapServers"
AuthSecretName = "authSecretName"
AuthSecretNamespace = "authSecretNamespace"
MaxIdleConnectionsKey = "maxIdleConns"
MaxIdleConnectionsPerHostKey = "maxIdleConnsPerHost"

TlsCacert = "ca.crt"
TlsUsercert = "user.crt"
TlsUserkey = "user.key"
SaslUser = "user"
SaslPassword = "password"
SaslType = "saslType"

KafkaChannelSeparator = "."

// DefaultNumPartitions defines the default number of partitions
Expand All @@ -50,6 +64,57 @@ type KafkaConfig struct {
Brokers []string
MaxIdleConns int32
MaxIdleConnsPerHost int32
AuthSecretName string
AuthSecretNamespace string
}

type KafkaAuthConfig struct {
TLS *KafkaTlsConfig
SASL *KafkaSaslConfig
}

type KafkaTlsConfig struct {
Cacert string
Usercert string
Userkey string
}

type KafkaSaslConfig struct {
User string
Password string
SaslType string
}

func GetKafkaAuthData(ctx context.Context, secretname string, secretNS string) *KafkaAuthConfig {

k8sClient := kubeclient.Get(ctx)
secret, err := k8sClient.CoreV1().Secrets(secretNS).Get(ctx, secretname, metav1.GetOptions{})

if err != nil || secret == nil {
logging.FromContext(ctx).Errorf("Referenced Auth Secret not found")
return nil
}

kafkaAuthConfig := &KafkaAuthConfig{}
// check for TLS
if string(secret.Data[TlsCacert]) != "" {
tls := &KafkaTlsConfig{
Cacert: string(secret.Data[TlsCacert]),
Usercert: string(secret.Data[TlsUsercert]),
Userkey: string(secret.Data[TlsUserkey]),
}
kafkaAuthConfig.TLS = tls
}

if string(secret.Data[SaslUser]) != "" {
sasl := &KafkaSaslConfig{
User: string(secret.Data[SaslUser]),
Password: string(secret.Data[SaslPassword]),
SaslType: string(secret.Data[SaslType]),
}
kafkaAuthConfig.SASL = sasl
}
return kafkaAuthConfig
}

// GetKafkaConfig returns the details of the Kafka cluster.
Expand All @@ -64,9 +129,13 @@ func GetKafkaConfig(configMap map[string]string) (*KafkaConfig, error) {
}

var bootstrapServers string
var authSecretNamespace string
var authSecretName string

err := configmap.Parse(configMap,
configmap.AsString(BrokerConfigMapKey, &bootstrapServers),
configmap.AsString(AuthSecretName, &authSecretName),
configmap.AsString(AuthSecretNamespace, &authSecretNamespace),
configmap.AsInt32(MaxIdleConnectionsKey, &config.MaxIdleConns),
configmap.AsInt32(MaxIdleConnectionsPerHostKey, &config.MaxIdleConnsPerHost),
)
Expand All @@ -84,6 +153,8 @@ func GetKafkaConfig(configMap map[string]string) (*KafkaConfig, error) {
}
}
config.Brokers = bootstrapServersSplitted
config.AuthSecretName = authSecretName
config.AuthSecretNamespace = authSecretNamespace

return config, nil
}
Expand Down
Loading

0 comments on commit f268982

Please sign in to comment.