Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions minecode/collectors/pypi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/purldb for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging

import requests
from packageurl import PackageURL

from minecode import priority_router
from minecode.miners.pypi import build_packages

"""
Collect PyPI packages from pypi registries.
"""

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Attach a stream handler (StreamHandler defaults to sys.stderr) so that
# messages from this collector are emitted even without global logging config.
handler = logging.StreamHandler()
logger.addHandler(handler)
# Emit INFO and above for this module's logger.
logger.setLevel(logging.INFO)


def get_package_json(name, version):
    """
    Return the parsed JSON metadata of the PyPI package release described by
    the `name` and `version` purl fields, as returned by the PyPI JSON API.

    Return None when PyPI answers with an HTTP error status (for instance a
    404 for an unknown package or version); the error is logged. Other
    `requests` exceptions, such as connection errors or timeouts, are not
    caught here and propagate to the caller.
    """
    # Create the URL using purl fields
    url = f"https://pypi.org/pypi/{name}/{version}/json"

    try:
        # Always pass a timeout: without one, requests can block indefinitely
        # when the remote server stops responding.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as err:
        logger.error("HTTP error occurred: %s", err)
        return None


def get_all_package_version(name):
    """
    Return a list of all version numbers for the package `name`, taken from
    the keys of the "releases" mapping of the PyPI JSON API response.

    Return None when PyPI answers with an HTTP error status (for instance a
    404 for an unknown package); the error is logged. Callers must therefore
    check the result before iterating over it. Other `requests` exceptions,
    such as connection errors or timeouts, propagate to the caller.
    """
    url = f"https://pypi.org/pypi/{name}/json"
    try:
        # Always pass a timeout: without one, requests can block indefinitely
        # when the remote server stops responding.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()
        # Iterating a mapping yields its keys: one key per available version
        return list(data["releases"])
    except requests.exceptions.HTTPError as err:
        logger.error("HTTP error occurred: %s", err)
        return None


def map_pypi_package(package_url, pipelines, priority=0):
    """
    Add a pypi `package_url` to the PackageDB.

    Fetch the PyPI JSON metadata for the name and version of `package_url`,
    build packages from it, merge or create each of them in the database and
    submit each resulting database package to the scan queue with the given
    `pipelines` and `priority`.

    Return an error string if any errors are encountered during the process,
    or an empty string otherwise. Processing stops at the first package whose
    merge or creation reports an error.
    """
    # Imported here rather than at module level, as in the original code
    # NOTE(review): presumably to avoid import-time dependencies; confirm.
    from minecode.model_utils import add_package_to_scan_queue
    from minecode.model_utils import merge_or_create_package

    package_json = get_package_json(
        name=package_url.name,
        version=package_url.version,
    )

    if not package_json:
        error = f"Package does not exist on PyPI: {package_url}"
        logger.error(error)
        return error

    error = ""
    for package in build_packages(package_json, package_url):
        db_package, _, _, error = merge_or_create_package(package, visit_level=0)
        if error:
            break

        # Submit package for scanning. This is done per package, inside the
        # loop, so that every merged package is queued and `db_package` is
        # never referenced when no package was built at all.
        if db_package:
            add_package_to_scan_queue(
                package=db_package, pipelines=pipelines, priority=priority
            )

    return error


@priority_router.route("pkg:pypi/.*")
def process_request(purl_str, **kwargs):
    """
    Process `priority_resource_uri` containing a pypi Package URL (PURL) as a
    URI.

    This involves obtaining Package information for the PURL from pypi and
    using it to create a new PackageDB entry. The package is then added to the
    scan queue afterwards.

    When the PURL has no version, every version listed on PyPI for the package
    name is processed in turn.

    Supported keyword arguments are `addon_pipelines` (an iterable of extra
    pipeline names appended to the default pipelines) and `priority`
    (defaults to 0).

    Return an error string as soon as an error is encountered, or None when
    everything was processed without error.
    """
    from minecode.model_utils import DEFAULT_PIPELINES

    addon_pipelines = kwargs.get("addon_pipelines", [])
    pipelines = DEFAULT_PIPELINES + tuple(addon_pipelines)
    priority = kwargs.get("priority", 0)

    package_url = PackageURL.from_string(purl_str)

    if package_url.version:
        error_msg = map_pypi_package(package_url, pipelines, priority)
        if error_msg:
            return error_msg
        return None

    versions = get_all_package_version(package_url.name)
    if not versions:
        # get_all_package_version() returns None on HTTP errors: report this
        # instead of failing with a TypeError when iterating over None.
        error_msg = f"No versions found on PyPI for: {purl_str}"
        logger.error(error_msg)
        return error_msg

    for version in versions:
        # PackageURL fields are read-only, so build a new PackageURL for each
        # version. Building it from the parsed fields, rather than appending
        # "@version" to `purl_str`, keeps the result valid when the input
        # carries qualifiers or a subpath.
        versioned_package_url = PackageURL(
            type=package_url.type,
            namespace=package_url.namespace,
            name=package_url.name,
            version=version,
            qualifiers=package_url.qualifiers,
            subpath=package_url.subpath,
        )
        error_msg = map_pypi_package(versioned_package_url, pipelines, priority)

        if error_msg:
            return error_msg

    return None
20 changes: 18 additions & 2 deletions minecode/miners/pypi.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,33 @@ def build_packages(metadata, purl=None):
if not url:
continue

packagetype = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if download.get("packagetype") == "sdist":
packagetype = "pypi_sdist_pkginfo"
else:
packagetype = "pypi_bdist_pkginfo"

download_data = dict(
download_url=url,
size=download.get("size"),
release_date=parse_date(download.get("upload_time")),
datasource_id="pypi_sdist_pkginfo",
datasource_id=packagetype,
type="pypi",
)
# TODO: Check for other checksums
download_data["md5"] = download.get("md5_digest")
download_data.update(common_data)
package = scan_models.PackageData.from_data(download_data)
package.datasource_id = "pypi_api_metadata"
package.set_purl(purl)

if purl:
purl_str = purl.to_string()
purl_filename_qualifiers = (
purl_str + "?file_name=" + download.get("filename")
)
updated_purl = PackageURL.from_string(purl_filename_qualifiers)
package.set_purl(updated_purl)
else:
package.set_purl(purl)

yield package
64 changes: 64 additions & 0 deletions minecode/tests/collectors/test_pypi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/purldb for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import os

from django.test import TestCase as DjangoTestCase

from packageurl import PackageURL

import packagedb
from minecode.collectors import pypi
from minecode.utils_test import JsonBasedTesting


class PypiPriorityQueueTests(JsonBasedTesting, DjangoTestCase):
    """
    Tests for the PyPI priority queue collector functions, using the
    "cage" 1.1.4 release as the reference package.
    """

    test_data_dir = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), "testfiles"
    )

    def setUp(self):
        super().setUp()
        # Load the expected PyPI JSON metadata for cage 1.1.4 once per test
        self.expected_json_loc = self.get_test_loc("pypi/cage_1.1.4.json")
        with open(self.expected_json_loc) as f:
            self.expected_json_contents = json.load(f)

    def test_get_package_json(self):
        json_contents = pypi.get_package_json(
            name="cage",
            version="1.1.4",
        )
        self.assertEqual(self.expected_json_contents, json_contents)

    def test_get_all_package_version(self):
        releases_list = pypi.get_all_package_version("cage")
        expected = ["1.1.2", "1.1.3", "1.1.4"]
        # At the time of creating this test, the CAGE project has three
        # releases. There may be additional releases in the future.
        # Therefore, we will verify that the number of releases is three
        # or greater and that it includes the expected release versions.
        self.assertGreaterEqual(len(releases_list), 3)
        for version in expected:
            self.assertIn(version, releases_list)

    def test_map_pypi_package(self):
        package_count = packagedb.models.Package.objects.all().count()
        self.assertEqual(0, package_count)
        package_url = PackageURL.from_string("pkg:pypi/cage@1.1.4")
        # The trailing comma matters: ("test_pipeline") is a plain string,
        # not a one-element tuple of pipeline names.
        pypi.map_pypi_package(package_url, ("test_pipeline",))
        package_count = packagedb.models.Package.objects.all().count()
        self.assertEqual(1, package_count)
        package = packagedb.models.Package.objects.all().first()
        expected_purl_str = "pkg:pypi/cage@1.1.4"
        expected_download_url = (
            "http://www.alcyone.com/software/cage/cage-latest.tar.gz"
        )
        self.assertEqual(expected_purl_str, package.purl)
        self.assertEqual(expected_download_url, package.download_url)
55 changes: 55 additions & 0 deletions minecode/tests/testfiles/pypi/cage_1.1.4.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"info": {
"author": "Erik Max Francis",
"author_email": "[email protected]",
"bugtrack_url": null,
"classifiers": [
"Development Status :: 6 - Mature",
"Intended Audience :: Developers",
"Intended Audience :: End Users/Desktop",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: GNU General Public License (GPL)",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Topic :: Games/Entertainment",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Scientific/Engineering :: Mathematics"
],
"description": "CAGE is a fairy generic and complete cellular automaton simulation\r\n engine in Python. It supports both 1D and 2D automata, a variety\r\n of prepackaged rules, and the concept of \"agents\" which can move\r\n about independently on the map for implementing agent behavior.\r\n\r\n CAGE comes with numerous examples of fully-functional CA systems,\r\n including Conway's Game of Life, Langton's self-reproducing\r\n automaton, Langton's \"vants,\" and 1D automata rule explorers. It\r\n also comes with simple displayers (including a curses interface\r\n for 2D automata). Also included is a unique implementation of a\r\n finite state machine (ant.py).",
"description_content_type": null,
"docs_url": null,
"download_url": "http://www.alcyone.com/software/cage/cage-latest.tar.gz",
"downloads": {
"last_day": -1,
"last_month": -1,
"last_week": -1
},
"dynamic": null,
"home_page": "http://www.alcyone.com/software/cage/",
"keywords": "cellular automata, Turing machines, Langton vants, self-organizing systems, finite state machines, finite state automata",
"license": "GPL",
"license_expression": null,
"license_files": null,
"maintainer": "",
"maintainer_email": "",
"name": "CAGE",
"package_url": "https://pypi.org/project/CAGE/",
"platform": "any; Unix for curses frontend",
"project_url": "https://pypi.org/project/CAGE/",
"project_urls": {
"Download": "http://www.alcyone.com/software/cage/cage-latest.tar.gz",
"Homepage": "http://www.alcyone.com/software/cage/"
},
"provides_extra": null,
"release_url": "https://pypi.org/project/CAGE/1.1.4/",
"requires_dist": null,
"requires_python": null,
"summary": "A generic and fairly complete cellular automata simulation engine.",
"version": "1.1.4",
"yanked": false,
"yanked_reason": null
},
"last_serial": 944145,
"urls": [],
"vulnerabilities": []
}
Loading