From 31d6daaa0857d84fa9aec052424eb2234380a4b2 Mon Sep 17 00:00:00 2001 From: Aidan Holland Date: Thu, 17 Oct 2024 14:24:16 -0400 Subject: [PATCH] chore(search): Added Cert Observation Endpoint --- censys/search/v2/certs.py | 36 ++++++++++++++++++++++ tests/search/v2/test_certs.py | 56 +++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/censys/search/v2/certs.py b/censys/search/v2/certs.py index a9fe66cb..1b6eceab 100644 --- a/censys/search/v2/certs.py +++ b/censys/search/v2/certs.py @@ -1,6 +1,10 @@ """Interact with the Censys Search Cert API.""" + +import warnings from typing import List, Optional, Union +from ...common.types import Datetime +from ...common.utils import format_rfc3339 from .api import CensysSearchAPIv2 @@ -306,6 +310,11 @@ def get_hosts_by_cert(self, fingerprint: str, cursor: Optional[str] = None) -> d Returns: dict: A list of hosts which contain services presenting this certificate. """ + warnings.warn( + "This API endpoint is deprecated and scheduled for removal during a future release. Users should migrate to using the search endpoint on the Host index using the 'services.certificate: {fingerprint}' query to find any hosts currently presenting a certificate.", + category=DeprecationWarning, + stacklevel=2, + ) args = {"cursor": cursor} return self._get(self.view_path + fingerprint + "/hosts", args)["result"] @@ -319,3 +328,30 @@ def list_certs_with_tag(self, tag_id: str) -> List[dict]: List[dict]: A list of certs which are tagged with the specified tag. """ return self._list_documents_with_tag(tag_id, "certificates", "certs") + + def get_observations( + self, + fingerprint: str, + per_page: int = 50, + start_time: Optional[Datetime] = None, + end_time: Optional[Datetime] = None, + cursor: Optional[str] = None, + ) -> dict: + """Returns a list of observations for the specified certificate. + + Args: + fingerprint (str): The SHA-256 fingerprint of the requested certificate. + per_page (int): The number of results to return per page. Defaults to 50. + start_time (str): The start time of the observations to return. + end_time (str): The end time of the observations to return. + cursor (str): Cursor token from the API response, which fetches the next page of observations when added to the endpoint URL. + + Returns: + dict: A list of observations for the specified certificate. + """ + args = {"per_page": per_page, "cursor": cursor} + if start_time: + args["start_time"] = format_rfc3339(start_time) + if end_time: + args["end_time"] = format_rfc3339(end_time) + return self._get(self.view_path + fingerprint + "/observations", args)["result"] diff --git a/tests/search/v2/test_certs.py b/tests/search/v2/test_certs.py index 72427ad2..932256d0 100644 --- a/tests/search/v2/test_certs.py +++ b/tests/search/v2/test_certs.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Any, Dict, List, Optional import responses @@ -178,6 +179,27 @@ "links": {"next": "nextCursorToken"}, }, } +OBSERVATIONS_CERT_JSON = { + "code": 200, + "status": "OK", + "result": { + "fingerprint": "string", + "observations": [ + { + "ip": "string", + "name": "string", + "port": 0, + "service_name": "string", + "transport_protocol": "TCP", + "first_observed_at": "2024-10-17T18:13:23.554Z", + "last_observed_at": "2024-10-17T18:13:23.554Z", + "first_updated_at": "2024-10-17T18:13:23.554Z", + "last_updated_at": "2024-10-17T18:13:23.554Z", + } + ], + "links": {"next": "nextCursorToken"}, + }, +} class TestCerts(CensysTestCase): @@ -385,3 +407,37 @@ def test_get_hosts_by_cert_with_cursor(self): ) results = self.api.get_hosts_by_cert(TEST_CERT, cursor="nextCursorToken") assert results == VIEW_HOSTS_BY_CERT_JSON["result"] + + def test_get_observations_by_cert(self): + self.responses.add( + responses.GET, + f"{V2_URL}/certificates/{TEST_CERT}/observations", + status=200, + json=OBSERVATIONS_CERT_JSON, + ) + result = self.api.get_observations(TEST_CERT) + assert result == OBSERVATIONS_CERT_JSON["result"] + + def test_get_observations_by_cert_with_cursor(self): + self.responses.add( + responses.GET, + f"{V2_URL}/certificates/{TEST_CERT}/observations?per_page=50&cursor=nextCursorToken", + status=200, + json=OBSERVATIONS_CERT_JSON, + ) + results = self.api.get_observations(TEST_CERT, cursor="nextCursorToken") + assert results == OBSERVATIONS_CERT_JSON["result"] + + def test_get_observations_with_rfc3339_timestampts(self): + self.responses.add( + responses.GET, + f"{V2_URL}/certificates/{TEST_CERT}/observations?per_page=50&start_time=2024-10-14T18%3A13%3A23.554000Z&end_time=2024-10-17T18%3A13%3A23.554000Z", + status=200, + json=OBSERVATIONS_CERT_JSON, + ) + result = self.api.get_observations( + TEST_CERT, + start_time=datetime(2024, 10, 14, 18, 13, 23, 554000), + end_time=datetime(2024, 10, 17, 18, 13, 23, 554000), + ) + assert result == OBSERVATIONS_CERT_JSON["result"]