Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(search): Added Cert Observation Endpoint #655

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions censys/search/v2/certs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Interact with the Censys Search Cert API."""

import warnings
from typing import List, Optional, Union

from ...common.types import Datetime
from ...common.utils import format_rfc3339
from .api import CensysSearchAPIv2


Expand Down Expand Up @@ -306,6 +310,11 @@ def get_hosts_by_cert(self, fingerprint: str, cursor: Optional[str] = None) -> d
Returns:
dict: A list of hosts which contain services presenting this certificate.
"""
warnings.warn(
"This API endpoint is deprecated and scheduled for removal during a future release. Users should migrate to using the search endpoint on the Host index using the 'services.certificate: {fingerprint}' query to find any hosts currently presenting a certificate.",
category=DeprecationWarning,
stacklevel=2,
)
args = {"cursor": cursor}
return self._get(self.view_path + fingerprint + "/hosts", args)["result"]

Expand All @@ -319,3 +328,30 @@ def list_certs_with_tag(self, tag_id: str) -> List[dict]:
List[dict]: A list of certs which are tagged with the specified tag.
"""
return self._list_documents_with_tag(tag_id, "certificates", "certs")

def get_observations(
self,
fingerprint: str,
per_page: int = 50,
start_time: Optional[Datetime] = None,
end_time: Optional[Datetime] = None,
cursor: Optional[str] = None,
) -> dict:
"""Returns a list of observations for the specified certificate.

Args:
fingerprint (str): The SHA-256 fingerprint of the requested certificate.
per_page (int): The number of results to return per page. Defaults to 50.
start_time (str): The start time of the observations to return.
end_time (str): The end time of the observations to return.
cursor (str): Cursor token from the API response, which fetches the next page of observations when added to the endpoint URL.

Returns:
dict: A list of observations for the specified certificate.
"""
args = {"per_page": per_page, "cursor": cursor}
if start_time:
args["start_time"] = format_rfc3339(start_time)
if end_time:
args["end_time"] = format_rfc3339(end_time)
return self._get(self.view_path + fingerprint + "/observations", args)["result"]
56 changes: 56 additions & 0 deletions tests/search/v2/test_certs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict, List, Optional

import responses
Expand Down Expand Up @@ -178,6 +179,27 @@
"links": {"next": "nextCursorToken"},
},
}
OBSERVATIONS_CERT_JSON = {
"code": 200,
"status": "OK",
"result": {
"fingerprint": "string",
"observations": [
{
"ip": "string",
"name": "string",
"port": 0,
"service_name": "string",
"transport_protocol": "TCP",
"first_observed_at": "2024-10-17T18:13:23.554Z",
"last_observed_at": "2024-10-17T18:13:23.554Z",
"first_updated_at": "2024-10-17T18:13:23.554Z",
"last_updated_at": "2024-10-17T18:13:23.554Z",
}
],
"links": {"next": "nextCursorToken"},
},
}


class TestCerts(CensysTestCase):
Expand Down Expand Up @@ -385,3 +407,37 @@ def test_get_hosts_by_cert_with_cursor(self):
)
results = self.api.get_hosts_by_cert(TEST_CERT, cursor="nextCursorToken")
assert results == VIEW_HOSTS_BY_CERT_JSON["result"]

def test_get_observations_by_cert(self):
self.responses.add(
responses.GET,
f"{V2_URL}/certificates/{TEST_CERT}/observations",
status=200,
json=OBSERVATIONS_CERT_JSON,
)
result = self.api.get_observations(TEST_CERT)
assert result == OBSERVATIONS_CERT_JSON["result"]

def test_get_observations_by_cert_with_cursor(self):
self.responses.add(
responses.GET,
f"{V2_URL}/certificates/{TEST_CERT}/observations?per_page=50&cursor=nextCursorToken",
status=200,
json=OBSERVATIONS_CERT_JSON,
)
results = self.api.get_observations(TEST_CERT, cursor="nextCursorToken")
assert results == OBSERVATIONS_CERT_JSON["result"]

def test_get_observations_with_rfc3339_timestampts(self):
self.responses.add(
responses.GET,
f"{V2_URL}/certificates/{TEST_CERT}/observations?per_page=50&start_time=2024-10-14T18%3A13%3A23.554000Z&end_time=2024-10-17T18%3A13%3A23.554000Z",
status=200,
json=OBSERVATIONS_CERT_JSON,
)
result = self.api.get_observations(
TEST_CERT,
start_time=datetime(2024, 10, 14, 18, 13, 23, 554000),
end_time=datetime(2024, 10, 17, 18, 13, 23, 554000),
)
assert result == OBSERVATIONS_CERT_JSON["result"]