6
6
7
7
from typing import TYPE_CHECKING
8
8
from typing import Any
9
+ from typing import Iterator
9
10
10
11
import requests
11
12
import requests .auth
12
13
import requests .exceptions
13
14
14
15
from poetry .exceptions import PoetryException
16
+ from poetry .utils .helpers import get_cert
17
+ from poetry .utils .helpers import get_client_cert
15
18
from poetry .utils .password_manager import PasswordManager
16
19
17
20
18
21
if TYPE_CHECKING :
22
+ from pathlib import Path
23
+
19
24
from cleo .io .io import IO
20
25
21
26
from poetry .config .config import Config
@@ -30,6 +35,7 @@ def __init__(self, config: Config, io: IO | None = None) -> None:
30
35
self ._io = io
31
36
self ._session = None
32
37
self ._credentials = {}
38
+ self ._certs = {}
33
39
self ._password_manager = PasswordManager (self ._config )
34
40
35
41
def _log (self , message : str , level : str = "debug" ) -> None :
@@ -61,8 +67,16 @@ def request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
61
67
62
68
proxies = kwargs .get ("proxies" , {})
63
69
stream = kwargs .get ("stream" )
64
- verify = kwargs .get ("verify" )
65
- cert = kwargs .get ("cert" )
70
+
71
+ certs = self .get_certs_for_url (url )
72
+ verify = kwargs .get ("verify" ) or certs .get ("verify" )
73
+ cert = kwargs .get ("cert" ) or certs .get ("cert" )
74
+
75
+ if cert is not None :
76
+ cert = str (cert )
77
+
78
+ if verify is not None :
79
+ verify = str (verify )
66
80
67
81
settings = session .merge_environment_settings (
68
82
prepared_request .url , proxies , stream , verify , cert
@@ -157,7 +171,7 @@ def _get_http_auth(self, name: str, netloc: str | None) -> dict[str, str] | None
157
171
return auth
158
172
159
173
def _get_credentials_for_netloc (self , netloc : str ) -> tuple [str | None , str | None ]:
160
- for repository_name in self ._config . get ( "repositories" , [] ):
174
+ for repository_name , _ in self ._get_repository_netlocs ( ):
161
175
auth = self ._get_http_auth (repository_name , netloc )
162
176
163
177
if auth is None :
@@ -167,6 +181,22 @@ def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | No
167
181
168
182
return None , None
169
183
184
+ def get_certs_for_url (self , url : str ) -> dict [str , Path | None ]:
185
+ parsed_url = urllib .parse .urlsplit (url )
186
+
187
+ netloc = parsed_url .netloc
188
+
189
+ return self ._certs .setdefault (
190
+ netloc ,
191
+ self ._get_certs_for_netloc_from_config (netloc ),
192
+ )
193
+
194
+ def _get_repository_netlocs (self ) -> Iterator [tuple [str , str ]]:
195
+ for repository_name in self ._config .get ("repositories" , []):
196
+ url = self ._config .get (f"repositories.{ repository_name } .url" )
197
+ parsed_url = urllib .parse .urlsplit (url )
198
+ yield repository_name , parsed_url .netloc
199
+
170
200
def _get_credentials_for_netloc_from_keyring (
171
201
self , url : str , netloc : str , username : str | None
172
202
) -> dict [str , str ] | None :
@@ -193,3 +223,14 @@ def _get_credentials_for_netloc_from_keyring(
193
223
}
194
224
195
225
return None
226
+
227
+ def _get_certs_for_netloc_from_config (self , netloc : str ) -> dict [str , Path | None ]:
228
+ certs = {"cert" : None , "verify" : None }
229
+
230
+ for repository_name , repository_netloc in self ._get_repository_netlocs ():
231
+ if netloc == repository_netloc :
232
+ certs ["cert" ] = get_client_cert (self ._config , repository_name )
233
+ certs ["verify" ] = get_cert (self ._config , repository_name )
234
+ break
235
+
236
+ return certs
0 commit comments