1
1
from __future__ import annotations
2
2
3
+ import enum
4
+
5
+ from collections import OrderedDict
6
+ from enum import IntEnum
3
7
from typing import TYPE_CHECKING
4
8
5
9
from poetry .repositories .exceptions import PackageNotFound
13
17
from poetry .repositories .repository import Repository
14
18
15
19
20
+ class Priority (IntEnum ):
21
+ # The order of the members below dictates the actual priority. The first member has
22
+ # top priority.
23
+ DEFAULT = enum .auto ()
24
+ PRIMARY = enum .auto ()
25
+ SECONDARY = enum .auto ()
26
+
27
+
16
28
class Pool :
17
29
def __init__ (
18
30
self ,
19
31
repositories : list [Repository ] | None = None ,
20
32
ignore_repository_names : bool = False ,
21
33
) -> None :
22
34
self ._name = "poetry-pool"
35
+ self ._repositories : OrderedDict [
36
+ str , tuple [Repository , RepositoryPriority ]
37
+ ] = OrderedDict ()
38
+ self ._ignore_repository_names = ignore_repository_names
23
39
24
40
if repositories is None :
25
41
repositories = []
26
-
27
- self ._lookup : dict [str , int ] = {}
28
- self ._repositories : list [Repository ] = []
29
- self ._default = False
30
- self ._has_primary_repositories = False
31
- self ._secondary_start_idx : int | None = None
32
-
33
42
for repository in repositories :
34
43
self .add_repository (repository )
35
44
36
- self ._ignore_repository_names = ignore_repository_names
37
-
38
45
@property
39
46
def name (self ) -> str :
40
47
return self ._name
41
48
42
49
@property
43
50
def repositories (self ) -> list [Repository ]:
44
- return self ._repositories
51
+ unsorted_repositories = self ._repositories .values ()
52
+ sorted_repositories = sorted (unsorted_repositories , key = lambda p : p [1 ].value )
53
+ return [repo for (repo , _ ) in sorted_repositories ]
45
54
46
55
def has_default (self ) -> bool :
47
- return self ._default
56
+ return self ._contains_priority ( Priority . DEFAULT )
48
57
49
58
def has_primary_repositories (self ) -> bool :
50
- return self ._has_primary_repositories
59
+ return self ._contains_priority (Priority .PRIMARY )
60
+
61
+ def _contains_priority (self , priority : Priority ) -> bool :
62
+ return any (
63
+ repo_prio is priority for _ , repo_prio in self ._repositories .values ()
64
+ )
51
65
52
66
def has_repository (self , name : str ) -> bool :
53
- return name .lower () in self ._lookup
67
+ return name .lower () in self ._repositories
54
68
55
69
def repository (self , name : str ) -> Repository :
56
70
name = name .lower ()
57
-
58
- lookup = self ._lookup .get (name )
59
- if lookup is not None :
60
- return self ._repositories [lookup ]
61
-
62
- raise ValueError (f'Repository "{ name } " does not exist.' )
71
+ if self .has_repository (name ):
72
+ return self ._repositories [name ][0 ]
73
+ raise IndexError (f'Repository "{ name } " does not exist.' )
63
74
64
75
def add_repository (
65
76
self , repository : Repository , default : bool = False , secondary : bool = False
@@ -68,130 +79,59 @@ def add_repository(
68
79
Adds a repository to the pool.
69
80
"""
70
81
repository_name = repository .name .lower ()
71
- if repository_name in self ._lookup :
72
- raise ValueError (f"{ repository_name } already added" )
73
-
74
- if default :
75
- if self .has_default ():
76
- raise ValueError ("Only one repository can be the default" )
77
-
78
- self ._default = True
79
- self ._repositories .insert (0 , repository )
80
- for name in self ._lookup :
81
- self ._lookup [name ] += 1
82
+ if self .has_repository (repository_name ):
83
+ raise ValueError (
84
+ f"A repository with name { repository_name } was already added."
85
+ )
82
86
83
- if self ._secondary_start_idx is not None :
84
- self . _secondary_start_idx += 1
87
+ if default and self .has_default () :
88
+ raise ValueError ( "Only one repository can be the default." )
85
89
86
- self ._lookup [repository_name ] = 0
90
+ priority = Priority .PRIMARY
91
+ if default :
92
+ priority = Priority .DEFAULT
87
93
elif secondary :
88
- if self ._secondary_start_idx is None :
89
- self ._secondary_start_idx = len (self ._repositories )
90
-
91
- self ._repositories .append (repository )
92
- self ._lookup [repository_name ] = len (self ._repositories ) - 1
93
- else :
94
- self ._has_primary_repositories = True
95
- if self ._secondary_start_idx is None :
96
- self ._repositories .append (repository )
97
- self ._lookup [repository_name ] = len (self ._repositories ) - 1
98
- else :
99
- self ._repositories .insert (self ._secondary_start_idx , repository )
100
-
101
- for name , idx in self ._lookup .items ():
102
- if idx < self ._secondary_start_idx :
103
- continue
104
-
105
- self ._lookup [name ] += 1
106
-
107
- self ._lookup [repository_name ] = self ._secondary_start_idx
108
- self ._secondary_start_idx += 1
109
-
94
+ priority = Priority .SECONDARY
95
+ self ._repositories [repository_name ] = (repository , priority )
110
96
return self
111
97
112
- def remove_repository (self , repository_name : str ) -> Pool :
113
- if repository_name is not None :
114
- repository_name = repository_name .lower ()
115
-
116
- idx = self ._lookup .get (repository_name )
117
- if idx is not None :
118
- del self ._repositories [idx ]
119
- del self ._lookup [repository_name ]
120
-
121
- if idx == 0 :
122
- self ._default = False
123
-
124
- for name in self ._lookup :
125
- if self ._lookup [name ] > idx :
126
- self ._lookup [name ] -= 1
127
-
128
- if (
129
- self ._secondary_start_idx is not None
130
- and self ._secondary_start_idx > idx
131
- ):
132
- self ._secondary_start_idx -= 1
133
-
98
+ def remove_repository (self , name : str ) -> Pool :
99
+ if not self .has_repository (name ):
100
+ raise IndexError (f"Pool can not remove unknown repository '{ name } '." )
101
+ del self ._repositories [name .lower ()]
134
102
return self
135
103
136
104
def package (
137
105
self ,
138
106
name : str ,
139
107
version : Version ,
140
108
extras : list [str ] | None = None ,
141
- repository : str | None = None ,
109
+ repository_name : str | None = None ,
142
110
) -> Package :
143
- if repository is not None :
144
- repository = repository .lower ()
145
-
146
- if (
147
- repository is not None
148
- and repository not in self ._lookup
149
- and not self ._ignore_repository_names
150
- ):
151
- raise ValueError (f'Repository "{ repository } " does not exist.' )
152
-
153
- if repository is not None and not self ._ignore_repository_names :
154
- return self .repository (repository ).package (name , version , extras = extras )
111
+ if repository_name and not self ._ignore_repository_names :
112
+ return self .repository (repository_name ).package (
113
+ name , version , extras = extras
114
+ )
155
115
156
- for repo in self ._repositories :
116
+ for repo in self .repositories :
157
117
try :
158
- package = repo .package (name , version , extras = extras )
118
+ return repo .package (name , version , extras = extras )
159
119
except PackageNotFound :
160
120
continue
161
-
162
- return package
163
-
164
121
raise PackageNotFound (f"Package { name } ({ version } ) not found." )
165
122
166
123
def find_packages (self , dependency : Dependency ) -> list [Package ]:
167
- repository = dependency .source_name
168
- if repository is not None :
169
- repository = repository .lower ()
170
-
171
- if (
172
- repository is not None
173
- and repository not in self ._lookup
174
- and not self ._ignore_repository_names
175
- ):
176
- raise ValueError (f'Repository "{ repository } " does not exist.' )
177
-
178
- if repository is not None and not self ._ignore_repository_names :
179
- return self .repository (repository ).find_packages (dependency )
180
-
181
- packages = []
182
- for repo in self ._repositories :
183
- packages += repo .find_packages (dependency )
124
+ repository_name = dependency .source_name
125
+ if repository_name and not self ._ignore_repository_names :
126
+ return self .repository (repository_name ).find_packages (dependency )
184
127
128
+ packages : list [Package ] = []
129
+ for repo in self .repositories :
130
+ packages += repo .find_packages (dependency )
185
131
return packages
186
132
187
133
def search (self , query : str ) -> list [Package ]:
188
- from poetry .repositories .legacy_repository import LegacyRepository
189
-
190
- results = []
191
- for repository in self ._repositories :
192
- if isinstance (repository , LegacyRepository ):
193
- continue
194
-
134
+ results : list [Package ] = []
135
+ for repository in self .repositories :
195
136
results += repository .search (query )
196
-
197
137
return results
0 commit comments