diff --git a/google/auth/transport/_mtls_helper.py b/google/auth/transport/_mtls_helper.py index 68568dd60..5ad105a52 100644 --- a/google/auth/transport/_mtls_helper.py +++ b/google/auth/transport/_mtls_helper.py @@ -16,7 +16,7 @@ import json import logging -from os import environ, path +from os import environ, getenv, path import re import subprocess @@ -405,3 +405,47 @@ def client_cert_callback(): # Then dump the decrypted key bytes return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) + + +def check_use_client_cert(): + """Returns the value of the GOOGLE_API_USE_CLIENT_CERTIFICATE variable, + or an inferred value('true' or 'false') if unset. + + This value is meant to be interpreted as a "true" or "false" value + representing whether the client certificate should be used, but could be any + arbitrary string. + + If GOOGLE_API_USE_CLIENT_CERTIFICATE is unset, the value value will be + inferred by reading a file pointed at by GOOGLE_API_CERTIFICATE_CONFIG, and + verifying it contains a "workload" section. If so, the function will return + "true", otherwise "false". + + Returns: + str: The value of GOOGLE_API_USE_CLIENT_CERTIFICATE, or an inferred value + ("true" or "false") if unset. This string should contain a value, but may + be an any arbitrary string read from the user's set + GOOGLE_API_USE_CLIENT_CERTIFICATE. + """ + use_client_cert = getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") + # Check if the value of GOOGLE_API_USE_CLIENT_CERTIFICATE is set. + if use_client_cert: + return use_client_cert.lower() + else: + # Check if the value of GOOGLE_API_CERTIFICATE_CONFIG is set. + cert_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG") + if cert_path: + try: + with open(cert_path, "r") as f: + content = json.load(f) + # verify json has workload key + content["cert_configs"]["workload"] + return "true" + except ( + FileNotFoundError, + OSError, + KeyError, + TypeError, + json.JSONDecodeError, + ) as e: + _LOGGER.debug("error decoding certificate: %s", e) + return "false" diff --git a/google/auth/transport/grpc.py b/google/auth/transport/grpc.py index 1ebe13795..747c1dcb2 100644 --- a/google/auth/transport/grpc.py +++ b/google/auth/transport/grpc.py @@ -17,9 +17,7 @@ from __future__ import absolute_import import logging -import os -from google.auth import environment_vars from google.auth import exceptions from google.auth.transport import _mtls_helper from google.oauth2 import service_account @@ -256,9 +254,7 @@ def my_client_cert_callback(): # If SSL credentials are not explicitly set, try client_cert_callback and ADC. if not ssl_credentials: - use_client_cert = os.getenv( - environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE, "false" - ) + use_client_cert = _mtls_helper.check_use_client_cert() if use_client_cert == "true" and client_cert_callback: # Use the callback if provided. cert, key = client_cert_callback() @@ -295,9 +291,7 @@ class SslCredentials: """ def __init__(self): - use_client_cert = os.getenv( - environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE, "false" - ) + use_client_cert = _mtls_helper.check_use_client_cert() if use_client_cert != "true": self._is_mtls = False else: diff --git a/google/auth/transport/requests.py b/google/auth/transport/requests.py index 2753912c6..9e1f15751 100644 --- a/google/auth/transport/requests.py +++ b/google/auth/transport/requests.py @@ -19,7 +19,6 @@ import functools import logging import numbers -import os import time try: @@ -35,7 +34,6 @@ ) # pylint: disable=ungrouped-imports from google.auth import _helpers -from google.auth import environment_vars from google.auth import exceptions from google.auth import transport import google.auth.transport._mtls_helper @@ -444,13 +442,10 @@ def configure_mtls_channel(self, client_cert_callback=None): google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel creation failed for any reason. """ - use_client_cert = os.getenv( - environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE, "false" - ) + use_client_cert = google.auth.transport._mtls_helper.check_use_client_cert() if use_client_cert != "true": self._is_mtls = False return - try: import OpenSSL except ImportError as caught_exc: diff --git a/google/auth/transport/urllib3.py b/google/auth/transport/urllib3.py index 03ed75aa2..01be1dd05 100644 --- a/google/auth/transport/urllib3.py +++ b/google/auth/transport/urllib3.py @@ -17,7 +17,6 @@ from __future__ import absolute_import import logging -import os import warnings # Certifi is Mozilla's certificate bundle. Urllib3 needs a certificate bundle @@ -51,7 +50,6 @@ from google.auth import _helpers -from google.auth import environment_vars from google.auth import exceptions from google.auth import transport from google.oauth2 import service_account @@ -335,12 +333,9 @@ def configure_mtls_channel(self, client_cert_callback=None): google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel creation failed for any reason. """ - use_client_cert = os.getenv( - environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE, "false" - ) + use_client_cert = transport._mtls_helper.check_use_client_cert() if use_client_cert != "true": return False - try: import OpenSSL except ImportError as caught_exc: diff --git a/tests/transport/test__mtls_helper.py b/tests/transport/test__mtls_helper.py index f6e20b726..c4959c1bc 100644 --- a/tests/transport/test__mtls_helper.py +++ b/tests/transport/test__mtls_helper.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import os import re @@ -638,3 +639,74 @@ def test_crypto_error(self): _mtls_helper.decrypt_private_key( ENCRYPTED_EC_PRIVATE_KEY, b"wrong_password" ) + + def test_check_use_client_cert(self, monkeypatch): + monkeypatch.setenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "true") + use_client_cert = _mtls_helper.check_use_client_cert() + assert use_client_cert == "true" + + def test_check_use_client_cert_for_workload_with_config_file(self, monkeypatch): + config_data = { + "version": 1, + "cert_configs": { + "workload": { + "cert_path": "path/to/cert/file", + "key_path": "path/to/key/file", + } + }, + } + config_filename = "mock_certificate_config.json" + config_file_content = json.dumps(config_data) + monkeypatch.setenv("GOOGLE_API_CERTIFICATE_CONFIG", config_filename) + monkeypatch.setenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") + # Use mock_open to simulate the file in memory + mock_file_handle = mock.mock_open(read_data=config_file_content) + with mock.patch("builtins.open", mock_file_handle): + use_client_cert = _mtls_helper.check_use_client_cert() + assert use_client_cert == "true" + + def test_check_use_client_cert_false(self, monkeypatch): + monkeypatch.setenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") + use_client_cert = _mtls_helper.check_use_client_cert() + assert use_client_cert == "false" + + def test_check_use_client_cert_for_workload_with_config_file_not_found( + self, monkeypatch + ): + monkeypatch.setenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") + use_client_cert = _mtls_helper.check_use_client_cert() + assert use_client_cert == "false" + + def test_check_use_client_cert_for_workload_with_config_file_not_json( + self, monkeypatch + ): + config_filename = "mock_certificate_config.json" + config_file_content = "not_valid_json" + monkeypatch.setenv("GOOGLE_API_CERTIFICATE_CONFIG", config_filename) + monkeypatch.setenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") + # Use mock_open to simulate the file in memory + mock_file_handle = mock.mock_open(read_data=config_file_content) + with mock.patch("builtins.open", mock_file_handle): + use_client_cert = _mtls_helper.check_use_client_cert() + assert use_client_cert == "false" + + def test_check_use_client_cert_for_workload_with_config_file_no_workload( + self, monkeypatch + ): + config_data = {"version": 1, "cert_configs": {"dummy_key": {}}} + config_filename = "mock_certificate_config.json" + config_file_content = json.dumps(config_data) + monkeypatch.setenv("GOOGLE_API_CERTIFICATE_CONFIG", config_filename) + monkeypatch.setenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") + # Use mock_open to simulate the file in memory + mock_file_handle = mock.mock_open(read_data=config_file_content) + with mock.patch("builtins.open", mock_file_handle): + use_client_cert = _mtls_helper.check_use_client_cert() + assert use_client_cert == "false" + + def test_check_use_client_cert_when_file_does_not_exist(self, monkeypatch): + config_filename = "mock_certificate_config.json" + monkeypatch.setenv("GOOGLE_API_CERTIFICATE_CONFIG", config_filename) + monkeypatch.setenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") + use_client_cert = _mtls_helper.check_use_client_cert() + assert use_client_cert == "false"