Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ binding:
authenticationProfiles:
- title: "OIDC Authentication"
description: |
Authenticate using OpenID Connect.
Authenticate using OpenID Connect providing a client secret.
metadata:
- name: authType
type: string
Expand Down Expand Up @@ -60,6 +60,72 @@ authenticationProfiles:
example: |
{"cluster":"kafka","poolid":"kafkapool"}
type: string
- title: "OIDC Private Key JWT Authentication"
description: |
Authenticate using OpenID Connect providing a client certificate and private key.
metadata:
- name: authType
type: string
required: true
description: |
Authentication type.
This must be set to "oidc_private_key_jwt" for this authentication profile.
example: '"oidc_private_key_jwt"'
allowedValues:
- "oidc_private_key_jwt"
- name: oidcTokenEndpoint
type: string
required: true
description: |
URL of the OAuth2 identity provider access token endpoint.
example: '"https://identity.example.com/v1/token"'
- name: oidcClientID
description: |
The OAuth2 client ID that has been provisioned in the identity provider.
example: '"my-client-id"'
type: string
required: true
- name: oidcClientAssertionCert
type: string
required: true
description: |
PEM-encoded X.509 certificate used to advertise the client certificate in the x5c header.
example: |
-----BEGIN CERTIFICATE-----\n...
- name: oidcClientAssertionKey
type: string
required: true
sensitive: true
description: |
PEM-encoded private key used to sign the client certificate.
example: |
-----BEGIN PRIVATE KEY-----\n...
- name: oidcResource
type: string
required: false
description: |
Optional OAuth2 resource (audience) parameter to include in the token request when required by the identity provider.
example: '"api://kafka"'
- name: oidcAudience
type: string
required: false
description: |
Overrides the JWT client assertion audience (aud). If not set, the component uses the
issuer derived from the token endpoint URL when available; otherwise, it falls back to the token URL.
example: '"http://<idp-host>/realms/local"'
- name: oidcScopes
type: string
description: |
Comma-delimited list of OAuth2/OIDC scopes to request with the access token.
Although not required, this field is recommended.
example: '"openid,kafka-prod"'
default: '"openid"'
- name: oidcExtensions
description: |
String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.
example: |
{"cluster":"kafka","poolid":"kafkapool"}
type: string
- title: "SASL Authentication"
description: |
Authenticate using SASL.
Expand Down Expand Up @@ -336,7 +402,7 @@ metadata:
type: bool
required: false
description: |
Enables URL escaping of the message header values.
Enables URL escaping of the message header values.
It allows sending headers with special characters that are usually not allowed in HTTP headers.
example: "true"
default: "false"
17 changes: 17 additions & 0 deletions common/component/kafka/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@ func updateOidcAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error {
return nil
}

func updateOidcPrivateKeyJWTAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error {
tokenProvider := metadata.getOAuthTokenSourcePrivateKeyJWT()

if metadata.TLSCaCert != "" {
err := tokenProvider.addCa(metadata.TLSCaCert)
if err != nil {
return fmt.Errorf("kafka: error setting oauth client trusted CA: %w", err)
}
}

config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.SASL.TokenProvider = tokenProvider

return nil
}

func updateAWSIAMAuthInfo(ctx context.Context, config *sarama.Config, metadata *KafkaMetadata) error {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
Expand Down
6 changes: 6 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
if err != nil {
return err
}
case oidcPrivateKeyJWTAuthType:
k.logger.Info("Configuring SASL OAuth2/OIDC authentication with private key JWT")
err = updateOidcPrivateKeyJWTAuthInfo(config, meta)
if err != nil {
return err
}
case passwordAuthType:
k.logger.Info("Configuring SASL Password authentication")
k.saslUsername = meta.SaslUsername
Expand Down
137 changes: 84 additions & 53 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,30 @@ import (
)

const (
key = "partitionKey"
keyMetadataKey = "__key"
timestampMetadataKey = "__timestamp"
offsetMetadataKey = "__offset"
partitionMetadataKey = "__partition"
topicMetadataKey = "__topic"
skipVerify = "skipVerify"
caCert = "caCert"
certificateAuthType = "certificate"
clientCert = "clientCert"
clientKey = "clientKey"
consumeRetryEnabled = "consumeRetryEnabled"
consumeRetryInterval = "consumeRetryInterval"
authType = "authType"
passwordAuthType = "password"
oidcAuthType = "oidc"
mtlsAuthType = "mtls"
awsIAMAuthType = "awsiam"
noAuthType = "none"
consumerFetchMin = "consumerFetchMin"
consumerFetchDefault = "consumerFetchDefault"
channelBufferSize = "channelBufferSize"
valueSchemaType = "valueSchemaType"
key = "partitionKey"
keyMetadataKey = "__key"
timestampMetadataKey = "__timestamp"
offsetMetadataKey = "__offset"
partitionMetadataKey = "__partition"
topicMetadataKey = "__topic"
skipVerify = "skipVerify"
caCert = "caCert"
certificateAuthType = "certificate"
clientCert = "clientCert"
clientKey = "clientKey"
consumeRetryEnabled = "consumeRetryEnabled"
consumeRetryInterval = "consumeRetryInterval"
authType = "authType"
passwordAuthType = "password"
oidcAuthType = "oidc"
oidcPrivateKeyJWTAuthType = "oidc_private_key_jwt"
mtlsAuthType = "mtls"
awsIAMAuthType = "awsiam"
noAuthType = "none"
consumerFetchMin = "consumerFetchMin"
consumerFetchDefault = "consumerFetchDefault"
channelBufferSize = "channelBufferSize"
valueSchemaType = "valueSchemaType"

// Kafka client config default values.
// Refresh interval < keep alive time so that way connection can be kept alive indefinitely if desired.
Expand All @@ -62,36 +63,40 @@ const (
)

type KafkaMetadata struct {
Brokers string `mapstructure:"brokers"`
internalBrokers []string `mapstructure:"-"`
ConsumerGroup string `mapstructure:"consumerGroup"`
ClientID string `mapstructure:"clientId"`
AuthType string `mapstructure:"authType"`
SaslUsername string `mapstructure:"saslUsername"`
SaslPassword string `mapstructure:"saslPassword"`
SaslMechanism string `mapstructure:"saslMechanism"`
InitialOffset string `mapstructure:"initialOffset"`
internalInitialOffset int64 `mapstructure:"-"`
MaxMessageBytes int `mapstructure:"maxMessageBytes"`
OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
OidcClientID string `mapstructure:"oidcClientID"`
OidcClientSecret string `mapstructure:"oidcClientSecret"`
OidcScopes string `mapstructure:"oidcScopes"`
OidcExtensions string `mapstructure:"oidcExtensions"`
internalOidcScopes []string `mapstructure:"-"`
TLSDisable bool `mapstructure:"disableTls"`
TLSSkipVerify bool `mapstructure:"skipVerify"`
TLSCaCert string `mapstructure:"caCert"`
TLSClientCert string `mapstructure:"clientCert"`
TLSClientKey string `mapstructure:"clientKey"`
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
Version string `mapstructure:"version"`
EscapeHeaders bool `mapstructure:"escapeHeaders"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`
Brokers string `mapstructure:"brokers"`
internalBrokers []string `mapstructure:"-"`
ConsumerGroup string `mapstructure:"consumerGroup"`
ClientID string `mapstructure:"clientId"`
AuthType string `mapstructure:"authType"`
SaslUsername string `mapstructure:"saslUsername"`
SaslPassword string `mapstructure:"saslPassword"`
SaslMechanism string `mapstructure:"saslMechanism"`
InitialOffset string `mapstructure:"initialOffset"`
internalInitialOffset int64 `mapstructure:"-"`
MaxMessageBytes int `mapstructure:"maxMessageBytes"`
OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
OidcClientID string `mapstructure:"oidcClientID"`
OidcClientSecret string `mapstructure:"oidcClientSecret"`
OidcScopes string `mapstructure:"oidcScopes"`
OidcExtensions string `mapstructure:"oidcExtensions"`
OidcClientAssertionCert string `mapstructure:"oidcClientAssertionCert"`
OidcClientAssertionKey string `mapstructure:"oidcClientAssertionKey"`
OidcResource string `mapstructure:"oidcResource"`
OidcAudience string `mapstructure:"oidcAudience"`
internalOidcScopes []string `mapstructure:"-"`
TLSDisable bool `mapstructure:"disableTls"`
TLSSkipVerify bool `mapstructure:"skipVerify"`
TLSCaCert string `mapstructure:"caCert"`
TLSClientCert string `mapstructure:"clientCert"`
TLSClientKey string `mapstructure:"clientKey"`
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
Version string `mapstructure:"version"`
EscapeHeaders bool `mapstructure:"escapeHeaders"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`

// configs for kafka client
ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"`
Expand Down Expand Up @@ -243,6 +248,32 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
}
}
k.logger.Debug("Configuring SASL token authentication via OIDC.")
case oidcPrivateKeyJWTAuthType:
if m.OidcTokenEndpoint == "" {
return nil, errors.New("kafka error: missing OIDC Token Endpoint for authType 'oidc_private_key_jwt'")
}
if m.OidcClientID == "" {
return nil, errors.New("kafka error: missing OIDC Client ID for authType 'oidc_private_key_jwt'")
}
if m.OidcClientAssertionCert == "" {
return nil, errors.New("kafka error: missing OIDC Client Assertion Cert for authType 'oidc_private_key_jwt'")
}
if m.OidcClientAssertionKey == "" {
return nil, errors.New("kafka error: missing OIDC Client Assertion Key for authType 'oidc_private_key_jwt'")
}
if m.OidcScopes != "" {
m.internalOidcScopes = strings.Split(m.OidcScopes, ",")
} else {
k.logger.Warn("Warning: no OIDC scopes specified, using default 'openid' scope only. This is a security risk for token reuse.")
m.internalOidcScopes = []string{"openid"}
}
if m.OidcExtensions != "" {
err = json.Unmarshal([]byte(m.OidcExtensions), &m.internalOidcExtensions)
if err != nil || len(m.internalOidcExtensions) < 1 {
return nil, errors.New("kafka error: improper OIDC Extensions format for authType 'oidc_private_key_jwt'")
}
}
k.logger.Debug("Configuring SASL token authentication via OIDC with private_key_jwt.")
case mtlsAuthType:
if m.TLSClientCert != "" {
if !isValidPEM(m.TLSClientCert) {
Expand Down
34 changes: 33 additions & 1 deletion common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestMissingSaslValuesOnUpgrade(t *testing.T) {
require.Equal(t, fmt.Sprintf("kafka error: missing SASL Username for authType '%s'", passwordAuthType), err.Error())
}

func TestMissingOidcValues(t *testing.T) {
func TestMissingOidcClientSecretValues(t *testing.T) {
k := getKafka()
m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcAuthType}
meta, err := k.getKafkaMetadata(m)
Expand All @@ -242,6 +242,38 @@ func TestMissingOidcValues(t *testing.T) {
require.Contains(t, meta.internalOidcScopes, "openid")
}

func TestMissingOidcPrivateKeyJwtValues(t *testing.T) {
k := getKafka()
m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcPrivateKeyJWTAuthType}
meta, err := k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, "kafka error: missing OIDC Token Endpoint for authType 'oidc_private_key_jwt'", err.Error())

m["oidcTokenEndpoint"] = "https://sassa.fra/"
meta, err = k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, "kafka error: missing OIDC Client ID for authType 'oidc_private_key_jwt'", err.Error())

m["oidcClientID"] = "sassafras"
meta, err = k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, "kafka error: missing OIDC Client Assertion Cert for authType 'oidc_private_key_jwt'", err.Error())

m["oidcClientAssertionCert"] = "sassapass"
meta, err = k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, "kafka error: missing OIDC Client Assertion Key for authType 'oidc_private_key_jwt'", err.Error())

m["oidcClientAssertionKey"] = "sassapass"
meta, err = k.getKafkaMetadata(m)
require.NoError(t, err)
require.Contains(t, meta.internalOidcScopes, "openid")
}

func TestPresentSaslValues(t *testing.T) {
k := getKafka()
m := map[string]string{
Expand Down
27 changes: 15 additions & 12 deletions common/component/kafka/sasl_oauthbearer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,24 @@ func (m KafkaMetadata) getOAuthTokenSource() *OAuthTokenSource {
}
}

var tokenRequestTimeout, _ = time.ParseDuration("30s")

func (ts *OAuthTokenSource) addCa(caPem string) error {
pemBytes := []byte(caPem)

block, _ := pem.Decode(pemBytes)

if block == nil || block.Type != "CERTIFICATE" {
return errors.New("PEM data not valid or not of a valid type (CERTIFICATE)")
if block == nil {
return errors.New("no PEM block found")
}

caCert, err := x509.ParseCertificate(block.Bytes)
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return fmt.Errorf("error parsing PEM certificate: %w", err)
}
if cert == nil {
return errors.New("no certificate found")
}

if ts.trustedCas == nil {
ts.trustedCas = make([]*x509.Certificate, 0)
}
ts.trustedCas = append(ts.trustedCas, caCert)

ts.trustedCas = append(ts.trustedCas, cert)
return nil
}

Expand Down Expand Up @@ -113,9 +110,15 @@ func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error) {
return nil, errors.New("cannot generate token, OAuthTokenSource not fully configured")
}

oidcCfg := ccred.Config{ClientID: ts.ClientID, ClientSecret: ts.ClientSecret, Scopes: ts.Scopes, TokenURL: ts.TokenEndpoint.TokenURL, AuthStyle: ts.TokenEndpoint.AuthStyle}
oidcCfg := ccred.Config{
ClientID: ts.ClientID,
ClientSecret: ts.ClientSecret,
Scopes: ts.Scopes,
TokenURL: ts.TokenEndpoint.TokenURL,
AuthStyle: ts.TokenEndpoint.AuthStyle,
}

timeoutCtx, cancel := ctx.WithTimeout(ctx.TODO(), tokenRequestTimeout)
timeoutCtx, cancel := ctx.WithTimeout(ctx.TODO(), 30*time.Second)
defer cancel()

ts.configureClient()
Expand Down
Loading
Loading