Skip to content

Commit

Permalink
Add: New NVD CPE Match Criteria API
Browse files Browse the repository at this point in the history
Support for the NVD CPE Match Criteria API is added, which lists
CPE Match Strings.
(See https://nvd.nist.gov/developers/products#divCpeMatch)

These match strings can be used to identify CPEs affected by
a given CVEs if the Match String ID is also known.
  • Loading branch information
timopollmeier authored and greenbonebot committed Dec 5, 2024
1 parent c48ff39 commit 51fd693
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 0 deletions.
49 changes: 49 additions & 0 deletions pontos/nvd/cpe_match/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

import asyncio
from argparse import Namespace
from typing import Callable

import httpx

from pontos.nvd.cpe_match.api import CPEMatchApi

from ._parser import cpe_match_parse, cpe_matches_parse

__all__ = ("CPEMatchApi",)


async def query_cpe_match(args: Namespace) -> None:
async with CPEMatchApi(token=args.token) as api:
cpe_match = await api.cpe_match(args.match_criteria_id)
print(cpe_match)


async def query_cpe_matches(args: Namespace) -> None:
async with CPEMatchApi(token=args.token) as api:
response = api.cpe_matches(
cve_id=args.cve_id,
request_results=args.number,
start_index=args.start,
)
async for cpe_match in response:
print(cpe_match)


def cpe_match_main() -> None:
main(cpe_match_parse(), query_cpe_match)


def cpe_matches_main() -> None:
main(cpe_matches_parse(), query_cpe_matches)


def main(args: Namespace, func: Callable) -> None:
try:
asyncio.run(func(args))
except KeyboardInterrupt:
pass
except httpx.HTTPStatusError as e:
print(f"HTTP Error {e.response.status_code}: {e.response.text}")
36 changes: 36 additions & 0 deletions pontos/nvd/cpe_match/_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from argparse import ArgumentParser, Namespace
from typing import Optional, Sequence

import shtab


def cpe_matches_parse(args: Optional[Sequence[str]] = None) -> Namespace:
parser = ArgumentParser()
shtab.add_argument_to(parser)
parser.add_argument("--token", help="API key to use for querying.")
parser.add_argument("--cve-id", help="Get matches for a specific CVE")
parser.add_argument(
"--number", "-n", metavar="N", help="Request only N matches", type=int
)
parser.add_argument(
"--start",
"-s",
help="Index of the first match to request.",
type=int,
)
return parser.parse_args(args)


def cpe_match_parse(args: Optional[Sequence[str]] = None) -> Namespace:
parser = ArgumentParser()
shtab.add_argument_to(parser)
parser.add_argument("--token", help="API key to use for querying.")
parser.add_argument(
"--match-criteria-id",
help="Get the match string with the given matchCriteriaId ",
)
return parser.parse_args(args)
213 changes: 213 additions & 0 deletions pontos/nvd/cpe_match/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from datetime import datetime
from types import TracebackType
from typing import (
Iterator,
Optional,
Type,
)

from httpx import Timeout

from pontos.errors import PontosError
from pontos.nvd.api import (
DEFAULT_TIMEOUT_CONFIG,
JSON,
NVDApi,
NVDResults,
Params,
convert_camel_case,
format_date,
now,
)
from pontos.nvd.models.cpe_match_string import CPEMatchString, CPEMatch

__all__ = ("CPEMatchApi",)

DEFAULT_NIST_NVD_CPE_MATCH_URL = (
"https://services.nvd.nist.gov/rest/json/cpematch/2.0"
)
MAX_CPE_MATCHES_PER_PAGE = 500


def _result_iterator(data: JSON) -> Iterator[CPEMatchString]:
results: list[dict[str, Any]] = data.get("match_strings", []) # type: ignore
return (
CPEMatchString.from_dict(result["match_string"]) for result in results
)


class CPEMatchApi(NVDApi):
"""
API for querying the NIST NVD CPE match information.
Should be used as an async context manager.
Example:
.. code-block:: python
from pontos.nvd.cpe_match import CPEMatchApi
async with CPEMatchApi() as api:
cpe = await api.cpe_match_string(...)
"""

def __init__(
self,
*,
token: Optional[str] = None,
timeout: Optional[Timeout] = DEFAULT_TIMEOUT_CONFIG,
rate_limit: bool = True,
) -> None:
"""
Create a new instance of the CPE API.
Args:
token: The API key to use. Using an API key allows to run more
requests at the same time.
timeout: Timeout settings for the HTTP requests
rate_limit: Set to False to ignore rate limits. The public rate
limit (without an API key) is 5 requests in a rolling 30 second
window. The rate limit with an API key is 50 requests in a
rolling 30 second window.
See https://nvd.nist.gov/developers/start-here#divRateLimits
Default: True.
"""
super().__init__(
DEFAULT_NIST_NVD_CPE_MATCH_URL,
token=token,
timeout=timeout,
rate_limit=rate_limit,
)

def cpe_matches(
self,
*,
last_modified_start_date: Optional[datetime] = None,
last_modified_end_date: Optional[datetime] = None,
cve_id: Optional[str] = None,
request_results: Optional[int] = None,
start_index: int = 0,
results_per_page: Optional[int] = None,
) -> NVDResults[CPEMatchString]:
"""
Get all CPE matches for the provided arguments
https://nvd.nist.gov/developers/products#divCpeMatch
Args:
last_modified_start_date: Return all CPE matches last modified after this date.
last_modified_end_date: Return all CPE matches last modified before this date.
cve_id: Return all CPE matches for this Common Vulnerabilities and Exposures identifier.
request_results: Number of CPE matches to download. Set to None
(default) to download all available matches.
start_index: Index of the first CPE match to be returned. Useful
only for paginated requests that should not start at the first
page.
results_per_page: Number of results in a single requests. Mostly
useful for paginated requests.
Returns:
A NVDResponse for CPE matches
Example:
.. code-block:: python
from pontos.nvd.cpe_match import CPEMatchApi
async with CPEMatchApi() as api:
async for match_string in api.matches(cve_id='CVE-2024-1234'):
print(match_string)
json = api.matches(cve_id='CVE-2024-1234').json()
async for match_strings in api.matches(
cve_id='CVE-2024-1234',
).chunks():
for match_string in match_strings:
print(match_string)
"""
params: Params = {}

if last_modified_start_date:
params["lastModStartDate"] = format_date(last_modified_start_date)
if not last_modified_end_date:
params["lastModEndDate"] = format_date(now())
if last_modified_end_date:
params["lastModEndDate"] = format_date(last_modified_end_date)

if cve_id:
params["cveId"] = cve_id

results_per_page = min(
results_per_page or MAX_CPE_MATCHES_PER_PAGE,
request_results or MAX_CPE_MATCHES_PER_PAGE,
)
if start_index is None:
start_index = 0

return NVDResults(
self,
params,
_result_iterator,
request_results=request_results,
results_per_page=results_per_page,
start_index=start_index,
)

async def cpe_match(self, match_criteria_id: str) -> CPEMatchString:
"""
Returns a single CPE match for the given match criteria id.
Args:
match_criteria_id: Match criteria identifier
Returns:
A CPE match for the given identifier
Raises:
PontosError: If match criteria ID is empty or if no match with the given ID is
found.
Example:
.. code-block:: python
from pontos.nvd.cpe_match import CVEApi
async with CVEApi() as api:
match = await api.cpe_match("36FBCF0F-8CEE-474C-8A04-5075AF53FAF4")
print(match)
"""
if not match_criteria_id:
raise PontosError("Missing Match Criteria ID.")

response = await self._get(
params={"matchCriteriaId": match_criteria_id}
)
response.raise_for_status()
data = response.json(object_hook=convert_camel_case)
match_strings = data["match_strings"]
if not match_strings:
raise PontosError(
f"No match with Match Criteria ID '{match_criteria_id}' found."
)

match_string = match_strings[0]
return CPEMatchString.from_dict(match_string["match_string"])

async def __aenter__(self) -> "CPEMatchApi":
await super().__aenter__()
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
return await super().__aexit__( # type: ignore
exc_type, exc_value, traceback
)
57 changes: 57 additions & 0 deletions pontos/nvd/models/cpe_match_string.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from uuid import UUID

from pontos.models import Model


@dataclass
class CPEMatch(Model):
"""
Represents a single CPE match.
Attributes:
cpe_name: Name of the matching CPE
cpe_name_id: Name ID of the matching CPE
"""

cpe_name: str
cpe_name_id: UUID


@dataclass
class CPEMatchString(Model):
"""
Represents a CPE match string, matching criteria to one or more CPEs
Attributes:
match_criteria_id: The identifier of the CPE match
criteria: The CPE formatted match criteria
version_start_including: Optional start of the matching version range, including the given version
version_start_excluding: Optional start of the matching version range, excluding the given version
version_end_including: Optional end of the matching version range, including the given version
version_end_excluding: Optional end of the matching version range, excluding the given version
status: Status of the CPE match
cpe_last_modified: The date the CPEs list of the match was last modified
created: Creation date of the CPE
last_modified: Last modification date of the CPE
matches: List of CPEs matching the criteria string and the optional range limits
"""

match_criteria_id: UUID
criteria: str
status: str
cpe_last_modified: datetime
created: datetime
last_modified: datetime
matches: List[CPEMatch] = field(default_factory=list)
version_start_including: Optional[str] = None
version_start_excluding: Optional[str] = None
version_end_including: Optional[str] = None
version_end_excluding: Optional[str] = None
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ pontos-nvd-cves = 'pontos.nvd.cve:cves_main'
pontos-nvd-cve-changes = 'pontos.nvd.cve_changes:main'
pontos-nvd-cpe = 'pontos.nvd.cpe:cpe_main'
pontos-nvd-cpes = 'pontos.nvd.cpe:cpes_main'
pontos-nvd-cpe-match = 'pontos.nvd.cpe_matches:cpe_match_main'
pontos-nvd-cpe-matches = 'pontos.nvd.cpe_matches:cpe_matches_main'

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down

0 comments on commit 51fd693

Please sign in to comment.