From 15e91103573e67eae7f913608d6dd37ac0de02a7 Mon Sep 17 00:00:00 2001 From: Tucker Beck Date: Tue, 14 Dec 2021 15:00:09 -0800 Subject: [PATCH 1/3] Add cert retrieval for requests The authenticator.py code already retrieves credentials from the config for every request based on url matching. This change makes the authenticator also retrieve certs from the config for each request based on url matching. Also includes unit tests. - Fixed style issues with black - Fixed broken unit test - Fixed broken unit test for windows - Fixing path issue in unit tests for windows - Fixed cert handling for pathlib paths - Style change to make the linter happy - Rebased and fixed type hint - Fixed a style issue - Applied updates from code review - more revisions based on code review feedback - Some revisions based on code review - Applied code review suggestions and fixed unit tests - Fixed broken code from rebase - Applied fix for linting issue and improved test coverage - Fixed issues with linting and type-checking --- src/poetry/utils/authenticator.py | 61 ++++++++++++++++++++++++++++--- tests/utils/test_authenticator.py | 48 ++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 5 deletions(-) diff --git a/src/poetry/utils/authenticator.py b/src/poetry/utils/authenticator.py index a7d0dda3d89..42224bcd23e 100644 --- a/src/poetry/utils/authenticator.py +++ b/src/poetry/utils/authenticator.py @@ -6,16 +6,24 @@ from typing import TYPE_CHECKING from typing import Any +from typing import Dict +from typing import Generator +from typing import Optional +from typing import Tuple import requests import requests.auth import requests.exceptions from poetry.exceptions import PoetryException +from poetry.utils.helpers import get_cert +from poetry.utils.helpers import get_client_cert from poetry.utils.password_manager import PasswordManager if TYPE_CHECKING: + from pathlib import Path + from cleo.io.io import IO from poetry.config.config import Config @@ -30,6 +38,7 @@ def __init__(self, config: Config, io: IO | None = None) -> None: self._io = io self._session = None self._credentials = {} + self._certs = {} self._password_manager = PasswordManager(self._config) def _log(self, message: str, level: str = "debug") -> None: @@ -61,8 +70,16 @@ def request(self, method: str, url: str, **kwargs: Any) -> requests.Response: proxies = kwargs.get("proxies", {}) stream = kwargs.get("stream") - verify = kwargs.get("verify") - cert = kwargs.get("cert") + + certs = self.get_certs_for_url(url) + verify = kwargs.get("verify") or certs.get("verify") + cert = kwargs.get("cert") or certs.get("cert") + + if cert is not None: + cert = str(cert) + + if verify is not None: + verify = str(verify) settings = session.merge_environment_settings( prepared_request.url, proxies, stream, verify, cert @@ -156,8 +173,12 @@ def _get_http_auth(self, name: str, netloc: str | None) -> dict[str, str] | None return auth - def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | None]: - for repository_name in self._config.get("repositories", []): + def _get_credentials_for_netloc( + self, netloc: str + ) -> Tuple[Optional[str], Optional[str]]: + credentials = (None, None) + + for (repository_name, _) in self._get_repository_netlocs(): auth = self._get_http_auth(repository_name, netloc) if auth is None: @@ -165,7 +186,26 @@ def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | No return auth["username"], auth["password"] - return None, None + return credentials + + def get_certs_for_url(self, url: str) -> Dict[str, "Path"]: + parsed_url = urllib.parse.urlsplit(url) + + netloc = parsed_url.netloc + + return self._certs.setdefault( + netloc, + self._get_certs_for_netloc_from_config(netloc), + ) + + def _get_repository_netlocs(self) -> Generator[Tuple[str, str], None, None]: + for repository_name in self._config.get("repositories", []): + url = self._config.get(f"repositories.{repository_name}.url") + parsed_url = urllib.parse.urlsplit(url) + yield ( + repository_name, + parsed_url.netloc, + ) def _get_credentials_for_netloc_from_keyring( self, url: str, netloc: str, username: str | None @@ -193,3 +233,14 @@ def _get_credentials_for_netloc_from_keyring( } return None + + def _get_certs_for_netloc_from_config(self, netloc: str) -> Dict[str, "Path"]: + certs = {"cert": None, "verify": None} + + for (repository_name, repository_netloc) in self._get_repository_netlocs(): + if netloc == repository_netloc: + certs["cert"] = get_client_cert(self._config, repository_name) + certs["verify"] = get_cert(self._config, repository_name) + break + + return certs diff --git a/tests/utils/test_authenticator.py b/tests/utils/test_authenticator.py index 25b312211b7..f728fc1dbd6 100644 --- a/tests/utils/test_authenticator.py +++ b/tests/utils/test_authenticator.py @@ -6,6 +6,11 @@ from dataclasses import dataclass from typing import TYPE_CHECKING from typing import Any +from typing import Dict +from typing import List +from typing import Type +from typing import Union +from pathlib import Path import httpretty import pytest @@ -306,3 +311,46 @@ def test_authenticator_uses_env_provided_credentials( request = http.last_request() assert request.headers["Authorization"] == "Basic YmFyOmJheg==" + + +@pytest.mark.parametrize( + "cert,client_cert", + [ + (None, None), + (None, "path/to/provided/client-cert"), + ("/path/to/provided/cert", None), + ("/path/to/provided/cert", "path/to/provided/client-cert"), + ], +) +def test_authenticator_uses_certs_from_config_if_not_provided( + config: "Config", + mock_remote: Type[httpretty.httpretty], + http: Type[httpretty.httpretty], + mocker: "MockerFixture", + cert: Union[str, None], + client_cert: Union[str, None], +): + configured_cert = "/path/to/cert" + configured_client_cert = "/path/to/client-cert" + config.merge( + { + "repositories": {"foo": {"url": "https://foo.bar/simple/"}}, + "http-basic": {"foo": {"username": "bar", "password": "baz"}}, + "certificates": { + "foo": {"cert": configured_cert, "client-cert": configured_client_cert} + }, + } + ) + + authenticator = Authenticator(config, NullIO()) + session_send = mocker.patch.object(authenticator.session, "send") + authenticator.request( + "get", + "https://foo.bar/files/foo-0.1.0.tar.gz", + verify=cert, + cert=client_cert, + ) + kwargs = session_send.call_args[1] + + assert Path(kwargs["verify"]) == Path(cert or configured_cert) + assert Path(kwargs["cert"]) == Path(client_cert or configured_client_cert) From d7b0d218c9ed540b916da633ef9cb2f391b2b650 Mon Sep 17 00:00:00 2001 From: Kyle Finley Date: Tue, 15 Mar 2022 15:36:41 -0400 Subject: [PATCH 2/3] rebased branch by @dusktreader to current master --- src/poetry/utils/authenticator.py | 22 ++++++---------------- tests/utils/test_authenticator.py | 18 +++++++----------- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/src/poetry/utils/authenticator.py b/src/poetry/utils/authenticator.py index 42224bcd23e..802870db971 100644 --- a/src/poetry/utils/authenticator.py +++ b/src/poetry/utils/authenticator.py @@ -6,10 +6,7 @@ from typing import TYPE_CHECKING from typing import Any -from typing import Dict from typing import Generator -from typing import Optional -from typing import Tuple import requests import requests.auth @@ -173,11 +170,7 @@ def _get_http_auth(self, name: str, netloc: str | None) -> dict[str, str] | None return auth - def _get_credentials_for_netloc( - self, netloc: str - ) -> Tuple[Optional[str], Optional[str]]: - credentials = (None, None) - + def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | None]: for (repository_name, _) in self._get_repository_netlocs(): auth = self._get_http_auth(repository_name, netloc) @@ -186,9 +179,9 @@ def _get_credentials_for_netloc( return auth["username"], auth["password"] - return credentials + return None, None - def get_certs_for_url(self, url: str) -> Dict[str, "Path"]: + def get_certs_for_url(self, url: str) -> dict[str, Path]: parsed_url = urllib.parse.urlsplit(url) netloc = parsed_url.netloc @@ -198,14 +191,11 @@ def get_certs_for_url(self, url: str) -> Dict[str, "Path"]: self._get_certs_for_netloc_from_config(netloc), ) - def _get_repository_netlocs(self) -> Generator[Tuple[str, str], None, None]: + def _get_repository_netlocs(self) -> Generator[tuple[str, str], None, None]: for repository_name in self._config.get("repositories", []): url = self._config.get(f"repositories.{repository_name}.url") parsed_url = urllib.parse.urlsplit(url) - yield ( - repository_name, - parsed_url.netloc, - ) + yield repository_name, parsed_url.netloc def _get_credentials_for_netloc_from_keyring( self, url: str, netloc: str, username: str | None @@ -234,7 +224,7 @@ def _get_credentials_for_netloc_from_keyring( return None - def _get_certs_for_netloc_from_config(self, netloc: str) -> Dict[str, "Path"]: + def _get_certs_for_netloc_from_config(self, netloc: str) -> dict[str, Path]: certs = {"cert": None, "verify": None} for (repository_name, repository_netloc) in self._get_repository_netlocs(): diff --git a/tests/utils/test_authenticator.py b/tests/utils/test_authenticator.py index f728fc1dbd6..4c8f77faedf 100644 --- a/tests/utils/test_authenticator.py +++ b/tests/utils/test_authenticator.py @@ -4,13 +4,9 @@ import uuid from dataclasses import dataclass +from pathlib import Path from typing import TYPE_CHECKING from typing import Any -from typing import Dict -from typing import List -from typing import Type -from typing import Union -from pathlib import Path import httpretty import pytest @@ -323,12 +319,12 @@ def test_authenticator_uses_env_provided_credentials( ], ) def test_authenticator_uses_certs_from_config_if_not_provided( - config: "Config", - mock_remote: Type[httpretty.httpretty], - http: Type[httpretty.httpretty], - mocker: "MockerFixture", - cert: Union[str, None], - client_cert: Union[str, None], + config: Config, + mock_remote: type[httpretty.httpretty], + http: type[httpretty.httpretty], + mocker: MockerFixture, + cert: str | None, + client_cert: str | None, ): configured_cert = "/path/to/cert" configured_client_cert = "/path/to/client-cert" From 1db74a28acbc0adafa82b0a0859ca94ff82dbf6a Mon Sep 17 00:00:00 2001 From: Kyle Finley Date: Thu, 14 Apr 2022 09:01:25 -0400 Subject: [PATCH 3/3] make requested changes to type hints --- src/poetry/utils/authenticator.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/poetry/utils/authenticator.py b/src/poetry/utils/authenticator.py index 802870db971..6c7758a1503 100644 --- a/src/poetry/utils/authenticator.py +++ b/src/poetry/utils/authenticator.py @@ -6,7 +6,7 @@ from typing import TYPE_CHECKING from typing import Any -from typing import Generator +from typing import Iterator import requests import requests.auth @@ -171,7 +171,7 @@ def _get_http_auth(self, name: str, netloc: str | None) -> dict[str, str] | None return auth def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | None]: - for (repository_name, _) in self._get_repository_netlocs(): + for repository_name, _ in self._get_repository_netlocs(): auth = self._get_http_auth(repository_name, netloc) if auth is None: @@ -181,7 +181,7 @@ def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | No return None, None - def get_certs_for_url(self, url: str) -> dict[str, Path]: + def get_certs_for_url(self, url: str) -> dict[str, Path | None]: parsed_url = urllib.parse.urlsplit(url) netloc = parsed_url.netloc @@ -191,7 +191,7 @@ def get_certs_for_url(self, url: str) -> dict[str, Path]: self._get_certs_for_netloc_from_config(netloc), ) - def _get_repository_netlocs(self) -> Generator[tuple[str, str], None, None]: + def _get_repository_netlocs(self) -> Iterator[tuple[str, str]]: for repository_name in self._config.get("repositories", []): url = self._config.get(f"repositories.{repository_name}.url") parsed_url = urllib.parse.urlsplit(url) @@ -224,10 +224,10 @@ def _get_credentials_for_netloc_from_keyring( return None - def _get_certs_for_netloc_from_config(self, netloc: str) -> dict[str, Path]: + def _get_certs_for_netloc_from_config(self, netloc: str) -> dict[str, Path | None]: certs = {"cert": None, "verify": None} - for (repository_name, repository_netloc) in self._get_repository_netlocs(): + for repository_name, repository_netloc in self._get_repository_netlocs(): if netloc == repository_netloc: certs["cert"] = get_client_cert(self._config, repository_name) certs["verify"] = get_cert(self._config, repository_name)