Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions google/auth/external_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import re

import six
from urllib3.util import parse_url

from google.auth import _helpers
from google.auth import credentials
Expand All @@ -51,6 +52,22 @@
# Cloud resource manager URL used to retrieve project information.
_CLOUD_RESOURCE_MANAGER = "https://cloudresourcemanager.googleapis.com/v1/projects/"

# Token url patterns
_TOKEN_URL_PATTERNS = [
"^[^\\.\\s\\/\\\\]+\\.sts\\.googleapis\\.com$",
"^sts\\.googleapis\\.com$",
"^sts\\.[^\\.\\s\\/\\\\]+\\.googleapis\\.com$",
"^[^\\.\\s\\/\\\\]+\\-sts\\.googleapis\\.com$",
]

# Service account impersonation url patterns
_SERVICE_ACCOUNT_IMPERSONATION_URL_PATTERNS = [
"^[^\\.\\s\\/\\\\]+\\.iamcredentials\\.googleapis\\.com$",
"^iamcredentials\\.googleapis\\.com$",
"^iamcredentials\\.[^\\.\\s\\/\\\\]+\\.googleapis\\.com$",
"^[^\\.\\s\\/\\\\]+\\-iamcredentials\\.googleapis\\.com$",
]


@six.add_metaclass(abc.ABCMeta)
class Credentials(credentials.Scoped, credentials.CredentialsWithQuotaProject):
Expand Down Expand Up @@ -114,6 +131,12 @@ def __init__(
self._default_scopes = default_scopes
self._workforce_pool_user_project = workforce_pool_user_project

Credentials.validate_token_url(token_url)
if service_account_impersonation_url:
Credentials.validate_service_account_impersonation_url(
service_account_impersonation_url
)

if self._client_id:
self._client_auth = utils.ClientAuthentication(
utils.ClientAuthType.basic, self._client_id, self._client_secret
Expand Down Expand Up @@ -413,3 +436,35 @@ def _initialize_impersonated_credentials(self):
quota_project_id=self._quota_project_id,
iam_endpoint_override=self._service_account_impersonation_url,
)

@staticmethod
def validate_token_url(token_url):
if not Credentials.is_valid_url(_TOKEN_URL_PATTERNS, token_url):
raise ValueError("The provided token URL is invalid.")

@staticmethod
def validate_service_account_impersonation_url(url):
if not Credentials.is_valid_url(
_SERVICE_ACCOUNT_IMPERSONATION_URL_PATTERNS, url
):
raise ValueError(
"The provided service account impersonation URL is invalid."
)

@staticmethod
def is_valid_url(patterns, url):
"""
Returns True if the provided URL's scheme is HTTPS and the host comforms to at least one of the provided patterns.
"""
try:
uri = parse_url(url)
except Exception:
return False

if not uri.scheme or uri.scheme != "https":
return False

if not uri.hostname:
return False

return any(re.compile(p).match(uri.hostname.lower()) for p in patterns)
95 changes: 94 additions & 1 deletion tests/test_external_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,77 @@ def assert_resource_manager_request_kwargs(
assert request_kwargs["headers"] == headers
assert "body" not in request_kwargs

def test_valid_token_url_shall_pass_validation(self):
# valid url doesn't throw exception, a None value should be return
assert not external_account.Credentials.validate_token_url(self.TOKEN_URL)

def test_token_url_pattern_matching(self):
# matching *.sts.googleapis.com
assert external_account.Credentials.is_valid_url(
external_account._TOKEN_URL_PATTERNS,
"https://auth.sts.googleapis.com/v1/token",
)
# matching sts.googleapis.com
assert external_account.Credentials.is_valid_url(
external_account._TOKEN_URL_PATTERNS, "https://sts.googleapis.com/v1/token"
)
# matching sts.*.googleapis.com
assert external_account.Credentials.is_valid_url(
external_account._TOKEN_URL_PATTERNS,
"https://sts.auth.googleapis.com/v1/token",
)
# matching *-sts.googleapis.com
assert external_account.Credentials.is_valid_url(
external_account._TOKEN_URL_PATTERNS,
"https://auth-sts.googleapis.com/v1/token",
)
# invalid url cannot match
assert not external_account.Credentials.is_valid_url(
external_account._TOKEN_URL_PATTERNS, "https:///v1/token"
)
assert not external_account.Credentials.is_valid_url(
external_account._TOKEN_URL_PATTERNS, "https://some-invalid-url/v1/token"
)

def test_service_account_impersonation_url_matching(self):
# matching *.iamcredentials.googleapis.com
assert external_account.Credentials.is_valid_url(
external_account._SERVICE_ACCOUNT_IMPERSONATION_URL_PATTERNS,
"https://fooauth.iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:generateAccessToken",
)
# matching iamcredentials.googleapis.com
assert external_account.Credentials.is_valid_url(
external_account._SERVICE_ACCOUNT_IMPERSONATION_URL_PATTERNS,
"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:generateAccessToken",
)
# matching iamcredentials.*.googleapis.com
assert external_account.Credentials.is_valid_url(
external_account._SERVICE_ACCOUNT_IMPERSONATION_URL_PATTERNS,
"https://iamcredentials.fooauth.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:generateAccessToken",
)
# matching *-iamcredentials.googleapis.com
assert external_account.Credentials.is_valid_url(
external_account._SERVICE_ACCOUNT_IMPERSONATION_URL_PATTERNS,
"https://us-east1-iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:generateAccessToken",
)
# invalid url cannot match
assert not external_account.Credentials.is_valid_url(
external_account._SERVICE_ACCOUNT_IMPERSONATION_URL_PATTERNS,
"https:///v1/accesstoken",
)
assert not external_account.Credentials.is_valid_url(
external_account._SERVICE_ACCOUNT_IMPERSONATION_URL_PATTERNS,
"https://some-invalid-url/v1",
)

def test_default_state(self):
credentials = self.make_credentials()
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
)

# Token url and service account impersonation url should be set
assert credentials._token_url
assert credentials._service_account_impersonation_url
# Not token acquired yet
assert not credentials.token
assert not credentials.valid
Expand All @@ -289,6 +357,31 @@ def test_default_state(self):
assert credentials.requires_scopes
assert not credentials.quota_project_id

def test_invalid_token_url(self):
with pytest.raises(ValueError) as excinfo:
CredentialsImpl(
audience=self.AUDIENCE,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url="https:///v1/token",
credential_source=self.CREDENTIAL_SOURCE,
)

assert excinfo.match("The provided token URL is invalid.")

def test_invalid_service_account_impersonate_url(self):
with pytest.raises(ValueError) as excinfo:
CredentialsImpl(
audience=self.AUDIENCE,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
service_account_impersonation_url=12345, # create an exception by sending to parse url
)

assert excinfo.match(
"The provided service account impersonation URL is invalid."
)

def test_nonworkforce_with_workforce_pool_user_project(self):
with pytest.raises(ValueError) as excinfo:
CredentialsImpl(
Expand Down