Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add observableType endpoints #264

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: CI
on: [push]
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -30,4 +30,4 @@ jobs:
pip-audit .
- name: Security check with bandit
run: |
bandit -r thehive4py/
bandit -r thehive4py/
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from thehive4py.types.comment import OutputComment
from thehive4py.types.custom_field import OutputCustomField
from thehive4py.types.observable import InputObservable, OutputObservable
from thehive4py.types.observable_type import OutputObservableType
from thehive4py.types.procedure import OutputProcedure
from thehive4py.types.profile import OutputProfile
from thehive4py.types.task import InputTask, OutputTask
Expand Down Expand Up @@ -248,3 +249,10 @@ def test_custom_field(thehive_admin: TheHiveApi) -> OutputCustomField:
"options": [],
}
)


@pytest.fixture
def test_observable_type(thehive_admin: TheHiveApi) -> OutputObservableType:
return thehive_admin.observable_type.create(
observable_type={"name": "my-observable-type"}
)
4 changes: 2 additions & 2 deletions tests/test_custom_field_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from thehive4py.types.custom_field import InputUpdateCustomField, OutputCustomField


class TestTaskLogEndpoint:
class TestCustomeFieldEndpoint:
def test_create_and_list(self, thehive_admin: TheHiveApi):
created_custom_field = thehive_admin.custom_field.create(
custom_field={
Expand All @@ -26,7 +26,7 @@ def test_delete(
thehive_admin.custom_field.delete(custom_field_id=test_custom_field["_id"])

with pytest.raises(TheHiveError):
thehive_admin.task_log.get(task_log_id=test_custom_field["_id"])
thehive_admin.custom_field.delete(custom_field_id=test_custom_field["_id"])

def test_update(
self, thehive_admin: TheHiveApi, test_custom_field: OutputCustomField
Expand Down
2 changes: 1 addition & 1 deletion tests/test_observable_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import pytest
from thehive4py import TheHiveApi
from thehive4py.errors import TheHiveError
from thehive4py.query.sort import Asc
from thehive4py.types.alert import OutputAlert
from thehive4py.types.case import OutputCase
from thehive4py.query.sort import Asc
from thehive4py.types.observable import (
InputBulkUpdateObservable,
InputUpdateObservable,
Expand Down
36 changes: 36 additions & 0 deletions tests/test_observable_type_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import pytest
from thehive4py.client import TheHiveApi
from thehive4py.errors import TheHiveError
from thehive4py.query.filters import Eq
from thehive4py.types.observable_type import OutputObservableType


class TestObservableTypeEndpoint:
def test_create_and_get(self, thehive_admin: TheHiveApi):
created_observable_type = thehive_admin.observable_type.create(
{
"name": "new-observable-type",
}
)

test_observable_type = thehive_admin.observable_type.get(
created_observable_type["_id"]
)
assert created_observable_type == test_observable_type

def test_find(
self, thehive: TheHiveApi, test_observable_type: OutputObservableType
):
found_observable_types = thehive.observable_type.find(
filters=Eq("name", test_observable_type["name"])
)

assert found_observable_types == [test_observable_type]

def test_delete(
self, thehive: TheHiveApi, test_observable_type: OutputObservableType
):
observable_type_id = test_observable_type["_id"]
thehive.observable_type.delete(observable_type_id)
with pytest.raises(TheHiveError):
thehive.observable_type.get(observable_type_id)
7 changes: 7 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ def reinit_hive_container(client: TheHiveApi) -> None:
profiles = client.profile.find(
filters=~Eq("name", "analyst") & ~Eq("name", "read-only")
)
observable_types = client.observable_type.find(
filters=~Eq("_createdBy", "[email protected]")
)
custom_fields = client.custom_field.list()
with ThreadPoolExecutor() as executor:
executor.map(client.alert.delete, [alert["_id"] for alert in alerts])
Expand All @@ -119,5 +122,9 @@ def reinit_hive_container(client: TheHiveApi) -> None:
client.custom_field.delete,
[custom_field["_id"] for custom_field in custom_fields],
)
executor.map(
client.observable_type.delete,
[observable_type["_id"] for observable_type in observable_types],
)

client.session_organisation = original_session_organisation
2 changes: 2 additions & 0 deletions thehive4py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from thehive4py.endpoints.cortex import CortexEndpoint
from thehive4py.endpoints.custom_field import CustomFieldEndpoint
from thehive4py.endpoints.observable_type import ObservableTypeEndpoint
from thehive4py.session import TheHiveSession


Expand Down Expand Up @@ -54,6 +55,7 @@ def __init__(

# entity endpoints
self.custom_field = CustomFieldEndpoint(self.session)
self.observable_type = ObservableTypeEndpoint(self.session)

# connector endpoints
self.cortex = CortexEndpoint(self.session)
Expand Down
46 changes: 46 additions & 0 deletions thehive4py/endpoints/observable_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import List, Optional

from thehive4py.endpoints._base import EndpointBase
from thehive4py.query import QueryExpr
from thehive4py.types.observable_type import (
InputObservableType,
OutputObservableType,
)
from thehive4py.query.filters import FilterExpr
from thehive4py.query.page import Paginate
from thehive4py.query.sort import SortExpr


class ObservableTypeEndpoint(EndpointBase):
def create(self, observable_type: InputObservableType) -> OutputObservableType:
return self._session.make_request(
"POST", path="/api/v1/observable/type", json=observable_type
)

def get(self, observable_type_id: str) -> OutputObservableType:
return self._session.make_request(
"GET", path=f"/api/v1/observable/type/{observable_type_id}"
)

def delete(self, observable_type_id: str) -> None:
return self._session.make_request(
"DELETE", path=f"/api/v1/observable/type/{observable_type_id}"
)

def find(
self,
filters: Optional[FilterExpr] = None,
sortby: Optional[SortExpr] = None,
paginate: Optional[Paginate] = None,
) -> List[OutputObservableType]:
query: QueryExpr = [
{"_name": "listObservableType"},
*self._build_subquery(filters=filters, sortby=sortby, paginate=paginate),
]

return self._session.make_request(
"POST",
path="/api/v1/query",
params={"name": "observableTypes"},
json={"query": query},
)
23 changes: 23 additions & 0 deletions thehive4py/types/observable_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import TypedDict


class InputObservableTypeRequired(TypedDict):
name: str


class InputObservableType(InputObservableTypeRequired, total=False):
isAttachment: bool


class OutputObservableTypeRequired(TypedDict):
_id: str
_type: str
_createdBy: str
_createdAt: int
name: str
isAttachment: bool


class OutputObservableType(OutputObservableTypeRequired, total=False):
_updatedBy: str
_updatedAt: int