From 3fc4d648c9bcac28d8de5d407857553bbfcae785 Mon Sep 17 00:00:00 2001 From: MyMirelHub <15373565+MyMirelHub@users.noreply.github.com> Date: Wed, 14 Jan 2026 09:49:56 +0100 Subject: [PATCH 1/2] feat(pulsar): fix oauth2ClientSecretPath and add oauth2CredentialsFile supports Signed-off-by: MyMirelHub <15373565+MyMirelHub@users.noreply.github.com> --- .../oauth2/clientcredentials.go | 97 +++++- .../oauth2/clientcredentials_test.go | 179 ++++++++++++ pubsub/pulsar/metadata.yaml | 9 +- pubsub/pulsar/pulsar.go | 32 +- pubsub/pulsar/pulsar_test.go | 276 +++++++++++++++++- .../consumer_eight/pulsar.yml.tmpl | 24 ++ .../consumer_seven/pulsar.yml.tmpl | 26 ++ .../pubsub/pulsar/pulsar_test.go | 128 +++++++- 8 files changed, 735 insertions(+), 36 deletions(-) create mode 100644 tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_eight/pulsar.yml.tmpl create mode 100644 tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_seven/pulsar.yml.tmpl diff --git a/common/authentication/oauth2/clientcredentials.go b/common/authentication/oauth2/clientcredentials.go index 0bc105f387..a0cd03a62a 100644 --- a/common/authentication/oauth2/clientcredentials.go +++ b/common/authentication/oauth2/clientcredentials.go @@ -17,10 +17,13 @@ import ( "context" "crypto/tls" "crypto/x509" + "encoding/json" "errors" "fmt" "net/http" "net/url" + "os" + "strings" "sync" "time" @@ -33,13 +36,93 @@ import ( // ClientCredentialsMetadata is the metadata fields which can be used by a // component to configure an OIDC client_credentials token source. type ClientCredentialsMetadata struct { - TokenCAPEM string `mapstructure:"oauth2TokenCAPEM"` - TokenURL string `mapstructure:"oauth2TokenURL"` - ClientID string `mapstructure:"oauth2ClientID"` - ClientSecret string `mapstructure:"oauth2ClientSecret"` - ClientSecretPath string `mapstructure:"oauth2ClientSecretPath"` - Audiences []string `mapstructure:"oauth2Audiences"` - Scopes []string `mapstructure:"oauth2Scopes"` + TokenCAPEM string `mapstructure:"oauth2TokenCAPEM"` + TokenURL string `mapstructure:"oauth2TokenURL"` + ClientID string `mapstructure:"oauth2ClientID"` + ClientSecret string `mapstructure:"oauth2ClientSecret"` + ClientSecretPath string `mapstructure:"oauth2ClientSecretPath"` + CredentialsFilePath string `mapstructure:"oauth2CredentialsFile"` + Audiences []string `mapstructure:"oauth2Audiences"` + Scopes []string `mapstructure:"oauth2Scopes"` +} + +// ResolveCredentials loads client_id and client_secret from files if configured. +func (m *ClientCredentialsMetadata) ResolveCredentials() error { + if m.CredentialsFilePath != "" && m.ClientSecretPath != "" { + return errors.New("'oauth2CredentialsFile' and 'oauth2ClientSecretPath' fields are mutually exclusive") + } + + if m.CredentialsFilePath != "" { + fileClientID, fileClientSecret, fileIssuerURL, err := LoadCredentialsFromJSONFile(m.CredentialsFilePath) + if err != nil { + return fmt.Errorf("failed to load credentials from JSON file: %w", err) + } + + // Metadata overrides file values + if m.ClientID == "" { + m.ClientID = fileClientID + } + if m.ClientSecret == "" { + m.ClientSecret = fileClientSecret + } + if m.TokenURL == "" { + m.TokenURL = fileIssuerURL + } + return nil + } + + if m.ClientSecretPath != "" { + // Metadata overrides file value + if m.ClientSecret == "" { + secretBytes, err := os.ReadFile(m.ClientSecretPath) + if err != nil { + return fmt.Errorf("could not read oauth2 client secret from file %q: %w", m.ClientSecretPath, err) + } + m.ClientSecret = strings.TrimSpace(string(secretBytes)) + } + return nil + } + + return nil +} + +// ToOptions converts ClientCredentialsMetadata to ClientCredentialsOptions. +func (m *ClientCredentialsMetadata) ToOptions(logger logger.Logger) ClientCredentialsOptions { + return ClientCredentialsOptions{ + Logger: logger, + TokenURL: m.TokenURL, + CAPEM: []byte(m.TokenCAPEM), + ClientID: m.ClientID, + ClientSecret: m.ClientSecret, + Scopes: m.Scopes, + Audiences: m.Audiences, + } +} + +// CredentialsFile represents a JSON credentials file. +type CredentialsFile struct { + ClientID string `json:"client_id"` + ClientSecret string `json:"client_secret"` + IssuerURL string `json:"issuer_url"` +} + +// LoadCredentialsFromJSONFile reads client_id, client_secret, and issuer_url from a JSON file. +func LoadCredentialsFromJSONFile(filePath string) (clientID, clientSecret, issuerURL string, err error) { + secretBytes, err := os.ReadFile(filePath) + if err != nil { + return "", "", "", fmt.Errorf("could not read oauth2 credentials from file %q: %w", filePath, err) + } + + var creds CredentialsFile + if err := json.Unmarshal(secretBytes, &creds); err != nil { + return "", "", "", fmt.Errorf("failed to parse JSON credentials file: %w", err) + } + + if creds.ClientID == "" || creds.ClientSecret == "" || creds.IssuerURL == "" { + return "", "", "", errors.New("credentials file must contain client_id, client_secret, and issuer_url") + } + + return creds.ClientID, creds.ClientSecret, creds.IssuerURL, nil } type ClientCredentialsOptions struct { diff --git a/common/authentication/oauth2/clientcredentials_test.go b/common/authentication/oauth2/clientcredentials_test.go index df43b3437e..8e9595ad75 100644 --- a/common/authentication/oauth2/clientcredentials_test.go +++ b/common/authentication/oauth2/clientcredentials_test.go @@ -16,6 +16,7 @@ package oauth2 import ( "context" "net/url" + "os" "testing" "time" @@ -116,3 +117,181 @@ func Test_TokenRenewal(t *testing.T) { require.NoError(t, err) assert.Equal(t, "new-token", tok) } + +func TestLoadCredentialsFromJSONFile(t *testing.T) { + t.Run("valid JSON", func(t *testing.T) { + tmpFile, err := os.CreateTemp(t.TempDir(), "credentials-*.json") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + content := `{"client_id": "test-id", "client_secret": "test-secret", "issuer_url": "https://oauth.example.com/token"}` + _, err = tmpFile.WriteString(content) + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + clientID, clientSecret, issuerURL, err := LoadCredentialsFromJSONFile(tmpFile.Name()) + require.NoError(t, err) + assert.Equal(t, "test-id", clientID) + assert.Equal(t, "test-secret", clientSecret) + assert.Equal(t, "https://oauth.example.com/token", issuerURL) + }) + + t.Run("missing required fields", func(t *testing.T) { + tmpFile, err := os.CreateTemp(t.TempDir(), "credentials-*.json") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(`{"client_id": "test-id"}`) + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + _, _, _, err = LoadCredentialsFromJSONFile(tmpFile.Name()) + require.Error(t, err) + assert.Contains(t, err.Error(), "must contain client_id, client_secret, and issuer_url") + }) + + t.Run("invalid JSON", func(t *testing.T) { + tmpFile, err := os.CreateTemp(t.TempDir(), "credentials-*.json") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString("{ invalid json }") + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + _, _, _, err = LoadCredentialsFromJSONFile(tmpFile.Name()) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse JSON") + }) + + t.Run("file not found", func(t *testing.T) { + _, _, _, err := LoadCredentialsFromJSONFile("/nonexistent/file/path") + require.Error(t, err) + assert.Contains(t, err.Error(), "could not read oauth2 credentials from file") + }) +} + +func TestClientCredentialsMetadata_ResolveCredentials(t *testing.T) { + t.Run("oauth2CredentialsFile with metadata override", func(t *testing.T) { + tmpFile, err := os.CreateTemp(t.TempDir(), "credentials-*.json") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(`{"client_id": "file-id", "client_secret": "file-secret", "issuer_url": "https://file.com/token"}`) + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + m := ClientCredentialsMetadata{ + ClientID: "meta-id", + ClientSecret: "meta-secret", + TokenURL: "https://meta.com/token", + CredentialsFilePath: tmpFile.Name(), + } + + err = m.ResolveCredentials() + require.NoError(t, err) + assert.Equal(t, "meta-id", m.ClientID) // metadata overrides + assert.Equal(t, "meta-secret", m.ClientSecret) // metadata overrides + assert.Equal(t, "https://meta.com/token", m.TokenURL) // metadata overrides + }) + + t.Run("oauth2CredentialsFile without metadata", func(t *testing.T) { + tmpFile, err := os.CreateTemp(t.TempDir(), "credentials-*.json") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(`{"client_id": "file-id", "client_secret": "file-secret", "issuer_url": "https://file.com/token"}`) + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + m := ClientCredentialsMetadata{CredentialsFilePath: tmpFile.Name()} + err = m.ResolveCredentials() + require.NoError(t, err) + assert.Equal(t, "file-id", m.ClientID) + assert.Equal(t, "file-secret", m.ClientSecret) + assert.Equal(t, "https://file.com/token", m.TokenURL) + }) + + t.Run("oauth2ClientSecretPath with metadata override", func(t *testing.T) { + tmpFile, err := os.CreateTemp(t.TempDir(), "secret-*.txt") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString("file-secret") + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + m := ClientCredentialsMetadata{ + ClientID: "meta-id", + ClientSecret: "meta-secret", + ClientSecretPath: tmpFile.Name(), + } + + err = m.ResolveCredentials() + require.NoError(t, err) + assert.Equal(t, "meta-id", m.ClientID) + assert.Equal(t, "meta-secret", m.ClientSecret) // metadata overrides + }) + + t.Run("oauth2ClientSecretPath without metadata", func(t *testing.T) { + tmpFile, err := os.CreateTemp(t.TempDir(), "secret-*.txt") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString("file-secret") + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + m := ClientCredentialsMetadata{ClientSecretPath: tmpFile.Name()} + err = m.ResolveCredentials() + require.NoError(t, err) + assert.Equal(t, "file-secret", m.ClientSecret) + }) + + t.Run("error both fields set", func(t *testing.T) { + jsonFile, err := os.CreateTemp(t.TempDir(), "credentials-*.json") + require.NoError(t, err) + defer os.Remove(jsonFile.Name()) + _, err = jsonFile.WriteString(`{"client_id": "id", "client_secret": "secret", "issuer_url": "https://example.com"}`) + require.NoError(t, err) + require.NoError(t, jsonFile.Close()) + + txtFile, err := os.CreateTemp(t.TempDir(), "secret-*.txt") + require.NoError(t, err) + defer os.Remove(txtFile.Name()) + _, err = txtFile.WriteString("secret") + require.NoError(t, err) + require.NoError(t, txtFile.Close()) + + m := ClientCredentialsMetadata{ + CredentialsFilePath: jsonFile.Name(), + ClientSecretPath: txtFile.Name(), + } + + err = m.ResolveCredentials() + require.Error(t, err) + assert.Contains(t, err.Error(), "mutually exclusive") + }) +} + +func TestClientCredentialsMetadata_ToOptions(t *testing.T) { + logger := logger.NewLogger("test") + metadata := ClientCredentialsMetadata{ + TokenURL: "https://token.example.com", + TokenCAPEM: "cert-pem-content", + ClientID: "test-client-id", + ClientSecret: "test-client-secret", + Scopes: []string{"scope1", "scope2"}, + Audiences: []string{"audience1"}, + } + + opts := metadata.ToOptions(logger) + + assert.Equal(t, logger, opts.Logger) + assert.Equal(t, "https://token.example.com", opts.TokenURL) + assert.Equal(t, []byte("cert-pem-content"), opts.CAPEM) + assert.Equal(t, "test-client-id", opts.ClientID) + assert.Equal(t, "test-client-secret", opts.ClientSecret) + assert.Equal(t, []string{"scope1", "scope2"}, opts.Scopes) + assert.Equal(t, []string{"audience1"}, opts.Audiences) +} diff --git a/pubsub/pulsar/metadata.yaml b/pubsub/pulsar/metadata.yaml index 6ce951b7b2..5552f36ba3 100644 --- a/pubsub/pulsar/metadata.yaml +++ b/pubsub/pulsar/metadata.yaml @@ -34,8 +34,13 @@ authenticationProfiles: - name: oauth2ClientSecretPath type: string description: | - The path to the OAuth Client Secret. - example: "/path/to/oauth2/client_secret.json" + The path to a plain text file containing the OAuth Client Secret. + example: "/path/to/oauth2/client_secret.txt" + - name: oauth2CredentialsFile + type: string + description: | + The path to a JSON file containing both client_id and client_secret. + example: "/path/to/oauth2/credentials.json" - name: oauth2TokenCAPEM type: string description: | diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index e73880c365..dcdb7c3ff9 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -191,6 +191,11 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) { } } + // Resolve credentials from file if ClientSecretPath is set + if err := m.ClientCredentialsMetadata.ResolveCredentials(); err != nil { + return nil, err + } + return &m, nil } @@ -213,28 +218,13 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error { case len(m.Token) > 0: options.Authentication = pulsar.NewAuthenticationToken(m.Token) case len(m.ClientCredentialsMetadata.TokenURL) > 0: - credsOpts := oauth2.ClientCredentialsOptions{ - Logger: p.logger, - TokenURL: m.ClientCredentialsMetadata.TokenURL, - CAPEM: []byte(m.ClientCredentialsMetadata.TokenCAPEM), - ClientID: m.ClientCredentialsMetadata.ClientID, - ClientSecret: m.ClientCredentialsMetadata.ClientSecret, - Scopes: m.ClientCredentialsMetadata.Scopes, - Audiences: m.ClientCredentialsMetadata.Audiences, - } - if len(m.ClientCredentialsMetadata.ClientSecretPath) > 0 { - if _, err = oauth2.NewClientCredentials(ctx, credsOpts); err != nil { - return fmt.Errorf("could not instantiate oauth2 token provider: %w", err) - } - options.Authentication = pulsar.NewAuthenticationTokenFromFile(m.ClientSecretPath) - } else { - var cliCreds *oauth2.ClientCredentials - cliCreds, err = oauth2.NewClientCredentials(ctx, credsOpts) - if err != nil { - return fmt.Errorf("could not instantiate oauth2 token provider: %w", err) - } - options.Authentication = pulsar.NewAuthenticationTokenFromSupplier(cliCreds.Token) + credsOpts := m.ClientCredentialsMetadata.ToOptions(p.logger) + var cliCreds *oauth2.ClientCredentials + cliCreds, err = oauth2.NewClientCredentials(ctx, credsOpts) + if err != nil { + return fmt.Errorf("could not instantiate oauth2 token provider: %w", err) } + options.Authentication = pulsar.NewAuthenticationTokenFromSupplier(cliCreds.Token) } client, err := p.newClientFn(options) diff --git a/pubsub/pulsar/pulsar_test.go b/pubsub/pulsar/pulsar_test.go index b55e1f3191..7dd8d2d12c 100644 --- a/pubsub/pulsar/pulsar_test.go +++ b/pubsub/pulsar/pulsar_test.go @@ -14,6 +14,7 @@ limitations under the License. package pulsar import ( + "fmt" "net/http" "net/http/httptest" "os" @@ -675,7 +676,7 @@ func TestSanitiseURL(t *testing.T) { } } -func TestInitUsesTokenFromFileWhenClientSecretPathProvided(t *testing.T) { +func TestInitUsesTokenSupplierWhenClientSecretPathProvided(t *testing.T) { server := newOAuthTestServer(t) secretPath := writeTempFile(t, "rotating-secret") @@ -702,10 +703,281 @@ func TestInitUsesTokenFromFileWhenClientSecretPathProvided(t *testing.T) { require.NoError(t, err) require.NotNil(t, capturedOpts.Authentication) - expected := pulsar.NewAuthenticationTokenFromFile(secretPath) + // Should use TokenSupplier, not TokenFromFile + expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { + return "", nil + }) assert.IsType(t, expected, capturedOpts.Authentication) } +func TestInitUsesTokenSupplierWithPlainTextSecretFile(t *testing.T) { + server := newOAuthTestServer(t) + secretPath := writeTempFile(t, "plain-text-secret-12345") + + var capturedOpts pulsar.ClientOptions + p := NewPulsar(logger.NewLogger("test")).(*Pulsar) + t.Cleanup(func() { + p.newClientFn = pulsar.NewClient + }) + p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { + capturedOpts = opts + return nil, nil + } + + md := pubsub.Metadata{} + md.Properties = map[string]string{ + "host": "localhost:6650", + "oauth2TokenURL": server.URL, + "oauth2ClientID": "client-id", + "oauth2ClientSecretPath": secretPath, + "oauth2Scopes": "scope1", + "oauth2Audiences": "aud1", + } + err := p.Init(t.Context(), md) + + require.NoError(t, err) + require.NotNil(t, capturedOpts.Authentication) + expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { + return "", nil + }) + assert.IsType(t, expected, capturedOpts.Authentication) +} + +func TestInitUsesTokenSupplierWithJSONSecretFile(t *testing.T) { + server := newOAuthTestServer(t) + credentialsPath := writeTempFile(t, fmt.Sprintf(`{ + "client_id": "json-id-from-file", + "client_secret": "json-secret-from-file", + "issuer_url": "%s" + }`, server.URL)) + + var capturedOpts pulsar.ClientOptions + p := NewPulsar(logger.NewLogger("test")).(*Pulsar) + t.Cleanup(func() { + p.newClientFn = pulsar.NewClient + }) + p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { + capturedOpts = opts + return nil, nil + } + + md := pubsub.Metadata{} + md.Properties = map[string]string{ + "host": "localhost:6650", + "oauth2CredentialsFile": credentialsPath, + "oauth2Scopes": "scope1", + "oauth2Audiences": "aud1", + } + err := p.Init(t.Context(), md) + + require.NoError(t, err) + require.NotNil(t, capturedOpts.Authentication) + expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { + return "", nil + }) + assert.IsType(t, expected, capturedOpts.Authentication) +} + +func TestInitUsesTokenSupplierWithPlainTextSecretFileContent(t *testing.T) { + server := newOAuthTestServer(t) + // oauth2ClientSecretPath should only handle plain text, not JSON + secretPath := writeTempFile(t, "plain-text-secret-content-12345") + + var capturedOpts pulsar.ClientOptions + p := NewPulsar(logger.NewLogger("test")).(*Pulsar) + t.Cleanup(func() { + p.newClientFn = pulsar.NewClient + }) + p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { + capturedOpts = opts + return nil, nil + } + + md := pubsub.Metadata{} + md.Properties = map[string]string{ + "host": "localhost:6650", + "oauth2TokenURL": server.URL, + "oauth2ClientID": "client-id", + "oauth2ClientSecretPath": secretPath, + "oauth2Scopes": "scope1", + "oauth2Audiences": "aud1", + } + err := p.Init(t.Context(), md) + + require.NoError(t, err) + require.NotNil(t, capturedOpts.Authentication) + expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { + return "", nil + }) + assert.IsType(t, expected, capturedOpts.Authentication) +} + +func TestInitUsesTokenSupplierWithEmptySecretFile(t *testing.T) { + server := newOAuthTestServer(t) + secretPath := writeTempFile(t, "") + + var capturedOpts pulsar.ClientOptions + p := NewPulsar(logger.NewLogger("test")).(*Pulsar) + t.Cleanup(func() { + p.newClientFn = pulsar.NewClient + }) + p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { + capturedOpts = opts + return nil, nil + } + + md := pubsub.Metadata{} + md.Properties = map[string]string{ + "host": "localhost:6650", + "oauth2TokenURL": server.URL, + "oauth2ClientID": "client-id", + "oauth2ClientSecretPath": secretPath, + "oauth2Scopes": "scope1", + "oauth2Audiences": "aud1", + } + err := p.Init(t.Context(), md) + + require.NoError(t, err) + require.NotNil(t, capturedOpts.Authentication) + expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { + return "", nil + }) + assert.IsType(t, expected, capturedOpts.Authentication) +} + +func TestInitUsesTokenSupplierWithClientCredentialsJSONFile(t *testing.T) { + server := newOAuthTestServer(t) + // Test oauth2CredentialsFile with JSON containing client_id and client_secret + credentialsJSON := fmt.Sprintf(`{ + "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3", + "client_secret": "on1uJ...k6F6R", + "issuer_url": "%s" + }`, server.URL) + credentialsPath := writeTempFile(t, credentialsJSON) + + var capturedOpts pulsar.ClientOptions + p := NewPulsar(logger.NewLogger("test")).(*Pulsar) + t.Cleanup(func() { + p.newClientFn = pulsar.NewClient + }) + p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { + capturedOpts = opts + return nil, nil + } + + md := pubsub.Metadata{} + md.Properties = map[string]string{ + "host": "localhost:6650", + "oauth2CredentialsFile": credentialsPath, + "oauth2Scopes": "scope1", + "oauth2Audiences": "aud1", + } + err := p.Init(t.Context(), md) + + require.NoError(t, err) + require.NotNil(t, capturedOpts.Authentication) + expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { + return "", nil + }) + assert.IsType(t, expected, capturedOpts.Authentication) +} + +func TestInitUsesTokenSupplierWithClientCredentialsJSONFileAndIssuerURL(t *testing.T) { + server := newOAuthTestServer(t) + // Test that issuer_url in JSON file is used as TokenURL + credentialsJSON := fmt.Sprintf(`{ + "client_id": "test-client-id", + "client_secret": "test-client-secret", + "issuer_url": "%s" + }`, server.URL) + credentialsPath := writeTempFile(t, credentialsJSON) + + var capturedOpts pulsar.ClientOptions + p := NewPulsar(logger.NewLogger("test")).(*Pulsar) + t.Cleanup(func() { + p.newClientFn = pulsar.NewClient + }) + p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { + capturedOpts = opts + return nil, nil + } + + md := pubsub.Metadata{} + md.Properties = map[string]string{ + "host": "localhost:6650", + "oauth2CredentialsFile": credentialsPath, + "oauth2Scopes": "scope1", + "oauth2Audiences": "aud1", + } + err := p.Init(t.Context(), md) + + require.NoError(t, err) + require.NotNil(t, capturedOpts.Authentication) + expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { + return "", nil + }) + assert.IsType(t, expected, capturedOpts.Authentication) +} + +func TestInitUsesClientIDFromMetadataWhenFileHasOnlySecret(t *testing.T) { + server := newOAuthTestServer(t) + // Test that oauth2ClientSecretPath works with plain text (client_id comes from metadata) + //nolint:gosec + plainTextSecret := "plain-text-secret-12345" + secretPath := writeTempFile(t, plainTextSecret) + + var capturedOpts pulsar.ClientOptions + p := NewPulsar(logger.NewLogger("test")).(*Pulsar) + t.Cleanup(func() { + p.newClientFn = pulsar.NewClient + }) + p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { + capturedOpts = opts + return nil, nil + } + + md := pubsub.Metadata{} + md.Properties = map[string]string{ + "host": "localhost:6650", + "oauth2TokenURL": server.URL, + "oauth2ClientID": "metadata-client-id", // client_id from metadata + "oauth2ClientSecretPath": secretPath, // plain text secret in file + "oauth2Scopes": "scope1", + "oauth2Audiences": "aud1", + } + err := p.Init(t.Context(), md) + + require.NoError(t, err) + require.NotNil(t, capturedOpts.Authentication) + expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { + return "", nil + }) + assert.IsType(t, expected, capturedOpts.Authentication) +} + +func TestInitFailsWhenClientCredentialsTypeMissingClientSecret(t *testing.T) { + // Test that credentials file requires client_secret + //nolint:gosec + credentialsJSON := `{ + "client_id": "test-id", + "issuer_url": "https://oauth.example.com/token" + }` + secretPath := writeTempFile(t, credentialsJSON) + + md := pubsub.Metadata{} + md.Properties = map[string]string{ + "host": "localhost:6650", + "oauth2CredentialsFile": secretPath, + "oauth2Scopes": "scope1", + "oauth2Audiences": "aud1", + } + p := NewPulsar(logger.NewLogger("test")) + err := p.Init(t.Context(), md) + + require.Error(t, err) + assert.Contains(t, err.Error(), "must contain client_id, client_secret, and issuer_url") +} + func TestInitUsesTokenSupplierWhenClientSecretPathMissing(t *testing.T) { server := newOAuthTestServer(t) diff --git a/tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_eight/pulsar.yml.tmpl b/tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_eight/pulsar.yml.tmpl new file mode 100644 index 0000000000..aacdcf3a29 --- /dev/null +++ b/tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_eight/pulsar.yml.tmpl @@ -0,0 +1,24 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.pulsar + version: v1 + metadata: + - name: host + value: "localhost:6650" + - name: consumerID + value: certification8 + - name: redeliveryDelay + value: 200ms + - name: oauth2TokenURL + value: https://localhost:8085/issuer1/token + - name: oauth2CredentialsFile + value: "{{ .CredentialsJSONFilePath }}" + - name: oauth2Scopes + value: openid + - name: oauth2Audiences + value: pulsar + - name: oauth2TokenCAPEM + value: "{{ .OAuth2CAPEM }}" \ No newline at end of file diff --git a/tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_seven/pulsar.yml.tmpl b/tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_seven/pulsar.yml.tmpl new file mode 100644 index 0000000000..e83f18a99c --- /dev/null +++ b/tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_seven/pulsar.yml.tmpl @@ -0,0 +1,26 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.pulsar + version: v1 + metadata: + - name: host + value: "localhost:6650" + - name: consumerID + value: certification7 + - name: redeliveryDelay + value: 200ms + - name: oauth2TokenURL + value: https://localhost:8085/issuer1/token + - name: oauth2ClientID + value: foo + - name: oauth2ClientSecretPath + value: "{{ .CredentialsFilePath }}" + - name: oauth2Scopes + value: openid + - name: oauth2Audiences + value: pulsar + - name: oauth2TokenCAPEM + value: "{{ .OAuth2CAPEM }}" \ No newline at end of file diff --git a/tests/certification/pubsub/pulsar/pulsar_test.go b/tests/certification/pubsub/pulsar/pulsar_test.go index c8578dbb53..bcd6ff28c8 100644 --- a/tests/certification/pubsub/pulsar/pulsar_test.go +++ b/tests/certification/pubsub/pulsar/pulsar_test.go @@ -156,12 +156,30 @@ func TestPulsar(t *testing.T) { outf.Close() inf.Close() + // Create credentials files for testing oauth2ClientSecretPath + plainTextCredsFile := filepath.Join(dir, "credentials-plain.txt") + require.NoError(t, os.WriteFile(plainTextCredsFile, []byte("bar"), 0o644)) + + jsonCredsFile := filepath.Join(dir, "credentials.json") + jsonCreds := map[string]string{ + "client_id": "foo", + "client_secret": "bar", + "issuer_url": "https://localhost:8085/issuer1/token", + } + jsonCredsBytes, err := json.Marshal(jsonCreds) + require.NoError(t, err) + require.NoError(t, os.WriteFile(jsonCredsFile, jsonCredsBytes, 0o644)) + td := struct { - TmpDir string - OAuth2CAPEM string + TmpDir string + OAuth2CAPEM string + CredentialsFilePath string + CredentialsJSONFilePath string }{ - TmpDir: dir, - OAuth2CAPEM: strings.ReplaceAll(string(oauth2CA), "\n", "\\n"), + TmpDir: dir, + OAuth2CAPEM: strings.ReplaceAll(string(oauth2CA), "\n", "\\n"), + CredentialsFilePath: plainTextCredsFile, + CredentialsJSONFilePath: jsonCredsFile, } tmpl, err := template.New("").ParseFiles(dockerComposeAuthOAuth2YAML) @@ -942,6 +960,108 @@ func (p *pulsarSuite) TestPulsarSchema() { Run() } +// TestOAuth2WithPlainTextCredentialsFile tests OAuth2 authentication using oauth2ClientSecretPath +// with a plain text credentials file (backward compatibility). +func (p *pulsarSuite) TestOAuth2WithPlainTextCredentialsFile() { + t := p.T() + consumerGroup1 := watcher.NewUnordered() + + if p.authType != "oauth2" { + t.Skip("Skipping OAuth2 credentials file test for non-OAuth2 auth type") + return + } + + flow.New(t, "pulsar certification oauth2 plain text credentials file test"). + + // Run subscriberApplication app1 + Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), + subscriberApplication(appID1, topicActiveName, consumerGroup1))). + Step(dockercompose.Run(clusterName, p.dockerComposeYAML)). + Step("wait", flow.Sleep(10*time.Second)). + Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error { + client, err := p.client(t) + if err != nil { + return fmt.Errorf("could not create pulsar client: %v", err) + } + + defer client.Close() + + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: "topic-1", + SubscriptionName: "my-sub", + Type: pulsar.Shared, + }) + if err != nil { + return fmt.Errorf("could not create pulsar Topic: %v", err) + } + defer consumer.Close() + + return err + })). + Step(sidecar.Run(sidecarName1, + append(componentRuntimeOptions(), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_seven")), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), + embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), + )..., + )). + Step("publish messages to topic1", publishMessages(nil, sidecarName1, topicActiveName, consumerGroup1)). + Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)). + Run() +} + +// TestOAuth2WithJSONCredentialsFile tests OAuth2 authentication using oauth2CredentialsFile +// with a JSON credentials file containing both client_id and client_secret. +func (p *pulsarSuite) TestOAuth2WithJSONCredentialsFile() { + t := p.T() + consumerGroup1 := watcher.NewUnordered() + + if p.authType != "oauth2" { + t.Skip("Skipping OAuth2 credentials file test for non-OAuth2 auth type") + return + } + + flow.New(t, "pulsar certification oauth2 json credentials file test"). + + // Run subscriberApplication app1 + Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), + subscriberApplication(appID1, topicActiveName, consumerGroup1))). + Step(dockercompose.Run(clusterName, p.dockerComposeYAML)). + Step("wait", flow.Sleep(10*time.Second)). + Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error { + client, err := p.client(t) + if err != nil { + return fmt.Errorf("could not create pulsar client: %v", err) + } + + defer client.Close() + + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: "topic-1", + SubscriptionName: "my-sub", + Type: pulsar.Shared, + }) + if err != nil { + return fmt.Errorf("could not create pulsar Topic: %v", err) + } + defer consumer.Close() + + return err + })). + Step(sidecar.Run(sidecarName1, + append(componentRuntimeOptions(), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_eight")), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), + embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), + )..., + )). + Step("publish messages to topic1", publishMessages(nil, sidecarName1, topicActiveName, consumerGroup1)). + Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)). + Run() +} + func componentRuntimeOptions() []embedded.Option { log := logger.NewLogger("dapr.components") From 1a10ee84385d678b0bc182a36267842bbbd1c27a Mon Sep 17 00:00:00 2001 From: MyMirelHub <15373565+MyMirelHub@users.noreply.github.com> Date: Wed, 14 Jan 2026 17:51:20 +0100 Subject: [PATCH 2/2] refactor(pulsar): remove redundant tests for token supplier with secret files Signed-off-by: MyMirelHub <15373565+MyMirelHub@users.noreply.github.com> --- pubsub/pulsar/pulsar_test.go | 141 ----------------------------------- 1 file changed, 141 deletions(-) diff --git a/pubsub/pulsar/pulsar_test.go b/pubsub/pulsar/pulsar_test.go index 7dd8d2d12c..4e9a5f0a0c 100644 --- a/pubsub/pulsar/pulsar_test.go +++ b/pubsub/pulsar/pulsar_test.go @@ -778,147 +778,6 @@ func TestInitUsesTokenSupplierWithJSONSecretFile(t *testing.T) { assert.IsType(t, expected, capturedOpts.Authentication) } -func TestInitUsesTokenSupplierWithPlainTextSecretFileContent(t *testing.T) { - server := newOAuthTestServer(t) - // oauth2ClientSecretPath should only handle plain text, not JSON - secretPath := writeTempFile(t, "plain-text-secret-content-12345") - - var capturedOpts pulsar.ClientOptions - p := NewPulsar(logger.NewLogger("test")).(*Pulsar) - t.Cleanup(func() { - p.newClientFn = pulsar.NewClient - }) - p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { - capturedOpts = opts - return nil, nil - } - - md := pubsub.Metadata{} - md.Properties = map[string]string{ - "host": "localhost:6650", - "oauth2TokenURL": server.URL, - "oauth2ClientID": "client-id", - "oauth2ClientSecretPath": secretPath, - "oauth2Scopes": "scope1", - "oauth2Audiences": "aud1", - } - err := p.Init(t.Context(), md) - - require.NoError(t, err) - require.NotNil(t, capturedOpts.Authentication) - expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { - return "", nil - }) - assert.IsType(t, expected, capturedOpts.Authentication) -} - -func TestInitUsesTokenSupplierWithEmptySecretFile(t *testing.T) { - server := newOAuthTestServer(t) - secretPath := writeTempFile(t, "") - - var capturedOpts pulsar.ClientOptions - p := NewPulsar(logger.NewLogger("test")).(*Pulsar) - t.Cleanup(func() { - p.newClientFn = pulsar.NewClient - }) - p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { - capturedOpts = opts - return nil, nil - } - - md := pubsub.Metadata{} - md.Properties = map[string]string{ - "host": "localhost:6650", - "oauth2TokenURL": server.URL, - "oauth2ClientID": "client-id", - "oauth2ClientSecretPath": secretPath, - "oauth2Scopes": "scope1", - "oauth2Audiences": "aud1", - } - err := p.Init(t.Context(), md) - - require.NoError(t, err) - require.NotNil(t, capturedOpts.Authentication) - expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { - return "", nil - }) - assert.IsType(t, expected, capturedOpts.Authentication) -} - -func TestInitUsesTokenSupplierWithClientCredentialsJSONFile(t *testing.T) { - server := newOAuthTestServer(t) - // Test oauth2CredentialsFile with JSON containing client_id and client_secret - credentialsJSON := fmt.Sprintf(`{ - "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3", - "client_secret": "on1uJ...k6F6R", - "issuer_url": "%s" - }`, server.URL) - credentialsPath := writeTempFile(t, credentialsJSON) - - var capturedOpts pulsar.ClientOptions - p := NewPulsar(logger.NewLogger("test")).(*Pulsar) - t.Cleanup(func() { - p.newClientFn = pulsar.NewClient - }) - p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { - capturedOpts = opts - return nil, nil - } - - md := pubsub.Metadata{} - md.Properties = map[string]string{ - "host": "localhost:6650", - "oauth2CredentialsFile": credentialsPath, - "oauth2Scopes": "scope1", - "oauth2Audiences": "aud1", - } - err := p.Init(t.Context(), md) - - require.NoError(t, err) - require.NotNil(t, capturedOpts.Authentication) - expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { - return "", nil - }) - assert.IsType(t, expected, capturedOpts.Authentication) -} - -func TestInitUsesTokenSupplierWithClientCredentialsJSONFileAndIssuerURL(t *testing.T) { - server := newOAuthTestServer(t) - // Test that issuer_url in JSON file is used as TokenURL - credentialsJSON := fmt.Sprintf(`{ - "client_id": "test-client-id", - "client_secret": "test-client-secret", - "issuer_url": "%s" - }`, server.URL) - credentialsPath := writeTempFile(t, credentialsJSON) - - var capturedOpts pulsar.ClientOptions - p := NewPulsar(logger.NewLogger("test")).(*Pulsar) - t.Cleanup(func() { - p.newClientFn = pulsar.NewClient - }) - p.newClientFn = func(opts pulsar.ClientOptions) (pulsar.Client, error) { - capturedOpts = opts - return nil, nil - } - - md := pubsub.Metadata{} - md.Properties = map[string]string{ - "host": "localhost:6650", - "oauth2CredentialsFile": credentialsPath, - "oauth2Scopes": "scope1", - "oauth2Audiences": "aud1", - } - err := p.Init(t.Context(), md) - - require.NoError(t, err) - require.NotNil(t, capturedOpts.Authentication) - expected := pulsar.NewAuthenticationTokenFromSupplier(func() (string, error) { - return "", nil - }) - assert.IsType(t, expected, capturedOpts.Authentication) -} - func TestInitUsesClientIDFromMetadataWhenFileHasOnlySecret(t *testing.T) { server := newOAuthTestServer(t) // Test that oauth2ClientSecretPath works with plain text (client_id comes from metadata)