From 51fd6938f1c8696b513a734f69a3ff09b296bde8 Mon Sep 17 00:00:00 2001 From: Timo Pollmeier Date: Thu, 24 Oct 2024 14:35:26 +0200 Subject: [PATCH] Add: New NVD CPE Match Criteria API Support for the NVD CPE Match Criteria API is added, which lists CPE Match Strings. (See https://nvd.nist.gov/developers/products#divCpeMatch) These match strings can be used to identify CPEs affected by a given CVEs if the Match String ID is also known. --- pontos/nvd/cpe_match/__init__.py | 49 ++++++ pontos/nvd/cpe_match/_parser.py | 36 +++++ pontos/nvd/cpe_match/api.py | 213 ++++++++++++++++++++++++++ pontos/nvd/models/cpe_match_string.py | 57 +++++++ pyproject.toml | 2 + 5 files changed, 357 insertions(+) create mode 100644 pontos/nvd/cpe_match/__init__.py create mode 100644 pontos/nvd/cpe_match/_parser.py create mode 100644 pontos/nvd/cpe_match/api.py create mode 100644 pontos/nvd/models/cpe_match_string.py diff --git a/pontos/nvd/cpe_match/__init__.py b/pontos/nvd/cpe_match/__init__.py new file mode 100644 index 000000000..813c7f3d0 --- /dev/null +++ b/pontos/nvd/cpe_match/__init__.py @@ -0,0 +1,49 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import asyncio +from argparse import Namespace +from typing import Callable + +import httpx + +from pontos.nvd.cpe_match.api import CPEMatchApi + +from ._parser import cpe_match_parse, cpe_matches_parse + +__all__ = ("CPEMatchApi",) + + +async def query_cpe_match(args: Namespace) -> None: + async with CPEMatchApi(token=args.token) as api: + cpe_match = await api.cpe_match(args.match_criteria_id) + print(cpe_match) + + +async def query_cpe_matches(args: Namespace) -> None: + async with CPEMatchApi(token=args.token) as api: + response = api.cpe_matches( + cve_id=args.cve_id, + request_results=args.number, + start_index=args.start, + ) + async for cpe_match in response: + print(cpe_match) + + +def cpe_match_main() -> None: + main(cpe_match_parse(), query_cpe_match) + + +def cpe_matches_main() -> None: + main(cpe_matches_parse(), query_cpe_matches) + + +def main(args: Namespace, func: Callable) -> None: + try: + asyncio.run(func(args)) + except KeyboardInterrupt: + pass + except httpx.HTTPStatusError as e: + print(f"HTTP Error {e.response.status_code}: {e.response.text}") diff --git a/pontos/nvd/cpe_match/_parser.py b/pontos/nvd/cpe_match/_parser.py new file mode 100644 index 000000000..47ea0087f --- /dev/null +++ b/pontos/nvd/cpe_match/_parser.py @@ -0,0 +1,36 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from argparse import ArgumentParser, Namespace +from typing import Optional, Sequence + +import shtab + + +def cpe_matches_parse(args: Optional[Sequence[str]] = None) -> Namespace: + parser = ArgumentParser() + shtab.add_argument_to(parser) + parser.add_argument("--token", help="API key to use for querying.") + parser.add_argument("--cve-id", help="Get matches for a specific CVE") + parser.add_argument( + "--number", "-n", metavar="N", help="Request only N matches", type=int + ) + parser.add_argument( + "--start", + "-s", + help="Index of the first match to request.", + type=int, + ) + return parser.parse_args(args) + + +def cpe_match_parse(args: Optional[Sequence[str]] = None) -> Namespace: + parser = ArgumentParser() + shtab.add_argument_to(parser) + parser.add_argument("--token", help="API key to use for querying.") + parser.add_argument( + "--match-criteria-id", + help="Get the match string with the given matchCriteriaId ", + ) + return parser.parse_args(args) diff --git a/pontos/nvd/cpe_match/api.py b/pontos/nvd/cpe_match/api.py new file mode 100644 index 000000000..da8da500a --- /dev/null +++ b/pontos/nvd/cpe_match/api.py @@ -0,0 +1,213 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from datetime import datetime +from types import TracebackType +from typing import ( + Iterator, + Optional, + Type, +) + +from httpx import Timeout + +from pontos.errors import PontosError +from pontos.nvd.api import ( + DEFAULT_TIMEOUT_CONFIG, + JSON, + NVDApi, + NVDResults, + Params, + convert_camel_case, + format_date, + now, +) +from pontos.nvd.models.cpe_match_string import CPEMatchString, CPEMatch + +__all__ = ("CPEMatchApi",) + +DEFAULT_NIST_NVD_CPE_MATCH_URL = ( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0" +) +MAX_CPE_MATCHES_PER_PAGE = 500 + + +def _result_iterator(data: JSON) -> Iterator[CPEMatchString]: + results: list[dict[str, Any]] = data.get("match_strings", []) # type: ignore + return ( + CPEMatchString.from_dict(result["match_string"]) for result in results + ) + + +class CPEMatchApi(NVDApi): + """ + API for querying the NIST NVD CPE match information. + + Should be used as an async context manager. + + Example: + .. code-block:: python + + from pontos.nvd.cpe_match import CPEMatchApi + + async with CPEMatchApi() as api: + cpe = await api.cpe_match_string(...) + """ + + def __init__( + self, + *, + token: Optional[str] = None, + timeout: Optional[Timeout] = DEFAULT_TIMEOUT_CONFIG, + rate_limit: bool = True, + ) -> None: + """ + Create a new instance of the CPE API. + + Args: + token: The API key to use. Using an API key allows to run more + requests at the same time. + timeout: Timeout settings for the HTTP requests + rate_limit: Set to False to ignore rate limits. The public rate + limit (without an API key) is 5 requests in a rolling 30 second + window. The rate limit with an API key is 50 requests in a + rolling 30 second window. + See https://nvd.nist.gov/developers/start-here#divRateLimits + Default: True. + """ + super().__init__( + DEFAULT_NIST_NVD_CPE_MATCH_URL, + token=token, + timeout=timeout, + rate_limit=rate_limit, + ) + + def cpe_matches( + self, + *, + last_modified_start_date: Optional[datetime] = None, + last_modified_end_date: Optional[datetime] = None, + cve_id: Optional[str] = None, + request_results: Optional[int] = None, + start_index: int = 0, + results_per_page: Optional[int] = None, + ) -> NVDResults[CPEMatchString]: + """ + Get all CPE matches for the provided arguments + + https://nvd.nist.gov/developers/products#divCpeMatch + + Args: + last_modified_start_date: Return all CPE matches last modified after this date. + last_modified_end_date: Return all CPE matches last modified before this date. + cve_id: Return all CPE matches for this Common Vulnerabilities and Exposures identifier. + request_results: Number of CPE matches to download. Set to None + (default) to download all available matches. + start_index: Index of the first CPE match to be returned. Useful + only for paginated requests that should not start at the first + page. + results_per_page: Number of results in a single requests. Mostly + useful for paginated requests. + + Returns: + A NVDResponse for CPE matches + + Example: + .. code-block:: python + + from pontos.nvd.cpe_match import CPEMatchApi + + async with CPEMatchApi() as api: + async for match_string in api.matches(cve_id='CVE-2024-1234'): + print(match_string) + + json = api.matches(cve_id='CVE-2024-1234').json() + + async for match_strings in api.matches( + cve_id='CVE-2024-1234', + ).chunks(): + for match_string in match_strings: + print(match_string) + """ + params: Params = {} + + if last_modified_start_date: + params["lastModStartDate"] = format_date(last_modified_start_date) + if not last_modified_end_date: + params["lastModEndDate"] = format_date(now()) + if last_modified_end_date: + params["lastModEndDate"] = format_date(last_modified_end_date) + + if cve_id: + params["cveId"] = cve_id + + results_per_page = min( + results_per_page or MAX_CPE_MATCHES_PER_PAGE, + request_results or MAX_CPE_MATCHES_PER_PAGE, + ) + if start_index is None: + start_index = 0 + + return NVDResults( + self, + params, + _result_iterator, + request_results=request_results, + results_per_page=results_per_page, + start_index=start_index, + ) + + async def cpe_match(self, match_criteria_id: str) -> CPEMatchString: + """ + Returns a single CPE match for the given match criteria id. + + Args: + match_criteria_id: Match criteria identifier + + Returns: + A CPE match for the given identifier + + Raises: + PontosError: If match criteria ID is empty or if no match with the given ID is + found. + + Example: + .. code-block:: python + + from pontos.nvd.cpe_match import CVEApi + + async with CVEApi() as api: + match = await api.cpe_match("36FBCF0F-8CEE-474C-8A04-5075AF53FAF4") + print(match) + """ + if not match_criteria_id: + raise PontosError("Missing Match Criteria ID.") + + response = await self._get( + params={"matchCriteriaId": match_criteria_id} + ) + response.raise_for_status() + data = response.json(object_hook=convert_camel_case) + match_strings = data["match_strings"] + if not match_strings: + raise PontosError( + f"No match with Match Criteria ID '{match_criteria_id}' found." + ) + + match_string = match_strings[0] + return CPEMatchString.from_dict(match_string["match_string"]) + + async def __aenter__(self) -> "CPEMatchApi": + await super().__aenter__() + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> Optional[bool]: + return await super().__aexit__( # type: ignore + exc_type, exc_value, traceback + ) diff --git a/pontos/nvd/models/cpe_match_string.py b/pontos/nvd/models/cpe_match_string.py new file mode 100644 index 000000000..b0b1b935f --- /dev/null +++ b/pontos/nvd/models/cpe_match_string.py @@ -0,0 +1,57 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from dataclasses import dataclass, field +from datetime import datetime +from typing import List, Optional +from uuid import UUID + +from pontos.models import Model + + +@dataclass +class CPEMatch(Model): + """ + Represents a single CPE match. + + Attributes: + cpe_name: Name of the matching CPE + cpe_name_id: Name ID of the matching CPE + """ + + cpe_name: str + cpe_name_id: UUID + + +@dataclass +class CPEMatchString(Model): + """ + Represents a CPE match string, matching criteria to one or more CPEs + + Attributes: + match_criteria_id: The identifier of the CPE match + criteria: The CPE formatted match criteria + version_start_including: Optional start of the matching version range, including the given version + version_start_excluding: Optional start of the matching version range, excluding the given version + version_end_including: Optional end of the matching version range, including the given version + version_end_excluding: Optional end of the matching version range, excluding the given version + status: Status of the CPE match + cpe_last_modified: The date the CPEs list of the match was last modified + created: Creation date of the CPE + last_modified: Last modification date of the CPE + matches: List of CPEs matching the criteria string and the optional range limits + """ + + match_criteria_id: UUID + criteria: str + status: str + cpe_last_modified: datetime + created: datetime + last_modified: datetime + matches: List[CPEMatch] = field(default_factory=list) + version_start_including: Optional[str] = None + version_start_excluding: Optional[str] = None + version_end_including: Optional[str] = None + version_end_excluding: Optional[str] = None diff --git a/pyproject.toml b/pyproject.toml index 2fccfd235..05f79e216 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -122,6 +122,8 @@ pontos-nvd-cves = 'pontos.nvd.cve:cves_main' pontos-nvd-cve-changes = 'pontos.nvd.cve_changes:main' pontos-nvd-cpe = 'pontos.nvd.cpe:cpe_main' pontos-nvd-cpes = 'pontos.nvd.cpe:cpes_main' +pontos-nvd-cpe-match = 'pontos.nvd.cpe_matches:cpe_match_main' +pontos-nvd-cpe-matches = 'pontos.nvd.cpe_matches:cpe_matches_main' [build-system] requires = ["poetry-core>=1.0.0"]