From 86e5dd7707ca8f4ca27c65088fac843ff184cabb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ricks?= Date: Fri, 3 Nov 2023 14:21:09 +0100 Subject: [PATCH] Add: Allow to request a specific number of CVEs and CPEs Extend the NVD CVE and CPE API to allow requesting a specific number of results. This has become necessary to just test the CPE API because otherwise more then 1 million CPEs would be downloaded. --- pontos/nvd/api.py | 1 + pontos/nvd/cpe/api.py | 47 ++++++--- pontos/nvd/cve/api.py | 46 ++++++--- tests/nvd/cpe/test_api.py | 107 +++++++++++++++---- tests/nvd/cve/test_api.py | 211 ++++++++++++++++++++++++++++---------- 5 files changed, 311 insertions(+), 101 deletions(-) diff --git a/pontos/nvd/api.py b/pontos/nvd/api.py index 34949b329..dffe646e1 100644 --- a/pontos/nvd/api.py +++ b/pontos/nvd/api.py @@ -32,6 +32,7 @@ Headers = Dict[str, str] Params = Dict[str, Union[str, int]] +JSON = dict[str, Union[int, str, dict[str, Any]]] __all__ = ( "convert_camel_case", diff --git a/pontos/nvd/cpe/api.py b/pontos/nvd/cpe/api.py index 07c455263..de982eb7b 100644 --- a/pontos/nvd/cpe/api.py +++ b/pontos/nvd/cpe/api.py @@ -19,9 +19,7 @@ from datetime import datetime from types import TracebackType from typing import ( - Any, AsyncIterator, - Dict, Iterable, List, Optional, @@ -34,7 +32,9 @@ from pontos.errors import PontosError from pontos.nvd.api import ( DEFAULT_TIMEOUT_CONFIG, + JSON, NVDApi, + Params, convert_camel_case, format_date, now, @@ -42,6 +42,7 @@ from pontos.nvd.models.cpe import CPE DEFAULT_NIST_NVD_CPES_URL = "https://services.nvd.nist.gov/rest/json/cpes/2.0" +MAX_CPES_PER_PAGE = 10000 class CPEApi(NVDApi): @@ -131,6 +132,7 @@ async def cpes( cpe_match_string: Optional[str] = None, keywords: Optional[Union[List[str], str]] = None, match_criteria_id: Optional[str] = None, + request_results: Optional[int] = None, ) -> AsyncIterator[CPE]: """ Get all CPEs for the provided arguments @@ -148,6 +150,8 @@ async def cpes( the metadata title or reference links. match_criteria_id: Returns all CPE records associated with a match string identified by its UUID. + request_results: Number of CPEs to download. Set to None (default) + to download all available CPEs. Returns: An async iterator of CPE model instances. @@ -161,9 +165,7 @@ async def cpes( async for cpe in api.cpes(keywords=["Mac OS X"]): print(cpe.cpe_name, cpe.cpe_name_id) """ - total_results = None - - params: Dict[str, Union[str, int]] = {} + params: Params = {} if last_modified_start_date: params["lastModStartDate"] = format_date(last_modified_start_date) if not last_modified_end_date: @@ -186,9 +188,18 @@ async def cpes( params["matchCriteriaId"] = match_criteria_id start_index = 0 - results_per_page = None + downloaded_results = 0 + results_per_page = ( + request_results + if request_results and request_results < MAX_CPES_PER_PAGE + else MAX_CPES_PER_PAGE + ) + total_results = None + requested_results = request_results - while total_results is None or start_index < total_results: + while ( + requested_results is None or downloaded_results < requested_results + ): params["startIndex"] = start_index if results_per_page is not None: @@ -197,19 +208,31 @@ async def cpes( response = await self._get(params=params) response.raise_for_status() - data: Dict[str, Union[int, str, Dict[str, Any]]] = response.json( - object_hook=convert_camel_case - ) + data: JSON = response.json(object_hook=convert_camel_case) results_per_page: int = data["results_per_page"] # type: ignore total_results: int = data["total_results"] # type: ignore products: Iterable = data.get("products", []) # type: ignore + if not requested_results: + requested_results = total_results + for product in products: yield CPE.from_dict(product["cpe"]) - if results_per_page is not None: - start_index += results_per_page + if results_per_page is None: + # just be safe here. should never occur + results_per_page = len(products) + + start_index += results_per_page + downloaded_results += results_per_page + + if ( + request_results + and downloaded_results + results_per_page > request_results + ): + # avoid downloading more results then requested + results_per_page = request_results - downloaded_results async def __aenter__(self) -> "CPEApi": await super().__aenter__() diff --git a/pontos/nvd/cve/api.py b/pontos/nvd/cve/api.py index 77ad7c4b5..9b2ef324c 100644 --- a/pontos/nvd/cve/api.py +++ b/pontos/nvd/cve/api.py @@ -18,9 +18,7 @@ from datetime import datetime from types import TracebackType from typing import ( - Any, AsyncIterator, - Dict, Iterable, List, Optional, @@ -33,6 +31,7 @@ from pontos.errors import PontosError from pontos.nvd.api import ( DEFAULT_TIMEOUT_CONFIG, + JSON, NVDApi, Params, convert_camel_case, @@ -46,6 +45,7 @@ __all__ = ("CVEApi",) DEFAULT_NIST_NVD_CVES_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0" +MAX_CVES_PER_PAGE = 10000 class CVEApi(NVDApi): @@ -112,6 +112,7 @@ async def cves( has_cert_notes: Optional[bool] = None, has_kev: Optional[bool] = None, has_oval: Optional[bool] = None, + request_results: Optional[int] = None, ) -> AsyncIterator[CVE]: """ Get all CVEs for the provided arguments @@ -160,6 +161,8 @@ async def cves( has_oval: Returns the CVEs that contain information from MITRE's Open Vulnerability and Assessment Language (OVAL) before this transitioned to the Center for Internet Security (CIS). + request_results: Number of CVEs to download. Set to None (default) + to download all available CVEs. Returns: An async iterator to iterate over CVE model instances @@ -173,8 +176,6 @@ async def cves( async for cve in api.cves(keywords=["Mac OS X", "kernel"]): print(cve.id) """ - total_results: Optional[int] = None - params: Params = {} if last_modified_start_date: params["lastModStartDate"] = format_date(last_modified_start_date) @@ -231,9 +232,18 @@ async def cves( params["hasOval"] = "" start_index: int = 0 - results_per_page = None + downloaded_results = 0 + results_per_page = ( + request_results + if request_results and request_results < MAX_CVES_PER_PAGE + else MAX_CVES_PER_PAGE + ) + total_results = None + requested_results = request_results - while total_results is None or start_index < total_results: + while ( + requested_results is None or downloaded_results < requested_results + ): params["startIndex"] = start_index if results_per_page is not None: @@ -242,21 +252,33 @@ async def cves( response = await self._get(params=params) response.raise_for_status() - data: Dict[str, Union[int, str, Dict[str, Any]]] = response.json( - object_hook=convert_camel_case - ) + data: JSON = response.json(object_hook=convert_camel_case) - total_results = data["total_results"] # type: ignore results_per_page: int = data["results_per_page"] # type: ignore + total_results: int = data["total_results"] # type: ignore vulnerabilities: Iterable = data.get( # type: ignore "vulnerabilities", [] ) + if not requested_results: + requested_results = total_results + for vulnerability in vulnerabilities: yield CVE.from_dict(vulnerability["cve"]) - if results_per_page is not None: - start_index += results_per_page + if results_per_page is None: + # just be safe here. should never occur + results_per_page = len(vulnerabilities) + + start_index += results_per_page + downloaded_results += results_per_page + + if ( + request_results + and downloaded_results + results_per_page > request_results + ): + # avoid downloading more results then requested + results_per_page = request_results - downloaded_results async def cve(self, cve_id: str) -> CVE: """ diff --git a/tests/nvd/cpe/test_api.py b/tests/nvd/cpe/test_api.py index b21b9a5ed..82dfcaf93 100644 --- a/tests/nvd/cpe/test_api.py +++ b/tests/nvd/cpe/test_api.py @@ -26,17 +26,23 @@ from pontos.errors import PontosError from pontos.nvd.api import now -from pontos.nvd.cpe.api import CPEApi +from pontos.nvd.cpe.api import MAX_CPES_PER_PAGE, CPEApi from tests import AsyncMock, IsolatedAsyncioTestCase, aiter, anext from tests.nvd import get_cpe_data def create_cpe_response( - cpe_name_id: str, update: Optional[Dict[str, Any]] = None + cpe_name_id: str, + *, + update: Optional[Dict[str, Any]] = None, + results: int = 1, ) -> MagicMock: data = { - "products": [{"cpe": get_cpe_data({"cpe_name_id": cpe_name_id})}], - "results_per_page": 1, + "products": [ + {"cpe": get_cpe_data({"cpe_name_id": f"{cpe_name_id}-{i}"})} + for i in range(1, results + 1) + ], + "results_per_page": results, } if update: data.update(update) @@ -46,10 +52,16 @@ def create_cpe_response( return response -def create_cpes_responses(count: int = 2) -> List[MagicMock]: +def create_cpes_responses( + responses: int = 2, results_per_response: int = 1 +) -> List[MagicMock]: return [ - create_cpe_response(f"CPE-{i}", {"total_results": count}) - for i in range(1, count + 1) + create_cpe_response( + cpe_name_id=f"CPE-{i}", + update={"total_results": responses * results_per_response}, + results=results_per_response, + ) + for i in range(1, responses + 1) ] @@ -81,19 +93,19 @@ async def test_no_cpe(self): async def test_cpe(self): self.http_client.get.return_value = create_cpe_response("CPE-1") - cpe = await self.api.cpe("CPE-1") + cpe = await self.api.cpe("CPE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, - params={"cpeNameId": "CPE-1"}, + params={"cpeNameId": "CPE-1-1"}, ) self.assertEqual( cpe.cpe_name, "cpe:2.3:o:microsoft:windows_10_22h2:-:*:*:*:*:*:arm64:*", ) - self.assertEqual(cpe.cpe_name_id, "CPE-1") + self.assertEqual(cpe.cpe_name_id, "CPE-1-1") self.assertFalse(cpe.deprecated) self.assertEqual( cpe.last_modified, datetime(2022, 12, 9, 18, 15, 16, 973000) @@ -137,7 +149,7 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock): ) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1") + self.assertEqual(cve.cpe_name_id, "CPE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -145,6 +157,7 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock): "startIndex": 0, "lastModStartDate": "2022-12-01T00:00:00", "lastModEndDate": "2022-12-31T00:00:00", + "resultsPerPage": MAX_CPES_PER_PAGE, }, ) @@ -152,7 +165,7 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2") + self.assertEqual(cve.cpe_name_id, "CPE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -178,7 +191,7 @@ async def test_cves_last_modified_end_date(self): ) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1") + self.assertEqual(cve.cpe_name_id, "CPE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -186,6 +199,7 @@ async def test_cves_last_modified_end_date(self): "startIndex": 0, "lastModStartDate": "2022-12-01T00:00:00", "lastModEndDate": "2022-12-31T00:00:00", + "resultsPerPage": MAX_CPES_PER_PAGE, }, ) @@ -193,7 +207,7 @@ async def test_cves_last_modified_end_date(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2") + self.assertEqual(cve.cpe_name_id, "CPE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -214,7 +228,7 @@ async def test_cpes_keywords(self): it = aiter(self.api.cpes(keywords=["Mac OS X", "kernel"])) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1") + self.assertEqual(cve.cpe_name_id, "CPE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -222,6 +236,7 @@ async def test_cpes_keywords(self): "startIndex": 0, "keywordSearch": "Mac OS X kernel", "keywordExactMatch": "", + "resultsPerPage": MAX_CPES_PER_PAGE, }, ) @@ -229,7 +244,7 @@ async def test_cpes_keywords(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2") + self.assertEqual(cve.cpe_name_id, "CPE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -250,13 +265,14 @@ async def test_cpes_keyword(self): it = aiter(self.api.cpes(keywords="macOS")) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1") + self.assertEqual(cve.cpe_name_id, "CPE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, params={ "startIndex": 0, "keywordSearch": "macOS", + "resultsPerPage": MAX_CPES_PER_PAGE, }, ) @@ -264,7 +280,7 @@ async def test_cpes_keyword(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2") + self.assertEqual(cve.cpe_name_id, "CPE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -288,12 +304,13 @@ async def test_cpes_cpe_match_string(self): ) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1") + self.assertEqual(cve.cpe_name_id, "CPE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, params={ "startIndex": 0, + "resultsPerPage": MAX_CPES_PER_PAGE, "cpeMatchString": "cpe:2.3:o:microsoft:windows_10:20h2:*:*:*:*:*:*:*", }, ) @@ -302,7 +319,7 @@ async def test_cpes_cpe_match_string(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2") + self.assertEqual(cve.cpe_name_id, "CPE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -326,13 +343,14 @@ async def test_cpes_match_criteria_id(self): ) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1") + self.assertEqual(cve.cpe_name_id, "CPE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, params={ "startIndex": 0, "matchCriteriaId": "36FBCF0F-8CEE-474C-8A04-5075AF53FAF4", + "resultsPerPage": MAX_CPES_PER_PAGE, }, ) @@ -340,7 +358,7 @@ async def test_cpes_match_criteria_id(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2") + self.assertEqual(cve.cpe_name_id, "CPE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -354,6 +372,51 @@ async def test_cpes_match_criteria_id(self): with self.assertRaises(StopAsyncIteration): cve = await anext(it) + async def test_cpes_request_results(self): + self.http_client.get.side_effect = create_cpes_responses( + results_per_response=2 + ) + + it = aiter(self.api.cpes(request_results=10)) + cve = await anext(it) + + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpes/2.0", + headers={}, + params={ + "startIndex": 0, + "resultsPerPage": 10, + }, + ) + self.assertEqual(cve.cpe_name_id, "CPE-1-1") + + self.http_client.get.reset_mock() + cve = await anext(it) + self.assertEqual(cve.cpe_name_id, "CPE-1-2") + self.http_client.get.assert_not_called() + + self.http_client.get.reset_mock() + cve = await anext(it) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpes/2.0", + headers={}, + params={ + "startIndex": 2, + "resultsPerPage": 2, + }, + ) + self.assertEqual(cve.cpe_name_id, "CPE-2-1") + + self.http_client.get.reset_mock() + cve = await anext(it) + self.assertEqual(cve.cpe_name_id, "CPE-2-2") + self.http_client.get.assert_not_called() + + self.http_client.get.reset_mock() + + with self.assertRaises(Exception): + cve = await anext(it) + async def test_context_manager(self): async with self.api: pass diff --git a/tests/nvd/cve/test_api.py b/tests/nvd/cve/test_api.py index acfcb69ba..bf6c63d8a 100644 --- a/tests/nvd/cve/test_api.py +++ b/tests/nvd/cve/test_api.py @@ -25,18 +25,24 @@ from pontos.errors import PontosError from pontos.nvd.api import now -from pontos.nvd.cve.api import CVEApi +from pontos.nvd.cve.api import MAX_CVES_PER_PAGE, CVEApi from pontos.nvd.models import cvss_v2, cvss_v3 from tests import AsyncMock, IsolatedAsyncioTestCase, aiter, anext from tests.nvd import get_cve_data def create_cve_response( - cve_id: str, update: Optional[Dict[str, Any]] = None + cve_id: str, + *, + update: Optional[Dict[str, Any]] = None, + results: int = 1, ) -> MagicMock: data = { - "vulnerabilities": [{"cve": get_cve_data({"id": cve_id})}], - "results_per_page": 1, + "vulnerabilities": [ + {"cve": get_cve_data({"id": f"{cve_id}-{i}"})} + for i in range(1, results + 1) + ], + "results_per_page": results, } if update: data.update(update) @@ -46,10 +52,16 @@ def create_cve_response( return response -def create_cves_responses(count: int = 2) -> List[MagicMock]: +def create_cves_responses( + requests: int = 2, results_per_response: int = 1 +) -> List[MagicMock]: return [ - create_cve_response(f"CVE-{i}", {"total_results": count}) - for i in range(1, count + 1) + create_cve_response( + f"CVE-{i}", + update={"total_results": requests * results_per_response}, + results=results_per_response, + ) + for i in range(1, requests + 1) ] @@ -124,18 +136,21 @@ async def test_cves(self): it = aiter(self.api.cves()) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, - params={"startIndex": 0}, + params={ + "startIndex": 0, + "resultsPerPage": MAX_CVES_PER_PAGE, + }, ) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -155,7 +170,7 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock): ) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -163,6 +178,7 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock): "startIndex": 0, "lastModStartDate": "2022-12-01T00:00:00", "lastModEndDate": "2022-12-31T00:00:00", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -170,7 +186,7 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -196,7 +212,7 @@ async def test_cves_last_modified_end_date(self): ) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -204,6 +220,7 @@ async def test_cves_last_modified_end_date(self): "startIndex": 0, "lastModStartDate": "2022-12-01T00:00:00", "lastModEndDate": "2022-12-31T00:00:00", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -211,7 +228,7 @@ async def test_cves_last_modified_end_date(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -234,7 +251,7 @@ async def test_cves_published_start_date(self, now_mock: MagicMock): it = aiter(self.api.cves(published_start_date=datetime(2022, 12, 1))) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -242,6 +259,7 @@ async def test_cves_published_start_date(self, now_mock: MagicMock): "startIndex": 0, "pubStartDate": "2022-12-01T00:00:00", "pubEndDate": "2022-12-31T00:00:00", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -249,7 +267,7 @@ async def test_cves_published_start_date(self, now_mock: MagicMock): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -275,7 +293,7 @@ async def test_cves_published_end_date(self): ) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -283,6 +301,7 @@ async def test_cves_published_end_date(self): "startIndex": 0, "pubStartDate": "2022-12-01T00:00:00", "pubEndDate": "2022-12-31T00:00:00", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -290,7 +309,7 @@ async def test_cves_published_end_date(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -311,18 +330,22 @@ async def test_cves_cpe_name(self): it = aiter(self.api.cves(cpe_name="foo-bar")) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, - params={"startIndex": 0, "cpeName": "foo-bar"}, + params={ + "startIndex": 0, + "cpeName": "foo-bar", + "resultsPerPage": MAX_CVES_PER_PAGE, + }, ) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -342,18 +365,23 @@ async def test_cves_is_vulnerable(self): it = aiter(self.api.cves(cpe_name="foo-bar", is_vulnerable=True)) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, - params={"startIndex": 0, "cpeName": "foo-bar", "isVulnerable": ""}, + params={ + "startIndex": 0, + "cpeName": "foo-bar", + "isVulnerable": "", + "resultsPerPage": MAX_CVES_PER_PAGE, + }, ) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -374,13 +402,14 @@ async def test_cves_cvss_v2_vector(self): it = aiter(self.api.cves(cvss_v2_vector="AV:N/AC:M/Au:N/C:N/I:P/A:N")) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, params={ "startIndex": 0, "cvssV2Metrics": "AV:N/AC:M/Au:N/C:N/I:P/A:N", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -388,7 +417,7 @@ async def test_cves_cvss_v2_vector(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -412,13 +441,14 @@ async def test_cves_cvss_v3_vector(self): ) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, params={ "startIndex": 0, "cvssV3Metrics": "CVSS:3.1/AV:N/AC:L/PR:H/UI:N/S:U/C:H/I:N/A:N", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -426,7 +456,7 @@ async def test_cves_cvss_v3_vector(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -446,13 +476,14 @@ async def test_cves_cvss_v2_severity(self): it = aiter(self.api.cves(cvss_v2_severity=cvss_v2.Severity.HIGH)) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, params={ "startIndex": 0, "cvssV2Severity": "HIGH", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -460,7 +491,7 @@ async def test_cves_cvss_v2_severity(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -480,13 +511,14 @@ async def test_cves_cvss_v3_severity(self): it = aiter(self.api.cves(cvss_v3_severity=cvss_v3.Severity.HIGH)) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, params={ "startIndex": 0, "cvssV3Severity": "HIGH", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -494,7 +526,7 @@ async def test_cves_cvss_v3_severity(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -514,7 +546,7 @@ async def test_cves_keywords(self): it = aiter(self.api.cves(keywords=["Mac OS X", "kernel"])) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -522,6 +554,7 @@ async def test_cves_keywords(self): "startIndex": 0, "keywordSearch": "Mac OS X kernel", "keywordExactMatch": "", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -529,7 +562,7 @@ async def test_cves_keywords(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -550,13 +583,14 @@ async def test_cves_keyword(self): it = aiter(self.api.cves(keywords="Windows")) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, params={ "startIndex": 0, "keywordSearch": "Windows", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -564,7 +598,7 @@ async def test_cves_keyword(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -584,18 +618,22 @@ async def test_cves_cwe(self): it = aiter(self.api.cves(cwe_id="CWE-1")) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, - params={"startIndex": 0, "cweId": "CWE-1"}, + params={ + "startIndex": 0, + "cweId": "CWE-1", + "resultsPerPage": MAX_CVES_PER_PAGE, + }, ) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -611,13 +649,14 @@ async def test_cves_source_identifier(self): it = aiter(self.api.cves(source_identifier="nvd@nist.gov")) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, params={ "startIndex": 0, "sourceIdentifier": "nvd@nist.gov", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -625,7 +664,7 @@ async def test_cves_source_identifier(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -645,13 +684,14 @@ async def test_cves_virtual_match_string(self): it = aiter(self.api.cves(virtual_match_string="foo-bar")) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, params={ "startIndex": 0, "virtualMatchString": "foo-bar", + "resultsPerPage": MAX_CVES_PER_PAGE, }, ) @@ -659,7 +699,7 @@ async def test_cves_virtual_match_string(self): cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -679,18 +719,22 @@ async def test_cves_has_cert_alerts(self): it = aiter(self.api.cves(has_cert_alerts=True)) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, - params={"startIndex": 0, "hasCertAlerts": ""}, + params={ + "startIndex": 0, + "hasCertAlerts": "", + "resultsPerPage": MAX_CVES_PER_PAGE, + }, ) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -710,18 +754,22 @@ async def test_cves_has_cert_notes(self): it = aiter(self.api.cves(has_cert_notes=True)) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, - params={"startIndex": 0, "hasCertNotes": ""}, + params={ + "startIndex": 0, + "hasCertNotes": "", + "resultsPerPage": MAX_CVES_PER_PAGE, + }, ) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -741,18 +789,22 @@ async def test_cves_has_kev(self): it = aiter(self.api.cves(has_kev=True)) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, - params={"startIndex": 0, "hasKev": ""}, + params={ + "startIndex": 0, + "hasKev": "", + "resultsPerPage": MAX_CVES_PER_PAGE, + }, ) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -772,18 +824,22 @@ async def test_cves_has_oval(self): it = aiter(self.api.cves(has_oval=True)) cve = await anext(it) - self.assertEqual(cve.id, "CVE-1") + self.assertEqual(cve.id, "CVE-1-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, - params={"startIndex": 0, "hasOval": ""}, + params={ + "startIndex": 0, + "hasOval": "", + "resultsPerPage": MAX_CVES_PER_PAGE, + }, ) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.id, "CVE-2") + self.assertEqual(cve.id, "CVE-2-1") self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cves/2.0", headers={"apiKey": "token"}, @@ -797,6 +853,51 @@ async def test_cves_has_oval(self): with self.assertRaises(StopAsyncIteration): cve = await anext(it) + async def test_cves_request_results(self): + self.http_client.get.side_effect = create_cves_responses( + results_per_response=2 + ) + + it = aiter(self.api.cves(request_results=10)) + cve = await anext(it) + + self.assertEqual(cve.id, "CVE-1-1") + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cves/2.0", + headers={"apiKey": "token"}, + params={ + "startIndex": 0, + "resultsPerPage": 10, + }, + ) + + self.http_client.get.reset_mock() + cve = await anext(it) + self.assertEqual(cve.id, "CVE-1-2") + self.http_client.get.assert_not_called() + + self.http_client.get.reset_mock() + + cve = await anext(it) + + self.assertEqual(cve.id, "CVE-2-1") + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cves/2.0", + headers={"apiKey": "token"}, + params={ + "startIndex": 2, + "resultsPerPage": 2, + }, + ) + + self.http_client.get.reset_mock() + cve = await anext(it) + self.assertEqual(cve.id, "CVE-2-2") + self.http_client.get.assert_not_called() + + with self.assertRaises(Exception): + cve = await anext(it) + async def test_context_manager(self): async with self.api: pass