1
1
import base64
2
2
import hashlib
3
+ import itertools
3
4
import json
4
5
import os
5
6
import platform
17
18
from typing import Any
18
19
from typing import ContextManager
19
20
from typing import Dict
21
+ from typing import Iterable
20
22
from typing import Iterator
21
23
from typing import List
22
24
from typing import Optional
43
45
from poetry .utils ._compat import decode
44
46
from poetry .utils ._compat import encode
45
47
from poetry .utils ._compat import list_to_shell_command
48
+ from poetry .utils ._compat import metadata
46
49
from poetry .utils .helpers import is_dir_writable
47
50
from poetry .utils .helpers import paths_csv
48
51
from poetry .utils .helpers import temporary_directory
@@ -166,7 +169,12 @@ def __init__(
166
169
167
170
self ._fallbacks = fallbacks or []
168
171
self ._skip_write_checks = skip_write_checks
169
- self ._candidates = list ({self ._purelib , self ._platlib }) + self ._fallbacks
172
+
173
+ self ._candidates : List [Path ] = []
174
+ for path in itertools .chain ([self ._purelib , self ._platlib ], self ._fallbacks ):
175
+ if path not in self ._candidates :
176
+ self ._candidates .append (path )
177
+
170
178
self ._writable_candidates = None if not skip_write_checks else self ._candidates
171
179
172
180
@property
@@ -198,7 +206,9 @@ def writable_candidates(self) -> List[Path]:
198
206
199
207
return self ._writable_candidates
200
208
201
- def make_candidates (self , path : Path , writable_only : bool = False ) -> List [Path ]:
209
+ def make_candidates (
210
+ self , path : Path , writable_only : bool = False , strict : bool = False
211
+ ) -> List [Path ]:
202
212
candidates = self ._candidates if not writable_only else self .writable_candidates
203
213
if path .is_absolute ():
204
214
for candidate in candidates :
@@ -214,7 +224,91 @@ def make_candidates(self, path: Path, writable_only: bool = False) -> List[Path]
214
224
)
215
225
)
216
226
217
- return [candidate / path for candidate in candidates if candidate ]
227
+ results = [candidate / path for candidate in candidates if candidate ]
228
+
229
+ if not results and strict :
230
+ raise RuntimeError (
231
+ 'Unable to find a suitable destination for "{}" in {}' .format (
232
+ str (path ), paths_csv (self ._candidates )
233
+ )
234
+ )
235
+
236
+ return results
237
+
238
+ def distributions (
239
+ self , name : Optional [str ] = None , writable_only : bool = False
240
+ ) -> Iterable [metadata .PathDistribution ]:
241
+ path = list (
242
+ map (
243
+ str , self ._candidates if not writable_only else self .writable_candidates
244
+ )
245
+ )
246
+ for distribution in metadata .PathDistribution .discover (
247
+ name = name , path = path
248
+ ): # type: metadata.PathDistribution
249
+ yield distribution
250
+
251
+ def find_distribution (
252
+ self , name : str , writable_only : bool = False
253
+ ) -> Optional [metadata .PathDistribution ]:
254
+ for distribution in self .distributions (name = name , writable_only = writable_only ):
255
+ return distribution
256
+ else :
257
+ return None
258
+
259
+ def find_distribution_files_with_suffix (
260
+ self , distribution_name : str , suffix : str , writable_only : bool = False
261
+ ) -> Iterable [Path ]:
262
+ for distribution in self .distributions (
263
+ name = distribution_name , writable_only = writable_only
264
+ ):
265
+ for file in distribution .files :
266
+ if file .name .endswith (suffix ):
267
+ yield Path (distribution .locate_file (file ))
268
+
269
+ def find_distribution_files_with_name (
270
+ self , distribution_name : str , name : str , writable_only : bool = False
271
+ ) -> Iterable [Path ]:
272
+ for distribution in self .distributions (
273
+ name = distribution_name , writable_only = writable_only
274
+ ):
275
+ for file in distribution .files :
276
+ if file .name == name :
277
+ yield Path (distribution .locate_file (file ))
278
+
279
+ def find_distribution_nspkg_pth_files (
280
+ self , distribution_name : str , writable_only : bool = False
281
+ ) -> Iterable [Path ]:
282
+ return self .find_distribution_files_with_suffix (
283
+ distribution_name = distribution_name ,
284
+ suffix = "-nspkg.pth" ,
285
+ writable_only = writable_only ,
286
+ )
287
+
288
+ def find_distribution_direct_url_json_files (
289
+ self , distribution_name : str , writable_only : bool = False
290
+ ) -> Iterable [Path ]:
291
+ return self .find_distribution_files_with_name (
292
+ distribution_name = distribution_name ,
293
+ name = "direct_url.json" ,
294
+ writable_only = writable_only ,
295
+ )
296
+
297
+ def remove_distribution_files (self , distribution_name : str ) -> List [Path ]:
298
+ paths = []
299
+
300
+ for distribution in self .distributions (
301
+ name = distribution_name , writable_only = True
302
+ ):
303
+ for file in distribution .files :
304
+ Path (distribution .locate_file (file )).unlink (missing_ok = True )
305
+
306
+ if distribution ._path .exists ():
307
+ shutil .rmtree (str (distribution ._path ))
308
+
309
+ paths .append (distribution ._path )
310
+
311
+ return paths
218
312
219
313
def _path_method_wrapper (
220
314
self ,
@@ -228,14 +322,9 @@ def _path_method_wrapper(
228
322
if isinstance (path , str ):
229
323
path = Path (path )
230
324
231
- candidates = self .make_candidates (path , writable_only = writable_only )
232
-
233
- if not candidates :
234
- raise RuntimeError (
235
- 'Unable to find a suitable destination for "{}" in {}' .format (
236
- str (path ), paths_csv (self ._candidates )
237
- )
238
- )
325
+ candidates = self .make_candidates (
326
+ path , writable_only = writable_only , strict = True
327
+ )
239
328
240
329
results = []
241
330
@@ -244,8 +333,7 @@ def _path_method_wrapper(
244
333
result = candidate , getattr (candidate , method )(* args , ** kwargs )
245
334
if return_first :
246
335
return result
247
- else :
248
- results .append (result )
336
+ results .append (result )
249
337
except OSError :
250
338
# TODO: Replace with PermissionError
251
339
pass
@@ -267,7 +355,11 @@ def exists(self, path: Union[str, Path]) -> bool:
267
355
for value in self ._path_method_wrapper (path , "exists" , return_first = False )
268
356
)
269
357
270
- def find (self , path : Union [str , Path ], writable_only : bool = False ) -> List [Path ]:
358
+ def find (
359
+ self ,
360
+ path : Union [str , Path ],
361
+ writable_only : bool = False ,
362
+ ) -> List [Path ]:
271
363
return [
272
364
value [0 ]
273
365
for value in self ._path_method_wrapper (
0 commit comments