Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions google/auth/_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,16 @@ def _get_external_account_credentials(info, filename, scopes=None, request=None)
"""
# There are currently 2 types of external_account credentials.
try:
# Check if configuration corresponds to an Identity Pool credentials.
from google.auth import identity_pool
# Check if configuration corresponds to an AWS credentials.
from google.auth import aws

credentials = identity_pool.Credentials.from_info(info, scopes=scopes)
credentials = aws.Credentials.from_info(info, scopes=scopes)
except ValueError:
try:
# Check if configuration corresponds to an AWS credentials.
from google.auth import aws
# Check if configuration corresponds to an Identity Pool credentials.
from google.auth import identity_pool

credentials = aws.Credentials.from_info(info, scopes=scopes)
credentials = identity_pool.Credentials.from_info(info, scopes=scopes)
except ValueError:
# If the configuration is invalid or does not correspond to any
# supported external_account credentials, raise an error.
Expand Down
106 changes: 82 additions & 24 deletions google/auth/identity_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@
"""Identity Pool Credentials.

This module provides credentials that are initialized using external_account
arguments which are typically loaded from the external credentials file.
Unlike other Credentials that can be initialized with a list of explicit
arguments, secrets or credentials, external account clients use the
environment and hints/guidelines provided by the external_account JSON
file to retrieve credentials and exchange them for Google access tokens.
arguments which are typically loaded from an external credentials file or
an external credentials url. Unlike other Credentials that can be initialized
with a list of explicit arguments, secrets or credentials, external account
clients use the environment and hints/guidelines provided by the
external_account JSON file to retrieve credentials and exchange them for Google
access tokens.

Identity Pool Credentials are used with external credentials (eg. OIDC
ID tokens) retrieved from a file location, typical for K8s workloads
registered with Hub with Hub workload identity enabled.
registered with Hub with Hub workload identity enabled, or retrieved from an
url, typical for Azure based workflows.
"""

try:
from collections.abc import Mapping
# Python 2.7 compatibility
except ImportError: # pragma: NO COVER
from collections import Mapping
import io
import json
import os
Expand All @@ -36,10 +43,7 @@


class Credentials(external_account.Credentials):
"""File-sourced external account credentials.
This is typically used to exchange OIDC ID tokens in K8s (file-sourced
credentials) for Google access tokens.
"""
"""External account credentials sourced from files and urls."""

def __init__(
self,
Expand All @@ -53,15 +57,27 @@ def __init__(
quota_project_id=None,
scopes=None,
):
"""Instantiates a file-sourced external account credentials object.
"""Instantiates an external account credentials object from a file/url.

Args:
audience (str): The STS audience field.
subject_token_type (str): The subject token type.
token_url (str): The STS endpoint URL.
credential_source (Mapping): The credential source dictionary used to
provide instructions on how to retrieve external credential to be
exchanged for Google access tokens..
exchanged for Google access tokens
Example credential_source's:
{
"url": "http://www.example.com",
"format": {
"type": "json",
"subject_token_field_name": "access_token",
},
"headers": {"foo": "bar"},
}
{
"file": "/path/to/token/file.txt"
}
service_account_impersonation_url (Optional[str]): The optional service account
impersonation getAccessToken URL.
client_id (Optional[str]): The optional client ID.
Expand Down Expand Up @@ -91,15 +107,25 @@ def __init__(
quota_project_id=quota_project_id,
scopes=scopes,
)
if isinstance(credential_source, dict):
if not isinstance(credential_source, Mapping):
self._credential_source_file = None
self._credential_source_url = None
else:
self._credential_source_file = credential_source.get("file")
self._credential_source_url = credential_source.get("url")
self._credential_source_headers = credential_source.get("headers")
credential_source_format = credential_source.get("format") or {}
credential_source_format = credential_source.get("format", {})
# Get credential_source format type. When not provided, this
# defaults to text.
self._credential_source_format_type = (
credential_source_format.get("type") or "text"
)
# environment_id is only supported in AWS or dedicated future external
# account credentials.
if "environment_id" in credential_source:
raise ValueError(
"Invalid Identity Pool credential_source field 'environment_id'"
)
if self._credential_source_format_type not in ["text", "json"]:
raise ValueError(
"Invalid credential_source format '{}'".format(
Expand All @@ -117,28 +143,60 @@ def __init__(
)
else:
self._credential_source_field_name = None
else:
self._credential_source_file = None
if not self._credential_source_file:
raise ValueError("Missing credential_source file")

if self._credential_source_file and self._credential_source_url:
raise ValueError(
"Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
)
if not self._credential_source_file and not self._credential_source_url:
raise ValueError(
"Missing credential_source. A 'file' or 'url' must be provided."
)

@_helpers.copy_docstring(external_account.Credentials)
def retrieve_subject_token(self, request):
return self._get_token_file(
self._credential_source_file,
return self._parse_token_data(
self._get_token_data(request),
self._credential_source_format_type,
self._credential_source_field_name,
)

def _get_token_file(
self, filename, format_type="text", subject_token_field_name=None
):
def _get_token_data(self, request):
if self._credential_source_file:
return self._get_file_data(self._credential_source_file)
else:
return self._get_url_data(
request, self._credential_source_url, self._credential_source_headers
)

def _get_file_data(self, filename):
if not os.path.exists(filename):
raise exceptions.RefreshError("File '{}' was not found.".format(filename))

with io.open(filename, "r", encoding="utf-8") as file_obj:
content = file_obj.read()
return file_obj.read(), filename

def _get_url_data(self, request, url, headers):
response = request(url=url, method="GET", headers=headers)

# support both string and bytes type response.data
response_body = (
response.data.decode("utf-8")
if hasattr(response.data, "decode")
else response.data
)

if response.status != 200:
raise exceptions.RefreshError(
"Unable to retrieve Identity Pool subject token", response_body
)

return response_body, url

def _parse_token_data(
self, token_content, format_type="text", subject_token_field_name=None
):
content, filename = token_content
if format_type == "text":
token = content
else:
Expand Down
Loading