11
11
from typing import Any
12
12
from typing import cast
13
13
14
+ import tomli
15
+ import tomli_w
16
+
14
17
from packaging .utils import canonicalize_name
15
18
from poetry .core .constraints .version import Version
16
19
from poetry .core .constraints .version import parse_constraint
17
20
from poetry .core .packages .dependency import Dependency
18
21
from poetry .core .packages .package import Package
19
- from poetry .core .toml .file import TOMLFile
20
22
from poetry .core .version .markers import parse_marker
21
23
from poetry .core .version .requirements import InvalidRequirement
22
- from tomlkit import array
23
- from tomlkit import comment
24
- from tomlkit import document
25
- from tomlkit import inline_table
26
- from tomlkit import item
27
- from tomlkit import table
28
- from tomlkit .exceptions import TOMLKitError
29
- from tomlkit .items import Array
30
24
31
25
32
26
if TYPE_CHECKING :
33
27
from poetry .core .packages .directory_dependency import DirectoryDependency
34
28
from poetry .core .packages .file_dependency import FileDependency
35
29
from poetry .core .packages .url_dependency import URLDependency
36
30
from poetry .core .packages .vcs_dependency import VCSDependency
37
- from tomlkit .items import Table
38
- from tomlkit .toml_document import TOMLDocument
39
31
40
32
from poetry .repositories .lockfile_repository import LockfileRepository
41
33
@@ -55,17 +47,17 @@ class Locker:
55
47
_relevant_keys = [* _legacy_keys , "group" ]
56
48
57
49
def __init__ (self , lock : str | Path , local_config : dict [str , Any ]) -> None :
58
- self ._lock = TOMLFile (lock )
50
+ self ._lock = lock if isinstance ( lock , Path ) else Path (lock )
59
51
self ._local_config = local_config
60
- self ._lock_data : TOMLDocument | None = None
52
+ self ._lock_data : dict [ str , Any ] | None = None
61
53
self ._content_hash = self ._get_content_hash ()
62
54
63
55
@property
64
- def lock (self ) -> TOMLFile :
56
+ def lock (self ) -> Path :
65
57
return self ._lock
66
58
67
59
@property
68
- def lock_data (self ) -> TOMLDocument :
60
+ def lock_data (self ) -> dict [ str , Any ] :
69
61
if self ._lock_data is None :
70
62
self ._lock_data = self ._get_lock_data ()
71
63
@@ -75,7 +67,7 @@ def is_locked(self) -> bool:
75
67
"""
76
68
Checks whether the locker has been locked (lockfile found).
77
69
"""
78
- if not self ._lock .exists ():
70
+ if not self .lock .exists ():
79
71
return False
80
72
81
73
return "package" in self .lock_data
@@ -84,7 +76,8 @@ def is_fresh(self) -> bool:
84
76
"""
85
77
Checks whether the lock file is still up to date with the current hash.
86
78
"""
87
- lock = self ._lock .read ()
79
+ with self .lock .open ("rb" ) as f :
80
+ lock = tomli .load (f )
88
81
metadata = lock .get ("metadata" , {})
89
82
90
83
if "content-hash" in metadata :
@@ -116,7 +109,7 @@ def locked_repository(self) -> LockfileRepository:
116
109
source_type = source .get ("type" )
117
110
url = source .get ("url" )
118
111
if source_type in ["directory" , "file" ]:
119
- url = self ._lock . path .parent .joinpath (url ).resolve ().as_posix ()
112
+ url = self .lock .parent .joinpath (url ).resolve ().as_posix ()
120
113
121
114
name = info ["name" ]
122
115
package = Package (
@@ -201,7 +194,7 @@ def locked_repository(self) -> LockfileRepository:
201
194
package .marker = parse_marker (split_dep [1 ].strip ())
202
195
203
196
for dep_name , constraint in info .get ("dependencies" , {}).items ():
204
- root_dir = self ._lock . path .parent
197
+ root_dir = self .lock .parent
205
198
if package .source_type == "directory" :
206
199
# root dir should be the source of the package relative to the lock
207
200
# path
@@ -228,27 +221,21 @@ def locked_repository(self) -> LockfileRepository:
228
221
return repository
229
222
230
223
def set_lock_data (self , root : Package , packages : list [Package ]) -> bool :
231
- files : dict [str , Any ] = table ()
224
+ files : dict [str , Any ] = {}
232
225
package_specs = self ._lock_packages (packages )
233
226
# Retrieving hashes
234
227
for package in package_specs :
235
228
if package ["name" ] not in files :
236
229
files [package ["name" ]] = []
237
230
238
231
for f in package ["files" ]:
239
- file_metadata = inline_table ()
232
+ file_metadata = {}
240
233
for k , v in sorted (f .items ()):
241
234
file_metadata [k ] = v
242
235
243
236
files [package ["name" ]].append (file_metadata )
244
237
245
- if files [package ["name" ]]:
246
- package_files = item (files [package ["name" ]])
247
- assert isinstance (package_files , Array )
248
- files [package ["name" ]] = package_files .multiline (True )
249
-
250
- lock = document ()
251
- lock .add (comment (GENERATED_COMMENT ))
238
+ lock : dict [str , Any ] = {}
252
239
lock ["package" ] = package_specs
253
240
254
241
if root .extras :
@@ -270,12 +257,10 @@ def set_lock_data(self, root: Package, packages: list[Package]) -> bool:
270
257
271
258
return False
272
259
273
- def _write_lock_data (self , data : TOMLDocument ) -> None :
274
- self .lock .write (data )
275
-
276
- # Checking lock file data consistency
277
- if data != self .lock .read ():
278
- raise RuntimeError ("Inconsistent lock file data." )
260
+ def _write_lock_data (self , data : dict [str , Any ]) -> None :
261
+ with self .lock .open ("wb" ) as f :
262
+ f .write (f"# { GENERATED_COMMENT } \n \n " .encode ())
263
+ tomli_w .dump (data , f )
279
264
280
265
self ._lock_data = None
281
266
@@ -296,16 +281,17 @@ def _get_content_hash(self) -> str:
296
281
297
282
return sha256 (json .dumps (relevant_content , sort_keys = True ).encode ()).hexdigest ()
298
283
299
- def _get_lock_data (self ) -> TOMLDocument :
300
- if not self ._lock .exists ():
284
+ def _get_lock_data (self ) -> dict [ str , Any ] :
285
+ if not self .lock .exists ():
301
286
raise RuntimeError ("No lockfile found. Unable to read locked packages" )
302
287
303
- try :
304
- lock_data : TOMLDocument = self ._lock .read ()
305
- except TOMLKitError as e :
306
- raise RuntimeError (f"Unable to read the lock file ({ e } )." )
288
+ with self .lock .open ("rb" ) as f :
289
+ try :
290
+ lock_data = tomli .load (f )
291
+ except tomli .TOMLDecodeError as e :
292
+ raise RuntimeError (f"Unable to read the lock file ({ e } )." )
307
293
308
- metadata = cast ( "Table" , lock_data ["metadata" ])
294
+ metadata = lock_data ["metadata" ]
309
295
lock_version = Version .parse (metadata .get ("lock-version" , "1.0" ))
310
296
current_version = Version .parse (self ._VERSION )
311
297
accepted_versions = parse_constraint (self ._READ_VERSION_RANGE )
@@ -356,7 +342,7 @@ def _dump_package(self, package: Package) -> dict[str, Any]:
356
342
if dependency .pretty_name not in dependencies :
357
343
dependencies [dependency .pretty_name ] = []
358
344
359
- constraint = inline_table ()
345
+ constraint : dict [ str , Any ] = {}
360
346
361
347
if dependency .is_directory ():
362
348
dependency = cast ("DirectoryDependency" , dependency )
@@ -422,14 +408,10 @@ def _dump_package(self, package: Package) -> dict[str, Any]:
422
408
}
423
409
424
410
if dependencies :
425
- data ["dependencies" ] = table ()
426
- for k , constraints in dependencies .items ():
427
- if len (constraints ) == 1 :
428
- data ["dependencies" ][k ] = constraints [0 ]
429
- else :
430
- data ["dependencies" ][k ] = array ().multiline (True )
431
- for constraint in constraints :
432
- data ["dependencies" ][k ].append (constraint )
411
+ data ["dependencies" ] = {
412
+ name : constraints [0 ] if len (constraints ) == 1 else constraints
413
+ for name , constraints in dependencies .items ()
414
+ }
433
415
434
416
if package .extras :
435
417
extras = {}
@@ -445,7 +427,7 @@ def _dump_package(self, package: Package) -> dict[str, Any]:
445
427
url = Path (
446
428
os .path .relpath (
447
429
Path (url ).resolve (),
448
- Path (self ._lock . path .parent ).resolve (),
430
+ Path (self .lock .parent ).resolve (),
449
431
)
450
432
).as_posix ()
451
433
0 commit comments