Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions google/auth/identity_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
"""Identity Pool Credentials.

This module provides credentials that are initialized using external_account
arguments which are typically loaded from the external credentials file.
Unlike other Credentials that can be initialized with a list of explicit
arguments, secrets or credentials, external account clients use the
environment and hints/guidelines provided by the external_account JSON
file to retrieve credentials and exchange them for Google access tokens.
arguments which are typically loaded from an external credentials file or
an external credentials url. Unlike other Credentials that can be initialized
with a list of explicit arguments, secrets or credentials, external account
clients use the environment and hints/guidelines provided by the
external_account JSON file to retrieve credentials and exchange them for Google
access tokens.

Identity Pool Credentials are used with external credentials (eg. OIDC
ID tokens) retrieved from a file location, typical for K8s workloads
registered with Hub with Hub workload identity enabled.
registered with Hub with Hub workload identity enabled, or retrieved from an
url, typical for AWS and Azure based workflows.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have a dedicated credentials class for AWS, let's exclude AWS:

url, typical for Azure based workflows.

"""

import io
Expand All @@ -36,10 +38,7 @@


class Credentials(external_account.Credentials):
"""File-sourced external account credentials.
This is typically used to exchange OIDC ID tokens in K8s (file-sourced
credentials) for Google access tokens.
"""
"""External account credentials sourced from files and urls."""

def __init__(
self,
Expand All @@ -53,7 +52,7 @@ def __init__(
quota_project_id=None,
scopes=None,
):
"""Instantiates a file-sourced external account credentials object.
"""Instantiates an external account credentials object from a file/url.

Args:
audience (str): The STS audience field.
Expand Down Expand Up @@ -91,8 +90,12 @@ def __init__(
quota_project_id=quota_project_id,
scopes=scopes,
)
if isinstance(credential_source, dict):
if not isinstance(credential_source, dict):
self._credential_source_file = None
self._credential_source_url = None
else:
self._credential_source_file = credential_source.get("file")
self._credential_source_url = credential_source.get("url")
self._credential_source_headers = credential_source.get("headers")
credential_source_format = credential_source.get("format") or {}
# Get credential_source format type. When not provided, this
Expand All @@ -117,28 +120,65 @@ def __init__(
)
else:
self._credential_source_field_name = None
else:
self._credential_source_file = None
if not self._credential_source_file:
raise ValueError("Missing credential_source file")

if self._credential_source_file and self._credential_source_url:
raise ValueError("Ambiguous credential_source")
if not self._credential_source_file and not self._credential_source_url:
raise ValueError("Missing credential_source")

@_helpers.copy_docstring(external_account.Credentials)
def retrieve_subject_token(self, request):
return self._get_token_file(
self._credential_source_file,
return self._parse_token_data(
self._get_token_data(request),
self._credential_source_format_type,
self._credential_source_field_name,
)

def _get_token_file(
self, filename, format_type="text", subject_token_field_name=None
):
def _get_token_data(self, request):
if self._credential_source_file:
return self._get_file_data(self._credential_source_file)
if self._credential_source_url:
return self._get_url_data(
request,
self._credential_source_url,
self._credential_source_headers)

def _get_file_data(self, filename):
if not os.path.exists(filename):
raise exceptions.RefreshError("File '{}' was not found.".format(filename))

with io.open(filename, "r", encoding="utf-8") as file_obj:
content = file_obj.read()
return file_obj.read(), filename

def _get_url_data(self, request, url, headers):
response = request(
url=url,
method="GET",
headers=headers
)

# support both string and bytes type response.data
response_body = (
response.data.decode("utf-8")
if hasattr(response.data, "decode")
else response.data
)

if response.status != 200:
raise exceptions.RefreshError(
"Unable to retrieve Identity Pool subject token",
response_body
)

return response_body, url

def _parse_token_data(
self,
token_content,
format_type="text",
subject_token_field_name=None
):
content, filename = token_content
if format_type == "text":
token = content
else:
Expand Down
Loading