Skip to content

Commit

Permalink
refactor dependency specification parsing logic
Browse files Browse the repository at this point in the history
This change moves, cleans up and refactors dependency specification
parsing logic from `InitCommand` to
`poetry.utils.dependency_specification`. This is done to improve
usability and maintainability of this logic.
  • Loading branch information
abn committed May 6, 2022
1 parent 3a305d2 commit 7d85d60
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 128 deletions.
136 changes: 8 additions & 128 deletions src/poetry/console/commands/init.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from __future__ import annotations

import os
import re
import sys
import urllib.parse

from pathlib import Path
from typing import TYPE_CHECKING
Expand All @@ -15,6 +12,7 @@

from poetry.console.commands.command import Command
from poetry.console.commands.env_command import EnvCommand
from poetry.utils.dependency_specification import parse_dependency_specification
from poetry.utils.helpers import canonicalize_name


Expand Down Expand Up @@ -402,137 +400,19 @@ def _find_best_version_for_package(
def _parse_requirements(self, requirements: list[str]) -> list[dict[str, Any]]:
from poetry.core.pyproject.exceptions import PyProjectException

from poetry.puzzle.provider import Provider

result = []

try:
cwd = self.poetry.file.parent
except (PyProjectException, RuntimeError):
cwd = Path.cwd()

for requirement in requirements:
requirement = requirement.strip()
extras = []
extras_m = re.search(r"\[([\w\d,-_ ]+)\]$", requirement)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
requirement, _ = requirement.split("[")

url_parsed = urllib.parse.urlparse(requirement)
if url_parsed.scheme and url_parsed.netloc:
# Url
if url_parsed.scheme in ["git+https", "git+ssh"]:
from poetry.core.vcs.git import Git
from poetry.core.vcs.git import ParsedUrl

parsed = ParsedUrl.parse(requirement)
url = Git.normalize_url(requirement)

pair = {"name": parsed.name, "git": url.url}
if parsed.rev:
pair["rev"] = url.revision

if extras:
pair["extras"] = extras

source_root = (
self.env.path.joinpath("src")
if isinstance(self, EnvCommand) and self.env
else None
)
package = Provider.get_package_from_vcs(
"git", url=url.url, rev=pair.get("rev"), source_root=source_root
)
pair["name"] = package.name
result.append(pair)

continue
elif url_parsed.scheme in ["http", "https"]:
package = Provider.get_package_from_url(requirement)

pair = {"name": package.name, "url": package.source_url}
if extras:
pair["extras"] = extras

result.append(pair)
continue
elif (os.path.sep in requirement or "/" in requirement) and (
cwd.joinpath(requirement).exists()
or Path(requirement).expanduser().exists()
and Path(requirement).expanduser().is_absolute()
):
path = Path(requirement).expanduser()
is_absolute = path.is_absolute()

if not path.is_absolute():
path = cwd.joinpath(requirement)

if path.is_file():
package = Provider.get_package_from_file(path.resolve())
else:
package = Provider.get_package_from_directory(path.resolve())

result.append(
dict(
[
("name", package.name),
(
"path",
path.relative_to(cwd).as_posix()
if not is_absolute
else path.as_posix(),
),
]
+ ([("extras", extras)] if extras else [])
)
)

continue

pair = re.sub(
"^([^@=: ]+)(?:@|==|(?<![<>~!])=|:| )(.*)$", "\\1 \\2", requirement
return [
parse_dependency_specification(
requirement=requirement,
env=self.env if isinstance(self, EnvCommand) and self.env else None,
cwd=cwd,
)
pair = pair.strip()

require: dict[str, str] = {}
if " " in pair:
name, version = pair.split(" ", 2)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")

require["name"] = name
if version != "latest":
require["version"] = version
else:
m = re.match(
r"^([^><=!: ]+)((?:>=|<=|>|<|!=|~=|~|\^).*)$", requirement.strip()
)
if m:
name, constraint = m.group(1), m.group(2)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")

require["name"] = name
require["version"] = constraint
else:
extras_m = re.search(r"\[([\w\d,-_]+)\]$", pair)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
pair, _ = pair.split("[")

require["name"] = pair

if extras:
require["extras"] = extras

result.append(require)

return result
for requirement in requirements
]

def _format_requirements(
self, requirements: list[dict[str, str]]
Expand Down
155 changes: 155 additions & 0 deletions src/poetry/utils/dependency_specification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from __future__ import annotations

import os
import re
import urllib.parse

from pathlib import Path
from typing import TYPE_CHECKING
from typing import Union

from poetry.puzzle.provider import Provider


if TYPE_CHECKING:
from poetry.utils.env import Env


DependencySpec = dict[str, Union[str, dict[str, Union[str, bool]], list[str]]]


def _parse_dependency_specification_git_url(
requirement: str, env: Env | None = None
) -> DependencySpec | None:
from poetry.core.vcs.git import Git
from poetry.core.vcs.git import ParsedUrl

parsed = ParsedUrl.parse(requirement)
url = Git.normalize_url(requirement)

pair = {"name": parsed.name, "git": url.url}
if parsed.rev:
pair["rev"] = url.revision

source_root = env.path.joinpath("src") if env else None
package = Provider.get_package_from_vcs(
"git", url=url.url, rev=pair.get("rev"), source_root=source_root
)
pair["name"] = package.name
return pair


def _parse_dependency_specification_url(
requirement: str, env: Env | None = None
) -> DependencySpec | None:
url_parsed = urllib.parse.urlparse(requirement)
if not (url_parsed.scheme and url_parsed.netloc):
return None

if url_parsed.scheme in ["git+https", "git+ssh"]:
return _parse_dependency_specification_git_url(requirement, env)

if url_parsed.scheme in ["http", "https"]:
package = Provider.get_package_from_url(requirement)
return {"name": package.name, "url": package.source_url}

return None


def _parse_dependency_specification_path(
requirement: str, cwd: Path
) -> DependencySpec | None:
if (os.path.sep in requirement or "/" in requirement) and (
cwd.joinpath(requirement).exists()
or Path(requirement).expanduser().exists()
and Path(requirement).expanduser().is_absolute()
):
path = Path(requirement).expanduser()
is_absolute = path.is_absolute()

if not path.is_absolute():
path = cwd.joinpath(requirement)

if path.is_file():
package = Provider.get_package_from_file(path.resolve())
else:
package = Provider.get_package_from_directory(path.resolve())

return {
"name": package.name,
"path": path.relative_to(cwd).as_posix()
if not is_absolute
else path.as_posix(),
}

return None


def _parse_dependency_specification_simple(
requirement: str,
) -> DependencySpec | None:
extras: list[str] = []
pair = re.sub("^([^@=: ]+)(?:@|==|(?<![<>~!])=|:| )(.*)$", "\\1 \\2", requirement)
pair = pair.strip()

require: DependencySpec = {}

if " " in pair:
name, version = pair.split(" ", 2)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")

require["name"] = name
if version != "latest":
require["version"] = version
else:
m = re.match(r"^([^><=!: ]+)((?:>=|<=|>|<|!=|~=|~|\^).*)$", requirement.strip())
if m:
name, constraint = m.group(1), m.group(2)
extras_m = re.search(r"\[([\w\d,-_]+)\]$", name)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
name, _ = name.split("[")

require["name"] = name
require["version"] = constraint
else:
extras_m = re.search(r"\[([\w\d,-_]+)\]$", pair)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
pair, _ = pair.split("[")

require["name"] = pair

if extras:
require["extras"] = extras

return require


def parse_dependency_specification(
requirement: str, env: Env | None = None, cwd: Path | None = None
) -> DependencySpec:
requirement = requirement.strip()
cwd = cwd or Path.cwd()

extras = []
extras_m = re.search(r"\[([\w\d,-_ ]+)\]$", requirement)
if extras_m:
extras = [e.strip() for e in extras_m.group(1).split(",")]
requirement, _ = requirement.split("[")

specification = (
_parse_dependency_specification_url(requirement, env=env)
or _parse_dependency_specification_path(requirement, cwd=cwd)
or _parse_dependency_specification_simple(requirement)
)

if specification:
if extras and "extras" not in specification:
specification["extras"] = extras
return specification

raise ValueError(f"Invalid dependency specification: {requirement}")
63 changes: 63 additions & 0 deletions tests/utils/test_dependency_specification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

import pytest

from deepdiff import DeepDiff

from poetry.utils.dependency_specification import parse_dependency_specification


if TYPE_CHECKING:
from pytest_mock import MockerFixture

from poetry.utils.dependency_specification import DependencySpec


@pytest.mark.parametrize(
("requirement", "specification"),
[
(
"git+https://github.com/demo/demo.git",
{"git": "https://github.com/demo/demo.git", "name": "demo"},
),
(
"git+ssh://github.com/demo/demo.git",
{"git": "ssh://github.com/demo/demo.git", "name": "demo"},
),
(
"git+https://github.com/demo/demo.git#main",
{"git": "https://github.com/demo/demo.git", "name": "demo", "rev": "main"},
),
(
"git+https://github.com/demo/demo.git@main",
{"git": "https://github.com/demo/demo.git", "name": "demo", "rev": "main"},
),
("demo", {"name": "demo"}),
("[email protected]", {"name": "demo", "version": "1.0.0"}),
("demo@^1.0.0", {"name": "demo", "version": "^1.0.0"}),
("demo[a,b]@1.0.0", {"name": "demo", "version": "1.0.0", "extras": ["a", "b"]}),
("demo[a,b]", {"name": "demo", "extras": ["a", "b"]}),
("../demo", {"name": "demo", "path": "../demo"}),
("../demo/demo.whl", {"name": "demo", "path": "../demo/demo.whl"}),
(
"https://example.com/packages/demo-0.1.0.tar.gz",
{"name": "demo", "url": "https://example.com/packages/demo-0.1.0.tar.gz"},
),
],
)
def test_parse_dependency_specification(
requirement: str, specification: DependencySpec, mocker: MockerFixture
) -> None:
original = Path.exists

def _mock(self: Path) -> bool:
if "/" in requirement and self == Path.cwd().joinpath(requirement):
return True
return original(self)

mocker.patch("pathlib.Path.exists", _mock)

assert not DeepDiff(parse_dependency_specification(requirement), specification)

0 comments on commit 7d85d60

Please sign in to comment.